
Elasticsearch in Apache Spark with Python: Machine Learning Series (Part 2)
Posted by Sloan Ahrens, October 24, 2014

In this post we're going to continue setting up some basic tools for doing data science. We began the setup in the first article in this series, Building an Elasticsearch Index with Python, Machine Learning Series, Part 1. The goal throughout the series is to run machine learning classification algorithms against large data sets, using Apache Spark and Elasticsearch clusters in the cloud. In the first article, we set up a VirtualBox Ubuntu 14 virtual machine, installed Elasticsearch, and built a simple index using Python.
Here we will complete our setup by installing Spark on the VM we established in the first article. Then we'll exercise it with some simple operations: reading data from an Elasticsearch index, transforming that data, and writing the results back into Elasticsearch. All the code for the posts in this series is available in this GitHub repository.
For this second segment, we’ll remain local on our Ubuntu 14 VM. Our plan for the next article is to migrate our setup to the cloud.
Introductory note: Sloan Ahrens is a co-founder of Qbox who is now a freelance data consultant. This is the second in a series of guest posts, in which he demonstrates how to set up a large-scale machine learning infrastructure using Apache Spark and Elasticsearch. -Mark Brandon
For this post, we will be using hosted Elasticsearch on Qbox.io. You can sign up or launch your cluster here, or click "Get Started" in the header navigation. If you need help setting up, refer to "Provisioning a Qbox Elasticsearch Cluster."
Step 1: Gear Check
Before continuing with the steps below, it's important that you've gone all the way through the first article in this series.
Also, make sure your VM has access to several processor cores. It isn't strictly necessary, but giving Spark multiple cores lets your local setup simulate multiple worker nodes. We give our VM four cores:
Another tool that we find especially useful is Sense, an Elasticsearch developer console that comes as a Google Chrome extension. Sense is very handy when you're working with an Elasticsearch index, and it can also translate its requests to and from cURL commands.
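Sense isn't required, though. If you'd rather check on the index from Python, here's a minimal sketch using only the standard library (this assumes Elasticsearch is still running locally on the default port, with the titanic index built in the first article):

# quick check that Elasticsearch is up and the "titanic" index from Part 1 exists
import urllib2  # Python 2 standard library
print(urllib2.urlopen('http://localhost:9200/titanic/_count').read())

You should get back a small JSON response containing the document count for the index.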
Step 2: Install Apache Spark
Before installing Spark, we recommend that you read a good overview of Spark. Spark can be used with Java, Scala, or Python; we're going to use Python, but first we need to get Spark onto our VM. Go to the Spark download page, choose the package type that is pre-built for the latest version of Hadoop, set the download type to Direct Download, and then click the link next to Download Spark. Put the file wherever you like, such as your home directory.
You can also accomplish this by running:
# download spark
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.4.tgz
Next, we extract the files and tidy up the place:
# extract, clean up
tar -xf spark-1.1.0-bin-hadoop2.4.tgz
rm spark-1.1.0-bin-hadoop2.4.tgz
sudo mv spark-* spark
Run the Spark Python interpreter with:
# run spark
cd ~/spark
./bin/pyspark
You should see something like:
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/

Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc.
>>>
The interpreter will already have given us a Spark context object, which we can see by running:
>>> print(sc)
<pyspark.context.SparkContext object at 0x7f34b61c4e50>
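As a quick sanity check that the context is working, you can run a trivial job against it (a minimal example, not part of the original walkthrough):

# distribute the numbers 0-99 across the local workers and add them up
print(sc.parallelize(range(100)).sum())  # should print 4950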
Step 3: Simple Word-count on a Document
For a simple illustration of some of Spark's basic capabilities, let's perform a simple word-count on a document. For convenience, we'll use a document that's already in the Spark directory, CHANGES.txt, and run the task across all four cores (assuming your VM has that many; otherwise, use the number of cores you specified earlier).
Start the Spark interpreter with the following command:
# run spark with 4 cores
./bin/pyspark --master local[4]
Load the text file into a Spark RDD:
# read in a text file
textFile = sc.textFile('CHANGES.txt')
Next we’ll count the number of lines and (non-newline) characters in the file:
# count lines
print('lines in file: %s' % textFile.count())

# add up lengths of each line
chars = textFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
print('number of non-newline characters in file: %s' % chars)
Of course, if we were running this in a distributed environment, we probably wouldn't simply print our results to stdout, but that's fine for illustration and verification in this tutorial.
Our results:
lines in file: 14577
number of non-newline characters in file: 505483
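As noted above, in a genuinely distributed job you would usually persist results rather than print them. A minimal sketch of one way to do that (the output path is just an example; Spark writes it as a directory of part files and will complain if it already exists):

# compute per-line lengths and write them out instead of printing
lineLengths = textFile.map(lambda s: len(s))
lineLengths.saveAsTextFile('/tmp/line_lengths_out')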
Let's go a bit further and count the number of occurrences of each word in the document, splitting the text only on whitespace. Then we'll sort the results and print out the ten most frequent words. Here's the code:
# run a simple word count of the doc

# map each line to its words
wordCounts = textFile.flatMap(lambda line: line.split())

# emit value:1 for each key:word
wordCounts = wordCounts.map(lambda word: (word, 1))

# add up word counts by key:word
wordCounts = wordCounts.reduceByKey(lambda a, b: a+b)

# sort in descending order by word counts
wordCounts = wordCounts.sortBy(lambda item: -item[1])

# collect the results in an array
results = wordCounts.collect()

# print the first ten elements
print(results[:10])
The output:
[(u'-0700', 2287), (u'Commit:', 2203), (u'from', 994), (u'pull', 896), (u'request', 889), (u'Merge', 888), (u'in', 720), (u'to', 673), (u'-0800', 648), (u'2013', 625)]
NOTE: Although all the operations on wordCounts could have been chained into a single expression, we broke them up to make the code easier to read. Next, we'll integrate Elasticsearch with Spark.
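Before we do, for reference, here is roughly what the chained form would look like (equivalent to the step-by-step version above):

# the same word count, chained into a single expression
results = (textFile.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b)
                   .sortBy(lambda item: -item[1])
                   .collect())
print(results[:10])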
Step 4: Elasticsearch I/O with Spark
In order to use Elasticsearch with Spark, the first thing we're going to need is the elasticsearch-hadoop adapter. We're going to use the 2.1.0.Beta2 release, which is available here.
We install the adapter in a subdirectory of our Spark directory as follows:
# get elasticsearch-hadoop adapter
cd ~/spark  # or equivalent
mkdir jars; cd jars
wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.1.0.Beta2/elasticsearch-hadoop-2.1.0.Beta2.jar
cd ..
Next, we run the Spark Python interpreter with the elasticsearch-hadoop jar:
# run spark with elasticsearch-hadoop jar
./bin/pyspark --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.Beta2.jar
The Spark docs contain an example of reading an Elasticsearch index with Python, which you can find under the Python tab here. We use this technique to read the data from our index and print the first document:
# read in ES index/type "titanic/passenger"
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "titanic/passenger" })

print(es_rdd.first())
Here’s the output:
(u'892', {u'fare': u'7.8292', u'name': u'Kelly, Mr. James', u'embarked': u'Q', u'age': u'34.5', u'parch': u'0', u'pclass': u'3', u'sex': u'male', u'ticket': u'330911', u'passengerid': u'892', u'sibsp': u'0', u'cabin': None})
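Each element of the RDD is a (document ID, document) pair, so the usual RDD methods work for poking at the data before we transform it. For example (a quick sketch):

# a couple of quick sanity checks using standard RDD methods
print('passenger documents: %s' % es_rdd.count())
for doc_id, doc in es_rdd.take(3):
    print('%s: %s' % (doc_id, doc['name']))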
Now let’s do some simple analytics on the data and write the results back to the index (under a different type):
# extract values for the "sex" field, count occurrences of each value
value_counts = es_rdd.map(lambda item: item[1]["sex"])
value_counts = value_counts.map(lambda word: (word, 1))
value_counts = value_counts.reduceByKey(lambda a, b: a+b)

# put the results in the right format for the adapter
value_counts = value_counts.map(lambda item: ('key', {
    'field': 'sex',
    'val': item[0],
    'count': item[1]
}))

# write the results to "titanic/value_counts"
value_counts.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "titanic/value_counts" })
The figure below presents the results in the Sense tool:
All of the code that we’ve run up to this point is available in this repository.
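If you'd rather verify the write from the Spark shell instead of Sense, one option is to read the new type back with the same adapter technique (a quick sketch; you may need to give Elasticsearch a moment to refresh before the new documents show up):

# read the analytics results back out of "titanic/value_counts"
check_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "titanic/value_counts" })
print(check_rdd.collect())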
NOTE: If you do this more than once, you'll probably want to rebuild the index, since we aren't providing Elasticsearch with IDs for our analytics results (new ones will accumulate each time we execute the code above). If, for example, you have the repo in ~/local_code, then you can delete and recreate the index by running the following:
cd ~/local_code/qbox-blog-code/ch_1_local_ubuntu_es
python build_index.py
Step 5: Clean Up, Generalize
As a final step for this article, let's clean up our code a little, generalize it, and put it in a more portable form so it will be easier to migrate and run in the cloud (we cover this in the next article). So far, we've been running our code from the interpreter; now let's put it in a code file and run it with spark-submit.
We'll also apply the analysis we did on the sex field to every field, filter the results to values that appear more than once, and then write all the results to the index. Here's the code, which you can also find in the GitHub repo:
# es_spark_test.py
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    conf = SparkConf().setAppName("ESTest")
    sc = SparkContext(conf=conf)

    es_read_conf = {
        "es.nodes" : "localhost",
        "es.port" : "9200",
        "es.resource" : "titanic/passenger"
    }

    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)

    es_write_conf = {
        "es.nodes" : "localhost",
        "es.port" : "9200",
        "es.resource" : "titanic/value_counts"
    }

    doc = es_rdd.first()[1]
    for field in doc:
        value_counts = es_rdd.map(lambda item: item[1][field])
        value_counts = value_counts.map(lambda word: (word, 1))
        value_counts = value_counts.reduceByKey(lambda a, b: a+b)
        value_counts = value_counts.filter(lambda item: item[1] > 1)
        value_counts = value_counts.map(lambda item: ('key', {
            'field': field,
            'val': item[0],
            'count': item[1]
        }))

        value_counts.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_write_conf)
We can run this file with the following command (change the file path appropriately):
./bin/spark-submit --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.Beta2.jar ~/local_code/qbox-blog-code/ch_2_local_spark_es/es_spark_test.py
Looking at our index again, we see a lot more results:
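One more note before wrapping up: the Elasticsearch host and port are hard-coded to localhost:9200 in es_spark_test.py, which is exactly what we'll need to change when we move to the cloud. A hypothetical variation (the argument handling here is illustrative, not part of the repo code) could take them from the command line instead:

# hypothetical variant: take the ES host and port as command-line arguments
import sys
from pyspark import SparkContext, SparkConf

if __name__ == "__main__":
    # default to the local setup if no arguments are given
    es_host = sys.argv[1] if len(sys.argv) > 1 else "localhost"
    es_port = sys.argv[2] if len(sys.argv) > 2 else "9200"

    conf = SparkConf().setAppName("ESTest")
    sc = SparkContext(conf=conf)

    es_read_conf = {
        "es.nodes" : es_host,
        "es.port" : es_port,
        "es.resource" : "titanic/passenger"
    }

    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_read_conf)

    # ...the rest of the analysis proceeds exactly as in es_spark_test.py

With something like that in place, the Elasticsearch endpoint can be supplied at submit time rather than edited into the file.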
That’s a wrap on this article. Let’s recap: we installed Apache Spark on our Ubuntu VM, read data from an Elasticsearch index into Spark, did some analysis on it, and wrote the results back to Elasticsearch. In the next article, we take what we’ve done here and deploy it to the cloud. Stay tuned!
We invite you to continue to the next article in this series, Deploying Elasticsearch and Apache Spark to the Cloud, Machine Learning Series, Part 3.