In many cases we want to use inputs from different databases which are not natively supported by Elasticsearch. In this post we show how to migrate data from a MySql database to Elasticsearch via Logstash.

JDBC Plugin

The JDBC plugin available for Logstash makes sure that the data from any database with a JDBC interface can be taken into Logstash as input. This plugin also supports a need for scheduling running logstash. It also makes selective data as input by using queries. In these kind of databases we have the concept of rows and columns. Each row is considered a single event and the columns in each row (event) are considered fields in the event.

The following block diagram explains the role of the JDBC connector plugin in migrating data from a JDBC supported database:
mysql-logstash1.png#asset:1073

In the diagram we have logstash running the configuration file which fires the predefined query we have set to collect the data of our interests to the sequential database. Once the query is fired to the JDBC plugin, it passes it to the database and collects the data, which it will hand over to Logstash. 

According to our requirements, we can process the data and make it in the desired form, after the processing the processed data is indexed to Elasticsearch. We show the detailed application in the example in the coming sections.

Inserting MySql Data to Elasticsearch

Let us move on to migrating data from a sequential database, such as MySql to Elasticsearch with the help of Logstash. We require the corresponding JDBC driver for MySql. You can download it here. Now let us create a database named "testdb" in MySql using the following command:

create testdb

The database is now created and we just make sure we are using the same database for our purposes:

show databases;
use testdb;

Create a table named "testtable" under the database "testdb" with the following schema:

create table testtable (PersonID int, LastName varchar(255), FirstName varchar(255), City varchar(255), Date datetime(6)); 

Now insert some test data into the above table:

INSERT INTO testtable (PersonID, LastName, FirstName, City, Date)
VALUES ('4005','Kallis','Jaques','Cape Town','2016-05-23 16:12:03.568810');
INSERT INTO testtable (PersonID, LastName, FirstName, City, Date)
VALUES ('4004','Baron','Richard','Cape Town','2016-05-23 16:12:04.370460');
INSERT INTO testtable (PersonID, LastName, FirstName, City, Date)
VALUES ('4003','McWell','Sharon','Cape Town','2016-05-23 16:12:06.874801'); 

We have created a table with the details of 3 employees. You can display the details of the table to show all of it contents by passing the query:

select * from testtable

The resulting table will look like: 

mysqllogstash2.png#asset:1074

Logstash Configuration

Now that we have created a MySql table with contents as shown in the above section, look how Logstash must be configured. In the logstash folder, we have a logstash.conf file which is the one to be configured and run to obtain the necessary results. The initial configuration is shown in the following screenshot: 

mysqllogstash3.png#asset:1075

Editors Note: I've included the code block above per request:

input {
  jdbc { 
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
    # The user we wish to execute our statement as
    jdbc_user => "root"
    jdbc_password => "123456"
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "/home/comp/Downloads/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # our query
    statement => "SELECT * FROM testtable"
    }
  }
output {
  stdout { codec => json_lines }
  elasticsearch {
  "hosts" => "localhost:9200"
  "index" => "test-migrate"
  "document_type" => "data"
  }
}

In the above configuration file we have mentioned numerous parameters such as: Which database should the JDBC connector check for data, the location for the JDBC plugin, the user name and password for MySql access, and the query statement. After the above settings are applied to the "logstash.conf" file, run Logstash by typing in the command below:

bin/logstash -f logstash.conf

As mentioned in the block diagram in the JDBC section, the logstash configuration file passes the query to the JDBC driver along with the user credentials. It also fetchs the data and gives the data to Logstash. Logstash will make it JSON formatted and index to the Elasticsearch database. Query the index "test-migrate" like below:

curl -XPOST 'http://localhost:9200/test-migrate/_search?pretty=true' -d '{}' 

The above query lists each row as a separate document with the columns as field. An example: 

{
  "_index": "test-migrate",
  "_type": "data",
  "_id": "4004",
  "_score": 1,
  "_source": {
    "personid": 4004,
    "lastname": "Baron",
    "firstname": "Richard",
    "city": "Cape Town",
    "date": "2016-05-23T10:42:04.370Z",
    "@version": "1",
    "@timestamp": "2016-07-10T10:36:43.685Z"
  }
}

More Configurations

In this section we show various use case scenarios. Add another row of data to the above MySql like below:

INSERT INTO testtable (PersonID, LastName, FirstName, City, Date)
VALUES ('4002','Cheers','Joan','Cape Town','2016-05-23 16:12:07.163681');

Also, update existing values of a row in the same table like below:

UPDATE testtable
-> SET FirstName='James'
 -> WHERE PersonID=4005; 

1. Duplication Issue

After the above procedure, run the logstash configuration file again. We expect a total of 4 documents inclusive to the new row and the updated row. However, when checking the index again, that it is not the case. Instead we have a total of 7 documents. This happens because the initial documents are left untouched in the elasticsearch database, which is due to not giving a specific id from the table itself. When we run the logstash configuration, the entire content in the "testtable" is indexed once more.

How can we solve this duplication? We have to give a unique ID for each document. For every run, the ID should be the same for each document which prevents the duplication issue. This can be made possible by editing the output section of the conf file as given below:

mysqllogstash4.png#asset:1076

2. Mutate Operation

One of the other important needs we encounter will be changing the field names and values as we index to elasticsearch. We add a requirement to our current example to demonstrate this. For all the documents which match "Cape Town" as the value, the field "City" should be replaced with value "South Africa" and the field value should be replaced with "Country".

In order to achieve this requirement, use the "filter" property to manipulate the ingested data in elasticsearch. Use the "mutate" property inside "filter" to execute the required change. With the above settings for the "input" and "output" sections, we need to add the following "filter" section in the logstash.conf file:

filter {
  if [city] == "Cape Town" {
    mutate {
       rename => {"city" => "country"}
       replace => [ "country", "South Africa" ]
      }
   }
}

The ingested data is checked for the value "Cape Town" for each event's "City" column. If it finds a match, the "City" field is renamed "country" and the value for each match is replaced with "South Africa".  

3. Scheduled & Incremental Updating

What if the data is continuously being updated in the MySql database and we need to index it incrementally and periodically? For periodically fetching the data, add the "scheduler" property in the input section. The "scheduler," when given a value, makes the conf file run at periodic intervals. It is highly customizable and uses the Rufus-scheduler syntax.

For incremental updating, modify the query to use the "sql_last_value" against a field. Here we give that field "Date". We also set the "use_column_value" to be true and link the corresponding column to "Date" using "tracking_column".

The complete configuration file for the cases 1, 2, and 3, is given below:

mysqllogstash5.png#asset:1077

Editors Note: I've included the code block above per request:

input {
  jdbc { 
    jdbc_connection_string => "jdbc:mysql://localhost:3306/testdb"
    jdbc_user => "root"
    jdbc_password => "factweavers"
    # The path to our downloaded jdbc driver
    jdbc_driver_library => "/home/comp/Downloads/mysql-connector-java-5.1.38.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    # our query
    schedule => "* * * *"
    statement => "SELECT" * FROM testtable where Date > :sql_last_value order by Date"
    use_column_value => true
    tracking_column => Date
}
filter {
if [city] == "Cape Town" {
mutate {
  rename => {"city" => "country"}
    replace => [ "country", "South Africa"]
  }
}
}
output {
  stdout { codec => json_lines }
  elasticsearch {
  "hosts" => "localhost:9200"
  "index" => "test-migrate"
  "document_type" => "data"
  "document_id" => "%{personid}"
  }
}

In order to see the above configuration working, add a few fields to the existing MySql table with "Date" values more recent than the ones existing before. Now run logstash and you can see only the new data has been indexed in the Elasticsearch index.

Conclusion

In this post we discussed the JDBC plugin used to migrate data from sequential databases to Elasticsearch using logstash. We also familiarized how to deal with the common issues like duplication, mutation of fields and values, and also the scheduling and incremental updating. Questions/Comments? Drop us a line below.