Chapter 4. Common Hadoop Processing Patterns
With an understanding of how to access and process data on Hadoop, we’d like to move on to discuss how to solve some fairly common problems in Hadoop using some of the tools we discussed in Chapter 3. We’ll cover the following data processing tasks, which in addition to being common patterns in processing data on Hadoop, also have a fairly high degree of complexity in implementation:
- Removing duplicate records by primary key (compaction)
- Using windowing analytics
- Updating time series data
We’ll go into more detail on these patterns next, and take a deep dive into how they’re implemented. We’ll present implementations of these patterns in both Spark and SQL (for Impala and Hive). You’ll note that we’re not including implementations in MapReduce; this is because of the size and complexity of the code in MapReduce, as well as the move toward newer processing frameworks such as Spark and abstractions such as SQL.
Pattern: Removing Duplicate Records by Primary Key
Duplicate records are a common occurrence when you are working with data in Hadoop for two primary reasons:
- Resends during data ingest: As we've discussed elsewhere in the book, it's difficult to ensure that records are sent exactly once, and it's not uncommon to have to deal with duplicate records during ingest processing.
- Deltas (updated records): HDFS is a "write once, read many" filesystem, and making modifications at a record level is not a simple thing to do. In the delta use case we have an existing data set with a primary key (or composite key), and updated records for that data set arrive over time.
We cover methods for dealing with the first case, fully duplicate records, elsewhere in this book—for example, in the clickstream case study in Chapter 8—so we’ll discuss the second case, record updates, in this example. This will require implementing processing to rewrite an existing data set so that it only shows the newest versions of each record.
If you’re familiar with HBase, you might have noted that this is similar to the way HBase works; at a high level, a region in HBase has an HFile that has values linked to a key. When new data is added, a second HFile is added with keys and values. During cleanup activities called compactions, HBase does a merge join to execute this deduplication pattern, as shown in Figure 4-1.
Note that we’re omitting some additional complexity, such as HBase’s support for versioning, in the preceding example.
Data Generation for Deduplication Example
Before we get into examples of implementing this pattern, let's first look at some code to create test data. We are going to use the Scala object GenDedupInput, which uses the HDFS API to create a file on HDFS and write out records in the following format:
{PrimaryKey},{timeStamp},{value}
We’ll write x records and y unique primary keys. This means if we set x to 100 and y to 10, we will get something close to 10 duplicate records for every primary key as seen in this example:
object GenDedupInput {
  def main(args: Array[String]): Unit = {
    if (args.length < 3) {
      println("{outputPath} {numberOfRecords} {numberOfUniqueRecords}")
      return
    }

    // The output file that will hold the data
    val outputPath = new Path(args(0))
    // Number of records to be written to the file
    val numberOfRecords = args(1).toLong
    // Number of unique primary keys
    val numberOfUniqueRecords = args(2).toInt

    // Open fileSystem to HDFS
    val fileSystem = FileSystem.get(new Configuration())

    // Create buffered writer
    val writer = new BufferedWriter(
      new OutputStreamWriter(fileSystem.create(outputPath)))

    val r = new Random()

    // This loop will write out all the records
    // Every primary key will get about
    // numberOfRecords/numberOfUniqueRecords records
    for (i <- 0L until numberOfRecords) {
      val uniqueId = r.nextInt(numberOfUniqueRecords)
      // Format: {key}, {timeStamp}, {value}
      writer.write(uniqueId + "," + i + "," + r.nextInt(10000))
      writer.newLine()
    }

    writer.close()
  }
}
Code Example: Spark Deduplication in Scala
Now that we’ve created our test data in HDFS, let’s look at the code to deduplicate these records in the SparkDedupExecution
object:
object SparkDedupExecution {
  def main(args: Array[String]): Unit = {
    if (args.length < 2) {
      println("{inputPath} {outputPath}")
      return
    }

    // set up given parameters
    val inputPath = args(0)
    val outputPath = args(1)

    // set up spark conf and connection
    val sparkConf = new SparkConf().setAppName("SparkDedupExecution")
    sparkConf.set("spark.cleaner.ttl", "120000")
    val sc = new SparkContext(sparkConf)

    // Read data in from HDFS
    val dedupOriginalDataRDD = sc.hadoopFile(inputPath,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      1)

    // Get the data in a key-value format
    val keyValueRDD = dedupOriginalDataRDD.map(t => {
      val splits = t._2.toString.split(",")
      (splits(0), (splits(1), splits(2)))
    })

    // Reduce by key so we get only one record for every primary key,
    // keeping the record with the greatest (numeric) timestamp
    val reducedRDD =
      keyValueRDD.reduceByKey((a, b) => if (a._1.toLong > b._1.toLong) a else b)

    // Format the data to a human-readable format and write it back out to HDFS
    reducedRDD
      .map(r => r._1 + "," + r._2._1 + "," + r._2._2)
      .saveAsTextFile(outputPath)
  }
}
Let’s break this code down to discuss what’s going on. We’ll skip the setup code, which just gets the user arguments and sets up the SparkContext
, and skip to the following code that will get our duplicate record data from HDFS:
val dedupOriginalDataRDD = sc.hadoopFile(inputPath,
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  1)
There are many ways to read data in Spark, but for this example we'll use the hadoopFile() method so we can show how existing input formats can be used. If you have done much MapReduce programming, you will be familiar with TextInputFormat, which is one of the most basic input formats available. TextInputFormat provides functionality that allows Spark or MapReduce jobs to break up a directory into files, which are then broken up into blocks to be processed by different tasks.
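If you don't need to demonstrate an explicit input format, the same read can be expressed more simply with textFile(); this is a minimal alternative sketch, not the example's actual code:

// Alternative read path (sketch): textFile() wraps TextInputFormat for us
// and returns the line contents directly, without the LongWritable offsets.
val linesRDD = sc.textFile(inputPath)

val keyValuePairs = linesRDD.map(line => {
  val splits = line.split(",")
  (splits(0), (splits(1), splits(2)))
})

We'll stick with hadoopFile() for the rest of the example.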
The next item of code is the first map() function:
val keyValueRDD = dedupOriginalDataRDD.map(t => {
  val splits = t._2.toString.split(",")
  (splits(0), (splits(1), splits(2)))
})
This code will run in parallel across different workers and parse each incoming record into a two-element tuple: a key and a value.
This key-value structure is required for the next piece of code, which uses the reduceByKey() method. As the name implies, reduceByKey() needs a key to reduce by. Now let's look at the code that calls reduceByKey():
val reducedRDD =
  keyValueRDD.reduceByKey((a, b) => if (a._1.toLong > b._1.toLong) a else b)
The reduceByKey() method takes a function that accepts a left and a right value and returns a value of the same type. The goal of reduceByKey() is to combine all values for the same key; in the word count example, it is used to add up all the counts of a single word to get the total count. In our example, a and b are (timestamp, value) tuples, and we return whichever has the greater timestamp. Since the key we're reducing by is the primary key, this function ensures that we end up with exactly one record per primary key, the one with the latest timestamp, which is exactly the deduplication we're after.
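As a quick illustration of the reduceByKey() semantics, here is a standalone sketch with made-up values (not part of the example job):

// Three records share primary key "k1"; reduceByKey keeps the one with
// the greatest timestamp, leaving exactly one record per key.
val sample = sc.parallelize(Seq(
  ("k1", ("100", "a")),
  ("k1", ("250", "b")),
  ("k1", ("175", "c")),
  ("k2", ("300", "d"))))

val latest = sample.reduceByKey((a, b) => if (a._1.toLong > b._1.toLong) a else b)
// latest.collect() => Array((k1,(250,b)), (k2,(300,d)))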
The last bit of code will just write the results back to HDFS:
reducedRDD
  .map(r => r._1 + "," + r._2._1 + "," + r._2._2)
  .saveAsTextFile(outputPath)
We will get a text output file for every partition in Spark, similar to the way MapReduce will output a file for each mapper or reducer at the end of a MapReduce job.
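If you want to control the number of output files, you can repartition before the save. A hedged one-liner, assuming the deduplicated output is small enough for a single file to be reasonable:

// coalesce(1) funnels all partitions into a single task and a single
// part-00000 file; only appropriate when the output volume is small.
reducedRDD
  .map(r => r._1 + "," + r._2._1 + "," + r._2._2)
  .coalesce(1)
  .saveAsTextFile(outputPath)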
Code Example: Deduplication in SQL
Now we’ll turn to the venerable SQL—well, more accurately, HiveQL, although the examples in this chapter will work with either Hive or Impala. First, we need to put our test data into a table using this data definition language (DDL) query:
CREATE EXTERNAL TABLE COMPACTION_TABLE (
  PRIMARY_KEY STRING,
  TIME_STAMP BIGINT,
  EVENT_VALUE STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'compaction_data';
Now that we have a table, let’s look at the query to perform the deduplication:
SELECT
  A.PRIMARY_KEY,
  A.TIME_STAMP,
  MAX(A.EVENT_VALUE)
FROM COMPACTION_TABLE A
JOIN (
  SELECT
    PRIMARY_KEY AS P_KEY,
    MAX(TIME_STAMP) AS TIME_SP
  FROM COMPACTION_TABLE
  GROUP BY PRIMARY_KEY
) B
WHERE A.PRIMARY_KEY = B.P_KEY
  AND A.TIME_STAMP = B.TIME_SP
GROUP BY A.PRIMARY_KEY, A.TIME_STAMP
Here we have a two-level-deep SQL query. The innermost SELECT gets the latest TIME_STAMP for each PRIMARY_KEY. The outer SELECT takes the results of the inner SELECT and pulls out the latest EVENT_VALUE. Also note that we apply a MAX() function to EVENT_VALUE; this is because we only want a single value, so if two EVENT_VALUEs share the same timestamp we keep the greater value for our new record.
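If you prefer to drive this query from Spark rather than the Hive or Impala shell, the same statement can be submitted through Spark's Hive integration. The following is a minimal sketch using the Spark 1.x HiveContext; it assumes the job has access to the Hive metastore that holds COMPACTION_TABLE:

import org.apache.spark.sql.hive.HiveContext

// Sketch: run the deduplication query through Spark's Hive integration.
val hiveContext = new HiveContext(sc)

val deduped = hiveContext.sql(
  """SELECT A.PRIMARY_KEY, A.TIME_STAMP, MAX(A.EVENT_VALUE)
    |FROM COMPACTION_TABLE A
    |JOIN (
    |  SELECT PRIMARY_KEY AS P_KEY, MAX(TIME_STAMP) AS TIME_SP
    |  FROM COMPACTION_TABLE
    |  GROUP BY PRIMARY_KEY
    |) B ON A.PRIMARY_KEY = B.P_KEY AND A.TIME_STAMP = B.TIME_SP
    |GROUP BY A.PRIMARY_KEY, A.TIME_STAMP""".stripMargin)

deduped.collect().foreach(println)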
Pattern: Windowing Analysis
Windowing functions provide the ability to scan over an ordered sequence of events over some window—for example, a specific slice of time. This pattern is very powerful and is useful in many industries:
- It can be used in finance to gain a better understanding of changing security prices.
- In sensor monitoring, it's useful in predicting failure from abnormalities in readings.
- It can be used in churn analysis for trying to predict if a customer will leave you based on behavior patterns.
- In gaming, it can help to identify trends as users progress from novice to expert.
To illustrate, we’ll use an example that relates to the finance use case: finding peaks and valleys in equity prices in order to provide some insight into price changes. A peak is a record that has a lower price before it and a lower price after it, while a valley is just the opposite, with higher prices on both sides, as shown in Figure 4-2.
To implement this example we’ll need to maintain a window of stock prices in order to determine where the peaks and valleys occur.
Note that a simple example like this makes it possible for us to show the solution in both SQL and Spark. As windowing analysis gets more complex, SQL becomes a less suitable solution.
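The classification rule itself is tiny. Here is a standalone Scala sketch of it (the function name and return encoding are ours, purely for illustration) before we worry about distributing the work:

// Classify the middle point of a three-point window.
// Returns Some(true) for a peak, Some(false) for a valley, None otherwise.
def classify(prev: Int, curr: Int, next: Int): Option[Boolean] =
  if (curr > prev && curr > next) Some(true)        // peak
  else if (curr < prev && curr < next) Some(false)  // valley
  else None                                         // slope (or flat)

// Example: prices 3, 7, 5 -> the middle point, 7, is a peak
assert(classify(3, 7, 5) == Some(true))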
Data Generation for Windowing Analysis Example
Let’s create some test data containing records with a value that goes up and down, similar to stock prices. The following code example takes the same input parameters as our last data generation tool—numberOfRecords
and numberOfUniqueIds
—although the resulting records will be somewhat different:
- Primary key
-
An identifier for each sequence of events we are analyzing—for example, a stock ticker symbol. This will be based on the
numberOfUniqueIds
input parameter. - Incrementing counter
- Event value
-
This will have a value that increases and decreases for a random set of records for a given primary key.
Let’s take a look at the code to generate this test data:
def main(args: Array[String]): Unit = {
  if (args.length == 0) {
    println("{outputPath} {numberOfRecords} {numberOfUniqueIds}")
    return
  }

  val outputPath = new Path(args(0))
  val numberOfRecords = args(1).toInt
  val numberOfUniqueIds = args(2).toInt

  val fileSystem = FileSystem.get(new Configuration())

  val writer = new BufferedWriter(
    new OutputStreamWriter(fileSystem.create(outputPath)))

  val r = new Random()
  var direction = 1
  var directionCounter = r.nextInt(numberOfUniqueIds * 10)
  var currentPointer = 0

  for (i <- 0 until numberOfRecords) {
    val uniqueId = r.nextInt(numberOfUniqueIds)
    currentPointer = currentPointer + direction
    directionCounter = directionCounter - 1
    if (directionCounter == 0) {
      // Reset the counter (reassign, don't redeclare) and flip direction
      // so the values start moving the other way
      directionCounter = r.nextInt(numberOfUniqueIds * 10)
      direction = direction * -1
    }
    writer.write(uniqueId + "," + i + "," + currentPointer)
    writer.newLine()
  }

  writer.close()
}
Code Example: Peaks and Valleys in Spark
Now, let’s look at the code to implement this pattern in Spark. There’s quite a bit going on in the following code example, so after we present the code we’ll drill down further to help you to understand what’s going on.
You’ll find this code in the SparkPeaksAndValleysExecution.scala file:
object SparkPeaksAndValleysExecution {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("{inputPath} {outputPath} {numberOfPartitions}")
      return
    }

    val inputPath = args(0)
    val outputPath = args(1)
    val numberOfPartitions = args(2).toInt

    val sparkConf = new SparkConf().setAppName("SparkTimeSeriesExecution")
    sparkConf.set("spark.cleaner.ttl", "120000")
    val sc = new SparkContext(sparkConf)

    // Read in the data
    var originalDataRDD = sc.hadoopFile(inputPath,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      1).map(r => {
      val splits = r._2.toString.split(",")
      (new DataKey(splits(0), splits(1).toLong), splits(2).toInt)
    })

    // Partitioner to partition by primaryKey only
    val partitioner = new Partitioner {
      override def numPartitions: Int = numberOfPartitions

      override def getPartition(key: Any): Int = {
        Math.abs(key.asInstanceOf[DataKey].uniqueId.hashCode() % numPartitions)
      }
    }

    // Partition and sort
    val partedSortedRDD =
      new ShuffledRDD[DataKey, Int, Int](
        originalDataRDD,
        partitioner).setKeyOrdering(implicitly[Ordering[DataKey]])

    // MapPartition to do windowing
    val pivotPointRDD = partedSortedRDD.mapPartitions(it => {

      val results = new mutable.MutableList[PivotPoint]

      // Keeping context
      var lastUniqueId = "foobar"
      var lastRecord: (DataKey, Int) = null
      var lastLastRecord: (DataKey, Int) = null

      var position = 0

      it.foreach(r => {
        position = position + 1

        if (!lastUniqueId.equals(r._1.uniqueId)) {
          lastRecord = null
          lastLastRecord = null
        }

        // Finding peaks and valleys
        if (lastRecord != null && lastLastRecord != null) {
          if (lastRecord._2 < r._2 && lastRecord._2 < lastLastRecord._2) {
            results.+=(new PivotPoint(r._1.uniqueId,
              position,
              lastRecord._1.eventTime,
              lastRecord._2,
              false))
          } else if (lastRecord._2 > r._2 && lastRecord._2 > lastLastRecord._2) {
            results.+=(new PivotPoint(r._1.uniqueId,
              position,
              lastRecord._1.eventTime,
              lastRecord._2,
              true))
          }
        }

        lastUniqueId = r._1.uniqueId
        lastLastRecord = lastRecord
        lastRecord = r
      })

      results.iterator
    })

    // Format output
    pivotPointRDD.map(r => {
      val pivotType = if (r.isPeak) "peak" else "valley"
      r.uniqueId + "," +
        r.position + "," +
        r.eventTime + "," +
        r.eventValue + "," +
        pivotType
    }).saveAsTextFile(outputPath)
  }

  class DataKey(val uniqueId: String, val eventTime: Long)
    extends Serializable with Comparable[DataKey] {
    override def compareTo(other: DataKey): Int = {
      val compare1 = uniqueId.compareTo(other.uniqueId)
      if (compare1 == 0) {
        eventTime.compareTo(other.eventTime)
      } else {
        compare1
      }
    }
  }

  class PivotPoint(val uniqueId: String,
                   val position: Int,
                   val eventTime: Long,
                   val eventValue: Int,
                   val isPeak: Boolean) extends Serializable {}
}
Let's walk through the main steps of this code:

1. Reading the input: nothing too interesting here; we're simply reading the input data and parsing it into easy-to-consume (DataKey, value) pairs.
2. Defining the partitioner: this is where things get interesting. We're defining a partitioner, just like a custom partitioner in MapReduce, which decides which records go to which worker after the shuffle. We need a custom partitioner because we have a two-part key, primary_key and position: we want to sort by both, but we only want to partition by the primary_key, so we get output like that shown in Figure 4-3.
3. Partitioning and sorting: the ShuffledRDD is the shuffle action that partitions and sorts the data for us. Note that the 1.3 release of Spark provides a transformation called repartitionAndSortWithinPartitions(), which would provide this functionality for us, but since this example is coded against Spark 1.2 we implement the shuffle manually.
4. Windowing with mapPartitions(): this call lets us walk through each primary_key's records in order of position; this is where the windowing happens.
5. Keeping context: these variables hold the context we need in order to find peaks and valleys and to know whether we have changed primary_keys. Remember, to classify a record we need the record before it and the record after it, so we track the current record (r) along with lastRecord and lastLastRecord, and we can determine whether lastRecord is a peak or a valley by comparing it against the other two.
6. Finding peaks and valleys: these comparisons determine whether lastRecord is a peak or a valley.
7. Writing the output: finally, this is the code that formats the records and writes them to HDFS.
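For reference, if you are on Spark 1.3 or later, the manual ShuffledRDD step can be replaced with the built-in transformation mentioned in step 3. A minimal sketch, assuming the same originalDataRDD and custom partitioner from the example above:

// Spark 1.3+ only: shuffle with our custom partitioner and sort each
// partition by the full DataKey (uniqueId, then eventTime) in one call.
val partedSortedRDD = originalDataRDD
  .repartitionAndSortWithinPartitions(partitioner)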
Code Example: Peaks and Valleys in SQL
As in the previous example, we’ll first create a table over our test data:
CREATE EXTERNAL TABLE PEAK_AND_VALLEY_TABLE (
  PRIMARY_KEY STRING,
  POSITION BIGINT,
  EVENT_VALUE BIGINT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'PeakAndValleyData';
Now that we have our table, we need to order the records and then use the lead() and lag() functions, which provide the context of the surrounding records:
SELECT
  PRIMARY_KEY,
  POSITION,
  EVENT_VALUE,
  CASE
    WHEN LEAD_EVENT_VALUE IS NULL OR LAG_EVENT_VALUE IS NULL THEN 'EDGE'
    WHEN EVENT_VALUE < LEAD_EVENT_VALUE AND EVENT_VALUE < LAG_EVENT_VALUE THEN 'VALLEY'
    WHEN EVENT_VALUE > LEAD_EVENT_VALUE AND EVENT_VALUE > LAG_EVENT_VALUE THEN 'PEAK'
    ELSE 'SLOPE'
  END AS POINT_TYPE
FROM (
  SELECT
    PRIMARY_KEY,
    POSITION,
    EVENT_VALUE,
    LEAD(EVENT_VALUE, 1, null)
      OVER (PARTITION BY PRIMARY_KEY ORDER BY POSITION) AS LEAD_EVENT_VALUE,
    LAG(EVENT_VALUE, 1, null)
      OVER (PARTITION BY PRIMARY_KEY ORDER BY POSITION) AS LAG_EVENT_VALUE
  FROM PEAK_AND_VALLEY_TABLE
) A
Although this SQL is not overly complex, it is big enough that we should break it down to explain what’s going on:
1. The outer query: once the subquery (step 2) has run, we have the data organized so that all the information we need is in the same record, and we can use that record to classify each point as one of the following: an edge (a point at the leftmost or rightmost end of the window timeline), a peak (a value with a lower value before and after it), a valley (a value with a higher value before and after it), or a slope point (a value with a lower value on one side and a higher value on the other).
2. The subquery: this is where all of the windowing logic happens. The query puts the values that appear before and after the current value into the same row. Figure 4-4 shows an example of the input and output of this subquery.
Windowing and SQL
Even in this example the SQL code is more concise than the Spark code, and could be considered simpler. At the same time, we need to consider the limitations: doing multiple complex windowing operations in SQL typically means chaining queries, each of which reads from and writes to disk, increasing I/O and leading to a corresponding decrease in performance. With Spark we only need to order the data once and can then perform N windowing operations on it, using the functionality provided by Java or Scala to hold information in memory.
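To make that point concrete, here is an illustrative sketch of our own, not part of the chapter's example code, that reuses the sorted partedSortedRDD and DataKey from the peaks-and-valleys example and computes two windowed features, a three-point moving average and a peak flag, in a single pass:

// One shuffle/sort, several windowed features computed in the same pass:
// a three-point moving average plus a peak flag for the middle record.
val featuresRDD = partedSortedRDD.mapPartitions(it => {
  it.sliding(3).collect {
    // Only emit windows that stay within a single primary key
    case Seq(a, b, c) if a._1.uniqueId == c._1.uniqueId =>
      val movingAvg = (a._2 + b._2 + c._2) / 3.0
      val isPeak = b._2 > a._2 && b._2 > c._2
      (b._1.uniqueId, b._1.eventTime, movingAvg, isPeak)
  }
})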
Pattern: Time Series Modifications
For our final example we are going to mix components from the first two examples. We will update records based on a primary key while also keeping all of the history. This will allow a record to know when it was current and when it expired, providing information about an entity and a start and stop time, as shown in Figure 4-5.
The start and stop times will mark the time range over which that record represented the truth. If the stop time is null
, this means that the record is the current truth for that entity. In the preceding example, we can see the current price of Apple stock is $42.
The problem comes in when we need to update this table. As shown in Figure 4-6, when we add a new record of price 43, we need to close out the previous record with a new end time that equals the new record’s start time. The darker cells are cells that need to be updated.
At face value this looks like a very simple problem, but in fact it can be complex to implement when you're dealing with updates in large data sets. We'll cover some of the challenges and approaches to addressing them next.
Use HBase and Versioning
One common way of storing this information is as versions in HBase. HBase has a way to store every change you make to a record as versions. Versions are defined at a column level and ordered by modification timestamp, so you can go to any point in time to see what the record looked like.
The advantage to this approach is that modifications are very fast, as is getting the latest values. There is, however, some penalty in getting historical versions and a major disadvantage in performing large scans or block cache reads.
Large scans will suffer because they have to pass over all the versions of one record before reaching the next record. So the larger your version count, the slower your large scans will be.
Block cache reads are disadvantageous because when retrieving a record HBase pulls a whole 64 KB HFile block into memory, and if your history is large, it may be pulling into memory other versions instead of the actual record you want.
Lastly, this data model does not keep the start and stop time in the same record: you need to look at a couple of versions to figure out when a given version started and ended.
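To make the versioning model concrete, here is a hedged sketch of reading multiple versions back with the HBase client API (HBase 0.98/1.0-era method names; the table and column names are made up for illustration):

import org.apache.hadoop.hbase.{CellUtil, HBaseConfiguration}
import org.apache.hadoop.hbase.client.{Get, HTable}
import org.apache.hadoop.hbase.util.Bytes
import scala.collection.JavaConverters._

// Sketch: fetch up to five versions of one column for a single record key.
val conf = HBaseConfiguration.create()
val table = new HTable(conf, "stock_prices")   // hypothetical table name

val get = new Get(Bytes.toBytes("AAPL"))
get.setMaxVersions(5)                          // up to 5 versions, newest first

val result = table.get(get)
result.getColumnCells(Bytes.toBytes("d"), Bytes.toBytes("price")).asScala
  .foreach(cell => println(cell.getTimestamp + " -> " +
    Bytes.toString(CellUtil.cloneValue(cell))))

table.close()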
Use HBase with a RowKey of RecordKey and StartTime
In order to have HBase include the start and stop time in the same record, we have to create a composite key of RecordKey and StartTime. The difference from the versioning solution is that the record will now have a column for stop time.
Note that the start time in the RowKey will need to be a reverse epoch number so that it can be sorted from newest to oldest.
So, to modify such a schema when a new version of a record is added, we will need to first do a single-record scan with the RecordKey as the start RowKey. This will return the most current version of the RecordKey. When we have that information, we need to do two puts: one to update the stop time in the current record, and the other to add a new current record.
When we want to get the version for any given time, we just do a single-row scan of the RecordKey plus the reverse epoch of the time for the record we wish to fetch.
This solution still has the large scan and block cache problems, but now we can get a version quickly and have the stop time in the same row. Just remember this comes at the cost of an additional get and put upon insertion of each record.
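Here is a hedged sketch of that read-modify-write sequence using the HBase client API (again 0.98/1.0-era method names, with hypothetical table and column names), built around a reverse-epoch row key so the newest version of a record sorts first:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.{HTable, Put, Scan}
import org.apache.hadoop.hbase.util.Bytes

// Row key = record key + reverse epoch, so the newest version sorts first.
def rowKey(recordKey: String, startTime: Long): Array[Byte] =
  Bytes.add(Bytes.toBytes(recordKey), Bytes.toBytes(Long.MaxValue - startTime))

val conf = HBaseConfiguration.create()
val table = new HTable(conf, "stock_price_history")   // hypothetical table

val newStart = System.currentTimeMillis()

// Single-record scan starting at the record key: the first row returned is
// the most current version (a real implementation would also check that the
// returned row still belongs to this record key).
val scan = new Scan(Bytes.toBytes("AAPL"))
scan.setCaching(1)
val scanner = table.getScanner(scan)
val current = scanner.next()
scanner.close()

if (current != null) {
  // Put 1: close out the current record with the new record's start time
  val closeOut = new Put(current.getRow)
  closeOut.add(Bytes.toBytes("d"), Bytes.toBytes("stop_time"),
    Bytes.toBytes(newStart))
  table.put(closeOut)
}

// Put 2: add the new current record (no stop time yet)
val insert = new Put(rowKey("AAPL", newStart))
insert.add(Bytes.toBytes("d"), Bytes.toBytes("value"), Bytes.toBytes("43"))
table.put(insert)

table.close()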
Use HDFS and Rewrite the Whole Table
If we remove HBase from the picture and just do the simplest HDFS implementation, we would have all the data in HDFS and we’d refresh the table as we get new data on some periodic basis—for example, once daily.
This solution might seem very expensive, but with Hadoop we can rewrite terabytes of data in a short period of time. As data sets get larger, there are techniques we can use to optimize the execution: for example, keeping separate partitions for the most current records. We'll discuss that solution next.
Use Partitions on HDFS for Current and Historical Records
A smarter implementation on HDFS is to put the most current records in one partition and the historical records in another. This allows us to rewrite just the current records, not every version in history; newly expired records are simply appended to the historical partition.
The big win here is that the execution time is fixed to the number of current records as opposed to getting longer and longer with every version of history added.
We’ll provide an example of this technique next, again using Spark and SQL. For purposes of illustration, we’ll use a simple, contrived data set composed of a unique ID, event time, and a random integer, but it should be easy to imagine how this technique could be extended to real-world, time-based data, such as the stock ticker example described earlier.
Data Generation for Time Series Example
Let’s create our data set for our time series example, again using Scala and the HDFS FileSystem API:
def main(args: Array[String]): Unit = {
  if (args.length == 0) {
    println("{outputPath} {numberOfRecords} {numberOfUniqueRecords} {startTime}")
    return
  }

  val outputPath = new Path(args(0))
  val numberOfRecords = args(1).toInt
  val numberOfUniqueRecords = args(2).toInt
  val startTime = args(3).toInt

  val fileSystem = FileSystem.get(new Configuration())
  val writer = new BufferedWriter(
    new OutputStreamWriter(fileSystem.create(outputPath)))

  val r = new Random

  for (i <- 0 until numberOfRecords) {
    val uniqueId = r.nextInt(numberOfUniqueRecords)
    val madeUpValue = r.nextInt(1000)
    val eventTime = i + startTime

    writer.write(uniqueId + "," + eventTime + "," + madeUpValue)
    writer.newLine()
  }

  writer.close()
}
Looking at this data generator, you will note it is very close in design to the previous examples of data generation.
Code Example: Time Series in Spark
Now, we move on to our Spark code implementation to update our single partition of time series data with the updated start and stop times. You can find this code on GitHub in the SparkTimeSeriesExecution Scala object. This is the largest example in this chapter, but we'll walk you through the code to explain what's going on.
object SparkTimeSeriesExecution {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("{newDataInputPath} " +
        "{outputPath} " +
        "{numberOfPartitions}")
      println("or")
      println("{newDataInputPath} " +
        "{existingTimeSeriesDataInputPath} " +
        "{outputPath} " +
        "{numberOfPartitions}")
      return
    }

    val newDataInputPath = args(0)
    val existingTimeSeriesDataInputPath = if (args.length == 4) args(1) else null
    val outputPath = args(args.length - 2)
    val numberOfPartitions = args(args.length - 1).toInt

    val sparkConf = new SparkConf().setAppName("SparkTimeSeriesExecution")
    sparkConf.set("spark.cleaner.ttl", "120000")
    val sc = new SparkContext(sparkConf)

    // Load new data from HDFS
    var unendedRecordsRDD = sc.hadoopFile(newDataInputPath,
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      1).map(r => {
      val splits = r._2.toString.split(",")
      (new TimeDataKey(splits(0), splits(1).toLong),
        new TimeDataValue(-1, splits(2)))
    })

    var endedRecordsRDD: RDD[(TimeDataKey, TimeDataValue)] = null

    // Get existing records if they exist
    if (existingTimeSeriesDataInputPath != null) {
      val existingDataRDD = sc.hadoopFile(existingTimeSeriesDataInputPath,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        1).map(r => {
        val splits = r._2.toString.split(",")
        (new TimeDataKey(splits(0), splits(1).toLong),
          new TimeDataValue(splits(2).toLong, splits(3)))
      })

      unendedRecordsRDD = unendedRecordsRDD
        .union(existingDataRDD.filter(r => r._2.endTime == -1))
      endedRecordsRDD = existingDataRDD.filter(r => r._2.endTime > -1)
    }

    // Define our partitioner
    val partitioner = new Partitioner {
      override def numPartitions: Int = numberOfPartitions

      override def getPartition(key: Any): Int = {
        Math.abs(
          key.asInstanceOf[TimeDataKey].uniqueId.hashCode() % numPartitions)
      }
    }

    val partedSortedRDD =
      new ShuffledRDD[TimeDataKey, TimeDataValue, TimeDataValue](
        unendedRecordsRDD,
        partitioner).setKeyOrdering(implicitly[Ordering[TimeDataKey]])

    // Walk down each primaryKey to make sure the stop times are updated
    var updatedEndedRecords = partedSortedRDD.mapPartitions(it => {
      val results = new mutable.MutableList[(TimeDataKey, TimeDataValue)]

      var lastUniqueId = "foobar"
      var lastRecord: (TimeDataKey, TimeDataValue) = null

      it.foreach(r => {
        if (!r._1.uniqueId.equals(lastUniqueId)) {
          if (lastRecord != null) {
            results.+=(lastRecord)
          }
          lastUniqueId = r._1.uniqueId
          lastRecord = null
        } else {
          if (lastRecord != null) {
            lastRecord._2.endTime = r._1.startTime
            results.+=(lastRecord)
          }
        }
        lastRecord = r
      })

      if (lastRecord != null) {
        results.+=(lastRecord)
      }
      results.iterator
    })

    // If there were existing records union them back in
    if (endedRecordsRDD != null) {
      updatedEndedRecords = updatedEndedRecords.union(endedRecordsRDD)
    }

    // Format and save the results to HDFS
    updatedEndedRecords
      .map(r => r._1.uniqueId + "," +
        r._1.startTime + "," +
        r._2.endTime + "," +
        r._2.data)
      .saveAsTextFile(outputPath)
  }

  class TimeDataKey(val uniqueId: String, val startTime: Long)
    extends Serializable with Comparable[TimeDataKey] {
    override def compareTo(other: TimeDataKey): Int = {
      val compare1 = uniqueId.compareTo(other.uniqueId)
      if (compare1 == 0) {
        startTime.compareTo(other.startTime)
      } else {
        compare1
      }
    }
  }

  class TimeDataValue(var endTime: Long, val data: String) extends Serializable {}
}
Let's walk through the main steps:

1. Reading the new data: as in previous code examples, here we are just reading the new data in from HDFS.
2. Reading the existing table: unlike previous examples in this chapter, we have two possible inputs: the new data and the data from the existing table in HDFS. The existing data set is optional here, because the first time we add records we obviously won't have an existing data set. Note that we filter out any existing records that already have endTimes; they don't need to go through the shuffle, be transferred over the network, and be sorted, so we union them back in later.
3. Partitioning and sorting: just as in the peaks-and-valleys example, we need a custom partitioner and shuffle. This is a common pattern whenever we need to traverse a data set in order by a given key. The partitioning is similar to previous examples: we partition on the primaryKey and sort on the combination of the primaryKey and the startTime.
4. Updating stop times: this is the code that traverses each primaryKey and updates records that need new stop times.
5. Re-adding ended records: here is where we union the existing records that already had endTimes back in with a union() method.
6. Writing the output: and finally, this is where we write out the formatted results to HDFS.
Code Example: Time Series in SQL
As before, we need to first set up our source tables for Hive or Impala. In this example we are going to have two tables: one for the existing time series data and one for the new data:
CREATE EXTERNAL TABLE EXISTING_TIME_SERIES_TABLE (
  PRIMARY_KEY STRING,
  EFFECTIVE_DT BIGINT,
  EXPIRED_DT BIGINT,
  EVENT_VALUE STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'ExistingTimeSeriesData';

CREATE EXTERNAL TABLE NEW_TIME_SERIES_TABLE (
  PRIMARY_KEY STRING,
  EFFECTIVE_DT BIGINT,
  EVENT_VALUE STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'NewTimeSeriesData';
The two tables are nearly identical, except that the new records don't have an expired date.
Note
Note that in a real-world use case we would likely have multiple partitions in our data set, for example, for current and historical records. This is because we don't really need to read the existing expired records to perform this processing; we only need to read the existing active records to see whether they have been expired by the new records arriving in the NEW_TIME_SERIES_TABLE.
Therefore, the current records would be rewritten, whereas the history would only be appended to.
Now let’s take these two tables and populate a result table with the updated records:
SELECT
  PRIMARY_KEY,
  EFFECTIVE_DT,
  CASE
    WHEN LEAD(EFFECTIVE_DT, 1, null)
         OVER (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT) IS NULL
      THEN NULL
    ELSE LEAD(EFFECTIVE_DT, 1, null)
         OVER (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT)
  END AS EXPIRED_DT,
  EVENT_VALUE
FROM (
  SELECT PRIMARY_KEY, EFFECTIVE_DT, EVENT_VALUE
  FROM EXISTING_TIME_SERIES_TABLE
  WHERE EXPIRED_DT IS NULL
  UNION ALL
  SELECT PRIMARY_KEY, EFFECTIVE_DT, EVENT_VALUE
  FROM NEW_TIME_SERIES_TABLE
) sub_1
UNION ALL
SELECT PRIMARY_KEY, EFFECTIVE_DT, EXPIRED_DT, EVENT_VALUE
FROM EXISTING_TIME_SERIES_TABLE
WHERE EXPIRED_DT IS NOT NULL
This is a fairly long query, so let's break it down. At the top level there are two main SELECTs that are unioned together. The first handles updating existing current records and new records to determine which records have expired. The second simply carries over the existing expired records; if we used the partition strategy described earlier, we wouldn't need this second top-level SELECT statement.
Focusing on the first SELECT, shown in the following snippet, notice that the subquery is a union of the two starting tables. Note that we filter the already-expired records out of the EXISTING_TIME_SERIES_TABLE; this is because we don't want the extra work of re-sorting all those records in the windowing function:
SELECT PRIMARY_KEY, EFFECTIVE_DT, EVENT_VALUE
FROM EXISTING_TIME_SERIES_TABLE
WHERE EXPIRED_DT IS NULL
UNION ALL
SELECT PRIMARY_KEY, EFFECTIVE_DT, EVENT_VALUE
FROM NEW_TIME_SERIES_TABLE
The results of that inner SELECT are then partitioned by the primary key and ordered by the effective date. This allows us to ask, "Is there a later record (one with a greater effective date) for the same primary key?" If the answer is yes, this record has expired, and its expiration date is that later record's effective date. Here is that portion of the query:
PRIMARY_KEY,
EFFECTIVE_DT,
CASE
  WHEN LEAD(EFFECTIVE_DT, 1, null)
       OVER (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT) IS NULL
    THEN NULL
  ELSE LEAD(EFFECTIVE_DT, 1, null)
       OVER (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT)
END AS EXPIRED_DT,
EVENT_VALUE
As you can see, this is a somewhat complex query, but it's still more concise than the Spark code. In this case both Spark and SQL are good choices, and your choice will likely be determined by your comfort and familiarity with one or the other.
Conclusion
To wrap up this chapter, here are a few takeaways:
- We can do some pretty complex stuff with Spark and SQL in Hadoop.
- We don't have to abandon SQL when moving to Hadoop; in fact, SQL remains a powerful abstraction for processing and analyzing data in Hadoop, just as it is with more traditional data management systems.
- We can attack problems with different tools, each giving us a different approach to solving the problem.
It’s likely that SQL-on-Hadoop will become an increasingly powerful tool with the introduction of new tools like Impala and Hive on Spark, but it’s unlikely that SQL will replace Spark and other processing frameworks for solving a number of problems. Ultimately it will be necessary for well-rounded Hadoop developers to understand the available tools in order to evaluate the appropriate one to solve their particular problem.