There is growing interest in the power of Apache Spark for large-scale data analytics, including running machine-learning algorithms against large datasets. We are also interested in Spark as part of a larger technical solution featuring a web front end that allows users to start jobs on the back end. In this article, we take you through building such a software-as-a-service application.


Introductory note: This is a continuation of our series on building a machine-learning infrastructure using Apache Spark and Elasticsearch. The first five parts were written by Sloan Ahrens, co-founder of Qbox and currently a freelance data consultant. John Vanderzyden, our Director of Content & Outreach, continues the series in this sixth installment. — Mark Brandon

Suppose you want to build a machine-learning-as-a-service application. Where would you start?

There are many ways you could go about building such an application, but in this article we present a simplified approach that provides an accessible place to begin. This is by no means an exhaustive treatment of the subject, nor do we claim that the example code is production-ready. What we offer is an easily digestible recipe for connecting all of the necessary components, along with some starter code for a larger project. In a future article, we’ll explore deploying such a system to a cloud production environment.

Code

As with the other articles in this series, all of the example code can be found in this Qbox GitHub repository: https://github.com/sloanahrens/qbox-blog-code.

In an earlier article, Building an Elasticsearch Index with Python on an Ubuntu VM, we show how to build a simple Elasticsearch index using Python. In Elasticsearch in Apache Spark with Python, we present a simple scenario: using Apache Spark to read that index, perform some simple analysis, and write the results back to Elasticsearch. Here, we leverage code from both of those articles.

Credit Where Credit is Due

A large chunk of the code for this tutorial comes from Miguel Grinberg’s excellent post, Using Celery With Flask. We encourage you to work through his code before attempting this tutorial, especially if you are new to Celery. We build upon what he explains there, so it’s worth your time to get familiar with it.

App Outline

There are several components to this application, and the following is a short description of each. We later explain the thread of execution in a little more detail.

  • Flask app: The centerpiece of the application stack, which has three views: index, which returns the app’s only webpage; sparktask, which fires off the Spark job via Celery; and taskstatus, which provides some of the mechanism for tracking progress on the Spark job.
  • Index template and JavaScript: The front end. The HTML and CSS are very simple. A little JavaScript does the following: sends the request that fires off the Spark job when the button is pressed in the UI, polls the Flask app for the status of the Spark job, and displays the result that is passed back (the name of the results index created by the Spark job).
  • Celery task: The task that handles running the Spark job asynchronously. It simply calls spark-submit locally from the command line with the appropriate arguments. This code could easily be generalized to send the command to a remote Spark cluster over SSH.
  • Redis: Redis is used as the Celery broker. It’s how the Flask app and the Celery tasks communicate with each other. We use Elasticsearch as something of a broker as well, as shown below.
  • Apache Spark: Spark does the analytics heavy lifting. While the lifting isn’t too cumbersome in this case, it will be easy to see how the technique would generalize to much larger datasets and more extensive analytics tasks.
  • Elasticsearch: Elasticsearch serves two functions here. The first is as a datastore: the data to be analyzed lives in an ES index, and the Spark job finishes by creating a new index for its results. Elasticsearch also serves as our ad hoc message broker. Redis is typically the broker for communicating job status back to the web application, but because status updates need to come from the Spark application (which has no knowledge of the Flask application, or even of Celery), we find it simple and elegant to use Elasticsearch to relay messages (see the sketch just after this list). An additional benefit is that it’s relatively easy to track the history of the Spark jobs.
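
To make the ad hoc broker idea concrete, here is a minimal sketch of the kind of status document the pieces share through a spark-jobs index, keyed by the Celery task ID. The field names follow the task output shown later in this article; the exact schema in the repo may differ slightly.

# One status document per job, keyed by the Celery task ID, in a spark-jobs
# index. (Field names follow the task result shown later in this article.)
status_doc = {
    'current': 45,                       # progress so far
    'total': 100,                        # progress target
    'status': 'Computing value counts',  # human-readable message
}
# The Flask app seeds this document, the Spark job updates it as it runs,
# and the taskstatus view reads it back for the browser to poll.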

 

Provision the Virtual Machine

Before we go any further, let’s get the app up and running on an Ubuntu virtual machine, using the same instructions given in a previous article in this series, Building an Elasticsearch Index with Python on an Ubuntu VM. You might think it would be more efficient to use a headless Vagrant virtual machine, but our design requires keeping multiple terminal tabs open at once, and that alternative can be a bit unwieldy on an architecture that already has numerous moving parts. In addition, we continue to build on the previous segments of this tutorial. If you want to follow along using a Vagrant VM, the same code will work in that environment.

Using the instructions in Step 1 of Building an Elasticsearch Index with Python on an Ubuntu VM, we set up an Ubuntu 14 VirtualBox virtual machine with 6144 MB of RAM and 4 processors. If you follow this approach, then you might find these commands to be useful.

 

Get the Code

After you finish setting up the VM, ensure that you’ve installed Git and run this from the terminal:

cd ~
git clone https://github.com/sloanahrens/qbox-blog-code.git

NOTE: If you choose not to clone the repo into your user’s home directory, you’ll have to make a few code changes to get things working.

 

Install Elasticsearch

Now, go to the directory for this tutorial and run the Elasticsearch setup script:

cd qbox-blog-code/ch_6_toy_saas/
./setup_elasticsearch.sh

We recommend that you verify a successful installation using cURL:

curl localhost:9200

which should result in the following:

sloan@sloan-vb:~/qbox-blog-code/ch_6_toy_saas$ curl localhost:9200
{
  "status" : 200,
  "name" : "Enforcer",
  "cluster_name" : "elasticsearch",
  "version" : {
    "number" : "1.5.1",
    "build_hash" : "5e38401bc4e4388537a615569ac60925788e1cf4",
    "build_timestamp" : "2015-04-09T13:41:35Z",
    "build_snapshot" : false,
    "lucene_version" : "4.10.4"
  },
  "tagline" : "You Know, for Search"
}

 

Install Spark

Making sure that you’re still in the ch_6_toy_saas directory, install Apache Spark with this script, which also installs the Elasticsearch-Hadoop adapter JAR file:

./setup_spark.sh

Confirm that the installation succeeded by running:

cd ~/spark
./bin/pyspark

You’re ready to proceed if you get something like this:

...
15/06/06 18:58:31 INFO BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.3.1
      /_/
Using Python version 2.7.6 (default, Mar 22 2014 22:59:56)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

 

Exit the Spark Python interpreter by entering exit().

 

The Python Environment (Virtualenv)

The final bit of necessary infrastructure is the Python virtualenv that we need to run the application. We provide a script for this as well:

cd ~/qbox-blog-code/ch_6_toy_saas/
./setup_python.sh

 

Build the Titanic Index

The app will be using the Titanic index given in the article, Building an Elasticsearch Index with Python on an Ubuntu VM, and we refer you to that post for a description. To build the index, execute the following sequence:

cd ~/qbox-blog-code/ch_6_toy_saas/
. venv/bin/activate
python build_titanic_index.py
deactivate

We can test to ensure that the index was built by querying Elasticsearch:

POST /titanic/_search
{
   "query": {
      "match_all": {}
   },
   "sort": [
      {
         "ticket": {
            "order": "asc"
         }
      }
   ]
}

The results:

{
   "took": 7,
   "timed_out": false,
   "_shards": {
      "total": 1,
      "successful": 1,
      "failed": 0
   },
   "hits": {
      "total": 418,
      "max_score": null,
      "hits": [
         {
...
}
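
If you prefer to run the same check from Python, here is a minimal sketch using the elasticsearch client from the virtualenv (the index and field names are the ones shown above; the 418 total comes from the results just listed):

# Quick sanity check of the titanic index from Python; assumes the
# elasticsearch package from the virtualenv and a local node on port 9200.
from elasticsearch import Elasticsearch

es = Elasticsearch()
result = es.search(index='titanic', body={
    'query': {'match_all': {}},
    'sort': [{'ticket': {'order': 'asc'}}]
})
print 'total hits:', result['hits']['total']   # should be 418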

Running the App

To run the application, we need to open three different terminal tabs. We’ll run a different script in each one.

First Tab: Run Redis

Run Redis in the first tab, using the script from Grinberg’s repo:

./run_redis.sh

It’s running when you see this:

                _._                                                  
           _.-``__ ''-._                                             
      _.-``    `.  `_.  ''-._           Redis 3.0.2 (41e84941/1) 64 bit
  .-`` .-```.  ```\/    _.,_ ''-._                                   
 (    '      ,       .-`  | `,    )     Running in standalone mode
 |`-._`-...-` __...-.``-._|'` _.-'|     Port: 6379
 |    `-._   `._    /     _.-'    |     PID: 26704
  `-._    `-._  `-./  _.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |           http://redis.io        
  `-._    `-._`-.__.-'_.-'    _.-'                                   
 |`-._`-._    `-.__.-'    _.-'_.-'|                                  
 |    `-._`-._        _.-'_.-'    |                                  
  `-._    `-._`-.__.-'_.-'    _.-'                                   
      `-._    `-.__.-'    _.-'                                       
          `-._        _.-'                                           
              `-.__.-'                                               
26704:M 06 Jun 19:44:23.871 # Server started, Redis version 3.0.2

 

Second Tab: Celery Worker

Open a second terminal tab, and run the Celery worker with:

./run_celery.sh

You’ll see this:

 -------------- celery@sloan-vb v3.1.17 (Cipater)
---- **** ----- 
--- * ***  * -- Linux-3.13.0-32-generic-x86_64-with-Ubuntu-14.04-trusty
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         flaskapp:0x7ffb9cb37110
- ** ---------- .> transport:   redis://localhost:6379/0
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
[tasks]
  . flaskapp.spark_job_task
[2015-06-06 19:45:37,402: INFO/MainProcess] Connected to redis://localhost:6379/0
[2015-06-06 19:45:37,412: INFO/MainProcess] mingle: searching for neighbors
[2015-06-06 19:45:38,419: INFO/MainProcess] mingle: all alone
[2015-06-06 19:45:38,433: WARNING/MainProcess] celery@sloan-vb ready.

 

Third Tab: The Python Flask App

Finally, spin up the Flask app in a third tab:

./run_flask.sh

You should see something like:

sloan@sloan-vb:~/qbox-blog-code/ch_6_toy_saas$ ./run_flask.sh 
 * Running on http://127.0.0.1:5000/
 * Restarting with reloader

 

Run a Spark job

Now that the app is running locally in the VM, you can access it from the browser at http://localhost:5000/. It will look like this:

[Screenshot: the app’s single page, with a Start Spark Job button]

Click the Start Spark Job button, and you should see a progress indicator. If you look at the terminal tab running Celery, you should see the Spark output. When the job is done, you’ll see this result:

15/06/06 19:51:55 WARN EsOutputFormat: Cannot determine task id...
15/06/06 19:51:55 INFO PythonRDD: Times: total = 9, boot = 4, init = 4, finish = 1
15/06/06 19:51:55 INFO Executor: Finished task 0.0 in stage 45.0 (TID 34). 1927 bytes result sent to driver
15/06/06 19:51:55 INFO TaskSetManager: Finished task 0.0 in stage 45.0 (TID 34) in 113 ms on localhost (1/1)
15/06/06 19:51:55 INFO TaskSchedulerImpl: Removed TaskSet 45.0, whose tasks have all completed, from pool 
15/06/06 19:51:55 INFO DAGScheduler: Stage 45 (saveAsNewAPIHadoopFile at PythonRDD.scala:751) finished in 0.113 s
15/06/06 19:51:55 INFO DAGScheduler: Job 23 finished: saveAsNewAPIHadoopFile at PythonRDD.scala:751, took 0.138099 s
[2015-06-06 19:51:56,334: INFO/MainProcess] Task flaskapp.spark_job_task[6c9052a4-2b37-4084-bdda-f84858c89ae8] succeeded in 16.35855026s: {'current': 100, 'status': 'Task completed!', 'total': 100, 'result': 42}

And on the webpage, you should see:

[Screenshot: the finished job’s status, with a link to the results]

Click the link to see the first few results in the computed index, which will be something like the following:

{
  "took" : 4,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 204,
    "max_score" : null,
    "hits" : [ {
      "_index" : "titanic-results-2",
      "_type" : "value-counts",
      "_id" : "AU3Lg7_sfBzHc3wiSyrF",
      "_score" : null,
      "_source":{"count":327,"field":"cabin","val":null},
      "sort" : [ 327 ]
    }, {
      "_index" : "titanic-results-2",

You can start as many Spark tasks as you want by clicking the button multiple times. The script that starts Celery specifies a concurrency of 1, so only one Spark job will run at a time, which is necessary to avoid various errors. In a production-grade design, you might allow several Spark jobs to run concurrently.


How It Works

Now that we have the app working, let’s walk through some of the details of how it works.

Button Click

Look at line 28 and following of index.html to see what happens when the webpage’s button is clicked. (This code is a slightly modified version of the code in Grinberg’s repo.) Using jQuery, the start_long_task JavaScript function is bound to the click event on line 81, and it sends a POST request to the relative URL /sparktask.

 

The sparktask Flask View

The sparktask view starts the spark_job_task Celery task at line 43 of flaskapp.py. The view then verifies that the spark-jobs Elasticsearch index exists, creating it if necessary. Next, a document with the same ID as the Celery task is saved to that index; it will be used to communicate progress from the Spark job back to the Flask app.
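
As a rough sketch of that flow (the names, route paths, and details here are illustrative and simplified; the real code is in flaskapp.py):

# Illustrative sketch of the sparktask view; see flaskapp.py for the real code.
from celery import Celery
from elasticsearch import Elasticsearch
from flask import Flask, jsonify

app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379/0',
                backend='redis://localhost:6379/0')
es = Elasticsearch()

@celery.task(bind=True)
def spark_job_task(self):
    pass  # calls spark-submit; sketched in the next subsection

@app.route('/sparktask', methods=['POST'])
def sparktask():
    # Fire off the Celery task that will run the Spark job.
    task = spark_job_task.apply_async()

    # Make sure the spark-jobs index exists, then seed a status document
    # whose ID is the Celery task ID.
    if not es.indices.exists(index='spark-jobs'):
        es.indices.create(index='spark-jobs')
    es.index(index='spark-jobs', doc_type='job', id=task.id,
             body={'current': 0, 'total': 100, 'status': 'Job submitted'})

    # Respond with 202 Accepted; the polling URL goes in the Location header
    # (the status route path here is illustrative).
    return jsonify({}), 202, {'Location': '/status/%s' % task.id}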

The spark_job_task Celery Task

The spark_job_task is simple: it merely calls Spark from the command line with the spark-submit command, passing along the necessary arguments. We explain this in Step 5 of the article, Elasticsearch in Apache Spark with Python. Note that the Celery task ID is also passed to Spark as a command line argument.
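
Here is a minimal sketch of such a task. The paths and spark-submit arguments are assumptions based on the setup scripts above, so check flaskapp.py for the real command:

# Sketch of a Celery task that shells out to spark-submit, passing its own
# task ID through to the Spark job. Paths and arguments here are assumptions.
import os
import subprocess

from celery import Celery

celery = Celery('flaskapp', broker='redis://localhost:6379/0',
                backend='redis://localhost:6379/0')

HOME = os.path.expanduser('~')
SPARK_SUBMIT = os.path.join(HOME, 'spark', 'bin', 'spark-submit')
ES_JAR = os.path.join(HOME, 'spark', 'elasticsearch-hadoop.jar')  # wherever setup_spark.sh put the adapter jar
SPARK_SCRIPT = os.path.join(HOME, 'qbox-blog-code', 'ch_6_toy_saas', 'es_spark_test.py')

@celery.task(bind=True)
def spark_job_task(self):
    # spark-submit runs locally here; the same command could just as easily
    # be sent to a remote Spark cluster over SSH.
    subprocess.check_call([SPARK_SUBMIT, '--jars', ES_JAR,
                           SPARK_SCRIPT, self.request.id])
    return {'current': 100, 'total': 100, 'status': 'Task completed!'}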

The Spark Job

The Spark code is similar to what we provide in Elasticsearch in Apache Spark with Python, with some additions. Line 11 of es_spark_test.py reads in the Celery task ID from the command line, and then line 20 and line 45 use this ID to update the status document in the spark-jobs Elasticsearch index. Finally, after execution is complete, line 70 sends a final status update regarding the index containing the results.
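
In outline, the bookkeeping on the Spark side looks something like the following sketch (the analysis itself is elided here; it is the value-count computation from the earlier Spark article, and the exact code is in es_spark_test.py):

# Sketch of the Spark job's status bookkeeping: read the Celery task ID from
# the command line and push progress into the spark-jobs status document.
# Assumes the elasticsearch package is importable by the Spark driver.
import sys

from elasticsearch import Elasticsearch
from pyspark import SparkConf, SparkContext

task_id = sys.argv[1]   # the Celery task ID passed by spark_job_task
es = Elasticsearch()

def update_status(current, message):
    es.index(index='spark-jobs', doc_type='job', id=task_id,
             body={'current': current, 'total': 100, 'status': message})

update_status(10, 'Spark job started')

sc = SparkContext(conf=SparkConf().setAppName('titanic-analysis'))

# ... read the titanic index, compute the value counts, and write them to a
#     new titanic-results-N index, as in the earlier Spark article ...

update_status(100, 'Results written to Elasticsearch')
sc.stop()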

Progress Updates

When the initial request is sent from the JavaScript to start the Spark job, the Flask application returns the URL for status updates, as you can see at line 62 of flaskapp.py. The URL is returned in the Location response header. At line 45 of index.html, this URL is passed to another JavaScript function, update_progress, which then uses it to poll for status updates every second (a real-world app would poll less often, perhaps every few seconds).
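
If you want to exercise the same start-and-poll protocol outside the browser, a quick sketch with the requests library looks like this (purely illustrative; the app’s own front end does this with jQuery, and we assume the JSON fields follow Grinberg’s example):

# Drive the start/poll protocol from Python instead of the browser.
import time
from urlparse import urljoin   # Python 2; use urllib.parse on Python 3

import requests

resp = requests.post('http://localhost:5000/sparktask')
status_url = urljoin(resp.url, resp.headers['Location'])

while True:
    status = requests.get(status_url).json()
    print status.get('status'), status.get('current'), '/', status.get('total')
    if status.get('state') in ('SUCCESS', 'FAILURE'):
        break
    time.sleep(1)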

The taskstatus Flask view uses the task ID it is given to load the Celery task (line 68 of flaskapp.py). The task is first checked for errors on the Celery side; if one is found, an exception message is passed back (of course, a production application would handle this differently).

Otherwise, the task ID is used to retrieve the status document from Elasticsearch, and its contents are passed back in the JSON response. Line 61 of index.html executes once the task status is either SUCCESS or FAILURE. If all has gone well, at least one result will be found in the ES index containing the analytics output, and a URL is built for the user to click to run a simple query showing a few of those results.
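
A simplified sketch of a taskstatus view along the lines described above (again, the real version is in flaskapp.py and differs in its details):

# Sketch of the taskstatus view: check the Celery task for errors, then relay
# whatever the Spark job last wrote to the spark-jobs status document.
from celery import Celery
from elasticsearch import Elasticsearch
from flask import Flask, jsonify

app = Flask(__name__)
celery = Celery(app.name, broker='redis://localhost:6379/0',
                backend='redis://localhost:6379/0')
es = Elasticsearch()

@app.route('/status/<task_id>')
def taskstatus(task_id):
    task = celery.AsyncResult(task_id)   # load the Celery task by its ID

    if task.state == 'FAILURE':
        # Pass the Celery-side error back; a production app would do more here.
        return jsonify({'state': task.state, 'status': str(task.info)})

    # Otherwise read the status document the Spark job has been updating.
    doc = es.get(index='spark-jobs', doc_type='job', id=task_id)['_source']
    return jsonify({'state': task.state,
                    'current': doc.get('current', 0),
                    'total': doc.get('total', 100),
                    'status': doc.get('status', '')})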

That’ll do it for this article, in which we’ve gone through the steps necessary to set up a tutorial SaaS application: an app that launches asynchronous Apache Spark jobs from a web front end using Flask, Celery, Redis, and Elasticsearch. We welcome your feedback in the comments.