Errata for Learning Spark

The errata list is a list of errors and their corrections that were found after the product was released.

The following errata were submitted by our customers and have not yet been approved or disproved by the author or editor. They solely represent the opinion of the customer.

Each entry below lists: version, location, description, submitted by, and date submitted.
ePub Page Kindle location 707
bottom third

In:

Example 2-14. Scala build and run sbt clean package $ SPARK_HOME/ bin/ spark-submit \ --class com.oreilly.learningsparkexamples.mini.scala.WordCount \ ./ target/...( as above) \ ./ README.md ./ wordcounts

Karau, Holden; Konwinski, Andy; Wendell, Patrick; Zaharia, Matei (2015-01-28). Learning Spark: Lightning-Fast Big Data Analysis (Kindle Locations 746-749). O'Reilly Media. Kindle Edition.

/target/...(as above) is totally ambiguous. It is impossible to understand what "(as above)" is referring to, and this makes the example not runnable.

Please resolve this ambiguity by providing the complete command text for running this example.
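
For reference, the errata entry for PDF page 21 below supplies the missing jar path; a plausible complete invocation (the exact jar name depends on the Scala and example versions used to build) is:

sbt clean package
$SPARK_HOME/bin/spark-submit \
--class com.oreilly.learningsparkexamples.mini.scala.WordCount \
./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \
./README.md ./wordcounts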

James Hufton  May 29, 2015 
ePub Throughout the ePub

Throughout the ePub, there are entire sections of text that are rendered in red instead of black. One example is the introductory material for Chapter 10. The opening paragraph reads "Many applications benefit from acting on data as soon as it arrives. For example, an application might track statistics..." The first sentence is black as it should be but starting with the second sentence, the remainder of the paragraph and subsequent paragraphs are red.

Looking at the HTML source, this appears to be happening because of these anchors:

<a data-type="indexterm" data-primary="Spark Streaming" data-secondary="DStreams" id="idp23532880"/>

The browser is not interpreting the anchor as a self-closing tag. There needs to be a separate </a> tag to close the tag and prevent the subsequent text from being highlighted.

j.b.langston iii  Nov 13, 2015 
Example 4-11

Submitting this for a customer:

"See below - it should read input.flatMap(.....

JavaRDD<String> input = sc.textFile("s3://...")
JavaRDD<String> words = rdd.flatMap(new FlatMapFunction<String, String>() {
  public Iterable<String> call(String x) { return Arrays.asList(x.split(" ")); }
});
JavaPairRDD<String, Integer> result = words.mapToPair(
  new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String x) { return new Tuple2(x, 1); }
  }).reduceByKey(
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    });"

Thank you

Chase Koches  Sep 27, 2016 
Printed Page 5
Third paragraph under "Data Science Tasks" section

The fifth line of the third paragraph has a capitalization error - it reads "MLLib" rather than "MLlib"

Tim Hutchinson  Apr 30, 2018 
Printed Page 14
second paragraph, not in the code example

The text in the paragraph located just before the section labeled "Introduction to Core Spark Concepts" reads
"In Examples 2-1 and 2-2, the variable called lines is an RDD, created here from a local text file on our local machine."
I take that to mean that the file is not in HDFS, but rather is a plain text file in my regular (local) user data space.
Yet I discovered that, for me, creating lines only works when I read the file from the Hadoop distributed file system.
If I should read "local text file" to mean "in the HDFS distributed disk space", then I stand corrected.
To illustrate, here is one copy in HDFS and another in the regular file system:
nlp@hadoop01:~$ hdfs dfs -ls /user/nlp/HoundOfTheBaskervilles*
-rw-r--r-- 3 hdfs hadoop 352886 2017-12-26 21:46 /user/nlp/HoundOfTheBaskervilles.txt

#---file in non-HDFS space
nlp@hadoop01:~$ ls -la /home/nlp/texts/conanDoyle/HoundOfTheBaskervilles.txt
-rw-r--r-- 1 nlp nlp 352886 Sep 30 2016 /home/nlp/texts/conanDoyle/HoundOfTheBaskervilles.txt

Using Python version 2.7.12 (default, Nov 20 2017 18:23:56)
SparkSession available as 'spark'.
>>> lines = sc.textFile("/user/nlp/HoundOfTheBaskervilles.txt")
>>> lines.count()
7224
>>> lines.first()
u'Project Gutenberg\u2019s The Hound of the Baskervilles, by A. Conan Doyle'
>>> lines = sc.textFile("/home/nlp/texts/conanDoyle/HoundOfTheBaskervilles.txt")
>>> lines.count()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1041, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 1032, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 906, in fold
vals = self.mapPartitions(func).collect()
File "/usr/hdp/current/spark2-client/python/pyspark/rdd.py", line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://spark05.corp.precysesolutions.com:8020/home/nlp/texts/conanDoyle/HoundOfTheBaskervilles.txt
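
A note on the behavior observed above: sc.textFile() resolves an unqualified path against Hadoop's default filesystem (fs.defaultFS), which on a configured cluster is HDFS; that is why /home/nlp/... is looked up as hdfs://... and fails. A local file can be requested explicitly with a file:// URI (it must then be readable at that path on every worker node), for example:

>>> lines = sc.textFile("file:///home/nlp/texts/conanDoyle/HoundOfTheBaskervilles.txt")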

Andrew Freeman  Jan 03, 2018 
Printed Page 17
United Kingdom

A trivial typo:

In the 4th paragraph, should 'The spark-submit script includes the Spark dependencies for us in Python' be 'The spark-submit script includes the Spark dependencies for using Python'?

Greg  Nov 28, 2015 
PDF Page 21
Example 2.14

Example 2-14. Scala build and run
sbt clean package
$SPARK_HOME/bin/spark-submit \
--class com.oreilly.learningsparkexamples.mini.scala.WordCount \
./target/...(as above) \
./README.md ./wordcounts

(1) When I read the above, I understood the whole thing as one command, and then I realized there are two commands: you first run "sbt clean package" and then you run spark-submit.
It would be easier if this were clearly mentioned, with snapshots of the output.

(2) Please replace "(as above)" with:
./target/scala-2.10/learning-spark-mini-example_2.10-0.0.1.jar \

(3) We could mention that after executing this command the user can run hadoop fs -ls and see that the wordcounts folder has been created on the Hadoop file system.

(4) Also, it would help to mention whether README.md is a file on the Hadoop file system, or, to avoid confusion, to add a step that creates a test file on the Hadoop file system before running the command; that would make things easier.
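
A possible sequence along the lines of points (3) and (4), hedged on whether ./README.md and ./wordcounts resolve to HDFS or the local filesystem (that depends on the cluster's fs.defaultFS setting):

$ hadoop fs -put README.md .        # stage the input file on HDFS first
$ sbt clean package
$ $SPARK_HOME/bin/spark-submit ...  # the spark-submit command from Example 2-14
$ hadoop fs -ls wordcounts          # the output directory created by the job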

Thank you. The book has been very helpful thus far.

Neel Priyadarshi  Jan 27, 2017 
Printed Page 24
Last paragraph

"Finally, Spark's RDDs are by default recomputed each time you run an action on them."

This sentence is incorrect. Spark saves intermediate data during the shuffle stage [1][2]. It will then re-use this data, rather than recomputing all RDDs [3].

If I understand correctly, persist is just a way to guarantee (node failure not included) that this data will be available for re-use, as well as to indicate where this data has to be persisted? (Although the persisted file in spark_local_dir will be called rdd_0_0 versus shuffle_0_0 in the file system when persisted to disk; rdd_0_0 will never actually replace shuffle_0_0).

If this is the case, I can see why you would like to point out that RDDs need to be persisted, but it is confusing, as one would now think that if RDDs are not persisted, they are guaranteed to be recomputed (which is a false assumption). I believe some clarification/revision here would be helpful (see the short persist() sketch after the references below).

[1] Spark Summit East 2015 Advanced Devops Student Slides, slide 114: "Intermediate data is automatically persisted during shuffle operations."
Source: http://www.slideshare.net/databricks/spark-summit-east-2015-advdevopsstudentslides
[2] Advanced Spark training during the Spark Summit 2014, slide 21:
"Shuffle
Write intermediate files to disk"
Source: https://databricks-training.s3.amazonaws.com/slides/advanced-spark-training.pdf
[3] In this trace, Spark reuses the data from the shuffle file, rather than recomputing all involved RDDs
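
A minimal sketch (not from the book or this submission) of what persist() guarantees, independent of any shuffle files:

import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("README.md")
val errors = lines.filter(_.contains("error"))
errors.persist(StorageLevel.MEMORY_ONLY)  // or simply errors.cache()
errors.count()  // first action computes the RDD and materializes it
errors.count()  // later actions reuse the persisted partitions instead of recomputing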

Tom Hubregtsen  Apr 21, 2015 
Printed Page 32
Example 3-21

Example 3-21 (Scala) apparently looks the same as the Python code of Examples 3-19 and 3-20, but it is confusing because it is not doing the same thing.

# PYTHON code to run examples 3-19/3-20
distData = sc.parallelize(['pino', 'marino', 'felipe'])
SearchFunctions("ino").getMatchesNoReference(distData).collect()
# Output:
['pino', 'marino']

// SCALA, example 3-21, using the method is:
//
// def getMatchesNoReference(rdd: RDD[String]): RDD[Array[String]] = {
// // Safe: extract just the field we need into a local variable
// val query_ = this.query
// rdd.map(x => x.split(query_))
// }
val inputs = sc.parallelize(Seq("pino", "marino", "felipe"))
new SearchFunctions("ino").getMatchesNoReference(inputs).collect()
// Output
Array[Array[String]] = Array(Array(p), Array(mar), Array(felipe))


At least getMatchesNoReference should be updated to do the same thing. To match the Python behaviour, the Scala code should be changed as below:

import org.apache.spark.rdd.RDD

class SearchFunctions(val query: String) {

  def isMatch(s: String): Boolean = {
    s.contains(query)
  }

  def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = {
    // Problem: "isMatch" means "this.isMatch", so we pass all of "this"
    rdd.filter(isMatch)
  }

  def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
    // Problem: "query" means "this.query", so we pass all of "this"
    rdd.filter(x => x.contains(query))
  }

  def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {
    // Safe: extract just the field we need into a local variable
    val query_ = this.query
    rdd.filter(x => x.contains(query_))
  }
}


Kind Regards, Niccolo

Niccolo Becchi  May 21, 2016 
PDF Page 33
Examples 3-22 to 3-25

There are a few mistakes in these examples.

Example 3-22:
- RDD should be JavaRDD:

JavaRDD<String> errors = lines.filter(new Function<String, Boolean>() {
public Boolean call(String x) { return x.contains("error"); }
});


Example 3-23
- Remove '()'
- RDD should be JavaRDD

class ContainsError implements Function<String, Boolean> {
public Boolean call(String x) { return x.contains("error"); }
}
JavaRDD<String> errors = lines.filter(new ContainsError());


Example 3-24
- Remove '()'

class Contains implements Function<String, Boolean> {
private String query;
public Contains(String query) { this.query = query; }
public Boolean call(String x) { return x.contains(query); }
}

Example 3-25
- RDD should be JavaRDD

JavaRDD<String> errors = lines.filter(s -> s.contains("error"));

Tatsuo Kawasaki  Jul 20, 2015 
PDF Page 36
Section Pseudo set operations (at the bottom)

In the section "Pseudo set operations" it is, rightfully, stated that RDDs are not sets from a mathematical point of view. This is obviously correct; e.g. elements in an RDD are not unique. One is tempted to argue that RDDs are multisets (https://en.wikipedia.org/wiki/Multiset). It raises two questions:

1) Are RDDs multisets or not?
2) If RDDs are NOT multisets, why not?
If RDDs are multisets, why are the (multi)set operations not defined accordingly? For example, if M1 = [a,b,a,b] and M2 = [a,a,b,c], then from a mathematical point of view their intersection should be [a,a,b]. However, Spark returns [a,b]; a purely set-based view of the operation. What is the motivation behind this?
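
For what it is worth, RDD.intersection() is documented to remove duplicates from its output, so it behaves as a pure set intersection even on multiset-like inputs; a small spark-shell sketch of the example above:

val m1 = sc.parallelize(Seq("a", "b", "a", "b"))
val m2 = sc.parallelize(Seq("a", "a", "b", "c"))
m1.intersection(m2).collect()  // Array(a, b) in some order -- not [a, a, b]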

Dror Atariah  Jan 10, 2017 
49
Tabulation

Under groupByKey() function, the example given is:
rdd.groupByKey()
with an expected output of {(1, [2]), (3, [4, 6])}.
The output for this function when you collect() it is actually
[(1, <pyspark.resultiterable.ResultIterable object at 0x03F2C090>), (3, <pyspark.resultiterable.ResultIterable object at 0x03F1A1D0>)].

i.e. it returns an object that allows you to iterate over the results.

To get the expected result, the example function needs to be,
rdd.groupByKey().map(lambda x:(x[0],list(x[1]))).collect()
which gives you the expected output, as shown below:
[(1, [2]), (3, [4, 6])]

Prithvi Paresh Shah  Dec 10, 2017 
PDF Page 50
Example 4-5

As RDDs are immutable, we need to assign the filtered result to a new RDD, like:

val result = pairs.filter{case(key, value) => value.length <11}

Also, if we printed the output of result.collect() with some test data, it would be easier for readers to understand.
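
A short sketch along those lines (the test data here is made up purely for illustration):

val pairs = sc.parallelize(Seq(("a", "panda"), ("b", "quite a long value here")))
val result = pairs.filter { case (key, value) => value.length < 11 }
result.collect().foreach(println)  // prints only (a,panda)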

Neel Priyadarshi  Jan 29, 2017 
Printed Page 53
Last paragraph

In the last paragraph of the page,
"If it's a new element, "
should better be "If it's a new key, "

Jerry He  Dec 29, 2015 
PDF Page 53
Note

It was mentioned that using countByValue is faster than reduceByKey, that is :

input.flatMap(x => x.split(" ")).countByValue(). will be faster than
input.flatMap(x => x.split(" ")).map(x => (x, 1)).reduceByKey((x, y) => x + y).

When I checked the implementations of countByValue and countByKey, I found:
def countByValue(): Map[T, Long] = {
  map(value => (value, null)).countByKey()
}

def countByKey(): Map[K, Long] = {
  self.mapValues(_ => 1L).reduceByKey(_ + _).collect().toMap
}

So, effectively, using countByValue approach will be :
input.flatMap(x => x.split(" ")).map(value => (value, null)).mapValues(_ => 1L).reduceByKey(_ + _)

I do not see that it will be faster!

Anonymous  Apr 09, 2016 
Printed Page 54
2nd paragraph

In the beginning of the 2nd paragraph:
"If it is a value we have seen before ..."
should be "If it is a key we have seen before ...'

Jerry He  Dec 29, 2015 
Printed Page 54
Example 4-12. Per-key average using combineByKey() in Python:

Code:
"sumCount.map(lambda key, xy: (key, xy[0]/xy[1])).collectAsMap()" should be
"sumCount.map(lambda xy: (xx[0], xy[1][0]/xy[1][1])).collectAsMap()"

Anonymous  Oct 07, 2016 
PDF Page 55
Final part of Example 4-14.

The object "AvgCount initial" is neither used nor needed

Ruben Hernando  Sep 22, 2015 
Printed Page 57
Example 4-16

In Example 4-16 (Scala), the two parallelize commands are exactly the same.
i.e.,

sc.parallelize(data).reduceByKey((x,y) => (x+y))
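
Presumably the second call was meant to pass a custom number of partitions to reduceByKey(); a hedged sketch of what it likely should read (data is assumed to be the small key/value collection used in the example):

val data = Seq(("a", 3), ("b", 4), ("a", 1))
sc.parallelize(data).reduceByKey((x, y) => x + y)      // default parallelism
sc.parallelize(data).reduceByKey((x, y) => x + y, 10)  // custom parallelism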

Apostolos Papadopoulos  May 30, 2015 
Printed Page 57
first paragraph (not code paragraphs)

I believe rdd.partitions.size is a property, not a method:

Near the end of the paragraph, the Scala/Java code is mentioned, but the reference should be without parentheses. It should read:

To know whether you can safely call coalesce(), you can check the size of the RDD using rdd.partitions.size

Emmett Keyser  Aug 01, 2016 
PDF Page 57
1st para

As pointed out by another person, rdd.partitions.size() is not correct; it needs to be rdd.partitions.size. If we combined this with Example 4-16, where we create custom parallelism, and then printed the number of partitions, it would be a great way to understand the concept.
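
Building on the Example 4-16 entry above, a hedged sketch (the data values are illustrative):

val data = Seq(("a", 3), ("b", 4), ("a", 1))
val result = sc.parallelize(data).reduceByKey((x, y) => x + y, 10)  // custom parallelism
result.partitions.size  // Int = 10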

Neel Priyadarshi  Feb 04, 2017 
PDF Page 58
Example-4-17

Example 4-17 demonstrates a join on pair RDDs and apparently uses a case class.
Since the case class definition is not given, I did the following test; the result shows 0 records returned from the join, so the sample is not right. For some reason the case class cannot be used as the key for the join.
Spark 1.3.0

case class Store (name:String)
val storeAddress = sc.parallelize(List(
(Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
(Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")
));
val storeRating = sc.parallelize(List(
(Store("Ritual"), 4.9),
(Store("Philz"), 4.8)
));
storeAddress.join(storeRating).count



--- execution
scala> case class Store (name:String)
defined class Store

scala> val storeAddress = sc.parallelize(List(
| (Store("Ritual"), "1026 Valencia St"), (Store("Philz"), "748 Van Ness Ave"),
| (Store("Philz"), "3101 24th St"), (Store("Starbucks"), "Seattle")
| ));
storeAddress: org.apache.spark.rdd.RDD[(Store, String)] = ParallelCollectionRDD[33] at parallelize at <console>:23

scala> val storeRating = sc.parallelize(List(
| (Store("Ritual"), 4.9),
| (Store("Philz"), 4.8)
| ));
storeRating: org.apache.spark.rdd.RDD[(Store, Double)] = ParallelCollectionRDD[34] at parallelize at <console>:23

scala> storeAddress.join(storeRating).count
res17: Long = 0

yuren wu  Nov 12, 2015 
PDF Page 58
Example 4-17

I also could not make Example 4-17 work (see the comments added by yuren wu). To keep it simple, if we remove the use of the Store class and just use the name of the store as a string, then it works.

val storeAddress = sc.parallelize(List(
("Ritual", "1026 Valencia St"), ("Philz", "748 Van Ness Ave"),
("Philz", "3101 24th St"), ("Starbucks", "Seattle")
));
val storeRating = sc.parallelize(List(
("Ritual", 4.9),
("Philz", 4.8)
));
storeAddress.join(storeRating).count
storeAddress.join(storeRating).foreach(println)

Neel Priyadarshi  Feb 04, 2017 
PDF Page 61
"Projections and filters" subtitle

It says: "A projection in relational parlance is a way to return only the
rows matching a certain relational condition" when it meant to say columns, not rows

Anonymous  Sep 08, 2022 
67
Example 4-25

In the simplified Page rank example you have:

contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 + 0.85*v)

The 0.15 should be divided by the total number of pages. Otherwise, the sum of ranks increases in every iteration. Moreover, if the initial ranks would be one divided by the total number of pages, then the ranks would sum up to 100%.
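
A hedged sketch of the suggested correction; numPages is an illustrative variable assumed to hold the total page count (e.g. taken from the links RDD of the example), not a name from the book:

val numPages = links.count()
contributions.reduceByKey((x, y) => x + y).mapValues(v => 0.15 / numPages + 0.85 * v)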

Timo Aaltonen  Sep 19, 2017 
PDF Page 73
2nd Paragraph

It talks about using minPartitions while loading a file, but it would be great to give an example like the one below on how to use it:
scala> val input = sc.textFile("wordcountinput/words.dat",5)
input: org.apache.spark.rdd.RDD[String] = wordcountinput/words.dat MapPartitionsRDD[3] at textFile at <console>:27

scala> input.partitions.size
res0: Int = 5

Neel Priyadarshi  Feb 05, 2017 
Printed Page 75
Example 5-7

It contains a line "Some(mapper.readValue(record, classOf[Person])". But "mapper" is not defined or imported
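
For reference, mapper is presumably a Jackson ObjectMapper; a minimal sketch of a definition that would make the quoted line compile (assuming the jackson-module-scala dependency is on the classpath):

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)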

Zhendong Jia  May 04, 2016 
PDF Page 78
Example 5-14

Example 5-14 does not even compile. Invalid
JavaPairRDD<String[]> csvData ...

I wonder if the author(s) compiled all the Java examples in the book. I am under the impression that there are lots of errors of this kind.

Alexander Bootman  Oct 28, 2015 
PDF Page 105
Example 6-7

In "def processSignCount(sign_count, signPrefixes):", there should not be "signPrefixes".

Zheng Wenjie  Mar 16, 2016 
PDF Page 114
Examples 6-20 ----- 6-21

Hi

In the example 6-20:-------------------------------------------------------------
It appears as:
val distanceDouble = distance.map(string => string.toDouble)

Should appear:
val distanceDoubles = distances.map(string => string.toDouble)

(...)

It appears:
println(reasonableDistance.collect().toList)
Should appear:
println(reasonableDistances.collect().toList)

In the example 6-21:-------------------------------------------------------------
It appears:
System.out.println(StringUtils.join(reasonableDistance.collect(), ","));
Should appear:
System.out.println(StringUtils.join(reasonableDistances.collect(), ","));

Anonymous  Mar 31, 2016 
PDF Page 121
Table 7-1

About YARN:

How about changing the value 'yarn' to 'yarn-client' and 'yarn-cluster'?

As documented below, 'yarn-client' and 'yarn-cluster' are more common than 'yarn'.


https://spark.apache.org/docs/1.4.0/running-on-yarn.html

"Unlike in Spark standalone and Mesos mode, in which the master’s address is specified in the “master” parameter, in YARN mode the ResourceManager’s address is picked up from the Hadoop configuration. Thus, the master parameter is simply “yarn-client” or “yarn-cluster”."

Tatsuo Kawasaki  Jul 20, 2015 
PDF Page 136
2nd paragraph

The ec2 script is said to default to an m1.xlarge; however, it defaults to an m1.large.

Justin Pihony  Aug 31, 2015 
PDF Page 150
1st paragraph under "Spark UI" heading

The text says:

"One caveat is that in the case of the YARN cluster mode, where the application driver runs inside the cluster, you should access the UI through the YARN ResourceManager, which proxies requests directly to the driver."

But it doesn't give any example for how to actually proxy through the YARN ResourceManager to access the driver. It would be useful if an example was given.
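
For what it is worth, the YARN ResourceManager web UI normally proxies a running application's UI at a URL of this form (host and port depend on the cluster; 8088 is the common default):

http://<resourcemanager-host>:8088/proxy/<application-id>/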

Anonymous  Mar 16, 2015 
Printed Page 171
4th paragraph, line 9

JavaRDD<Integer> keys = rdd.toJavaRDD()
should be JavaRDD<Integer> keys = rows.toJavaRDD()

Jerry He  Jan 05, 2016 
PDF Page 188
first listing, second command

Netcat (nc) must be started in server mode. Spark is the client. To make the example work, you must add the `-l` flag.

Instead of reading:
`$ nc localhost 7777`
We should read:
`$ nc -l localhost 7777`

marcandretr  May 17, 2016 
PDF, ePub Page 196
Examples 10-17 and 10-18

Both the example described on the previous page (195) and Figure 10-6 describe a sliding window taking data from the previous 3 batches (30 seconds), recalculated every 2 batches (20 seconds).

However, Examples 10-17 and 10-18 use a slideDuration of 10 seconds (1 batch) instead of 20 seconds, which would fit better with the case described.
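
A hedged sketch of a window matching the figure (30-second window, 20-second slide); the DStream name is illustrative, not necessarily the book's:

import org.apache.spark.streaming.Seconds
val windowed = accessLogsDStream.window(Seconds(30), Seconds(20))  // 30s window, 20s slide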

pfcoperez  Dec 28, 2015 
Other Digital Version 2956
Example 6-6

Dear Editors,

I really enjoyed reading the Learning Spark book on the Amazon Kindle. Hopefully, after practicing the examples and reviewing other material, I will be able to pass Spark Certified Developer and continue my career into Big Data.

Moving to the point of this email, there seems to be an issue in the Kindle edition of Example 6-6, with the "contactCounts.map(processSignCount)" call. Specifically, function "processSignCount" has two parameters, sign_count and signPrefixes, but map() only passes the sign_count parameter.

Your online code at
https://github.com/databricks/learning-spark/blob/master/src/python/ChapterSixExample.py shows this issue has been fixed by replacing the above by using a lambda function as "contactCounts.map(lambda signCount: processSignCount(signCount, signPrefixes))".

I tried checking your http://www.oreilly.com/catalog/errata.csp?isbn=0636920028512 but did not see the correction, so if this has not already been fixed, perhaps Example 6-6 will be corrected in your next Kindle edition.

Hope this helps,

Harold

Harold Brown  May 19, 2016