Chapter 4. Common Hadoop Processing Patterns

With an understanding of how to access and process data on Hadoop, we’d like to move on to discuss how to solve some fairly common problems in Hadoop using some of the tools we discussed in Chapter 3. We’ll cover the following data processing tasks, which in addition to being common patterns in processing data on Hadoop, also have a fairly high degree of complexity in implementation:

  • Removing duplicate records by primary key (compaction)

  • Using windowing analytics

  • Updating time series data

We’ll go into more detail on these patterns next, and take a deep dive into how they’re implemented. We’ll present implementations of these patterns in both Spark and SQL (for Impala and Hive). You’ll note that we’re not including implementations in MapReduce; this is because of the size and complexity of the code in MapReduce, as well as the move toward newer processing frameworks such as Spark and abstractions such as SQL.

Pattern: Removing Duplicate Records by Primary Key

Duplicate records are a common occurrence when you are working with data in Hadoop for two primary reasons:

Resends during data ingest

As we’ve discussed elsewhere in the book, it’s difficult to ensure that records are sent exactly once, and it’s not uncommon to have to deal with duplicate records during ingest processing.

Deltas (updated records)

HDFS is a “write once and read many” filesystem. Making modifications at a record level is not a simple thing to do. In the use case of deltas we would have an existing data set with a primary key (or composite key), and we will have updated records being added to that data set.

We cover methods for dealing with the first case, fully duplicate records, elsewhere in this book—for example, in the clickstream case study in Chapter 8—so we’ll discuss the second case, record updates, in this example. This will require implementing processing to rewrite an existing data set so that it only shows the newest versions of each record.

If you’re familiar with HBase, you might have noted that this is similar to the way HBase works; at a high level, a region in HBase has an HFile that has values linked to a key. When new data is added, a second HFile is added with keys and values. During cleanup activities called compactions, HBase does a merge join to execute this deduplication pattern, as shown in Figure 4-1.

hdaa 0501
Figure 4-1. HBase compaction

Note that we’re omitting some additional complexity, such as HBase’s support for versioning, in the preceding example.

Data Generation for Deduplication Example

Before we get into examples of implementing this pattern, let’s first look at some code to create test data. We are going to use the Scala object GenDedupInput, which uses the HDFS API to create a file on HDFS and write out records in the following format:

{PrimaryKey},{timeStamp},{value}

We’ll write x records and y unique primary keys. This means if we set x to 100 and y to 10, we will get something close to 10 duplicate records for every primary key as seen in this example:

object GenDedupInput {
  def main(args:Array[String]): Unit = {
    if (args.length < 3) {
      println("{outputPath} {numberOfRecords} {numberOfUniqueRecords}")
      return
    }

    // The output file that will hold the data
    val outputPath = new Path(args(0))
    // Number of records to be written to the file
    val numberOfRecords = args(1).toLong
    // Number of unique primary keys
    val numberOfUniqueRecords = args(2).toInt

    // Open fileSystem to HDFS
    val fileSystem = FileSystem.get(new Configuration())

    // Create buffered writer
    val writer = new BufferedWriter(
      new OutputStreamWriter(fileSystem.create(outputPath)))

    val r = new Random()

    // This loop will write out all the records
    // Every primary key will get about
    // numberOfRecords/numberOfUniqueRecords records
    for (i <- 0 until numberOfRecords) {
      val uniqueId = r.nextInt(numberOfUniqueRecords)
      // Format: {key}, {timeStamp}, {value}
      writer.write(uniqueId + "," + i + "," + r.nextInt(10000))
      writer.newLine()
    }

    writer.close()
  }
}

Code Example: Spark Deduplication in Scala

Now that we’ve created our test data in HDFS, let’s look at the code to deduplicate these records in the SparkDedupExecution object:

object SparkDedupExecution {
  def main(args:Array[String]): Unit = {
    if (args.length < 2) {
      println("{inputPath} {outputPath}")
      return
    }

    // set up given parameters
    val inputPath = args(0)
    val outputPath = args(1)

    // set up spark conf and connection
    val sparkConf = new SparkConf().setAppName("SparkDedupExecution")
    sparkConf.set("spark.cleaner.ttl", "120000");
    val sc = new SparkContext(sparkConf)

    // Read data in from HDFS
    val dedupOriginalDataRDD = sc.hadoopFile(inputPath,
       classOf[TextInputFormat],
       classOf[LongWritable],
       classOf[Text],
       1)

    // Get the data in a key-value format
    val keyValueRDD = dedupOriginalDataRDD.map(t => {
      val splits = t._2.toString.split(",")
      (splits(0), (splits(1), splits(2)))})

    // reduce by key so we will only get one record for every primary key
    val reducedRDD =
        keyValueRDD.reduceByKey((a,b) => if (a._1.compareTo(b._1) > 0) a else b)

    // Format the data to a human-readable format and write it back out to HDFS
    reducedRDD
      .map(r => r._1 + "," + r._2._1 + "," + r._2._2)
      .saveAsTextFile(outputPath)
  }
}

Let’s break this code down to discuss what’s going on. We’ll skip the setup code, which just gets the user arguments and sets up the SparkContext, and skip to the following code that will get our duplicate record data from HDFS:

val dedupOriginalDataRDD = sc.hadoopFile(inputPath,
   classOf[TextInputFormat],
   classOf[LongWritable],
   classOf[Text],
   1)

There are many ways to read data in Spark, but for this example we’ll use the hadoopFile() method so we can show how the existing input formats can be used. If you have done much MapReduce programing, you will be familiar with the TextInputFormat, which is one of the most basic input formats available. The TextInputFormat provides functionality that will allow Spark or MapReduce jobs to break up a directory into files, which are then broken up into blocks to be processed by different tasks.

The next item of code is the first map() function:

val keyValueRDD = dedupOriginalDataRDD.map(t => {
  val splits = t._2.toString.split(",")
  (splits(0), (splits(1), splits(2)))})

This code will run in parallel across different workers and parse the incoming records into a Tuple object that has two values representing a key and a value.

This key-value structure is required for the next piece of code, which will use the reduceByKey() method. As you might guess by the name, in order to use the reduceByKey() method we need a key.

Now let’s look at the code that calls reduceByKey():

val reducedRDD =
    keyValueRDD.reduceByKey((a,b) => if (a._1.compareTo(b._1) > 0) a else b)

The reduceByKey() method takes a function that takes a left and right value and returns a value of the same type. The goal of reduceByKey() is to combine all values of the same key. In the word count example, it is used to add all the counts of a single word to get the total count. In our example, the a and b are strings, and we will return a or b depending on which is greater. Since the key we’re reducing by is the primary key, this function will make sure that we only have one record per primary key—hence, deduplicating the data based on the greatest primary key-value.

The last bit of code will just write the results back to HDFS:

reducedRDD
    .map(r => r._1 + "," + r._2._1 + "," + r._2._2)
    .saveAsTextFile(outputPath)

We will get a text output file for every partition in Spark, similar to the way MapReduce will output a file for each mapper or reducer at the end of a MapReduce job.

Code Example: Deduplication in SQL

Now we’ll turn to the venerable SQL—well, more accurately, HiveQL, although the examples in this chapter will work with either Hive or Impala. First, we need to put our test data into a table using this data definition language (DDL) query:

CREATE EXTERNAL TABLE COMPACTION_TABLE (
  PRIMARY_KEY STRING,
  TIME_STAMP BIGINT,
  EVENT_VALUE STRING
  )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
 STORED AS TEXTFILE
 LOCATION 'compaction_data';

Now that we have a table, let’s look at the query to perform the deduplication:

SELECT
 A.PRIMARY_KEY,
 A.TIME_STAMP,
 MAX(A.EVENT_VALUE)
FROM COMPACTION_TABLE A JOIN (
    SELECT
     PRIMARY_KEY AS P_KEY,
     MAX(TIME_STAMP) as TIME_SP
    FROM COMPACTION_TABLE
    GROUP BY PRIMARY_KEY
  ) B
WHERE A.PRIMARY_KEY = B.P_KEY AND A.TIME_STAMP = B.TIME_SP
GROUP BY A.PRIMARY_KEY, A.TIME_STAMP

Here we have a two-level-deep SQL query. The deepest SELECT is getting the latest TIME_STAMP for all the PRIMARY_KEY records. The outer SELECT statement is taking the results from the inner SELECT statement to pull out the latest EVENT_VALUE. Also note that we apply a MAX() function to the EVENT_VALUE value; this is because we only want a single value, so if we have two EVENT_VALUEs with the same timestamp we’ll select the one with the greatest value to keep for our new record.

Pattern: Windowing Analysis

Windowing functions provide the ability to scan over an ordered sequence of events over some window—for example, a specific slice of time. This pattern is very powerful and is useful in many industries:

  • It can be used in finance to gain a better understanding of changing security prices.

  • In sensor monitoring, it’s useful in predicting failure from abnormalities in readings.

  • It can be used in churn analysis for trying to predict if a customer will leave you based on behavior patterns.

  • In gaming, it can help to identify trends as users progress from novice to expert.

To illustrate, we’ll use an example that relates to the finance use case: finding peaks and valleys in equity prices in order to provide some insight into price changes. A peak is a record that has a lower price before it and a lower price after it, while a valley is just the opposite, with higher prices on both sides, as shown in Figure 4-2.

hdaa 05in01
Figure 4-2. Peaks and valleys in stock prices over time

To implement this example we’ll need to maintain a window of stock prices in order to determine where the peaks and valleys occur.

Note that a simple example like this makes it possible for us to show the solution in both SQL and Spark. As windowing analysis gets more complex, SQL becomes a less suitable solution.

Data Generation for Windowing Analysis Example

Let’s create some test data containing records with a value that goes up and down, similar to stock prices. The following code example takes the same input parameters as our last data generation tool—numberOfRecords and numberOfUniqueIds—although the resulting records will be somewhat different:

Primary key

An identifier for each sequence of events we are analyzing—for example, a stock ticker symbol. This will be based on the numberOfUniqueIds input parameter.

Incrementing counter

This will be unique for every record in the generated data.

Event value

This will have a value that increases and decreases for a random set of records for a given primary key.

Let’s take a look at the code to generate this test data:

def main(args: Array[String]): Unit = {
  if (args.length == 0) {
   println("{outputPath} {numberOfRecords} {numberOfUniqueIds}")
   return
  }

  val outputPath = new Path(args(0))
  val numberOfRecords = args(1).toInt
  val numberOfUniqueIds = args(2).toInt

  val fileSystem = FileSystem.get(new Configuration())

  val writer =
      new BufferedWriter( new OutputStreamWriter(fileSystem.create(outputPath)))

  val r = new Random()

  var direction = 1
  var directionCounter = r.nextInt(numberOfUniqueIds * 10)
  var currentPointer = 0

  for (i <- 0 until numberOfRecords) {
    val uniqueId = r.nextInt(numberOfUniqueIds)

    currentPointer = currentPointer + direction
    directionCounter = directionCounter - 1
    if (directionCounter == 0) {
      var directionCounter = r.nextInt(numberOfUniqueIds * 10)
      direction = direction * -1
    }

    writer.write(uniqueId + "," + i + "," + currentPointer)
    writer.newLine()
  }

  writer.close()
 }

Code Example: Peaks and Valleys in Spark

Now, let’s look at the code to implement this pattern in Spark. There’s quite a bit going on in the following code example, so after we present the code we’ll drill down further to help you to understand what’s going on.

You’ll find this code in the SparkPeaksAndValleysExecution.scala file:

object SparkPeaksAndValleysExecution {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("{inputPath} {outputPath} {numberOfPartitions}")
      return
    }

    val inputPath = args(0)
    val outputPath = args(1)
    val numberOfPartitions = args(2).toInt

    val sparkConf = new SparkConf().setAppName("SparkTimeSeriesExecution")
    sparkConf.set("spark.cleaner.ttl", "120000");

    val sc = new SparkContext(sparkConf)

    // Read in the data
    var originalDataRDD = sc.hadoopFile(inputPath,                   1
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      1).map(r => {
      val splits = r._2.toString.split(",")
      (new DataKey(splits(0), splits(1).toLong), splits(2).toInt)
    })

    // Partitioner to partition by primaryKey only
    val partitioner = new Partitioner {                              2
      override def numPartitions: Int = numberOfPartitions

      override def getPartition(key: Any): Int = {
        Math.abs(key.asInstanceOf[DataKey].uniqueId.hashCode() % numPartitions)
      }
    }

    // Partition and sort
    val partedSortedRDD =                                            3
      new ShuffledRDD[DataKey, Int, Int](
        originalDataRDD,
        partitioner).setKeyOrdering(implicitly[Ordering[DataKey]])

    // MapPartition to do windowing
    val pivotPointRDD = partedSortedRDD.mapPartitions(it => {        4

      val results = new mutable.MutableList[PivotPoint]

      // Keeping context
      var lastUniqueId = "foobar"                                    5
      var lastRecord: (DataKey, Int) = null
      var lastLastRecord: (DataKey, Int) = null

      var position = 0

      it.foreach( r => {
        position = position + 1

        if (!lastUniqueId.equals(r._1.uniqueId)) {

          lastRecord = null
          lastLastRecord = null
        }

        // Finding peaks and valleys
        if (lastRecord != null && lastLastRecord != null) {         6
          if (lastRecord._2 < r._2 && lastRecord._2 < lastLastRecord._2) {
            results.+=(new PivotPoint(r._1.uniqueId,
              position,
              lastRecord._1.eventTime,
              lastRecord._2,
              false))
          } else if (lastRecord._2 > r._2 && lastRecord._2 > lastLastRecord._2) {
            results.+=(new PivotPoint(r._1.uniqueId,
              position,
              lastRecord._1.eventTime,
              lastRecord._2,
              true))
          }
        }
        lastUniqueId = r._1.uniqueId
        lastLastRecord = lastRecord
        lastRecord = r

      })

      results.iterator
    })

    // Format output
    pivotPointRDD.map(r => {                                       7
      val pivotType = if (r.isPeak) "peak" else "valley"
      r.uniqueId + "," +
        r.position + "," +
        r.eventTime + "," +
        r.eventValue + "," +
        pivotType
    } ).saveAsTextFile(outputPath)

  }


  class DataKey(val uniqueId:String, val eventTime:Long)
    extends Serializable with Comparable[DataKey] {
    override def compareTo(other:DataKey): Int = {
      val compare1 = uniqueId.compareTo(other.uniqueId)
      if (compare1 == 0) {
        eventTime.compareTo(other.eventTime)
      } else {
        compare1
      }
    }
  }

  class PivotPoint(val uniqueId: String,
                   val position:Int,
                   val eventTime:Long,
                   val eventValue:Int,
                   val isPeak:boolean) extends Serializable {}

}
1

Nothing too interesting here: we’re simply reading the input data and parsing it into easy-to-consume objects.

2

This is where things get interesting. We’re defining a partition here, just like defining a custom partitioner with MapReduce. A partition will help us to decide which records go to which worker after the shuffle process. We need a custom partitioner here because we have a two-part key: primary_key and position. We want to sort by both, but we only want to partition by the primary_key so we get output like that shown in Figure 4-3.

3

This is the shuffle action that will partition and sort the data for us. Note that the 1.3 release of Spark provides a transformation called repartitionAndSortWithinPartitions(), which would provide this functionality for us, but since this is coded with Spark 1.2 we need to manually implement the shuffle.

4

This mapPartition() method will allow us to run through the primary_key in the order of the position. This is where the windowing will happen.

5

This is context information we need in order to find peaks and valleys and to know if we have changed primary_keys. Remember, to find a peak or a valley we will need to know of the record before and the one after. So we will have the currentRow, lastRow, and lastLastRow, and we can determine if the lastRow is a peak or valley by comparing it against the others.

6

Perform comparisons to determine if we’re in a peak or in a valley.

7

And finally, this is the code that will format the records and write them to HDFS.

hdaa 0502
Figure 4-3. Partitioning in the peaks and valleys example—here we partition the sequences into two groups, so we can distribute the analysis on two workers while still keeping all events for each sequence together

Code Example: Peaks and Valleys in SQL

As in the previous example, we’ll first create a table over our test data:

CREATE EXTERNAL TABLE PEAK_AND_VALLEY_TABLE (
  PRIMARY_KEY STRING,
  POSITION BIGINT,
  EVENT_VALUE BIGINT
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 'PeakAndValleyData';

Now that we have our table we need to order the records and then use the lead() and lag() functions, which will provide the context of the surrounding records:

SELECT                               1
    PRIMARY_KEY,
    POSITION,
    EVENT_VALUE,
    CASE
     WHEN LEAD_EVENT_VALUE is null OR LAG_EVENT_VALUE is null THEN 'EDGE'
     WHEN
       EVENT_VALUE < LEAD_EVENT_VALUE AND EVENT_VALUE < LAG_EVENT_VALUE
     THEN
       'VALLEY'
     WHEN
       EVENT_VALUE > LEAD_EVENT_VALUE AND EVENT_VALUE > LAG_EVENT_VALUE
     THEN
       'PEAK'
     ELSE 'SLOPE'
    END AS POINT_TYPE
  FROM
   (
    SELECT                            2
     PRIMARY_KEY,
     POSITION,
     EVENT_VALUE,
     LEAD(EVENT_VALUE,1,null)
          OVER (PARTITION BY PRIMARY_KEY ORDER BY POSITION) AS LEAD_EVENT_VALUE,
     LAG(EVENT_VALUE,1,null)
          OVER (PARTITION BY PRIMARY_KEY ORDER BY POSITION) AS LAG_EVENT_VALUE
    FROM PEAK_AND_VALLEY_TABLE
   ) A

Although this SQL is not overly complex, it is big enough that we should break it down to explain what’s going on:

1

After execution of the subquery in step 2, we have the data organized in such a way that all the information we need is in the same record and we can now use that record to determine if we have one of the following: an edge, a point on the leftmost or rightmost part of the window timeline; a peak, a value that has a lower value before and after it; a valley, a value that has a higher value before and after it; or a slope point, a value that has a lower value either before or after it and a higher value either before or after it.

2

The subquery is where we’re doing all the windowing logic. This query is putting the values that appear before and after the current value in the same row. Figure 4-4 shows an example of the input and output of this subquery.

hdaa 0503
Figure 4-4. Input and output of the preceding subquery: the first step groups the events for each sequence together and sorts them by the order the events occurred; the second step adds to each event the values from the previous and following event, which is then used by the main query to detect peaks and valleys

Windowing and SQL

Even in this example the SQL code is more concise than Spark, and could be considered simpler. At the same time we need to consider the limitations; doing multiple complex windowing operations in SQL will mean an increase in complexity that will need to read and write to disk, which will increase I/O and lead to a corresponding decrease in performance. With Spark we only need to order the data once and then do N windowing operations on it, using the functionality provided by Java or Scala to hold information in memory and perform operations.

Pattern: Time Series Modifications

For our final example we are going to mix components from the first two examples. We will update records based on a primary key while also keeping all of the history. This will allow a record to know when it was current and when it expired, providing information about an entity and a start and stop time, as shown in Figure 4-5.

hdaa 05in02
Figure 4-5. The price of Apple stock over time

The start and stop times will mark the time range over which that record represented the truth. If the stop time is null, this means that the record is the current truth for that entity. In the preceding example, we can see the current price of Apple stock is $42.

The problem comes in when we need to update this table. As shown in Figure 4-6, when we add a new record of price 43, we need to close out the previous record with a new end time that equals the new record’s start time. The darker cells are cells that need to be updated.

hdaa 05in03
Figure 4-6. Adding a new record to Apple stock price table

At face value this looks like a very simple problem, but in fact, can be complex to implement when you are dealing with updates in large data sets. We’ll discuss some of the challenges and approaches to address them in the following discussion.

Use HBase and Versioning

One common way of storing this information is as versions in HBase. HBase has a way to store every change you make to a record as versions. Versions are defined at a column level and ordered by modification timestamp, so you can go to any point in time to see what the record looked like.

The advantage to this approach is that modifications are very fast, as is getting the latest values. There is, however, some penalty in getting historical versions and a major disadvantage in performing large scans or block cache reads.

Large scans will suffer because they have to pass over all the versions of one record before reaching the next record. So the larger your version count, the slower your large scans will be.

Block cache reads are disadvantageous because when retrieving a record HBase pulls a whole 64 KB HFile block into memory, and if your history is large, it may be pulling into memory other versions instead of the actual record you want.

Lastly, this data model does have the start and stop time in the same record. You need to look at a couple of versions to figure out when a version started and ended.

Use HBase with a RowKey of RecordKey and StartTime

In order to have HBase include the start and stop time in the same record, we have to create a composite key of RecordKey and StartTime. Now the difference from the versioning solution is that the record will have a column for stop time.

Note that start time in the RowKey will need to be a reverse epoch number so that it can be sorted from newest to oldest.

So, to modify such a schema when a new version of a record is added, we will need to first do a single-record scan with the RecordKey as the start RowKey. This will return the most current version of the RecordKey. When we have that information we need to do two puts: one to update the stop time in the current record, and the other to add a new current record.

When we want to get a version for any given time, we just do a single-row scan of RecordKey and the reverse epoch of the time for the record we wish to fetch.

This solution still has the large scan and block cache problems, but now we can get a version quickly and have the stop time in the same row. Just remember this comes at the cost of an additional get and put upon insertion of the record.

Use HDFS and Rewrite the Whole Table

If we remove HBase from the picture and just do the simplest HDFS implementation, we would have all the data in HDFS and we’d refresh the table as we get new data on some periodic basis—for example, once daily.

This solution might seem very expensive, but with Hadoop we can re-write terabytes of data in a short period of time. As data sets get larger, there are techniques we can use to optimize the execution—for example, separate partitions for the most current records. We’ll discuss that solution next.

Use Partitions on HDFS for Current and Historical Records

A smarter implementation on HDFS is to put the most current records in one partition and the historic records in another partition. This would allow you to rewrite just the latest version, not all the versions in history. Then we only append the new records to the older record’s partition.

The big win here is that the execution time is fixed to the number of current records as opposed to getting longer and longer with every version of history added.

We’ll provide an example of this technique next, again using Spark and SQL. For purposes of illustration, we’ll use a simple, contrived data set composed of a unique ID, event time, and a random integer, but it should be easy to imagine how this technique could be extended to real-world, time-based data, such as the stock ticker example described earlier.

Data Generation for Time Series Example

Let’s create our data set for our time series example, again using Scala and the HDFS FileSystem API:

def main(args: Array[String]): Unit = {
  if (args.length == 0) {
   println("{outputPath} {numberOfRecords} {numberOfUniqueRecords} {startTime}")
   return
  }

  val outputPath = new Path(args(0))
  val numberOfRecords = args(1).toInt
  val numberOfUniqueRecords = args(2).toInt
  val startTime = args(3).toInt

  val fileSystem = FileSystem.get(new Configuration())
  val writer =
      new BufferedWriter(new OutputStreamWriter(fileSystem.create(outputPath)))

  val r = new Random


  for (i <- 0 until numberOfRecords) {
   val uniqueId = r.nextInt(numberOfUniqueRecords)
   val madeUpValue = r.nextInt(1000)
   val eventTime = i + startTime

   writer.write(uniqueId + "," + eventTime + "," + madeUpValue)
   writer.newLine()
  }
  writer.close()
 }

Looking at this data generator, you will note it is very close in design to the previous examples of data generation.

Code Example: Time Series in Spark

Now, we move on to our Spark code implementation to update our single partition of time series data with the updated start and stop times. You can find this code in the GitHub in the SparkTimeSeriesExecution Scala object. This is the largest example we have in this chapter, but we’ll walk you through the code to explain what’s going on.

object SparkTimeSeriesExecution {
  def main(args: Array[String]): Unit = {
    if (args.length == 0) {
      println("{newDataInputPath} " +
        "{outputPath} " +
        "{numberOfPartitions}")
      println("or")
      println("{newDataInputPath} " +
        "{existingTimeSeriesDataInputPath} " +
        "{outputPath} " +
        "{numberOfPartitions}")
      return
    }

    val newDataInputPath = args(0)
    val existingTimeSeriesDataInputPath = if (args.length == 4) args(1) else null
    val outputPath = args(args.length - 2)
    val numberOfPartitions = args(args.length - 1).toInt

    val sparkConf = new SparkConf().setAppName("SparkTimeSeriesExecution")
    sparkConf.set("spark.cleaner.ttl", "120000");

    val sc = new SparkContext(sparkConf)

    // Load data from HDFS
    var unendedRecordsRDD = sc.hadoopFile(newDataInputPath,           1
      classOf[TextInputFormat],
      classOf[LongWritable],
      classOf[Text],
      1).map(r => {
      val splits = r._2.toString.split(",")

      (new TimeDataKey(splits(0), splits(1).toLong),
        new TimeDataValue(-1, splits(2)))
    })


    var endedRecordsRDD:RDD[(TimeDataKey, TimeDataValue)] = null

    // Get existing records if they exist
    if (existingTimeSeriesDataInputPath != null) {                    2
      val existingDataRDD = sc.hadoopFile(existingTimeSeriesDataInputPath,
        classOf[TextInputFormat],
        classOf[LongWritable],
        classOf[Text],
        1).map(r => {
        val splits = r._2.toString.split(",")
        (new TimeDataKey(splits(0), splits(1).toLong),
          new TimeDataValue(splits(2).toLong, splits(3)))
      })

      unendedRecordsRDD = unendedRecordsRDD
        .union(existingDataRDD.filter(r => r._2.endTime == -1))

      endedRecordsRDD = existingDataRDD.filter(r => r._2.endTime > -1)
    }

    // Define our partitioner
    val partitioner = new Partitioner {                               3
      override def numPartitions: Int = numberOfPartitions

      override def getPartition(key: Any): Int = {
        Math.abs(
          key.asInstanceOf[TimeDataKey].uniqueId.hashCode() % numPartitions)
      }
    }

    val partedSortedRDD =
      new ShuffledRDD[TimeDataKey, TimeDataValue, TimeDataValue](
        unendedRecordsRDD,
        partitioner).setKeyOrdering(implicitly[Ordering[TimeDataKey]])

    // Walk down each primaryKey to make sure the stop times are updated
    var updatedEndedRecords = partedSortedRDD.mapPartitions(it => {   4
      val results = new mutable.MutableList[(TimeDataKey, TimeDataValue)]

      var lastUniqueId = "foobar"
      var lastRecord: (TimeDataKey, TimeDataValue) = null

      it.foreach(r => {
        if (!r._1.uniqueId.equals(lastUniqueId)) {
          if (lastRecord != null) {
            results.+=(lastRecord)
          }
          lastUniqueId = r._1.uniqueId
          lastRecord = null
        } else {
          if (lastRecord != null) {
            lastRecord._2.endTime = r._1.startTime
            results.+=(lastRecord)
          }
        }
        lastRecord = r
      })
      if (lastRecord != null) {
        results.+=(lastRecord)
      }
      results.iterator
    })

    // If there were existing records union them back in
    if (endedRecordsRDD != null) {                                    5
      updatedEndedRecords = updatedEndedRecords.union(endedRecordsRDD)
    }

    // Format and save the results to HDFS
    updatedEndedRecords                                               6
      .map(r => r._1.uniqueId + "," +
      r._1.startTime + "," +
      r._2.endTime + "," +
      r._2.data)
      .saveAsTextFile(outputPath)
  }

  class TimeDataKey(val uniqueId:String, val startTime:Long)
      extends Serializable with Comparable[TimeDataKey] {
    override def compareTo(other:TimeDataKey): Int = {
      val compare1 = uniqueId.compareTo(other.uniqueId)
      if (compare1 == 0) {
        startTime.compareTo(other.startTime)
      } else {
        compare1
      }
    }
  }

  class TimeDataValue(var endTime:Long, val data:String) extends Serializable {}
}
1

As in previous code examples, here we are just reading in the new data from HDFS.

2

Unlike previous examples in this chapter, in this case we have the possibility of two inputs: the new data and the data from the existing table in HDFS. We’re making the existing data set optional here, because the first time we add records we obviously won’t have an existing data set. Note that we will filter out any records that already have endTimes. This is because we don’t need them to go through the shuffle code and be transferred over the network and sorted. We will union these values back in later.

3

Just as in the peak and valley example, we are going to need a custom partition and shuffle. This will be a common pattern we’ll use whenever we need to traverse over a data set in order by a given key. The partitioning here is similar to previous examples: we want to partition on the primaryKey and sort on the combination of the primaryKey and the startTime.

4

This is the code that will traverse each primaryKey and update records that need new stop times.

5

Here is where we union in the existing records that already had endTimes with a union() method.

6

And finally, this is where we write out the formatted results to HDFS.

Code Example: Time Series in SQL

As before, we need to first set up our source tables for Hive or Impala. In this example we are going to have two tables: one for the existing time series data and one for the new data:

CREATE EXTERNAL TABLE EXISTING_TIME_SERIES_TABLE (
  PRIMARY_KEY STRING,
  EFFECTIVE_DT BIGINT,
  EXPIRED_DT BIGINT,
  EVENT_VALUE STRING
  )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
 STORED AS TEXTFILE
 LOCATION 'ExistingTimeSeriesData';

CREATE EXTERNAL TABLE NEW_TIME_SERIES_TABLE (
  PRIMARY_KEY STRING,
  EFFECTIVE_DT BIGINT,
  EVENT_VALUE STRING
  )
 ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
 STORED AS TEXTFILE
 LOCATION 'NewTimeSeriesData';

The two tables are very close except the new records don’t have an expired date.

Note

Note that in a real-world use case we would likely have multiple partitions in our data set—for example, for current and historic records. This is because we don’t really need to read the existing expired records to perform this processing. We only need to read the existing active records to see if they have been expired with the addition of new records in the NEW_TIME_SERIES_TABLE.

Therefore, the current records would be rewritten, whereas the history would only be appended to.

Now let’s take these two tables and populate a result table with the updated records:

 SELECT
     PRIMARY_KEY,
     EFFECTIVE_DT,
     CASE
       WHEN LEAD(EFFECTIVE_DT,1,null)
         OVER
           (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT)
              IS NULL THEN NULL
       ELSE LEAD(EFFECTIVE_DT,1,null)
         OVER
           (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT)
     END AS EXPIRED_DT,
     EVENT_VALUE
    FROM (
      SELECT
        PRIMARY_KEY,
        EFFECTIVE_DT,
        EVENT_VALUE
      FROM
        EXISTING_TIME_SERIES_TABLE
      WHERE
        EXPIRED_DT IS NULL
      UNION ALL
      SELECT
        PRIMARY_KEY,
        EFFECTIVE_DT,
        EVENT_VALUE
      FROM NEW_TIME_SERIES_TABLE
    ) sub_1
UNION ALL
SELECT
  PRIMARY_KEY,
  EFFECTIVE_DT,
  EXPIRED_DT,
  EVENT_VALUE
FROM
  EXISTING_TIME_SERIES_TABLE
WHERE
  EXPIRED_DT IS NOT NULL

This is a fairly long query, so let’s break it down. At the top level there are two main SELECTs that are unioned together. The first one handles updating existing current records and new records to see which records have expired. The second one is just moving over the existing expired records. If we used the partition strategy described earlier we wouldn’t need this second top-level SELECT statement.

Focusing on the first SELECT, shown in the following snippet, notice that in the subquery there is a union of the two starting tables. Note that we are filtering out the existing expired tables from the EXISTING_TIME_SERIES_TABLE; this is because we don’t want the extra work of resorting all those records in the windowing function:

SELECT
  PRIMARY_KEY
  EFFECTIVE_DT,
  EVENT_VALUE
FROM
  EXISTING_TIME_SERIES_TABLE
WHERE
  EXPIRED_DT IS NULL
UNION ALL
  SELECT
    PRIMARY_KEY,
    EFFECTIVE_DT,
    EVENT_VALUE
  FROM NEW_TIME_SERIES_TABLE

The results of that inner SELECT will be partitioned by the primary key and ordered by the effective date and time. This will allow us to ask, “Is there a record with a greater timestamp and the same primary key, and that also has an effective date?” If the answer is yes, this record is expired. Here is that portion of the query:

PRIMARY_KEY,
EFFECTIVE_DT,
CASE
  WHEN LEAD(EFFECTIVE_DT,1,null)
    OVER
      (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT)
          IS NULL THEN NULL
  ELSE LEAD(EFFECTIVE_DT,1,null)
    OVER
      (PARTITION BY PRIMARY_KEY ORDER BY EFFECTIVE_DT)
END AS EXPIRED_DT,
EVENT_VALUE

As you can see, this is a somewhat complex query, but it’s still more concise than the Spark code. In this case Spark or SQL are both good choices, and your choice will likely be determined by comfort and familiarity with one or the other.

Conclusion

To wrap up this chapter, here are a few takeaways:

  • We can do some pretty complex stuff with Spark and SQL in Hadoop.

  • We don’t have to abandon SQL when moving to Hadoop, and in fact SQL remains a powerful abstraction for processing and analyzing data in Hadoop, just as it is with more traditional data management systems.

  • We can attack problems with different tools, each giving us a different approach to solving the problem.

It’s likely that SQL-on-Hadoop will become an increasingly powerful tool with the introduction of new tools like Impala and Hive on Spark, but it’s unlikely that SQL will replace Spark and other processing frameworks for solving a number of problems. Ultimately it will be necessary for well-rounded Hadoop developers to understand the available tools in order to evaluate the appropriate one to solve their particular problem.

Get Hadoop Application Architectures now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.