A journey to Amazon EMR (and Spark)

A few weeks ago I had to recompute some counters and statistics on most of our database, which represents several hundred gigabytes. It was time for us to move beyond long-running scripts and to dig a bit further into more efficient solutions.

Amazon EMR provides a managed platform that makes it easy, fast, and cost-effective to process large-scale data across dynamically scalable Amazon EC2 instances, on which you can run several popular distributed frameworks such as Apache Spark. This seemed to be the solution, but it was not clear how to get started: I couldn’t figure out how to get the data into Spark, and configuring job execution was counter-intuitive.

I’m not really used to AWS, and I must admit that the documentation is dense. But after a mighty struggle, I finally figured it out. This is the “Amazon EMR Spark in 10 minutes” tutorial I would love to have found when I started. Read on to learn how we managed to get Spark doing great things on our dataset.

Feeling the need

Our infrastructure is currently hosted on AWS. We use the Simple Queue Service (SQS) to enqueue and process incoming events thanks to home-made digesters that run on an auto-scalable cluster. These events are first saved in a raw format in a Mongo database, and a few quick calculations take place immediately to render some “real-time” metrics.

Our first data flow design

More computation-heavy tasks run every few minutes or so, using a crontab. This is a fast-to-implement solution that works quite well, but it has several flaws:

  • Apart from the digestion layer, the whole solution is not easily scalable. We are beginning to hit its limits as our data volume rises with a lot of new clients. Cron jobs take ever more time to complete. As a consequence, we frequently experience issues with long-running scripts that are launched again well before the previous instance has completed.
  • We still don’t have an easy and elegant solution to recompute metrics on historical data. So far we have been using a custom script to do so, and sometimes we have to wait several days for it to complete. Can’t exactly talk about sustainability, can we?

Introducing Spark

- What about using Spark?
- What is it? Can you set it up alone? Yeah, why not?

That’s how it all started. I had already used Spark a bit, some time ago in a different company, and I was a bit tired of writing quick and dirty Python scripts; I was looking for a more robust and scalable solution. I knew that Spark clusters could be a real pain to set up and to maintain (configuring the network, Mesos or YARN, not to mention Hadoop HDFS or Python dependencies…).

In my previous experience, we had almost two people working full-time for a few months just to make sure that everything was working properly and efficiently. Sqreen doesn’t have this kind of resource to spend on revolutionizing an architecture that has worked pretty well so far.

I was looking for some Dockerfiles to set up a cluster easily when a friend of mine told me: “Do you know that you can set a Spark cluster up directly on AWS?” Amazon EMR was what I was looking for!

Digging into the doc

It can sometimes be difficult to find the information you need, especially when you don’t really know what you are looking for.

Even though the AWS EMR docs say it is supposed to be easy as hell to set up and use, digging into some concepts of the AWS platform to understand what I was doing was a bit time-consuming.

So here is the plan:

  • set up the cluster
  • export my input data to a CSV file on S3
  • send my Spark job to the cluster
  • gather the results somewhere on S3

According to many sources, using S3 as the central data exchange platform with the Spark cluster is the easiest and most efficient way. Since it is completely integrated and there is nothing more to do, it will do just fine for now.

Starting the cluster

There are at least two ways to do so: the AWS web interface and the awscli command-line tool.

AWS Elastic Map-Reduce cluster list

Creating a new cluster via the user interface is quite straightforward. Just click on the Create cluster button and fill in the following form. You don’t need to spend any time configuring the cluster, the machines, or the software; it’s all done for you.

Cluster creation form

Beware of the security and access section, especially the EMR role and the EC2 instance profile. These roles may not be selected by default and you may need to create them (just run $ aws emr create-default-roles using the awscli tool). I lost a lot of time dealing with this error.

That’s it. A few seconds after running the command, the top entry in your cluster list should look like this:

My cluster is starting…

Note that the cluster may take up to 15 minutes before being ready to run your jobs.

We can achieve the exact same result using awscli:

$ aws emr create-cluster \
--name "My cluster" \
--instance-type m3.xlarge \
--release-label emr-5.4.0 \
--instance-count 3 \
--use-default-roles \
--applications Name=Spark
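
For readers who would rather script this step from Python, here is a minimal sketch of the same request using boto3. It is not something we ran for this article: the function names and the default region are assumptions made for illustration, and the two role names are the ones that aws emr create-default-roles creates.

```python
def build_cluster_request(name="My cluster", instance_type="m3.xlarge",
                          instance_count=3, release_label="emr-5.4.0"):
    """Build keyword arguments for boto3's EMR run_job_flow call.

    Mirrors the create-cluster command above; the helper name is hypothetical.
    """
    return {
        "Name": name,
        "ReleaseLabel": release_label,
        "Applications": [{"Name": "Spark"}],
        "Instances": {
            "MasterInstanceType": instance_type,
            "SlaveInstanceType": instance_type,
            "InstanceCount": instance_count,
            # Keep the cluster alive once its steps are done, so that
            # more steps can be added later.
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        # Default roles, as created by `aws emr create-default-roles`.
        "ServiceRole": "EMR_DefaultRole",
        "JobFlowRole": "EMR_EC2_DefaultRole",
    }


def create_cluster(region_name="us-east-1"):
    """Start the cluster and return its id (region is an assumption)."""
    import boto3  # imported lazily: needs boto3 and AWS credentials

    emr = boto3.client("emr", region_name=region_name)
    response = emr.run_job_flow(**build_cluster_request())
    return response["JobFlowId"]
```

Calling create_cluster() returns the cluster id, which is the value expected later by the add-steps command.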

Submitting jobs to the cluster

Via the GUI, just click on the Add step button. The following form should pop up:

Configuring first Spark cluster job on Amazon EMR

Configuring my first Spark job

Select a Spark application, then type the path to your Spark script and its arguments. Note that the Spark job script needs to be submitted to the master node (and will then be copied to the slave nodes by the Spark platform). I uploaded the script to an S3 bucket to make it immediately available to the EMR platform.

For more complex scripts including dependencies or external libraries, it is possible to embed all of the needed sources into a zip file and submit it to the cluster via the --py-files dependencies.zip argument in the Spark-submit options field.
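
As an illustration, such a zip file could be built with a few lines of Python. This is only a sketch: the helper name and the mylib package used in the usage note are hypothetical, and it only packs .py files.

```python
import zipfile
from pathlib import Path


def build_dependencies_zip(package_dir, zip_path="dependencies.zip"):
    """Zip every .py file under package_dir, keeping the package folder name."""
    package_dir = Path(package_dir)
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for source in sorted(package_dir.rglob("*.py")):
            # Store paths relative to the package's parent directory so that
            # the package stays importable from the root of the archive.
            arcname = source.relative_to(package_dir.parent).as_posix()
            archive.write(str(source), arcname)
    return zip_path
```

For example, build_dependencies_zip("mylib") would produce a dependencies.zip whose entries all start with mylib/, ready to be uploaded to S3 next to the job script.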

The same result can be obtained via awscli. The first thing to do is to create a file called step.json and add the description of the step you want to run. You need to tell AWS where your Python script is located and pass any parameters your script may need (in our case, two S3 URLs):

[
 {
   "Name": "Spark application - test - 2",
   "Type":"CUSTOM_JAR",
   "Jar":"command-runner.jar",
   "Args":
   [
     "spark-submit",
     "--deploy-mode", "cluster",
     "s3://sqreen-spark/spark.py",
     "s3://sqreen-spark/input.json",
     "s3://sqreen-spark/output-files"
   ]
 }
]
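
The script referenced by this step is not shown in this article. As a rough idea of its shape, here is a minimal sketch of a PySpark job that takes the two S3 URLs as arguments. The aggregation, the "type" column and the application name are assumptions made for illustration, not our actual computation.

```python
import sys


def parse_args(argv):
    """Return (input_url, output_url) from the arguments given in the step."""
    if len(argv) != 3:
        raise ValueError("usage: spark.py <input-url> <output-url>")
    return argv[1], argv[2]


def main(argv):
    # pyspark is available on the EMR cluster; it is imported here so that
    # the argument handling above does not depend on it.
    from pyspark.sql import SparkSession

    input_url, output_url = parse_args(argv)
    spark = SparkSession.builder.appName("recompute-counters").getOrCreate()

    events = spark.read.json(input_url)
    # Hypothetical aggregation: count events per value of an assumed
    # "type" column.
    counts = events.groupBy("type").count()
    # One part-* file is written per partition under output_url.
    counts.write.mode("overwrite").csv(output_url, header=True)

    spark.stop()


# Only run the job when arguments are supplied, as spark-submit does.
if __name__ == "__main__" and len(sys.argv) > 1:
    main(sys.argv)
```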

In the same directory, you can now run:

$ aws emr add-steps \
--cluster-id <your-cluster-id> \
--steps file://step.json
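
The same step can presumably be added from Python as well. The sketch below uses boto3's add_job_flow_steps, which expects the jar and its arguments under a HadoopJarStep key rather than the flat layout of step.json. The helper names and the default region are assumptions.

```python
def build_spark_step(script_url, input_url, output_url,
                     name="Spark application - test - 2"):
    """Describe a spark-submit step in the shape boto3 expects."""
    return {
        "Name": name,
        # Leave the cluster running if the step fails.
        "ActionOnFailure": "CONTINUE",
        "HadoopJarStep": {
            "Jar": "command-runner.jar",
            "Args": [
                "spark-submit",
                "--deploy-mode", "cluster",
                script_url,
                input_url,
                output_url,
            ],
        },
    }


def add_step(cluster_id, step, region_name="us-east-1"):
    """Submit the step to a running cluster and return the new step id."""
    import boto3  # imported lazily: needs boto3 and AWS credentials

    emr = boto3.client("emr", region_name=region_name)
    response = emr.add_job_flow_steps(JobFlowId=cluster_id, Steps=[step])
    return response["StepIds"][0]
```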

My jobs completed

The computation time went from dozens of minutes to a couple of minutes only. Great first shot! \o/

Gathering results

Gathering results on S3 is almost straightforward. I was used to having the Spark workers write their results to a database, so I was a bit surprised when I got the first results on S3. The cluster creates a folder with multiple files (output-files/part-*) because each partition is saved individually, and it is much more efficient to do so in separate files.

If you need a single output file (still in a folder), you can merge the data frame into a single partition before saving. Using coalesce(1) or repartition(1) may work for small datasets, but it implies that the whole dataset will be pulled into one partition on one node. This is likely to throw out-of-memory errors or, at best, to run slowly. You may want to reconsider whether this is really what you want to do.

In fact, I found it much more efficient to concatenate all of the output files with a simple bash script after gathering all parts from S3 after the Spark job completion.
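
I used a bash script for this, but the idea is easy to sketch in Python too. The version below assumes that the part files have already been downloaded from S3 into a local folder (for example with aws s3 sync); the function name is hypothetical.

```python
import shutil
from pathlib import Path


def concatenate_parts(parts_dir, destination):
    """Append every part-* file in parts_dir to destination, in name order."""
    parts = sorted(Path(parts_dir).glob("part-*"))
    with open(destination, "wb") as merged:
        for part in parts:
            with open(part, "rb") as chunk:
                # Stream each part so that large files are never fully
                # loaded into memory.
                shutil.copyfileobj(chunk, merged)
    return len(parts)
```

Note that if the job writes a header line in every part file, this naive concatenation repeats it once per part, so the header would have to be stripped or disabled first.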

Celebrating

Here we are. We have been able to set up a scalable Spark cluster that runs our script within minutes, where it would have taken a few hours without it. We are now able to recompute historical data regularly and rapidly in order to extract detection patterns to be applied to new incoming data.

From here we are reconsidering our whole infrastructure, since new paths have appeared. We currently detect anomalies in time series with a delay of a few minutes. What if we could achieve the same result within seconds? What about working with data streaming directly instead of batches of aggregated data? Can we make the detection and alerting of some attacks even more real-time?

Conclusion

This quick tutorial is the result of our first exploratory work with Spark. The AWS EMR solution seems easy to integrate into our framework, but we now have a lot more questions to answer:

  • How do we automate data transfers from our database to S3 and from S3 back to the database? Is S3 the only way for the EMR platform to get data, or are there better and more elegant solutions? What is the most efficient data format for our use case?
  • What about automating script deployment through a continuous integration flow? For now, we just uploaded the script to S3, but since many developers will have to work on the Spark platform and the code will become more and more complex, we will need a more robust deployment and testing process.
  • How can we schedule scripts to finally get rid of crontabs in our current infrastructure? We will need to explore what can be done with complex data workflows. How can we schedule dependencies between Spark jobs? How do we handle errors in a complex flow?

Spark may become an essential element in our architecture and a lot of things may need to be rethought.

About the author

Christophe is the lead data scientist at Sqreen. He has a PhD in Computer Science and is passionate about cryptography and deep learning. He has worked for several successful startups as a back-end engineer and data scientist. When he is not thinking distributively, he dreams of electric sheep.

 
