Chapter 4. Data Processing at Its Core

This book began with a very high-level overview of data processing history and then descended to a more detailed review of the different components of a data ecosystem. We are now diving to a low enough altitude to look at the foundations of all data processing at the job level.

The goal of this chapter is to lay a foundation for you to build on later in the book when we begin categorizing different systems and discussing issues that frequently come up in data processing systems. This book is about optimizing the entire pipeline, which you can’t do without understanding what is happening in its smallest units.

Then we ascend a bit and look at Directed Acyclic Graphs (DAGs) at the pipeline level: DAGs made up of many processing DAGs and storage systems.

What Is a DAG?

The first time I heard about a DAG engine, I was super hopeful it was going to be groundbreaking and would change my understanding of data processing. If you have a name as cool as a DAG engine, by law you need to be awesome. However, as with many things in life, once you learn the details, reality becomes less exciting than you had hoped.

A DAG is nothing more than a graph of nodes and edges in which you don’t loop back, as shown in Figure 4-1.

Simple DAG
Figure 4-1. Simple DAG

The DAG provides a great way to explain all data processing and even data pipelines. In general, a DAG can represent the execution of a single job or the path data takes throughout the whole data ecosystem.

If the DAG is for a single job, each node represents a unique operation, and the lines between them normally represent the optimized transmission of data through memory and over the network.

If the DAG is for an entire pipeline, each node signifies one job or DAG within that larger DAG. The difference with a pipeline DAG is that the line between nodes is normally an ingest or extraction from a storage system.

The main body of this chapter focuses on the single-job DAG and optimizations when you’re designing one. Some of the high-level ideas discussed in the single-job DAG section will translate into the viewpoint of the pipeline DAG. However, how they apply and why they apply can differ. The main idea is to give you a low- then high-level view of DAG optimization, first at a job level and then at multiple job levels in a pipeline.

Single-Job DAGs

The term single-job DAG refers to the DAG of one SQL statement or one Spark job. The functional scope of a single job is not limited by any other factor: it can read in zero to many data sources and output zero to many resulting data sources.

As we discuss in the next major section, the boundaries of a single-job DAG are marked by a storage system or by serialization of the data. For this section, think of a single DAG as a single SQL or Spark job, for which the DAG is the instructions, or recipe, for how to take the inputs and produce the outputs.

DAGs as Recipes

When making cookies (mmm, cookies), you need directions to tell you which ingredients to add when, and what operations need to take place at every stage of baking.

Consider the DAG in Figure 4-2 for executing word count.

DAG stage separation
Figure 4-2. DAG stage separation

Let’s examine each stage of this process (a code sketch of these stages follows the list):

  1. Stage 1

    1. Spin up map tasks.

    2. Read data from source.

    3. Split each line into words.

    4. Convert each word to a tuple (a value with two subvalues) of the word and the number 1.

  2. Stage 2

    1. Shuffle the values by the word.

    2. Perform a reduce job on the word and add the numbers of the words together, as shown in Figure 4-3.

      Example of reduce by
      Figure 4-3. Example of reduce by
  3. Stage 3

    1. Repartition to one partition.

    2. Write out the results to a single file.
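Putting the three stages together, here is a minimal PySpark sketch of the same word count. The paths, bucket name, and application name are placeholders of my own, not from the original example, and the RDD API is just one way to express this DAG.

    # A minimal word count sketch; each operation maps to one of the stages above.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("word-count").getOrCreate()
    sc = spark.sparkContext

    lines = sc.textFile("s3://example-bucket/input/")        # Stage 1: read data from the source
    words = lines.flatMap(lambda line: line.split())         # Stage 1: split each line into words
    pairs = words.map(lambda word: (word, 1))                # Stage 1: tuple of (word, 1)
    counts = pairs.reduceByKey(lambda a, b: a + b)           # Stage 2: shuffle by word, add the 1s together

    # Stage 3: repartition to one partition and write out a single (part) file.
    counts.repartition(1).saveAsTextFile("s3://example-bucket/output/")

    spark.stop()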

In the end, with many partitions, the data flow for this DAG will look something like Figure 4-4.

Example of a shuffle (Stage 1 -> 2) and a repartition (Stage 2 -> 3)
Figure 4-4. Example of a shuffle (Stage 1 → 2) and a repartition (Stage 2 → 3)

I reuse this diagram as we work through the chapter to give a visual for the different components of a DAG, which will be important when we troubleshoot later in the book.

DAG Operations/Transformations/Actions

In this subsection, we look into a number of parts that make up our common DAG, such as the following:

Inputs

How data comes into our process

Map-only processes

Processing in place

Shuffle

Moving data around

Repartitioning

Properly sizing the parallelism

Advanced techniques

Extra credit for abnormal DAG activity

The goal of these subsections is to help you get a base understanding of how distributed processing works within the context of a DAG topology.

Inputs

As you can see in our word count example, we have three starting points. This indicates that we are reading from the source with three different threads, which might or might not be executing on the same node, as shown in Figure 4-5.

Inputs into a DAG
Figure 4-5. Inputs into a DAG

Normally, the number of input streams is defined by the source you are reading from. For example, if you are reading from Amazon Simple Storage Service (Amazon S3), the number of inputs will depend on the number and sizes of your files and whether your files are splittable.

Finding the appropriate level of separation is not easy, but in general you don’t want more than 1 GB of uncompressed data going to one process.
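As a rough illustration, here is a hedged PySpark sketch of how you might check how many input partitions (and therefore parallel read tasks) a source produces before deciding whether to change the parallelism. The S3 path and the target partition count are placeholders.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # The number of input partitions depends on the number, size, and
    # splittability of the underlying files.
    df = spark.read.parquet("s3://example-bucket/events/")
    print(df.rdd.getNumPartitions())

    # If a partition would carry far more than ~1 GB of uncompressed data,
    # ask for more partitions before the heavy processing starts.
    df = df.repartition(200)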

Note

Input Bottlenecks

Earlier, I talked about different storage systems and how they allow for access. If your source system doesn’t allow for more than one parallel read, you might be experiencing a bottleneck on the read.

Read bottlenecks are irritating because everything goes slowly, but nothing looks busy—not the CPU, network, disk, or anything else.

Map-only processes

Map-only processes are stages in the DAG during which data is read and processed without exchanging any data between the processing system’s nodes. To help understand what a map-only process is, let’s look at some common map-only functions in Apache Spark (a short sketch using them follows the list):

Map

A process in which a record comes into the function and one value comes out, to continue on to the next stage in the DAG.

FlatMap

A process in which a record comes into the function and zero or more value(s) come out, to continue on to the next stage in the DAG.

Foreach

A process in which a record comes in and an operation is executed on it, but no records are emitted. This is sometimes used to send data outside the DAG.
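Here is a small sketch of these three operations using the PySpark RDD API; the sample sentence and the print-based side effect are placeholders.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    lines = sc.parallelize(["to be or not to be"])

    upper = lines.map(lambda line: line.upper())       # map: one record in, one record out
    words = lines.flatMap(lambda line: line.split())   # flatMap: one record in, zero or more records out

    print(upper.collect())   # ['TO BE OR NOT TO BE']
    print(words.collect())   # ['to', 'be', 'or', 'not', 'to', 'be']

    # foreach: one record in, nothing emitted; useful for side effects such as
    # sending each record to an external system (here we just print it).
    words.foreach(lambda word: print(word))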

A map-only process is normally the first process in the DAG, where you read the initial data sources; map-only processes can also follow a shuffle, repartition, or reduceBy. Figure 4-6 shows a partition diagram.

Highlighting the first map-only part of a DAG
Figure 4-6. Highlighting the first map-only part of a DAG

If you look closely at Figure 4-6, you will note that the boxes in Stages 2 and 3 can also be map-only processes. However, the job as a whole is generally not considered map-only if it includes a shuffle or repartition.

Maps are great because they don’t require a shuffle. A shuffle is that crazy line-crossing thing you see between Stage 1 and Stage 2 in Figure 4-6. Maps read data as it is given to them and write data out as it is given to them. For the most part, map jobs are single-record actions.

Shuffle

The shuffle, as just mentioned, is the crazy line-crossing part of the diagram, as shown in Figure 4-7.

Shuffle operation in a DAG
Figure 4-7. Shuffle operation in a DAG

There are a number of things going on here in our word count example:

A map-side reduce

Because reducing is not super expensive and there are opportunities to reduce within a given partition, the result of a map-side reduce could mean less data going over the wire.

Partitioning the data

Normally, data is partitioned by taking a hash of the key value (in our case, the word) and then taking that hash modulo (%) the number of partitions, as shown in the sketch after this list. This puts all records with the same key in the same partition and gives a pseudo-random distribution that is also deterministic in its results: if you run the job twice, all the keys and their records land in the same partitions as they did the first time around.

Sorting or grouping

Additionally, the data will be sorted or grouped within each partition so that at the end of the shuffle we can reduce by key or, in other use cases, perform group-bys or joins.

Send over the network

The act of sending data over the network is where the real pain comes from. Not only does your data go over the wire, but you need to serialize it before sending and deserialize it on the other side. All this serialization is very CPU intensive.
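To make the partitioning rule concrete, here is a tiny plain-Python sketch of hash-and-modulo partitioning. The word list and partition count are made up, and CRC32 simply stands in for whatever hash function the engine actually uses.

    import zlib

    def partition_for(key, num_partitions):
        # A stable hash (CRC32) taken modulo the partition count, so the
        # assignment is the same every time the job runs.
        return zlib.crc32(key.encode("utf-8")) % num_partitions

    for word in ["the", "cat", "sat", "on", "the", "mat"]:
        print(word, "-> partition", partition_for(word, 4))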

In general, the shuffle is where you will find a good deal of your processing pain. There are so many things that can go wrong with designing shuffles into your workload:

They are expensive

Use with care because sometimes you send a lot of data over the wire.

How many shuffles

The goal should always be as few as possible.

Skew

Skew is when you have too many of one key, so partitions become uneven or lopsided (more on skew coming up soon).

Resulting partitions

The number of resulting partitions is up to you, unlike the input partitions. So, depending on what you need to do on the other end, you might need more or fewer partitions.

Repartitioning

Not all shuffles are created equal. Sometimes you just need to repartition. The example in Figure 4-8 shows a repartition used to write out a single file.

Repartitioning in a DAG
Figure 4-8. Repartitioning in a DAG

Although repartitioning is still expensive because of all the network and CPU involved, we don’t need to do the sorting or grouping.

Additionally, you should use repartitioning only when you need a different level of parallelism or a different type of output.
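Here is a hedged PySpark sketch of the pattern in Figure 4-8: repartitioning down to one partition so the job writes a single output file. The paths are placeholders, and coalesce() is shown only as a cheaper alternative when you are reducing the partition count.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    df = spark.read.parquet("s3://example-bucket/counts/")

    # repartition() moves data over the network but does no sorting or grouping.
    df.repartition(1).write.mode("overwrite").parquet("s3://example-bucket/single-file/")

    # coalesce() merges existing partitions and avoids a full shuffle when
    # you only need fewer partitions.
    df.coalesce(1).write.mode("overwrite").parquet("s3://example-bucket/single-file-cheaper/")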

Advanced techniques

So far we have looked at the word count example to help describe DAG operations, but for the next couple of operations we need to go beyond word count. Here’s a short use case with each explanation to give you some background.

Broadcasts

The idea of a broadcast is to send out a copy of the data to every executing node in the processing engine. This information can be as simple as connection information; for example, to give each node the instructions to write to a remote store. In the scope of processing data, you can use a broadcast to send records from a smaller dataset. In this case, that’s called a broadcast join, and it looks something like Figure 4-9.

Shuffle join versus a broadcast join
Figure 4-9. Shuffle join versus a broadcast join

The idea is that if one of the tables is small (fewer than a million records), it makes sense to first send the smaller table out to every node and then do a map-only job to join that data to the larger dataset. The result is that you don’t need to shuffle the larger dataset.
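Here is a hedged PySpark sketch of a broadcast join. The table paths, the join column, and the size of the small table are placeholders; the point is that the small table is copied to every node so the large table is never shuffled.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import broadcast

    spark = SparkSession.builder.getOrCreate()

    large = spark.read.parquet("s3://example-bucket/clickstream/")
    small = spark.read.parquet("s3://example-bucket/country-codes/")  # well under a million rows

    # The broadcast hint ships the small table to every executor; the join then
    # runs as a map-only operation over the large table.
    joined = large.join(broadcast(small), on="country_code", how="left")
    joined.write.mode("overwrite").parquet("s3://example-bucket/clickstream-enriched/")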

Remote interactions

Another cheat that people sometimes try is remote fetching of data, which is similar to a broadcast join. In this model, you put the smaller table into a remote low-latency cache and fetch the matching records when you need them. It looks something like Figure 4-10.

Local-to-remote cache in broadcast operation
Figure 4-10. Local-to-remote cache in broadcast operation

This approach can sometimes work but is sometimes really bad. The determining factor is whether the remote cache can keep up with the load. Here are some tricks to help in your design:

Scale

Make sure the cache can scale.

Skew

Make sure your cache won’t fall down if there is key skew.

Round trip

Make sure the latency round trip between the job and the cache is minimal.

Batching

The greater the round-trip latency, the more you want to batch requests into a single send (see the sketch after this list).

Overclocking

Depending on your design, your processing system might be doing a lot of waiting for the remote call, so it might be safe to overclock your cores—that is, have more threads than virtual cores.
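Here is a hedged PySpark sketch of the batched remote-fetch pattern. The fetch_batch() function, its fake responses, the record layout, and the batch size are all hypothetical stand-ins for a real cache client; the point is that each partition makes one request per batch rather than one per record.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    def fetch_batch(keys):
        # Placeholder for a real cache or HTTP client call returning {key: value}.
        return {key: "value-for-" + key for key in keys}

    def enrich_partition(records, batch_size=500):
        batch = []
        for record in records:
            batch.append(record)
            if len(batch) >= batch_size:
                lookups = fetch_batch([r["key"] for r in batch])
                for r in batch:
                    yield {**r, "joined": lookups.get(r["key"])}
                batch = []
        if batch:  # flush the final partial batch
            lookups = fetch_batch([r["key"] for r in batch])
            for r in batch:
                yield {**r, "joined": lookups.get(r["key"])}

    rows = sc.parallelize([{"key": "a"}, {"key": "b"}, {"key": "c"}])
    print(rows.mapPartitions(enrich_partition).collect())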

Using memory

Using memory is always helpful because it removes the need to reach out to a slower storage option; however, memory alone doesn’t solve everything. Memory is constrained to a single process, so as data leaves an initial process or the local node, the benefit from memory is mostly lost. With a memory-based system, you want to focus on reduce-by-key operations in order to keep as much in memory as possible.
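As a small illustration of that last point, here is a PySpark sketch contrasting reduceByKey, which combines values in memory before the shuffle, with groupByKey, which ships every record across the network first. The sample pairs are made up.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    sc = spark.sparkContext

    pairs = sc.parallelize([("cat", 1), ("dog", 1), ("cat", 1), ("cat", 1)])

    # Preferred: partial sums happen map-side, in memory, so less data moves.
    counts = pairs.reduceByKey(lambda a, b: a + b)

    # Works, but every individual record crosses the wire before being counted.
    counts_the_hard_way = pairs.groupByKey().mapValues(lambda values: sum(values))

    print(counts.collect())
    print(counts_the_hard_way.collect())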

Checkpointing

Checkpointing means taking time and resources to record your state, which allows you to continue from that state if something were to go wrong with your job. This trick comes in handy if you have jobs that take more than 30 minutes.

You don’t want to have a 20-hour job that fails in the last hour and requires you to repeat 19 hours’ worth of work. Instead, break up the problem into smaller chunks and consider checkpointing. That way, if you do fail, you have a place to recover from that is more optimal than starting from the beginning.
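Here is a hedged PySpark sketch of checkpointing in the middle of a long job; the checkpoint directory, table paths, and grouping column are placeholders.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    spark.sparkContext.setCheckpointDir("s3://example-bucket/checkpoints/")

    df = spark.read.parquet("s3://example-bucket/raw-events/")
    expensive = df.groupBy("customer_id").count()

    # Materialize the intermediate result so later stages can recover from it
    # instead of recomputing everything from the raw input.
    expensive = expensive.checkpoint()

    expensive.write.mode("overwrite").parquet("s3://example-bucket/customer-counts/")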

Pipeline DAGs

The pipeline DAG is different from the single-job DAG in that it involves more than one job. With that difference, our pipeline DAG will look a little different, as illustrated in Figure 4-11. Each processing node reads from and writes to persistent nodes.

Pipeline example (difference in processing versus storage)
Figure 4-11. Pipeline example (difference in processing versus storage)

There are a few things to note in Figure 4-11:

No direct lines

There are no lines from a processing node to another processing node or from a storage node to another storage node. You always go from a storage node to a processing node and back.

Start and end with storage

You always start and end with a storage node, which implies that a processing node exists only to populate a storage node.

Storage reuse

Sometimes storage nodes (like the data marts in the persistent gray boxes in Figure 4-11) are there only to bridge two different types of processing.

Those three points will serve as reasons to drill down into pipeline DAG optimization. The following subsections look at each one to see how the decisions around each can affect you.

No Direct Lines

Normally, a storage node does not mutate itself. Examples of storage nodes include the following:

  • RDBMS

  • Kafka topics

  • RabbitMQ queue

  • Stream

  • NoSQL

  • Caches

  • Streaming in-memory stores (Spark Streaming, Flink)

  • Elasticsearch

All of those storage systems store data in one form or another. They don’t mutate the data or send the data out to anyone else—their main job is to store the data. In some cases, they might send notifications when data changes; however, mutating data and moving it around are the jobs of the processing systems, which we talk about soon.

Note

What About Triggers?

Some of these storage systems enable triggers that might mutate or transmit data, but remember our perspective: from the pipeline’s point of view, these triggers are processing nodes.

A processing node could be anything: SQL, Spark, custom code, triggers, and more. It is these processing actions that take data, do something with it, and deliver it to one or more storage nodes.

Additionally, from the perspective of our pipeline, a processing node cannot send to another processing node. The gap between the two processing nodes must be some type of storage medium, even if that medium is as basic as a stream of messages.

Start and End with Storage

The previous subsection talked about storage nodes always being consumed by a processing node and a processing node sending its output to one or more storage nodes. With that logic, we can derive that storage nodes are always the beginnings and endings of our pipelines.

That’s because processing nodes are all about processing and transmission. As soon as it’s done, a processing node has no state and is empty. It is the storage node that retains the value.

Storage Reuse

In the pipeline DAG diagram shown earlier in Figure 4-11, there is a data mart labeled Y-Mart. This data mart is used by multiple processing nodes, allowing them to reuse the output of processes that are complex, require consistency, or are expensive to run.

Complex

Complex logic is difficult to implement and would be wasteful to implement multiple times.

Consistent

Consistent logic is, importantly, calculated only once. Think about a customer’s balance. If you had more than one processing system computing customer balances, there is a chance the results might differ, and then there could be confusion in the company about which one is right.

Expensive

Expense is the consideration most directly tied to processing speed. A great example is creating a data mart that holds the result of a very expensive join: many consumers can use the results without each having to pay the cost of performing the join independently.

To recap: the idea here is to look for cases in which the cost of storing the data again is outweighed by the cost of reproducing its results more than once, whether that cost comes from complexity, consistency requirements, or processing expense.

Summary

In this chapter, we looked in depth at DAGs, both for a single data-processing job and for executing many different processing jobs in a processing pipeline.

I hope by now that you are beginning to see your systems as DAGs within DAGs, looking for patterns and optimizations at any level of the DAGs that can move the needle for the topmost DAG.

Chapter 5 uses this foundational information to discuss how things can go wrong with your single jobs, which will hopefully give you more things to think about while in the design stage. Chapter 6 then returns to a higher-level view to look at the running of many jobs in a pipeline.
