Matei Zaharia / Spark

Spark

New: A detailed Spark technical report is published.
New: A Spark homepage is up at www.spark-project.org.

Spark is a cluster computing framework built on top of Mesos. It was originally designed to support parallel machine learning jobs, but can be useful for other applications as well. A short paper on Spark appeared at HotCloud 2010, while more details are available in the Spark technical report.

Spark's programming model is based on two constructs: parallel loops over "distributed datasets", and a limited set of types of shared variables that can be accessed from tasks running on different machines. The goal is to expose an interface similar to the global address space programming model of tools like OpenMP, but to limit the types of shared variables to those that are easy to implement in a distributed system. The types of shared variables currently supported are read-only variables and "accumulators" whose value can be updated through an associative operation like addition. An important feature distinguishing Spark from "data flow" frameworks like Hadoop and Dryad is that distributed datasets can be reused across different parallel loops. This is especially useful for jobs that need to load a dataset into memory and use it across a series of iterations.
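To illustrate why accumulators are limited to associative operations such as addition, here is a minimal sketch in plain Scala with no Spark involved. The names AssociativeMergeSketch, partialSums and merge are hypothetical and are not part of Spark. Each slice is reduced on its own, as a separate task would do, and the partial results are then combined. The final value does not depend on how the data was sliced.

```scala
object AssociativeMergeSketch {
  // Cut the data into slices and reduce each slice independently,
  // as separate tasks would.
  def partialSums(data: Seq[Int], sliceSize: Int): List[Int] =
    data.grouped(sliceSize).map(_.sum).toList

  // Fold the per-slice results into one value, starting from the zero element.
  def merge(partials: Seq[Int]): Int = partials.foldLeft(0)(_ + _)

  def main(args: Array[String]): Unit = {
    val data = 1 to 100
    println(merge(partialSums(data, 10))) // ten slices of ten elements
    println(merge(partialSums(data, 7)))  // uneven slices, same total
  }
}
```

Addition is also commutative, so the order in which the partial results arrive does not change the total either.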

Spark is implemented in Scala, a statically typed functional/OO language that runs on the JVM. Spark uses Scala features to let users write parallel jobs through syntactic sugar in a regular Scala program, similar to DryadLINQ. The easiest way to explain this is through some examples:

Example 1: Estimating π

A simple way to estimate π is to sample random points in the square from (-1,-1) to (1,1) and count how many of them fall inside the unit circle. The fraction of points that fall inside should be roughly π/4, because the area of the unit circle is π and the area of the square is 4. Below is some serial Scala code that estimates π this way:

var count = 0
for (i <- 1 to 10000) {
  val x = Math.random * 2 - 1
  val y = Math.random * 2 - 1
  if (x*x + y*y < 1) count += 1
}
println("Pi is roughly " + 4 * count / 10000.0)

To implement the same algorithm in parallel using Spark, we can change the code to the following:

val spark = new SparkContext(<Mesos master>)
var count = spark.accumulator(0)
for (i <- spark.parallelize(1 to 10000, 10)) {
  val x = Math.random * 2 - 1
  val y = Math.random * 2 - 1
  if (x*x + y*y < 1) count += 1
}
println("Pi is roughly " + 4 * count.value / 10000.0)

There are four changes from the serial version:

  1. We create a SparkContext to talk to a Mesos master.
  2. The count variable becomes an accumulator initialized at 0.
  3. The for loop is now over spark.parallelize(1 to 10000, 10). This object is a distributed dataset formed by taking the Scala range object 1 to 10000 and breaking it into 10 slices. Spark will run the slices as separate Mesos tasks, potentially on different machines.
  4. In the print statement, we access count's value through count.value.

This example works because the for loop in Scala is syntactic sugar for calling a foreach method on the object that is being iterated over, passing the loop body as a closure. Therefore, the for loop above translates into:

spark.parallelize(1 to 10000, 10).foreach(i => {...})

where the i => {...} is syntax for a closure. The foreach method of the "distributed dataset" object returned by spark.parallelize takes the closure and serializes it into a sequence of bytes -- this is possible because Scala closures are just Java objects, and will be Serializable as long as they do not reference any non-serializable fields. The serialized closure is passed as an argument to Mesos tasks, along with one slice of the distributed dataset for each task. Finally, accumulators are special objects that only provide += and value methods, and whose serialized form includes a unique accumulator ID as well as the "zero" value for the accumulated type. Spark's Mesos executors create a separate copy of the accumulator for each task and initialize it to the zero value. Then, for any successful tasks, they pass the final value of that task's accumulator along with its ID to the user's driver program. The Spark library updates all local accumulators with the deltas from the various tasks before returning from the foreach call.
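The accumulator protocol described above can be sketched locally in plain Scala. This is a single-process simulation under stated assumptions, not Spark's implementation: there is no serialization and no Mesos, the names AccumulatorSketch, Accumulator, runTasks and countEvens are hypothetical, and the accumulator is passed to the loop body explicitly instead of being captured by a serialized closure.

```scala
object AccumulatorSketch {
  // Hypothetical accumulator: exposes only += and value, and remembers
  // its ID and the "zero" value it starts from.
  final class Accumulator(val id: Int, val zero: Int) {
    private var current = zero
    def +=(delta: Int): Unit = current += delta
    def value: Int = current
  }

  // Runs `body` over each slice as if each slice were a separate task.
  def runTasks(slices: Seq[Seq[Int]], driverAcc: Accumulator)
              (body: (Int, Accumulator) => Unit): Unit = {
    val deltas = slices.map { slice =>
      // Each task gets its own copy of the accumulator, initialized to zero.
      val taskAcc = new Accumulator(driverAcc.id, driverAcc.zero)
      for (i <- slice) body(i, taskAcc) // sugar for slice.foreach(i => body(i, taskAcc))
      (taskAcc.id, taskAcc.value)       // what a successful task reports back
    }
    // The driver applies the per-task deltas before returning.
    for ((id, delta) <- deltas if id == driverAcc.id) driverAcc += delta
  }

  // Counts the even numbers in 1..n using numSlices simulated tasks.
  def countEvens(n: Int, numSlices: Int): Int = {
    val count = new Accumulator(id = 0, zero = 0)
    val sliceSize = math.max(1, (n + numSlices - 1) / numSlices)
    val slices = (1 to n).grouped(sliceSize).toList
    runTasks(slices, count) { (i, acc) => if (i % 2 == 0) acc += 1 }
    count.value
  }
}
```

In this sketch the driver's accumulator is only touched after every simulated task has finished, which mirrors the statement that local accumulators are updated before foreach returns.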

Note that these implementation choices also mean tasks are idempotent. A task can therefore be run multiple times to recover from failures or to mitigate slow nodes, as in MapReduce.
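A minimal local sketch of why re-running a task is safe under this design, assuming a task whose only side effect is its own accumulator copy. The names RetrySketch, attempt and runWithRetry, and the crashOn parameter used to simulate a failure, are hypothetical. A failed attempt's partial count is simply dropped, and the rerun starts again from zero.

```scala
import scala.util.{Failure, Success, Try}

object RetrySketch {
  // One task attempt: counts the even numbers in a slice using a fresh
  // local counter. `crashOn` simulates a failure partway through the slice.
  def attempt(slice: Seq[Int], crashOn: Option[Int]): Try[Int] = Try {
    var local = 0 // per-attempt copy, starts at zero
    for (i <- slice) {
      if (crashOn.contains(i)) throw new RuntimeException(s"simulated crash at $i")
      if (i % 2 == 0) local += 1
    }
    local
  }

  // Only a successful attempt contributes its delta. A failed attempt's
  // partial count is discarded and the slice is processed again from scratch.
  def runWithRetry(slice: Seq[Int], crashOnFirstAttempt: Option[Int]): Int =
    attempt(slice, crashOnFirstAttempt) match {
      case Success(delta) => delta
      case Failure(_)     => attempt(slice, None).get
    }
}
```

Because nothing from the failed attempt is merged, the result with a simulated crash matches the result without one.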

Example 2: Logistic Regression

In more interesting applications, users probably need to read a data set and potentially transform it before performing calculations on it. For this purpose, Spark provides a second type of distributed dataset -- a file in the Hadoop Distributed File System (HDFS). Currently, only text files are supported. The HDFS file looks to the programmer like a collection of records (in text files, each record is a line). However, operations on it run at the nodes that contain each block of the file, as in MapReduce.

The following code implements logistic regression, an algorithm for finding a separating line between two clusters of labelled data points, as a serial Scala program:

// Read data file and convert it into Point objects.
// toList materializes the lines; a bare Iterator could be traversed only once.
val lines = scala.io.Source.fromFile("data.txt").getLines().toList
val points = lines.map(x => parsePoint(x))

// Run logistic regression
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  val gradient = Vector.zeros(D)
  for (p <- points) {
    val scale = (1/(1+Math.exp(-p.y*(w dot p.x)))-1)*p.y
    gradient += scale * p.x
  }
  w -= gradient
}
println("Result: " + w)

The parsePoint function, elided in order to save space, transforms a line of text into a DataPoint object p with position p.x (a vector) and label p.y (a scalar equal to 1 or -1). The Vector class is a simple implementation of a vector of doubles that is also elided to save space.
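For readers who want to run the serial listing, here is one possible sketch of the elided helpers. Everything in it is an assumption made for illustration and not the original code: the object name LogRegHelpersSketch, the elems field, the Scalar wrapper, and in particular the input format, which is taken to be a label followed by the coordinates, separated by whitespace.

```scala
import scala.util.Random

object LogRegHelpersSketch {
  // Hypothetical stand-in for the elided Vector class: a mutable vector of
  // doubles. Inside this object it shadows scala.collection.immutable.Vector.
  class Vector(val elems: Array[Double]) {
    def dot(o: Vector): Double =
      elems.indices.map(i => elems(i) * o.elems(i)).sum
    def -(o: Vector): Vector =
      new Vector(elems.indices.map(i => elems(i) - o.elems(i)).toArray)
    def *(s: Double): Vector = new Vector(elems.map(_ * s))
    // In-place addition, so `gradient += ...` works even when gradient is a val.
    def +=(o: Vector): Unit = for (i <- elems.indices) elems(i) += o.elems(i)
    override def toString: String = elems.mkString("(", ", ", ")")
  }

  object Vector {
    def zeros(d: Int): Vector = new Vector(new Array[Double](d))
    def random(d: Int): Vector = new Vector(Array.fill(d)(Random.nextDouble()))
  }

  // Allows a Double on the left-hand side, as in `scale * p.x`.
  implicit class Scalar(private val s: Double) {
    def *(v: Vector): Vector = v * s
  }

  case class DataPoint(x: Vector, y: Double)

  // Assumed line format: the label (1 or -1) followed by the coordinates,
  // all separated by whitespace, e.g. "1 0.5 2.0".
  def parsePoint(line: String): DataPoint = {
    val nums = line.trim.split("\\s+").map(_.toDouble)
    DataPoint(new Vector(nums.tail), nums.head)
  }
}
```

With import LogRegHelpersSketch._ in scope, the expressions used in the listing (w dot p.x, scale * p.x, gradient += ..., w -= gradient) should type-check against these definitions.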

The corresponding parallel program in Spark is the following:

// Read data file and transform it into Point objects
val spark = new SparkContext(<Mesos master>)
val lines = spark.hdfsTextFile("hdfs://.../data.txt")
val points = lines.map(x => parsePoint(x)).cache()

// Run logistic regression
var w = Vector.random(D)
for (i <- 1 to ITERATIONS) {
  val gradient = spark.accumulator(Vector.zeros(D))
  for (p <- points) {
    val scale = (1/(1+Math.exp(-p.y*(w dot p.x)))-1)*p.y
    gradient += scale * p.x
  }
  w -= gradient.value
}
println("Result: " + w)

There are three main changes from the serial version:

  1. lines is an HDFS text file accessed as a collection of lines through Spark, instead of a local file accessed through scala.io.Source.
  2. points is still obtained by calling map on lines, but we also call cache() on it. This tells Spark to cache the transformed value of each block of the input file at the executor that processes it. This is quite useful for performance because it means that multiple iterations of the outer for loop can reuse the same parsed data points, without having to re-read and re-parse the input file.
  3. gradient becomes an accumulator.
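The benefit of caching the parsed points can be illustrated with a local analogy in plain Scala. This is not how Spark implements cache(); it only contrasts a lazily mapped collection, which re-parses its input on every traversal, with a materialized one. The names CacheSketch and parseCalls are hypothetical.

```scala
object CacheSketch {
  // Returns how many times the parse function ran while the parsed data
  // was traversed `iterations` times.
  def parseCalls(lines: Seq[String], iterations: Int, cache: Boolean): Int = {
    var calls = 0
    def parse(s: String): Double = { calls += 1; s.toDouble }

    val lazyPoints = lines.view.map(parse) // parses again on each traversal
    val points: Iterable[Double] =
      if (cache) lazyPoints.toList         // parse once and keep the results
      else lazyPoints

    var total = 0.0
    for (_ <- 1 to iterations) total += points.sum // one traversal per iteration
    calls
  }
}
```

With three input lines and five iterations, the cached variant parses each line once (3 calls), while the uncached variant parses every line in every iteration (15 calls).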

Implementation Status

The current implementation of Spark is 5000 lines of Scala code. It supports the example programs above, as well as a feature that has not been shown in these examples called broadcast variables, which allows a read-only variable to be marked as shared across parallel for calls so that it does not need to be sent as part of the task descriptions every time. Broadcast variables are disseminated using an efficient BitTorrent-like mechanism and kept in memory at the executors for use in later tasks.

Open Source

Spark is open source and available for download from GitHub. Please note that Spark is currently in alpha and hence only suitable for early testing.

Related Work

Spark's language integration is similar to that of DryadLINQ, although we provide one additional feature - shared variables in the form of accumulators and cached variables. The major difference from DryadLINQ, MapReduce, and other data-flow frameworks, however, is that Spark allows data to be kept in memory across parallel jobs. In addition, this work was inspired by SMR, a Scala binding for Hadoop by David Hall that also serializes closures implementing map and reduce functions to run them on a cluster, and also supports for loop integration. Unlike SMR, Spark is a full cluster computing framework that manages its own execution and scheduling. Our support for accumulators and cached variables is also an addition over SMR.

Future Work

In future work on Spark, we plan to pursue some of the following ideas: