Chapter 4. Spark with Python

Spark is a cluster computing framework that uses in-memory primitives to enable programs to run up to a hundred times faster than Hadoop MapReduce applications. Spark applications consist of a driver program that controls the execution of parallel operations across a cluster. The main programming abstraction provided by Spark is known as Resilient Distributed Datasets (RDDs). RDDs are collections of elements partitioned across the nodes of the cluster that can be operated on in parallel.

Spark was created to run on many platforms and be developed in many languages. Currently, Spark can run on Hadoop 1.0, Hadoop 2.0, Apache Mesos, or a standalone Spark cluster. Spark also natively supports Scala, Java, Python, and R. In addition to these features, Spark can be used interactively from a command-line shell.

This chapter begins with an example Spark script. PySpark is then introduced, and RDDs are described in detail with examples. The chapter concludes with example Spark programs written in Python.

WordCount in PySpark

The code in Example 4-1 implements the WordCount algorithm in PySpark. It assumes that a data file, input.txt, is loaded in HDFS under /user/hduser/input, and the output will be placed in HDFS under /user/hduser/output.

Example 4-1. python/Spark/word_count.py
from pyspark import SparkContext

def main():

   sc = SparkContext(appName='SparkWordCount')

   input_file = sc.textFile('/user/hduser/input/input.txt')
   counts = input_file.flatMap(lambda line: line.split()) \
                     .map(lambda word: (word, 1)) \
                     .reduceByKey(lambda a, b: a + b)
   counts.saveAsTextFile('/user/hduser/output')

   sc.stop()

if __name__ == '__main__':
   main()

To execute the Spark application, pass the name of the file to the spark-submit script:

$ spark-submit --master local word_count.py

While the job is running, a lot of text will be printed to the console. The results of a word_count.py Spark script are displayed in Example 4-2 and can be found in HDFS under /user/hduser/output/part-00000.

Example 4-2. /user/hduser/output/part-00000
(u'be', 2)
(u'jumped', 1)
(u'over', 1)
(u'candlestick', 1)
(u'nimble', 1)
(u'jack', 3)
(u'quick', 1)
(u'the', 1)

WordCount Described

This section describes the transformations being applied in the word_count.py Spark script.

The first statement creates a SparkContext object. This object tells Spark how and where to access a cluster:

sc = SparkContext(appName='SparkWordCount')

The second statement uses the SparkContext to load a file from HDFS and store it in the variable input_file:

input_file = sc.textFile('/user/hduser/input/input.txt')

The third statement performs multiple transformations on the input data. Spark automatically parallelizes these transformations to run across multiple machines:

counts = input_file.flatMap(lambda line: line.split()) \
                   .map(lambda word: (word, 1)) \
                   .reduceByKey(lambda a, b: a + b)

The fourth statement stores the results to HDFS:

counts.saveAsTextFile('/user/hduser/output')

The fifth statement shuts down the SparkContext:

sc.stop()

PySpark

PySpark is Spark’s Python API. PySpark allows Spark applications to be created from an interactive shell or from Python programs.

Before executing any code within Spark, the application must create a SparkContext object. The SparkContext object tells Spark how and where to access a cluster. The master property is a cluster URL that determines where the Spark appliction will run. The most common values for master are:

local
Run Spark with one worker thread.
local[n]
Run Spark with n worker threads.
spark://HOST:PORT
Connect to a Spark standalone cluster.
mesos://HOST:PORT
Connect to a Mesos cluster.

Interactive Shell

In the Spark shell, the SparkContext is created when the shell launches. The SparkContext is held in the variable sc. The master for the interactive shell can be set by using the --master argument when the shell is launched. To start an interactive shell, run the pyspark command:

$ pyspark --master local[4]
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.5.0
      /_/
Using Python version 2.7.10 (default, Jul 13 2015 12:05:58)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

For a complete list of options, run pyspark --help.

Self-Contained Applications

Self-contained applications must first create a SparkContext object before using any Spark methods. The master can be set when the SparkContext() method is called:

sc = SparkContext(master='local[4]')

To execute self-contained applications, they must be submitted to the spark-submit script. The spark-submit script contains many options; to see a complete listing, run spark-submit --help from the command line:

$ spark-submit --master local spark_app.py

Resilient Distributed Datasets (RDDs)

Resilient Distributed Datasets (RDDs) are the fundamental programming abstraction in Spark. RDDs are immutable collections of data, partitioned across machines, that enable operations to be performed on elements in parallel. RDDs can be constructed in multiple ways: by parallelizing existing Python collections, by referencing files in an external storage system such as HDFS, or by applying transformations to existing RDDs.

Creating RDDs from Collections

RDDs can be created from a Python collection by calling the SparkContext.parallelize() method. The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. The following example creates a parallelized collection from a Python list:

>>> data = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(data)
>>> rdd.glom().collect()
...
[[1, 2, 3, 4, 5]]

The RDD.glom() method returns a list of all of the elements within each partition, and the RDD.collect() method brings all the elements to the driver node. The result, [[1, 2, 3, 4, 5]], is the original collection within a list.

To specify the number of partitions an RDD should be created with, a second argument can be passed to the parallelize() method. The following example creates an RDD from the same Python collection in the previous example, except this time four partitions are created:

>>> rdd = sc.parallelize(data, 4)
>>> rdd.glom().collect()
...
[[1], [2], [3], [4, 5]]

Using the glom() and collect() methods, the RDD created in this example contains four inner lists: [1], [2], [3], and [4, 5]. The number of inner lists represents the number of partitions within the RDD.

Creating RDDs from External Sources

RDDs can also be created from files using the SparkContext.textFile() method. Spark can read files residing on the local filesystem, any storage source supported by Hadoop, Amazon S3, and so on. Spark supports text files, SequenceFiles, any other Hadoop InputFormat, directories, compressed files, and wildcards, e.g., my/directory/*.txt. The following example creates a distributed dataset from a file located on the local filesystem:

>>> distFile = sc.textFile('data.txt')
>>> distFile.glom().collect()
...
[[u'jack be nimble', u'jack be quick', u'jack jumped over the candlestick']]

As before, the glom() and collect() methods allow the RDD to be displayed in its partitions. This result shows that distFile only has a single partition.

Similar to the parallelize() method, the textFile() method takes a second parameter that specifies the number of partitions to create. The following example creates an RDD with three partitions from the input file:

>>> distFile = sc.textFile('data.txt', 3)
>>> distFile.glom().collect()
...
[[u'jack be nimble', u'jack be quick'], [u'jack jumped over the candlestick'], []]

RDD Operations

RDDs support two types of operations: transformations and actions. Transformations create new datasets from existing ones, and actions run a computation on the dataset and return results to the driver program.

Transformations are lazy: that is, their results are not computed immediately. Instead, Spark remembers all of the transformations applied to a base dataset. Transformations are computed when an action requires a result to be returned to the driver program. This allows Spark to operate efficiently and only transfer the results of the transformations before an action.

By default, transformations may be recomputed each time an action is performed on it. This allows Spark to efficiently utilize memory, but it may utilize more processing resources if the same transformations are constantly being processed. To ensure a transformation is only computed once, the resulting RDD can be persisted in memory using the RDD.cache() method.

RDD Workflow

The general workflow for working with RDDs is as follows:

  1. Create an RDD from a data source.
  2. Apply transformations to an RDD.
  3. Apply actions to an RDD.

The following example uses this workflow to calculate the number of characters in a file:

>>> lines = sc.textFile('data.txt')
>>> line_lengths = lines.map(lambda x: len(x))
>>> document_length = line_lengths.reduce(lambda x,y: x+y)
>>> print document_length
59

The first statement creates an RDD from the external file data.txt. This file is not loaded at this point; the variable lines is just a pointer to the external source. The second statement performs a transformation on the base RDD by using the map() function to calculate the number of characters in each line. The variable line_lengths is not immediately computed due to the laziness of transformations. Finally, the reduce() method is called, which is an action. At this point, Spark divides the computations into tasks to run on separate machines. Each machine runs both the map and reduction on its local data, returning only the results to the driver program.

If the application were to use line_lengths again, it would be best to persist the result of the map transformation to ensure that the map would not be recomputed. The following line will save line_lengths into memory after the first time it is computed:

>>> line_lengths.persist()

Python Lambda Functions

Many of Spark’s transformations and actions require function objects to be passed from the driver program to run on the cluster. The easiest way to define and pass a function is through the use of Python lambda functions.

Lambda functions are anonymous functions (i.e., they do not have a name) that are created at runtime. They can be used wherever function objects are required and are syntactically restricted to a single expression. The following example shows a lambda function that returns the sum of its two arguments:

lambda a, b: a + b

Lambdas are defined by the keyword lambda, followed by a comma-separated list of arguments. A colon separates the function declaration from the function expression. The function expression is a single expression that produces a result for the provided arguments.

In the previous Spark example, the map() function uses the following lambda function:

lambda x: len(x)

This lambda has one argument and returns the length of the argument.

Transformations

Transformations create new datasets from existing ones. Lazy evaluation of transformation allows Spark to remember the set of transformations applied to the base RDD. This enables Spark to optimize the required calculations.

This section describes some of Spark’s most common transformations. For a full listing of transformations, refer to Spark’s Python RDD API doc.

map

The map(func) function returns a new RDD by applying a function, func, to each element of the source. The following example multiplies each element of the source RDD by two:

>>> data = [1, 2, 3, 4, 5, 6]
>>> rdd = sc.parallelize(data)
>>> map_result = rdd.map(lambda x: x * 2)
>>> map_result.collect()
[2, 4, 6, 8, 10, 12]

filter

The filter(func) function returns a new RDD containing only the elements of the source that the supplied function returns as true. The following example returns only the even numbers from the source RDD:

>>> data = [1, 2, 3, 4, 5, 6]
>>> filter_result = rdd.filter(lambda x: x % 2 == 0)
>>> filter_result.collect()
[2, 4, 6]

distinct

The distinct() method returns a new RDD containing only the distinct elements from the source RDD. The following example returns the unique elements in a list:

>>> data = [1, 2, 3, 2, 4, 1]
>>> rdd = sc.parallelize(data)
>>> distinct_result = rdd.distinct()
>>> distinct_result.collect()
[4, 1, 2, 3]

flatMap

The flatMap(func) function is similar to the map() function, except it returns a flattened version of the results. For comparison, the following examples return the original element from the source RDD and its square. The example using the map() function returns the pairs as a list within a list:

>>> data = [1, 2, 3, 4]
>>> rdd = sc.parallelize(data)
>>> map = rdd.map(lambda x: [x, pow(x,2)])
>>> map.collect()
[[1, 1], [2, 4], [3, 9], [4, 16]]

While the flatMap() function concatenates the results, returning a single list:

>>> rdd = sc.parallelize()
>>> flat_map = rdd.flatMap(lambda x: [x, pow(x,2)])
>>> flat_map.collect()
[1, 1, 2, 4, 3, 9, 4, 16]

Actions

Actions cause Spark to compute transformations. After transforms are computed on the cluster, the result is returned to the driver program.

The following section describes some of Spark’s most common actions. For a full listing of actions, refer to Spark’s Python RDD API doc.

reduce

The reduce() method aggregates elements in an RDD using a function, which takes two arguments and returns one. The function used in the reduce method is commutative and associative, ensuring that it can be correctly computed in parallel. The following example returns the product of all of the elements in the RDD:

>>> data = [1, 2, 3]
>>> rdd = sc.parallelize(data)
>>> rdd.reduce(lambda a, b: a * b)
6

take

The take(n) method returns an array with the first n elements of the RDD. The following example returns the first two elements of an RDD:

>>> data = [1, 2, 3]
>>> rdd = sc.parallelize(data)
>>> rdd.take(2)
[1, 2]

collect

The collect() method returns all of the elements of the RDD as an array. The following example returns all of the elements from an RDD:

>>> data = [1, 2, 3, 4, 5]
>>> rdd = sc.parallelize(data)
>>> rdd.collect()
[1, 2, 3, 4, 5]

It is important to note that calling collect() on large datasets could cause the driver to run out of memory. To inspect large RDDs, the take() and collect() methods can be used to inspect the top n elements of a large RDD. The following example will return the first 100 elements of the RDD to the driver:

>>> rdd.take(100).collect()

takeOrdered

The takeOrdered(n, key=func) method returns the first n elements of the RDD, in their natural order, or as specified by the function func. The following example returns the first four elements of the RDD in descending order:

>>> data = [6,1,5,2,4,3]
>>> rdd = sc.parallelize(data)
>>> rdd.takeOrdered(4, lambda s: -s)
[6, 5, 4, 3]

Text Search with PySpark

The text search program searches for movie titles that match a given string (Example 4-3). The movie data is from the groupLens datasets; the application expects this to be stored in HDFS under /user/hduser/input/movies.

Example 4-3. python/Spark/text_search.py
from pyspark import SparkContext
import re
import sys

def main():

   # Insure a search term was supplied at the command line
   if len(sys.argv) != 2:
      sys.stderr.write('Usage: {} <search_term>'.format(sys.argv[0]))
      sys.exit()

   # Create the SparkContext
   sc = SparkContext(appName='SparkWordCount')

   # Broadcast the requested term
   requested_movie = sc.broadcast(sys.argv[1])

   # Load the input file
   source_file = sc.textFile('/user/hduser/input/movies')

   # Get the movie title from the second fields
   titles = source_file.map(lambda line: line.split('|')[1])

   # Create a map of the normalized title to the raw title
   normalized_title = titles.map(lambda title: (re.sub(r'\s*\(\d{4}\)','', title).lower(), title))
   
   # Find all movies matching the requested_movie
   matches = normalized_title.filter(lambda x: requested_movie.value in x[0])

   # Collect all the matching titles
   matching_titles = matches.map(lambda x: x[1]).distinct().collect()

   # Display the result
   print '{} Matching titles found:'.format(len(matching_titles))
   for title in matching_titles:
      print title

   sc.stop()

if __name__ == '__main__':
   main()

The Spark application can be executed by passing to the spark-submit script the name of the program, text_search.py, and the term for which to search. A sample run of the application can be seen here:

$ spark-submit text_search.py gold
...
6 Matching titles found:
GoldenEye (1995)
On Golden Pond (1981)
Ulee's Gold (1997)
City Slickers II: The Legend of Curly's Gold (1994)
Golden Earrings (1947)
Gold Diggers: The Secret of Bear Mountain (1995)
...

Since computing the transformations can be a costly operation, Spark can cache the results of the normalized_titles to memory to speed up future searches. From the example above, to load the normalized_titles into memory, use the cache() method:

normalized_title.cache()

Chapter Summary

This chapter introduced Spark and and PySpark. It described Spark’s main programming abstraction, RDDs, with many examples of dataset transformations. This chapter also contained a Spark application that returned movie titles that matched a given string.

Get Hadoop with Python now with O’Reilly online learning.

O’Reilly members experience live online training, plus books, videos, and digital content from 200+ publishers.