Endpoint Security Part 2: Enriching your security event data

Data enrichment with Elastic

In this second part series on 'Endpoint Security' I'm going to delve into how you can enrich your security event data to provide further context to assist your security analysts when investigating incidents.

In this example we will lead off where we ended from 'Endpoint Security Part 1'  and so if you were following the previous article you should now be collecting events from your core windows systems and pumping them into Elastic.   The questions we are going to ask and be answering and that provide contextual awareness to our existing event data concern:

  1. Who is the user? how important are they in the organisation. For example are they a senior executive such as the CEO, CIO or CFO.  Maybe they are a system admin responsible for the companies core business applications.
  2. What business unit do they belong to and who is their manager? Understanding what department or business unit a person belongs to as well as who their manager is provides important context around why this person or department may be a target and can be used for further trending.
  3. Where are they located? In a smaller office this might be fairly easy to answer but in an organisation with multiple office locations, multiple floors and a few thousand staff this may not be that easy.

For questions 1 and 2 we are going to be using Active Directory information to collect further attributes about the user and merge this with the existing event data.

For question 3 we will be collecting IP address information and mapping that to office location and combining that with GEO Location information and creating a custom MaxMind GeoIP database which we can query on each event to add relevant IP location information.

So let's dive in!

Active Directory Enrichment

There are a few ways you can get Active Directory information into Elastic, you could:

  1. Dump out all user information from active directory into a csv file and then ingest that data into a new index.  Then query that index for each event or relevant windows events and populate the event with the meta data you want from active directory.
  2.  Using Logstash create a Translate filter and map out the relevant objects and attributes you want to pull from AD.
  3.  Use an ingest pipeline and enrich processor to enrich your data.

For this example I'm going to use option 3 as this is the new way of enriching data in Elastic and a lot more efficient than the other 2 methods.

Step 1: Create our AD ingest template

Step 2: Populate our AD index with some data

Step 3: Create our AD enrich policy

Step 4: Create our AD ingest pipeline

Step 5: Test our AD ingest pipeline with dummy data.

GeoIP Enrichment

Enriching IP information for Public IP's is fairly easy and there are numerous IP GEO lookup services or databases such as iplocation.net and ip2location.com

However these services won't provide information for your private RFC 1918 address space.  So you can use either Elastic or Logstash to build your own custom IP database but this is not a trivial exercise and will require you to learn new tools and build out and maintain your own custom Max Mind database.

So for this example we will use a simpler method and use the 'enrich processor' and will be going down to the Country, City and Floor level for each office.

Step One - Create our private GeoIP Template

PUT my_private_geoips
{
"mappings": {
"properties": {
"city_name": {
"type": "keyword"
},
"continent_name": {
"type": "keyword"
},
"country_iso_code": {
"type": "keyword"
},
"country_name": {
"type": "keyword"
},
"location": {
"type": "geo_point"
},
"source.ip": {
"type": "ip"
},
"floor_level": {
"type": "keyword"
}
}
}
}

Step 2 - Populate the index my_private_geopips with some data.

For this example I will be using the following data:

Sydney: 172.16.1.0/20 (3 floors different subnet per floor)

Melbourne: 172.16.16.0/20 (5 floors different subnet per floor)

Perth: 172.16.32.0/20 (1 floor)

Brisbane: 172.16.48.0/20 (2 floors different subnet per floor)

Canberra: 172.16.64.0/20 (3 floors different subnet per floor)

POST my_private_geoips/_bulk
{"index":{"_id":"sydney-australia"}}
{"city_name":"Sydney","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-33.866676, 151.208043],"source.ip":["172.16.1.10"],"floor_level":"Level 17"}
{"city_name":"Sydney","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-33.866676, 151.208043],"source.ip":["172.16.2.20"],"floor_level":"Level 18"}
{"city_name":"Sydney","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-33.866676, 151.208043],"source.ip":["172.16.3.12"],"floor_level":"Level 19"}

{"index":{"_id":"melbourne-australia"}}
{"city_name":"Melbourne","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-37.826163, 144.9581847],"source.ip":["172.16.16.10"],"floor_level":"Level 5"}
{"city_name":"Melbourne","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-37.826163, 144.958184],"source.ip":["172.16.17.12"],"floor_level":"Level 6"}
{"city_name":"Melbourne","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-37.826163, 144.958184],"source.ip":["172.16.18.20"],"floor_level":"Level 7"}
{"city_name":"Melbourne","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-37.826163, 144.9581847],"source.ip":["172.16.19.100"],"floor_level":"Level 8"}
{"city_name":"Melbourne","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-37.826163, 144.958184],"source.ip":["172.16.20.50"],"floor_level":"Level 9"}

{"index":{"_id":"perth-australia"}}
{"city_name":"Perth","continent_name":"Australia","country_iso_code":"AU","location":[-31.9522400, 115.8614000],"country_name":"Austrlia","region_name":"","source.ip":["172.16.32.10"],"floor_level":"Level 3"}

{"index":{"_id":"brisbane-australia"}}
{"city_name":"Brisbane","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-27.4679400,153.0280900],"source.ip":["172.16.48.5"],"floor_level":"Level 10"}
{"city_name":"Brisbane","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-27.4679400,153.0280900],"source.ip":["172.16.50.10"],"floor_level":"Level 12"}

{"index":{"_id":"canberra-australia"}}
{"city_name":"Canberra","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-35.2834600,149.1280700],"source.ip":["172.16.64.89"],"floor_level":"Level 6"}
{"city_name":"Canberra","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-35.2834600,149.1280700],"source.ip":["172.16.65.100"],"floor_level":"Level 7"}
{"city_name":"Canberra","continent_name":"Australia","country_iso_code":"AU","country_name":"Australia","location":[-35.2834600,149.1280700],"source.ip":["172.16.66.72"],"floor_level":"Level 8"}

Step 3 - Create an Enrich Policy

All of our documents that we store in the index my_private_geoips must have all the fields mentioned in the enrich_fields array within our policy as follows:

PUT _enrich/policy/my_private_geoips_policy
{
"match": {
"indices": "my_private_geoips",
"match_field": "source.ip",
"enrich_fields": ["city_name", "continent_name", "country_iso_code", "country_name", "location", "floor_level"]
}
}

When we first create our policy or when we make changes to our index we need to re-execute our policy.

POST /_enrich/policy/my_private_geoips_policy/_execute

Step 4 - Create the ingest pipeline

PUT /_ingest/pipeline/my_private_geoips
{
"description": "_description",
"processors": [
{
"dot_expander": {
"field": "source.ip"
}
},
{
"enrich": {
"policy_name": "my_private_geoips_policy",
"field": "source.ip",
"target_field": "geo",
"max_matches": "1"
}
},
{
"script": {
"lang": "painless",
"source": "ctx.geo.remove('source.ip')"
}
}
]
}

Then lastly lets's test the pipeline!

POST /_ingest/_pipeline/my_private_geoips/_simulate
{
"docs": [
{
"_source": {
"source.ip": "172.16.1.10"
}
},
{
"_source": {
"source.ip": "172.16.64.89"
}
}
]
}

That's it! Hopefully you found that useful and if you need help with enriching your security data or anything else cyber related please get in touch!

For more info on GeoIP go here

Leave a Comment