Chapter 4. Stream Processing
The goal of stream processing is to get data immediately into the form that is needed for the target technology. That can involve any of the types of processing that we mentioned in Chapter 1, which we elaborate on in this chapter.
Although rare, there are use cases in which streaming integration is used to move data from a streaming source directly to a target without any in-stream processing. Here are examples of when this might occur:
- Replicating a database
- Moving changes from one database to another
- Reading from a message queue and writing the output as-is into a file
- Moving data from one filesystem to cloud storage without transforming the data
More commonly, however, the source data won’t match the target data structure. This may be because some of the source data needs to be filtered out; for example, some events or fields of an event might not be needed, so they are removed. Or some data needs to be obfuscated because it contains personally identifiable information (PII). Perhaps additional fields need to be added before delivery to the target. Or, maybe the streaming data needs to be joined with some reference data for enrichment purposes. Stream processing can perform all of these functions, continuously, with low latency, on any collected data (Figure 4-1).
In-Memory
In a true streaming-integration platform, in-memory data processing is required. And that processing needs to be performed as efficiently as possible.
To achieve low latency and high throughput, it is critical to avoid writing data to disk or utilizing storage I/O prior to processing data. Stream processing needs to be performed directly on the streaming data in-memory, before the data ever lands on disk.
There are only two reasons to go to storage:
- The target being written to is a file-based system, such as a specific database or cloud storage.
- Persistent data streams are in use.
Stream processing also needs to be parallelized as necessary across multiple threads – or multiple processes and nodes – to achieve desired performance. Even in a multistage data pipeline, no disk I/O or writing data to storage should happen between the intermediate steps. All the processing should happen in-memory between receiving the data and writing the data into targets to achieve the desired throughput.
Continuous Queries
A streaming architecture also requires a modern querying paradigm. With database systems, queries are run against bounded sets of existing data. A single set of results is returned, and that’s it. To get updated results over time, you need to execute the same query again and again.
With streaming systems, a single query is written based on the knowledge that data with a certain structure exists. That query sits in-memory, and waits for the data. As data appears on one or more incoming data streams, that query processes the incoming data and outputs results continuously in a never-ending fashion.
Effectively, there are two key differences between the in-memory continuous queries that happen in stream processing and the way people thought of queries in the past.
First, continuous queries work on a never-ending, infinite, and unbounded flow of data, as opposed to a bounded and known set of data that is resident in a table.
Second, although a database query is “one and done,” continuous in-memory queries continually produce new results as new data is presented on the incoming data streams.
Unlike Extract, Transform, and Load (ETL) systems and integration technologies of the past, where things were batch-job oriented, real-time stream-processing systems run continuously, 24/7, and the engine behind the processing in those systems is the continuous query. Every time new records appear on a data stream, the query outputs new results.
It’s important to understand that continuous queries aren’t limited to simply reading from a data stream. They can read from in-memory caches, from in-memory reference data that might have been stored, or via windows (more on windows shortly). They can read from other – even persistent – storage, event and data sources, as well, depending on the architecture of the streaming system.
End-to-end latencies can range from microseconds to seconds, depending on how much processing is required, as opposed to the hours or even days typical of batch ETL solutions. As emphasized before, to achieve the goals of continually producing results and producing them with very low latency, queries need to be in-memory.
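As a concrete (if hypothetical) illustration, here is a minimal sketch of a continuous query written in a ksqlDB-style streaming SQL dialect, which we discuss in the next section. The exact syntax varies across platforms, and the stream, topic, and column names are invented for illustration. The key point is that the query is registered once and then emits a new result every time a qualifying event arrives, rather than returning a single bounded result set.

    -- Hypothetical ksqlDB-style sketch; syntax differs by platform.
    -- Describe the shape of the incoming data stream.
    CREATE STREAM temperature_readings (
      sensor_id VARCHAR,
      temp_c    DOUBLE
    ) WITH (KAFKA_TOPIC='readings', VALUE_FORMAT='JSON', PARTITIONS=1);

    -- The continuous query: it stays resident in memory and pushes a new
    -- row downstream whenever a matching event appears on the stream.
    SELECT sensor_id, temp_c
    FROM temperature_readings
    WHERE temp_c > 80.0
    EMIT CHANGES;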
SQL-Based Processing
There are many ways to process and manipulate data. We can do it via a scripting language, in an Excel spreadsheet, or even by writing lower-level code in Java, C++, Python or some other language.
In effect, there are three options in stream processing:
- Low-level code or APIs
- SQL-based processing
- UI-based building blocks that perform transformations at higher levels of definition
In our opinion, SQL is the best solution – a great compromise between the other two choices when you consider overall power, speed, and ease of use. We explain why in this chapter.
Consider the Users
First and foremost, the people who typically extract value from streaming data are data scientists, data analysts, or business analysts. They all have experience working with database systems, and almost all are very familiar with SQL as a data manipulation language.
By choosing SQL as your language for manipulating data, you allow the people who actually know the data to work with it firsthand, without intermediaries.
SQL is also very rich. It’s easy to define filtering with WHERE clauses, to define column transformations, and to do conditional manipulations using case statements. Different types of objects can be JOINed as well as GROUP BYed and aggregated. Whereas with databases, you’re typically joining tables, in streaming cases, you’re joining streams, windows, and caches to produce results. It’s very easy to do that in SQL.
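As an illustration of that richness, the hypothetical ksqlDB-style sketch below filters with a WHERE clause, joins a stream of order items to in-memory product reference data, and aggregates with GROUP BY over a window, all in a single statement. The names are invented, and the syntax will differ from platform to platform.

    -- Hypothetical sketch: filter, join, and aggregate in one statement.
    SELECT p.category,
           COUNT(*)                       AS orders,
           SUM(o.quantity * p.unit_price) AS revenue
    FROM order_items o                        -- streaming order items
    JOIN products p ON o.product_id = p.id    -- in-memory reference data
    WINDOW TUMBLING (SIZE 1 MINUTE)           -- bound the stream for aggregation
    WHERE o.status = 'COMPLETED'
    GROUP BY p.category
    EMIT CHANGES;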
Of course, SQL is a higher-level declarative language. So, to achieve optimal performance, SQL must be transformed into high-performance code that executes on whatever stream-processing platform has been chosen. If that platform is Java based, the SQL is translated into high-performance Java bytecode. This way, you get the best of both worlds: the declarative nature of SQL, which allows data professionals to work directly with the data, and performance as if a developer had written the code.
Most streaming technologies are moving toward SQL for these reasons: Striim, Spark Streaming, Kafka, and Samza, among others, offer SQL interfaces.
User Interface–Based Processing
Instead of providing a declarative language like SQL, some streaming vendors go to a higher level and do everything through a user interface (UI). This UI is typically a graphical user interface (GUI) and provides transformer components that are capable of doing some of the operations that SQL can do. Providing such an interface democratizes the use of data even more given that virtually any user can be proficient with a GUI.
Still, what ultimately results is quite a lengthy data pipeline, because each GUI-based step is performed as an individual task and each transformer possesses very granular functionality. Whereas SQL could achieve its objective with a single statement (perhaps some filtering with a WHERE clause plus some joins or column transformations), five or six different transformer boxes would need to be wired together in the GUI.
The upside of a GUI is that people who have no experience whatsoever in any programming language (including SQL) can build transformations. But there are downsides as well. First, it might not be a good thing that people with no experience building transformations are handling critical data. Second, the data pipelines themselves can suffer in terms of performance because, rather than a single processing step using a SQL statement, quite a few processing steps are now required. Although having a GUI for the pipeline is essential, having multiple individual UI-based transformation steps is less efficient than a single SQL statement.
Multitemporality
As you recall, events are created whenever anything happens. If data is collected, an event is generated. If you’re doing CDC from a database, or reading from files line by line, or receiving data from IoT devices or messaging systems, it is classified as an event. And each event has a timestamp for when it enters the system.
But it’s also possible that there are additional time elements for such events. For example, with a database system, there is the time at which the event was committed into the database. Then there might be a timestamp for the time that the streaming system received it. Those two timestamps might be different, especially in a recovery scenario in which there is a discrepancy between when the database system wrote it and when it was read. Typically, there will be at least those two timestamps. They are considered metadata; that is, data about the data you’ve received.
These events will be at least bitemporal. But there could be additional timing elements within the data itself that are worth taking advantage of.
In an event system, you should be able to make use of any of those times for time-series analysis. You might do this to make sure the data is coming in the proper order, to use windowing functions and data aggregation in a time-based way, or to do time-series analyses using regression functions. The result is multitemporality, which means that any event can be defined to exist in time based on multiple pieces of time information.
This is important because any operation you perform on the data that relies on time should be able to make use of any of the timing attributes of an individual event. Simply using the time at which the event was collected might not be that useful; being able to choose among the timing elements lets you use the one best suited to your particular use case.
From an analytics perspective, however, timing information is crucial for building aggregates over time periods. We explain this later on when we explore windows and the different types of aggregations and functions that we can apply to data as part of time-series analyses.
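For instance, in a ksqlDB-style dialect (a hypothetical sketch; other platforms expose event time differently), a CDC stream can be declared so that windowing and time-series functions use the database commit time carried in the event rather than the time the streaming platform happened to receive it:

    -- Hypothetical sketch: choose which timestamp drives time-based processing.
    CREATE STREAM orders_cdc (
      order_id  VARCHAR,
      amount    DOUBLE,
      commit_ts BIGINT              -- commit time recorded by the source database
    ) WITH (
      KAFKA_TOPIC  = 'orders_cdc',
      VALUE_FORMAT = 'JSON',
      TIMESTAMP    = 'commit_ts'    -- window on commit time, not arrival time
    );

Omit the TIMESTAMP property in this dialect and the engine falls back to the time metadata attached when the event was collected.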
Transformations
Transformations are about applying functions to incoming data. Transformations typically work on a record-by-record basis. The key is being able to manipulate the data, get it into the desired form, join it together, and perform functions on it to produce some desired output.
For example, you might want to concatenate two strings together so that you can combine first name and last name into a full name. That’s a very simple example. Or, you might need to look up something based on an incoming value by saying, “Output the zip code that corresponds to this incoming IP address” by doing a LOOKUP function.
More complex functions are possible, of course, such as conditional transformations that involve case statements in SQL, in which if a particular field has a certain value, you want to combine it with a different field.
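A hypothetical ksqlDB-style sketch of such record-by-record transformations might look like the following; lookups against reference data, such as mapping an IP address to a zip code, are covered later in this chapter under enrichment.

    -- Hypothetical sketch: record-by-record column transformations.
    SELECT first_name + ' ' + last_name                  AS full_name,
           CASE
             WHEN country_code = 'US' THEN state_code    -- conditional transformation
             ELSE province_name
           END                                           AS region
    FROM customer_events
    EMIT CHANGES;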
Filtering
Data flows in stream processing can be arbitrarily complex. For example, they could have splits or branches in them. Filtering is used when the output stream doesn’t need all the data coming in.
Filtering for Data Reduction
Data reduction is one reason to do filtering. A simple example would be to avoid processing any debug log entries because you’re interested only in warning or error messages. Another would be filtering incoming CDC records so that they don’t include inputs from a particular database user. In the first case, a data element is being filtered. In the second case, the filter is based on metadata that includes which user made a given change because you don’t want those particular changes downstream (Figure 4-2).
Filtering for Writing
Another reason to use filtering is to ensure that only certain data is written to certain targets. You might have an incoming data stream with a huge volume of data in it – a big database schema undergoing CDC, so that the incoming data stream includes changes from all the tables in that schema. But suppose that you want to store only information about product orders in a cloud data warehouse. You don’t want changes to customer records or changes to products to be written to this particular reporting instance, just orders. Filtering enables this.
In SQL, most of the time, filtering is done by using the WHERE clause. When filtering is based on aggregates, the HAVING clause is used instead.
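Here is a hypothetical ksqlDB-style sketch of both patterns: a WHERE clause that reduces a log stream to warnings and errors, and a HAVING clause that filters on an aggregate so that only noisy hosts are emitted.

    -- Hypothetical sketch: data reduction with WHERE.
    SELECT log_time, level, message
    FROM app_logs
    WHERE level = 'WARN' OR level = 'ERROR'
    EMIT CHANGES;

    -- Hypothetical sketch: filtering on an aggregate with HAVING.
    SELECT host, COUNT(*) AS error_count
    FROM app_logs
    WINDOW TUMBLING (SIZE 5 MINUTES)
    WHERE level = 'ERROR'
    GROUP BY host
    HAVING COUNT(*) > 100
    EMIT CHANGES;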
Analytics
We also can apply filtering for decision-making using analytics. You can use analytics to determine, for example, whether an event met or exceeded a specified threshold, or whether or not to generate an alert. We look deeper into analytics later on.
Windows
Windows are used to convert an infinite, unbounded incoming data stream into a finite bounded set of data, using whatever criteria is preferred (see Figure 4-3). Set-based operations can then be performed on that data. The two major uses of windows are correlation and aggregation. More on those later.
There are several types of windows. Sliding windows change any time new data comes in, or as time passes. Every time a new record enters the window, or some time goes by, records can leave the window. Any queries that run on that sliding window are triggered every time the window changes.
Next, we have jumping windows or batch windows. These have criteria that determine how much data needs to be in the window before that data is output and before the queries reading from that window downstream are triggered with a windowful of data. The window is then emptied and ready to be filled again.
If you run a moving average over a sliding window, you see a smooth, continuously updating average, rather than the stepped result a jumping/batch window would produce at, say, one-minute intervals. Hybrid versions can be made as well, where you say, “I don’t want to execute the query every time I get new data; I want to do it every 10 events.”
Then there are session windows, which use timestamps. We could define such a window by saying, for example, “hold data until no new related data arrives for 30 seconds.” This is useful when a customer comes to a website and is active for a period of time before leaving: all of their activities are grouped together, and the downstream query is triggered once they have been inactive for the specified amount of time.
As such, there’s an entire spectrum from sliding windows, to fully batch windows, to session windows. With any of these, you can also add timeouts that trigger output independent of anything else happening; for example, “hold 100 events, or output whatever arrived in the last 30 seconds.” You can use combinations of those windows as well.
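The following hypothetical ksqlDB-style sketches show the main variations, reusing the temperature_readings stream defined earlier; in this dialect a hopping window is the closest analogue to a sliding window, and again the details differ by platform.

    -- Jumping/batch (tumbling) window: one result per sensor per full minute.
    SELECT sensor_id, AVG(temp_c) AS avg_temp
    FROM temperature_readings
    WINDOW TUMBLING (SIZE 1 MINUTE)
    GROUP BY sensor_id
    EMIT CHANGES;

    -- Sliding-style (hopping) window: overlapping one-minute windows that
    -- advance every 10 seconds, producing a smoother moving average.
    SELECT sensor_id, AVG(temp_c) AS moving_avg
    FROM temperature_readings
    WINDOW HOPPING (SIZE 1 MINUTE, ADVANCE BY 10 SECONDS)
    GROUP BY sensor_id
    EMIT CHANGES;

    -- Session window: group a user's clicks until 30 seconds of inactivity.
    SELECT user_id, COUNT(*) AS actions
    FROM click_events
    WINDOW SESSION (30 SECONDS)
    GROUP BY user_id
    EMIT CHANGES;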
Windows are essential for correlation and aggregation use cases. The different types of windows are suited for different purposes. Yet windows might not be intuitive for database engineers. That’s because databases are inherently set based. Data exists in a table, and that’s it. Conceptualizing data as something that’s continually changing, and having to create sets around it to do basic aggregate functions like sums, averages, or linear regressions, might be a new and different way of thinking.
Almost all streaming integration platforms offer some degree of windowing, and all are able to do time-based window functions. However, not all support all the different types of windows or multitemporality. This is important to know because windows are an essential component of any stream-processing platform. Without windows, streaming integration use cases can be limited.
Enrichment
Streaming data might not contain all of the information that you need at your target destination or to do analytics. That’s when enrichment comes in.
For example, when doing CDC from a relational database, the majority of the fields in any one particular table will be IDs that refer to other tables. A data stream of ordered product items coming from the customer-order-item table, for example, might contain an order ID, a customer ID, an item ID, maybe a quantity and a timestamp, but that might be it.
Attempting to do downstream analytics on this limited data would probably not be productive. It would be impossible to write queries like, “show all of the real-time orders from California,” or, “show all of the orders for umbrellas because a heavy storm is expected.” Without additional information or additional content, you’re not going to be able to perform rich analytics.
The answer is to enrich the existing data with reference data.
For example, products in an online store – let’s say 100,000 of them – could be loaded into memory and indexed by ID. Then, whenever a customer order item appears in the data stream, it could be joined with the items in memory and additional information added: the item name, its category, its current inventory, and other relevant data. The data stream now has much more information in it, and is much more suited for analytics.
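A hypothetical ksqlDB-style sketch of that enrichment: the product reference data is held as an in-memory table keyed by ID, and each order item is joined to it as it streams through. The names and layout are invented for illustration.

    -- Hypothetical sketch: reference data held as a table keyed by product ID.
    CREATE TABLE products (
      id        VARCHAR PRIMARY KEY,
      name      VARCHAR,
      category  VARCHAR,
      inventory INT
    ) WITH (KAFKA_TOPIC='products', VALUE_FORMAT='JSON');

    -- Enrich each order item in-stream with the product details.
    SELECT o.order_id, o.customer_id, o.item_id, o.quantity,
           p.name, p.category, p.inventory
    FROM order_items o
    LEFT JOIN products p ON o.item_id = p.id
    EMIT CHANGES;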
Distributed Caches
The challenge of enriching real-time data is the size and speed of the data. In a database, everything is in a data store. It’s accessible within the same database. Two tables can easily be joined together to provide all the information needed. However, for real-time streaming environments, when we’re talking about hundreds of thousands of events per second, this is difficult.
If you’re joining against a remote database, for example, it would be necessary to run a query for every event, and each query could take several milliseconds. With hundreds of thousands of events per second, it becomes impossible to query back to a database for every entry in a data stream in the required time. Similarly, with external caches or external data grids, it’s not feasible to make a remote request per event and maintain that throughput.
We can deal with this by including a distributed cache, or in-memory data grid, within the streaming integration platform itself. By placing the data in memory into the same process space as the streaming data and partitioning that data in the same way as the incoming data events, it’s possible to achieve very high throughput and low latency.
That doesn’t always happen naturally. For example, with 100,000 items in memory, a six-node cluster, and a caching system defined to always maintain two copies of the data for redundancy purposes, the chances of any one particular item of data being on a given node are only one in three.
However, if we can arrange it so that the events are partitioned by the same algorithm as is used for partitioning the reference data, the event will always land on the correct node. Now the query is completely in-memory in that node, and is really, really fast. Otherwise, it’s necessary to do a remote lookup, which could take tens to hundreds of microseconds.
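In SQL-based platforms, this is often expressed by repartitioning the event stream on the same key used for the cached reference data. The hypothetical ksqlDB-style sketch below does exactly that, so each order event is routed to the node that already holds the corresponding product entry.

    -- Hypothetical sketch: key the event stream the same way as the
    -- reference data so stream-table lookups stay local to one node.
    CREATE STREAM order_items_by_product AS
      SELECT *
      FROM order_items
      PARTITION BY item_id;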
Correlation
Correlation in this context does not refer to statistical correlation. It’s not about matching variables or using linear regression to understand how variables are correlated. That’s part of analytics (Chapter 5). Here, by correlation, we mean matching events coming in a data stream with events that are coming from one or more other data streams.
A simple example is to have data that represents activity on a number of different hosts, with that data coming from different sources. Perhaps it includes machine information, CPU usage, and memory coming from system logs. Maybe it includes network traffic information coming from network routers, or firewall information coming from other sources. How do you join it all together to see everything that’s happening with a particular device?
In this case, they’d have an IP address or MAC ID in common. What’s required then is to join the data streams together to produce a single output data stream.
However, doing this with data streams is difficult because they are so fast moving. Having events coinciding at exactly the same time is unusual. It’s like aiming two proton beams at each other in a particle accelerator. The chances of two protons hitting are small because they’re fast and they’re tiny. The same is true for streaming events.
To join data streams together, you typically need to incorporate data windows. Imagine you have multiple physical pipelines, each with a temperature, rate of flow, and pressure. There are sensors on each pipeline measuring these attributes and sending data to data streams, and each sensor generates data at a different rate.
To understand the temperature, pressure, and flow of a particular pipe, it would be necessary to join those three data streams together. Now, because they all come at different speeds, the way to do that would be to create windows that have the last record per pipeline, per data stream. And whenever a new entry comes into the window, it would replace the old one for that pipeline.
The query is then written against the three windows. The query outputs an event whenever any of the windows change, and the output will be whatever the new value is for that pipeline on the window that changed, plus the existing measurements from the other windows.
This way, it’s possible to join streams together that run at different speeds and produce an output whenever data is received on any one of the streams.
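One way to express this in a hypothetical ksqlDB-style sketch is to maintain a “last value per pipeline” table for each sensor stream and then join the three tables; an update to any of them produces a new combined row for that pipeline. All names are invented, and some platforms or versions may require chaining the joins pairwise.

    -- Hypothetical sketch: hold the latest reading per pipeline from each stream.
    CREATE TABLE latest_temp AS
      SELECT pipeline_id, LATEST_BY_OFFSET(temp_c) AS temp_c
      FROM temperature_stream GROUP BY pipeline_id;

    CREATE TABLE latest_pressure AS
      SELECT pipeline_id, LATEST_BY_OFFSET(pressure) AS pressure
      FROM pressure_stream GROUP BY pipeline_id;

    CREATE TABLE latest_flow AS
      SELECT pipeline_id, LATEST_BY_OFFSET(flow_rate) AS flow_rate
      FROM flow_stream GROUP BY pipeline_id;

    -- Join the three last-value tables; a change to any input emits a
    -- combined row with the newest value plus the other two measurements.
    SELECT t.pipeline_id, t.temp_c, p.pressure, f.flow_rate
    FROM latest_temp t
    JOIN latest_pressure p ON t.pipeline_id = p.pipeline_id
    JOIN latest_flow f ON t.pipeline_id = f.pipeline_id
    EMIT CHANGES;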
It’s possible to go further than that by holding the last few values rather than only the last one, which allows a value to be calculated rather than simply read. Perhaps instead of using the last value, the average of the last three values is used, or a more complex regression mechanism calculates the value based on the last 10 values.
In summary, windows aren’t just useful for joining together streams that arrive at the same rate; they’re also useful for joining streams that flow at different rates. Windows are the essential ingredient for doing real-time correlation across fast-moving data streams.