We have already discussed Elasticsearch 5.0 and its ton of new and awesome features, and if you've been paying attention, then you know that one of the more prominent of these features is the shiny new ingest node. Simply put, ingest aims to provide a lightweight solution for pre-processing and enriching documents within Elasticsearch itself before they are indexed.

We can use an ingest node to pre-process documents before the actual indexing takes place. The ingest node intercepts bulk and index requests, applies the configured transformations, and then passes the documents on to the index or bulk APIs.
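
Once a pipeline has been defined, using it is as simple as passing its name in the pipeline query parameter of an index or bulk request. Here is a minimal sketch, in which the index, type, document ID, and the pipeline name my_pipeline are all placeholder values:

PUT my_index/my_type/1?pipeline=my_pipeline
{
  "my_field": "some value"
}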

For this post, we will be using hosted Elasticsearch on Qbox.io. You can sign up or launch your cluster here, or click "Get Started" in the header navigation. If you need help setting up, refer to "Provisioning a Qbox Elasticsearch Cluster."

Ingest nodes are a new type of Elasticsearch node that you can use to perform common data transformations and enrichments. Each task is represented by a processor, and processors are configured to form pipelines.
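
To give you a feel for the shape of a pipeline, here is a minimal sketch of the put pipeline API in action; the pipeline name my_pipeline and the field name indexed_via_ingest are placeholders:

PUT _ingest/pipeline/my_pipeline
{
  "description": "a pipeline with a single set processor",
  "processors": [
    {
      "set": {
        "field": "indexed_via_ingest",
        "value": true
      }
    }
  ]
}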

At the time of writing, the ingest node has 20 built-in processors, including grok, date, gsub, lowercase/uppercase, remove, and rename.

Besides those, there are currently also three Ingest plugins:

  • Ingest Attachment converts binary documents such as PowerPoint presentations, Excel spreadsheets, and PDFs to text and metadata

  • Ingest Geoip looks up the geographic locations of IP addresses in an internal database

  • Ingest User Agent parses and extracts information from the user agent strings sent by browsers and other applications in HTTP requests
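
To give a flavor of how a plugin-provided processor is configured, here is a minimal sketch of a geoip processor; it assumes the Ingest Geoip plugin is installed and that incoming documents carry an ip field (the field name is illustrative):

{
  "geoip": {
    "field": "ip"
  }
}

By default, the looked-up location information is stored under a geoip field in the document.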

We have covered the Elasticsearch 5.0 ingest APIs in our tutorial series Elastic Stack 5.0 Ingest APIs.

Accessing Data in Pipelines

The processors in a pipeline have read and write access to documents that pass through the pipeline. The processors can access fields in the source of a document and the document’s metadata fields.

Accessing Fields in the Source

Accessing a field in the source is straightforward. We can simply refer to fields by their name. For example:

{
  "set": {
    "field": "my_field",
    "value": 582.1
  }
}
{
  "set": {
    "field": "my_other_field",
    "value": "elasticsearch"
  }
}

On top of this, fields from the source are always accessible via the _source prefix:

{
  "set": {
    "field": "_source.my_field",
    "value": 582.1
  }
}
{
  "set": {
    "field": "_source.my_other_field",
    "value": "elasticsearch"
  }
}

Accessing Metadata Fields

We can access metadata fields in the same way that we access fields in the source. This is possible because Elasticsearch doesn’t allow fields in the source that have the same name as metadata fields.

The following example sets the _id metadata field of a document to 100 and the _type field to test:

{
  "set": {
    "field": "_id",
    "value": "100"
  }
}
{
  "set": {
    "field": "_type",
    "value": "test"
  }
}

The following metadata fields are accessible by a processor: _index, _type, _id, _routing, _parent.
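
For instance, a hypothetical processor that pins all processed documents to a single shard could set the _routing metadata field to a fixed value:

{
  "set": {
    "field": "_routing",
    "value": "user_1"
  }
}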

Accessing Ingest Metadata Fields

Apart from metadata fields and source fields, ingest also adds ingest metadata to the documents that it processes. These metadata properties are accessible under the _ingest key. Currently ingest adds the ingest timestamp under the _ingest.timestamp key of the ingest metadata. The ingest timestamp is the time when Elasticsearch received the index or bulk request to pre-process the document.

Any processor can add ingest-related metadata during document processing. Ingest metadata is transient and is lost after a document has been processed by the pipeline. Therefore, ingest metadata won’t be indexed.

The following example adds a field with the name created_at. The value is the ingest timestamp:

{
  "set": {
    "field": "created_at",
    "value": "{{_ingest.timestamp}}"
  }
}
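
You can verify this with the simulate pipeline API, which runs a pipeline against sample documents without indexing anything. Here is a minimal sketch; the message field is just sample data:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "set": {
          "field": "created_at",
          "value": "{{_ingest.timestamp}}"
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "message": "hello ingest"
      }
    }
  ]
}

In the response, each simulated document has created_at resolved to a concrete timestamp, while the _ingest metadata itself is discarded once the pipeline finishes.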

Unlike Elasticsearch metadata fields, the ingest metadata field name _ingest can also be used as a valid field name in the source of a document. To refer to such a field in the source, use the _source._ingest prefix; otherwise, _ingest will be interpreted as an ingest metadata field.
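
For example, to write to a hypothetical pipeline_version field nested under an _ingest key in the document source, you would spell out the _source prefix:

{
  "set": {
    "field": "_source._ingest.pipeline_version",
    "value": "1"
  }
}

Without the _source prefix, the same path would be treated as ingest metadata and would not survive the pipeline.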

Accessing Fields and Meta Fields in Templates

A number of processor settings also support templating. Settings that support templating can have zero or more template snippets. A template snippet begins with {{ and ends with }}. Accessing fields and meta fields in templates is exactly the same as via regular processor field settings.

The following example adds a field named field_third. Its value is a concatenation of the values of field_first and field_second.

{
  "set": {
    "field": "field_third",
    "value": "{{field_first}} {{field_second}}"
  }
}

The following example uses the value of the geoip.country_iso_code field in the source to set the index that the document will be indexed into:

{
  "set": {
    "field": "_index",
    "value": "{{geoip.country_iso_code}}"
  }
}

Handling Failures in Pipelines

A pipeline defines a list of processors that are executed sequentially, and processing halts at the first exception. This behavior may not be desirable when failures are expected. For example, you may have logs that don’t match the specified grok expression. Instead of halting execution, you may want to index such documents into a separate index.

To enable this behavior, you can use the on_failure parameter. The on_failure parameter defines a list of processors to be executed immediately following the failed processor. We can specify this parameter at the pipeline level, as well as at the processor level. If a processor specifies an on_failure configuration, whether it is empty or not, any exceptions that are thrown by the processor are caught, and the pipeline continues executing the remaining processors. Since we can define further processors within the scope of an on_failure statement, we can nest failure handling.

The following example defines a pipeline that renames the old_field field in the processed document to new_field. If the document does not contain the old_field field, the processor attaches an error message to the document for later analysis within Elasticsearch.

{
  "description" : "my first pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "old_field",
        "target_field" : "new_field",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "field \"old_field\" does not exist, cannot rename to \"new_field\""
            }
          }
        ]
      }
    }
  ]
}

The following example defines an on_failure block on a whole pipeline to change the index to which failed documents get sent.

{
  "description" : "my test pipeline with handled exceptions",
  "processors" : [ ... ],
  "on_failure" : [
    {
      "set" : {
        "field" : "_index",
        "value" : "failed-{{ _index }}"
      }
    }
  ]
}

Alternatively, instead of defining behavior in case of a processor failure, it is also possible to ignore a failure and continue with the next processor by specifying the ignore_failure setting.

In the following example, if the field old_field doesn’t exist, the failure is caught and the pipeline continues to execute. In effect, the pipeline does nothing if old_field doesn’t exist.

{
  "description" : "my test pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "old_field",
        "target_field" : "new_field",
        "ignore_failure" : true
      }
    }
  ]
}

The ignore_failure setting can be set on any processor and defaults to false.

Accessing Error Metadata from Processors Handling Exceptions

We may want to retrieve the actual error message that was thrown by a failed processor. To do so, you can access the metadata fields on_failure_message, on_failure_processor_type, and on_failure_processor_tag. These fields are only accessible from within the context of an on_failure block.

Here is an updated version of the example we saw earlier. Instead of setting the error message manually, the pipeline leverages the on_failure_message metadata field to provide the error message.

{
  "description" : "my test pipeline with handled exceptions",
  "processors" : [
    {
      "rename" : {
        "field" : "old_field",
        "to" : "new_field",
        "on_failure" : [
          {
            "set" : {
              "field" : "error",
              "value" : "{{ _ingest.on_failure_message }}"
            }
          }
        ]
      }
    }
  ]
}
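
Once again, the simulate API is a convenient way to watch this in action. In this sketch the sample document deliberately lacks old_field, so the rename processor fails and the on_failure block runs:

POST _ingest/pipeline/_simulate
{
  "pipeline": {
    "processors": [
      {
        "rename": {
          "field": "old_field",
          "target_field": "new_field",
          "on_failure": [
            {
              "set": {
                "field": "error",
                "value": "{{ _ingest.on_failure_message }}"
              }
            }
          ]
        }
      }
    ]
  },
  "docs": [
    {
      "_source": {
        "some_other_field": "value"
      }
    }
  ]
}

The simulated document comes back with an error field whose value is the actual exception message raised by the rename processor.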

Give it a Whirl!

It's easy to spin up a standard hosted Elasticsearch cluster on any of our 47 Rackspace, Softlayer, or Amazon data centers. And you can now provision your own AWS Credits on Qbox Private Hosted Elasticsearch.

Questions? Drop us a note, and we'll get you a prompt response.

Not yet enjoying the benefits of a hosted ELK-stack enterprise search on Qbox? We invite you to create an account today and discover how easy it is to manage and scale your Elasticsearch environment in our cloud hosting service.
