In this post we're going to continue setting up some basic tools for doing data science. We began the setup in the first article in this series, Building an Elasticsearch Index with Python, Machine Learning Series, Part 1. The goal throughout the series is to run machine learning classification algorithms against large data sets, using Apache Spark and Elasticsearch clusters in the cloud. In the first article, we set up a VirtualBox Ubuntu 14 virtual machine, installed Elasticsearch, and built a simple index using Python.

Here we will complete our setup by installing Spark on the VM that we established in the first article. Then we'll perform some simple operations to practice reading data from an Elasticsearch index, transforming that data, and writing the results into another Elasticsearch index. All the code for the posts in this series is available in this GitHub repository.

For this second segment, we'll remain local on our Ubuntu 14 VM. Our plan for the next article is to migrate our setup to the cloud.


Introductory note: Sloan Ahrens is a co-founder of Qbox who is now a freelance data consultant. This is the second in a series of guest posts in which he demonstrates how to set up a large-scale machine learning infrastructure using Apache Spark and Elasticsearch. -Mark Brandon

For this post, we will be using hosted Elasticsearch on Qbox.io. You can sign up or launch your cluster here, or click "Get Started" in the header navigation. If you need help setting up, refer to "Provisioning a Qbox Elasticsearch Cluster."


Step 1: Gear Check

Before continuing with the steps below, it's important that you've gone all the way through the first article in this series.

Also, you'll want to make sure that your VM has access to several processors. It isn't strictly necessary, but it allows your local Spark setup to simulate work across multiple nodes. We give our VM four cores:

[Figure: assigning four processors to the VM in the VirtualBox settings]

Another tool that we find especially useful is Sense, an Elasticsearch developer console that comes as a Google Chrome extension. Sense makes it much easier to work with an Elasticsearch index, and it also understands cURL commands.
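
If you'd rather double-check things from Python instead, a quick sanity check that the index from the first article is reachable might look something like the following (a minimal sketch, assuming Elasticsearch is running locally on port 9200 and that you have the requests library installed):

# quick sanity check on the "titanic" index from Part 1
import requests
resp = requests.get('http://localhost:9200/titanic/passenger/_count')
print(resp.json())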

Step 2: Install Apache Spark

Before installing Spark, we recommend that you read a good overview of Spark. Spark can be used with Java, Scala, or Python; we're going to use Python, but first we need to get Spark onto our VM. Go to the Spark download page, choose the package type that is pre-built for the latest version of Hadoop, set the download type to Direct Download, and then click the link next to Download Spark. Put the file wherever you like, such as your home directory.

You can also accomplish this by running:

# download spark
wget http://d3kbcqa49mib13.cloudfront.net/spark-1.1.0-bin-hadoop2.4.tgz

Next, we extract the files and tidy up the place:

# extract, clean up
tar -xf spark-1.1.0-bin-hadoop2.4.tgz
rm spark-1.1.0-bin-hadoop2.4.tgz
sudo mv spark-* spark

Run the Spark Python interpreter with:

# run spark
cd ~/spark
./bin/pyspark

You should see something like:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.1.0
      /_/
Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc.
>>>

The interpreter will already have given us a Spark context object, which we can see by running:

>>> print(sc)
<pyspark.context.SparkContext object at 0x7f34b61c4e50>
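
If you'd like to confirm that the context can actually farm work out to the local workers, a minimal check (purely illustrative) is to parallelize a trivial computation:

# distribute a trivial computation across the local workers
print(sc.parallelize(range(100)).sum())  # should print 4950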

Step 3: Simple Word-count on a Document

For a simple illustration of some of Spark's basic capabilities, let's perform a simple word count on a document. For convenience, we'll use a document that's already in the spark directory, CHANGES.txt, and run the task across all four processors (assuming your VM has that many; otherwise, use the number of processors you specified earlier).

Start the Spark interpreter with the following command:

# run spark with 4 cores
./bin/pyspark --master local[4]

Load the text file into a Spark RDD:

# read in a text file
textFile = sc.textFile('CHANGES.txt')

Next we'll count the number of lines and (non-newline) characters in the file:

# count lines
print('lines in file: %s' % textFile.count())
# add up the lengths of each line
chars = textFile.map(lambda s: len(s)).reduce(lambda a, b: a + b)
print('number of non-newline characters in file: %s' % chars)

Of course, if we were running this in a distributed environment, we probably wouldn't simply print our results to stdout, but it works for illustration and verification in this tutorial.

Our results:

lines in file: 14577
number of non-newline characters in file: 505483
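
As noted above, a cluster job would more likely persist these numbers than print them. One purely illustrative way to do that is to collect the statistics into a small RDD and save it (the output path below is just a placeholder):

# sketch: persist the results instead of printing them
stats = sc.parallelize([('lines', textFile.count()), ('chars', chars)])
stats.saveAsTextFile('file:///tmp/changes_txt_stats')  # placeholder path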

Let's go a bit further and count the number of occurrences of each word in the document, splitting only on whitespace. Then we'll sort the results and print out the ten most frequent words. Here's the code:

# run a simple word count of the doc
# map each line to its words
wordCounts = textFile.flatMap(lambda line: line.split())
# emit value:1 for each key:word
wordCounts = wordCounts.map(lambda word: (word, 1))
# add up word counts by key:word
wordCounts = wordCounts.reduceByKey(lambda a, b: a+b)
# sort in descending order by word counts
wordCounts = wordCounts.sortBy(lambda item: -item[1])
# collect the results in an array
results = wordCounts.collect()
# print the first ten elements
print(results[:10])

The output:

[(u'-0700', 2287), (u'Commit:', 2203), (u'from', 994), (u'pull', 896), (u'request', 889), (u'Merge', 888), (u'in', 720), (u'to', 673), (u'-0800', 648), (u'2013', 625)]
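
Incidentally, all of the wordCounts operations could have been chained into a single expression; for reference, that version would look something like this:

# the same word count as one chained expression
results = (textFile.flatMap(lambda line: line.split())
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b)
                   .sortBy(lambda item: -item[1])
                   .collect())
print(results[:10])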

NOTE: We broke the wordCounts operations into separate statements simply to make the code easier to read; the chained form shown above is equivalent. Next, we'll integrate Elasticsearch with Spark.

Step 4: Elasticsearch I/O with Spark

In order to use Elasticsearch with Spark, the first thing we're going to need is the elasticsearch-hadoop adapter. We're going to use the 2.1.0.Beta2 release, which is available here.

We install the adapter in a subdirectory of our Spark directory as follows:

# get elasticsearch-hadoop adapter
cd ~/spark # or equivalent
mkdir jars; cd jars
wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.1.0.Beta2/elasticsearch-hadoop-2.1.0.Beta2.jar
cd ..

Next, we run the Spark Python interpreter with the elasticsearch-hadoop jar:

# run spark with elasticsearch-hadoop jar
./bin/pyspark --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.Beta2.jar

The Spark docs contain an example of reading an Elasticsearch index with Python, which you can find under the Python tab here. We use this technique to read the data from our index and print the first document:

# read in ES index/type "titanic/passenger"
es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ "es.resource" : "titanic/passenger" })
print(es_rdd.first())

Here's the output:

(u'892', {u'fare': u'7.8292', u'name': u'Kelly, Mr. James', u'embarked': u'Q', u'age': u'34.5', u'parch': u'0', u'pclass': u'3', u'sex': u'male', u'ticket': u'330911', u'passengerid': u'892', u'sibsp': u'0', u'cabin': None})
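
As a quick sanity check, you can also confirm that the number of documents Spark reads matches the document count in your index:

# count the documents read from the index
print('docs read from ES: %s' % es_rdd.count())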

Now let's do some simple analytics on the data and write the results back to the index (under a different type):

# extract values for the "sex" field, count occurrences of each value
value_counts = es_rdd.map(lambda item: item[1]["sex"])
value_counts = value_counts.map(lambda word: (word, 1))
value_counts = value_counts.reduceByKey(lambda a, b: a+b)
# put the results in the right format for the adapter
value_counts = value_counts.map(lambda item: ('key', { 
    'field': 'sex', 
    'val': item[0], 
    'count': item[1] 
}))
# write the results to "titanic/value_counts"
value_counts.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf={ "es.resource" : "titanic/value_counts" })

The figure below presents the results in the Sense tool:

[Figure: the value_counts results viewed in Sense]
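
If you don't have Sense handy, you can also read the new type back through Spark itself, using the same adapter settings we used for the input (give Elasticsearch a moment to refresh before the new documents become searchable):

# read back the value counts we just wrote
counts_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf={ "es.resource" : "titanic/value_counts" })
print(counts_rdd.collect())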

All of the code that we've run up to this point is available in this repository.

NOTE: If you do this more than once, you'll probably want to rebuild the index, since we aren't providing Elasticsearch with IDs for our analytics results (new documents will accumulate each time we execute the code above). If, for example, you have the repo in ~/local_code, then you can delete and recreate the index by running the following:

cd ~/local_code/qbox-blog-code/ch_1_local_ubuntu_es
python build_index.py

Step 5: Clean Up, Generalize

As a final step for this article, let's clean up our code a little, generalize it, and put it in a more portable form so it will be easier to migrate and run in the cloud (which we cover in the next article). So far, we've been running our code from the interpreter. Now let's put it in a code file and run it with spark-submit.

We'll also apply the analysis we did on the sex field to every field, filter out values that appear only once, and then write all the results to the index. Here's the code, which you can also find in the GitHub repo:

# es_spark_test.py
from pyspark import SparkContext, SparkConf
if __name__ == "__main__":
    conf = SparkConf().setAppName("ESTest")
    sc = SparkContext(conf=conf)
    es_read_conf = {
        "es.nodes" : "localhost",
        "es.port" : "9200",
        "es.resource" : "titanic/passenger"
    } 
    es_rdd = sc.newAPIHadoopRDD(
        inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
        keyClass="org.apache.hadoop.io.NullWritable", 
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
        conf=es_read_conf)
    es_write_conf = {
        "es.nodes" : "localhost",
        "es.port" : "9200",
        "es.resource" : "titanic/value_counts"
    } 
    doc = es_rdd.first()[1]
    for field in doc:
        value_counts = es_rdd.map(lambda item: item[1][field])
        value_counts = value_counts.map(lambda word: (word, 1))
        value_counts = value_counts.reduceByKey(lambda a, b: a+b)
        value_counts = value_counts.filter(lambda item: item[1] > 1)
        value_counts = value_counts.map(lambda item: ('key', { 
            'field': field, 
            'val': item[0], 
            'count': item[1] 
        }))
        value_counts.saveAsNewAPIHadoopFile(
            path='-', 
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable", 
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
            conf=es_write_conf)

We can run this file with the following command (change the file path appropriately):

./bin/spark-submit --master local[4] --jars jars/elasticsearch-hadoop-2.1.0.Beta2.jar ~/local_code/qbox-blog-code/ch_2_local_spark_es/es_spark_test.py

Looking at our index again, we see a lot more results:

[Figure: the expanded value_counts results, covering every field, viewed in Sense]

That's a wrap on this article. Let's recap: we installed Apache Spark on our Ubuntu VM, read data from an Elasticsearch index into Spark, did some analysis on it, and wrote the results back to Elasticsearch. In the next article, we take what we've done here and deploy it to the cloud. Stay tuned!

We invite you to continue to the next article in this series, Deploying Elasticsearch and Apache Spark to the Cloud, Machine Learning Series, Part 3.
