I saw a really interesting question on Twitter today. Someone asked if there were an IP/whois plugin for ELK so that folks could add ASN information to log data. I thought that question was just perfect because I've spent a lot of time lately talking to folks about the importance of log enrichment and how to do it with logstash (for example, see the SANS webcast Justin Henderson and I did: https://www.sans.org/webcasts/high-fidelity-alerts-context-context-107870).
Since the question was specifically about ASN information, we're going to take a look at the "geoip" filter to get information about an IP. Because I like DNS enrichment, I'm going to use the "dns" filter to get information about a hostname, and then we're going to look at a Mark Baggett tool called domain_stats to get whois information - specifically, the creation date of the domain. Because it runs as a web service, you can use logstash's "rest" filter to query it from inside your configuration!
That got me thinking about another Mark Baggett tool called "freq_server". Have you ever looked at a domain name and thought, "there's no way a human being created that, it had to be an algorithm..."? Generally speaking, in English some letters follow others a LOT more frequently than other combinations do. For example, if we see the letter 'Q' then it is followed by 'U' much more often than it is followed by 'J'. If I see a domain name with 'qj' in it, that is much more interesting (*very* generally speaking) than a domain name with 'qu' in it. Mark's tool looks at those letter pairs and returns a score based on how well the combinations in a given word match the English text used to "train" it - and it runs as a web service too, so we can query it with logstash's "rest" filter as well!
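Just to make the pair-counting idea concrete, here's a toy sketch in Python. This is not Mark's actual algorithm or scoring scale (freq_server's scores run roughly 1 to 35, and lower means more random); it only illustrates the general approach of counting two-letter sequences in some training text and scoring a new name against those counts:

from collections import Counter

def train(text):
    # count how often each two-letter pair appears in the training text
    letters = [c for c in text.lower() if c.isalpha()]
    return Counter(zip(letters, letters[1:]))

def score(name, counts):
    # average the training counts of the pairs in 'name';
    # pairs never seen in the training text contribute 0
    letters = [c for c in name.lower() if c.isalpha()]
    pairs = list(zip(letters, letters[1:]))
    if not pairs:
        return 0.0
    return sum(counts[p] for p in pairs) / float(len(pairs))

# toy training text; the real tool is trained on a large English corpus (freqtable2018)
counts = train("the quick brown fox jumps over the lazy dog")
print(score("quiver", counts))   # reuses pairs like 'qu' and 'er' from the training text
print(score("qjzvkq", counts))   # none of its pairs appear in the training text, so it scores 0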
The Environment
My environment is Ubuntu Server 18.04 LTS with logstash 6.3.1 installed per the instructions here:
The "rest" filter is not installed by default but it is available for the latest version of logstash. You can install it with:
sudo /usr/share/logstash/bin/logstash-plugin install logstash-filter-rest
Both of Mark's tools are written with python2.7 in mind so I've installed python2.7 and pip via apt:
sudo apt install python python-pip
And then I've added python-whois via pip:
sudo pip install python-whois
One quick word: ordinarily I would use something like virtualenv for this. There is absolutely no reason not to do that right now other than I'm destroying this VM when I'm done tonight.
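If you do want the tidier route, a virtualenv setup would look something like this (the package name python-virtualenv and the directory ~/enrichment are just my choices for this sketch):

sudo apt install python-virtualenv
virtualenv ~/enrichment
source ~/enrichment/bin/activate
pip install python-whois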
Getting domain_stats and freq_server
Both of the tools I'm using for enrichment, domain_stats.py and freq_server.py, are available from Mark's GitHub:
I've pulled the latest version of each from GitHub using "git clone <above_url>", so I have a directory called domain_stats and another called freq.
The easiest way to run domain_stats.py (remember, it starts a web server!!) is to cd to the domain_stats directory and just run it as a backgrounded python process.
cd domain_stats
python domain_stats.py 10000 &
This starts domain_stats on port 10000.
I'm going to do the same thing for freq_server.py except I want it on port 20000. It doesn't matter which port you run them on (as long as it's a free port above 1024!). In production I would run each as a service and set the owner/group to logstash, but for this demo, backgrounded processes are sufficient. Notice that freq_server takes a frequency table as its final argument - Mark provides one at the GitHub page!
cd freq
python freq_server.py 20000 freqtable2018 &
You should get something saying the server is ready and an example of the URL to use to query it.
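Before wiring anything into logstash, it's worth hitting both services from the command line. Using the same URL paths the rest filter blocks below will use, something like this should return a creation date and a frequency score respectively:

curl http://localhost:10000/domain/creation_date/google.co.uk
curl http://localhost:20000/measure/google.co.uk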
First Logstash Config
I am starting with a VERY simple logstash config file, called "test.conf", in my local directory. My initial config looks like this:
input { stdin { codec => json } }
filter {
  if [message] == "" {
    drop { }
  }
}
output { stdout { codec => rubydebug } }
Basically, this lets me type in a JSON object as input, and logstash parses the fields and prints the event as a formatted JSON object. The filter block lets me hit enter a few times without throwing a parse error or printing an event for empty input, by telling logstash to just drop those events instead of displaying them.
I'm starting logstash with:
sudo /usr/share/logstash/bin/logstash -f test.conf
For the entire blog post, I'm going to use a simple example log entry of:
{"host":"google.co.uk"}
With my existing logstash configuration, I get something like this:
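The output looks roughly like this (your timestamp will obviously differ, and the exact metadata fields depend on the logstash version):

{
          "host" => "google.co.uk",
      "@version" => "1",
    "@timestamp" => 2018-07-17T01:23:45.678Z
}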
Notice that because I told logstash to expect JSON as input, and because I gave it a JSON object as input, it parsed out the "host" field for me. This will be really important later.
The First Enrichment: DNS
The very first thing we need to do is go from a hostname to an IP address. In a "real" environment you'd have something like passive DNS information or netflow so you may already have an interesting IP address. I still like letting my enrichment boxes do DNS lookups, though, because I find it interesting if I see one domain looked up ten times in two minutes and the results are for vastly different ASNs (warning warning warning!! that could be malware using fast flux DNS!).
To use the filter, I'm going to add a "filter { }" section to my test.conf file that looks like this:
filter {
  if [host] {
    dns {
      resolve => "host"
      action => "append"
    }
  }
}
This will do a lookup for whatever is in the "host" field and then *append* the result to the host field, effectively turning it into an array. If I use my new config, my result looks something like this:
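The interesting part of the output is the host field, which now looks roughly like this (the resolved address here is just a placeholder):

    "host" => [
        [0] "google.co.uk",
        [1] "203.0.113.10"
    ]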
Move the IP With Mutate
I think that's a little difficult to deal with later because I don't like arrays in my config files. The easiest way to solve that wee issue is to use the "mutate" filter to rename fields into something more useful. I want to break the IP address into its own field, resolved_ip, and keep the hostname in its own field, host. With a little work, my filter now looks like this:
filter {
  if [host] {
    dns {
      resolve => "host"
      action => "append"
    }
    mutate {
      rename => {
        "[host][1]" => "resolved_ip"
        "[host][0]" => "host"
      }
    }
  }
}
When I run the new config, I get this:
Much easier to read (and definitely easier to use later!!).
Next Enrichment: geoip
Now that I have an IP address, I can answer the question that prompted this post - which ASN is assigned the IP in question!
There are two ways to use the built-in geoip filter: the first adds city/state/country information, and the second adds ASN information. You can't add both with a single block - you have to call geoip twice - so I'm going to add a new filter block that looks for "resolved_ip" and, if it's present, calls geoip twice.
filter {
  if [resolved_ip] {
    geoip {
      source => "resolved_ip"
      target => "geoip_city"
      default_database_type => "City"
    }
    geoip {
      source => "resolved_ip"
      target => "geoip_asn"
      default_database_type => "ASN"
    }
  }
}
This adds a *considerable* amount of information to the log event!
The most important fields, generally speaking, are [geoip_asn][asn] and [geoip_asn][as_org]. Microsoft, for example, may have some 20 million IPs across multiple CIDR blocks, but I don't need to know what each block is - I just need to know to look for "Microsoft" as the as_org. With mutate I could rename any of these fields to make searching easier.
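For example, if I wanted those two values as top-level fields, a mutate block along these lines would do it (the new field names asn and as_org are just my choices):

filter {
  if [geoip_asn] {
    mutate {
      rename => {
        "[geoip_asn][asn]" => "asn"
        "[geoip_asn][as_org]" => "as_org"
      }
    }
  }
}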
Using REST With domain_stats.py
In day-to-day Internet usage, it is rare to see people visiting domains that are "new". Sure, the occasional start-up makes a name for themselves or a site takes off practically overnight but that is not the norm. If I see domains that are only a few days or weeks old, I find that *REALLY* interesting. It may not be the sole reason to alert on a given domain but it certainly can contribute towards me taking a second look at one.
Now that I have domain_stats running, I can query it for information using the "rest" filter. To do so, I'll add the following filter block to my test.conf:
filter {
  if [host] {
    rest {
      request => {
        url => "http://localhost:10000/domain/creation_date/%{host}"
      }
      sprintf => true
      json => false
      target => "creation_date"
    }
  }
}
This queries the service running on port 10000 on the local machine, using the URL that asks domain_stats for the creation date of the hostname in [host], and stores the result in "creation_date":
Note the addition of the "creation_date" field - Google's UK domain has been around for quite a while, almost 20 years!
Final Enrichment: Using REST With freq_server
The final enrichment I'll add is the frequency analysis. This assigns a numeric value between 1 and 35 (configurable) depending on how "randomly" the letter sequences appear - the smaller the number, the more likely the domain is the product of a Domain Generation Algorithm. Because it uses the rest filter, the final "filter" block to be added will look a lot like the block for domain_stats:
filter {
  if [host] {
    rest {
      request => {
        url => "http://localhost:20000/measure/%{host}"
      }
      sprintf => true
      json => false
      target => "freq_score"
    }
  }
}
With that added, my final output looks like this:
The "freq_score" value has two different values based on a recent code update and using two different algorithms. You can use whichever value you prefer but be consistent with whether you want the first or second one. I'll leave it to you to determine how to extract the values but grok, dissect or ruby would be viable starting points...
Wrapping Up
I really like the performance of Elasticsearch but for me, the *best* thing about the Elastic stack is logstash. To be able to take a single domain name or IP address and produce a log event, in real time, that has whois, GeoIP, passive DNS and anything else you can think of, is a MASSIVE win in my opinion.
To make things easier, here is the full content of my config file:
input { stdin { codec => json } }
filter {
  if [message] == "" {
    drop { }
  }
}
if [message] == "" {
drop { }
}
}
filter {
  if [host] {
    dns {
      resolve => "host"
      action => "append"
    }
    mutate {
      rename => {
        "[host][1]" => "resolved_ip"
        "[host][0]" => "host"
      }
    }
  }
}
filter {
  if [resolved_ip] {
    geoip {
      source => "resolved_ip"
      target => "geoip_city"
      default_database_type => "City"
    }
    geoip {
      source => "resolved_ip"
      target => "geoip_asn"
      default_database_type => "ASN"
    }
  }
}
filter {
  if [host] {
    rest {
      request => {
        url => "http://localhost:10000/domain/creation_date/%{host}"
      }
      sprintf => true
      json => false
      target => "creation_date"
    }
  }
}
filter {
  if [host] {
    rest {
      request => {
        url => "http://localhost:20000/measure/%{host}"
      }
      sprintf => true
      json => false
      target => "freq_score"
    }
  }
}
output { stdout { codec => rubydebug } }