With the first alpha release of Elasticsearch 5.0 comes a ton of new and awesome features, and if you’ve been paying attention then you know that one of the more prominent of them is the shiny new ingest node. Simply put, ingest aims to provide a lightweight solution for pre-processing and enriching documents within Elasticsearch itself before they are indexed.

We can use an ingest node to pre-process documents before the actual indexing takes place. The ingest node intercepts bulk and index requests, applies the transformations, and then passes the documents on to the index or bulk APIs.
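
For example, a bulk request can name the pipeline to run in its query string. The following is a minimal sketch using the same placeholder names (my-index, my-type, my_pipeline_id) that appear in the indexing example later in this post:

curl -XPOST 'ES_HOST:ES_PORT/my-index/my-type/_bulk?pipeline=my_pipeline_id' -H 'Content-Type: application/json' --data-binary '
{ "index": {} }
{ "name": "john" }
{ "index": {} }
{ "name": "jane" }
'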

For this post, we will be using hosted Elasticsearch on Qbox.io. You can sign up or launch your cluster here, or click “Get Started” in the header navigation. If you need help setting up, refer to “Provisioning a Qbox Elasticsearch Cluster.”

When Elastic first presented the Ingest API, their introduction to this subject was: “I just want to tail a file”. What they mean is that right now, with the current Elastic Stack, you need quite a bit of setup in order to get logs into Elasticsearch. A possible setup could be using Beats followed by Logstash (Grok, GeoIP, etc.) in the pipeline before finally pushing documents to Elasticsearch.

Let’s assume we have some logs sitting on our application server (say, access logs). In order to index these logs into Elasticsearch, we can ship them to a queue (for example RabbitMQ or Kafka) using Beats. We then have Logstash in the pipeline listening and pulling logs out of the queue so that we can process these raw logs and turn them into JSON documents. After enriching and processing our logs, we have Logstash output the processed logs to Elasticsearch, and we can then either search for our logs in Elasticsearch or visualise them using Kibana.
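
To give an idea of the wiring involved, a Filebeat configuration shipping access logs into Kafka might look roughly like the sketch below (the log path, Kafka host, and topic name are assumptions for this example):

filebeat.prospectors:
  - input_type: log
    paths:
      - /var/log/nginx/access.log

output.kafka:
  hosts: ["KAFKA_HOST:9092"]
  topic: "access_logs"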

Our purpose is undoubtedly achieved, but notice how many components the above setup needs in order to ‘just tail a file’. With the new Ingest feature, Elasticsearch has taken over the ‘filter’ part of Logstash, so that we can do our processing and enrichment of raw logs within Elasticsearch itself.

We can enable ingest on any node or even have dedicated ingest nodes. Ingest is enabled by default on all nodes. To disable ingest on a node, configure the following setting in the elasticsearch.yml file:

node.ingest: false
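
Conversely, a dedicated ingest node can be created by switching off the other roles. A sketch of the relevant elasticsearch.yml settings for a 5.x node:

node.master: false
node.data: false
node.ingest: true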

Pipeline Definition

A pipeline is a definition of a series of processors that are to be executed in the same order as they are declared. A pipeline consists of two main fields: a description and a list of processors:

{
  "description" : "...",
  "processors" : [ ... ]
}
  • The description is a special field to store a helpful description of what the pipeline does.
  • The processors parameter defines a list of processors to be executed in order.

In order to pre-process documents before indexing, we define a pipeline that specifies a series of processors. Each processor transforms the document in some way. For example, you may have a pipeline that consists of one processor that removes a field from the document followed by another processor that renames a field.
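
As a minimal sketch of that example, the following pipeline body chains a remove processor and a rename processor (the field names old_field, temp_field, and new_field are made up purely for illustration):

{
  "description" : "Drop one field, then rename another",
  "processors" : [
    { "remove" : { "field" : "old_field" } },
    { "rename" : { "field" : "temp_field", "target_field" : "new_field" } }
  ]
}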

To use a pipeline, you simply specify the pipeline parameter on an index or bulk request to tell the ingest node which pipeline to use. For example:

curl -XPUT 'ES_HOST:ES_PORT/my-index/my-type/my-id?pipeline=my_pipeline_id&pretty' -H 'Content-Type: application/json' -d '{
 "name": "john"
}'

Ingest Nodes

Ingest Nodes are a new type of Elasticsearch node that you can use to perform common data transformations and enrichments.

Each task is represented by a processor. Processors are configured to form pipelines. At the time of writing, the Ingest Node had 20 built-in processors, for example grok, date, gsub, lowercase/uppercase, remove, and rename.

Besides those, there are currently also three Ingest plugins:

  • Ingest Attachment converts binary documents like PowerPoint presentations, Excel spreadsheets, and PDFs to text and metadata
  • Ingest Geoip looks up the geographic locations of IP addresses in an internal database
  • Ingest User Agent parses and extracts information from the user agent strings sent by browsers and other applications over HTTP

Ingest Pipeline

We can configure a new ingest pipeline with the _ingest API endpoint.

curl -XPUT 'ES_HOST:ES_PORT/_ingest/pipeline/rename_device' -H 'Content-Type: application/json' -d '{
  "processors": [
    {
      "rename": {
        "field": "device",
        "target_field": "client",
        "ignore_missing": true
      }
    }
  ]
}'

In this example, we configure a pipeline called rename_device that simply takes the field device and renames it to client. If the device field does not exist, the processor continues without error.
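
Once stored, the pipeline definition can be fetched back (or deleted) through the same endpoint, which is a quick way to confirm that the request above succeeded:

curl -XGET 'ES_HOST:ES_PORT/_ingest/pipeline/rename_device?pretty'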

There are several ways to use this pipeline.

When using plain Elasticsearch APIs, we specify the pipeline parameter in the query string, e.g.:

curl -XPOST 'ES_HOST:ES_PORT/server/values/?pipeline=rename_device' -H 'Content-Type: application/json' -d '{
  "device": "Google Android 5.2.0"
}'

In Logstash, we can add the pipeline parameter to the elasticsearch output:

output {
  elasticsearch {
    hosts => "ELASTICSEARCH_HOST_IP"
    index => "access_logs"
    pipeline => "rename_device"
  }
}

Similarly, we can add a parameter to the elasticsearch output of any Beat:

output.elasticsearch:
  hosts: ["ELASTICSEARCH_HOST_IP:ELASTICSEARCH_HOST_PORT"]
  index: "access_logs"
  pipeline: "rename_device"

When configuring a new pipeline, it is often very valuable to test it before feeding it real data, rather than discovering only then that it throws errors. We can use the Simulate API for that purpose:

curl -XPOST 'ES_HOST:ES_PORT/_ingest/pipeline/rename_device/_simulate' -H 'Content-Type: application/json' -d '{
  "docs": [
    {
      "_source": {
        "device": "Google Android 5.2.0"
      }
    }
  ]
}'

The result shows us that our field has been successfully renamed:

      [...]
        "_source": {
          "client": "Google Android 5.2.0"
        },
        [...]

Ingest Pipeline: Parsing Web Logs

Let’s turn to something from the real world: Web logs.

This is an example of an access log in the Combined Log Format supported by both Apache httpd and nginx:

321.54.23.239 - - [18/Nov/2017:13:27:46 +0000] "GET /favicon.ico HTTP/1.1" 200 4560 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/456.16 (KHTML, like Gecko) Chrome/63.0.4532.234 Safari/543.45"

As you can see, it contains several pieces of information: IP address, timestamp, a user agent string, and so on.

To allow fast search and visualisation we need to give every piece its own field in Elasticsearch. It would also be useful to know where this request is coming from. We can do all this with the following Ingest pipeline.

curl -XPUT 'ES_HOST:ES_PORT/_ingest/pipeline/parsing_access_log' -H 'Content-Type: application/json' -d '{
  "description" : "Ingest pipeline for Apache httpd Combined Log Format",
  "processors" : [
    {
      "grok": {
        "field": "message",
        "patterns": ["%{IPORHOST:client_ip} %{USER:identity} %{USER:authorisation} \\[%{HTTPDATE:timestamp}\\] \"%{WORD:http_verb} %{DATA:request} HTTP/%{NUMBER:httpversion}\" %{NUMBER:response_code:int} (?:-|%{NUMBER:bytes:int}) %{QS:referrer} %{QS:agent}"]
      }
    },
    {
      "date": {
        "field": "timestamp",
        "formats": [ "dd/MMM/YYYY:HH:mm:ss Z" ]
      }
    },
    {
      "geoip": {
        "field": "client_ip"
      }
    },
    {
      "user_agent": {
        "field": "agent"
      }
    }
  ]
}'

The pipeline parsing_access_log contains a total of four processors:

  • grok uses a regular expression to parse the whole log line into individual fields.
  • date identifies the timestamp of the document.
  • geoip takes the IP address of the requester and looks it up in an internal database to determine its geographical location.
  • user_agent takes the user agent string and splits it up into individual components.

Since the last two processors are plugins that do not ship with Elasticsearch by default, we will have to install them first:

cd ES_HOME
bin/elasticsearch-plugin install ingest-geoip
bin/elasticsearch-plugin install ingest-user-agent
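
Installed plugins only take effect after the node has been restarted. To confirm that both are present, we can list the installed plugins:

bin/elasticsearch-plugin list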

To test our pipeline, we can again use the Simulate API (the double quotes inside message have to be escaped):

curl -XPOST 'ES_HOST:ES_PORT/_ingest/pipeline/parsing_access_log/_simulate' -H 'Content-Type: application/json' -d '{
  "docs": [
    {
      "_source": {
        "message": "321.54.23.239 - - [18/Nov/2017:13:27:46 +0000] \"GET /favicon.ico HTTP/1.1\" 200 4560 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/456.16 (KHTML, like Gecko) Chrome/63.0.4532.234 Safari/543.45\""
      }
    }
  ]
}'

The result from Elasticsearch shows us that the pipeline worked:

{
  "docs": [
    {
      "doc": {
        "_index": "_index",
        "_type": "_type",
        "_id": "_id",
        "_source": {
          "request": "/favicon.ico",
          "agent": "\"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/456.16 (KHTML, like Gecko) Chrome/63.0.4532.234 Safari/543.45\"",
          "geoip": {
            "continent_name": "Asia",
            "city_name": null,
            "country_iso_code": "IN",
            "region_name": null,
            "location": {
              "lon": 78,
              "lat": 20
            }
          },
          "authorisation": "-",
          "identity": "-",
          "http_verb": "GET",
          "message": "321.54.23.239 - - [18/Nov/2017:13:27:46 +0000] \"GET /favicon.ico HTTP/1.1\" 200 4560 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/456.16 (KHTML, like Gecko) Chrome/63.0.4532.234 Safari/543.45\"",
          "referrer": "\"-\"",
          "@timestamp": "2017-10-18T16:13:27.46.000Z",
          "response": 200,
          "bytes": 4560,
          "clientip": "321.54.23.239",
          "httpversion": "1.1",
          "user_agent": {
            "patch": "4532",
            "major": "63",
            "minor": "0",
            "os": "Mac OS X 10.11.6",
            "os_minor": "11",
            "os_major": "10",
            "name": "Chrome",
            "os_name": "Mac OS X",
            "device": "Other"
          },
          "timestamp": "18/Nov/2017:13:27:46 +0000"
        },
        "_ingest": {
          "timestamp": "2017-10-13T09:15:38.123+0000"
        }
      }
    }
  ]
}
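
If one of the processors does not behave as expected, the same Simulate request can be sent with the verbose parameter, which returns the document as it looks after each individual processor instead of only the final result:

curl -XPOST 'ES_HOST:ES_PORT/_ingest/pipeline/parsing_access_log/_simulate?verbose=true' -H 'Content-Type: application/json' -d '{
  "docs": [
    {
      "_source": {
        "message": "321.54.23.239 - - [18/Nov/2017:13:27:46 +0000] \"GET /favicon.ico HTTP/1.1\" 200 4560 \"-\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_11_6) AppleWebKit/456.16 (KHTML, like Gecko) Chrome/63.0.4532.234 Safari/543.45\""
      }
    }
  ]
}'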

Conclusion

When we don’t need the additional power and flexibility of Logstash filters, the Ingest API allows us to simplify our architecture for simpler use cases. And with Kibana, which now ships with Timelion built in, we still have the perfect tool for visualising the data.

The introduction of the ingest node in Elastic Stack 5.0 lets us transform data inside Elasticsearch before indexing it. This is especially useful when only simpler operations are required, while more complex ones can still be performed using Logstash. The operations performed in the ingest node are very efficient and result in seamless indexing.

Give it a Whirl!

It’s easy to spin up a standard hosted Elasticsearch cluster on any of our 47 Rackspace, Softlayer, or Amazon data centers. And you can now provision your own AWS Credits on Qbox Private Hosted Elasticsearch.

Questions? Drop us a note, and we’ll get you a prompt response.

Not yet enjoying the benefits of a hosted ELK-stack enterprise search on Qbox? We invite you to create an account today and discover how easy it is to manage and scale your Elasticsearch environment in our cloud hosting service.