In this article, we continue the work from Deploying Elasticsearch and Apache Spark to the Cloud, Machine-Learning Series, Part 3.

In previous posts, we've gone through a number of steps for creating a basic infrastructure for large-scale data analytics using Apache Spark and Elasticsearch clusters in the cloud. In this post, we'll use that infrastructure for a task that is common in machine learning and data mining: sparse matrix multiplication.


Introductory note: Sloan Ahrens is a co-founder of Qbox who is now a freelance data consultant. This is the fourth in a series of guest posts, in which he demonstrates how to set up a large-scale machine learning infrastructure using Apache Spark and Elasticsearch. -Mark Brandon


Here in this article, we assume that the reader has gone through the previous tutorials in this series, or can otherwise accomplish the prerequisites given there.

Matrix multiplication is found in many machine-learning and data-mining applications. For example, one formulation of PageRank involves power iteration, in which repeated multiplication of a vector by a matrix causes the vector to converge to a stable value: the principal eigenvector of the matrix. In that case, the vector holds the PageRank of every website and the matrix is the adjacency matrix of the entire World Wide Web. This is a gigantic matrix, but it is also very sparse, which means that most of its entries are zero. If none of this makes any sense to you, don't worry. You can still gain some insight into using Elasticsearch and Spark together.
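If you'd like a concrete picture of power iteration, here is a minimal, self-contained Python sketch. It is only an illustration of the idea on a tiny dense matrix and is not part of the Spark code we develop below:

# a toy illustration of power iteration (not used in the rest of this article)
def power_iteration(matrix, num_iters=50):
    n = len(matrix)
    vec = [1.0 / n] * n  # start from a uniform vector
    for _ in range(num_iters):
        # multiply the vector by the matrix
        new_vec = [sum(matrix[i][j] * vec[j] for j in range(n)) for i in range(n)]
        # normalize so the entries stay bounded
        norm = sum(abs(x) for x in new_vec)
        vec = [x / norm for x in new_vec]
    return vec

# a tiny column-stochastic "link" matrix
M = [[0.0, 0.5, 0.5],
     [0.5, 0.0, 0.5],
     [0.5, 0.5, 0.0]]
print(power_iteration(M))  # converges toward the principal eigenvector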

The algorithm that we use here comes from a Coursera class; you can find it in Chapter 2 here: http://www.mmds.org/. It's a two-stage MapReduce algorithm that we adapt to Spark. We'll use random sparse matrices stored in Elasticsearch. For simplicity, the code assumes that both input matrices are NxN, but you could easily adapt it to the more general case of multiplying an NxM matrix by an MxR matrix. As before, we'll use Python, and as always, the code is available in the GitHub repository. Let's continue, shall we?

Step 1: Setting Up a Qbox Cluster

To proceed, we need an Elasticsearch endpoint. As in the previous article in this series, we'll use a hosted Elasticsearch cluster from Qbox. Easy instructions for setting up a new cluster can be found here.

As before, we use a two-node cluster on the smallest available node size in the us-west-2 (Oregon) EC2 data center. The datasets that we use are not that large, so this should be plenty. The endpoint for my Elasticsearch cluster was 23ca3ca1db3fc430000.qbox.io, which we will use below. If you don't want to use Qbox, then you will need to find some other way to get an Elasticsearch endpoint.
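Before generating any data, it can be worth a quick connectivity check. Here is a minimal sketch using the official Python client (substitute your own endpoint):

from elasticsearch import Elasticsearch

ES_HOST = { "host" : "23ca3ca1db3fc430000.qbox.io", "port" : 80 }
es_client = Elasticsearch(hosts = [ES_HOST])

# prints basic cluster information if the endpoint is reachable
print(es_client.info())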

Step 2: Generating Random Matrices

We need to construct some matrices so that we can multiply them. One way to get them is to simply generate some with random elements. We're going to generate sparse matrices of varying sizes for testing our Spark code. You could use the following Python code to generate a sparse matrix with random entries in random places:

def createMatrixIndex(es_client, index_name, shards):
    if es_client.indices.exists(index_name):
        print("deleting '%s' index..." % (index_name))
        print(es_client.indices.delete(index = index_name, ignore=[400, 404]))
    request_body = {
        "settings" : {
            "number_of_shards": shards,
            "number_of_replicas": 1
        }
    }
    print("creating '%s' index..." % (index_name))
    print(es_client.indices.create(index = index_name, body = request_body))
def createRandomSparseMatrix(es_client, index_name, N, elem_range, shards, D):
    createMatrixIndex(es_client, index_name, shards)
    bulk_data = [] 
    num_of_elements = int(round(D * N**2))
    for elem_num in xrange(num_of_elements):
        # generate random row and column indices, and element value
        i = randint(1, N)
        j = randint(1, N)
        cell_val = randrange(-elem_range, elem_range, 1)
        # only store non-zero values
        if cell_val == 0:
            continue
        # use the ES bulk API
        bulk_data.append({
            "index": {
                "_index": index_name, 
                "_type": 'elem', 
                "_id": '%s-%s' % (i,j)
            }
        })
        bulk_data.append({
            'row': i,
            'col': j,
            'val': cell_val
        })
        if len(bulk_data) > 10000:
            res = es_client.bulk(index=index_name,body=bulk_data,refresh=True)
            bulk_data = []
    if len(bulk_data) > 0:
        res = es_client.bulk(index=index_name,body=bulk_data,refresh=True)

N is the number of rows and columns, and D is the density of the matrix, a number between zero and one. The density is the proportion of elements that are non-zero (one minus the sparsity). In this case, the input value of D is a target value; the resulting matrix will have a density approximately equal to the target value, for reasons we'll see in a moment.

The first thing that the code given above does is create an index to hold the generated values. We only want to store non-zero values—to save space. The number of values that we generate is determined by the density times the size of the matrix—or, in Python, D*N**2, rounded to the nearest integer.

There is also a loop for generating matrix elements. We could do this with a double loop from 1 to N, but for large N this becomes unwieldy. Since there is no reason to loop through all the elements we are not going to create, the code handles this another way: it loops only through the target number of elements, randomly generating a row index, a column index, and a value for each one. Now, there is nothing to stop a particular element from being generated more than once, but the matrix can only have one value per element. To deal with this, when indexing an element we set its id to "i-j" (the row and column indices joined by a hyphen, as in the code above); this way, the value that ends up in the index is the last one indexed, since Elasticsearch simply overwrites the previous document with that id. This means that the density given as input ends up being an upper bound on the density of the matrix actually created.
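To see why the input density is only an upper bound, here is a small standalone sketch (not part of the generator above) that counts how many distinct (row, column) positions survive after collisions:

from random import randint

N = 100
D_target = 0.05
num_of_elements = int(round(D_target * N**2))

# generate positions the same way the generator does, then deduplicate
positions = set()
for _ in range(num_of_elements):
    positions.add((randint(1, N), randint(1, N)))

# the real generator also skips zero values, which lowers the density further
D_actual = len(positions) / float(N**2)
print("target density: %s, achieved density (at most): %s" % (D_target, D_actual))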

While I was testing this code, I wanted a way to scale up N without scaling up the number of elements too drastically, so that I could try a wide range of values of N without having to spend a lot of money provisioning large clusters. I found that this formula works pretty well:

D = log(N, 2)**4 / N**2
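To get a feel for how this scales, here is a quick standalone check of the density and the resulting element count that the formula gives for a few values of N:

from math import log

for N in [100, 1000, 10000, 100000]:
    D = log(N, 2)**4 / N**2
    print("N = %s, D = %s, elements ~ %s" % (N, D, int(round(D * N**2))))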

To vary the number of elements slightly, we can just multiply the expression on the right by a constant, if necessary. So, for a given value of N, we can generate our two random matrices A and B, as well as an empty index to hold the product C, using the following:

D = log(N, 2)**4 / N**2
createMatrixIndex(es_client, 'matrix-c', shards)
createRandomSparseMatrix(es_client, 'matrix-a', N, 10, shards, D)
createRandomSparseMatrix(es_client, 'matrix-b', N, 10, shards, D)

It's simple to use a series of nice "round" values of N, and we find that the progression 1e2, 3e2, 1e3, 3e3, ... works well. You can implement it, up to 3e8, with the following:

n_vals = sorted( [10**(p+2) for p in xrange(7)] + [3*10**(p+2) for p in xrange(7)] )

To run the Spark code (coming up next) from this Python script, we can use the system function, together with a few convenience variables. Also notice that we are passing N to Spark via the command line:

master_path = 'local[4]' 
jar_path = '~/spark/jars/elasticsearch-hadoop-2.1.0.Beta2.jar' 
code_path = '~/qbox-blog-code/ch_4_matmult/es_spark_mm.py'
system("~/spark/bin/spark-submit --master %s --jars %s %s %s" % (master_path, jar_path, code_path, N))

NOTE: If you need an explanation of what's going on in the system command, we encourage you to take a look at previous tutorials.

So, putting it all together, we have:

from os import system
from math import sqrt, log, ceil
from time import time
from random import seed, randrange, randint
from elasticsearch import Elasticsearch

ES_HOST = {
    "host" : "23ca3ca1db3fc430000.qbox.io",
    "port" : 80
}
if __name__ == '__main__':
    shards = 5
    seed(time())
    es_client = Elasticsearch(hosts = [ES_HOST])
    n_vals = sorted( [10**(p+2) for p in xrange(7)] + [3*10**(p+2) for p in xrange(7)] )
    for N in n_vals:
        start_time = time()
        D = log(N, 2)**4 / N**2
        print('\nN = %s\nD* = %s' % (N, D))
        createMatrixIndex(es_client, 'matrix-c', shards)
        createRandomSparseMatrix(es_client, 'matrix-a', N, 10, shards, D)
        createRandomSparseMatrix(es_client, 'matrix-b', N, 10, shards, D)
        matA_count = es_client.count(index='matrix-a', doc_type='elem')['count']
        matB_count = es_client.count(index='matrix-b', doc_type='elem')['count']
        D = (matA_count + matB_count) / float(2 * N**2)
        print('N = %s' % N)
        print('N^2 = %s' % N**2)
        print('D = %s' % D)
        print('D * N^2 = %s' % int(round(D * N**2)))
        elapsed = round(time() - start_time, 2)
        print("--- %s seconds ---" % elapsed)
        # master_path = 'spark://ec2-54-149-165-224.us-west-2.compute.amazonaws.com:7077'
        master_path = 'local[4]'
        jar_path = '~/spark/jars/elasticsearch-hadoop-2.1.0.Beta2.jar'
        code_path = '~/qbox-blog-code/ch_4_matmult/es_spark_mm.py'
        system("~/spark/bin/spark-submit --master %s --jars %s %s %s" % (master_path, jar_path, code_path, N))


The code above is a slight simplification of the version provided in the GitHub repository.

NOTE: We have to decide how many shards we want to use in our indexes. The Elasticsearch-Hadoop adapter uses the number of index shards in determining how many tasks to assign to a computation on an RDD (Resilient Distributed Dataset). This can be an important way to tune performance. We use 5 for this project, but you may want to increase this number for large datasets.

Matrix Multiplication with Spark

The algorithm we'll be using is a two-pass matrix multiplication algorithm for MapReduce. The matrices A and B are divided into rectangular "chunks", and the elements in each chunk are mapped into G groups (so each element is replicated G times). Each reducer gets a chunk of matrix A and a chunk of matrix B and uses them to compute partial sums corresponding to elements of the matrix product C. The second MapReduce job adds up all the partial sums, thereby computing the final values of the elements of matrix C.

Let's take this one step at a time. The first thing we need to do is read our two matrices, stored in Elasticsearch indexes, into Spark RDDs. Since we did this previously in this series, we won't repeat the explanation here. Because we'll use this data more than once, we also want to cache it in memory so that we only have to read it once:

# settings for connecting ES to Spark RDD
es_conf = {
    "es.net.proxy.http.host" : "23ca3ca1db3fc430000.qbox.io",
    "es.net.proxy.http.port": "80",
    "es.nodes.discovery": "false"
}
# read matrix A, cache in memory
es_conf["es.resource"] = "matrix-a/elem"
matA_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf).cache()
# read matrix B, cache in memory
es_conf["es.resource"] = "matrix-b/elem"
matB_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf).cache()
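As mentioned in the note above, the number of tasks Spark uses is tied to the number of index shards. If you want to confirm this, recent versions of PySpark let you check the partition count of the cached RDDs; for example:

# the partition count typically matches the number of index shards
print(matA_rdd.getNumPartitions())
print(matB_rdd.getNumPartitions())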

Next we need to do a couple of calculations. If we know N (which is passed to Spark as a command-line argument), then we can determine the value of D and the number of elements in the matrices. We also have to decide what our replication factor G should be. We won't take up space here providing a mathematical justification, but we find that the formula for G below works quite well. (You may want to experiment with different values.)

matA_count = matA_rdd.count() 
matB_count = matB_rdd.count() 
# D is the average density of the input matrices 
D = (matA_count + matB_count) / float(2 * N**2) 
# G is the replication factor 
G = int(round(sqrt(sqrt(D * N**2 / 2)))) 
# GRP_SIZE is the number of rows/cols in each grouping 
GRP_SIZE = int(ceil(N / float(G)))

GRP_SIZE is just a convenience variable that will be used in the map function.
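As a concrete example (the exact numbers will differ for your random matrices), suppose N = 1000 and the measured density works out to roughly D = 0.01. Then the formulas above give:

from math import sqrt, ceil

# illustrative numbers only; your measured D will differ
N = 1000
D = 0.01
G = int(round(sqrt(sqrt(D * N**2 / 2))))  # sqrt(sqrt(5000)) is about 8.4, so G = 8
GRP_SIZE = int(ceil(N / float(G)))        # ceil(1000 / 8) = 125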

Now we need to do the first mapping. Let's take a look at the grouping map function for matrix A:

# maps an element in matrix A to the appropriate groups
def group_mapper_A(item):
    row = item[1]['row']
    col = item[1]['col']
    val = item[1]['val']
    i_grp = int(ceil(row / float(GRP_SIZE)))
    # the factor of 2 turns out to reduce communication costs
    j_grp = int(ceil(2 * col / float(GRP_SIZE)))
    return [( (i_grp, j_grp, k + 1), ('A', row, col, val) ) for k in xrange(G + 1)]

First, the function decides which "row group" and "column group" an element belongs to, then replicates it across the groups. These replications are emitted as a single list via a Python list comprehension. The factor of 2 means that we divide the matrix columns into twice as many groups as the rows, which reduces communication costs and consequently speeds up the computation. (You can prove this to yourself by running the task with and without the factor of 2; you will find that it does, in fact, speed up the computation.)

The items emitted have key tuples consisting of the three-way grouping indices, and value tuples consisting of the name of the matrix, and the row, column, and value of the element.
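For instance, using the illustrative values G = 8 and GRP_SIZE = 125 from the sketch above, an element of A at row 300, column 40, with value -7 lands in row group 3 and column group 1, and is replicated across the third key index:

# illustrative only; assumes group_mapper_A is defined as above
from math import ceil
G, GRP_SIZE = 8, 125

for pair in group_mapper_A((None, {'row': 300, 'col': 40, 'val': -7})):
    print(pair)
# prints ((3, 1, k), ('A', 300, 40, -7)) for k = 1 through 9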

The map function for matrix B is similar:

# maps an element in matrix B to the appropriate groups
def group_mapper_B(item):
    row = item[1]['row']
    col = item[1]['col']
    val = item[1]['val']
    # the factor of 2 turns out to reduce communication costs
    j_grp = int(ceil(2 * row / float(GRP_SIZE)))
    k_grp = int(ceil(col / float(GRP_SIZE)))
    return [( (i + 1, j_grp, k_grp), ('B', row, col, val) ) for i in xrange(G + 1)]

Since these map functions emit lists of results, we will apply them with Spark's flatMap function so that we end up with a "flat" sequence of items rather than with a sequence of lists of items. We can use the map functions as follows and then combine the results into a single dataset using the union function:

# map A and B to the appropriate groups 
A_groups = matA_rdd.flatMap(group_mapper_A) 
B_groups = matB_rdd.flatMap(group_mapper_B) 
# union the results 
mapped_union = A_groups.union(B_groups)

The next step is to compute the partial sums for each group and each element of the product matrix C. We will group the elements in our dataset by key with Spark's groupByKey function, then apply our partial sum mapping function with another application of flatMap:

# get partial sums for elements of C
partial_results = mapped_union.groupByKey().flatMap(partialSums)

The partialSums map function looks like this:

# computes the partial sums corresponding to the elements of C that can be
# calculated from the elements in the given group; only emits non-zero values
def partialSums(item):
    partials = {}
    for a_elem in item[1]:
        if a_elem[0] == 'B':
            continue
        A_row = a_elem[1]
        A_col = a_elem[2]
        A_val = a_elem[3]
        for b_elem in item[1]:
            if b_elem[0] == 'A':
                continue
            B_row = b_elem[1]
            B_col = b_elem[2]
            B_val = b_elem[3]
            if A_col == B_row:
                group = partials.setdefault((A_row, B_col), [])
                group.append(A_val * B_val)
    partial_sums = [(key, sum(partials[key])) for key in partials.keys()]
    return [ps for ps in partial_sums if ps[1] != 0]

This function runs a double loop through the elements it is given, selecting an element of matrix A and an element of matrix B. If the two elements "match" (that is, they correspond to a computation needed for an element of C), the product of their values is appended to a list in a Python dictionary. The keys of the dictionary correspond to elements of the product matrix C.

Once all the necessary products of input elements have been computed, the partial sums are emitted, with any zero sums filtered out first. After that, we can simply sum up our results by key (remember that the keys are (row, col) tuples corresponding to elements of the product matrix C) and filter out zero values:

# now reduce the groups by summing up the partial sums for each element, discarding zeros 
matrix_C = partial_results.reduceByKey(lambda a,b: a+b).filter(lambda item: item[1] != 0)

At this point, the calculation is technically complete. But, we want to save our results back to Elasticsearch, so we need to put them in an appropriate format—and cache them, since the results will be used to calculate some statistics as well:

# map to docs appropriate for ES, cache results 
result_docs = matrix_C.map(lambda item: 
    ('%s-%s' % (item[0][0],item[0][1]), 
        { 'row': item[0][0], 
          'col': item[0][1], 
          'val': item[1] 
        }
    )
).cache()

Now we can write the results out to the Elasticsearch index we created for that purpose:

# write results out to ES 
es_conf["es.resource"] = "matrix-c/elem" 
result_docs.saveAsNewAPIHadoopFile(
    path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf
)

NOTE: The code above creates a useful id for the elements of matrix C, exactly like the ids we created for the input matrices. Unfortunately, however, the Elasticsearch-Hadoop adapter ignores these keys. Perhaps there will be a fix in a future release of the adapter.
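One possible workaround, if your version of the connector supports it, is the es.mapping.id setting, which tells elasticsearch-hadoop to take the document id from a field inside the document itself. The following is only a sketch of that idea (untested against this particular adapter version), with the id copied into the document body:

# sketch of an alternative: put an "id" field in each document and ask the
# connector to use it as the document id (assumes "es.mapping.id" is honored)
es_conf["es.resource"] = "matrix-c/elem"
es_conf["es.mapping.id"] = "id"
id_docs = matrix_C.map(lambda item:
    (None,
        { 'id': '%s-%s' % (item[0][0], item[0][1]),
          'row': item[0][0],
          'col': item[0][1],
          'val': item[1]
        }
    )
)
id_docs.saveAsNewAPIHadoopFile(
    path='-', outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf
)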

There is a bit more to the Spark code that I'm not going to go through in detail here, but you can take a look at es_spark_mm.py in the repo for the details. Some statistics for the calculation are computed and saved to another Elasticsearch index, which is useful for analysis (and a few results are printed to the command window as well). Also, the file contains some commented-out code that prints the input and output matrices to the command window. This is useful for doing a sanity check, as we'll see momentarily.

Sanity Check

To gain assurance that the code is actually working toward our goal, let's check the results against R, using RStudio.

We can un-comment the code in es_spark_mm.py that prints our matrices to the command window:

# # this section is a way to print out the matrices for validation
# # can only be used with small matrices, for obvious reasons
# ##########################
# matA = matA_rdd.map(lambda i: ((i[1]['row'],i[1]['col']), i[1]['val'])).collect()
# matB = matB_rdd.map(lambda i: ((i[1]['row'],i[1]['col']), i[1]['val'])).collect()
# matC = matrix_C.collect()
# def print_matrix(A):
#     matrix = [[0 for i in range(N)] for j in range(N)]
#     for result in A:
#         row = result[0][0]
#         col = result[0][1]
#         matrix[row-1][col-1] = result[1]
#     for i in range(N):
#         print(','.join([str(matrix[i][j]) for j in range(N)]) + ',')
# print('A:')
# print_matrix(matA)
# print('B:')
# print_matrix(matB)
# print('C:')
# print_matrix(matC)
# ##########################

We can run the code for a small matrix. Let's use N = 10 and D = 1. Simply change line 79 of random_mm.py to for N in [10]:, and change line 83 to D = 1. We then run the code with:

cd ~/qbox-blog-code/ch_4_matmult  # or equivalent
python random_mm.py

Your matrices will be different, since they are random. Here's ours:

[Image: the input matrices A and B and the product C printed to the command window]

When we copy the two input matrices into appropriate R code and run the multiplication, the results match:

[Image: the same matrices multiplied in RStudio, with results matching the Spark output]
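If you'd rather stay in Python for the check, numpy (assuming it is installed in your environment) can do the same verification. This is just a sketch that reuses matA, matB, matC, and N from the commented-out section above; it is not part of the project code:

import numpy as np

# rebuild dense matrices from the ((row, col), val) pairs collected above
def to_dense(triples, N):
    M = np.zeros((N, N))
    for (row, col), val in triples:
        M[row - 1, col - 1] = val
    return M

A = to_dense(matA, N)
B = to_dense(matB, N)
C = to_dense(matC, N)

# the Spark result should match an ordinary dense multiplication
print(np.allclose(A.dot(B), C))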

Deploying to EC2

Finally, we can deploy the code to Amazon EC2. In the previous article, we covered the deployment process in detail, so we won't spend too much time on it here, except to present the specific code and make a couple of points. You may want to fork the qbox-blog-code repository to make things easier.

You can find the deployment code here. First we need to set our environment variables (for 5 medium slave nodes, in this case):

# set environment variables for use with Spark deployment script:
export CLUSTER=sparkcluster
export INSTANCE=t2.medium
export REGION=us-west-2
export NODES=5
export AWS_ACCESS_KEY_ID=<your_access_key>
export AWS_SECRET_ACCESS_KEY=<your_secret_access_key>

Then we can provision our Spark cluster as follows:

cd ~
# launch a cluster
./spark/ec2/spark-ec2 -k ubuntu-spark -i ~/.ssh/id_rsa.pub -s $NODES -r $REGION -t $INSTANCE launch $CLUSTER

Once the cluster launches, we need to get the public DNS of the cluster master. Ours was ec2-54-149-97-244.us-west-2.compute.amazonaws.com. So that our code can communicate with the cluster, we need to change line 107 of random_mm.py from the value used to run locally (master_path = 'local[4]') to the value we need to run on the cluster in EC2 (7077 is the default port used by the master). The new value is:

master_path = 'spark://ec2-54-149-97-244.us-west-2.compute.amazonaws.com:7077'

If you forked the repository, you'll want to push this change so that you can clone the updated code onto the master for your cluster. Now we can SSH into the master:

# log in to cluster 
./spark/ec2/spark-ec2 -k ubuntu-spark -i ~/.ssh/id_rsa.pub -r $REGION login $CLUSTER

Next, run a little bit of setup code:

# create jars directory
mkdir spark/jars
cd spark/jars
# get elasticsearch-hadoop jar
wget http://central.maven.org/maven2/org/elasticsearch/elasticsearch-hadoop/2.1.0.Beta2/elasticsearch-hadoop-2.1.0.Beta2.jar
# get the code
cd ~
git clone https://github.com/sloanahrens/qbox-blog-code.git
# install pip and the python ES client
sudo yum -y install python-setuptools
sudo easy_install pip
sudo pip install elasticsearch

Now we are finally ready to run the code, which can be done as follows:

# run the script 
python ~/qbox-blog-code/ch_4_matmult/random_mm.py

Once we're finished, we can run the following command to terminate the Spark cluster:

# terminate cluster 
./spark/ec2/spark-ec2 -k ubuntu-spark -i ~/.ssh/id_rsa.pub -r $REGION destroy $CLUSTER

Below you can see some results from running the code in three different scenarios. First, we ran it locally in our VM (with four cores). Then we ran it on EC2 using t2.medium instances: first with 5 slave nodes, and then again with 10. In the second case, we changed the number of shards in the Elasticsearch indexes to 10 as well, so that we didn't have to tell Spark to use more tasks than the number of shards.

The plot shows the count of non-zero elements of matrix A versus the number of seconds the computation took to run (the count for matrix B is comparable). You can find the JavaScript for creating this plot in the repo.

[Chart: non-zero element count of matrix A vs. computation time in seconds for the three scenarios]

Surely you'll agree that we've covered a lot of ground in this article, so we'll bring Part 4 to a close.

We invite you to continue to the next article in this series, Rectangular Matrix Multiplication with Elasticsearch and Apache Spark—Machine Learning Series, Part 5. There, among other tasks, we'll work through a slight generalization of this technique to allow for non-square matrix multiplication.

