Apache Kafka is a very popular message broker, comparable in popularity to Logstash.

More and more companies build streaming pipelines to react to and publish events.
Kafka is gaining accelerated adoption for event storage and distribution, while Elasticsearch is used for the projections. My friend Hannes and I call it a perfect match, so we gathered during a quiet Christmas holiday to implement a connector between the two.

All code is available on GitHub and runs with Docker Compose.

Note

For this article I worked with Hannes Stockner. Hannes is a freelance developer with significant experience working with Kafka.

In the past he worked as a Software Engineer for various companies in Vienna, Hamburg, and London. Currently he streams millions of events a day in real time with Apache Kafka, Apache Samza, Spark, and Cassandra. Meet him at the Kafka London Meetup.

Prerequisites

  1. A Linux or Apple OS X console (not tested on Windows, but adaptable with little effort)
  2. A Git client to fetch the project
  3. Docker Compose
  4. Apache Maven installed. Maven will not work with just a JRE; it requires JDK 7 or greater and JAVA_HOME pointing to it. Check that "mvn -v" reports a JDK
  5. Clone the project: git clone https://github.com/hannesstockner/kafka-connect-elasticsearch

What is Kafka Connect?

Until recently, integration between Kafka and the outside world was left entirely to the integrator. For example, there is an Elasticsearch plugin for consuming Kafka topics. With Connect, the Kafka team reacted to the varying quality of homegrown integrations and addressed the common problems: scalability, fault tolerance, and configuration management.

Up and running in one minute

For this post, we will be using hosted Elasticsearch on Qbox.io. You can sign up or launch your cluster here, or click “Get Started” in the header navigation. If you need help setting up, refer to “Provisioning a Qbox Elasticsearch Cluster.”

Why not start from the end? All the work is already done, so you can experiment with our basic connector.
We used the flozano/kafka Docker image together with the official Elasticsearch image to create a Docker Compose setup. Run docker pull -a flozano/kafka to fetch the Kafka image.
If you have Elasticsearch already running, make sure to shut it down first.
Then build a fat jar from the source code of the kafka-connect-elasticsearch project. In the project folder, run:

mvn clean package

Export the environment variable: export DOCKER_IP=<your preferred Docker IP, example: 172.17.0.1>

Go to the project folder and run:

docker-compose up

Result:

Starting kafkaconnectelasticsearch_elasticsearch_1
Starting kafkaconnectelasticsearch_kafka_1
Attaching to kafkaconnectelasticsearch_elasticsearch_1, kafkaconnectelasticsearch_kafka_1
...
kafka_1         | 2016-01-03 09:44:13,952 INFO success: kafka entered RUNNING state, process has stayed up for > than 1 seconds (startsecs)

docker ps

should show something like:

e77a432c25e9   flozano/kafka:0.9.0.0   "supervisord -n"         19 hours ago   Up 40 minutes   0.0.0.0:2181->2181/tcp, 0.0.0.0:9092->9092/tcp   kafkaconnectelasticsearch_kafka_1
6a161df0d06d   elasticsearch           "/docker-entrypoint.s"   19 hours ago   Up 40 minutes   0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp   kafkaconnectelasticsearch_elasticsearch_1

Go to http://<your preferred Docker IP, example: 172.17.0.1>:9200/kafka_recipes/_search. It should respond with:

{
  "error": {
    "root_cause": [
      {
        "type": "index_not_found_exception",
        "reason": "no such index",
        "resource.type": "index_or_alias",
        "resource.id": "kafka_recipes",
        "index": "kafka_recipes"
      }
    ],
    "type": "index_not_found_exception",
    "reason": "no such index",
    "resource.type": "index_or_alias",
    "resource.id": "kafka_recipes",
    "index": "kafka_recipes"
  },
  "status": 404
}

Now run the script ./run_standalone.sh in order to start the Kafka Connect server (make sure your console has the exported DOCKER_IP variable). The script will import a bunch of recipes from the openrecipes database.

You can inspect Kafka Connect at http://localhost:8083/connectors/local-elasticsearch-sink:

{
  "name": "local-elasticsearch-sink",
  "config": {
    "topics": "recipes",
    "index.prefix": "kafka_",
    "tasks.max": "1",
    "name": "local-elasticsearch-sink",
    "es.host": "docker",
    "connector.class": "com.hannesstockner.connect.es.ElasticsearchSinkConnector"
  },
  "tasks": [
    {
      "connector": "local-elasticsearch-sink",
      "task": 0
    }
  ]
}
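
Once the connector is running, recipes should start landing in the kafka_recipes index; reload the kafka_recipes/_search URL from above and the 404 should be replaced by hits. If you prefer to check from code, here is a minimal sketch (our own helper, not part of the project) that counts the indexed documents using the same Elasticsearch 2.x transport client the sink task uses; it assumes the DOCKER_IP environment variable is exported as described earlier:

import java.net.InetAddress;

import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.transport.InetSocketTransportAddress;

public class RecipeCountCheck {
  public static void main(String[] args) throws Exception {
    // Connect to the dockerized Elasticsearch on the transport port (9300), just like the sink task does
    Client client = TransportClient.builder().build()
      .addTransportAddress(new InetSocketTransportAddress(
        InetAddress.getByName(System.getenv("DOCKER_IP")), 9300));
    // Ask for the total number of documents in the kafka_recipes index
    SearchResponse response = client.prepareSearch("kafka_recipes").setSize(0).get();
    System.out.println("recipes indexed so far: " + response.getHits().getTotalHits());
    client.close();
  }
}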

How to build an Elasticsearch Connector


Figure: A Kafka Connector subscribes to a Topic and expands tasks according to the load of the Topic. Tasks feed an Elasticsearch cluster.

A Kafka Connect connector consists of two classes:

(1) the Connector, whose duty is to configure and start
(2) the Tasks, which process the incoming stream.

Our implementation requires only a SinkConnector, because there are not many business cases for Elasticsearch as a source of Kafka Topics. To feed Kafka with the openrecipes.json data, we use the standard FileStreamSourceConnector that ships with Kafka.


Figure: Components and Dependencies of the project. Docker Compose manages two Docker containers, one for Kafka, one for Elasticsearch.

public class ElasticsearchSinkConnector extends SinkConnector {
  public static final String ES_HOST = "es.host";
  public static final String INDEX_PREFIX = "index.prefix";
  private String esHost;
  private String indexPrefix;
  @Override
  public String version() {
    return AppInfoParser.getVersion();
  }
  @Override
  public void start(Map<String, String> props) {
    esHost = props.get(ES_HOST);
    indexPrefix = props.get(INDEX_PREFIX);
  }
  @Override
  public Class<? extends Task> taskClass() {
    return ElasticsearchSinkTask.class;
  }
  @Override
  public List<Map<String, String>> taskConfigs(int maxTasks) {
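    // Every task receives the same configuration: the same Elasticsearch host and index prefix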
    ArrayList<Map<String, String>> configs = new ArrayList<>();
    for (int i = 0; i < maxTasks; i++) {
      Map<String, String> config = new HashMap<>();
      if (esHost != null)
        config.put(ES_HOST, esHost);
      if (indexPrefix != null)
        config.put(INDEX_PREFIX, indexPrefix);
      configs.add(config);
    }
    return configs;
  }
  @Override
  public void stop() {
    //not implemented
  }
}

Code: the Connector class configures the tasks and provides a template for how to start them

public class ElasticsearchSinkTask extends SinkTask {
  private static final Logger log = LoggerFactory.getLogger(ElasticsearchSinkTask.class);
  private String indexPrefix;
  private final String TYPE = "kafka";
  private Client client;
  @Override
  public void start(Map<String, String> props) {
    final String esHost = props.get(ElasticsearchSinkConnector.ES_HOST);
    indexPrefix = props.get(ElasticsearchSinkConnector.INDEX_PREFIX);
    try {
      client = TransportClient
        .builder()
        .build()
        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(esHost), 9300));
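      // Register an index template for all indices matching the prefix, disabling
      // date and numeric detection for the "kafka" mapping type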
      client
        .admin()
        .indices()
        .preparePutTemplate("kafka_template")
        .setTemplate(indexPrefix + "*")
        .addMapping(TYPE, new HashMap<String, Object>() {{
          put("date_detection", false);
          put("numeric_detection", false);
        }})
        .get();
    } catch (UnknownHostException ex) {
      throw new ConnectException("Couldn't connect to es host", ex);
    }
  }
  @Override
  public void put(Collection<SinkRecord> records) {
    for (SinkRecord record : records) {
      log.info("Processing {}", record.value());
      log.info(record.value().getClass().toString());
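      // Index the record's value as a JSON document into <indexPrefix + topic> under the "kafka" type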
      client
        .prepareIndex(indexPrefix + record.topic(), TYPE)
        .setSource(record.value().toString())
        .get();
    }
  }
  @Override
  public void flush(Map<TopicPartition, OffsetAndMetadata> offsets) {
  }
  @Override
  public void stop() {
    client.close();
  }
  @Override
  public String version() {
    return new ElasticsearchSinkConnector().version();
  }
}

Code: The SinkTask class initializes an Elasticsearch index template and streams new data from a Kafka Topic into Elasticsearch.

Lifecycle of a Kafka Connect Task

Based on the two Java classes above, we can examine the lifecycle of the whole Connect infrastructure (a small sketch follows the list):

  1. The ElasticsearchSinkConnector reads the configuration properties in start(Map<String, String> props)
  2. In taskConfigs(int maxTasks) each Task could be configured separately; in our case, every task subscribes to the same Kafka Topic and forwards JSON to the same Elasticsearch index
  3. Each ElasticsearchSinkTask receives its config with start(Map<String, String> props)
  4. It creates an Elasticsearch TransportClient
  5. A dynamic Elasticsearch mapping template is created for the Kafka Connect related indices. Since this Elasticsearch operation is idempotent, we can do it on every task initialization.
  6. Initialization phase has ended. Now, Kafka Connect listens to the registered topic and we push records into Elasticsearch with SinkTask#put(Collection<SinkRecord> records).
  7. On shutdown, ElasticsearchSinkTask#stop() is called, which closes all connections to Elasticsearch. Finally, the Connect standalone server is taken down and the program exits.
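
To make the first two steps concrete, here is a minimal sketch (purely illustrative, not part of the project) that drives the connector by hand, the way the standalone worker would:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import com.hannesstockner.connect.es.ElasticsearchSinkConnector;

public class ConnectorLifecycleSketch {
  public static void main(String[] args) {
    // Step 1: the worker hands the connector its configuration
    ElasticsearchSinkConnector connector = new ElasticsearchSinkConnector();
    Map<String, String> props = new HashMap<>();
    props.put(ElasticsearchSinkConnector.ES_HOST, "docker");
    props.put(ElasticsearchSinkConnector.INDEX_PREFIX, "kafka_");
    connector.start(props);

    // Step 2: the worker asks for one configuration per task (here: at most two tasks);
    // each map carries the same es.host and index.prefix, so every task writes to the
    // same Elasticsearch host and index prefix
    List<Map<String, String>> taskConfigs = connector.taskConfigs(2);
    System.out.println(taskConfigs);

    connector.stop();
  }
}

In a real deployment the worker decides how many tasks to start, up to the tasks.max value from the connector configuration shown earlier.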

Cleaning the Docker Image if something goes wrong

If you need to start fresh during your experimentations, try:

docker volume rm $(docker volume ls -qf dangling=true)
docker volume ls -f dangling=true
docker-compose rm
docker-compose up

Conclusion

In this article we have demonstrated how Kafka can feed Elasticsearch through Kafka Connect. The new Connect library makes it easy to get data into or out of Kafka, so implementers can focus on the copy process itself. Connect provides robust, scalable processing of Kafka topics and should be preferred over homegrown Elasticsearch plugins.

References

Great introduction to Kafka Connect:
http://www.confluent.io/blog/how-to-build-a-scalable-etl-pipeline-with-kafka-connect

We used dynamic mapping for this demo:
https://www.elastic.co/guide/en/elasticsearch/guide/current/dynamic-mapping.html