
Enterprise Data Workflows with Cascading by Paco Nathan


Chapter 4. Scalding—A Scala DSL for Cascading

Why Use Scalding?

Cascading represents a pattern language where we use a “plumbing” metaphor with pipes and operators to build workflows. As the sample code in the previous chapter shows, the Java source requires much more detail than simply pipes and operators. Even so, we can use conceptual flow diagrams to keep track of the plumbing, that is, the actual logic of what a workflow performs. What if we could simply write code at the level of detail in those diagrams?

Scalding is a domain-specific language (DSL) in the Scala programming language, which integrates Cascading. The functional programming paradigm used in Scala is much closer than Java to the original model for MapReduce. Consequently, Scalding source code for workflows has a nearly 1:1 correspondence with the concise visual descriptions in our conceptual flow diagrams. In other words, developers work directly in the plumbing of pipes, where the pattern language becomes immediately visible. That aspect alone brings incredible advantages for software engineering with very large-scale data. Apps written in Java with the Cascading API almost seem like assembly language programming in comparison. Plus, Scala offers other advanced programming models used in large-scale Enterprise work such as the actor model for concurrency.

While Scalding builds on Cascading, other libraries build atop Scalding—including support for type-safe libraries, abstract algebra, very large sparse matrices, etc., which are used to implement distributed algorithms and robust infrastructure for data services. For example, simple operations such as calculating a running median can become hard problems when you are servicing hundreds of millions of customers with tight requirements for service-level agreements (SLAs). A running median is an example of a metric needed in anti-fraud classifiers, social recommenders, customer segmentation, etc. Scalding offers simple, concise ways to implement distributed algorithms for that kind of analysis. Those aspects are particularly important for the scale of operations at firms such as Twitter, eBay, LinkedIn, Etsy, etc., where Scalding is deployed.

Keep in mind that Apache Hadoop is based on the MapReduce research made public by Google nearly a decade ago. MapReduce became an important component of Google’s internal technology for large-scale batch workflows. Meanwhile, Google has continued to evolve its infrastructure; estimates place its current technology stack at least three generations beyond the original MapReduce work. The public sees only portions of that massive R&D effort (e.g., in papers about Dremel, Pregel, etc.).

What becomes clear from the published works is that Google scientists and engineers leverage advanced techniques based on abstract algebra, linear algebra for very large sparse matrices, sketches, etc., to build robust, efficient infrastructure at massive scale. Scalding represents a relatively public view of comparable infrastructure.

Let’s start here with a few simple examples in Scalding. Given a few subtle changes in the code, some of our brief examples can be turned into state-of-the-art parallel processing at scale. For instance, check out the PageRank implementation shown in the Scalding source, and also these sample recommender systems written by Twitter.

Getting Started with Scalding

The best resource for getting started with Scalding is the project wiki page on GitHub.

In addition to Git and Java, which were set up in Chapter 1, you will need to have a few other platforms and tools installed for the examples in this chapter:

Scala
The current version of Scalding works with Scala versions 2.8.1, 2.9.1, and 2.9.2.
Simple Build Tool, a.k.a. SBT
Must be version 0.11.3.
Ruby
Required for the scald.rb script; most recent stable release.

Also, be sure to put the executable for sbt in your PATH.

The scald.rb script provides a limited command-line interface (CLI) for Scalding, so that one can conveniently compile and launch apps. Keep in mind that this is not a build system. For any serious work, you are better off using a build tool such as Gradle to create a “fat jar” that includes all the class dependencies that are not available on your Hadoop cluster. More about that later.

Change to a directory where you have space for downloads, and then use Git to clone the latest update from the master branch of the Scalding project on GitHub:

$ git clone git://github.com/twitter/scalding.git

Change into that newly cloned directory and run the following steps with sbt to set up Scalding:

$ cd scalding
$ export SCALDING_HOME=`pwd`
$ sbt update
$ sbt test
$ sbt assembly

These commands may take a few minutes to complete. Afterward, be sure to add the scripts directory, which contains the Scalding utility script scald.rb, to your PATH:

$ export PATH=`pwd`/scripts:$PATH

At this point, let’s test whether Scalding is set up properly. The tutorial directory includes code samples, and Tutorial1.scala provides the simplest possible app in Scalding:

import com.twitter.scalding._

class Tutorial1(args : Args) extends Job(args) {
  val input = TextLine("tutorial/data/hello.txt")
  val output = TextLine("tutorial/data/output1.txt")

  input
    .read
    .project('line)
    .write(output)
}

This is comparable with Example 1: Simplest Possible App in Cascading because it copies text lines from one file to another. The example uses text in the tutorial/data/hello.txt sample data file:

$ cat tutorial/data/hello.txt
Hello world
Goodbye world

To run this Scalding code:

$ scald.rb --local tutorial/Tutorial1.scala
12/12/25 09:58:16 INFO property.AppProps: using app.id: \
8A7F63D2D42594F9A1CD9B5DE08100E8
12/12/25 09:58:16 INFO util.Version: Concurrent, Inc - Cascading 2.0.2
12/12/25 09:58:16 INFO flow.Flow: [Tutorial1] starting
12/12/25 09:58:16 INFO flow.Flow: [Tutorial1]
  source: FileTap["TextLine[['num', 'line']->[ALL]]"]["tutorial/data/hello.txt"]"]
12/12/25 09:58:16 INFO flow.Flow: [Tutorial1]
  sink: FileTap["TextLine[['num', 'line']->[ALL]]"]["tutorial/data/output1.txt"]"]
12/12/25 09:58:16 INFO flow.Flow: [Tutorial1]  parallel execution is enabled: true
12/12/25 09:58:16 INFO flow.Flow: [Tutorial1]  starting jobs: 1
12/12/25 09:58:16 INFO flow.Flow: [Tutorial1]  allocating threads: 1
12/12/25 09:58:16 INFO flow.FlowStep: [Tutorial1] starting step: local

Then to confirm the results after the Scalding code has run:

$ cat tutorial/data/output1.txt
Hello world
Goodbye world

If your results look similar, you should be good to go.

Otherwise, if you have any troubles, contact the cascading-user email forum or tweet to @Scalding on Twitter. Very helpful developers are available to assist.

Example 3 in Scalding: Word Count with Customized Operations

First, let’s try a simple app in Scalding. Starting from the “Impatient” source code directory that you cloned in Git, change into the part8 subdirectory. Then we’ll write a Word Count app in Scalding that includes a token scrub operation, similar to Example 3: Customized Operations:

import com.twitter.scalding._

class Example3(args : Args) extends Job(args) {
  Tsv(args("doc"), ('doc_id, 'text), skipHeader = true)
    .read
    .flatMap('text -> 'token) { text : String => text.split("[ \\[\\]\\(\\),.]") }
    .mapTo('token -> 'token) { token : String => scrub(token) }
    .filter('token) { token : String => token.length > 0 }
    .groupBy('token) { _.size('count) }
    .write(Tsv(args("wc"), writeHeader = true))

  def scrub(token : String) : String = {
    token
      .trim
      .toLowerCase
  }

  override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = {
    // resolves "ClassNotFoundException cascading.*" exception on a cluster
    super.config(mode) ++ Map("cascading.app.appjar.class" -> classOf[Example3])
  }
}

Let’s compare this code for Word Count with the conceptual flow diagram for Example 3: Customized Operations, which is shown in Figure 4-1. The lines of Scalding source code have an almost 1:1 correspondence with the elements in this flow diagram. In other words, Scalding provides an almost pure expression of the DAG for this Cascading flow. This point underscores the expressiveness of the functional programming paradigm.

Figure 4-1. Conceptual flow diagram for Example 3: Customized Operations

Examining this app line by line, the first thing to note is that we extend the Job() base class in Scalding:

class Example3(args : Args) extends Job(args) { ... }

Next, the source tap reads tuples from a data set in TSV format. The data set is expected to have a header row, followed by doc_id and text fields. The tap identifier for the data set is specified by a --doc command-line parameter:

Tsv(args("doc"), ('doc_id, 'text), skipHeader = true)
  .read

The flatMap() function in Scalding is equivalent to a generator in Cascading. It maps each element to a list, then flattens that list, emitting a Cascading result tuple for each item in the returned list. In this case, it splits text into tokens with String.split(), in place of the RegexSplitGenerator used in the Cascading version:

  .flatMap('text -> 'token) { text : String => text.split("[ \\[\\]\\(\\),.]") }
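One detail is easier to see in isolation: because the pattern is a character class matched one character at a time, two adjacent delimiters (for example, a comma followed by a space) yield an empty token. The following plain Scala sketch applies the same regular expression to a short made-up string; SplitDemo is a hypothetical name, not part of the example code, and no Scalding classes are involved.

```scala
object SplitDemo {
  // Same character class as in Example3: space, square brackets,
  // parentheses, comma, and period, each matched as a single character
  val delimiters: String = "[ \\[\\]\\(\\),.]"

  def tokenize(text: String): List[String] =
    text.split(delimiters).toList

  def main(args: Array[String]): Unit = {
    // String.split keeps leading and interior empty strings but drops
    // trailing ones, so the final "." does not leave an empty token
    println(tokenize("(rain) shadow, dry.")) // prints List(, rain, , shadow, , dry)
  }
}
```

Those empty strings are one reason the pipeline needs a filter() on token length.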

In essence, Scalding extends the collections API in Scala. Scala has functional constructs such as map, reduce, filter, etc., built into the language, so the Cascading operations have been integrated as operations on its parallel iterators. In other words, the notion of a pipe in Scalding is the same as a distributed list. That provides a powerful abstraction for large-scale parallel processing. Keep that in mind for later.
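To make the “pipe as a distributed list” idea concrete, here is a sketch of the same Word Count logic written against ordinary Scala collections. It assumes an in-memory List of (doc_id, text) pairs instead of a TSV tap; LocalWordCount and the sample documents are made up for illustration, and each stage is commented with the Scalding call it loosely corresponds to. It runs in a single JVM, so it shows the shape of the computation, not how Scalding distributes it.

```scala
object LocalWordCount {
  // Same logic as scrub() in Example3: trim whitespace, then lowercase
  def scrub(token: String): String = token.trim.toLowerCase

  // Each stage mirrors one line of the Scalding pipe assembly, but runs on
  // an in-memory List of (doc_id, text) pairs instead of a distributed pipe
  def wordCount(docs: List[(String, String)]): Map[String, Int] =
    docs
      .map { case (_, text) => text }                          // read the 'text field
      .flatMap(text => text.split("[ \\[\\]\\(\\),.]").toList) // flatMap('text -> 'token)
      .map(scrub)                                              // mapTo('token -> 'token)
      .filter(token => token.length > 0)                       // filter('token)
      .groupBy(identity)                                       // groupBy('token)
      .map { case (token, group) => token -> group.size }      // _.size('count)

  def main(args: Array[String]): Unit = {
    // Made-up sample documents, not the contents of data/rain.txt
    val docs = List(
      ("doc01", "A rain shadow is a dry area."),
      ("doc02", "Dry (area).")
    )
    // Sort by token so the output order is stable, like the TSV in output/wc
    wordCount(docs).toList.sortBy(_._1).foreach { case (t, c) => println(s"$t\t$c") }
  }
}
```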

The mapTo() function in the next line shows how to call a customized function for scrubbing tokens. This is substantially simpler to do in Scalding:

  .mapTo('token -> 'token) { token : String => scrub(token) }
  .filter('token) { token : String => token.length > 0 }

Defining new functions in Scalding is also much simpler, as the following code snippet shows:

  def scrub(token : String) : String = {
    token
      .trim
      .toLowerCase
  }

A few dozen lines of Java have been replaced by a few lines of Scala. On one hand, that represents an order of magnitude reduction in source code, which is a huge gain. On the other hand, using Java to define this same function allows for finer-grained behaviors. For example, the filter() call was added to create the same semantics that the ScrubFunction implemented—whereas in Java that logic could be specified directly within the operate() method.
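If you want the scrub and the filter in a single step, closer to how ScrubFunction decides inside operate() whether to emit a tuple at all, one option is a scrub that returns an Option and a flatMap that drops the None results. The sketch below shows this with plain Scala collections only; ScrubWithOption and scrubOpt are hypothetical names, and whether this reads better than mapTo() followed by filter() in a given Scalding job is a judgment call.

```scala
object ScrubWithOption {
  // Hypothetical variant of scrub(): returns None when the token is empty
  // after trimming, so the caller never sees an empty string
  def scrubOpt(token: String): Option[String] = {
    val cleaned = token.trim.toLowerCase
    if (cleaned.isEmpty) None else Some(cleaned)
  }

  // flatMap keeps the contents of each Some and silently drops each None,
  // combining the "map" and "filter" steps into one pass
  def scrubAll(tokens: List[String]): List[String] =
    tokens.flatMap(t => scrubOpt(t))
}
```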

As an object-oriented language, Java is arguably well suited to defining the low-level behaviors that Scala, as a functional programming language, builds upon. Think of the Java code in Cascading as a kind of “assembly language” for Scalding. Moreover, an enormous number of Java packages are available for use from Scala. In fact, part of the reason for the scald.rb utility script is to integrate other Java packages into Scalding.

The next line performs the equivalent of a GroupBy in Cascading, followed by an Every pipe that applies a Count aggregator. The size() function in Scalding performs the token count:

  .groupBy('token) { _.size('count) }

Finally, the sink tap writes tuples in the output stream to a data set in TSV format, including a header. The tap identifier for the output data set is defined by a --wc command-line parameter:

  .write(Tsv(args("wc"), writeHeader = true))

Also note the configuration override:

  override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = { .. }

This resolves a ClassNotFoundException when running Scalding apps as fat jars on a remote Hadoop cluster. As of this writing (2013Q1), Twitter uses sbt to build Scalding apps for many deployments. However, other organizations have begun to use Maven, Gradle, etc., and for some reason apps created with these other build systems sometimes encounter exceptions when running fat jars on remote clusters. Chris Severs at eBay created this workaround to resolve the issue. Include it in each Scala class that defines a Scalding workflow.
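The override relies on ordinary Scala Map semantics: `++` returns a new immutable Map, and for a key present on both sides the right-hand value wins. The sketch below demonstrates this outside of Scalding. The contents of baseConfig are placeholders standing in for whatever super.config(mode) actually returns, and the class passed to withAppJar stands in for classOf[Example3]; ConfigMerge and withAppJar are hypothetical names.

```scala
object ConfigMerge {
  // Placeholder stand-in for the Map returned by super.config(mode);
  // these keys and values are illustrative, not real Scalding defaults
  val baseConfig: Map[AnyRef, AnyRef] = Map(
    "example.key" -> "example-value",
    "cascading.app.appjar.class" -> classOf[Object]
  )

  // `++` builds a new Map; on duplicate keys the right-hand entry wins,
  // so the job class replaces any value already in the base configuration.
  // In the real job, jobClass would be classOf[Example3].
  def withAppJar(base: Map[AnyRef, AnyRef], jobClass: Class[_]): Map[AnyRef, AnyRef] =
    base ++ Map[AnyRef, AnyRef]("cascading.app.appjar.class" -> jobClass)
}
```

Because the base Map is immutable, the original configuration is left unchanged; only the returned Map carries the override.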

This code is already in the part8/src/main/scala/ directory, in the Example3.scala file. To build and run:

$ rm -rf output
$ scald.rb --hdfs-local src/main/scala/Example3.scala \
   --doc data/rain.txt --wc output/wc

In the output, the first 10 lines (including the header) should look like this:

$ head output/wc/part-00000
token           count
a               8
air             1
an              1
and             2
area            4
as              2
australia       1
back            1
broken          1

A gist on GitHub shows building and running this app. If your run looks terribly different, something is probably not set up correctly. Ask the developer community for troubleshooting advice.

A Word or Two about Functional Programming

At the mention of functional programming, Java is not quite the first programming language that comes to mind. Cascading, however, with its pattern language and plumbing metaphor, borrows much from the functional programming paradigm. For example, there is no concept of “mutable variables” in the context of a flow—just the stream of data tuples.

Scalding integrates Cascading within Scala, which includes many functional programming features. The name “Scalding” is a portmanteau of SCALa and cascaDING. Formally, Scalding is a DSL embedded in Scala that binds to Cascading. A DSL is a language dedicated to a particular kind of problem and solution. The Scala language was designed in part to support a wide variety of DSLs. The domain for Scalding is about how to express robust, large-scale data workflows that run on parallel processing frameworks, typically for machine learning use cases.

Avi Bryant, author of Scalding, introduced his talk at the Strata 2012 conference with a special recipe:

Start on low heat with a base of Hadoop; map, then reduce. Flavor, to taste, with Scala’s concise, functional syntax and collections library. Simmer with some Pig bones: a tuple model and high-level join and aggregation operators. Mix in Cascading to hold everything together and boil until it’s very, very hot, and you get Scalding, a new API for MapReduce out of Twitter.

Avi Bryant, “Scala + Cascading = Scalding” (2012)

The original release of Scalding was based on a fields-based API, which is what the examples here use. Subsequently, a type-safe API has been released, although it is currently marked “experimental.”

Twitter has released a type-safe Matrix API built on top of Scalding. This provides enormous benefits for the typical kinds of use cases encountered in Scalding. For example, matrix transforms can be used to implement machine learning algorithms that leverage the social graph, at very large scale.

Another component is called Algebird, which is available as an open source project on GitHub.

This library was originally part of the Matrix API but was subsequently promoted into its own project with no dependencies. Algebird provides an abstract algebra library for building aggregation systems. It has excellent uses in streaming algorithms and probabilistic data structures, such as Bloom filters and Count-Min sketches.
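The core idea behind an abstract algebra library for aggregation is the monoid: a type with an associative combine operation and an identity element. Associativity is what lets partial aggregates computed on separate machines be merged in any grouping and still give the same answer. The sketch below defines a hypothetical Monoid trait with two instances; it illustrates the concept only and is not Algebird's actual API.

```scala
object MonoidSketch {
  // Hypothetical minimal monoid: an identity element plus an associative combine
  trait Monoid[T] {
    def zero: T
    def plus(a: T, b: T): T
  }

  // Counting: longs under addition, with 0 as the identity
  val intSum: Monoid[Long] = new Monoid[Long] {
    def zero: Long = 0L
    def plus(a: Long, b: Long): Long = a + b
  }

  // Word counts: maps merged by adding the values of matching keys
  def mapSum[K]: Monoid[Map[K, Long]] = new Monoid[Map[K, Long]] {
    def zero: Map[K, Long] = Map.empty[K, Long]
    def plus(a: Map[K, Long], b: Map[K, Long]): Map[K, Long] =
      b.foldLeft(a) { case (acc, (k, v)) => acc.updated(k, acc.getOrElse(k, 0L) + v) }
  }

  // Because plus is associative, each partition can be reduced on its own
  // (for example, on a separate mapper) and the partial results combined later
  def sumAll[T](m: Monoid[T], xs: Seq[T]): T =
    xs.foldLeft(m.zero)((a, b) => m.plus(a, b))
}
```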

Type-safe libraries, efficient operations on large sparse matrices, abstract algebra, etc.—these aspects become particularly important for building distributed algorithms and data services at scale.

Scalding has large-scale commercial deployments at companies such as Twitter, Etsy, eBay, LivePerson, etc. Twitter has many use cases, particularly on the revenue quality team: ad targeting, traffic quality, etc. Etsy had created the JRuby DSL for Cascading, and now also uses Scalding to perform web analytics and build recommender systems. eBay uses Scalding on its search analytics and other production data pipelines.

Scalding, which was first released in January 2012, won a Bossie 2012 Award from InfoWorld. The award described Scalding as “clean and concise” and “a natural fit”:

Hadoop puts a treasure trove of data at your fingertips, but the process for extracting those riches can be daunting. Cascading provides a thin layer of Java-based data processing functionality atop Hadoop’s MapReduce execution layer. It masks the complexity of MapReduce, simplifies the programming, and speeds you on your journey toward actionable analytics. Cascading works with JVM languages like Clojure and JRuby, but we prefer Scalding, a Scala API for Cascading from Twitter. A vast improvement over native MapReduce functions or Pig UDFs, Scalding code is clean and concise. Anyone comfortable with Ruby will find the Cascading/Scala pairing a natural fit.

James R. Borck, InfoWorld magazine (2012)

In addition to the main Scalding website maintained by Twitter, there are several other excellent resources online:

Example 4 in Scalding: Replicated Joins

Next, let’s modify the Scalding code to create an app similar to the Cascading version in Example 4: Replicated Joins. We’ll show how simple it is to extend pipe assemblies in Scalding.

Starting from the “Impatient” source code directory that you cloned in Git, change into the part8 subdirectory. Look at the code in src/main/scala/Example4.scala:

import com.twitter.scalding._

class Example4(args : Args) extends Job(args) {
  val stopPipe = Tsv(args("stop"), ('stop), skipHeader = true)
    .read

  Tsv(args("doc"), ('doc_id, 'text), skipHeader = true)
    .read
    .flatMap('text -> 'token) { text : String => text.split("[ \\[\\]\\(\\),.]") }
    .mapTo('token -> 'token) { token : String => scrub(token) }
    .filter('token) { token : String => token.length > 0 }
    .leftJoinWithTiny('token -> 'stop, stopPipe)
    .filter('stop) { stop : String => stop == null }
    .groupBy('token) { _.size('count) }
    .write(Tsv(args("wc"), writeHeader = true))

  def scrub(token : String) : String = {
    token
      .trim
      .toLowerCase
  }

  override def config(implicit mode: Mode): Map[AnyRef, AnyRef] = {
    // resolves "ClassNotFoundException cascading.*" exception on a cluster
    super.config(mode) ++ Map("cascading.app.appjar.class" -> classOf[Example4])
  }
}

Only a few lines have changed. First, we add a pipe called stopPipe to read the stop words list. Its tap identifier is specified by a --stop command-line parameter. Note that stopPipe is declared with val, so in Scala it is an immutable (read-only) reference:

val stopPipe = Tsv(args("stop"), ('stop), skipHeader = true)
  .read

Next we use a leftJoinWithTiny() function in Scalding to perform the equivalent of a HashJoin in Cascading. This is a replicated join with a left outer join on the stop words. Scalding provides the full set of Join operations provided in Cascading.

After the join, we keep only the tuples whose stop field is null, that is, tokens that matched no stop word. This is the equivalent of using a RegexFilter in Example 4: Replicated Joins:

  .leftJoinWithTiny('token -> 'stop, stopPipe)
  .filter('stop) { stop : String => stop == null }

This code is already in the part8/src/main/scala/ directory, in the Example4.scala file. To build and run:

$ rm -rf output
$ scald.rb --hdfs-local src/main/scala/Example4.scala \
   --doc data/rain.txt --stop data/en.stop --wc output/wc

In the output, the first 10 lines (including the header) should look like this:

$ head output/wc/part-00000
token           count
air             1
area            4
australia       1
broken          1
california's    1
cause           1
cloudcover      1
death           1
deserts         1

Again, a gist on GitHub shows building and running this app. If your run looks terribly different, something is probably not set up correctly. Ask the developer community for troubleshooting advice.

In a nutshell, that’s how to extend pipe assemblies in Scalding. Welcome to Enterprise data workflows in Scala.

Build Scalding Apps with Gradle

For the example in an upcoming section, we will need to incorporate extensions to Cascading, and the scald.rb script will not work for that. Instead, let’s look at how to use Gradle to build what is called a “fat jar.” In other words, create a JAR file that includes all the class dependencies for Scalding that Apache Hadoop would not normally provide. Note that Gradle version 1.3 or later is required for Scala support.

Starting from the “Impatient” source code directory that you cloned in Git, change into the part8 subdirectory. Next, we’ll use a Gradle build.gradle script to build a Scalding app:

apply plugin: 'scala'

archivesBaseName = 'impatient'

repositories {
  mavenLocal()
  mavenCentral()
  mavenRepo name: 'conjars', url: 'http://conjars.org/repo/'
}

dependencies {
  // Scala compiler + related tools, and standard library
  scalaTools 'org.scala-lang:scala-compiler:2.9.2'
  compile 'org.scala-lang:scala-library:2.9.2'

  // Scalding
  compile( 'com.twitter:scalding_2.9.2:0.8.1' )

  // in case you need to add JARs from a local build
  compile fileTree( dir: 'lib', includes: ['*.jar'] )

  compile( 'cascading:cascading-core:2.0.2' )
  compile( 'cascading:cascading-hadoop:2.0.2' )
}

jar {
  description = "Assembles a Hadoop-ready JAR file"
  doFirst {
    into( 'lib' ) {
      from configurations.compile
    }
  }

  manifest {
    attributes( "Main-Class": "com.twitter.scalding.Tool" )
  }
}

Note the compile( 'com.twitter:scalding_2.9.2:0.8.1' ) directive. This pulls Scalding version 0.8.1 from a Maven repository; check to see if there’s a later stable version. Also note that the Cascading version is set to 2.0.2—that’s required for the current version of Scalding; again, check to see if that has been bumped up.

This script is already in the part8 directory, in the build.gradle file. By the way, if you reuse this script for other projects and ever need to add other JAR files that don’t come from Maven repositories (e.g., something built locally), just add them to the lib directory.

To verify that your Gradle build for Scalding works properly, let’s build and run Example 3 in Scalding: Word Count with Customized Operations:

$ gradle clean jar
$ rm -rf output
$ hadoop jar build/libs/impatient.jar Example3 --hdfs \
   --doc data/rain.txt --wc output/wc

Now we can verify the results in output/wc:

$ head output/wc/part-00000
token           count
a               8
air             1
an              1
and             2
area            4
as              2
australia       1
back            1
broken          1

If that output looks correct, you should be good to go. Otherwise, there was probably an issue with the setup. Ask the developer community for troubleshooting advice.

Running on Amazon AWS

We have not yet shown how to run Cascading apps on Hadoop clusters in a cloud, and this Scalding app provides a good example.

To run Example 3 in Scalding: Word Count with Customized Operations on the Amazon AWS cloud, first you’ll need to have an AWS account set up. Make sure to sign up for EMR, S3, and SimpleDB. Also have your credentials set up in the local configuration—for example, in your ~/.aws_cred/ directory.

Next, install these excellent AWS tools:

s3cmd
Create, put, get, delete data in S3
EMR Ruby client
Command-line tool for Elastic MapReduce (EMR)

Then edit the emr.sh shell script, which is already in the part8/src/main/scala/ directory—you must update the BUCKET variable to be one of your S3 buckets.

Finally, use the emr.sh shell script to upload your JAR plus the input data to S3. That script launches an Apache Hadoop cluster on the Elastic MapReduce service where it runs the app.

#!/bin/bash -ex

BUCKET=temp.cascading.org/impatient
NAME=scalding3

# clear previous output
s3cmd del -r s3://$BUCKET/wc

# load built JAR + input data
s3cmd put build/libs/impatient.jar s3://$BUCKET/$NAME.jar
s3cmd put data/rain.txt s3://$BUCKET/rain.txt

# launch cluster and run
elastic-mapreduce --create --name "Scalding" \
  --debug \
  --enable-debugging \
  --log-uri s3n://$BUCKET/logs \
  --jar s3n://$BUCKET/$NAME.jar \
  --arg Example3 \
  --arg "--hdfs" \
  --arg "--doc" \
  --arg s3n://$BUCKET/rain.txt \
  --arg "--wc" \
  --arg s3n://$BUCKET/wc

Note that the output path in S3 must be deleted first: Hadoop checks for an existing output directory and will fail the job rather than overwrite it. Also note how the command-line arguments get passed to EMR through the --arg option: each argument from the Scalding command line must be passed with its own --arg. Another nuance is that s3cmd uses the usual s3: protocol to reference data URIs in S3, whereas the Hadoop jobs running on the EMR service require the s3n: protocol. Mixing up the two protocols is a common reason for job failures on EMR; see also the discussion at the Amazon forums.
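To make the wrapping rule and the protocol difference concrete, here is a hypothetical Scala helper that turns a Scalding argument list into the repeated --arg form and rewrites s3:// URIs to s3n://. It is not part of emr.sh or the elastic-mapreduce client; it only spells out the transformation that the script above applies by hand.

```scala
object EmrArgs {
  // Hypothetical helper: rewrite s3:// URIs (the form s3cmd uses) into the
  // s3n:// form that Hadoop jobs on EMR expect; other arguments pass through
  def toS3n(arg: String): String =
    if (arg.startsWith("s3://")) "s3n://" + arg.stripPrefix("s3://") else arg

  // Wrap every Scalding command-line argument in its own --arg option
  def wrap(scaldingArgs: Seq[String]): Seq[String] =
    scaldingArgs.flatMap(a => Seq("--arg", toS3n(a)))
}
```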

When that elastic-mapreduce line executes, it should return a job ID. As the Hadoop job runs on Elastic MapReduce, you can monitor progress in the AWS console based on that job ID. Figure 4-2 shows an example console view for EMR.

After the job completes, check the wc directory in the S3 bucket that you used, to confirm results.
