In this article, we continue the work from Elasticsearch in Apache Spark with Python, Machine Learning Series, Part 2. We are building some basic tools for data science, with the goal of running machine-learning classification algorithms against large data sets using Apache Spark and Elasticsearch clusters in the cloud.

Introductory note: Sloan Ahrens is a co-founder of Qbox who is now a freelance data consultant. This is the third in a series of guest posts, in which he demonstrates how to set up a large scale machine learning infrastructure using Apache Spark and Elasticsearch. -Mark Brandon

In Part 1 of the series, Build an Elasticsearch Index with Python—Machine Learning Series, Part 1, we set up a VirtualBox Ubuntu 14 virtual machine, installed Elasticsearch, and built a simple index using Python. In Part 2, Elasticsearch in Apache Spark with Python—Machine Learning Series, Part 2, we set up Spark in our VM, and ran through a simple demonstration using Elasticsearch as the datastore for Spark programs. We read data from our Elasticsearch index, did some simple analytics on the data, and put the results into a new Elasticsearch index. In this third article, we’ll deploy our computational framework to the cloud. By the end, we’ll have a scalable platform that’s ready for big data analytics—at scale.

If you’ve not taken the time to work through the first two parts of the tutorial, you will likely find this article difficult to follow, so we encourage you to work through them both. The code examples below will execute properly from the terminal in the Ubuntu 14 virtual machine that we set up in the first part of the tutorial. The code from all of the articles in this series is available in this GitHub repository.

We continue using the Kaggle Titanic passengers dataset with which we began. It’s a very small dataset, yet quite adequate for our illustrations. We encourage you to set up a Kaggle account if you don’t have one, and then explore the tutorials involving this dataset.

NOTE: In this post, we will be spinning up cloud resources, both on Qbox and Amazon EC2. These resources cost money, so remember to terminate your instances when you’re done! If you do not, you will be billed for all of the time that these resources continue running.

Step 1: Set Up a Qbox Cluster

First, let’s get ourselves a hosted Elasticsearch cluster from Qbox. It only takes 30 seconds to sign up. Do-it-yourself setup of a production-ready Elasticsearch instance can be a bit of a hassle, so we’ll let Qbox take care of that infrastructure for us. Read our short article to see how quickly you can get one up and running: Easy How-To: Provisioning an Elasticsearch Cluster on Qbox.

For our tutorial here, we set up a two-node cluster using the smallest available node size in the us-west-2 (Oregon) EC2 data center. This is more than enough for our small dataset. Qbox offers many additional features, but we won’t bother with the security features or plug-ins for this demonstration.

After our Qbox cluster is up and running, we need to build our index within. To do so, we’re going to modify the Python index-building script from the first tutorial in the series. The new version of the script can be found here. There are two necessary changes. The first is that we need to change the host parameters on line 5 from this:

<code>ES_HOST = { "host" : "localhost", "port" : 9200 }</code>

to this:

<code>ES_HOST = { "host" : "", "port" : 80 }</code>

NOTE: Your endpoint will be different from ours, so you’ll need to edit the host value accordingly.

The second change we need to make is the number of shards in our index. The dataset we are using is quite small, so we could probably get away with a single shard. However, the shard count matters for Spark: when Spark reads an Elasticsearch index, the number of data partitions it uses is determined by the number of shards in the index, and that in turn sets the default number of tasks for operations on the dataset. Strictly speaking, it is the elasticsearch-hadoop adapter, not Spark itself, that maps shards to partitions, which you can read about here.

In a real scenario with a large dataset, we are likely to want several partitions/tasks. To simulate that, we’re going to change the number of shards to 4. We’ll also add a single replica, just for good measure. And so, we change this (starting at line 50):

<code>request_body = { "settings" : { "number_of_shards": 1, "number_of_replicas": 0 } }</code>

to this:

<code>request_body = { "settings" : { "number_of_shards": 4, "number_of_replicas": 1 } }</code>
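As a rough sanity check on these settings, the arithmetic below counts shard copies for our two-node cluster. It is a minimal sketch that assumes shard copies are spread evenly across nodes and that elasticsearch-hadoop reads one Spark partition per primary shard, as described above; `index_settings` and `shard_layout` are hypothetical helpers, not part of either library.

```python
def index_settings(shards, replicas):
    """Build the same request body used when creating the index."""
    return {"settings": {"number_of_shards": shards, "number_of_replicas": replicas}}


def shard_layout(request_body, nodes):
    """Hypothetical helper: summarize how an index's shards map onto nodes and Spark partitions."""
    s = request_body["settings"]
    primaries = s["number_of_shards"]
    # Each replica setting adds one full copy of every primary shard.
    copies = primaries * (1 + s["number_of_replicas"])
    return {
        "primary_shards": primaries,
        "total_shard_copies": copies,
        # Assumes an even spread of shard copies across nodes.
        "copies_per_node": copies / nodes,
        # Assumes one read partition per primary shard (the adapter's shard-to-partition mapping).
        "spark_read_partitions": primaries,
    }


layout = shard_layout(index_settings(4, 1), nodes=2)
print(layout)
```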

To create our index, we can run the script with this command:


Now we can once again use the Sense code from the last article. All we have to do is change the endpoint localhost:9200 to our Qbox endpoint, and we should be able to see our data.

Step 2: Deploy Spark to Amazon EC2

Out of the box, Spark comes with convenient scripts for managing clusters on EC2, but we need to do a little prep work to use them. Before we can deploy Spark to EC2, we have to ensure that EC2 is ready for us. If you don’t already have an AWS account, you’ll need to create one. We also need to be able to SSH into our EC2 instances. If you have not yet created an SSH key for your Ubuntu VM, you can do so with the following commands (also shown here):

<code># create SSH key, if needed 
ssh-keygen -t rsa -C "<your_email>" 
# start the ssh-agent in the background, if needed 
eval "$(ssh-agent -s)" 
# add the key 
ssh-add ~/.ssh/id_rsa 
# show key value 
cat ~/.ssh/ 
#[now copy your new public key to the clipboard]</code>

Next, we need to import the key to EC2 so we can use it for SSH. It’s a good idea to provision our Spark cluster in the same data center (region) as our Qbox cluster, to minimize latency. So, from the EC2 dashboard, let’s make sure that the correct region is selected (Oregon, in this example), and then click Key Pairs in the left-hand menu. Then, with the public key copied to the clipboard, click Import Key Pair and paste the key into the Public key contents box. Give your key a name, such as “ubuntu-spark”, and remember this name.
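Before pasting, it can be worth confirming that the clipboard holds the public key (a single `ssh-rsa ...` line) rather than the private key. The helper below is a hypothetical convenience check, not something EC2 or OpenSSH provides; it only inspects the key's format and assumes an RSA key like the one generated above.

```python
import base64
import binascii


def looks_like_openssh_rsa_pubkey(line):
    """Rough check that `line` is an OpenSSH-format RSA public key: "ssh-rsa <base64> [comment]"."""
    parts = line.strip().split()
    if len(parts) < 2 or parts[0] != "ssh-rsa":
        return False  # e.g. a private key starts with "-----BEGIN ..."
    try:
        blob = base64.b64decode(parts[1], validate=True)
    except (binascii.Error, ValueError):
        return False
    # The decoded blob begins with a 4-byte big-endian length, then the key type name.
    return blob[:4] == len(b"ssh-rsa").to_bytes(4, "big") and blob[4:11] == b"ssh-rsa"
```

Pass it the contents of your `.pub` file; a `False` result usually means the private key or a truncated line was copied.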

We also need to set a few environment variables. We need our AWS keys available in order to use the Spark deployment scripts. You can access these by clicking on the tab at the upper right (where it shows the username), then click Security Credentials. To access the Secret Key, we have to create a new one. Amazon recommends that you create an IAM user that has access keys rather than relying on root keys. Once we have our keys handy, we can set the following environment variables for use with the deployment script:

<code># set environment variables for use with Spark deployment script
export CLUSTER=sparkcluster
export INSTANCE=t2.small
export REGION=us-west-2
export AWS_ACCESS_KEY_ID=<your_access_key>
export AWS_SECRET_ACCESS_KEY=<your_secret_access_key></code>

You’ll need to use your own keys, and possibly change REGION or INSTANCE if you need different values. We chose “sparkcluster” for the cluster name, but that’s arbitrary.
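A small pre-flight check can catch a forgotten export before calling the deployment script. The sketch below is a hypothetical helper, not part of Spark; it simply lists which of the variables exported above are unset or empty.

```python
import os

# Variable names taken from the export commands above.
REQUIRED_VARS = ("CLUSTER", "INSTANCE", "REGION", "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")


def missing_vars(env=None):
    """Return the required variables that are unset or empty in `env` (defaults to os.environ)."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


# Example with an explicit mapping; call missing_vars() with no argument to check the real environment.
missing = missing_vars({"CLUSTER": "sparkcluster", "REGION": "us-west-2"})
print(missing)
```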

With all of that preparation done, launching a cluster is easy. Since we installed Spark in the ~/spark directory (in Part 2), we can change to our home directory and call the spark-ec2 script with our parameters: the key name (“ubuntu-spark”), the local path to the key, the number of worker nodes (2), the region, the instance type, and the cluster name, as follows:

<code>cd ~ 
# launch a cluster with 2 worker nodes
./spark/ec2/spark-ec2 -k ubuntu-spark -i ~/.ssh/ -s 2 -r $REGION -t $INSTANCE launch $CLUSTER</code>
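If you prefer to drive the deployment from Python, the same invocation can be assembled as an argument list. This is a minimal sketch using only the flags shown in this article; `spark_ec2_command` is a hypothetical helper, and `path/to/private_key` is a placeholder for your own key file.

```python
import shlex


def spark_ec2_command(action, key_name, key_file, region, cluster,
                      slaves=None, instance_type=None, script="./spark/ec2/spark-ec2"):
    """Assemble a spark-ec2 invocation using only the flags shown in this article."""
    cmd = [script, "-k", key_name, "-i", key_file]
    if slaves is not None:
        cmd += ["-s", str(slaves)]  # number of worker nodes
    cmd += ["-r", region]
    if instance_type is not None:
        cmd += ["-t", instance_type]
    cmd += [action, cluster]
    return cmd


# "path/to/private_key" is a placeholder, not a real path.
launch = spark_ec2_command("launch", "ubuntu-spark", "path/to/private_key", "us-west-2",
                           "sparkcluster", slaves=2, instance_type="t2.small")
print(shlex.join(launch))
```

The same helper covers the login and destroy commands used later, called without `slaves` or `instance_type`, and the resulting list can be passed to `subprocess.run` unchanged.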

You should see output that looks something like:

<code>Setting up security groups...
Searching for existing cluster sparkcluster...
Spark AMI: ami-ae6e0d9e
Launching instances...
Launched 2 slaves in us-west-2c, regid = r-9a53e795
Launched master in us-west-2c, regid = r-0c52e603
Waiting for instances to start up...</code>

If you look at the Instances tab in the EC2 dashboard, you should see your instances initializing. The process will take some time to complete. Once the initialization finishes, log in to the master node of the new cluster with:

<code># log in to cluster 
./spark/ec2/spark-ec2 -k ubuntu-spark -i ~/.ssh/ -r $REGION login $CLUSTER</code>

After logging in, we need to create a “jars” directory and then download the elasticsearch-hadoop jar, as we did previously in our local Ubuntu VM. We’ll also create a “code” directory for our Spark code. Once that is complete, we need to exit the session.

<code># create jars directory
mkdir spark/jars; cd spark/jars
# get elasticsearch-hadoop jar
wget <elasticsearch-hadoop jar URL>
# make code directory
cd ..; mkdir code
# exit the session
exit</code>

We’re almost ready, but we need to do a few more things on our local VM. First, we need the hostname of the master node, which we can get from the last line in our terminal. For us, this was:

<code>logout
Connection to closed.</code>

Go ahead and save that to an environment variable, as we’ll need it shortly:


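The hostname sits in ssh’s closing message. If you would rather not copy it by hand, the hypothetical helper below pulls it out of captured terminal text; the sample hostname is invented for illustration.

```python
import re

# Matches ssh's closing message, e.g. "Connection to <host> closed."
_CLOSED = re.compile(r"Connection to (\S+) closed\.")


def master_host(terminal_text):
    """Return the host from the last 'Connection to ... closed.' line, or None if absent."""
    matches = _CLOSED.findall(terminal_text)
    return matches[-1] if matches else None


# Invented example output; your hostname will differ.
sample = "logout\nConnection to ec2-203-0-113-10.us-west-2.compute.amazonaws.com closed.\n"
print(master_host(sample))
```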
Step 3: Deploy the Code

Now we need to make a few changes to our code file, upload it to our Spark cluster, and then run the code. We’ll use a slightly modified version of the final code file from Part 2. Starting at line 8 of that file, we need to change this:

<code>es_read_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/passenger"
}
es_write_conf = {
    "es.nodes" : "localhost",
    "es.port" : "9200",
    "es.resource" : "titanic/value_counts"
}</code>

to this:

<code>es_conf = {
    "" : "",
    "" : "80",
    "es.nodes.discovery" : "false",
    "" : "titanic/passenger",
    "es.resource.write" : "titanic/value_counts"
}</code>

There are several things to note here. First, we combine the two configuration dictionaries into one, using separate read and write resource settings, so we need to change es_read_conf and es_write_conf to es_conf in the Hadoop API calls. Next, we set a proxy host that points to our Qbox cluster, instead of using es.nodes directly, and we turn off node discovery. Both steps are necessary because, by default, the elasticsearch-hadoop adapter tries to use the IP of each node when streaming data. That won’t work in a hosted setting: the IPs the nodes use to communicate with each other are private, so they are inaccessible from outside their private network. The updated code file is here.
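For reference, here is a sketch of the combined configuration written against elasticsearch-hadoop’s documented setting names. The proxy and read-resource key names (`es.net.proxy.http.host`, `es.net.proxy.http.port`, `es.resource.read`) are our assumption about what this setup needs, based on the description above; check them against the elasticsearch-hadoop configuration reference for your version. `build_es_conf` is a hypothetical helper and `<your_qbox_endpoint>` is a placeholder.

```python
def build_es_conf(proxy_host, proxy_port="80",
                  read_resource="titanic/passenger",
                  write_resource="titanic/value_counts"):
    """Single read/write configuration for elasticsearch-hadoop behind a hosted endpoint.

    Key names for the proxy and read resource are assumptions taken from the
    elasticsearch-hadoop configuration docs, not from the original script.
    """
    return {
        # Send requests through the hosted endpoint rather than individual node IPs.
        "es.net.proxy.http.host": proxy_host,
        "es.net.proxy.http.port": proxy_port,
        # Skip node discovery: node IPs are private and unreachable from outside their network.
        "es.nodes.discovery": "false",
        "es.resource.read": read_resource,
        "es.resource.write": write_resource,
    }


es_conf = build_es_conf("<your_qbox_endpoint>")
# In the Spark script, es_conf replaces both old dictionaries, e.g. passed as the `conf`
# argument of sc.newAPIHadoopRDD(...) and rdd.saveAsNewAPIHadoopFile(...).
```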

The last step is to set one final environment variable for convenience. We’ll set the CODEFILE variable to the location of the new version of our Spark script. Together with the HOST variable, this makes it easy to upload the file again later if we change it locally.

<code>export CODEFILE=~/local_code/qbox-blog-code/ch_3_deploy_spark_es/</code>

At this point everything is ready, so we can upload our file to the master node with scp:

<code># upload code file to master node 
scp -i ~/.ssh/ $CODEFILE root@$HOST:spark/code</code>
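Because CODEFILE and HOST are already in the environment, re-uploading after each local edit can be scripted. This is a minimal sketch with a hypothetical helper; the key path, code file, and hostname in the example are placeholders.

```python
import os
import shlex


def scp_upload_command(key_file, env=None, remote_dir="spark/code"):
    """Rebuild the scp upload above from the CODEFILE and HOST variables."""
    env = os.environ if env is None else env
    # No shell is involved when the list goes to subprocess.run, so expand "~" ourselves.
    return ["scp", "-i", os.path.expanduser(key_file),
            os.path.expanduser(env["CODEFILE"]), f"root@{env['HOST']}:{remote_dir}"]


# Placeholder values for illustration only.
example_env = {"CODEFILE": "local_code/example_script.py",
               "HOST": "ec2-203-0-113-10.us-west-2.compute.amazonaws.com"}
cmd = scp_upload_command("path/to/private_key", example_env)
print(shlex.join(cmd))
```

Passing `cmd` to `subprocess.run(cmd, check=True)` repeats the upload and raises an error if scp fails.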

At long last, we’re ready to run our Spark script. As before, we log in to the master node:

<code># log back into cluster master 
./spark/ec2/spark-ec2 -k ubuntu-spark -i ~/.ssh/ -r $REGION login $CLUSTER</code>

We execute our script with:

<code>./spark/bin/spark-submit --jars spark/jars/elasticsearch-hadoop-2.1.0.Beta2.jar spark/code/</code>
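To know what to expect in the titanic/value_counts index, it can help to reproduce the tally on a handful of records locally. The sketch below is a plain-Python analogue, not the Part 2 script: it assumes, from the index name, that the job counts distinct values per field (as `RDD.countByValue()` would in PySpark), and the output document shape and sample rows are invented for illustration.

```python
from collections import Counter


def value_counts(docs, field):
    """Count how often each value of `field` occurs, like rdd.map(...).countByValue() in PySpark."""
    return Counter(doc.get(field) for doc in docs)


def to_count_docs(field, counts):
    """Hypothetical output shape: one document per (field, value) pair, sorted by value."""
    return [{"field": field, "val": val, "count": n}
            for val, n in sorted(counts.items(), key=lambda kv: str(kv[0]))]


# Invented sample rows in the style of the Titanic passenger data.
passengers = [{"Sex": "male", "Pclass": 3}, {"Sex": "female", "Pclass": 1}, {"Sex": "male", "Pclass": 1}]
print(to_count_docs("Sex", value_counts(passengers, "Sex")))
```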

When this script finishes executing, our results will look like this (after changing the endpoint again):



Cleaning Up

You will receive a bill for any clusters that continue to run on EC2. When you’re done with the cluster, you can terminate it with this command:

<code># terminate cluster 
./spark/ec2/spark-ec2 -k ubuntu-spark -i ~/.ssh/ -r $REGION destroy $CLUSTER</code>

Also, be sure to delete your Qbox cluster when you’re done with it.

All done! We provisioned a Qbox Elasticsearch cluster and built our index there, deployed an Apache Spark cluster to Amazon EC2, and uploaded and ran our simple Spark analytics script, using Elasticsearch as the datastore. We now have all the pieces in place to begin running scalable analytics operations on large datasets.

We invite you to continue to the next article in this series, Sparse Matrix Multiplication with Elasticsearch and Apache Spark—Machine Learning Series, Part 4.