Events and Stream Processing
The idea of structuring data as a stream of events is nothing new, and it is used in many different fields. Even though the underlying principles are often similar, the terminology is frequently inconsistent across different fields, which can be quite confusing. Although the jargon can be intimidating when you first encounter it, don’t let that put you off; many of the ideas are quite simple when you get down to the core.
We will begin in this chapter by clarifying some of the terminology and foundational ideas. In the following chapters, we will go into more detail on particular technologies such as Apache Kafka1 and explain the reasoning behind their design. This will help you make effective use of those technologies in your applications.
Figure 1-1 lists some of the technologies using the idea of event streams. Part of the confusion seems to arise because similar techniques originated in different communities, and people often seem to stick within their own community rather than looking at what their neighbors are doing.
The current tools for distributed stream processing have come out of Internet companies such as LinkedIn, with philosophical roots in database research of the early 2000s. On the other hand, complex event processing (CEP) originated in event simulation research in the 1990s2 and is now used for operational purposes in enterprises. Event sourcing has its roots in the domain-driven design (DDD) community, which deals with enterprise software development—people who have to work with very complex data models but often smaller datasets than Internet companies.
My background is in Internet companies, but here we’ll explore the jargon of the other communities and figure out the commonalities and differences. To make our discussion concrete, I’ll begin by giving an example from the field of stream processing, specifically analytics. I’ll then draw parallels with other areas.
Implementing Google Analytics: A Case Study
How would you implement something like Google Analytics? First, consider the input to the system. Every time a user views a page, we need to log an event to record that fact. A page view event might look something like the example in Figure 1-3 (using a kind of pseudo-JSON).
A page view has an event type (PageViewEvent), a Unix timestamp that indicates when the event happened, the IP address of the client, the session ID (this may be a unique identifier from a cookie that allows you to figure out which series of page views is from the same person), the URL of the page that was viewed, how the user got to that page (for example, from a search engine, or by clicking a link from another site), the user’s browser and language settings, and so on.
Note that each page view event is a simple, immutable fact—it simply records that something happened.
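As a rough illustration, a page view event of this kind could be modelled as an immutable record. This is a minimal Python sketch: the field names and sample values are assumptions made for the example, not the exact format shown in Figure 1-3.

```python
import json
from dataclasses import FrozenInstanceError, asdict, dataclass


@dataclass(frozen=True)  # frozen=True makes instances read-only after creation
class PageViewEvent:
    # All field names below are illustrative assumptions.
    timestamp: int      # Unix timestamp (seconds) of when the page was viewed
    ip: str             # client IP address
    session_id: str     # identifier from a cookie, groups views by visitor
    url: str            # page that was viewed
    referrer: str       # how the user got to the page
    browser: str
    language: str
    type: str = "PageViewEvent"


event = PageViewEvent(
    timestamp=1413215518,
    ip="203.0.113.7",
    session_id="c7a2f9",
    url="/products/123",
    referrer="https://search.example.com/?q=widgets",
    browser="Firefox",
    language="en-GB",
)

# Serialise the event for logging or for sending to a stream.
encoded = json.dumps(asdict(event), sort_keys=True)

# An event is a fact about the past, so attempts to modify it are rejected.
try:
    event.url = "/somewhere/else"
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Freezing the record is one way to make the "immutable fact" property explicit in code: a consumer can read the event but cannot quietly rewrite it.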
Now, how do you go from these page view events to the nice graphical dashboard on which you can explore how people are using your website?
Broadly speaking, you have two options, as shown in Figure 1-4.
- Option (a)
- You can simply store every single event as it comes in, and then dump them all into a big database, a data warehouse, or a Hadoop cluster. Now, whenever you want to analyze this data in some way, you run a big SELECT query against this dataset. For example, you might group by URL and by time period, or you might filter by some condition and then COUNT(*) to get the number of page views for each URL over time. This will scan essentially all of the events, or at least some large subset, and do the aggregation on the fly.
- Option (b)
- If storing every single event is too much for you, you can instead store an aggregated summary of the events. For example, if you’re counting things, you can increment a few counters every time an event comes in, and then you throw away the actual event. You might keep several counters in an OLAP cube:3 imagine a multidimensional cube for which one dimension is the URL, another dimension is the time of the event, another dimension is the browser, and so on. For each event, you just need to increment the counters for that particular URL, that particular time, and so on.
With an OLAP cube, when you want to find the number of page views for a particular URL on a particular day, you just need to read the counter for that combination of URL and date. You don’t need to scan over a long list of events—it’s just a matter of reading a single value.
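The two options can be contrasted in a few lines of Python. This is only a sketch under simplifying assumptions: the sample events are made up, the "database" is a list in memory, and the cube is a plain counter keyed by (URL, day, browser).

```python
from collections import Counter
from datetime import datetime, timezone

# Hypothetical sample events: (unix_timestamp, url, browser)
events = [
    (1413215518, "/home", "Firefox"),
    (1413215600, "/home", "Chrome"),
    (1413219200, "/pricing", "Firefox"),
    (1413302000, "/home", "Firefox"),
]


def day_of(ts):
    """Truncate a Unix timestamp to a UTC calendar date string."""
    return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d")


# Option (a): keep every event and aggregate at query time by scanning them.
def count_views_by_scan(stored_events, url, day):
    return sum(1 for ts, u, _ in stored_events if u == url and day_of(ts) == day)


# Option (b): update counters as each event arrives, then discard the event.
cube = Counter()  # key: (url, day, browser) -> number of page views


def record(ts, url, browser):
    cube[(url, day_of(ts), browser)] += 1


for ts, url, browser in events:
    record(ts, url, browser)


def count_views_from_cube(url, day, browser):
    # Reading one cell of the cube is a single lookup, with no scan over events.
    return cube[(url, day, browser)]
```

With three dimensions in the key, a question that ignores the browser means adding up a handful of cells; real OLAP systems typically precompute such roll-ups as well.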
Now, option (a) in Figure 1-5 might sound a bit crazy, but it actually works surprisingly well. I believe Google Analytics actually does store the raw events—or at least a large sample of events—and performs a big scan over those events when you look at the data. Modern analytic databases have become really good at scanning quickly over large amounts of data.
The big advantage of storing raw event data is that you have maximum flexibility for analysis. For example, you can trace the sequence of pages that one person visited over the course of their session. You can’t do that if you’ve squashed all the events into counters. That sort of analysis is really important for some offline processing tasks such as training a recommender system (e.g., “people who bought X also bought Y”). For such use cases, it’s best to simply keep all the raw events so that you can later feed them all into your shiny new machine-learning system.
However, option (b) in Figure 1-5 also has its uses, especially when you need to make decisions or react to things in real time. For example, if you want to prevent people from scraping your website, you can introduce a rate limit so that you only allow 100 requests per hour from any particular IP address; if a client exceeds the limit, you block it. Implementing that with raw event storage would be incredibly inefficient because you’d be continually rescanning your history of events to determine whether someone has exceeded the limit. It’s much more efficient to just keep a counter of number of page views per IP address per time window, and then you can check on every request whether that number has crossed your threshold.
Similarly, for alerting purposes, you need to respond quickly to what the events are telling you. For stock market trading, you also need to be quick.
The bottom line here is that raw event storage and aggregated summaries of events are both very useful—they just have different use cases.
Let’s focus on aggregated summaries for now—how do you implement them?
Well, in the simplest case, you simply have the web server update the aggregates directly, as illustrated in Figure 1-6. Suppose that you want to count page views per IP address per hour, for rate limiting purposes. You can keep those counters in something like memcached or Redis, which have an atomic increment operation. Every time a web server processes a request, it directly sends an increment command to the store, with a key that is constructed from the client IP address and the current time (truncated down to the start of the hour).
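Here is a minimal sketch of that counter scheme. To keep it self-contained, an in-memory dictionary stands in for memcached or Redis; the key format and function names are assumptions for illustration.

```python
import time
from collections import defaultdict

LIMIT_PER_HOUR = 100  # threshold taken from the rate-limiting example

# Stand-in for memcached/Redis: maps key -> counter. A real deployment would
# use the store's atomic increment so that many web servers can share counters.
counters = defaultdict(int)


def counter_key(ip, now):
    """Build a key from the client IP and the time truncated to the hour."""
    hour = int(now) // 3600 * 3600  # round down to the start of the hour
    return f"pageviews:{ip}:{hour}"


def allow_request(ip, now=None):
    """Count this request and report whether the client is within the limit."""
    now = time.time() if now is None else now
    key = counter_key(ip, now)
    counters[key] += 1
    return counters[key] <= LIMIT_PER_HOUR
```

Because the hour is part of the key, a new counter starts automatically when the hour rolls over. In a shared store you would also want old keys to expire, which this sketch leaves out.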
If you want to get a bit more sophisticated, you can introduce an event stream, or a message queue, or an event log (or whatever you want to call it), as illustrated in Figure 1-7. The messages on that stream are the PageViewEvent records that we saw earlier: one message contains the content of one particular page view.
The advantage of this architecture is that you can now have multiple consumers for the same event data. You can have one consumer that simply archives the raw events to some big storage; even if you don’t yet have the capability to process the raw events, you might as well store them, since storage is cheap and you can figure out how to use them in future. Then, you can have another consumer that does some aggregation (for example, incrementing counters), and another consumer that does monitoring or something else—those can all feed off of the same event stream.
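The key property here is that each consumer reads the same events at its own pace. The following toy sketch shows that idea with an in-memory log; the class and handler names are made up for the example, and a real system would use something like Kafka rather than a Python list.

```python
class EventLog:
    """A toy append-only log; each consumer tracks its own read position."""

    def __init__(self):
        self._events = []

    def append(self, event):
        self._events.append(event)

    def read_from(self, offset):
        return self._events[offset:]


class Consumer:
    def __init__(self, log, handler):
        self.log = log
        self.handler = handler
        self.offset = 0  # index of the next event this consumer has not seen

    def poll(self):
        for event in self.log.read_from(self.offset):
            self.handler(event)
            self.offset += 1


log = EventLog()
archive = []          # consumer 1: keeps the raw events
views_per_url = {}    # consumer 2: maintains aggregated counters


def count_view(event):
    views_per_url[event["url"]] = views_per_url.get(event["url"], 0) + 1


archiver = Consumer(log, archive.append)
aggregator = Consumer(log, count_view)

log.append({"type": "PageViewEvent", "url": "/home"})
log.append({"type": "PageViewEvent", "url": "/pricing"})
archiver.poll()
log.append({"type": "PageViewEvent", "url": "/home"})
aggregator.poll()   # sees all three events in one go
archiver.poll()     # catches up on the one event it had not yet seen
```

Neither consumer knows about the other, and adding a third (say, for monitoring) would not require any change to the producer.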
Event Sourcing: From the DDD Community
Now let’s change the topic for a moment, and look at similar ideas from a different field. Event sourcing is an idea that has come out of the DDD community4—it seems to be fairly well known among enterprise software developers, but it’s totally unknown in Internet companies. It comes with a large amount of jargon that I find confusing, but it also contains some very good ideas.
Let’s try to extract those good ideas without going into all of the jargon, and we’ll see that there are some surprising parallels with the last example from the field of stream processing analytics.
Event sourcing is concerned with how we structure data in databases. A sample database I’m going to use is a shopping cart from an e-commerce website (Figure 1-9). Each customer may have some number of different products in their cart at one time, and for each item in the cart there is a quantity.
Now, suppose that customer 123 updates their cart: instead of quantity 1 of product 999, they now want quantity 3 of that product. You can imagine this being recorded in the database using an UPDATE query, which matches the row for customer 123 and product 999, and modifies that row, changing the quantity from 1 to 3 (Figure 1-10).
This example uses a relational data model, but that doesn’t really matter. With most non-relational databases you’d do more or less the same thing: overwrite the old value with the new value when it changes.
However, event sourcing says that this isn’t a good way to design databases. Instead, we should individually record every change that happens to the database.
For example, Figure 1-11 shows an example of the events logged during a user session. We recorded an AddedToCart event when customer 123 first added product 888 to their cart, with quantity 1. We then recorded a separate UpdatedCartQuantity event when they changed the quantity to 3. Later, the customer changed their mind again, and reduced the quantity to 2, and, finally, they went to the checkout.
Each of these actions is recorded as a separate event and appended to the database. You can imagine having a timestamp on every event, too.
When you structure the data like this, every change to the shopping cart is an immutable event—a fact (Figure 1-12). Even if the customer did change the quantity to 2, it is still true that at a previous point in time, the selected quantity was 3. If you overwrite data in your database, you lose this historic information. Keeping the list of all changes as a log of immutable events thus gives you strictly richer information than if you overwrite things in the database.
And this is really the essence of event sourcing: rather than performing destructive state mutation on a database when writing to it, we should record every write as an immutable event.
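To make this concrete, here is a small Python sketch of the shopping cart as an event log. The event type names AddedToCart and UpdatedCartQuantity come from the example above; the dictionary layout and the CartCheckedOut name are assumptions for illustration.

```python
# Event log for one customer's cart, in the order the events were recorded.
cart_events = [
    {"type": "AddedToCart", "customer": 123, "product": 888, "quantity": 1},
    {"type": "UpdatedCartQuantity", "customer": 123, "product": 888, "quantity": 3},
    {"type": "UpdatedCartQuantity", "customer": 123, "product": 888, "quantity": 2},
    {"type": "CartCheckedOut", "customer": 123},  # hypothetical event name
]


def apply(cart, event):
    """Return the cart state after one event; the input cart is not modified."""
    cart = dict(cart)
    if event["type"] in ("AddedToCart", "UpdatedCartQuantity"):
        cart[event["product"]] = event["quantity"]
    # Other event types (such as checkout) leave the quantities unchanged here.
    return cart


def current_state(events):
    """Fold the whole history into the state an UPDATE-based table would hold."""
    cart = {}
    for event in events:
        cart = apply(cart, event)
    return cart


def state_history(events):
    """Every intermediate cart state, which overwriting in place would lose."""
    history, cart = [], {}
    for event in events:
        cart = apply(cart, event)
        history.append(cart)
    return history
```

The current state is derived from the log, never the other way around: `current_state(cart_events)` gives the quantity 2 for product 888, while `state_history(cart_events)` still shows that it was once 3.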
Bringing Together Event Sourcing and Stream Processing
This brings us back to our stream-processing example (Google Analytics). Remember we discussed two options for storing data: (a) raw events, or (b) aggregated summaries (Figure 1-13).
Put like this, stream processing for analytics and event sourcing are beginning to look quite similar. Both PageViewEvent (Figure 1-3) and an event-sourced database (AddedToCart, UpdatedCartQuantity) comprise the history of what happened over time. But, when you’re looking at the contents of your shopping cart, or the count of page views, you see the current state of the system—the end result, which is what you get when you have applied the entire history of events and squashed them together into one thing.
So the current state of the cart might say quantity 2. The history of raw events will tell you that at some previous point in time the quantity was 3, but that the customer later changed their mind and updated it to 2. The aggregated end result only tells you that the current quantity is 2.
Thinking about it further, you can observe that the raw events are the form in which it’s ideal to write the data: all the information in the database write is contained in a single blob. You don’t need to go and update five different tables if you’re storing raw events—you only need to append the event to the end of a log. That’s the simplest and fastest possible way of writing to a database (Figure 1-14).
On the other hand, the aggregated data is the form in which it’s ideal to read data from the database. If a customer is looking at the contents of their shopping cart, they are not interested in the entire history of modifications that led to the current state: they only want to know what’s in the cart right now. An analytics application normally doesn’t need to show the user the full list of all page views—only the aggregated summary in the form of a chart.
Thus, when you’re reading, you can get the best performance if the history of changes has already been squashed together into a single object representing the current state. In general, the form of data that’s best optimized for writing is not the same as the form that is best optimized for reading. It can thus make sense to separate the way you write to your system from the way you read from it (this idea is sometimes known as command-query responsibility segregation, or CQRS5)—more on this later.
Going even further, think about the user interfaces that lead to database writes and database reads. A database write typically happens because the user clicks some button; for example, they edit some data and then click the save button. So, buttons in the user interface correspond to raw events in the event sourcing history (Figure 1-15).
On the other hand, a database read typically happens because the user views some screen; they click on some link or open some document, and now they need to read the contents. These reads typically want to know the current state of the database. Thus, screens in the user interface correspond to aggregated state.
This is quite an abstract idea, so let me go through a few examples.
For our first example, let’s take a look at Twitter (Figure 1-16). The most common way of writing to Twitter’s database—that is, to provide input into the Twitter system—is to tweet something. A tweet is very simple: it consists of some text, a timestamp, and the ID of the user who tweeted (perhaps also optionally a location or a photo). The user then clicks that “Tweet” button, which causes a database write to happen—an event is generated.
On the output side, how you read from Twitter’s database is by viewing your timeline. It shows all the stuff that was written by people you follow. It’s a vastly more complicated structure (Figure 1-17).
For each tweet, you now have not just the text, timestamp, and user ID, but also the name of the user, their profile photo, and other information that has been joined with the tweet. Also, the list of tweets has been selected based on the people you follow, which may itself change.
How would you go from the simple input to the more complex output? Well, you could try expressing it in SQL, as shown in Figure 1-18.
That is, find all of the users who $user is following, find all the tweets that they have written, order them by time and pick the 100 most recent. It turns out this query really doesn’t scale very well. Do you remember in the early days of Twitter, when it kept having the fail whale all the time? Essentially, that was because they were using something like the query above6.
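A query of the kind just described might look roughly like the following, run here against an in-memory SQLite database from Python. The table names, column names, and sample rows are assumptions for illustration; they are not taken from Figure 1-18 or from Twitter's actual schema.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE users   (id INTEGER PRIMARY KEY, screen_name TEXT);
    CREATE TABLE follows (follower_id INTEGER, followee_id INTEGER);
    CREATE TABLE tweets  (id INTEGER PRIMARY KEY, sender_id INTEGER,
                          text TEXT, timestamp INTEGER);
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob'), (3, 'carol');
    INSERT INTO follows VALUES (1, 2), (1, 3);
    INSERT INTO tweets VALUES (10, 2, 'hello', 100), (11, 3, 'hi', 200),
                              (12, 1, 'my own tweet', 300);
""")

# Join tweets with their authors and with the follow graph at read time.
TIMELINE_QUERY = """
    SELECT tweets.text, users.screen_name, tweets.timestamp
    FROM tweets
    JOIN users   ON users.id = tweets.sender_id
    JOIN follows ON follows.followee_id = users.id
    WHERE follows.follower_id = ?
    ORDER BY tweets.timestamp DESC
    LIMIT 100
"""


def timeline(user_id):
    """Compute a user's timeline from scratch on every read."""
    return db.execute(TIMELINE_QUERY, (user_id,)).fetchall()
```

On three users this is instant. The cost shows up when the joins and the sort have to be repeated for every timeline view across a very large follow graph.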
When a user views their timeline, it’s too expensive to iterate over all the people they are following to get those users’ tweets. Instead, Twitter must compute a user’s timeline ahead of time, and cache it so that it’s fast to read when a user looks at it. To do that, the system needs a process that translates from the write-optimized event (a single tweet) to the read-optimized aggregate (a timeline). Twitter has such a process, and calls it the fanout service. We will discuss it in more detail in Turning the Database Inside Out.
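The general shape of such a translation process can be sketched as follows. This is a toy illustration of fanning out on write, with made-up names and in-memory dictionaries; it is not a description of how Twitter's fanout service is actually built.

```python
from collections import defaultdict

followers = defaultdict(set)   # followee -> set of followers
timelines = defaultdict(list)  # user -> precomputed timeline, newest first
TIMELINE_LENGTH = 100


def follow(follower, followee):
    followers[followee].add(follower)


def on_tweet(event):
    """Fan one write-optimized tweet event out to every follower's timeline."""
    for user in followers[event["user"]]:
        timelines[user].insert(0, event)
        del timelines[user][TIMELINE_LENGTH:]  # keep only the most recent entries


def read_timeline(user):
    # Reading is now a single lookup; the expensive work happened at write time.
    return timelines[user]


follow("alice", "bob")
follow("alice", "carol")
on_tweet({"user": "bob", "text": "hello", "timestamp": 100})
on_tweet({"user": "carol", "text": "hi", "timestamp": 200})
```

The trade-off is that one tweet by a user with many followers turns into many timeline writes, in exchange for making every read cheap.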
For another example, let’s look at Facebook. It has many buttons that enable you to write something to Facebook’s database, but a classic one is the “Like” button. When you click it, you generate an event, a fact with a very simple structure: you (identified by your user ID) like (an action verb) some item (identified by its ID) (Figure 1-19).
However, if you look at the output side—reading something on Facebook—it’s incredibly complicated. In this example, we have a Facebook post which is not just some text, but also the name of the author and his profile photo; and it’s telling me that 160,216 people like this update, of which three have been especially highlighted (presumably because Facebook thinks that among those who liked this update, these are the ones I am most likely to know); it’s telling me that there are 6,027 shares and 12,851 comments, of which the top 4 comments are shown (clearly some kind of comment ranking is happening here); and so on.
There must be some translation process happening here, which takes the very simple events as input and then produces a massively complex and personalized output structure (Figure 1-20).
One can’t even conceive what the database query would look like to fetch all of the information in that one Facebook update. It is unlikely that Facebook could efficiently query all of this on the fly—not with over 100,000 likes. Clever caching is absolutely essential if you want to build something like this.
Immutable Facts and the Source of Truth
From the Twitter and Facebook examples we can see a certain pattern: the input events, corresponding to the buttons in the user interface, are quite simple. They are immutable facts, we can simply store them all, and we can treat them as the source of truth (Figure 1-21).
You can derive everything that you can see on a website—that is, everything that you read from the database—from those raw events. There is a process that derives those aggregates from the raw events, and which updates the caches when new events come in, and that process is entirely deterministic. You could, if necessary, re-run it from scratch: if you feed in the entire history of everything that ever happened on the site, you can reconstruct every cache entry to be exactly as it was before. The database you read from is just a cached view of the event log.7
The beautiful thing about this separation between source of truth and caches is that in your caches, you can denormalize data to your heart’s content. In regular databases, it is often considered best practice to normalize data, because if something changes, you then only need to change it in one place. Normalization makes writes fast and simple, but it means you must do more work (joins) at read time.
To speed up reads, you can denormalize data; that is, duplicate information in various places so that it can be read faster. The problem now is that if the original data changes, all the places where you copied it to also need to change. In a typical database, that’s a nightmare because you might not know all the places where something has been copied. But, if your caches are built from your raw events using a repeatable process, you have much more freedom to denormalize because you know what data is flowing where.
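As a sketch of what "a cached view of the event log" can mean in code, the following Python example derives a denormalized view from "like" events and shows that replaying the log from scratch reproduces the incrementally maintained cache. The event layout, the user names, and the shape of the cache are all assumptions for illustration.

```python
# Source of truth: simple, immutable "like" events.
like_events = [
    {"user_id": 1, "verb": "like", "item_id": "post-42"},
    {"user_id": 2, "verb": "like", "item_id": "post-42"},
    {"user_id": 1, "verb": "like", "item_id": "post-7"},
]
user_names = {1: "Alice", 2: "Bob"}  # hypothetical profile data


def apply_like(cache, event):
    """Update the denormalized view for one event (a deterministic step)."""
    entry = cache.setdefault(event["item_id"], {"like_count": 0, "liked_by": []})
    entry["like_count"] += 1
    # Denormalize: copy the user's name into the view so reads need no join.
    entry["liked_by"].append(user_names[event["user_id"]])


def rebuild(events):
    """Reconstruct the whole cache by replaying the log from the beginning."""
    cache = {}
    for event in events:
        apply_like(cache, event)
    return cache


# Incrementally maintained cache, updated as each event arrives.
live_cache = {}
for e in like_events:
    apply_like(live_cache, e)
```

Because the derivation is repeatable, the copied names are not a liability: if the derivation logic or the underlying data changes, you can discard the cache and run `rebuild` again.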
Another example is Wikipedia. This is almost a counter-example to Twitter and Facebook, because on Wikipedia the input and the output are almost the same (Figure 1-22).
When you edit a page on Wikipedia, you get a big text field containing the entire page content (using wiki markup), and when you click the save button, it sends that entire page content back to the server. The server replaces the entire page with whatever you posted to it. When someone views the page, it returns that same content back to the user (formatted into HTML), as illustrated in Figure 1-23.
So, in this case, the input and the output are essentially the same.
What would event sourcing mean in this case? Would it perhaps make sense to represent a write event as a diff, like a patch file, rather than a copy of the entire page? It’s an interesting case to think about. (Google Docs works by continually applying diffs at the granularity of individual characters—effectively an event per keystroke.8)
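One way to explore the question is to store each edit as a small patch and rebuild the page by replaying the patches. The sketch below uses Python's standard difflib module at the granularity of lines; the patch format (a list of index ranges and replacement lines) is an assumption made for this example, not how Wikipedia or Google Docs represent edits.

```python
import difflib


def make_patch(old, new):
    """Describe how to turn old into new, storing only the changed lines."""
    matcher = difflib.SequenceMatcher(a=old, b=new, autojunk=False)
    return [
        (i1, i2, new[j1:j2])
        for tag, i1, i2, j1, j2 in matcher.get_opcodes()
        if tag != "equal"
    ]


def apply_patch(old, patch):
    """Apply one edit event to a page, returning the new list of lines."""
    result = list(old)
    # Work from the end of the page so that earlier indexes stay valid.
    for i1, i2, replacement in reversed(patch):
        result[i1:i2] = replacement
    return result


def replay(initial, patches):
    """Rebuild the current page from the initial version and all edit events."""
    page = initial
    for patch in patches:
        page = apply_patch(page, patch)
    return page


v1 = ["Intro", "Body"]
v2 = ["Intro", "Body, revised", "See also"]
v3 = ["Body, revised", "See also"]
edit_events = [make_patch(v1, v2), make_patch(v2, v3)]
```

Each patch is only meaningful relative to the version it was computed against, so the order of events matters, just as it does for the shopping cart example.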
Using Append-Only Streams of Immutable Events
Now, let’s return to stream processing.
I first described how you might build something like Google Analytics, compared storing raw page view events versus aggregated counters, and discussed how you can maintain those aggregates by consuming a stream of events (Figure 1-7). I then explained event sourcing, which applies a similar approach to databases: treat all the database writes as a stream of events, and build aggregates (views, caches, search indexes) from that stream.
When you have that event stream, you can do many great things with it (Figure 1-26):
- You can take all the raw events, perhaps transform them a bit, and load them into Hadoop or a big data warehouse where analysts can query the data to their heart’s content.
- You can update full-text search indexes so that when a user hits the search box, they are searching an up-to-date version of the data. We will discuss this in more detail in Using Logs to Build a Solid Data Infrastructure.
- You can invalidate or refill any caches so that reads can be served from fast caches while also ensuring that the data in the cache remains fresh.
- And finally, you can even take one event stream, and process it in some way (perhaps joining a few streams together) to create a new output stream. This way, you can plug the output of one system into the input of another system. This is a very powerful way of building complex applications cleanly, which we will discuss in The Unix Philosophy of Distributed Data.
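The last point, deriving one stream from another, can be sketched with Python generators. The stage names and the UrlCountUpdated event type are hypothetical; the point is only that the output of one stage is itself a stream that another stage can consume.

```python
def only_type(stream, event_type):
    """Pass through only the events of one type."""
    for event in stream:
        if event["type"] == event_type:
            yield event


def running_counts(stream):
    """Consume page views and emit a new stream of per-URL count updates."""
    counts = {}
    for event in stream:
        url = event["url"]
        counts[url] = counts.get(url, 0) + 1
        yield {"type": "UrlCountUpdated", "url": url, "count": counts[url]}


input_stream = [
    {"type": "PageViewEvent", "url": "/home"},
    {"type": "ClickEvent", "url": "/home"},
    {"type": "PageViewEvent", "url": "/home"},
    {"type": "PageViewEvent", "url": "/pricing"},
]

# Plug the output of one stage into the input of the next.
output_stream = list(running_counts(only_type(iter(input_stream), "PageViewEvent")))
```

Each stage only sees events and produces events, which is what makes stages easy to recombine.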
Moving to an event-sourcing-like approach for databases is a big change from the way that databases have traditionally been used (in which you can update and delete data at will). Why would you want to go to all that effort of changing the way you do things? What’s the benefit of using append-only streams of immutable events?
There are several reasons (Figure 1-27):
- Loose coupling
- If you write data to the database in the same schema as you use for reading, you have tight coupling between the part of the application doing the writing (the “button”) and the part doing the reading (the “screen”). We know that loose coupling is a good design principle for software. By separating the form in which you write and read data, and by explicitly translating from one to the other, you get much looser coupling between different parts of your application.
- Read and write performance
- The decades-old debate over normalization (faster writes) versus denormalization (faster reads) exists only because of the assumption that writes and reads use the same schema. If you separate the two, you can have fast writes and fast reads.
- Scalability
- Event streams are great for scalability because they are a simple abstraction (comparatively easy to parallelize and scale across multiple machines), and because they allow you to decompose your application into producers and consumers of streams (which can operate independently and can take advantage of more parallelism in hardware).
- Flexibility and agility
- Raw events are so simple and obvious that a “schema migration” doesn’t really make sense (you might just add a new field from time to time, but you don’t usually need to rewrite historic data into a new format). On the other hand, the ways in which you want to present data to users are much more complex, and can be continually changing. If you have an explicit translation process between the source of truth and the caches that you read from, you can experiment with new user interfaces by just building new caches using new logic, running the new system in parallel with the old one, gradually moving people over from the old system, and then discarding the old system (or reverting to the old system if the new one didn’t work). Such flexibility is incredibly liberating.
- Error scenarios
- Error scenarios are much easier to reason about if data is immutable. If something goes wrong in your system, you can always replay events in the same order and reconstruct exactly what happened9 (especially important in finance, for which auditability is crucial). If you deploy buggy code that writes bad data to a database, you can just re-run it after you have fixed the bug and thus correct the outputs. Those things are not possible if your database writes are destructive.
Tools: Putting Ideas into Practice
Let’s talk about how you might put these ideas into practice. How do you build applications using this idea of event streams?
Some databases such as Event Store10 have oriented themselves specifically at the event sourcing model, and some people have implemented event sourcing on top of relational databases.
The systems I have worked with most—and that we discuss most in this report—are Apache Kafka11 and Apache Samza.12 Both are open source projects that originated at LinkedIn and now have a big community around them. Kafka provides a publish-subscribe message queuing service, supporting event streams with many millions of messages per second, durably stored on disk and replicated across multiple machines.13,14
For consuming input streams and producing output streams, Kafka comes with a client library called Kafka Streams (Figure 1-28): it lets you write code to process messages, and it handles stuff like state management and recovering from failures.15
I would definitely recommend Kafka as a system for high-throughput reliable event streams. When you want to write code to process those events, you can either use Kafka’s client libraries directly, or you can use one of several frameworks (Figure 1-29): Samza,16 Storm,17 Spark Streaming18 and Flink19 are the most popular. Besides message processing, these frameworks also include tools for deploying a processing job to a cluster of machines and scheduling its tasks.
There are interesting design differences (pros and cons) between these tools. In this report we will not go into the details of stream processing frameworks and their APIs; you can find a detailed comparison in the Samza documentation.20 Instead, we focus on the conceptual foundations that underpin all stream processing systems.
Today’s distributed stream processing systems have their roots in stream processing research from the early 2000s (TelegraphCQ,21 Borealis,22 and so on), which originated from a relational database background. Just as NoSQL datastores stripped databases down to a minimal feature set, modern stream processing systems look quite stripped-down compared to the earlier research.
CEP, Actors, Reactive, and More
Contemporary distributed stream processing frameworks (Kafka Streams, Samza, Storm, Spark Streaming, Flink) are mostly concerned with low-level matters: how to scale processing across multiple machines, how to deploy a job to a cluster, how to handle faults (crashes, machine failures, network outages), and how to achieve reliable performance in a multitenant environment.23 The APIs they provide are often quite low-level (e.g., a callback that is invoked for every message). They look much more like MapReduce and less like a database, although there is work in progress to provide high-level query languages such as streaming SQL.
There is also some existing work on high-level query languages for stream processing, and CEP is especially worth mentioning (Figure 1-30). It originated in 1990s research on event-driven simulation.24 Many CEP products are commercial, expensive enterprise software, although Esper25 has an open source version. (Esper is a library that you can run inside a distributed stream processing framework, but it does not provide distributed query execution.)
With CEP, you write queries or rules that match certain patterns in the events. They are comparable to SQL queries (which describe what results you want to return from a database), except that the CEP engine continually searches the stream for sets of events that match the query and notifies you (generates a “complex event”) whenever a match is found. This is useful for fraud detection or monitoring business processes, for example.
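To give a feel for the idea, here is a hand-written Python sketch of one such rule: report a match when the same key produces a given number of events of one type within a time window. Real CEP engines let you state rules like this declaratively in a query language; the function, the event names, and the fraud-style example data below are all assumptions, and the sketch assumes events arrive in timestamp order.

```python
from collections import defaultdict, deque


def detect_bursts(stream, event_type, key, count, window_seconds):
    """Yield a derived event whenever `count` matching events share a key
    and fall within `window_seconds` of each other."""
    recent = defaultdict(deque)  # key value -> timestamps of recent matches
    for event in stream:
        if event["type"] != event_type:
            continue
        times = recent[event[key]]
        times.append(event["timestamp"])
        # Drop occurrences that have fallen out of the time window.
        while event["timestamp"] - times[0] > window_seconds:
            times.popleft()
        if len(times) >= count:
            yield {"type": "PatternMatched", key: event[key],
                   "timestamps": list(times)}
            times.clear()  # start over so that one burst is reported once


payments = [
    {"type": "PaymentDeclined", "card": "A", "timestamp": 0},
    {"type": "PaymentDeclined", "card": "B", "timestamp": 5},
    {"type": "PaymentDeclined", "card": "A", "timestamp": 10},
    {"type": "PaymentAccepted", "card": "B", "timestamp": 15},
    {"type": "PaymentDeclined", "card": "A", "timestamp": 20},
    {"type": "PaymentDeclined", "card": "A", "timestamp": 500},
]

matches = list(detect_bursts(payments, "PaymentDeclined", "card",
                             count=3, window_seconds=60))
```

The query is registered once and then runs continuously over the stream, which is the reverse of a database, where the data sits still and queries come and go.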
For use cases that can be easily described in terms of a CEP query language, such a high-level language is much more convenient than a low-level event processing API. On the other hand, a low-level API gives you more freedom, allowing you to do a wider range of things than a query language would let you do. Also, by focusing their efforts on scalability and fault tolerance, stream processing frameworks provide a solid foundation upon which query languages can be built.
Another idea for high-level querying is doing full-text search on streams, whereby you register a search query in advance and then are notified whenever an event matches your query. For example, Elasticsearch Percolator26 provides this as a service, and Luwak27 implements full-text search on streams as an embeddable library.
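The inversion of roles between queries and documents can be shown with a deliberately naive Python sketch. It treats a query as a set of words that must all appear in a document; this is an assumption made for brevity and does not reflect the query model or API of Elasticsearch Percolator or Luwak.

```python
import re

registered_queries = {}  # query id -> set of terms that must all appear


def tokenize(text):
    """Split text into lowercase alphanumeric terms."""
    return re.findall(r"[a-z0-9]+", text.lower())


def register(query_id, text):
    """Store a query in advance, before any matching documents exist."""
    registered_queries[query_id] = set(tokenize(text))


def match(document_text):
    """Return the ids of all registered queries that this document satisfies."""
    terms = set(tokenize(document_text))
    return sorted(
        query_id
        for query_id, required in registered_queries.items()
        if required <= terms
    )


register("q1", "kafka outage")
register("q2", "samza")
```

Each incoming event is then passed to `match`, and whoever registered the matching queries is notified.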
Finally, there are a lot of other ideas that are somehow related to event streams (Figure 1-31). Here is a brief summary:
- Distributed actor frameworks such as Akka,28 Orleans,29 and Erlang OTP30 are also based on streams of immutable events/messages. However, they are primarily a mechanism for programming concurrent systems, less a mechanism for data management. In principle, you could build a distributed stream processing framework on top of actors, but it’s worth looking carefully at the fault-tolerance guarantees and failure modes of these systems: many don’t provide durability, for example. SEDA architectures31 have some similarities to actors.
- There’s a lot of buzz around “reactive”, which seems to encompass a quite loosely defined set of ideas.32 My impression is that there is some good work happening in dataflow languages, ReactiveX and functional reactive programming (FRP), which I see as mostly about bringing event streams to the user interface (i.e., updating the user interface when some underlying data changes).33 This is a natural counterpart to event streams in the data backend (we touch on it in Turning the Database Inside Out).
- Finally, change data capture (CDC) means using an existing database in the familiar way, but extracting any inserts, updates, and deletes into a stream of data change events that other applications can consume. We discuss this in detail in Integrating Databases and Kafka with Change Data Capture.
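As a small illustration of the CDC idea, the following Python sketch uses SQLite triggers to copy every insert, update, and delete on a table into a change log that another process could read. This trigger-based approach is a stand-in chosen because it fits in a few lines; the table names, column names, and event layout are assumptions, and the sketch is not meant to describe how any particular CDC tool works.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.executescript("""
    CREATE TABLE cart (customer_id INTEGER, product_id INTEGER, quantity INTEGER,
                       PRIMARY KEY (customer_id, product_id));
    -- Change log filled by the triggers below (names are assumptions).
    CREATE TABLE changes (seq INTEGER PRIMARY KEY AUTOINCREMENT, op TEXT,
                          customer_id INTEGER, product_id INTEGER,
                          quantity INTEGER);

    CREATE TRIGGER cart_insert AFTER INSERT ON cart BEGIN
        INSERT INTO changes (op, customer_id, product_id, quantity)
        VALUES ('insert', NEW.customer_id, NEW.product_id, NEW.quantity);
    END;
    CREATE TRIGGER cart_update AFTER UPDATE ON cart BEGIN
        INSERT INTO changes (op, customer_id, product_id, quantity)
        VALUES ('update', NEW.customer_id, NEW.product_id, NEW.quantity);
    END;
    CREATE TRIGGER cart_delete AFTER DELETE ON cart BEGIN
        INSERT INTO changes (op, customer_id, product_id, quantity)
        VALUES ('delete', OLD.customer_id, OLD.product_id, NULL);
    END;
""")

# The application keeps using the database in the familiar way.
db.execute("INSERT INTO cart VALUES (123, 999, 1)")
db.execute("UPDATE cart SET quantity = 3 WHERE customer_id = 123 AND product_id = 999")
db.execute("DELETE FROM cart WHERE customer_id = 123 AND product_id = 999")


def changes_since(seq):
    """Return change events with a sequence number greater than seq, in order."""
    return db.execute(
        "SELECT seq, op, customer_id, product_id, quantity "
        "FROM changes WHERE seq > ? ORDER BY seq",
        (seq,),
    ).fetchall()
```

A consumer remembers the last sequence number it has processed and asks for everything after it, which is the same offset-tracking pattern used when reading from an event log.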
I hope this chapter helped you make some sense of the many facets of stream processing. In Using Logs to Build a Solid Data Infrastructure, we dig deep into the idea of a “log,” which is a particularly good way of implementing streams.
2. David C Luckham: “Rapide: A Language and Toolset for Simulation of Distributed Systems by Partial Orderings of Events,” Stanford University, Computer Systems Laboratory, Technical Report CSL-TR-96-705, September 1996.
3. Jim N Gray, Surajit Chaudhuri, Adam Bosworth, et al.: “Data Cube: A Relational Aggregation Operator Generalizing Group-By, Cross-Tab, and Sub-Totals,” Data Mining and Knowledge Discovery, volume 1, number 1, pages 29–53, March 1997. doi:10.1023/A:1009726021843
4. Vaughn Vernon: Implementing Domain-Driven Design. Addison-Wesley Professional, February 2013. ISBN: 0321834577
8. John Day-Richter: “What’s different about the new Google Docs: Making collaboration fast,” googledrive.blogspot.com, 23 September 2010.
13. Jay Kreps: “Benchmarking Apache Kafka: 2 Million Writes Per Second (On Three Cheap Machines),” engineering.linkedin.com, 27 April 2014.
21. Sirish Chandrasekaran, Owen Cooper, Amol Deshpande, et al.: “TelegraphCQ: Continuous Dataflow Processing for an Uncertain World,” at 1st Biennial Conference on Innovative Data Systems Research (CIDR), January 2003.
22. Daniel J Abadi, Yanif Ahmad, Magdalena Balazinska, et al.: “The Design of the Borealis Stream Processing Engine,” at 2nd Biennial Conference on Innovative Data Systems Research (CIDR), November 2004.
23. Jay Kreps: “But the multi-tenancy thing is actually really really hard,” tweetstorm, twitter.com, 31 October 2014.