Chapter 4. Left Outer Join

This chapter shows you how to implement a left outer join in the MapReduce environment. I provide three distinct implementations in MapReduce/Hadoop and Spark:

  • MapReduce/Hadoop solution using the classic map() and reduce() functions

  • Spark solution without using the built-in JavaPairRDD.leftOuterJoin()

  • Spark solution using the built-in JavaPairRDD.leftOuterJoin()

Left Outer Join Example

Consider a company such as Amazon, which has over 200 million users and can do hundreds of millions of transactions per day. To understand the concept of a left outer join, assume we have two types of data: users and transactions. The users data consists of users’ location information (say, location_id) and the transactions data includes user identity information (say, user_id), but no direct information about a user’s location. Given users and transactions, then:

users(user_id, location_id)
transactions(transaction_id, product_id, user_id, quantity, amount)

our goal is to find the number of unique locations in which each product has been sold.

But what exactly is a left outer join? Let T1 (a left table) and T2 (a right table) be two relations defined as follows (where t1 is attributes of T1 and t2 is attributes of T2):

  • T1 = (K, t1)

  • T2 = (K, t2)

The result of a left outer join for relations T1 and T2 on the join key K contains all records of the left table (T1), even if the join condition does not find any matching record in the right table (T2). If the ON clause with key K matches zero records in T2 for a given record in T1, the join still returns a row in the result for that record, but with NULL in each column from T2. In other words, a left outer join returns all the values from an inner join plus all values in the left table that have no match in the right table. Formally, we can express this as:

LeftOuterJoin(T1, T2, K) = {(k, t1, t2) where k ∈ T1.K and k ∈ T2.K}
                           ∪
                           {(k, t1, null) where k ∈ T1.K and k ∉ T2.K}

In SQL, we can express a left outer join as (where K is the column on which T1 and T2 are joined):

SELECT field_1, field_2, ...
   FROM T1 LEFT OUTER JOIN T2
      ON T1.K = T2.K;

A left outer join is visually expressed in Figure 4-1 (the colored part is included and the white part is excluded).

Figure 4-1. Left outer join

Consider the values in Tables 4-1 and 4-2 for our users and transactions (note that these values are just examples to demonstrate the concept of a left outer join in the MapReduce environment).

Table 4-1. Users in our left outer join
user_id location_id
u1 UT
u2 GA
u3 CA
u4 CA
u5 GA
Table 4-2. Transactions in our left outer join
transaction_id product_id user_id quantity amount
t1 p3 u1 1 300
t2 p1 u2 1 100
t3 p1 u1 1 100
t4 p2 u2 1 10
t5 p4 u4 1 9
t6 p1 u1 1 100
t7 p4 u1 1 9
t8 p4 u5 2 40

Example Queries

Here are some example SQL queries relating to our left outer join:

  • Query 1: find all products sold (and their associated locations):

    mysql> SELECT product_id, location_id
        -> FROM transactions LEFT OUTER JOIN users
        ->   ON transactions.user_id = users.user_id;
    +------------+-------------+
    | product_id | location_id |
    +------------+-------------+
    | p3         | UT          |
    | p1         | GA          |
    | p1         | UT          |
    | p2         | GA          |
    | p4         | CA          |
    | p1         | UT          |
    | p4         | UT          |
    | p4         | GA          |
    +------------+-------------+
    8 rows in set (0.00 sec)
  • Query 2: find all products sold (and their associated location counts):

    mysql> SELECT product_id, count(location_id)
        -> FROM transactions LEFT OUTER JOIN users
        ->   ON transactions.user_id = users.user_id
        ->   group by product_id;
    +------------+--------------------+
    | product_id | count(location_id) |
    +------------+--------------------+
    | p1         |                  3 |
    | p2         |                  1 |
    | p3         |                  1 |
    | p4         |                  3 |
    +------------+--------------------+
    4 rows in set (0.00 sec)
  • Query 3: find all products sold (and their unique location counts):

    mysql> SELECT product_id, count(distinct location_id)
        -> FROM transactions LEFT OUTER JOIN users
        ->   ON transactions.user_id = users.user_id
        ->   group by product_id;
    +------------+-----------------------------+
    | product_id | count(distinct location_id) |
    +------------+-----------------------------+
    | p1         |                           2 |
    | p2         |                           1 |
    | p3         |                           1 |
    | p4         |                           3 |
    +------------+-----------------------------+
    4 rows in set (0.00 sec)

Implementation of Left Outer Join in MapReduce

Our desired output is the result of SQL query 3 in the preceding section, which counts the distinct (unique) locations in which each product has been sold, given all transactions. We present our solution for the left outer join problem in two phases:

  • MapReduce phase 1: find all products sold (and their associated locations). This phase corresponds to SQL query 1 from the previous section.

  • MapReduce phase 2: find all products sold (and their unique location counts). This phase corresponds to SQL query 3 from the previous section.

MapReduce Phase 1: Finding Product Locations

This phase will perform the left outer join operation with a MapReduce job, which will utilize two mappers (one for users and the other for transactions) and whose reducer will emit a key-value pair with the key being product_id, and the value being location_id. Using multiple mappers is enabled by the MultipleInputs class (note that if we had a single mapper, we would have used Job.setMapperClass() instead):

import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
...
Job job = new Job(...);
...
Path transactions = <hdfs-path-to-transactions-data>;
Path users = <hdfs-path-to-users-data>;

MultipleInputs.addInputPath(job,
                            transactions,
                            TextInputFormat.class,
                            TransactionMapper.class);
MultipleInputs.addInputPath(job,
                            users,
                            TextInputFormat.class,
                            UserMapper.class);
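
The snippet above only wires up the two mappers. A minimal sketch of the rest of the phase 1 configuration might look like the following; the class names (LeftJoinDriver, LeftJoinReducer, SecondarySortPartitioner, SecondarySortGroupComparator) are the ones listed later in Table 4-3, but the exact calls shown here illustrate the wiring rather than reproduce the book's verbatim driver code:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import edu.umd.cloud9.io.pair.PairOfStrings;
...
// continue configuring the same job object created above
job.setJarByClass(LeftJoinDriver.class);

// intermediate key = PairOfStrings(userID, "1" or "2");
// intermediate value = PairOfStrings("L", location) or PairOfStrings("P", product)
job.setMapOutputKeyClass(PairOfStrings.class);
job.setMapOutputValueClass(PairOfStrings.class);

// final output of phase 1: (product_id, location_id)
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

// secondary sort: partition and group by the natural key (userID) only
job.setPartitionerClass(SecondarySortPartitioner.class);
job.setGroupingComparatorClass(SecondarySortGroupComparator.class);

job.setReducerClass(LeftJoinReducer.class);
FileOutputFormat.setOutputPath(job, new Path("<hdfs-path-to-phase1-output>"));
System.exit(job.waitForCompletion(true) ? 0 : 1);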

Figures 4-2 and 4-3 illustrate the working MapReduce flow of the Left Outer Join algorithm, consisting of the two MapReduce jobs (phases 1 and 2).

Figure 4-2. Left outer join data flow, phase 1
Figure 4-3. Left outer join data flow, phase 2

The core pieces of the left outer join data flow are as follows:

Transaction mapper
The transaction map() reads (transaction_id, product_id, user_id, quantity, amount) and emits a key-value pair composed of (user_id, product_id).
User mapper
The user map() reads (user_id, location_id) and emits a key-value pair composed of (user_id, location_id).

The reducer for phase 1 gets both the user’s location_id and product_id and emits (product_id, location_id). Now, the question is how the reducer will distinguish location_id from product_id. In Hadoop, the order of reducer values is undefined. Therefore, the reducer for a specific key (user_id) has no clue how to process the values. To remedy this problem we modify the transaction and user mappers/reducers (which we will call version 2):

Transaction mapper (version 2)
As shown in Example 4-1, the transaction map() reads (transaction_id, product_id, user_id, quantity, amount) and emits the key pair (user_id, 2) and the value pair ("P", product_id). By adding a “2” to the reducer key, we guarantee that product_id(s) arrive at the end. This will be accomplished through the secondary sorting technique described in Chapters 1 and 2. We added “P” to the value to identify products. In Hadoop, to implement Pair(String, String), we will use the PairOfStrings class.1
Example 4-1. Transaction mapper (version 2)
1 /**
2  * @param key is framework generated, ignored here
3  * @param value is the 
4  *    transaction_id<TAB>product_id<TAB>user_id<TAB>quantity<TAB>amount
5  */
6 map(key, value) {
7    String[] tokens = StringUtil.split(value, "\t");
8    String productID = tokens[1];
9    String userID = tokens[2];
10   outputKey = Pair(userID, 2);
11   outputValue = Pair("P", productID);
12   emit(outputKey, outputValue);
13 }
User mapper (version 2)
As shown in Example 4-2, the user map() reads (user_id, location_id) and emits the key pair (user_id, 1) and the value pair ("L", location_id). By adding a “1” to the reducer key, we guarantee that location_id(s) arrive first. This will be accomplished through the secondary sorting technique described in Chapters 1 and 2. We added “L” to the value to identify locations.
Example 4-2. User mapper (version 2)
 1 /**
 2  * @param key is framework generated, ignored here
 3  * @param value is the user_id<TAB>location_id
 4  */
 5 map(key, value) {
 6    String[] tokens = StringUtil.split(value, "\t");
 7    String userID = tokens[0];
 8    String locationID = tokens[1];
 9    outputKey = Pair(userID, 1); // make sure location shows before products
10    outputValue = Pair("L", locationID);
11    emit(outputKey, outputValue);
12 }

As shown in Example 4-3, the reducer for phase 1 (version 2) gets both the ("L", location_id) and ("P", product_id) pairs and emits a key-value pair of (product_id, location_id). Note that since 1 < 2, this means that the user’s location_id arrives first.

Example 4-3. The reducer for phase 1 (version 2)
1  /**
2   * @param key is user_id
3   * @param values is List<Pair<left, right>>, where
4   * values = List<{
5   *                 Pair<"L", locationID>, 
6   *                 Pair<"P", productID1>, 
7   *                 Pair<"P", productID2>, 
8   *                 ...
9   *               }
10  * NOTE that the Pair<"L", locationID> arrives   
11  * before all product pairs. The first value is location;  
12  * if it's not, then we don't have a user record, so we'll  
13  * set the locationID as "undefined".
14  */
15 reduce(key, values) {
16    locationID = "undefined";
17    for (Pair<left, right> value: values) {
18        // the following if-stmt will be true
19        // once at the first iteration
20        if (value.left.equals("L")) {
21            locationID = value.right;
22            continue;
23        }
24
25        // here we have a product: value.left.equals("P")
26        productID = value.right;
27        emit(productID, locationID);
28    }
29 }
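
The secondary sort that guarantees the ("L", location) value arrives before the ("P", product) values is driven by two plug-in classes, listed in Table 4-3 as SecondarySortPartitioner and SecondarySortGroupComparator: the partitioner sends all records for a given userID to the same reducer, and the grouping comparator groups composite keys by userID only (while the full sort on the composite key still places "1" before "2"). Here is a minimal sketch of what these two classes might look like, assuming the composite key is a PairOfStrings(userID, "1" or "2"); it is illustrative rather than the book's verbatim source, and in a real project each class would live in its own file:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Partitioner;
import edu.umd.cloud9.io.pair.PairOfStrings;

// Partition by the natural key (userID) only, so that all records
// for one user reach the same reducer.
public class SecondarySortPartitioner
    extends Partitioner<PairOfStrings, PairOfStrings> {
  @Override
  public int getPartition(PairOfStrings key, PairOfStrings value, int numPartitions) {
    return (key.getLeftElement().hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

// Group reducer input by userID only, ignoring the "1"/"2" marker, so the
// location value and all product values arrive in a single reduce() call.
public class SecondarySortGroupComparator extends WritableComparator {
  public SecondarySortGroupComparator() {
    super(PairOfStrings.class, true);
  }
  @Override
  public int compare(WritableComparable a, WritableComparable b) {
    PairOfStrings p1 = (PairOfStrings) a;
    PairOfStrings p2 = (PairOfStrings) b;
    return p1.getLeftElement().compareTo(p2.getLeftElement());
  }
}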

MapReduce Phase 2: Counting Unique Locations

This phase uses the output of phase 1 (which is a sequence of (product_id, location_id) pairs) and generates pairs of (product_id, number_of_unique_locations). The mapper for this phase is an identity mapper (Example 4-4), and the reducer counts the number of unique locations per product by collecting them into a Set data structure (Example 4-5).

Example 4-4. Mapper phase 2: counting unique locations
1 /**
2  * @param key is product_id
3  * @param value is location_id
4  */
5 map(key, value) {
6    emit(key, value);
7 }
Example 4-5. Reducer phase 2: counting unique locations
 1 /**
 2  * @param key is product_id
 3  * @param value is List<location_id>
 4  */
 5 reduce(key, values) {
 6    Set<String> set = new HashSet<String>();
 7    for (String locationID : values) {
 8       set.add(locationID);
 9    }
10
11    int uniqueLocationsCount = set.size();
12    emit(key, uniqueLocationsCount);
13 }
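
For reference, a concrete Hadoop version of this reducer might look like the sketch below, assuming KeyValueTextInputFormat is used so that the identity mapper receives (product_id, location_id) pairs directly; the class name matches LocationCountReducer in Table 4-3, but the code itself is an illustration, not the book's verbatim source:

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class LocationCountReducer extends Reducer<Text, Text, Text, IntWritable> {
  @Override
  protected void reduce(Text productID, Iterable<Text> locations, Context context)
      throws IOException, InterruptedException {
    // collect locations into a set to drop duplicates
    Set<String> uniqueLocations = new HashSet<String>();
    for (Text location : locations) {
      uniqueLocations.add(location.toString());
    }
    context.write(productID, new IntWritable(uniqueLocations.size()));
  }
}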

Implementation Classes in Hadoop

The classes shown in Table 4-3 implement both phases of the Left Outer Join design pattern using the MapReduce/Hadoop framework.

Table 4-3. Implementation classes in Hadoop
Phase    Class name                     Class description
Phase 1  LeftJoinDriver                 Driver to submit job for phase 1
         LeftJoinReducer                Left join reducer
         LeftJoinTransactionMapper      Left join transaction mapper
         LeftJoinUserMapper             Left join user mapper
         SecondarySortPartitioner       How to partition natural keys
         SecondarySortGroupComparator   How to group by natural key
Phase 2  LocationCountDriver            Driver to submit job for phase 2
         LocationCountMapper            Define map() for location count
         LocationCountReducer           Define reduce() for location count

Sample Run

Input for phase 1

# hadoop fs -cat /left_join/zbook/users/users.txt
u1 UT
u2 GA
u3 CA
u4 CA
u5 GA

# hadoop fs -cat /left_join/zbook/transactions/transactions.txt
t1 p3 u1 3 330
t2 p1 u2 1 400
t3 p1 u1 3 600
t4 p2 u2 10 1000
t5 p4 u4 9 90
t6 p1 u1 4 120
t7 p4 u1 8 160
t8 p4 u5 2 40

Running phase 1

# ./run_phase1_left_join.sh
...
13/12/29 21:17:48 INFO input.FileInputFormat: Total input paths to process : 1
...
13/12/29 21:17:48 INFO input.FileInputFormat: Total input paths to process : 1
13/12/29 21:17:49 INFO mapred.JobClient: Running job: job_201312291929_0004
13/12/29 21:17:50 INFO mapred.JobClient:  map 0% reduce 0%
...
13/12/29 21:18:41 INFO mapred.JobClient:  map 100% reduce 100%
13/12/29 21:18:41 INFO mapred.JobClient: Job complete: job_201312291929_0004
...
13/12/29 21:18:41 INFO mapred.JobClient: Map-Reduce Framework
13/12/29 21:18:41 INFO mapred.JobClient:   Map input records=13
...
13/12/29 21:18:41 INFO mapred.JobClient:   Reduce input records=13
13/12/29 21:18:41 INFO mapred.JobClient:   Reduce input groups=5
13/12/29 21:18:41 INFO mapred.JobClient:   Combine output records=0
13/12/29 21:18:41 INFO mapred.JobClient:   Reduce output records=8
13/12/29 21:18:41 INFO mapred.JobClient:   Map output records=13

Output of phase 1 (input for phase 2)

# hadoop fs -text /left_join/zbook/output/part*
p4 GA
p3 UT
p1 UT
p1 UT
p4 UT
p1 GA
p2 GA
p4 CA

Running phase 2

# ./run_phase2_location_count.sh
...
13/12/29 21:19:28 INFO input.FileInputFormat: Total input paths to process : 10
13/12/29 21:19:28 INFO mapred.JobClient: Running job: job_201312291929_0005
13/12/29 21:19:29 INFO mapred.JobClient:  map 0% reduce 0%
...
13/12/29 21:20:24 INFO mapred.JobClient:  map 100% reduce 100%
13/12/29 21:20:25 INFO mapred.JobClient: Job complete: job_201312291929_0005
...
13/12/29 21:20:25 INFO mapred.JobClient:   Map-Reduce Framework
13/12/29 21:20:25 INFO mapred.JobClient:     Map input records=8
...
13/12/29 21:20:25 INFO mapred.JobClient:     Reduce input records=8
13/12/29 21:20:25 INFO mapred.JobClient:     Reduce input groups=4
13/12/29 21:20:25 INFO mapred.JobClient:     Combine output records=0
13/12/29 21:20:25 INFO mapred.JobClient:     Reduce output records=4
13/12/29 21:20:25 INFO mapred.JobClient:     Map output records=8

Output of phase 2

# hadoop fs -cat /left_join/zbook/output2/part*
p1 2
p2 1
p3 1
p4 3

Spark Implementation of Left Outer Join

Since Spark provides a higher-level Java API than the MapReduce/Hadoop API, I will present the whole solution in a single Java class (called LeftOuterJoin), which includes a series of map(), union(), groupByKey(), and mapValues() transformations. In the MapReduce/Hadoop implementation we used the MultipleInputs class to process two different types of input with two different mappers. As you’ve learned, Spark provides a much richer API for mappers and reducers: without needing special plug-in classes, you can have many different types of mappers (by using the map(), flatMap(), and flatMapToPair() functions). In Spark, instead of using Hadoop’s MultipleInputs class, we will use the JavaRDD.union() function to return the union of two RDDs (a users RDD and a transactions RDD), merging them into a new RDD. The JavaRDD.union() function is defined as:

JavaRDD<T> union(JavaRDD<T> other)
JavaPairRDD<K,V> union(JavaPairRDD<K,V> other)

Description: Return the union of this RDD and another one.
             Any identical elements will appear multiple
             times (use .distinct() to eliminate them).

The union() function can be applied only to RDDs of the same element type. Therefore, we will create the same RDD type for users and transactions. This is how we do so:

JavaPairRDD<String,Tuple2<String,String>> usersRDD = users.map(...);
JavaPairRDD<String,Tuple2<String,String>> transactionsRDD =
   transactions.map(...);

// here we perform a union() on usersRDD and transactionsRDD
JavaPairRDD<String,Tuple2<String,String>> allRDD =
   transactionsRDD.union(usersRDD);

The union() workflow is illustrated in Figure 4-4.

Union Data Flow
Figure 4-4. union() data flow

To refresh your memory, here is the data for users and transactions (this data will be used as input files for our sample run at the end of this chapter):

# hadoop fs -cat /data/leftouterjoins/users.txt
u1 UT
u2 GA
u3 CA
u4 CA
u5 GA

# hadoop fs -cat /data/leftouterjoins/transactions.txt
t1 p3 u1 3 330
t2 p1 u2 1 400
t3 p1 u1 3 600
t4 p2 u2 10 1000
t5 p4 u4 9 90
t6 p1 u1 4 120
t7 p4 u1 8 160
t8 p4 u5 2 40

Let’s see how the algorithm works. For the users and transactions data, we generate:

users => (userID, T2("L", location))
transactions => (userID, T2("P", product))

(Here, T2 stands for Tuple2.)

Next, we create a union of these two sets of data:

all = transactions.union(users);
    = { (userID1, T2("L", location)),
        (userID1, T2("P", P11)),
        (userID1, T2("P", P12)),
        ...
        (userID1, T2("P", P1n)),
        ...
      }

where Pij is a product ID.

The next step is to group the data by userID. This will generate:

{
  (userID1, List<T2("L", L1), T2("P", P11), T2("P", P12), T2("P", P13), ...>),
  (userID2, List<T2("L", L2), T2("P", P21), T2("P", P22), T2("P", P23), ...>),
  ...
}

where Li is a locationID, and Pij is a product ID.

Spark Program

First, I will provide a high-level solution in 11 steps (shown in Example 4-6), and then we will dissect each step with a proper working Spark code example.

Example 4-6. LeftOuterJoin high-level solution
 1 // Step 1: import required classes and interfaces
 2 public class LeftOuterJoin {
 3   public static void main(String[] args) throws Exception {
 4     // Step 2: read input parameters
 5     // Step 3: create a JavaSparkContext object
 6     // Step 4: create a JavaRDD for users
 7     // Step 5: create a JavaRDD for transactions
 8     // Step 6: create a union of the RDDs created in step 4 and step 5
 9     // Step 7: create a JavaPairRDD(userID, List<T2>) by calling groupByKey()
10     // Step 8: create a productLocationsRDD as JavaPairRDD<String,String>
11     // Step 9: find all locations for a product;
12     // result will be JavaPairRDD<String, List<String>>
13     // Step 10: finalize output by changing "value" from List<String>
14     // to Tuple2<Set<String>, Integer>, where you have a unique
15     // set of locations and their count
16     // Step 11: print the final result RDD
17     System.exit(0);
18  }
19 }

Step 1: Import required classes

We first import the required classes and interfaces from the JAR files provided by the binary distributions of the Spark framework. Spark provides two Java packages (org.apache.spark.api.java and org.apache.spark.api.java.function) for creating and manipulating RDDs. Example 4-7 demonstrates this step.

Example 4-7. Step 1: import required classes and interfaces
 1 // Step 1: import required classes and interfaces
 2 import scala.Tuple2;
 3 import org.apache.spark.api.java.JavaRDD;
 4 import org.apache.spark.api.java.JavaPairRDD;
 5 import org.apache.spark.api.java.JavaSparkContext;
 6 import org.apache.spark.api.java.function.Function;
 7 import org.apache.spark.api.java.function.PairFlatMapFunction;
 8 import org.apache.spark.api.java.function.FlatMapFunction;
 9 import org.apache.spark.api.java.function.PairFunction;
10
11 import java.util.Set;
12 import java.util.HashSet;
13 import java.util.Arrays;
14 import java.util.List;
15 import java.util.ArrayList;
16 import java.util.Collections;

Step 2: Read input parameters

As shown in Example 4-8, next we read two input parameters: the users data and the transactions data, both of which are provided as HDFS text files.

Example 4-8. Step 2: read input parameters
1   // Step 2: read input parameters
2   if (args.length < 2) {
3     System.err.println("Usage: LeftOuterJoin <users> <transactions>");
4     System.exit(1);
5   }
6   String usersInputFile = args[0]; // HDFS text file
7   String transactionsInputFile = args[1]; // HDFS text file
8   System.out.println("users="+ usersInputFile);
9   System.out.println("transactions="+ transactionsInputFile);

The output of this step is:

users=/data/leftouterjoins/users.txt
transactions=/data/leftouterjoins/transactions.txt

Step 3: Create a JavaSparkContext object

As shown in Example 4-9, next we create a JavaSparkContext object. This object is used to create the first RDD.

Example 4-9. Step 3: create a JavaSparkContext object
1   // Step 3: create a JavaSparkContext object
2   JavaSparkContext ctx = new JavaSparkContext();
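
Because no master URL or application name is passed to the constructor here, Spark expects them to be supplied externally (for example, by spark-submit, as in the shell script shown later). If you want to run the class directly for local testing, you could pass an explicit SparkConf; this is a sketch under that assumption, not part of the book's program:

import org.apache.spark.SparkConf;

SparkConf conf = new SparkConf()
   .setAppName("LeftOuterJoin")
   .setMaster("local[2]"); // local mode for testing only; omit when using spark-submit
JavaSparkContext ctx = new JavaSparkContext(conf);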

Step 4: Create a JavaRDD for users

In this step, shown in Example 4-10, we create a users JavaRDD<String>, where the RDD element is a single record of the text file (representing userID and locationID). Next, we use the JavaRDD<String>.mapToPair() function to create a new JavaPairRDD<String,Tuple2<String,String>>, where the key is a userID and the value is a Tuple2("L", location). Later we will create Tuple2("P", product) pairs for our transactions data. The tags "L" and "P" identify locations and products, respectively.

Example 4-10. Step 4: create a JavaRDD for users
1   // Step 4: create a JavaRDD for users
2   JavaRDD<String> users = ctx.textFile(usersInputFile, 1);
3   // <K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)
4   // Return a new RDD by applying a function to all elements of this RDD.
5   // PairFunction<T, K, V>  where T => Tuple2<K, V>
6   JavaPairRDD<String,Tuple2<String,String>> usersRDD =
7      users.mapToPair(new PairFunction<
8                                       String, // T
9                                       String, // K
10                                      Tuple2<String,String> // V
11                                   >() {
12    public Tuple2<String,Tuple2<String,String>> call(String s) {
13       String[] userRecord = s.split("\t");
14       Tuple2<String,String> location = 
15          new Tuple2<String,String>("L", userRecord[1]);
16       return new Tuple2<String,Tuple2<String,String>>(userRecord[0], location);
17    }
18 });
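
If you compile against Java 8 or later (the sample scripts in this chapter use JDK 6/7, so this is optional), the same mapToPair() can be written more compactly with a lambda; a sketch:

JavaPairRDD<String,Tuple2<String,String>> usersRDD =
   users.mapToPair(s -> {
      String[] userRecord = s.split("\t");
      // tag the location with "L" so it can be told apart from products later
      return new Tuple2<>(userRecord[0], new Tuple2<>("L", userRecord[1]));
   });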

Step 5: Create a JavaRDD for transactions

In this step, shown in Example 4-11, we create a transactions JavaRDD<String>, where the RDD element is a single record of the text file (representing a transaction record). Next, we use the JavaRDD<String>.mapToPair() function to create a new JavaPairRDD<String,Tuple2<String,String>>, where the key is a userID and the value is a Tuple2("P", product). In the previous step, we created Tuple2("L", location) pairs for users. The tags "L" and "P" identify locations and products, respectively.

Example 4-11. Step 5: create a JavaRDD for transactions
 1   // Step 5: create a JavaRDD for transactions
 2   JavaRDD<String> transactions = ctx.textFile(transactionsInputFile, 1);
 3
 4   // mapToPair
 5   // <K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)
 6   // Return a new RDD by applying a function to all elements of this RDD.
 7   // PairFunction<T, K, V>
 8   // T => Tuple2<K, V>
 9   JavaPairRDD<String,Tuple2<String,String>> transactionsRDD =
10         transactions.mapToPair(new PairFunction<
11                                                 String,               // T
12                                                 String,               // K
13                                                 Tuple2<String,String> // V
14                                                >() {
15     public Tuple2<String,Tuple2<String,String>> call(String s) {
16       String[] transactionRecord = s.split("\t");
17       Tuple2<String,String> product =
18           new Tuple2<String,String>("P", transactionRecord[1]);
19       return new Tuple2<String,Tuple2<String,String>>(transactionRecord[2], 
20                                                                   product);
21     }
22   });

Step 6: Create a union of the RDDs created in steps 4 and 5

This step, shown in Example 4-12, creates a union of two instances of JavaPairRDD<String,Tuple2<String,String>>. The JavaPairRDD.union() method requires both RDDs to have the same exact types.

Example 4-12. Step 6: create a union of RDDs
1   // Step 6: create a union of the RDDs created in step 4 and step 5
2   JavaPairRDD<String,Tuple2<String,String>> allRDD =
3         transactionsRDD.union(usersRDD);

Example 4-13 shows a semantically equivalent implementation of this step; we’ve simply changed the order of the union parameters.

Example 4-13. Step 6: Create a union of RDDs (alternative implementation)
1   // Here we perform a union() on usersRDD and transactionsRDD.
2   JavaPairRDD<String,Tuple2<String,String>> allRDD =
3         usersRDD.union(transactionsRDD);

The result of the union of the two JavaPairRDDs is the following key-value pairs:

{
   (userID, Tuple2("L", location)),
   ...
   (userID, Tuple2("P", product))
   ...
}

Step 7: Create a JavaPairRDD(userID, List(T2)) by calling groupByKey()

Next, we group our data (created in step 6) by userID. As you can see in Example 4-14, this step is accomplished by JavaPairRDD.groupByKey().

Example 4-14. Step 7: create a JavaPairRDD
 1   // Step 7: create a JavaPairRDD(userID, List<T2>) by calling groupByKey()
 2   // group allRDD by userID
 3   JavaPairRDD<String, Iterable<Tuple2<String,String>>> groupedRDD =
 4       allRDD.groupByKey();
 5   // now the groupedRDD entries will be as follows:
 6   // <userIDi, List[T2("L", location),
 7   //                T2("P", Pi1),
 8   //                T2("P", Pi2),
 9   //                T2("P", Pi3), ...
10   //               ]
11   // >

The result of this step is:

(userID1, List[T2("L", location1),
               T2("P", P11),
               T2("P", P12),
               T2("P", P13), ...]),
(userID2, List[T2("L", location2),
               T2("P", P21),
               T2("P", P22),
               T2("P", P23), ...]),
...

where Pij is a productID.

Step 8: Create a productLocationsRDD as a JavaPairRDD(String,String)

In this step, the userIDs are dropped from the RDDs. For a given RDD element:

(userID, List[T2("L", location),
              T2("P", p1),
              T2("P", p2),
              T2("P", p3), ...])

we create a JavaPairRDD<String,String> as:

(p1, location)
(p2, location)
(p3, location)
...

This step is accomplished by the JavaPairRDD.flatMapToPair() function, which we implement as a PairFlatMapFunction.call() method. PairFlatMapFunction works as follows:

PairFlatMapFunction<T, K, V>
T => Iterable<Tuple2<K, V>>

where in our example: T is an input and
we create (K, V) pairs as output:

   T = Tuple2<String, Iterable<Tuple2<String,String>>>
   K = String
   V = String

Example 4-15 shows the complete implementation of the PairFlatMapFunction.call() method.

Example 4-15. Step 8: create a productLocationsRDD
 1   // Step 8: create a productLocationsRDD as JavaPairRDD<String,String>
 2   // PairFlatMapFunction<T, K, V>
 3   // T => Iterable<Tuple2<K, V>>
 4   JavaPairRDD<String,String> productLocationsRDD =
 5        groupedRDD.flatMapToPair(new PairFlatMapFunction<
 6           Tuple2<String, Iterable<Tuple2<String,String>>>, // T
 7           String,                                          // K
 8           String>() {                                      // V
 9     public Iterable<Tuple2<String,String>>
10        call(Tuple2<String, Iterable<Tuple2<String,String>>> s) {
11           // String userID = s._1; // NOT Needed
12           Iterable<Tuple2<String,String>> pairs = s._2;
13            String location = "UNKNOWN";
14            List<String> products = new ArrayList<String>();
15            for (Tuple2<String,String> t2 : pairs) {
16                  if (t2._1.equals("L")) {
17                        location = t2._2;
18                  }
19                  else {
20                        // t2._1.equals("P")
21                        products.add(t2._2);
22                  }
23            }
24
25            // now emit (K, V) pairs
26            List<Tuple2<String,String>> kvList =
27               new ArrayList<Tuple2<String,String>>();
28            for (String product : products) {
29                  kvList.add(new Tuple2<String, String>(product, location));
30            }
31            // emit the (product, location) pairs collected
32            // for this user; the userID itself is no
33            // longer needed downstream
34       return kvList;
35     }
36   });

Step 9: Find all locations for a product

In this step, RDD pairs of (product, location) are grouped by product. We use JavaPairRDD.groupByKey() to accomplish this, as shown in Example 4-16. This step does some basic debugging too, by calling the JavaPairRDD.collect() function.

Example 4-16. Step 9: find all locations for a product
 1   // Step 9: find all locations for a product;
 2   // result will be JavaPairRDD<String, Iterable<String>>
 3   JavaPairRDD<String, Iterable<String>> productByLocations =
 4      productLocationsRDD.groupByKey();
 5
 6    // debug3
 7    List<Tuple2<String, Iterable<String>>> debug3 = productByLocations.collect();
 8    System.out.println("--- debug3 begin ---");
 9    for (Tuple2<String, Iterable<String>> t2 : debug3) {
10      System.out.println("debug3 t2._1="+t2._1);
11      System.out.println("debug3 t2._2="+t2._2);
12    }
13    System.out.println("--- debug3 end ---");

Step 10: Finalize output by changing value

Step 9 produced a JavaPairRDD<String, Iterable<String>> object, where the key is the product (as a string) and the value is an Iterable<String>: a list of locations that may contain duplicates. To remove duplicate elements from a value, we use the JavaPairRDD.mapValues() function, converting the Iterable<String> into a Set<String>. Note that the keys are not altered. Mapping values is implemented by Function<T, R>.call(), where T is the input (Iterable<String>) and R is the output (Tuple2<Set<String>, Integer>). See Example 4-17.

Example 4-17. Step 10: finalize output
 1   // Step 10: finalize output by changing "value" from List<String>
 2   // to Tuple2<Set<String>, Integer>, where you have a unique
 3   // set of locations and their count
 4   JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByUniqueLocations =
 5         productByLocations.mapValues(
 6            new Function<Iterable<String>,       // input
 7            Tuple2<Set<String>, Integer>          // output
 8   >() {
 9     public Tuple2<Set<String>, Integer> call(Iterable<String> s) {
10       Set<String> uniqueLocations = new HashSet<String>();
11       for (String location : s) {
12           uniqueLocations.add(location);
13       }
14       return new Tuple2<Set<String>, Integer>(uniqueLocations,
15                                               uniqueLocations.size());
16     }
17   });
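
As a side note, the union()/groupByKey() pipeline can leave duplicate (product, location) pairs, which is exactly why this step builds a Set. A possible alternative is to deduplicate the (product, location) pairs before grouping by calling JavaPairRDD.distinct(); a sketch of that variant, in which every grouped value list is already unique:

// deduplicate (product, location) pairs up front, then group by product
JavaPairRDD<String, Iterable<String>> productByUniqueLocationLists =
   productLocationsRDD.distinct().groupByKey();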

Step 11: Print the final result RDD

The final step, shown in Example 4-18, emits the results using the JavaPairRDD.collect() method.

Example 4-18. Step 11: print the final result RDD
 1   // Step 11: print the final result RDD
 2   // debug4
 3   System.out.println("=== Unique Locations and Counts ===");
 4   List<Tuple2<String, Tuple2<Set<String>, Integer>>> debug4 =
 5      productByUniqueLocations.collect();
 6   System.out.println("--- debug4 begin ---");
 7   for (Tuple2<String, Tuple2<Set<String>, Integer>> t2 : debug4) {
 8     System.out.println("debug4 t2._1="+t2._1);
 9     System.out.println("debug4 t2._2="+t2._2);
10   }
11   System.out.println("--- debug4 end ---");

Running the Spark Solution

The shell script

# cat run_left_outer_join.sh
#!/bin/bash
export JAVA_HOME=/usr/java/jdk7
export SPARK_HOME=/usr/local/spark-1.1.0
export SPARK_MASTER=spark://myserver100:7077
export BOOK_HOME=/home/data-algorithms-book
export APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
USERS=/data/leftouterjoins/users.txt
TRANSACTIONS=/data/leftouterjoins/transactions.txt
# Run on a Spark cluster
prog=org.dataalgorithms.chap04.spark.SparkLeftOuterJoin
$SPARK_HOME/bin/spark-submit \
  --class $prog \
  --master $SPARK_MASTER \
  --executor-memory 2G \
  --total-executor-cores 20 \
    $APP_JAR $USERS $TRANSACTIONS

Running the shell script

The log output from a sample run is shown here; it has been trimmed and formatted to fit the page:

# ./run_left_outer_join.sh
users=/data/leftouterjoins/users.txt
transactions=/data/leftouterjoins/transactions.txt
...
14/06/03 17:52:01 INFO scheduler.DAGScheduler: Stage 0
  (collect at LeftOuterJoin2.java:112) finished in 0.163 s
14/06/03 17:52:01 INFO spark.SparkContext: Job finished:
  collect at LeftOuterJoin2.java:112, took 6.365762312 s
--- debug3 begin ---
debug3 t2._1=p2
debug3 t2._2=[GA]
debug3 t2._1=p4
debug3 t2._2=[GA, UT, CA]
debug3 t2._1=p1
debug3 t2._2=[GA, UT, UT]
debug3 t2._1=p3
debug3 t2._2=[UT]
--- debug3 end ---
=== Unique Locations and Counts ===
14/06/03 17:52:01 INFO spark.SparkContext: Starting job:
  collect at LeftOuterJoin2.java:137
14/06/03 17:52:01 INFO spark.MapOutputTrackerMaster: Size
  of output statuses for shuffle 1 is 156 bytes
...
14/06/03 17:52:01 INFO scheduler.DAGScheduler: Stage 3
  (collect at LeftOuterJoin2.java:137) finished in 0.058 s
14/06/03 17:52:01 INFO spark.SparkContext: Job finished:
  collect at LeftOuterJoin2.java:137, took 0.081830132 s
--- debug4 begin ---
debug4 t2._1=p2
debug4 t2._2=([GA],1)
debug4 t2._1=p4
debug4 t2._2=([UT, GA, CA],3)
debug4 t2._1=p1
debug4 t2._2=([UT, GA],2)
debug4 t2._1=p3
debug4 t2._2=([UT],1)
--- debug4 end ---
...
14/06/03 17:52:02 INFO scheduler.DAGScheduler: Stage 6
  (saveAsTextFile at LeftOuterJoin2.java:144) finished in 1.060 s
14/06/03 17:52:02 INFO spark.SparkContext: Job finished: saveAsTextFile
  at LeftOuterJoin2.java:144, took 1.169724354 s

Running Spark on YARN

In this section, we cover how to submit Spark’s ApplicationMaster to Hadoop YARN’s ResourceManager, and instruct Spark to run the left outer join program. Further, we will instruct our Spark program to save our final result into an HDFS file. We save a file to HDFS by adding the following line after creating the productByUniqueLocations RDD (/left/output is an HDFS output directory):

productByUniqueLocations.saveAsTextFile("/left/output");

Script to run Spark on YARN

The script to run Spark on YARN is as follows:

# cat leftjoin.sh
export SPARK_HOME=/usr/local/spark-1.0.0
export SPARK_JAR=spark-assembly-1.0.0-hadoop2.3.0.jar
export SPARK_ASSEMBLY_JAR=$SPARK_HOME/assembly/target/scala-2.10/$SPARK_JAR
export JAVA_HOME=/usr/java/jdk6
export HADOOP_HOME=/usr/local/hadoop-2.3.0
export HADOOP_CONF_DIR=$HADOOP_HOME/conf
export YARN_CONF_DIR=$HADOOP_HOME/conf
export BOOK_HOME=/mp/data-algorithms-book
export APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
# Submit Spark's ApplicationMaster to YARN's ResourceManager,
# and instruct Spark to run the LeftOuterJoin2 example
prog=org.dataalgorithms.chap04.spark.SparkLeftOuterJoin
SPARK_JAR=$SPARK_ASSEMBLY_JAR \
      $SPARK_HOME/bin/spark-class org.apache.spark.deploy.yarn.Client \
      --jar $APP_JAR \
      --class $prog \
      --args yarn-standalone \
      --args /left/users.txt \
      --args /left/transactions.txt \
      --num-workers 3 \
      --master-memory 4g \
      --worker-memory 2g \
      --worker-cores 1

For details on Spark parameters (such as num-workers and worker-memory) and environment variables, refer to the Spark Summit slides. Most of these parameters depend on the number of worker nodes, number of cores per cluster node, and amount of RAM available. Finding the optimal settings will require some trial and error and experimentation with different sizes of input data.

Running the script

The log output from a sample run is shown here. It has been trimmed and formatted to fit the page:

# ./leftjoin.sh
14/05/28 16:49:31 INFO RMProxy: Connecting to 
.. ResourceManager at myserver100:8032
14/05/28 16:49:31 INFO Client: 
  Got Cluster metric info from ApplicationsManager (ASM),
number of NodeManagers: 13
...
14/05/28 16:49:33 INFO Client: Command for starting 
.. the Spark ApplicationMaster:
$JAVA_HOME/bin/java -server -Xmx4096m -Djava.io.tmpdir=$PWD/tmp
org.apache.spark.deploy.yarn.ApplicationMaster
--class LeftOuterJoin2 --jar /usr/local/spark/tmp/data_algorithms_book.jar 
--args 'yarn-standalone' --args '/left/users.txt'
--args '/left/transactions.txt' --worker-memory 2048 --worker-cores 1
--num-workers 3 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr
14/05/28 16:49:33 INFO Client: Submitting application to ASM
...
yarnAppState: FINISHED
distributedFinalState: SUCCEEDED
appTrackingUrl: http://myserver100:50030/proxy/application_1401319796895_0008/A
appUser: hadoop

Checking the expected output

Here we examine the generated HDFS output:

# hadoop fs -ls /left/output
Found 3 items
-rw-r--r--   2 hadoop supergroup    0 2014-05-28 16:49 /left/output/_SUCCESS
-rw-r--r--   2 hadoop supergroup   36 2014-05-28 16:49 /left/output/part-00000
-rw-r--r--   2 hadoop supergroup   32 2014-05-28 16:49 /left/output/part-00001

# hadoop fs -cat /left/output/part*
(p2,([GA],1))
(p4,([UT, GA, CA],3))
(p1,([UT, GA],2))
(p3,([UT],1))

Spark Implementation with leftOuterJoin()

This section implements a left outer join by using Spark’s built-in JavaPairRDD.leftOuterJoin() method (note that MapReduce/Hadoop does not offer higher-level API functionality such as a leftOuterJoin() method):

import scala.Tuple2;
import com.google.common.base.Optional;
import org.apache.spark.api.java.JavaPairRDD;

JavaPairRDD<K,Tuple2<V,Optional<W>>> leftOuterJoin(JavaPairRDD<K,W> other)
  // Perform a left outer join of this and other. For each
  // element (k, v) in this, the resulting RDD will either
  // contain all pairs (k, (v, Some(w))) for w in other, or
  // the pair (k, (v, None)) if no elements in other have key k.

Using Spark’s JavaPairRDD.leftOuterJoin() method helps us avoid:

  • Using the JavaPairRDD.union() operation between users and transactions, which is costly

  • Introducing custom flags such as "L" for location and "P" for products

  • Using extra RDD transformations to separate custom flags from each other

Using the JavaPairRDD.leftOuterJoin() method enables us to produce the result efficiently. transactionsRDD is the left table and usersRDD is the right table:

JavaPairRDD<String,String> usersRDD = ...;        // (K=userID, V=location)
JavaPairRDD<String,String> transactionsRDD = ...; // (K=userID, V=product)
// perform left outer join by built-in leftOuterJoin()
JavaPairRDD<String, Tuple2<String,Optional<String>>> joined =
    transactionsRDD.leftOuterJoin(usersRDD);

Now, the joined RDD contains:

(u4,(p4,Optional.of(CA)))
(u5,(p4,Optional.of(GA)))
(u2,(p1,Optional.of(GA)))
(u2,(p2,Optional.of(GA)))
(u1,(p3,Optional.of(UT)))
(u1,(p1,Optional.of(UT)))
(u1,(p1,Optional.of(UT)))
(u1,(p4,Optional.of(UT)))

Since we are interested only in the products and unique locations, in the next step, we ignore userIDs (the key). We accomplish this through another JavaPairRDD.mapToPair() function. After ignoring userIDs, we generate:

(p4,CA)
(p4,GA)
(p1,GA)
(p2,GA)
(p3,UT)
(p1,UT)
(p1,UT)
(p4,UT)

which has the desired information to generate the list of products and unique locations.

Spark Program

In Example 4-19, I present the high-level steps to show you how to use Spark’s built-in JavaPairRDD.leftOuterJoin() method. Each step is discussed in detail after the example.

Example 4-19. High-level steps
 1 // Step 1: import required classes and interfaces
 2 public class SparkLeftOuterJoin {
 3   public static void main(String[] args) throws Exception {
 4     // Step 2: read input parameters
 5     // Step 3: create Spark's context object
 6     // Step 4: create RDD for users data
 7     // Step 5: create (userID,location) pairs for users (the right table)
 8     // Step 6: create RDD for transactions data
 9     // Step 7: create (userID,product) pairs for transactions (the left table)
10     // Step 8: use Spark's built-in JavaPairRDD.leftOuterJoin() method
11     // Step 9: create (product, location) pairs
12     // Step 10: group (product, location) pairs by key
13     // Step 11: create final output (product, Set<location>) pairs by key
14     System.exit(0);
15   }
16 }

Step 1: Import required classes and interfaces

We begin with the necessary imports (Example 4-20). Optional2 represents an immutable object that may or may not contain a non-null reference to another object (useful for the left outer join, since a joined value may be absent when a key appears in the left table but not in the right table). The factory class JavaSparkContext is used to create new RDDs.

Example 4-20. Step 1: import required classes and interfaces
 1 // Step 1: import required classes and interfaces
 2 import scala.Tuple2;
 3 import org.apache.spark.api.java.JavaRDD;
 4 import org.apache.spark.api.java.JavaPairRDD;
 5 import org.apache.spark.api.java.JavaSparkContext;
 6 import org.apache.spark.api.java.function.Function;
 7 import org.apache.spark.api.java.function.PairFunction;
 8 import com.google.common.base.Optional;
 9 import java.util.Set;
10 import java.util.HashSet;

Step 2: Read input parameters

This step, shown in Example 4-21, reads the locations of the HDFS input files containing our users and transactions data. These input files will be used to create the left table (transactionsRDD) and the right table (usersRDD).

Example 4-21. Step 2: read input parameters
 1   // Step 2: read input parameters
 2   if (args.length < 2) {
 3     System.err.println("Usage: LeftOuterJoin <users> <transactions>");
 4     System.exit(1);
 5   }
 6
 7   String usersInputFile = args[0];
 8   String transactionsInputFile = args[1];
 9   System.out.println("users="+ usersInputFile);
10   System.out.println("transactions="+ transactionsInputFile);

Step 3: Create Spark’s context object

This step, shown in Example 4-22, creates a JavaSparkContext object, which will be used to create new RDDs.

Example 4-22. Step 3: create Spark’s context object
1   // Step 3: create Spark's context object
2   JavaSparkContext ctx = new JavaSparkContext();

Step 4: Create an RDD for the users data

This step, demonstrated in Example 4-23, creates usersRDD, which is a set of (userID, location) pairs. usersRDD represents the “right” table for the left outer join operation.

Example 4-23. Step 4: create RDD for users data
1   // Step 4: create RDD for users data
2   JavaRDD<String> users = ctx.textFile(usersInputFile, 1);

Step 5: Create usersRDD (the right table)

As you can see in Example 4-24, this step creates the right table, usersRDD, which contains (userID,location) pairs from the users input data.

Example 4-24. Step 5: create (userID,location) pairs for users
 1   // Step 5: create (userID,location) pairs for users
 2   // <K2,V2> JavaPairRDD<K2,V2> mapToPair(PairFunction<T,K2,V2> f)
 3   // Return a new RDD by applying a function to all elements of this RDD.
 4   // PairFunction<T, K, V>
 5   // T => Tuple2<K, V>
 6   JavaPairRDD<String,String> usersRDD =
 7         //                               T       K       V
 8         users.mapToPair(new PairFunction<String, String, String>() {
 9     public Tuple2<String,String> call(String s) {
10        String[] userRecord = s.split("\t");
11        String userID = userRecord[0];
12        String location = userRecord[1];
13        return new Tuple2<String,String>(userID, location);
14     }
15   });

Step 6: Create an RDD for the transactions data

This step, shown in Example 4-25, creates transactionsRDD, which is a set of (userID, product) pairs. transactionsRDD represents the left table for the left outer join operation.

Example 4-25. Step 6: create RDD for transactions data
1   // Step 6: create RDD for the transactions data
2   JavaRDD<String> transactions = ctx.textFile(transactionsInputFile, 1);

Step 7: Create transactionsRDD (the left table)

This step, shown in Example 4-26, creates the left table, transactionsRDD, which contains (userID,product) pairs from the transactions input data.

Example 4-26. Step 7: create (userID,product) pairs for transactions
 1   // Step 7: create (userID,product) pairs for transactions
 2   // PairFunction<T, K, V>
 3   // T => Tuple2<K, V>
 4   // sample transaction input: t1   p3   u1  3  330
 5   JavaPairRDD<String,String> transactionsRDD =
 6         //                                      T          K          V
 7         transactions.mapToPair(new PairFunction<String, String, String>() {
 8     public Tuple2<String,String> call(String s) {
 9        String[] transactionRecord = s.split("\t");
10        String userID = transactionRecord[2];
11        String product = transactionRecord[1];
12        return new Tuple2<String,String>(userID, product);
13     }
14   });

Step 8: Use Spark’s built-in JavaPairRDD.leftOuterJoin() method

This is the core step for performing the left outer join operation, using Spark’s JavaPairRDD.leftOuterJoin() method (see Example 4-27).

Example 4-27. Step 8: use Spark’s built-in JavaPairRDD.leftOuterJoin() method
 1   // Step 8: use Spark's built-in JavaPairRDD.leftOuterJoin() method.
 2   // JavaPairRDD<K,Tuple2<V,Optional<W>>> leftOuterJoin(JavaPairRDD<K,W> other)
 3   // Perform a left outer join of this and other. For each element (k, v) in this,
 4   // the resulting RDD will either contain all pairs (k, (v, Some(w))) for w in
 5   // other, or the pair (k, (v, None)) if no elements in other have key k.
 6   //
 7   // Here we perform a transactionsRDD.leftOuterJoin(usersRDD).
 8   JavaPairRDD<String, Tuple2<String,Optional<String>>> joined =
 9       transactionsRDD.leftOuterJoin(usersRDD);
10   joined.saveAsTextFile("/output/1");

This step creates the following output (the result of the left outer join operation):

# hadoop fs -cat /output/1/part*
(u4,(p4,Optional.of(CA)))
(u5,(p4,Optional.of(GA)))
(u2,(p1,Optional.of(GA)))
(u2,(p2,Optional.of(GA)))
(u1,(p3,Optional.of(UT)))
(u1,(p1,Optional.of(UT)))
(u1,(p1,Optional.of(UT)))
(u1,(p4,Optional.of(UT)))

Step 9: Create (product, location) pairs

This step builds another JavaPairRDD, which contains (product, location) pairs. Note in Example 4-28 that we completely ignore the userIDs, since we are interested only in products and their unique user locations.

Example 4-28. Step 9: create (product, location) pairs
1  // Step 9: create (product, location) pairs
2  JavaPairRDD<String,String> products =
3     joined.mapToPair(new PairFunction<
4         Tuple2<String, Tuple2<String,Optional<String>>>, // T
5         String, // K
6         String> // V
7     () {
8     public Tuple2<String,String> call(Tuple2<String, 
9                                       Tuple2<String,Optional<String>>> t) {
10       Tuple2<String,Optional<String>> value = t._2;
11       // value._2 may be absent if no matching user record exists (left outer
12       // join); fall back to "UNKNOWN" rather than calling get() on an absent value
13       return new Tuple2<String,String>(value._1, value._2.or("UNKNOWN"));
14    }
15 });
16 products.saveAsTextFile("/output/2");

This step creates the following output:

# hadoop fs -cat /output/2/part*
(p4,CA)
(p4,GA)
(p1,GA)
(p2,GA)
(p3,UT)
(p1,UT)
(p1,UT)
(p4,UT)

Step 10: Group (K=product, V=location) pairs by key

This step, shown in Example 4-29, groups the (K=product, V=location) pairs by key. The result is (K, V2), where V2 is a list of locations (which may include duplicate locations).

Example 4-29. Step 10: group (K=product, V=location) pairs by key
1   // Step 10: group (K=product, V=location) pairs by key
2   JavaPairRDD<String, Iterable<String>> productByLocations = products.groupByKey();
3   productByLocations.saveAsTextFile("/output/3");

This step creates the following output:

# hadoop fs -cat /output/3/p*
(p1,[GA, UT, UT])
(p2,[GA])
(p3,[UT])
(p4,[CA, GA, UT])

Step 11: Create final output (K=product, V=Set(location))

This final step, shown in Example 4-30, removes duplicate locations and creates (K, V2), where V2 is a Tuple2<Set<location>, size>.

Example 4-30. Step 11: create final output (K=product, V=Set<location>) pairs by key
1   // Step 11: create final output (K=product, V=Set<location>) pairs by K
2   JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByUniqueLocations =
3      productByLocations.mapValues(
4         new Function< Iterable<String>,            // input
5                       Tuple2<Set<String>, Integer> // output
6         >() {
 7       public Tuple2<Set<String>, Integer> call(Iterable<String> s) {
8          Set<String> uniqueLocations = new HashSet<String>();
9          for (String location : s) {
10             uniqueLocations.add(location);
11         }
12         return new Tuple2<Set<String>, Integer>(uniqueLocations, 
13                                                 uniqueLocations.size());
14      }
15   });
16
17   productByUniqueLocations.saveAsTextFile("/output/4");

This step creates the following final output:

# hadoop fs -cat /output/4/p*
(p1,([UT, GA],2))
(p2,([GA],1))
(p3,([UT],1))
(p4,([UT, GA, CA],3))

Combining steps 10 and 11

It is possible to combine steps 10 and 11 into a single Spark operation. We accomplish this with Spark’s combineByKey(), the most general of the per-key aggregation functions, which enables us to combine values with the same key in a flexible manner. What is the main difference between reduceByKey() and combineByKey()? reduceByKey() reduces values of type V into V (the same data type—for example, adding or multiplying integer values). combineByKey(), however, can combine/transform values of type V into another type, C—for example, we may want to combine/transform integer (V) values into a set of integers (Set<Integer>). In a nutshell, combineByKey() allows us to return values that are not the same type as our input data. To use combineByKey() we need to provide a number of functions. The simplest form of the combineByKey() signature is shown here:

   public <C> JavaPairRDD<K,C> combineByKey(
                                            Function<V,C> createCombiner,
                                            Function2<C,V,C> mergeValue,
                                            Function2<C,C,C> mergeCombiners
                                            )

   Description: Generic function to combine the elements for each key
   using a custom set of aggregation functions. Turns a JavaPairRDD[(K, V)]
   into a result of type JavaPairRDD[(K, C)], for a "combined type" C. Note
   that V and C can be different -- for example, one might group an RDD of
   type (Int, Int) into an RDD of type (Int, List[Int]).
   Users provide three functions:
   - createCombiner, which turns a V into a C (e.g., creates a one-element list)
   - mergeValue, to merge a V into a C (e.g., adds it to the end of a list)
   - mergeCombiners, to combine two Cs into a single one.

Our goal is to create a Set<String> for each key (each key has a list of values, and each value is a string). To accomplish this, we need to implement three basic functions, as shown in Example 4-31.

Example 4-31. Basic functions to be used by combineByKey()
 1 Function<String, Set<String>> createCombiner =
 2   new Function<String, Set<String>>() {
 3   @Override
 4   public Set<String> call(String x) {
 5     Set<String> set = new HashSet<String>();
 6     set.add(x);
 7     return set;
 8   }
 9 };
10 Function2<Set<String>, String, Set<String>> mergeValue =
11   new Function2<Set<String>, String, Set<String>>() {
12   @Override
13   public Set<String> call(Set<String> set, String x) {
14     set.add(x);
15     return set;
16   }
17 };
18  Function2<Set<String>, Set<String>, Set<String>> mergeCombiners =
19   new Function2<Set<String>, Set<String>, Set<String>>() {
20   @Override
21   public Set<String> call(Set<String> a, Set<String> b) {
22     a.addAll(b);
23     return a;
24  }
25 };

After implementing these three basic functions, we are ready to combine steps 10 and 11 by using combineByKey(). Before we do so, let’s identify the input and output in Table 4-4.

Table 4-4. Input, output, and transformer
Input products(K:String, V:String)
Output productUniqueLocations(K: String, V: Set<String>)
Transformer combineByKey()

Example 4-32 shows you how to use the combineByKey() transformer.

Example 4-32. Using combineByKey()
1 JavaPairRDD<String, Set<String>> productUniqueLocations =
2     products.combineByKey(createCombiner, mergeValue, mergeCombiners);
3 // emit the final output
4 Map<String, Set<String>> productMap = productUniqueLocations.collectAsMap();
5 for (Entry<String, Set<String>> entry : productMap.entrySet()) {
6   System.out.println(entry.getKey() + ":" + entry.getValue());
7 }
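
If you also want the location count alongside the set (to match the (Set<location>, size) output of step 11), one more mapValues() over the combineByKey() result is enough; a sketch:

JavaPairRDD<String, Tuple2<Set<String>, Integer>> productByUniqueLocations =
   productUniqueLocations.mapValues(
      new Function<Set<String>, Tuple2<Set<String>, Integer>>() {
        public Tuple2<Set<String>, Integer> call(Set<String> locations) {
          // pair each unique-location set with its size
          return new Tuple2<Set<String>, Integer>(locations, locations.size());
        }
      });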

Sample Run on YARN

Input (right table)

# hadoop fs -cat /data/leftouterjoins/users.txt
u1   UT
u2   GA
u3   CA
u4   CA
u5   GA

Input (left table)

# hadoop fs -cat /data/leftouterjoins/transactions.txt
t1   p3   u1   3    330
t2   p1   u2   1    400
t3   p1   u1   3    600
t4   p2   u2  10   1000
t5   p4   u4  9      90
t6   p1   u1  4     120
t7   p4   u1  8     160
t8   p4   u5  2      40

Script

# cat ./run_left_outer_join_spark.sh
#!/bin/bash
export JAVA_HOME=/usr/java/jdk7
export SPARK_HOME=/usr/local/spark-1.0.0
export HADOOP_HOME=/usr/local/hadoop-2.5.0
export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
export YARN_CONF_DIR=$HADOOP_HOME/etc/hadoop
BOOK_HOME=/mp/data-algorithms-book
APP_JAR=$BOOK_HOME/dist/data_algorithms_book.jar
USERS=/data/leftouterjoins/users.txt
TRANSACTIONS=/data/leftouterjoins/transactions.txt
prog=org.dataalgorithms.chap04.spark.SparkLeftOuterJoin
$SPARK_HOME/bin/spark-submit \
    --class $prog \
    --master yarn-cluster \
    --num-executors 3 \
    --driver-memory 1g \
    --executor-memory 1g \
    --executor-cores 10 \
    $APP_JAR $USERS $TRANSACTIONS

Generated HDFS output

# hadoop fs -cat /output/1/p*
(u4,(p4,Optional.of(CA)))
(u5,(p4,Optional.of(GA)))
(u2,(p1,Optional.of(GA)))
(u2,(p2,Optional.of(GA)))
(u1,(p3,Optional.of(UT)))
(u1,(p1,Optional.of(UT)))
(u1,(p1,Optional.of(UT)))
(u1,(p4,Optional.of(UT)))

# hadoop fs -cat /output/2/p*
(p4,CA)
(p4,GA)
(p1,GA)
(p2,GA)
(p3,UT)
(p1,UT)
(p1,UT)
(p4,UT)

# hadoop fs -cat /output/3/p*
(p1,[GA, UT, UT])
(p2,[GA])
(p3,[UT])
(p4,[CA, GA, UT])

# hadoop fs -cat /output/4/p*
(p1,([UT, GA],2))
(p2,([GA],1))
(p3,([UT],1))
(p4,([UT, GA, CA],3))

This chapter implemented the Left Outer Join design pattern, which is often used in analyzing business transactions in a distributed programming environment. The next chapter introduces another design pattern, Order Inversion, which will be implemented in the MapReduce paradigm.

1 edu.umd.cloud9.io.pair.PairOfStrings (which implements WritableComparable<PairOfStrings>)

2 com.google.common.base.Optional<T> is an abstract class.
