Spark
New: A detailed Spark technical report has been published.
New: A Spark homepage is up at www.spark-project.org.
Spark is a cluster computing framework built on top of Mesos. It was originally designed to support parallel machine learning jobs, but can be useful for other applications as well. A short paper on Spark appeared at HotCloud 2010, while more details are available in the Spark technical report.
Spark's programming model is based on two constructs: parallel loops over "distributed datasets", and a limited set of shared-variable types that can be accessed from tasks running on different machines. The goal is to expose an interface similar to the global-address-space programming model of tools like OpenMP, while restricting shared variables to types that are easy to implement in a distributed system. The types currently supported are read-only variables and "accumulators", whose value can be updated only through an associative operation such as addition. An important feature distinguishing Spark from "data flow" frameworks like Hadoop and Dryad is that distributed datasets can be reused across different parallel loops. This is especially useful for jobs that load a dataset into memory and use it across a series of iterations.
Spark is implemented in Scala, a statically typed functional and object-oriented language that runs on the JVM. Spark uses Scala features to let users write parallel jobs as regular Scala programs with a little syntactic sugar, similar to DryadLINQ. The easiest way to explain this is through some examples:
Example 1: Estimating π
A simple way to estimate π is to sample random points in the square from (-1,-1) to (1,1) and count how many of them fall inside the unit circle. The fraction of points that fall inside should be roughly π/4, because the area of the unit circle is π and the area of the square is 4. Below is some serial Scala code that estimates π this way:
var count = 0
for (i <- 1 to 10000) {
  val x = Math.random * 2 - 1
  val y = Math.random * 2 - 1
  if (x*x + y*y < 1) count += 1
}
println("Pi is roughly " + 4 * count / 10000.0)
To implement the same algorithm in parallel using Spark, we can change the code to the following:
val spark = new SparkContext(<Mesos master>)
var count = spark.accumulator(0)
for (i <- spark.parallelize(1 to 10000, 10)) {
  val x = Math.random * 2 - 1
  val y = Math.random * 2 - 1
  if (x*x + y*y < 1) count += 1
}
println("Pi is roughly " + 4 * count.value / 10000.0)
There are four changes from the serial version:
- We create a SparkContext to talk to a Mesos master.
- The count variable becomes an accumulator initialized at 0.
- The for loop is now over spark.parallelize(1 to 10000, 10). This object is a distributed dataset formed by taking the Scala range object 1 to 10000 and breaking it into 10 slices. Spark will run the slices as separate Mesos tasks, potentially on different machines.
- In the print statement, we access count's value through count.value.
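The slicing step can be pictured with a small local sketch. The names below (SlicingSketch, slice, countInCircle, estimatePi) are hypothetical and this is not Spark's implementation: it assumes contiguous, nearly equal slices and runs each "task" sequentially in one JVM rather than as a Mesos task.

```scala
import scala.util.Random

// Hypothetical local model of parallelize(1 to 10000, 10): split a range into
// contiguous, nearly equal slices and process each slice as an independent "task".
object SlicingSketch {
  def slice(range: Range, numSlices: Int): Seq[Range] = {
    require(numSlices > 0, "numSlices must be positive")
    val n = range.length
    (0 until numSlices).map { i =>
      // Slice i covers positions [i * n / numSlices, (i + 1) * n / numSlices).
      val start = (i.toLong * n / numSlices).toInt
      val end = ((i + 1).toLong * n / numSlices).toInt
      range.slice(start, end)
    }
  }

  // One "task": count the samples in its slice that land inside the unit circle.
  def countInCircle(slice: Range, rng: Random): Int =
    slice.count { _ =>
      val x = rng.nextDouble() * 2 - 1
      val y = rng.nextDouble() * 2 - 1
      x * x + y * y < 1
    }

  def estimatePi(samples: Int, numSlices: Int): Double = {
    val slices = slice(1 to samples, numSlices)
    // Each slice gets its own seeded generator, as a separate task would.
    val total = slices.zipWithIndex.map { case (s, i) => countInCircle(s, new Random(i)) }.sum
    4.0 * total / samples
  }

  def main(args: Array[String]): Unit = {
    println(slice(1 to 10000, 10).map(_.length)) // ten slices of 1000 elements each
    println("Pi is roughly " + estimatePi(10000, 10))
  }
}
```

Summing the per-slice counts at the end plays the role that the accumulator plays in the Spark version.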
This example works because the for loop in Scala is syntactic sugar for
calling a foreach method on the object that is being iterated over,
passing the loop body as a closure. Therefore, the for loop above
translates into:
spark.parallelize(1 to 10000, 10).foreach(i => {...})
where the i => {...} is syntax for a closure.
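Because the rewrite is purely syntactic, any object with a suitable foreach method can be the target of such a loop. The LocalDataset class below is a hypothetical, single-machine stand-in for a distributed dataset; it counts its foreach calls so that the rewrite becomes observable.

```scala
// Hypothetical stand-in for a distributed dataset: it exposes only foreach,
// which is all that `for (x <- ds) body` (without yield) needs.
class LocalDataset[T](elems: Seq[T]) {
  var foreachCalls = 0 // counts invocations, so the rewrite is observable
  def foreach[U](f: T => U): Unit = {
    foreachCalls += 1
    elems.foreach(f)
  }
}

object ForDesugaringDemo {
  // Returns (sum over both passes, number of foreach calls).
  def run(): (Int, Int) = {
    val ds = new LocalDataset(1 to 5)
    var sum = 0
    for (i <- ds) { sum += i }    // for-loop syntax...
    ds.foreach(i => { sum += i }) // ...and the call the compiler turns it into
    (sum, ds.foreachCalls)
  }

  def main(args: Array[String]): Unit = {
    val (sum, calls) = run()
    println(s"sum = $sum, foreach called $calls times") // sum = 30, foreach called 2 times
  }
}
```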
The foreach method of the "distributed dataset" object
returned by spark.parallelize takes the closure
and serializes it into a sequence of bytes. This is possible because Scala
closures are just Java objects, and they will be Serializable as long as they
do not reference any non-serializable fields.
The serialized closure is passed as an argument to Mesos tasks, along with one
slice of the distributed dataset for each task.
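The serialization step can be tried locally with plain Java serialization. This is a sketch, not Spark's code: it assumes Scala 2.12 or later (where function literals compile to serializable lambdas) and deserializes in the same JVM instead of inside a Mesos task. The names toBytes, fromBytes, and LocalHandle are hypothetical.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, NotSerializableException,
  ObjectInputStream, ObjectOutputStream}

object ClosureSerializationDemo {
  // Turns a Serializable object (here, a closure) into a byte array.
  def toBytes(obj: AnyRef): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(obj) finally out.close()
    buffer.toByteArray
  }

  // Rebuilds the object from bytes, as a worker receiving a task would.
  def fromBytes[T](bytes: Array[Byte]): T = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // A class that does not extend Serializable.
  class LocalHandle { val id = 7 }

  // Ships a closure that captures only an Int, then runs the rebuilt copy.
  def roundTrip(offset: Int, input: Int): Int = {
    val body: Int => Int = i => i + offset
    val copy = fromBytes[Int => Int](toBytes(body))
    copy(input)
  }

  // A closure that captures a LocalHandle cannot be serialized.
  def failsOnNonSerializableCapture(): Boolean = {
    val handle = new LocalHandle
    val body: Int => Int = i => i + handle.id
    try { toBytes(body); false }
    catch { case _: NotSerializableException => true }
  }

  def main(args: Array[String]): Unit = {
    println(roundTrip(10, 5))                // 15
    println(failsOnNonSerializableCapture()) // true
  }
}
```

The second case is the restriction stated above: capturing a non-serializable object makes the whole closure non-serializable.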
Finally, accumulators are special objects that provide only += and value
methods, and whose serialized form includes
a unique accumulator ID as well as the "zero" value for the accumulated type.
Spark's Mesos executors create a separate copy of the accumulator for each
task and initialize it at the zero value. Then, for each successful task,
they pass the final value of that task's accumulator, along with its ID,
to the user's driver program.
The Spark library updates the corresponding accumulators in the driver program with
the deltas from the various tasks before returning from the foreach call.
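Note that these implementation choices also make tasks idempotent: every task starts from fresh, zero-valued copies of its accumulators, and only successful tasks report their values. A task can therefore be run multiple times to recover from failures or to mitigate slow nodes, as in MapReduce.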
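The accumulator protocol described above can be modeled in a few lines. This is a hypothetical single-process sketch (TaskResult, runTask, and merge are illustrative names, not Spark's API); failure is simulated with a flag, and the model assumes a failed attempt reports nothing, so only the successful re-run of its slice is counted.

```scala
import scala.collection.mutable

object AccumulatorSketch {
  // What a successful task sends back to the driver for one accumulator.
  final case class TaskResult(accumulatorId: Long, value: Long)

  // Executor side: the task gets its own copy of the accumulator, starting at the
  // zero value, and reports that copy's final value only if the task succeeds.
  def runTask(accumulatorId: Long, zero: Long, slice: Seq[Int],
              add: (Long, Int) => Long, succeeds: Boolean): Option[TaskResult] = {
    var local = zero
    slice.foreach { i => local = add(local, i) }
    if (succeeds) Some(TaskResult(accumulatorId, local)) else None
  }

  // Driver side: fold each reported value into the driver's copy, found by ID.
  def merge(driver: mutable.Map[Long, Long], results: Seq[TaskResult]): Unit =
    results.foreach { r => driver(r.accumulatorId) = driver(r.accumulatorId) + r.value }

  def run(): Long = {
    val accId = 1L
    val driver = mutable.Map(accId -> 0L)
    val add = (acc: Long, i: Int) => acc + i
    val attempts = Seq(
      runTask(accId, 0L, 1 to 5, add, succeeds = true),
      runTask(accId, 0L, 6 to 10, add, succeeds = false), // failed attempt: discarded
      runTask(accId, 0L, 6 to 10, add, succeeds = true)   // re-run of the same slice
    )
    merge(driver, attempts.flatten)
    driver(accId) // 15 from the first slice + 40 from the second
  }

  def main(args: Array[String]): Unit = println(run()) // 55
}
```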
Example 2: Logistic Regression
In more interesting applications, users typically need to read a dataset and possibly transform it before performing calculations on it. For this purpose, Spark provides a second type of distributed dataset: a file in the Hadoop Distributed File System (HDFS). Currently, only text files are supported. To the programmer, an HDFS file looks like a collection of records (in a text file, each record is a line). However, operations on it run on the nodes that hold each block of the file, as in MapReduce.
The following code implements logistic regression, an algorithm for finding a separating line between two clusters of labelled data points, as a serial Scala program:
// Read data file and convert it into Point objects
val lines = scala.io.Source.fromFile("data.txt").getLines()
// toList materializes the points so they can be traversed on every iteration
// (an Iterator can only be traversed once)
val points = lines.map(x => parsePoint(x)).toList
// Run logistic regression
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  val gradient = Vector.zeros(D)
  for (p <- points) {
    val scale = (1/(1+Math.exp(-p.y*(w dot p.x)))-1)*p.y
    gradient += scale * p.x
  }
  w -= gradient
}
println("Result: " + w)
The parsePoint function, elided to save space,
transforms a line of text into
a Point object p with position p.x (a vector) and label p.y
(a scalar equal to 1 or -1).
The Vector class is a simple implementation of a vector of doubles
that is also elided to save space.
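For readers who want to run the serial program, here is one possible set of stand-ins for the elided helpers. They are hypothetical, not the authors' code: the input format ("label x1 x2 ... xD", whitespace-separated, label 1 or -1) is an assumption, and the train method takes D from the data instead of a constant.

```scala
import scala.util.Random

object LRHelpers {
  // A mutable vector of doubles (shadows scala.Vector inside this object).
  class Vector(val elements: Array[Double]) {
    def length: Int = elements.length
    def dot(other: Vector): Double = {
      require(length == other.length, "dimension mismatch")
      var sum = 0.0
      for (i <- 0 until length) sum += elements(i) * other.elements(i)
      sum
    }
    def *(scale: Double): Vector = new Vector(elements.map(_ * scale))
    def -(other: Vector): Vector =
      new Vector(elements.zip(other.elements).map { case (a, b) => a - b })
    // In-place addition, so `gradient += ...` works on a val.
    def +=(other: Vector): Unit =
      for (i <- 0 until length) elements(i) += other.elements(i)
    override def toString: String = elements.mkString("(", ", ", ")")
  }

  object Vector {
    def zeros(d: Int): Vector = new Vector(Array.fill(d)(0.0))
    def random(d: Int, rng: Random = new Random()): Vector =
      new Vector(Array.fill(d)(rng.nextDouble() * 2 - 1))
  }

  // Allows `scale * p.x` with the Double on the left.
  implicit class ScalarOps(val s: Double) extends AnyVal {
    def *(v: Vector): Vector = v * s
  }

  case class Point(x: Vector, y: Double)

  // Assumed format: "<label> <x1> ... <xD>".
  def parsePoint(line: String): Point = {
    val fields = line.trim.split("\\s+").map(_.toDouble)
    Point(new Vector(fields.tail), fields.head)
  }

  // The serial loop from the text; `w -= gradient` expands to `w = w - gradient`.
  def train(lines: Seq[String], iterations: Int, rng: Random): Vector = {
    val points = lines.map(parsePoint).toList
    var w = Vector.random(points.head.x.length, rng)
    for (_ <- 1 to iterations) {
      val gradient = Vector.zeros(w.length)
      for (p <- points) {
        val scale = (1 / (1 + math.exp(-p.y * (w dot p.x))) - 1) * p.y
        gradient += scale * p.x
      }
      w -= gradient
    }
    w
  }
}
```

A mutable += paired with an immutable - is one design choice that lets both update styles in the original program compile unchanged.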
The corresponding parallel program in Spark is the following:
// Read data file and transform it into Point objects
val spark = new SparkContext(<Mesos master>)
val lines = spark.hdfsTextFile("hdfs://.../data.txt")
val points = lines.map(x => parsePoint(x)).cache()
// Run logistic regression
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  val gradient = spark.accumulator(Vector.zeros(D))
  for (p <- points) {
    val scale = (1/(1+Math.exp(-p.y*(w dot p.x)))-1)*p.y
    gradient += scale * p.x
  }
  w -= gradient.value
}
println("Result: " + w)
There are three main changes from the serial version:
1. lines is an HDFS text file accessed as a collection of lines through
Spark instead of a local file accessed through scala.io.Source.
2. points is still obtained by calling map on lines, but we
also call cache() on it. This tells Spark to cache the transformed
value of each block of the input file at the executor that processes
it. This helps performance considerably, because
multiple iterations of the outer for loop can reuse the same
parsed data points without having to re-read and re-parse the
input file.
3. gradient becomes an accumulator.
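The effect of cache() across iterations can be modeled locally. CachedBlocks is a hypothetical class, not Spark's implementation: it stores each transformed block in a map the first time the block is traversed, and a counter records how often the parse function actually runs.

```scala
import scala.collection.mutable

// Hypothetical model of a cached, transformed dataset: each block is transformed
// on first traversal and kept in memory for later traversals.
class CachedBlocks[A, B](blocks: Seq[Seq[A]], transform: A => B) {
  private val cache = mutable.Map.empty[Int, Seq[B]]
  private def block(i: Int): Seq[B] =
    cache.getOrElseUpdate(i, blocks(i).map(transform))
  def foreach[U](f: B => U): Unit =
    blocks.indices.foreach(i => block(i).foreach(f))
}

object CacheDemo {
  // Returns (number of parse calls, sum over all iterations).
  def run(iterations: Int): (Int, Int) = {
    var parseCalls = 0
    val parse = (line: String) => { parseCalls += 1; line.trim.toInt }
    val blocks = Seq(Seq("1", "2"), Seq("3", "4", "5")) // two "file blocks"
    val points = new CachedBlocks(blocks, parse)
    var total = 0
    for (_ <- 1 to iterations; p <- points) total += p
    (parseCalls, total)
  }

  def main(args: Array[String]): Unit =
    println(run(3)) // (5,45): each line parsed once, summed three times
}
```

Without the cache, the five lines would be parsed once per iteration (15 times for three iterations).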
Implementation Status
The current implementation of Spark is 5000 lines of Scala code. It supports the example programs above, as well as a feature not shown in these examples: broadcast variables, which allow a read-only variable to be marked as shared across parallel for loops so that it does not have to be sent as part of every task description. Broadcast variables are disseminated using an efficient BitTorrent-like mechanism and kept in memory at the executors for use in later tasks.
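The saving from keeping broadcast values at the executors can be illustrated with a toy model. Everything here is hypothetical (BroadcastSketch, Executor, the round-robin task placement); the BitTorrent-like dissemination is not modeled, and a plain lookup function stands in for it.

```scala
import scala.collection.mutable

object BroadcastSketch {
  // Each executor fetches a broadcast value once, by ID, and keeps it in memory.
  final class Executor {
    private val cache = mutable.Map.empty[Long, Vector[Double]]
    var fetches = 0
    def value(id: Long, fetch: Long => Vector[Double]): Vector[Double] =
      cache.getOrElseUpdate(id, { fetches += 1; fetch(id) })
  }

  // Runs `loops` parallel loops of `tasksPerLoop` tasks, assigned round-robin to
  // executors; every task reads broadcast variable 1. Returns total fetches.
  def run(executors: Int, loops: Int, tasksPerLoop: Int): Int = {
    val pool = Vector.fill(executors)(new Executor)
    val driverValues = Map(1L -> Vector(0.5, -0.25)) // the driver's copy
    for (_ <- 1 to loops; t <- 0 until tasksPerLoop) {
      val w = pool(t % executors).value(1L, driverValues)
      require(w.length == 2) // a real task would use w here
    }
    pool.map(_.fetches).sum
  }

  def main(args: Array[String]): Unit =
    // 30 tasks in total, but only one fetch per executor.
    println(run(executors = 3, loops = 5, tasksPerLoop = 6)) // 3
}
```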
Open Source
Spark is open source and available for download from GitHub. Please note that Spark is currently in alpha and hence only suitable for early testing.
Related Work
Spark's language integration is similar to that of DryadLINQ,
although we provide one additional feature: shared variables in
the form of accumulators and cached variables.
The major difference from DryadLINQ, MapReduce, and other data-flow
frameworks, however, is that Spark allows data to be kept in memory across
parallel jobs.
In addition, this work was inspired by SMR,
a Scala binding for Hadoop by David Hall that likewise serializes closures
implementing the map and reduce functions in order to run them on a cluster,
and that also supports for loop integration.
Unlike SMR, Spark is a full cluster computing framework that manages its
own execution and scheduling.
Our support for accumulators and cached variables is also an addition
over SMR.
Future Work
In future work on Spark, we plan to pursue some of the following ideas:
- More types of shared variables: For example, it would be interesting to add a "weakly consistent monotonic" type for variables such as the current best solution in branch-and-bound search. These variables are used to prune the search, so they do not need to be kept perfectly consistent across tasks. However, new solutions should be broadcast to tasks quickly for efficiency. (This idea was suggested by Eric Brewer.)
- Parallel operations other than foreach and map: It may be interesting to support group by and join operations, although we may instead opt to keep the framework simple and specialized for its original purpose (machine learning).
- Tweakable fault tolerance: In many machine learning and data mining jobs, it is okay to lose slices of the data, because the job is computing statistics to begin with. This could be used to improve response times when only a small number of tasks fail or run slowly.
- Java API: Spark jobs can be written in Java by subclassing Scala's classes for closures and similar constructs, but it would be nicer to allow existing functional programming libraries for Java to be used.
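The "weakly consistent monotonic" variable from the first item can be sketched locally. MonotonicMin is a hypothetical class, not part of Spark; it shows only the merge rule that makes weak consistency harmless (the bound only decreases, so offers can be applied in any order, and a task reading a stale bound merely prunes less). It uses a compare-and-set loop so concurrent offers are safe within one JVM; propagating the bound between machines is not modeled.

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical monotonic shared variable for a minimization problem such as
// branch-and-bound: the current best bound can only decrease.
final class MonotonicMin(initial: Long) {
  private val best = new AtomicLong(initial)
  def value: Long = best.get()
  // Keeps the smaller of the current bound and `candidate`; true if it improved.
  def offer(candidate: Long): Boolean = {
    var current = best.get()
    while (candidate < current) {
      if (best.compareAndSet(current, candidate)) return true
      current = best.get()
    }
    false
  }
}

object MonotonicDemo {
  // Four threads play the role of tasks offering candidate solutions.
  def run(): Long = {
    val bound = new MonotonicMin(Long.MaxValue)
    val tasks = (1 to 4).map { t =>
      new Thread(() => (1 to 1000).foreach(i => bound.offer(1000L - i + t)))
    }
    tasks.foreach(_.start())
    tasks.foreach(_.join())
    bound.value // smallest candidate offered: thread 1 with i = 1000
  }

  def main(args: Array[String]): Unit = println(run()) // 1
}
```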