04 November 2018

Enriching Logs With Active Directory Attributes

Date of writing: 4th November 2018
Last edit: 4th November 2018

JUST GIVE ME THE SCRIPT AND CONFIG


If all you need are the logstash configuration and script to generate the translate data, go here:

https://github.com/kevinwilcox/python-elk

It is in the directory called "crawl_ad_ldap"!

Everyone Else 8^)


I recently tweeted about something that popped up in my day job and promised a bit more detail in a blog post. It has sort of worked out "to the good" because I've mentioned wanting to cover enrichment with AD info before but this really pushed that into overdrive...so here we are!

The Problem


When I talk to folks about what they're logging, and why they want to use something like Elastic, Splunk, GrayLog, etc., there are a few log types that regularly pop up.  It seems like everyone wants to pull login logs, be they 4624 events in Windows or SSH logins from the *nix world (I know, OpenSSH has been ported to Windows).  Both of these use pretty well-defined formats -- even though openssh logs to auth.log (generally) and uses syslog (generally), the format of an SSH success or failure is pretty consistent.

For the purpose of this post, we're going to focus on one field that's common to both Windows 4624 events and SSH authentications - the username field.

Let's make things a bit "real life".  Think about the scenario where you want to search for every login to a system in Human Resources, or for every login by a user in Finance and Payroll.  You would need to pull every login over <x> time period, do an OU lookup for every workstation name or username and then discard anything not in the OU you care about, or you'd need to pull a list of each user in those specific OUs and look for logins for those users. Those methods are pretty traditional but I was 100% sure there was a better way using modern tools (specifically, using my SIEM of choice - Elastic).

Method One - Elasticsearch


My initial solution was to use python to crawl Active Directory and LDAP for all computer and user objects (and the properties I need), cache that data locally in Elasticsearch and then query for the relevant data each time logstash parses or extracts a username field.  By doing that I can somewhat normalise all of my login logs - and then it doesn't matter if I know what all of the OUs or groups are in my organisation, or who is a member of which one, as long as I have mostly-current information from AD and LDAP.

I figured I already did this with OUI info for DHCP logs and Elasticsearch was performing great so doing it for username fields shouldn't be a big issue, right?  I'd just finished writing the python to pull the data and was working on the filter when I had a chat with Justin Henderson, author of the SANS SEC555 course, and he completely changed my approach.

Method Two - Translate


Justin recommended I try the translate filter.  I already used the filter for login logs to add a field with the description for each login type so I wasn't completely new to using it but I had never used it with a dictionary file.  After chatting with him and reading the documentation for a bit, I realised I could create a dictionary file that looked like this:

username_0:
  distinguished_name: username_0, ou=foo, ou=foo2, ou=org, ou=local
  member_of:
    - cn=foo
    - cn=another_foo
    - cn=yet_another_foo
username_1:
  distinguished_name: username_1, ou=foo3, ou=foo4, ou=org, ou=local
  member_of:
    - cn=foo3

Then any time logstash sees username_0 in the "username" field, it can add the distinguished_name and member_of fields to the metadata for that log.

The operative word being "could"...I'd still have a bit of work to go from that idea to something I could roll out to my logstash nodes, especially since they run on BSD and Linux where there isn't exactly an option to run "get-aduser".

A First Pass


The AD controller I used for this post is configured to use osg.local as its domain. I've added an OU called "osg_users" and that OU has two users, "testuser" and "charlatan".

"charlatan" is also a member of a group called "InfoSec" and "testuser" is a member of a group called "Random Group".

First, I needed to set up a configuration file so I could put directory-specific information in there. The one I wrote for this blog is named conn_info.py and looks like this:

ad_server = "IP_of_AD_controller"
ad_search_user = "osg\\charlatan"
ad_search_pass = 'charlatans_password'
ad_search_base = "ou=osg_users,dc=osg,dc=local"
ad_search_props = ['displayname', 'distinguishedname', 'memberof']

Ideally the charlatan user/password would be a service account but it works for this example.

You may assign any manner of properties to the users in your AD or LDAP but displayName, distinguishedName and memberOf are practically universal.

The next step was a script that could connect to and interrogate my AD server.

A really basic script using the ldap3 module and my conn_info.py file might look like this:

#!/usr/bin/env python
from ldap3 import Server, Connection, ALL, NTLM
import conn_info
server = Server(conn_info.ad_server, get_info=ALL)
conn = Connection(server, conn_info.ad_search_user, conn_info.ad_search_pass, auto_bind=True, auto_range=True)
print("starting search...")
try:
  searchString = "(objectclass=user)"
  conn.search(conn_info.ad_search_base, searchString, attributes=conn_info.ad_search_props, paged_size = 500)
  for entry in conn.entries:
    print(str(entry))
  cookie = conn.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
  while(cookie):
    print('receiving next batch')
    conn.search(conn_info.ad_search_base, searchString, attributes=conn_info.ad_search_props, paged_size = 500, paged_cookie = cookie)
    for entry in conn.entries:
      print(str(entry))
    cookie = conn.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
  print('searches finished')
except Exception as e:
  print("error")
  print(e)
exit("exiting...")

Let's step through that a bit. ldap3 is a fantastic module for interrogating AD and LDAP and it works with both python 2 and python 3. Importing conn_info makes sure we can read variables (I know, constants, and they should be in all caps) from the conn_info.py file. The next few lines connect to the AD server; the auto_bind option saves a step by binding to the server after connecting as the specified user.

Unless the directory configuration is modified, most objects in AD will probably be of either type "user" or type "computer".

"paged_size" is an interesting parameter in that it limits the results to 500 at a time; the cookie is then used to paginate through to the last result set (hence the "while" loop).

This script doesn't produce output that's useful to logstash but it is really useful to us as people. For my example DC, I get this:

(ldap_post) test@u18:~/ad_ldap_post$ python test.py
starting search...
DN: CN=Just A. Charlatan,OU=osg_users,DC=osg,DC=local - STATUS: Read - READ TIME: 2018-10-15T00:32:44.624881
    displayName: Just A. Charlatan
    distinguishedName: CN=Just A. Charlatan,OU=osg_users,DC=osg,DC=local
    memberOf: CN=InfoSec,DC=osg,DC=local
DN: CN=Testing User,OU=osg_users,DC=osg,DC=local - STATUS: Read - READ TIME: 2018-10-15T00:32:44.625089
    displayName: Testing User
    distinguishedName: CN=Testing User,OU=osg_users,DC=osg,DC=local
    memberOf: CN=Random Group,DC=osg,DC=local
exiting...

Notice how similar this output is to doing:

get-aduser -searchbase "ou=osg_users,dc=osg,dc=local" -filter * -properties displayname, distinguishedname, memberof
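
The paging loop in the script above can be tried out without a directory server. What follows is a minimal sketch, not the ldap3 API: fake_search is a hypothetical stand-in for conn.search, and its "cookie" is just the next list index, where a real server hands back an opaque value. The loop shape is the same, though - fetch a page, keep going while a cookie comes back.

```python
# Sketch of a cookie-driven paging loop against an in-memory "directory".
# fake_search is hypothetical; it only imitates how a paged search behaves.

def fake_search(directory, paged_size, paged_cookie=None):
    """Return (entries, cookie); cookie is None once the last page is served."""
    start = paged_cookie or 0
    end = start + paged_size
    entries = directory[start:end]
    cookie = end if end < len(directory) else None
    return entries, cookie


def fetch_all(directory, paged_size=500):
    """Collect every entry by requesting pages until no cookie is returned."""
    results = []
    entries, cookie = fake_search(directory, paged_size)
    results.extend(entries)
    while cookie:
        entries, cookie = fake_search(directory, paged_size, paged_cookie=cookie)
        results.extend(entries)
    return results


if __name__ == "__main__":
    users = ["user_%d" % i for i in range(1200)]
    print(len(fetch_all(users)))
```

With 1200 fake users and a page size of 500, the loop makes three requests in total: the initial search and two more driven by the cookie.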

Making it More Useful


The real goal is to have something I can read into logstash. The translate filter can read CSV, JSON and YAML and, in my opinion, YAML is the cleanest of the three to read for this purpose. Instead of using a library to convert between object types and output YAML, I'm just going to output the YAML directly.

A sample function to make it easy to build the output might look like:

def format_output(user_entry):
  entry_string = str(user_entry['samaccountname']) + ":\n"
  entry_string += "  dn: " + str(user_entry['distinguishedname']) + "\n"
  entry_string += "  displayname: " + str(user_entry['displayname']) + "\n"
  entry_string += "  memberof: " + "\n"
  for i in user_entry['memberof']:
    entry_string += "    - " + str(i) + "\n"
  return entry_string

With a couple of minor edits, my output now looks like this:

charlatan:
  dn: CN=Just A. Charlatan,OU=osg_users,DC=osg,DC=local
  displayname: Just A. Charlatan
  memberof:
    - CN=InfoSec,DC=osg,DC=local
testuser:
  dn: CN=Testing User,OU=osg_users,DC=osg,DC=local
  displayname: Testing User
  memberof:
    - CN=Random Group,DC=osg,DC=local

It's important to have memberof output like that as default because as soon as you add users to two or more groups, that's how YAML parsers will expect them to be formatted.
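
One caveat with writing YAML by hand is that a value containing something like ": " or " #" can confuse the parser. A possible variation, sketched here under a few assumptions, is to quote every value. The function name format_output_quoted is hypothetical, it takes a plain dict rather than an ldap3 entry so it can run on its own, and it leans on the fact that a JSON-style double-quoted string should also be accepted by YAML parsers as a double-quoted scalar.

```python
import json


def format_output_quoted(user_entry):
    """Build one dictionary entry as YAML text with every string value quoted.

    user_entry is assumed to be a plain dict with 'samaccountname',
    'distinguishedname', 'displayname' and a list under 'memberof'.
    """
    def quoted(value):
        # ensure_ascii=False keeps non-ASCII characters as-is, so the file
        # should be written out as UTF-8.
        return json.dumps(str(value), ensure_ascii=False)

    lines = [quoted(user_entry["samaccountname"]) + ":"]
    lines.append("  dn: " + quoted(user_entry["distinguishedname"]))
    lines.append("  displayname: " + quoted(user_entry["displayname"]))
    groups = list(user_entry["memberof"])
    if groups:
        lines.append("  memberof:")
        for group in groups:
            lines.append("    - " + quoted(group))
    else:
        # An explicit empty list keeps memberof a list even with no groups.
        lines.append("  memberof: []")
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    print(format_output_quoted({
        "samaccountname": "charlatan",
        "distinguishedname": "CN=Just A. Charlatan,OU=osg_users,DC=osg,DC=local",
        "displayname": "Just A. Charlatan",
        "memberof": ["CN=InfoSec,DC=osg,DC=local"],
    }))
```

The trade-off is a slightly noisier file in exchange for not having to think about which characters are special in YAML.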

The last step in the script is to write to file instead of outputting to the display. Stripping out the print statements and using file output instead, my script now looks like this:

#!/usr/bin/env python
def format_output(user_entry):
  entry_string = str(user_entry['samaccountname']) + ":\n"
  entry_string += "  dn: " + str(user_entry['distinguishedname']) + "\n"
  entry_string += "  displayname: " + str(user_entry['displayname']) + "\n"
  entry_string += "  memberof: " + "\n"
  for i in user_entry['memberof']:
    entry_string += "    - " + str(i) + "\n"
  return entry_string
from ldap3 import Server, Connection, ALL, NTLM
import conn_info
server = Server(conn_info.ad_server, get_info=ALL)
conn = Connection(server, conn_info.ad_search_user, conn_info.ad_search_pass, auto_bind=True, auto_range=True)
try:
  out_string = ""
  searchString = "(objectclass=user)"
  conn.search(conn_info.ad_search_base, searchString, attributes=conn_info.ad_search_props, paged_size = 500)
  for entry in conn.entries:
    out_string = out_string + format_output(entry)
  cookie = conn.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
  while(cookie):
    conn.search(conn_info.ad_search_base, searchString, attributes=conn_info.ad_search_props, paged_size = 500, paged_cookie = cookie)
    for entry in conn.entries:
      out_string = out_string + format_output(entry)
    cookie = conn.result['controls']['1.2.840.113556.1.4.319']['value']['cookie']
  out_fh = open('ad_users_file.yml', 'w')
  out_fh.write(out_string)
  out_fh.close()
except Exception as e:
  print("error: " + str(e))
exit()

When I run it, I have a new file created called "ad_users_file.yml". It looks like this:

charlatan:
  dn: CN=Just A. Charlatan,OU=osg_users,DC=osg,DC=local
  displayname: Just A. Charlatan
  memberof:
    - CN=InfoSec,DC=osg,DC=local
testuser:
  dn: CN=Testing User,OU=osg_users,DC=osg,DC=local
  displayname: Testing User
  memberof:
    - CN=Random Group,DC=osg,DC=local

This is ideal for logstash to use as a dictionary - now I can tell it that if it ever sees "charlatan" in a field called "user", it should add the information in my dictionary to that log event.

A Sample Logstash Config


Instead of testing with a Windows log or an OpenSSH log, I can have a really simple logstash "testing" config file that takes in user input in JSON format, looks up the information in a dictionary when it sees a field called "user" then sends the resulting object to the display.

That "simple" configuration for logstash, which I'm naming ls_test.conf, could look like this:

input { stdin { codec => json } }
filter {
  if [user] {
    translate {
      field             => "user"
      destination       => "from_ad"
      dictionary_path   => "ad_users_file.yml"
      refresh_behaviour => "replace"
    }
  }
}
output { stdout { codec => rubydebug } }

Logstash can be started with a specific config file with:

sudo /usr/share/logstash/bin/logstash -f ls_test.conf

Once it's ready for input, if I use the following two objects then I get a good test of what happens if logstash both does and does not find my object in the dictionary:

{"user":"charlie"}
{"user":"charlatan"}

When I use them, I get the following output:

{"user":"charlie"}
{
      "@version" => "1",
          "user" => "charlie",
    "@timestamp" => 2018-10-15T02:10:06.240Z,
          "host" => "u18"
}
{"user":"charlatan"}
{
      "@version" => "1",
          "user" => "charlatan",
    "@timestamp" => 2018-10-15T02:10:14.579Z,
          "host" => "u18",
       "from_ad" => {
                 "dn" => "CN=Just A. Charlatan,OU=osg_users,DC=osg,DC=local",
        "displayname" => "Just A. Charlatan",
           "memberof" => [
            [0] "CN=InfoSec,DC=osg,DC=local"
        ]
    }
}

This is exactly what I'd expect to see. Having everything under the "from_ad" field may not be ideal, but I can always use a mutate statement to move or rename any field to a more "reasonable" or "usable" place.

Wrapping Up


This is an enrichment technique that I really like and that I recommend implementing anywhere possible. It's also really flexible! I know I've written about it in the context of Active Directory but it's really in the context of any LDAP - Active Directory just happens to be the one folks tend to be familiar with these days. Want to search something by username? Great, here you go! Need to search the rest of your logs and limit it to only members of a given OU? No more waiting for your directory to return all of those objects and search for those, no more retrieving all of your logs and waiting while your SIEM looks for them one-at-a-time in a lookup table *at search time* with potentially out-of-date information - it's all added when the log is cut with the most current information at the time!

14 July 2018

Enriching Domain Names with Frequency Analysis and ASN Information

I saw a really interesting question on Twitter today.  Someone asked if there were an IP/whois plugin for ELK so that folks could add ASN information to log data.  I thought that question was just perfect because I've spent a lot of time lately talking to folks about the importance of log enrichment and how to do it with logstash (for example, see the SANS webcast Justin Henderson and I did: https://www.sans.org/webcasts/high-fidelity-alerts-context-context-107870).

Since the question was specifically about ASN information, we're going to take a look at the "geoip" filter to get information about an IP.  Since I like DNS enrichment, I'm going to use the "dns" filter to get information about a hostname and then we're going to look at a Mark Baggett tool called domain_stats to get whois information - specifically, the creation date of the domain.  Since it runs as a web service, you can use logstash's "rest" filter to query it from inside your configuration!

That got me thinking about another Mark Baggett tool called "freq_server".  Have you ever looked at a domain name and thought, "there's no way a human being created that, it had to be an algorithm..."? Generally speaking, in the English language, some letters come after others a LOT more frequently than some other combinations.  For example, in English, if we see the letter 'Q' then it is followed by the letter 'U' much more frequently than it is followed by the letter 'J'.  If I see a domain name that has 'qj' in it then that is much more interesting (*very* generally speaking) than a domain name that has 'qu' in it.  Mark's tool looks at those tuples and returns a score based on how frequently the letter combinations in a given word fit with the English literature used to "train" it - and it runs as a web service, too, so we can query it with logstash's "rest" filter as well!
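
To make the letter-pair idea concrete, here is a toy sketch. It is NOT how freq_server calculates its score - the function names, the tiny training sentence and the averaging are all my own assumptions for illustration - but it shows why a word with 'qu' in it looks more "normal" than one with 'qj' in it.

```python
from collections import Counter


def train_bigrams(text):
    """Count how often each pair of adjacent letters appears in the text."""
    counts = Counter()
    for word in text.lower().split():
        letters = [c for c in word if c.isalpha()]
        for first, second in zip(letters, letters[1:]):
            counts[first + second] += 1
    return counts


def score(word, counts):
    """Average training count of the word's letter pairs (higher = more familiar)."""
    letters = [c for c in word.lower() if c.isalpha()]
    pairs = [a + b for a, b in zip(letters, letters[1:])]
    if not pairs:
        return 0.0
    return sum(counts[p] for p in pairs) / len(pairs)


if __name__ == "__main__":
    # A deliberately tiny, made-up training text.
    sample = "the quick queen quietly requested a quiet quarter of an hour"
    table = train_bigrams(sample)
    print("quote:", score("quote", table))
    print("qjxzv:", score("qjxzv", table))
```

A real frequency table is trained on a lot more text than one sentence, which is why Mark ships one with the tool.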

The Environment


My environment is Ubuntu Server 18.04 LTS with logstash 6.3.1 installed per the instructions here:


The "rest" filter is not installed by default but it is available for the latest version of logstash.  You can install it with:

sudo /usr/share/logstash/bin/logstash-plugin install logstash-filter-rest

Both of Mark's tools are written with python2.7 in mind so I've installed python2.7 and pip via apt:

sudo apt install python python-pip

And then I've added python-whois via pip:

sudo pip install python-whois

One quick word: ordinarily I would use something like virtualenv for this.  There is absolutely no reason not to do that right now other than I'm destroying this VM when I'm done tonight.

Getting domain_stats and freq_server


Both of the tools I'm using for enrichment, domain_stats.py and freq_server.py, are available from Mark's GitHub:


I have the latest version of each from GitHub using "git clone <above_url>", so I have a directory called domain_stats and another called freq.

The easiest way to run domain_stats.py (remember, it starts a web server!!) is to cd to the domain_stats directory and just run it as a backgrounded python process.

cd domain_stats
python domain_stats.py 10000 &

This starts domain_stats on port 10000.

I'm going to do the same thing for freq_server.py except I want it on port 20000. It doesn't matter which port you run them on (as long as it's a free port above 1024!). In production I would use a service and set the owner/group to logstash but for this demo, it's sufficient. Notice that freq_server takes a frequency table as its final option -- Mark provides one at the GitHub page!

cd freq
python freq_server.py 20000 freqtable2018 &

You should get something saying the server is ready and an example of the URL to use to query it.

First Logstash Config


I am starting with a VERY simple logstash config file, called "test.conf", in my local directory. My initial config looks like this:

input { stdin { codec => json } }
filter {
  if [message] == "" {
    drop { }
  }
}
output { stdout { codec => rubydebug } }

Basically this lets me type in a JSON object as input and it parses the fields and prints it as a formatted JSON object. The filter block lets me hit enter a few times without throwing a parse error or printing an object for null input by telling logstash to just drop those events instead of displaying them.

I'm starting logstash with:

sudo /usr/share/logstash/bin/logstash -f test.conf

For the entire blog post, I'm going to use a simple example log entry of:

{"host":"google.co.uk"}

With my existing logstash configuration, I get something like this:


Notice that because I told logstash to expect JSON as input, and because I gave it a JSON object as input, it parsed out the "host" field for me.  This will be really important later.

The First Enrichment: DNS


The very first thing we need to do is go from a hostname to an IP address.  In a "real" environment you'd have something like passive DNS information or netflow so you may already have an interesting IP address.  I still like letting my enrichment boxes do DNS lookups, though, because I find it interesting if I see one domain looked up ten times in two minutes and the results are for vastly different ASNs (warning warning warning!! that could be malware using fast flux DNS!).

To use the filter, I'm going to add a "filter { }" section to my test.conf file that looks like this:

filter {
  if [host] {
    dns {
      resolve => "host"
      action => "append"
    }
  }
}

This will do a lookup for whatever is in the "host" field and then *append* it to the host field, effectively turning it into an array.  If I use my new config, my result looks something like this:


Move the IP With Mutate


I think that's a little difficult to deal with later because I don't like arrays in my config files.  The easiest way to solve that wee issue is to use the "mutate" filter to rename fields into something more useful.  I want to break the IP address into its own field, resolved_ip, and keep the hostname in its own field, host.  With a little work, my filter now looks like this:

filter {
  if [host] {
    dns {
      resolve => "host"
      action => "append"
    }
    mutate {
      rename => {
        "[host][1]" => "resolved_ip"
        "[host][0]" => "host"
      }
    }
  }
}

When I run the new config, I get this:


Much easier to read (and definitely easier to use later!!).

Next Enrichment: geoip


Now that I have an IP address, I can answer the question that prompted this post - which ASN is assigned the IP in question!

There are two ways to use the built-in geoip filter.  The first adds city/state/country information, the second adds ASN information.  You can't add these with a single block, you have to call geoip twice, so I'm going to add a new filter block that looks for "resolved_ip" and, if it's present, calls geoip twice.

filter {
  if [resolved_ip] {
    geoip {
      source => "resolved_ip"
      target => "geoip_city"
      default_database_type => "City"
    }
    geoip {
      source => "resolved_ip"
      target => "geoip_asn"
      default_database_type => "ASN"
    }
  }
}

This adds a *considerable* amount of information to the log event!


The most important fields, generally speaking, are the geoip_asn[asn] and geoip_asn[as_org] fields.  Microsoft, for example, may have some 20 million IPs across multiple CIDR blocks, but I don't need to know what each block is - I just need to know to look for "Microsoft" as the as_org.  With mutate I could rename any of these fields to make searching easier.

Using REST With domain_stats.py


In day-to-day Internet usage, it is rare to see people visiting domains that are "new".  Sure, the occasional start-up makes a name for themselves or a site takes off practically overnight but that is not the norm.  If I see domains that are only a few days or weeks old, I find that *REALLY* interesting.  It may not be the sole reason to alert on a given domain but it certainly can contribute towards me taking a second look at one.
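
As a rough sketch of that "how old is this domain?" check, the logic is just a date comparison. The function name and the 30-day threshold here are assumptions on my part, and the function takes datetime objects so it makes no assumption about how any given service formats its dates.

```python
from datetime import datetime, timedelta


def is_young_domain(creation_date, now, max_age_days=30):
    """Return True when the domain was created within max_age_days of 'now'."""
    return (now - creation_date) <= timedelta(days=max_age_days)


if __name__ == "__main__":
    today = datetime(2018, 7, 14)
    # Hypothetical creation dates, purely for illustration.
    print(is_young_domain(datetime(2018, 7, 1), today))
    print(is_young_domain(datetime(2010, 1, 1), today))
```

On its own that shouldn't be an alert, but it's an easy flag to add to an event so it can be combined with other indicators.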

Now that I have domain_stats running, I can query it for information using the "rest" filter.  To do so, I'll add the following filter block to my test.conf:

filter {
  if [host] {
    rest {
      request => {
        url => "http://localhost:10000/domain/creation_date/%{host}"
      }
      sprintf => true
      json => false
      target => "creation_date"
    }
  }
}

This tells it to query the service running on port 10000 on the local machine using the URL that tells domain_stats to return the creation date for the hostname in [host].  It then stores that information in "creation_date":


Note the addition of the "creation_date" field - Google's UK domain has been around for quite a while, almost 20 years!
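
If the rest filter isn't returning what you expect, it can help to hit the same URL by hand. This is a small sketch using only the python 3 standard library; the function names are hypothetical, the URL path is the one from the filter block above, and the timeout is an arbitrary choice.

```python
from urllib.parse import quote
from urllib.request import urlopen


def creation_date_url(hostname, base="http://localhost:10000"):
    """Build the same URL the rest filter requests for a given hostname."""
    return base + "/domain/creation_date/" + quote(hostname, safe="")


def fetch_creation_date(hostname, timeout=5):
    """Return the raw response body as text; needs domain_stats to be running."""
    with urlopen(creation_date_url(hostname), timeout=timeout) as response:
        return response.read().decode("utf-8", errors="replace").strip()
```

With domain_stats listening on port 10000, calling fetch_creation_date("google.co.uk") should return the same text that logstash stores in "creation_date".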

Final Enrichment: Using REST With freq_server


The final enrichment I'll add is the frequency analysis.  This assigns a numeric value between 1 and 35 (configurable) depending on how "randomly" the letter sequences appear - the smaller the number, the more likely the domain is the product of a Domain Generation Algorithm.  Because it uses the rest filter, the final "filter" block to be added will look a lot like the block for domain_stats:

filter {
  if [host] {
    rest {
      request => {
        url => "http://localhost:20000/measure/%{host}"
      }
      sprintf => true
      json => false
      target => "freq_score"
    }
  }
}

With that added, my final output looks like this:


The "freq_score" value has two different values based on a recent code update and using two different algorithms.  You can use whichever value you prefer but be consistent with whether you want the first or second one.  I'll leave it to you to determine how to extract the values but grok, dissect or ruby would be viable starting points...
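
As a starting point for the logic, here is a sketch in python; the same idea could be ported to a ruby filter or expressed as a grok pattern. It assumes only that the response is text containing two numbers. The sample string in it is made up, so check it against what your freq_server actually returns.

```python
import re

# Matches integers and decimals, e.g. 7 or 5.1234
NUMBER = re.compile(r"-?\d+(?:\.\d+)?")


def split_freq_scores(raw):
    """Return the first two numbers found in the raw response as floats."""
    found = [float(n) for n in NUMBER.findall(raw)]
    if len(found) < 2:
        raise ValueError("expected two scores, got: %r" % raw)
    return found[0], found[1]


if __name__ == "__main__":
    # Hypothetical response text, for illustration only.
    first, second = split_freq_scores("(5.1234, 4.5)")
    print(first, second)
```

Whichever of the two values you keep, storing it as a number (not a string) makes range queries possible later.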

Wrapping Up


I really like the performance of Elasticsearch but for me, the *best* thing about the Elastic stack is logstash.  To be able to take a single domain name or IP address and produce a log event, in real time, that has whois, GeoIP, passive DNS and anything else you can think of, is a MASSIVE win in my opinion.

To make things easier, here is the full content of my config file:

input { stdin { codec => json } }
filter {
  if [message] == "" {
    drop { }
  }
}
filter {
  if [host] {
    dns {
      resolve => "host"
      action => "append"
    }
    mutate {
      rename => {
        "[host][1]" => "resolved_ip"
        "[host][0]" => "host"
      }
    }
  }
}
filter {
  if [resolved_ip] {
    geoip {
      source => "resolved_ip"
      target => "geoip_city"
      default_database_type => "City"
    }
    geoip {
      source => "resolved_ip"
      target => "geoip_asn"
      default_database_type => "ASN"
    }
  }
}
filter {
  if [host] {
    rest {
      request => {
        url => "http://localhost:10000/domain/creation_date/%{host}"
      }
      sprintf => true
      json => false
      target => "creation_date"
    }
  }
}
filter {
  if [host] {
    rest {
      request => {
        url => "http://localhost:20000/measure/%{host}"
      }
      sprintf => true
      json => false
      target => "freq_score"
    }
  }
}
output { stdout { codec => rubydebug } }

21 January 2018

SIEM From Scratch: Custom Data With Python

This was a bit of an unexpected post for me to make. When I laid out all of the "SIEM From Scratch" posts I wanted to do, I fully expected to use filebeat and syslog to get data from endpoints. If I have programs (or scripts...) that query for data via an API, such as my previous post about getting logs from Google, I typically run those on a schedule via cron and then import that data into ELK via filebeat. I can write each login event as a JSON object so, really, why would I NOT do that?

Then this week I was at work and was bitten, a bit harder than I found comfortable, by an inode re-use issue with Linux and filebeat and that got me to thinking (I know, how problematic is *that*?!)...

For the impatient curious - the ultimate solution was to create the new file and have filebeat pick it up, then delete it after <x> number of minutes so that it drops out of the filebeat registry before the new file is created. This lets me avoid fiddling with filebeat options and I can control it all from my script (or with cron).

The Issue - inode Re-Use


Before I go into a possible solution, allow me to outline the problem. Filebeat is amazing for watching text files and getting them into the Elastic stack - it has all the TLS and congestion-control bells and whistles, plus it lets you do some interesting things on the client if you're reading JSON files. To make sure it gets your data (and doesn't get your data twice), it has a registry file that makes note of which byte it just read in your file. To know that it's your file, it also keeps track of the name, the inode and the device on which that file resides.

The important part here is the concept of an inode.

Without getting too "into the weeds", an "inode" is a way to address where a file starts on a hard drive. An analogy would be your home. If you want to invite someone over, you give them your address. The equivalent for a file or directory may be its path -- "/var/log/syslog" or "C:\Windows\system32", for example.

However, you could ALSO give them your GPS coordinates. Even if your address changes, your GPS coordinates would remain the same. The equivalent for a file in POSIX-compliant operating systems is an inode number. Just like giving someone your GPS coordinates is kind of a pain, trying to remember inode numbers is a bit of a pain. This is why we don't really use them in day-to-day work but we use filenames and paths all the time. One is really easy for people to remember, one is really problematic.

If the town/city/village/whatever renames your street then your address will change, even though your home didn't move - therefore your GPS coordinates won't change. Likewise, if I rename my file from "my_file.txt" to "my_file2.txt", I'm just changing the path and the inode number remains the same. Here is the output of "stat" on FreeBSD. The second column is the inode number:


Here's where it gets really interesting. If I have a file called "my_file.txt", delete it and create a new file called "my_file.txt", the inode number may be the same - even though it's a completely new file with new information. Again, here is the output of doing that and of the 'stat' command:


Notice the inode number is the same for both files. If I have a program, like filebeat, that knows to watch "my_file.txt" with inode '348828', it may read the content of the original file ("hi") and store that it read <n> bytes. Then, even though I deleted the file and have *completely new data* in my new file, because the inode number is the same, it will start reading again at <n + 1> bytes. That means I will have a log entry of "hi" - and then another that says " someone else". Now my SIEM has missing log data!
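
Both halves of that can be poked at from python. This is a sketch with hypothetical helper names and made-up file contents: the first part shows the inode number surviving a rename, the second imitates a shipper that remembers "I already read 2 bytes" and resumes from there in a replacement file. Whether a deleted file's inode number actually gets re-used depends on the filesystem, so the script doesn't try to prove that part.

```python
import os
import tempfile


def inode_of(path):
    """Return the inode number that stat reports for a path."""
    return os.stat(path).st_ino


def read_from_offset(path, offset):
    """Read a file the way an offset-tracking shipper would: skip 'offset' bytes."""
    with open(path, "rb") as handle:
        handle.seek(offset)
        return handle.read().decode("utf-8")


if __name__ == "__main__":
    workdir = tempfile.mkdtemp()
    first = os.path.join(workdir, "my_file.txt")
    second = os.path.join(workdir, "my_file2.txt")

    with open(first, "w") as handle:
        handle.write("hi")
    before = inode_of(first)
    os.rename(first, second)
    print("same inode after rename:", before == inode_of(second))

    # Pretend the registry recorded 2 bytes read, then the file was replaced.
    os.remove(second)
    with open(first, "w") as handle:
        handle.write("yo someone else")
    print("read from stored offset:", repr(read_from_offset(first, 2)))
```

The second print shows only the tail of the new file, which is the "missing log data" problem in miniature.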

One Solution - Write Directly to Logstash


This brings me to the fun part of this post - what to do when you really don't need an intermediary and just want to log something directly to logstash. I think Solaris may have been the Last Great Unix in some ways but there isn't a filebeat for Solaris and nxlog (community edition) is a bit of a pain to compile and install. That's okay because you can have something pick up a logfile and send it *as JSON* to your SIEM. Can't get a log shipper approved for install on <x> system but you need output from your scripts? That's okay because you can send that output directly to your SIEM!

Step One - Logstash Needs to Listen For a TCP Connection


The first step is to set up a TCP listener in logstash. I have logstash 6.1.2 installed from Elastic's "apt" repo and a config file, "test.conf", with just enough in it to be able to test input/output. The "stdin" input type tells logstash to accept from "standard input", in this case my keyboard, and the "stdout" output type tells it to print to "standard output", in this case my monitor. The "rubydebug" codec tells logstash to make the output pretty so it's easier for a human to read:


And when I run logstash manually with that configuration using:

sudo -u logstash /usr/share/logstash/bin/logstash --path.config test.conf

I can confirm it works by hitting "enter" a couple of times and then typing in some text (I just wanted to show that it grabs lines where the only input is the "enter" key):


Logstash has a multitude of possible input types. For example, I have some logstash servers that accept syslog from some devices, un-encrypted beats from others, encrypted beats from still others, raw data over UDP from even more and, finally, raw data from TCP from yet *another* set of devices. There are a LOT of possibilities!

NB: logstash will run as the logstash user, not root, so by default it can NOT bind to ports lower than 1024. Generally speaking you want to use high-numbered ports so things "Just Work"!

For this post, I want to tell logstash to listen for TCP connections on port 10001 and I want it to parse that input as JSON. To do this, I add a "TCP" input section. I've also added a comment line above each section telling whether it was in the above configuration so you can more easily see the added section:


Since I'm already logged into that VM, I'm going to go ahead and restart logstash with my custom configuration using the same sudo command as above:

sudo -u logstash /usr/share/logstash/bin/logstash --path.config test.conf

Since I still have the section for stdin as an input, logstash should start and give me a message saying it's waiting for input from "stdin":


Now I'm going to move over to my FreeBSD VM that has python installed to write my test script.

Step Two - The Scripting VM


My development VM is running FreeBSD 11 and the only things I have done to it are add a user named 'demo' and install pkg, vim-lite and python3:

adduser demo
pkg install pkg
pkg install vim-lite
pkg install python3

After logging in, I made sure I could run "python3" and get an interpreter (the ">>>" means it is ready for me to start entering python code):


To exit the interpreter, I can either type "quit()" and hit the enter key or I can hold down the Control key and press 'd'.

With that done, it's time to write my script!

Step Three - Python, JSON and Sockets


I am a very beginner-level python programmer - I know just enough about it to read a little of it, so don't be intimidated if you're new to it. We're going to keep things as basic as possible.

First I need to tell python that I'm either going to read, parse or output JSON. It has a native module for JSON called, appropriately enough, 'json', and all I need to do is "import" it for my script to use it as a library. It also has a module called 'socket' that is used for network programming so I want to include that as well.

import json
import socket

Next I want to build an item to use for testing. I'm going to name it "sample_item" and it's going to have a single field called "user_name". Since my user on the VM is named 'demo', I'm going to use 'demo' as the value for the "user_name" field:

sample_item = { 'user_name': 'demo' }

That looks a lot like a JSON object but it is really a python dictionary, so it is not quite standards-compliant JSON (python prints it with single quotes, for example, and JSON requires double quotes). To make sure it is exactly what I want, I am going to use the json.dumps() function to convert it to a JSON string. Logstash expects each JSON object to be delimited with a newline character ("\n") so I'm going to add one of those to the end and I'm going to save all of that into a new variable called "sample_to_send":

sample_to_send = json.dumps(sample_item) + "\n"
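
To make the difference between the dictionary and the JSON string concrete, here is a small sketch that prints both forms and checks that the string parses back to the original dictionary. It uses only the 'json' module and the same variable names as above:

```python
import json

sample_item = {'user_name': 'demo'}

# Python's own text form of a dictionary uses single quotes,
# which is not valid JSON.
print(str(sample_item))        # {'user_name': 'demo'}

# json.dumps() returns a standards-compliant JSON string (double quotes);
# the trailing newline marks the end of the event.
sample_to_send = json.dumps(sample_item) + "\n"
print(repr(sample_to_send))    # '{"user_name": "demo"}\n'

# Parsing the string gives back the original dictionary.
round_trip = json.loads(sample_to_send)
print(round_trip == sample_item)   # True
```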

Now I need to create a network connection to my logstash server. I know that it has an IP address of 192.168.1.8 because I checked it earlier, and I know that logstash is listening for TCP connections on port 10001 because I configured it that way. Opening that connection in python is a multi-step process. First, I create a socket object that I call "my_socket":

my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

Then I'm going to tell it to connect to the logstash server:

my_socket.connect(("192.168.1.8", 10001))

If that doesn't give an error then I'm going to try to write the test object, sample_to_send, to the socket. With python 3.x, sendall() takes a *bytes-like* object, not a string, so I need to use the encode() function -- by default it will encode to 'utf-8' which is just fine:

my_socket.sendall(sample_to_send.encode())
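
As a quick illustration of the string-versus-bytes point, this sketch shows what encode() produces without touching the network. The 'café' value is only there to show a non-ASCII character and is not part of the test data:

```python
sample_to_send = '{"user_name": "demo"}\n'

# encode() with no arguments uses UTF-8 and returns a bytes object.
payload = sample_to_send.encode()
print(type(payload).__name__)   # bytes
print(payload)                  # b'{"user_name": "demo"}\n'

# decode() reverses it, so nothing is lost on the way.
print(payload.decode() == sample_to_send)   # True

# Non-ASCII characters take more than one byte in UTF-8.
accented = 'café'.encode()
print(len('café'), len(accented))   # 4 5
```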

Finally I'm going to close my socket and exit the script:

my_socket.close()
exit()

Since I can run all of that in the interpreter, it looks like this:


Notice I didn't get an error after the "sendall()" function. Moving back over to my logstash VM I see that I have received something:


192.168.1.7 is indeed the IP address of my FreeBSD VM and my JSON object has been parsed by logstash!
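
If there is no logstash server handy, the same client steps can be tried against a tiny stand-in listener on the local machine. This is only a sketch for testing: the listener below is NOT logstash, it just accepts one connection and parses each newline-terminated line as JSON, and the names "listen_once" and "received" are made up for this example:

```python
import json
import socket
import threading

received = []  # parsed events collected by the stand-in listener

def listen_once(server_socket):
    """Accept one connection and parse each newline-terminated line as JSON."""
    connection, _address = server_socket.accept()
    with connection, connection.makefile("r", encoding="utf-8") as lines:
        for line in lines:
            received.append(json.loads(line))

# Bind to port 0 so the operating system picks a free high-numbered port.
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("127.0.0.1", 0))
server_socket.listen(1)
port = server_socket.getsockname()[1]

listener = threading.Thread(target=listen_once, args=(server_socket,))
listener.start()

# The same client steps as above, pointed at the local stand-in.
my_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
my_socket.connect(("127.0.0.1", port))
my_socket.sendall((json.dumps({'user_name': 'demo'}) + "\n").encode())
my_socket.close()

# Closing the client socket ends the listener's loop.
listener.join(timeout=5)
server_socket.close()
print(received)
```

Because the listener reads until the client closes the connection, the close() call is what lets the listener thread finish.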

Step Four - Save A Script and Add Another Field


Running things inside the interpreter is pretty handy for testing small things but I want to run this repeatedly so I'm going to save it all in a script. At that point I can do interesting things like add error checking, send multiple items in a loop, etc.

I'm going to name the script "test.py" and it will be pretty much exactly what I ran in the interpreter but with some blank lines thrown in to break it up for readability:


What if I want to add an email address for my user? How would that look? Well, it's just another field in the "sample_item" object (it's called a dictionary but dictionaries are objects...):

sample_item = { 'user_name': 'demo', 'email_address': 'demo@my-great-domain.com' }

In my script it looks like this (notice I've put the new field on a new line - this is purely for readability):


If I modify my script so that "sample_item" looks like the above and run it with "python3 test.py", I should see a new JSON object come in with a field called "email_address":


Sure enough, there it is!
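
The same idea extends to sending several events over one connection: each dictionary becomes one line of JSON. Here is a minimal sketch that only builds the bytes to send, so it runs without a network. The helper name "build_payload" and the second user, 'demo2', are made up for illustration:

```python
import json

def build_payload(items):
    """Serialise each dictionary as one line of JSON and return bytes for sendall()."""
    lines = [json.dumps(item) + "\n" for item in items]
    return "".join(lines).encode()

sample_items = [
    {'user_name': 'demo', 'email_address': 'demo@my-great-domain.com'},
    # 'demo2' is a hypothetical second user, only here to show a second event.
    {'user_name': 'demo2', 'email_address': 'demo2@my-great-domain.com'},
]

payload = build_payload(sample_items)
print(payload.decode(), end="")
```

The result could then be handed to my_socket.sendall(payload) in place of the single encoded item.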

Wrapping Up


This script is in pretty poor shape - for starters, it has zero error handling and it isn't documented. In a production environment I would want to do both of those things, as well as write out any errors to an optional error log on the disk. For now, though, I think it is a good starting point for getting custom data from a script to logstash without needing to rely on additional software like filebeat or nxlog. Never again will I have to look at custom log data sitting on a Solaris server and think, "you know, I'd like to have that in my SIEM but I don't know an easy way to get it there..."
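
As a rough idea of what that error handling could look like, here is a sketch and not the version in the repository. The function names "send_event" and "enable_error_log", the five-second timeout and the log format are all assumptions made for this example:

```python
import json
import logging
import socket

logger = logging.getLogger("send_event")

def send_event(item, host, port, timeout=5.0):
    """Send one dictionary as a line of JSON; return True on success, False on failure."""
    payload = (json.dumps(item) + "\n").encode()
    try:
        # create_connection() connects and applies the timeout in one call;
        # the "with" block closes the socket even if sendall() fails.
        with socket.create_connection((host, port), timeout=timeout) as my_socket:
            my_socket.sendall(payload)
    except OSError as error:
        # OSError covers refused connections, timeouts and unreachable hosts.
        logger.error("could not send event to %s:%s: %s", host, port, error)
        return False
    return True

def enable_error_log(path):
    """Optionally write errors to a file on disk (the path is the caller's choice)."""
    handler = logging.FileHandler(path)
    handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(message)s"))
    logger.addHandler(handler)
```

With that in place, a call such as send_event({'user_name': 'demo'}, "192.168.1.8", 10001) returns False instead of stopping the script with a traceback when logstash is not reachable, and calling enable_error_log() first with a file name sends the error message to that file.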

There are some additional benefits, too. I don't *HAVE* to use python to send my events over. I could just as easily have sent data via netcat (nc) or via any other language that supports writing to a network socket (or, for that matter, just writing to a file!). Bash even supports doing things like:

echo '{"user_name":"demo"}' > /dev/tcp/192.168.1.8/10001

It does not even need to be bash on Linux - I've tested it on both Linux and FreeBSD, and it even works on Windows 10 using the Windows Subsystem for Linux.

A slightly better-documented version of the script, and the sample logstash configuration, can be found at:

https://github.com/kevinwilcox/python-elk
