Logstash has an interesting feature called the Translate Filter. The translate filter matches a specific field in incoming data against a dictionary or lookup file. If the incoming field matches an entry in that dictionary or lookup file, it performs an action, such as adding a field to your data, which can in turn be used downstream to trigger an alert such as an email.

An Example Use Case

Perhaps you are indexing data from Twitter and would like to know when users mention certain specific words in their tweets. You create a list of words that you consider interesting. Every time one of those words appears in a tweet, you add a field to the data that marks the tweet as “interesting”. Now you can easily make a graph in Kibana showing only tweets that were marked as “interesting”.
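As a rough sketch, a translate filter for this use case might look like the following. Note that the field name "text" and the word list are made up for illustration, and by default the translate filter compares the whole field value against the dictionary keys; setting exact => false makes it search for the keys as substrings within the field, which is what we want for words inside a tweet.

translate {
  field => "text"            # hypothetical field holding the tweet body
  destination => "interesting"
  exact => false             # search for dictionary keys as substrings of the field
  dictionary => [
    "logstash", "true",
    "elasticsearch", "true",
    "kibana", "true"
  ]
}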

This is also a useful feature when dealing with log files of security events, as you might want to alert on certain file hashes or blacklisted IPs within your log files. Before we get to that, let’s look at the structure of a Logstash config file that makes use of the translate filter.

Installation & Configuration

To use the translate filter you first need to install it; it ships as a Logstash plugin. Install it with:

$ cd /opt/logstash
$ sudo bin/plugin install logstash-filter-translate

This is what our config is going to look like. You can copy and paste it and use it as-is, but read through it so that you understand how it works and what it does. There are different ways to use the translate filter; I’ll briefly discuss two of them. The translate section goes in the filter section of your configuration file. See below:

input {
  stdin {
    codec => json
  }
}
filter {
  translate {
    # Your translate filter specific options go here.
    ...
  }
  ...
}
output {
  stdout {
    codec => plain {
      charset => "ISO-8859-1"
    }
  }
  elasticsearch {
    index => "nginx_json_elk_example"
    document_type => "logs"
  }
}

Now you know where to put your options related to the translate filter. We need to define a field in our incoming data that the translate filter will be matched against. In the example below, we are looking at the destination IP in our HTTP proxy server’s logs. We can check the destination IP on all incoming log data by specifying our field in the translate filter in this way.

translate {
  field => "dst_ip"
  ...
}

Dictionary Entries

There are two ways that you can define the dictionary entries for the translate filter to reference incoming data against. They are:

  • The dictionary configuration option, where you define the values to represent directly in the configuration file. This is useful when you have a small set of data that does not change often.
# Web server related ports and what they represent. Just an example.
dictionary => [
  "80", "http",
  "443", "https",
  "8080", "http-alt"
]
  • A dictionary lookup file in YAML format. This is useful for large datasets, especially data that needs to be updated often, for example IP or email blacklists that contain many entries and are updated daily.
# This is our Logstash configuration
translate {
  field => "agent"
  destination => "good_bot"
  dictionary_path => '/tmp/crawler_bot_list.yaml'
}

This is what the YAML file would look like:

"Exalead Mozilla/5.0 (compatible; Exabot/3.0 (BiggerBetter); +http://www.exabot.com/go/robot)": "YES"
"Alexa Mozilla/5.0 (compatible; alexa site audit/1.0; +http://www.alexa.com/help/webmasters; siteaudit@alexa.com)": "YES"
"Grapeshot UK Mozilla/5.0 (compatible; GrapeshotCrawler/2.0; +http://www.grapeshot.co.uk/crawler.php)": "YES"
"Facebook facebookexternalhit/1.0 (+http://www.facebook.com/externalhit_uatext.php)": "YES"
"Archive.org Mozilla/5.0 (compatible; archive.org_bot +http://www.archive.org/details/archive.org_bot)": "YES"
"Shopstyle.com ShopStyle Server/1.0 (ShopStyle Server Agent; http://www.shopstyle.com/; info@shopstyle.com))": "YES"
"jobrapido.com Mozilla/5.0 (compatible; Jobrapido/1.1; +http://www.jobrapido.com)": "YES"
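For completeness, the inline dictionary variant from the first bullet could be wired up in a full filter block like this. The field name "port" and the destination "service" are hypothetical, chosen just to illustrate the shape of the configuration:

filter {
  translate {
    field => "port"            # hypothetical field holding a port number
    destination => "service"   # new field that receives the translated value
    dictionary => [
      "80", "http",
      "443", "https",
      "8080", "http-alt"
    ]
  }
}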

It’s easier to understand what these two types are used for via practical examples. For the examples, I’m going to make a so-called blacklist of IPs. These IPs are not actually blacklisted; I just took IPs that were already in the data and created a make-believe blacklist from them. The example will show how to check your nginx logs for blacklisted IPs hitting your web server.

First, create a dictionary file in YAML format. We are going to check the IPs in this file against incoming IPs hitting the web server. There hasn’t been much written on this topic, but you can find a good example of using the ELK stack in a security context in this GitHub repo: https://github.com/TravisFSmith/MyBroElk. Have a look at the logstash.conf; it will help you understand how to use the Logstash translate filter.

Now back to our example. This is what our blacklist file will look like for this example:
(Put these contents in a file with the name: /tmp/blacklisted_ip.yaml)

"216.46.173.126": "true"
"180.179.174.219": "true"
"204.77.168.241": "true"
"65.39.197.164": "true"
"80.91.33.133": "true"
"84.208.15.12": "true"
"74.125.60.158": "true"

We are going to check our incoming log data for these specific IPs in the “remote_ip” field. If we see one of these IPs in the “remote_ip” field, we add a new field “blacklisted_ip” with the value “true” to the document. This is our config:

input {
  stdin {
    codec => json
  }
}
filter {
  date {
    match => [ "time", "dd/MMM/YYYY:HH:mm:ss Z" ]
    locale => "en"
  }
  geoip {
    source => "remote_ip"
    target => "geoip"
  }
  translate {
    field => "remote_ip"
    destination => "blacklisted_ip"
    dictionary_path => '/tmp/blacklisted_ip.yaml'
  }
  grok {
    match => [ "request", "%{WORD:request_action} %{DATA:request1} HTTP/%{NUMBER:http_version}" ]
  }
}
output {
  stdout {
    codec => plain {
      charset => "ISO-8859-1"
    }
  }
  elasticsearch {
    index => "logstash-nginx"
    document_type => "logs"
  }
}

This example makes use of one of the official Elasticsearch example files. You will need to download the log file before you can try your new configuration file.

$ wget https://raw.githubusercontent.com/elastic/examples/master/ELK_NGINX-json/nginx_json_logs

We can now run our configuration on the log file. Make sure you have created the dictionary lookup file /tmp/blacklisted_ip.yaml.

$ cat nginx_json_logs | /opt/logstash/bin/logstash -f blacklisted_ip.conf

You might be thinking that the field "blacklisted_ip" isn’t very useful. It will take some playing around in Kibana for you to see its value. Open Kibana and create an index pattern with the name “logstash-nginx” (the index our output section writes to). In your Discover tab, search for: blacklisted_ip:true.

[Screenshot: Kibana Discover search for blacklisted_ip:true]

All we did was add a field to each document whose “remote_ip” value matched one of the values in our blacklisted_ip.yaml file. This is what was added to each matching log entry, and thus to each document:

…
"blacklisted_ip": "true",
...

See the screenshot below, which is a raw view of the message data in Kibana’s Discover section.

[Screenshot: raw JSON view of a blacklisted document in Kibana Discover]

Essentially we have marked each log entry that was made by a blacklisted IP, which we can now visualize. For example, we can make a pie chart by filtering for entries matching blacklisted_ip:true, then use an aggregation on “remote_ip”. This shows the number of hits per blacklisted “remote_ip”. See the screenshot below:

[Screenshot: pie chart visualising hits by blacklisted IPs]

Conclusion

As you can see, this is just one example of using the Logstash translate filter. The translate filter is extremely useful and can enrich your data, adding context to seemingly meaningless values. You could even use it indirectly to alert on certain events: for example, when a blacklisted IP makes a request to your web server, you could be alerted via email. Questions/comments? Leave them below.
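As a sketch of the email idea, you could combine the field added by the translate filter with a conditional and the logstash-output-email plugin. The addresses and subject below are placeholders, and you would still need to point the plugin at your own SMTP setup:

output {
  if [blacklisted_ip] == "true" {
    email {
      to      => "security@example.com"   # placeholder recipient
      from    => "logstash@example.com"   # placeholder sender
      subject => "Blacklisted IP hit: %{remote_ip}"
      body    => "Blacklisted IP %{remote_ip} made a request: %{request}"
    }
  }
}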