Turning the Database Inside Out

In the previous four chapters, we have covered a lot of ground:

  • In Events and Stream Processing, we discussed the idea of event sourcing; that is, representing the changes to a database as a log of immutable events. We explored the distinction between raw events (which are optimized for writing) and aggregated summaries of events (which are optimized for reading).

  • In Using Logs to Build a Solid Data Infrastructure, we saw how a log (an ordered sequence of events) can help integrate different storage systems by ensuring that data is written to all stores in the same order.

  • In Integrating Databases and Kafka with Change Data Capture, we discussed change data capture (CDC), a technique for taking the writes to a traditional database and turning them into a log. We saw how log compaction makes it possible for us to build new indexes onto existing data from scratch without affecting other parts of the system.

  • In The Unix Philosophy of Distributed Data, we explored the Unix philosophy for building composable systems and compared it to the traditional database philosophy. We saw how a Kafka-based stream data platform can scale to encompass the data flows in a large organization.

In this final chapter, we will pull all of those ideas together and use them to speculate about the future of databases and data-intensive applications. By extrapolating some current trends (such as the growing variety of SQL and NoSQL datastores being used, the growing mainstream use of functional programming, the increasing interactivity of user interfaces, and the proliferation of mobile devices) we can illuminate some of the path ahead: how will we be developing applications in a few years’ time?

To figure out an answer, we will examine some aspects of traditional database-backed applications (replication, secondary indexes, caching, and materialized views) and compare them to the event log approach discussed in the last few chapters. We will see that many of the internal architectural patterns of databases are being repeated at a larger scale on the infrastructure level.

What is happening here is very interesting: software engineers are taking the components of databases that have been traditionally fused together into a monolithic program, unbundling them into separate components, independently making each of those components highly scalable and robust, and then putting them back together again as a large-scale system. The final result looks somewhat like a database, except that it is flexibly composed around the structure of your application and operates at much larger scale. We are taking the database architecture we know and turning it inside out.

How Databases Are Used

To gain clarity, let’s take a few steps back and talk about databases. What I mean is not any particular brand of database—I don’t mind whether you’re using relational, or NoSQL, or something else. I’m really talking about the general concept of a database, as we use it when building applications.

Take, for example, the stereotypical web application architecture shown in Figure 5-1.

Figure 5-1. Simplest-case web application architecture.

You have a client, which may be a web browser or a mobile app, and that client talks to some kind of server-side system (a “backend”). The backend typically implements some kind of business logic, performs access control, accepts input, and produces output. When the backend needs to remember something for the future, it stores that data in a database, and when it needs to look something up, it queries a database. That’s all very familiar stuff.

The way we typically build these sorts of applications is that we make the backend layer stateless: it processes every request independently, and doesn’t remember anything from one request to the next. That has a lot of advantages: you can scale out the backend by just running more processes in parallel, and you can route any request to any backend instance (they are all equally well qualified to handle the request), so it’s easy to spread the load across multiple machines. Any state that is required to handle a request is looked up from the database on each request. That also works nicely with HTTP because HTTP is a stateless protocol.

However, the state must go somewhere, and so we put it in the database. We are now using the database as a kind of gigantic, global, shared, mutable state. It’s like a persistent global variable that’s shared between all your application servers.

This approach for building database-backed applications has worked for decades, so it can’t be all that bad. However, from time to time it’s worth looking beyond the familiar and exploring potentially better ways of building software. For example, people who use functional programming languages say that the lack of mutable global variables is helpful for building better software, reducing bugs, making code easier to reason about, and so on. Perhaps something similar is true in database-backed applications?

The event sourcing approach we discussed in Events and Stream Processing is a way of moving from the imperative world of mutable state to the functional world of immutable values. In The Unix Philosophy of Distributed Data we also noticed that pipelines of Unix tools have a functional flavor. However, so far we have not been very clear about how to actually build systems that use these ideas.

To try to figure out a way forward, I’d like to review four different examples of things that databases currently do, and things that we do with databases. These four examples will help us structure the ideas around event streams and pave the way to applying them in practice.

1. Replication

We previously discussed replication in Using Logs to Build a Solid Data Infrastructure, and observed that leader-based replication uses a replication log to send data changes to followers (Figure 2-18). We came across the idea again in Integrating Databases and Kafka with Change Data Capture: change data capture is similar to replication, except that the follower is not another instance of the same database software, but a different storage technology.

What does such a replication log actually look like? Take the shopping cart example of Figure 1-10, in which customer 123 changes their cart to contain quantity 3 of product 999. The update is executed on the leader and replicated to followers, and there are several different ways in which the followers might apply this write. One option is to send the same update query to the follower, which executes the same statement on its own copy of the database. Another option is to ship the write-ahead log from the leader to the follower.

A third option for replication, which I’ll focus on here, is called a logical log, which you can see illustrated in Figure 5-2. In this case, the leader writes out the effect that the query had—that is, which rows were inserted, updated, or deleted—like a kind of diff.

Figure 5-2. A logical change event in a replication log indicates which row changed and what its new value needs to be.

For an update, like in this example, the logical log identifies the row that was changed (using a primary key or some kind of internal tuple identifier), gives the new value of that row, and perhaps also the old value.
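As a rough illustration of the idea, the following Python sketch executes an imperative update against an in-memory table on the leader and returns its effect as logical change events carrying the row's key, old value, and new value. The table layout, the `(customer_id, product_id)` key, and the names `RowChange` and `execute_update` are hypothetical; real databases encode their logical logs in their own formats.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional, Tuple

Key = Tuple[int, int]   # hypothetical primary key: (customer_id, product_id)
Row = Dict[str, Any]

@dataclass(frozen=True)
class RowChange:
    """One logical log entry: which row changed, plus its old and new values."""
    op: str                 # "insert", "update" or "delete"
    key: Key
    old: Optional[Row]
    new: Optional[Row]

def execute_update(table: Dict[Key, Row],
                   where: Callable[[Row], bool],
                   changes: Row) -> List[RowChange]:
    """Run an UPDATE ... SET ... WHERE ... on the leader; return its effect as a diff."""
    log: List[RowChange] = []
    for key, row in list(table.items()):
        if where(row):
            new_row = {**row, **changes}
            log.append(RowChange("update", key, dict(row), new_row))
            table[key] = new_row
    return log

leader = {(123, 999): {"customer_id": 123, "product_id": 999, "quantity": 1}}
# Equivalent of: UPDATE cart SET quantity = 3 WHERE customer_id = 123 AND product_id = 999
log = execute_update(leader,
                     lambda r: r["customer_id"] == 123 and r["product_id"] == 999,
                     {"quantity": 3})
```

A follower receiving `log` never sees the `WHERE` clause, only the identity of the changed row and its resulting values.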

This might seem like nothing special, but notice that something interesting has happened (Figure 5-3).

Figure 5-3. In a logical replication log, imperative commands are transformed into immutable change events.

At the top of Figure 5-3, we have the update statement, an imperative statement describing the state mutation. It is an instruction telling the database to modify the rows that match certain conditions.

On the other hand, when the write is replicated from the leader to the follower as part of the logical log, it takes a different form: it becomes an event, stating that at a particular point in time, a particular customer changed the quantity of a particular product in their cart from 1 to 3. This is a fact—even if the customer later removes the item from their cart, or changes the quantity again, or goes away and never comes back, that doesn’t change the fact that this state change occurred. The fact always remains true.

We can see that a change event in the replication log actually looks quite similar to an event in the sense of event sourcing (Events and Stream Processing). Thus, even if you use your database in the traditional way—overwriting old state with new state—the database’s internal replication mechanism may still be translating those imperative statements into a stream of immutable events.
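A follower (or any other consumer) can treat such a log purely as a sequence of facts and derive its own copy of the state by replaying them in order. In this minimal sketch, assuming events are plain tuples of operation, key, and new row value, deleting the row changes the derived state but leaves the earlier facts in the log untouched.

```python
from typing import Any, Dict, List, Optional, Tuple

Key = Tuple[int, int]
# An event is an immutable fact: (operation, row key, new row value or None).
Event = Tuple[str, Key, Optional[Dict[str, Any]]]

def apply_event(state: Dict[Key, Dict[str, Any]], event: Event) -> None:
    op, key, new_value = event
    if op == "delete":
        state.pop(key, None)
    else:                       # "insert" or "update": overwrite with the new value
        state[key] = dict(new_value)

def replay(events: List[Event]) -> Dict[Key, Dict[str, Any]]:
    """Derive the current state by applying every event in log order."""
    state: Dict[Key, Dict[str, Any]] = {}
    for event in events:
        apply_event(state, event)
    return state

log: List[Event] = [
    ("insert", (123, 999), {"quantity": 1}),
    ("update", (123, 999), {"quantity": 3}),
    ("delete", (123, 999), None),
]
```

Replaying the first two events yields a cart containing quantity 3; replaying all three yields an empty cart, yet the fact that the quantity was once changed from 1 to 3 is still recorded in `log`.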

Hold that thought for now; I’m going to talk about some completely different things and return to this idea later.

2. Secondary Indexes

Our second example of things that databases do is secondary indexing. You’re probably familiar with secondary indexes; they are the bread and butter of relational databases.

Let’s use the shopping cart example again (Figure 5-4): to efficiently find all the items that a particular customer has in their cart, you need an index on customer_id. If you also create an index on product_id, you can efficiently find all the carts that contain a particular product.

Figure 5-4. Secondary indexes allow you to efficiently look up rows by their value in a particular column.

What does the database do when you run one of these CREATE INDEX queries? It scans over the entire table, and it creates an auxiliary data structure for each index (Figure 5-5).

Figure 5-5. Each index is a separate data structure that is derived from the rows in the table.

An index is a data structure that represents the information in the base table in some different way. In this case, the index is a key-value-like structure: the keys are the contents of the column that you’re indexing, and the values are the rows that contain this particular key.

Put another way: to build the index for the customer_id column, the database takes all the values that appear in that column, and uses them as keys in a dictionary. A value points to all occurrences of that value—for example, the index entry 123 points to all of the rows that have a customer_id of 123. This index construction is illustrated in Figure 5-6.
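The construction can be sketched in a few lines of Python, assuming the table is a dictionary from a hypothetical internal row ID to the row's column values. The index maps each distinct value of the chosen column to the set of row IDs that contain it; a real database would typically use a B-tree or a similar on-disk structure rather than an in-memory dictionary.

```python
from collections import defaultdict
from typing import Any, Dict, Set

def build_index(table: Dict[int, Dict[str, Any]], column: str) -> Dict[Any, Set[int]]:
    """Scan every row and map each value of `column` to the IDs of rows containing it."""
    index: Dict[Any, Set[int]] = defaultdict(set)
    for row_id, row in table.items():
        index[row[column]].add(row_id)
    return dict(index)

cart = {   # hypothetical row IDs 1..3
    1: {"customer_id": 123, "product_id": 999, "quantity": 3},
    2: {"customer_id": 123, "product_id": 500, "quantity": 1},
    3: {"customer_id": 456, "product_id": 999, "quantity": 2},
}
by_customer = build_index(cart, "customer_id")   # "which rows belong to customer 123?"
by_product = build_index(cart, "product_id")     # "which carts contain product 999?"
```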

Figure 5-6. Values in the table become keys in the index.

The important point here is that the process of going from the base table to the indexes is completely mechanical. You simply tell the database that you want a particular index to exist, and it goes away and builds that index for you.

The index doesn’t add any new information to the database—it just represents the same data in a different form. (Put another way, if you drop the index, that doesn’t delete any data from your database; see also Figure 2-5.) An index is a redundant data structure that only exists to make certain queries faster, and that can be entirely derived from the original table (Figure 5-7).

Figure 5-7. An index is derived from the data in the table by using a deterministic transformation.

Creating an index is essentially a transformation which takes a database table as input and produces an index as output. The transformation consists of going through all the rows in the table, picking out the field that you want to index, and restructuring the data so that you can look up by that field. That transformation process is built into the database, so you don’t need to implement it yourself. You just tell the database that you want an index on a particular field to exist, and it does all the work of building it.

Here’s another great thing about indexes: whenever the data in the underlying table changes, the database automatically updates the indexes to be consistent with the new data in the table. In other words, this transformation function which derives the index from the original table is not just applied once when you create the index: it’s applied continuously.
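Continuing in the same spirit, an index can be kept consistent with its table by updating it on every write: remove the entry for the row's old value, then add an entry for its new value. `IndexedTable` is a hypothetical name, and writing `None` stands for deleting the row.

```python
from collections import defaultdict
from typing import Any, Dict, Optional, Set

Row = Dict[str, Any]

class IndexedTable:
    """Hypothetical table that keeps a secondary index up to date on every write."""
    def __init__(self, column: str) -> None:
        self.column = column
        self.rows: Dict[int, Row] = {}
        self.index: Dict[Any, Set[int]] = defaultdict(set)

    def write(self, row_id: int, new_row: Optional[Row]) -> None:
        old_row = self.rows.get(row_id)
        if old_row is not None:               # remove the entry for the old value
            entries = self.index[old_row[self.column]]
            entries.discard(row_id)
            if not entries:
                del self.index[old_row[self.column]]
        if new_row is None:                   # None means "delete this row"
            self.rows.pop(row_id, None)
        else:                                 # add the entry for the new value
            self.rows[row_id] = new_row
            self.index[new_row[self.column]].add(row_id)

    def lookup(self, value: Any) -> Set[int]:
        return set(self.index.get(value, set()))

table = IndexedTable("customer_id")
table.write(1, {"customer_id": 123, "product_id": 999})
table.write(2, {"customer_id": 123, "product_id": 500})
table.write(1, {"customer_id": 456, "product_id": 999})   # row 1 moves to customer 456
table.write(2, None)                                       # row 2 is deleted
```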

With many databases, these index updates are even done in a transactionally consistent way. This means that any later transactions will see the data in the index in the same state as it is in the underlying table. If a transaction aborts and rolls back, the index modifications are also rolled back. This is a really great feature that we often don’t appreciate!
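One simple way to picture this guarantee, assuming a toy in-memory database, is a transaction that stages its writes on private copies of both the table and its index and swaps them in together only on commit; if the transaction raises an exception, neither copy becomes visible. Real databases use far more efficient mechanisms, such as write-ahead logging and multi-version concurrency control. The class below is hypothetical and only shows that the table and the index change atomically.

```python
import copy
from contextlib import contextmanager
from typing import Any, Callable, Dict, Iterator, Set

Row = Dict[str, Any]

class ToyDatabase:
    """Hypothetical in-memory table with a secondary index on customer_id."""
    def __init__(self) -> None:
        self.rows: Dict[int, Row] = {}
        self.index: Dict[Any, Set[int]] = {}

    @staticmethod
    def _put(rows: Dict[int, Row], index: Dict[Any, Set[int]], row_id: int, row: Row) -> None:
        old = rows.get(row_id)
        if old is not None:
            index[old["customer_id"]].discard(row_id)
        rows[row_id] = row
        index.setdefault(row["customer_id"], set()).add(row_id)

    @contextmanager
    def transaction(self) -> Iterator[Callable[[int, Row], None]]:
        # Stage all writes on private copies of the table and the index.
        rows, index = copy.deepcopy(self.rows), copy.deepcopy(self.index)
        yield lambda row_id, row: self._put(rows, index, row_id, row)
        # Reached only if the with-block did not raise: publish both together.
        self.rows, self.index = rows, index

db = ToyDatabase()
with db.transaction() as put:
    put(1, {"customer_id": 123})
try:
    with db.transaction() as put:
        put(2, {"customer_id": 123})
        raise RuntimeError("abort")   # simulate a transaction that rolls back
except RuntimeError:
    pass
```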

Figure 5-8. The CONCURRENTLY option in PostgreSQL allows an index to be built without locking the base table for writes.

Moreover, some databases let you build an index at the same time as continuing to process write queries. In PostgreSQL, for example, you can say CREATE INDEX CONCURRENTLY (Figure 5-8). On a large table, creating an index could take several hours, and on a production database, you wouldn’t want to have to stop writing to the table while the index is being built. The index builder needs to be a background process that can run while your application is simultaneously reading and writing to the database as usual.

The fact that databases can do this is quite impressive. After all, to build an index, the database must scan the entire table contents, but those contents are changing at the same time as the scan is happening. The index builder is tracking a moving target. At the end, the database ends up with a transactionally consistent index, despite the fact that the data was changing concurrently.

To do this, the database needs to build the index from a consistent snapshot at one point in time. It also needs to keep track of all the changes that occurred since that snapshot while the index build was in progress. The procedure is remarkably similar to what we saw in Integrating Databases and Kafka with Change Data Capture in the context of change capture (Figure 3-2). Creating a new index outside of the database (Figure 3-7) is not that different from creating a new index inside of the database.
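The snapshot-plus-catch-up procedure can be sketched as follows, modelling the table's history as a list of changes `(row_id, new_row)` in which `None` marks a deletion. The index is first built from a consistent snapshot at a chosen log position; afterwards, the changes appended while the build was running are applied on top. This is a conceptual model with hypothetical function names, not a description of how PostgreSQL implements `CREATE INDEX CONCURRENTLY` internally.

```python
from typing import Any, Dict, List, Optional, Set, Tuple

Row = Dict[str, Any]
Change = Tuple[int, Optional[Row]]   # (row_id, new row, or None for a delete)

def apply_change(rows: Dict[int, Row], index: Dict[Any, Set[int]],
                 column: str, row_id: int, row: Optional[Row]) -> None:
    old = rows.get(row_id)
    if old is not None:                       # drop the index entry for the old value
        index[old[column]].discard(row_id)
    if row is None:
        rows.pop(row_id, None)
    else:
        rows[row_id] = row
        index.setdefault(row[column], set()).add(row_id)

def build_from_snapshot(log: List[Change], position: int, column: str):
    """Build the index from the table's state as of `position` (a consistent snapshot)."""
    rows: Dict[int, Row] = {}
    index: Dict[Any, Set[int]] = {}
    for row_id, row in log[:position]:
        apply_change(rows, index, column, row_id, row)
    return rows, index

def catch_up(log: List[Change], rows, index, column: str, position: int) -> int:
    """Apply the changes made since `position`; return the new position."""
    for row_id, row in log[position:]:
        apply_change(rows, index, column, row_id, row)
    return len(log)

log: List[Change] = [(1, {"customer_id": 123}), (2, {"customer_id": 456})]
position = len(log)                            # the snapshot point
rows, index = build_from_snapshot(log, position, "customer_id")
log.append((1, {"customer_id": 456}))          # writes that arrive during the build
log.append((3, {"customer_id": 123}))
position = catch_up(log, rows, index, "customer_id", position)
```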

So far, we’ve discussed two aspects of databases: replication and secondary indexing. Let’s move on to the third: caching.

3. Caching

What I’m talking about here is caching that is explicitly done by the application. (Caching also happens automatically at various levels, such as the operating system’s page cache and the CPU’s cache, but that’s not what I’m referring to here.)

Suppose that you have a website that becomes popular, and it becomes too expensive or too slow to hit the database for every web request, so you introduce a caching layer—often implemented by using memcached or Redis or something of that sort. Often this cache is managed in application code, which typically looks something like Figure 5-9.

Figure 5-9. A read-through cache managed in application code.

When a request arrives at the application, you first look in a cache to see whether the data you want is already there. The cache lookup is typically by some key that describes the data you want. If the data is in the cache, you can return it straight to the client.

If the data you want isn’t in the cache, that’s a cache miss. You then go to the underlying database and query the data that you want. On the way out, the application also writes that data to the cache so that it’s there for the next request that needs it. The thing it writes to the cache is whatever the application would have wanted to see there in the first place. Then, the application returns the data to the client.
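The read-through pattern fits in a small Python class. Here a dictionary stands in for memcached or Redis, and `load_from_db` is a hypothetical function that queries the database; the miss counter exists only to make the behaviour observable.

```python
from typing import Any, Callable, Dict

class ReadThroughCache:
    def __init__(self, load_from_db: Callable[[str], Any]) -> None:
        self.load_from_db = load_from_db
        self.entries: Dict[str, Any] = {}    # stands in for memcached/Redis
        self.misses = 0

    def get(self, key: str) -> Any:
        if key in self.entries:               # cache hit: return straight to the client
            return self.entries[key]
        self.misses += 1                      # cache miss: query the database...
        value = self.load_from_db(key)
        self.entries[key] = value             # ...and fill the cache on the way out
        return value

database = {"cart:123": {"999": 3}}
cache = ReadThroughCache(lambda key: database[key])
first = cache.get("cart:123")    # miss: goes to the database
second = cache.get("cart:123")   # hit: served from the cache
```

Nothing in this class ever notices when `database` changes, which is exactly the invalidation problem that the following paragraphs describe.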

This is a very common pattern, but there are several big problems with it (Figure 5-10).

Figure 5-10. Problems with application-managed read-through caches.

Cache invalidation is considered by some to be a difficult problem to the point of cliché.1 When data in the underlying database changes, how do you know which entries in the cache to expire or update? For simple data models, figuring out which database change affects which cache entries is tractable, and algorithms such as generational caching and Russian-doll caching2 are used. For more complex data dependencies, invalidation algorithms become complex, brittle, and error-prone. Some applications side-step the problem by using only a time-to-live (expiry time) and accepting that they sometimes read stale data from the cache.
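As an illustration of the simple-model techniques, here is a minimal sketch of one common formulation of generational caching, assuming a dictionary in place of memcached: every cache key embeds a per-model generation number, and any write to that model bumps the number, so all older entries become unreachable at once and are left for the cache to evict. The class and method names are hypothetical. Note how coarse the invalidation is in this formulation: one changed cart invalidates every cached cart.

```python
from typing import Any, Callable, Dict

class GenerationalCache:
    def __init__(self) -> None:
        self.store: Dict[str, Any] = {}       # stands in for memcached (no eviction here)
        self.generation: Dict[str, int] = {}  # model name -> current generation

    def _key(self, model: str, key: str) -> str:
        return f"{model}/gen{self.generation.get(model, 0)}/{key}"

    def get(self, model: str, key: str, compute: Callable[[], Any]) -> Any:
        cache_key = self._key(model, key)
        if cache_key not in self.store:
            self.store[cache_key] = compute()
        return self.store[cache_key]

    def invalidate(self, model: str) -> None:
        # Called on every write to `model`: old keys are simply never read again.
        self.generation[model] = self.generation.get(model, 0) + 1

cache = GenerationalCache()
before = cache.get("cart", "123", lambda: "quantity=1")
cache.invalidate("cart")                      # e.g. after an UPDATE on the cart table
after = cache.get("cart", "123", lambda: "quantity=3")
```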

Another problem is that this architecture is very prone to race conditions. In fact, it is an example of the dual-writes problem that we saw in Using Logs to Build a Solid Data Infrastructure (Figure 2-9): several clients concurrently accessing the same data can cause the cache to become inconsistent with the database.
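The race can be reproduced deterministically by writing out one unlucky interleaving of two clients, using plain dictionaries for the database and the cache. Client A misses the cache and reads the database; client B then updates the database and invalidates the cache; finally, A fills the cache with the value it read earlier. The cache is left holding stale data until the entry is next invalidated or expires.

```python
database = {"cart:123": 1}   # quantity of product 999 in customer 123's cart
cache = {}

# Client A: cache miss, so it reads from the database...
value_seen_by_a = database["cart:123"]      # A reads quantity 1

# Client B: updates the database, then invalidates the cache entry.
database["cart:123"] = 3
cache.pop("cart:123", None)

# Client A: ...and now writes the value it read earlier into the cache.
cache["cart:123"] = value_seen_by_a

# The cache and the database now disagree.
is_stale = cache["cart:123"] != database["cart:123"]
```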

A third problem is cold start. If you reboot your memcached servers and they lose all their cached contents, suddenly every request is a cache miss, the database is overloaded because of the sudden surge in requests, and you’re in a world of pain. If you want to create a new cache, you need some way of bootstrapping its contents without overloading other parts of the system.

So, here we have a contrast (Figure 5-11). On the one hand, creating a secondary index in a database is beautifully simple, one line of SQL—the database handles it automatically, keeping everything up-to-date and even making the index transactionally consistent. On the other hand, application-level cache maintenance is a complete mess of complicated invalidation logic, race conditions, and operational problems.

Figure 5-11. Databases hide the complexity of creating a secondary index behind a simple interface, but application-level cache maintenance is a complete mess.

Why should it be that way? Secondary indexes and caches are not fundamentally different. We said earlier that a secondary index is just a redundant data structure on the side, which structures the same data in a different way, in order to speed up read queries. If you think about it, a cache is also the result of taking your data in one form (the form in which it’s stored in the database) and transforming it into a different form for faster reads. In other words, the contents of the cache are derived from the contents of the database (Figure 5-12) — very similar to an index.

Figure 5-12. Similarly to an index, the contents of a cache are derived from the contents of the database.

We said that a secondary index is built by picking out one field from every record and using that as the key in a dictionary (Figure 5-7). In the case of a cache, we may apply an arbitrary function to the data (Figure 5-12): the data from the database may have gone through some kind of business logic or rendering before it’s put in the cache, and it may be the result of joining several records from different tables. But the end result is similar: if you lose your cache, you can rebuild it from the underlying database; thus, the contents of the cache are derived from the database.

In a read-through cache, this transformation happens on the fly, when there is a cache miss. However, we could perhaps imagine making the process of building and updating a cache more systematic, and more similar to secondary indexes. Let’s return to that idea later.

Now, let’s move on to the fourth idea about databases: materialized views.

4. Materialized Views

You might already know what materialized views are, but let me explain them briefly in case you’ve not previously come across them. You might be more familiar with “normal” views—non-materialized views, or virtual views, or whatever you want to call them.

They work like this: in a relational database, where views are common, you would create a view by saying “CREATE VIEW viewname...” followed by a SELECT query (Figure 5-13).

Figure 5-13. A non-materialized (virtual) view is just an alias for a query; when you read from the view, the database translates it into the underlying query.

When you look at this view in the database, it looks somewhat like a table: you can use it in read queries like any other table. When you query it, for example with SELECT * FROM viewname, the database’s query planner actually rewrites the query into the underlying query that you used in the definition of the view.

So, you can think of a view as a kind of convenient alias, a wrapper that allows you to create an abstraction, hiding a complicated query behind a simpler interface—but it has no consequences for performance or data storage.
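This behaviour can be tried out with SQLite through Python's standard `sqlite3` module. The `cart` table and the `cart_totals` view are hypothetical examples; because the view stores no rows of its own, a read immediately reflects an update made to the underlying table.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE cart (customer_id INTEGER, product_id INTEGER, quantity INTEGER);
    INSERT INTO cart VALUES (123, 999, 3), (123, 500, 1), (456, 999, 2);
    -- A virtual view stores nothing; it is an alias for this SELECT.
    CREATE VIEW cart_totals AS
        SELECT customer_id, SUM(quantity) AS total FROM cart GROUP BY customer_id;
""")

conn.execute("UPDATE cart SET quantity = 5 WHERE customer_id = 456")
# Reading the view runs the underlying query now, so it sees the update.
totals = conn.execute("SELECT * FROM cart_totals ORDER BY customer_id").fetchall()
```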

Contrast that with a materialized view, which is defined using almost identical syntax (see Figure 5-14).

Figure 5-14. Materialized view: very similar syntax, very different implementation.

You also define a materialized view in terms of a SELECT query; the only syntactic difference is that you say CREATE MATERIALIZED VIEW instead of CREATE VIEW. However, the implementation is totally different.

When you create a materialized view, the database starts with the underlying tables—that is, the tables you’re querying in the SELECT statement of the view (“bar” in the example). The database scans over the entire contents of those tables, executes that SELECT query on all of the data, and copies the results of that query into something like a temporary table.

The results of this query are actually written to disk, in a form that’s very similar to a normal table. And that’s really what “materialized” means in this context: the view’s query has been executed, and the results written to disk.

Remember that with the non-materialized view, the database would expand the view into the underlying query at query time. On the other hand, when you query a materialized view, the database can read its contents directly from the materialized query result because the view’s underlying query has already been executed ahead of time. This is especially useful if the underlying query is expensive.

If you’re thinking, “this seems like a cache of query results,” you would be right—that’s exactly what it is. However, the big difference between a materialized view and application-managed caches is the responsibility for keeping it up to date.

Figure 5-15. Like caches and secondary indexes, materialized views are also redundant data that is derived from the underlying tables.

With a materialized view, you declare once how you want the materialized view to be defined, and the database takes care of building that view from a consistent snapshot of the underlying tables (Figure 5-15, much like building a secondary index). Moreover, when the data in the underlying tables changes, the database takes responsibility for maintaining the materialized view, keeping it up-to-date. Some databases do this materialized view maintenance on an ongoing basis, and some require you to periodically refresh the view so that changes take effect, but you certainly don’t have to do cache invalidation in your application code.
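SQLite has no `CREATE MATERIALIZED VIEW`, but the refresh-on-demand style can be emulated to show the difference from a virtual view: the query is executed ahead of time with `CREATE TABLE ... AS SELECT`, and later reads see the stored result until the next refresh. The table and function names are hypothetical; PostgreSQL, for comparison, offers `CREATE MATERIALIZED VIEW` together with a `REFRESH MATERIALIZED VIEW` command.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE cart (customer_id INTEGER, product_id INTEGER, quantity INTEGER);
    INSERT INTO cart VALUES (123, 999, 3), (123, 500, 1), (456, 999, 2);
""")

VIEW_QUERY = "SELECT customer_id, SUM(quantity) AS total FROM cart GROUP BY customer_id"

def refresh_materialized_view(conn: sqlite3.Connection) -> None:
    """Execute the view's query now and store its results as an ordinary table."""
    with conn:  # commit on success, roll back on error
        conn.execute("DROP TABLE IF EXISTS cart_totals_mv")
        conn.execute(f"CREATE TABLE cart_totals_mv AS {VIEW_QUERY}")

def total_for(conn: sqlite3.Connection, customer_id: int) -> int:
    return conn.execute("SELECT total FROM cart_totals_mv WHERE customer_id = ?",
                        (customer_id,)).fetchone()[0]

refresh_materialized_view(conn)
conn.execute("UPDATE cart SET quantity = 5 WHERE customer_id = 456")
stale_total = total_for(conn, 456)    # still the precomputed result
refresh_materialized_view(conn)
fresh_total = total_for(conn, 456)
```

Reads never re-run the aggregation, which is what makes this attractive for expensive queries; the price is that results are only as fresh as the last refresh.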

An advantage of application-managed caches is that you can apply arbitrary business logic to the data before storing it in the cache so that you can do less work at query time or reduce the amount of data you need to cache. Doing the same in a materialized view would require that you run your application code in the database as a stored procedure (Figure 4-10). As discussed in The Unix Philosophy of Distributed Data, this is possible in principle, but often operationally problematic in practice. However, materialized views address the concurrency control and bootstrapping problems of caches (Figure 5-10).

Materialized Views: Self-Updating Caches

There’s something really compelling about the idea of materialized views. I see a materialized view almost as a kind of cache that magically keeps itself up to date. Instead of putting all of the complexity of cache invalidation in the application (risking race conditions and all of the problems we have discussed), materialized views say that cache maintenance should be the responsibility of the data infrastructure.

So, let’s think about this: can we reinvent materialized views, implement them in a modern and scalable way, and use them as a general mechanism for cache maintenance? If we started with a clean slate, without the historical baggage of existing databases, what would the ideal architecture for applications look like (Figure 5-17)?

Figure 5-17. What would materialized views look like if we started with a clean slate?

In Integrating Databases and Kafka with Change Data Capture, we discussed building a completely new index using the events in a log-compacted Kafka topic and then keeping it up-to-date by continuously consuming events from the log and applying them to the index. Whether we call this an index, or a cache, or a materialized view does not make a big difference: they are all derived representations of the data in the log (Figure 5-18).
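Log compaction and the rebuild can be sketched in plain Python, modelling a topic as a list of `(key, value)` records in which a `None` value is a tombstone. This simplified `compact` drops tombstones immediately, whereas Kafka retains them for a configurable period before removing them; both function names are hypothetical. The point is that replaying the compacted log yields the same view as replaying the full log.

```python
from typing import Any, Dict, List, Optional, Tuple

Record = Tuple[str, Optional[Any]]   # (key, value); a None value is a tombstone (deletion)

def compact(log: List[Record]) -> List[Record]:
    """Keep only the latest record for each key, in log order; drop deleted keys."""
    latest: Dict[str, int] = {}
    for offset, (key, _) in enumerate(log):
        latest[key] = offset
    return [log[offset] for offset in sorted(latest.values()) if log[offset][1] is not None]

def build_view(log: List[Record]) -> Dict[str, Any]:
    """Project the log into a read-optimized key-value view by replaying it."""
    view: Dict[str, Any] = {}
    for key, value in log:
        if value is None:
            view.pop(key, None)
        else:
            view[key] = value
    return view

log: List[Record] = [
    ("user:1", "alice"), ("user:2", "bob"), ("user:1", "alice_new"),
    ("user:2", None), ("user:3", "carol"),
]
```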

Figure 5-18. An index, a cache and a materialized view are all just projections of the log into a read-optimized structure.

The difference is that an index is typically built by extracting one field from an event, and using it as lookup key (Figure 5-6), whereas constructing a cache or a materialized view might require more complex transformations:

  • In a materialized view, you might want data from several sources to be joined together into a denormalized object, to save having to perform the join at read time. For example, in Figure 1-17, each tweet contains only the user_id of the author, but when reading tweets, you want the tweet to be joined with the user profile information (the username, profile photo, etc.).

  • The materialized view can contain aggregate functions such as sum or count (e.g., the number of likes in Figure 1-20, or the count of unread messages in Figure 2-10).

  • You might need some arbitrary business logic to be applied (e.g., to honor the user’s privacy settings).
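A view combining these three ingredients can be sketched as a fold over the event log. The event shapes, field names, and the `private` setting below are hypothetical; the function counts likes per tweet (an aggregate), relates each tweet to its author's latest settings (a join), and applies a privacy rule (business logic). For brevity it recomputes from the whole log, whereas a stream processor would update the view incrementally as each event arrives.

```python
from collections import Counter
from typing import Any, Dict, List, Set

def public_like_counts(events: List[Dict[str, Any]]) -> Dict[int, int]:
    """Fold the event log into a view of like counts for tweets by non-private users."""
    private_users: Set[int] = set()   # derived from hypothetical "settings" events
    author_of: Dict[int, int] = {}    # tweet_id -> user_id, derived from "tweet" events
    likes: Counter = Counter()        # aggregate: number of likes per tweet
    for e in events:
        if e["type"] == "settings":
            if e["private"]:
                private_users.add(e["user_id"])
            else:
                private_users.discard(e["user_id"])
        elif e["type"] == "tweet":
            author_of[e["tweet_id"]] = e["user_id"]
        elif e["type"] == "like":
            likes[e["tweet_id"]] += 1
    # Join each count with its author, then apply the business rule (privacy).
    return {t: n for t, n in likes.items() if author_of.get(t) not in private_users}

events = [
    {"type": "tweet", "tweet_id": 1, "user_id": 10},
    {"type": "tweet", "tweet_id": 2, "user_id": 20},
    {"type": "like", "tweet_id": 1},
    {"type": "like", "tweet_id": 1},
    {"type": "like", "tweet_id": 2},
    {"type": "settings", "user_id": 20, "private": True},
]
view = public_like_counts(events)
```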

Stream processing frameworks allow you to implement such joins, aggregations, and arbitrary business logic—we will look at an example shortly.

Let’s also be clear about how a materialized view is different from a cache (Figure 5-19).

Figure 5-19. Advantages of a materialized view over an application-managed read-through cache.

As discussed, an application-managed read-through cache is invalidated or updated directly by application code, whereas a materialized view is maintained by consuming a log. This has some important advantages:

  • A cache is filled on demand when there is a cache miss (so the first request for a given object is always slow, and you have the cold-start problem mentioned in Figure 5-10). By contrast, a materialized view is precomputed; that is, its entire contents are computed before anyone asks for it—just like an index. This means there is no such thing as a cache miss: if an item doesn’t exist in the materialized view, it doesn’t exist in the database. There is no need to fall back to some other underlying database. (This doesn’t mean the entire view has to be in memory: just like an index, it can be written to disk, and the hot parts will automatically be kept in memory in the operating system’s page cache.)

  • With a materialized view there is a well-defined translation process that takes the write-optimized events in the log and transforms them into the read-optimized representation in the view. By contrast, in the typical read-through caching approach, the cache management logic is deeply interwoven with the rest of the application, making it prone to bugs and difficult to reason about.

  • That translation process runs in a stream processor which you can test, deploy, monitor, debug, scale, and maintain independently from the rest of your application. The stream processor consumes events in log order, making it much less susceptible to race conditions. If it fails and is restarted, it just keeps going from where it left off. If you deploy bad code, you can re-run the stream processor on historical data to fix up its mistakes.

  • With log compaction, you can build a brand new index by processing a stream from the beginning (Figure 3-7); the same is true of materialized views. If you want to present your existing data in some new way, you can simply create a new stream processing job, consume the input log from the beginning, and thus build a completely new view onto all the existing data. You can then maintain both views in parallel, gradually move clients to the new view, run A/B tests across the two views, and eventually discard the old view. No more scary stop-the-world schema migrations.
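The last two points can be illustrated with a minimal consumer, assuming the log is a Python list and each event is a hypothetical `(customer_id, quantity_added)` pair. The consumer tracks its offset so that it resumes where it left off, and a second consumer with a different transformation builds a new view from offset 0 over the same log, alongside the old one. A production system would persist offsets and view state durably; here they live in memory.

```python
from typing import Callable, Dict, List, Tuple

Event = Tuple[str, int]                       # hypothetical (customer_id, quantity_added)
View = Dict[str, int]

class ViewBuilder:
    """Consumes a log in order, remembering how far it has got (its offset)."""
    def __init__(self, transform: Callable[[View, Event], None]) -> None:
        self.transform = transform
        self.view: View = {}
        self.offset = 0                       # next log position to read

    def poll(self, log: List[Event], max_events: int = 100) -> None:
        batch = log[self.offset:self.offset + max_events]
        for event in batch:
            self.transform(self.view, event)
        self.offset += len(batch)             # a restarted consumer would resume here

def total_quantity(view: View, event: Event) -> None:
    customer, quantity = event
    view[customer] = view.get(customer, 0) + quantity

def count_events(view: View, event: Event) -> None:   # a "new" view definition
    customer, _ = event
    view[customer] = view.get(customer, 0) + 1

log: List[Event] = [("123", 3), ("456", 2), ("123", 1)]
old_view = ViewBuilder(total_quantity)
old_view.poll(log, max_events=2)              # processes only the first two events
old_view.poll(log)                            # continues from offset 2
log.append(("456", 4))
old_view.poll(log)                            # picks up the newly appended event
new_view = ViewBuilder(count_events)
new_view.poll(log)                            # rebuilds from the beginning of the log
```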

Example: Implementing Twitter

Let’s make materialized views more concrete by looking at an example. In Events and Stream Processing, we looked at how you might implement a Twitter-like messaging service. The most common read operation on that service is requesting the “home timeline”; that is, you want to see all recent tweets by users you follow (including username and profile picture for the sender of each tweet, see Figure 1-17).

In Figure 1-18, we saw a SQL query for a home timeline, but we noted that it is too slow to execute that query on every read. Instead, we need to precompute each user’s home timeline ahead of time so that it’s already there when the user asks for it. Sounds a bit like a materialized view, doesn’t it?

No existing database is able to provide materialized views at Twitter’s scale, but such materialized timelines can be implemented using stream processing tools.3 Figure 5-20 shows a sketch of how you might do this.4

Figure 5-20. Implementing Twitter timelines by using a stream processing system.

To start with, you need to make all data sources available as event streams, either by using CDC (Integrating Databases and Kafka with Change Data Capture) or by writing events directly to a log (Using Logs to Build a Solid Data Infrastructure). In this example, we have event streams from three data sources:

  • Tweets: Whenever a tweet is sent or retweeted, that is an event. It is quite natural to think of these as a stream.

  • User profiles: Every time a user changes their username or profile picture, that is a profile update event. This stream needs to be log-compacted, so that you can reconstruct the latest state of all user profiles from the stream.

  • Follow graph: Every time someone follows or unfollows another user, that’s an event. The full history of these events determines who is following whom.

If you put all of these streams in Kafka, you can create materialized views by writing stream processing jobs using Kafka Streams or Samza. For example, you can write a simple job that counts how many times a tweet has been retweeted, generating a “retweet count” materialized view.
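Outside any particular framework, the retweet-count job boils down to a running count keyed by the original tweet's ID. This plain-Python sketch is not the Kafka Streams or Samza API, and the event fields are hypothetical. It emits every updated count as it goes, so its output is itself a stream of updates to the view.

```python
from collections import Counter
from typing import Any, Dict, Iterable, Iterator, Tuple

def retweet_count_updates(events: Iterable[Dict[str, Any]]) -> Iterator[Tuple[int, int]]:
    """For each retweet event, emit (original_tweet_id, new_count) as an update to the view."""
    counts: Counter = Counter()
    for event in events:
        if event["type"] == "retweet":
            original = event["original_tweet_id"]
            counts[original] += 1
            yield original, counts[original]

events = [
    {"type": "tweet", "tweet_id": 1},
    {"type": "retweet", "original_tweet_id": 1},
    {"type": "retweet", "original_tweet_id": 1},
    {"type": "retweet", "original_tweet_id": 7},
]
updates = list(retweet_count_updates(events))
retweet_count_view = dict(updates)    # latest count per tweet, like a compacted changelog
```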

You can also join streams together. For example, you can join tweets with user profile information, so the result is a stream of tweets in which each tweet carries a bit of denormalized profile information (e.g., username and profile photo of the sender). When someone updates their profile, you can decide whether the change should take effect only for their future tweets, or also for their most recent 100 tweets, or for every tweet they ever sent—any of these can be implemented in the stream processor. (It may be inefficient to rewrite thousands of cached historical tweets with a new username, but this is something you can easily adjust, as appropriate.)
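
This join can be sketched as a stream-table join. The processor keeps a local table of the latest profile per user, built from the profile stream, and looks up the sender whenever a tweet arrives. This version implements only the "future tweets only" policy. As a simplifying assumption, it consumes both streams as one already-interleaved sequence. All class and field names are hypothetical.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, Iterator, Union


@dataclass(frozen=True)
class ProfileUpdated:
    user_id: str
    username: str
    photo_url: str


@dataclass(frozen=True)
class TweetSent:
    tweet_id: str
    user_id: str
    text: str


@dataclass(frozen=True)
class EnrichedTweet:
    tweet_id: str
    text: str
    username: str  # denormalized from the sender's profile
    photo_url: str


def join_tweets_with_profiles(
    events: Iterable[Union[ProfileUpdated, TweetSent]],
) -> Iterator[EnrichedTweet]:
    # Local table built from the profile stream: latest profile per user ID.
    profiles: Dict[str, ProfileUpdated] = {}
    for event in events:
        if isinstance(event, ProfileUpdated):
            # Only the table changes; tweets already emitted keep the old name
            # (the "future tweets only" policy).
            profiles[event.user_id] = event
        else:
            profile = profiles.get(event.user_id)
            # Assumption: a tweet whose sender has no known profile gets placeholders.
            username = profile.username if profile else "<unknown>"
            photo_url = profile.photo_url if profile else ""
            yield EnrichedTweet(event.tweet_id, event.text, username, photo_url)


enriched = list(join_tweets_with_profiles([
    ProfileUpdated("u1", "alice", "alice1.png"),
    TweetSent("t1", "u1", "first tweet"),
    ProfileUpdated("u1", "alice_w", "alice2.png"),
    TweetSent("t2", "u1", "second tweet"),
]))
```

The other policies (rewriting the last 100 tweets, or all of them) would need more state. The processor would also have to remember which tweets each user has sent, so that it can emit corrected copies when a profile changes.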

Next, you can join tweets with followers. By collecting follow/unfollow events, you can build up a list of all users who currently follow user X. When user X tweets something, you can scan over that list, and deliver the new tweet to the home timeline of each of X’s followers (Twitter calls this fan-out5).
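
The fan-out join can be sketched under the same simplifying assumption of a single interleaved event sequence. The processor maintains each user's follower set from follow/unfollow events and appends every new tweet ID to the timeline of each current follower. Class and method names are hypothetical. A production system would likely also cap timeline length and keep timelines outside process memory.

```python
from collections import defaultdict
from typing import DefaultDict, List, Set


class TimelineFanout:
    """Maintains follower sets and per-user home timelines from event streams."""

    def __init__(self) -> None:
        self.followers: DefaultDict[str, Set[str]] = defaultdict(set)  # followee -> followers
        self.timelines: DefaultDict[str, List[str]] = defaultdict(list)  # user -> tweet IDs

    def on_follow(self, follower: str, followee: str) -> None:
        self.followers[followee].add(follower)

    def on_unfollow(self, follower: str, followee: str) -> None:
        self.followers[followee].discard(follower)

    def on_tweet(self, sender: str, tweet_id: str) -> None:
        # Fan-out: deliver the tweet to the "mailbox" of every current follower.
        for follower in self.followers[sender]:
            self.timelines[follower].append(tweet_id)

    def home_timeline(self, user: str) -> List[str]:
        return list(self.timelines.get(user, []))


f = TimelineFanout()
f.on_follow("bob", "alice")
f.on_follow("carol", "alice")
f.on_tweet("alice", "t1")
f.on_unfollow("carol", "alice")
f.on_tweet("alice", "t2")  # carol no longer follows alice, so only bob receives t2
```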

Thus, the home timelines are like a mailbox, containing all the tweets that the user should see when they next log in. That mailbox is continually updated as people send tweets, update their profiles, and follow and unfollow one another. We have effectively created a materialized view for the SQL query in Figure 1-18. Note that the two joins in that query correspond to the two stream joins in Figure 5-20: the stream processing system is like a continuously running query execution graph!

The Unbundled Database

What we see here is an interesting pattern: derived data structures (indexes, materialized views) have traditionally been implemented internally within a monolithic database, but now we are seeing similar structures increasingly being implemented at the application level, using stream processing tools.

This trend is driven by need: nobody would want to re-implement these features in a production system if existing databases already did the job well enough. Building database-like features is difficult: it’s easy to introduce bugs, and many storage systems have high reliability requirements. Our discussion of read-through caching shows that data management at the application level can get very messy.

However, for better or for worse, this trend is happening. We are not going to judge it; we will only try to understand it and learn some lessons from the last few decades of work on databases and operating systems.

Earlier in this chapter (Figure 5-2) we observed that a database’s replication log can look quite similar to an event log that you might use for event sourcing. The big difference is that an event log is an application-level construct, whereas a replication log is traditionally considered to be an implementation detail of a database (Figure 5-21).

Figure 5-21. In traditional database architecture, the replication log is considered an implementation detail, not part of the database’s public API.

SQL queries and responses are traditionally the database’s public interface—and the replication log is an aspect that is hidden by that abstraction. (Change data capture is often retrofitted and not really part of the public interface.)

One way of interpreting stream processing is that it turns the database inside out: the commit log or replication log is no longer relegated to being an implementation detail; rather, it is made a first-class citizen of the application’s architecture. We could call this a log-centric architecture, and interestingly, it begins to look somewhat like a giant distributed database:6

  • You can think of various NoSQL databases, graph databases, time series databases, and full-text search servers as just being different index types. Just like a relational database might let you choose between a B-Tree, an R-Tree and a hash index (for example), your data system might write data to several different data stores in order to efficiently serve different access patterns.

  • The same data can easily be loaded into Hadoop, a data warehouse, or an analytic database (without complicated ETL processes, because event streams are already analytics friendly) to provide business intelligence.

  • The Kafka Streams library and stream processing frameworks such as Samza are scalable implementations of triggers, stored procedures and materialized view maintenance routines.

  • Datacenter resource managers such as Mesos or YARN provide scheduling, resource allocation, and recovery from physical machine failures.

  • Serialization libraries such as Avro, Protocol Buffers, or Thrift handle the encoding of data on the network and on disk. They also handle schema evolution (allowing the schema to be changed over time without breaking compatibility).

  • A log service such as Apache Kafka or Apache BookKeeper7 is like the database’s commit log and replication log. It provides durability, ordering of writes, and recovery from consumer failures. (In fact, people have already built databases that use Kafka as their transaction/replication log.8)
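
The following sketch makes the "unbundled" analogy concrete. One append-only log plays the role of the commit log, and two independent consumers derive different index types from it: a key-value index and a full-text inverted index. Each consumer tracks its own offset. An index can therefore lag behind, catch up later, or be built from scratch by starting at offset 0. The classes are hypothetical stand-ins, not Kafka or search-server APIs.

```python
from collections import defaultdict
from typing import DefaultDict, Dict, List, Set, Tuple

Event = Tuple[str, str]  # hypothetical event: (document ID, full document text)


class Log:
    """Append-only, ordered log: a stand-in for one partition of a Kafka topic."""

    def __init__(self) -> None:
        self.entries: List[Event] = []

    def append(self, event: Event) -> int:
        self.entries.append(event)
        return len(self.entries) - 1  # offset of the new entry

    def read_from(self, offset: int) -> List[Event]:
        return self.entries[offset:]


class KeyValueIndex:
    """Derived store: latest text for each document ID."""

    def __init__(self) -> None:
        self.offset = 0  # next log offset this consumer will read
        self.data: Dict[str, str] = {}

    def catch_up(self, log: Log) -> None:
        for doc_id, text in log.read_from(self.offset):
            self.data[doc_id] = text
            self.offset += 1


class FullTextIndex:
    """Derived store: inverted index from lowercase word to document IDs."""

    def __init__(self) -> None:
        self.offset = 0
        self.current: Dict[str, str] = {}
        self.postings: DefaultDict[str, Set[str]] = defaultdict(set)

    def catch_up(self, log: Log) -> None:
        for doc_id, text in log.read_from(self.offset):
            # Remove postings for the document's previous version, then add the new ones.
            for word in self.current.get(doc_id, "").lower().split():
                self.postings[word].discard(doc_id)
            self.current[doc_id] = text
            for word in text.lower().split():
                self.postings[word].add(doc_id)
            self.offset += 1

    def search(self, word: str) -> Set[str]:
        return set(self.postings.get(word.lower(), set()))


log = Log()
log.append(("d1", "hello world"))
log.append(("d2", "hello kafka"))
kv, fts = KeyValueIndex(), FullTextIndex()
kv.catch_up(log)
fts.catch_up(log)
log.append(("d1", "goodbye world"))
fts.catch_up(log)  # kv has not caught up yet, so it still serves the old text
```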

In a traditional database, all of those features are implemented in a single monolithic application. In a log-centric architecture, each feature is provided by a different piece of software. The result looks somewhat like a database, but with its individual components “unbundled” (Figure 5-22).

Figure 5-22. Updating indexes and materialized views based on writes in a log: more or less what a traditional database already does internally, at smaller scale.

In the unbundled approach, each component is a separately developed project, and many of them are open source. Each component is specialized: the log implementation does not try to provide indexes for random-access reads and writes—that service is provided by other components. The log can therefore focus its effort on being a really good log: it does one thing well (cf. Figure 4-3). A similar argument holds for other parts of the system.

The advantage of this approach is that each component can be developed and scaled independently, providing great flexibility and scalability on commodity hardware.9 It essentially brings the Unix philosophy to databases: specialized tools are composed into an application that provides a complex service.

The downside is that there are now many different pieces to learn about, deploy, and operate. Many practical details need to be figured out: how do we deploy and monitor these various components, how do we make the system robust to various kinds of faults, and how do we productively write software in this kind of environment (Figure 5-23)?

Figure 5-23. These ideas are new, and many challenges lie ahead on the path toward maturity.

Because many of the components were designed independently, without composability in mind, the integrations are not as smooth as one would hope (see change data capture, for example). And there is not yet a convincing equivalent of SQL or the Unix shell—that is, a high-level language for concisely describing data flows—for log-centric systems and materialized views. All in all, these systems are not nearly as elegantly integrated as a monolithic database from a single vendor.

Yet, there is hope. Linux distributions and Hadoop distributions are also assembled from many small parts written by many different groups of people, and they nevertheless feel like reasonably coherent products. We can expect the same will be the case with a Stream Data Platform.10

This log-centric architecture for applications is definitely not going to replace databases, because databases are still needed to serve the materialized views. Also, data warehouses and analytic databases will continue to be important for answering ad hoc, exploratory queries.

I draw the comparison between stream processing and database architecture only because it helps clarify what is going on here: at scale, no single tool is able to satisfy all use cases, so we need to find good patterns for integrating a diverse set of tools into a single system. The architecture of databases provides a good set of patterns.

Streaming All the Way to the User Interface

Before we wrap up, there is one more thing we should talk about in the context of event streams and materialized views. (I saved the best for last!)

Imagine what happens when a user of your application views some data. In a traditional database architecture, the data is loaded from a database, perhaps transformed with some business logic, and perhaps written to a cache. Data in the cache is rendered into a user interface in some way—for example, by rendering it to HTML on the server, or by transferring it to the client as JSON and rendering it on the client.

The result of template rendering is some kind of structure describing the user interface layout: in a web browser, this would be the HTML DOM, and in a native application it would be the operating system’s UI components. Either way, a rendering engine eventually turns this description of UI components into pixels in video memory, and this is what the graphics device actually displays on the screen.

When you look at it like this, it looks very much like a data transformation pipeline (Figure 5-24). You can think of each lower layer as a materialized view onto the upper layer: the cache is a materialized view of the database (the cache contents are derived from the database contents); the HTML DOM is a materialized view of the cache (the HTML is derived from the JSON stored in the cache); and the pixels in video memory are a materialized view of the HTML DOM (the browser rendering engine derives the pixels from the UI layout).

Figure 5-24. Rendering data on screen requires a sequence of transformation steps, not unlike materialized views.

Now, how well does each of these transformation steps work? I would argue that web browser rendering engines are brilliant feats of engineering. You can use JavaScript to change some CSS class, or have some CSS rules conditional on mouse-over, and the rendering engine automatically figures out which rectangle of the page needs to be redrawn as a result of the changes. It does hardware-accelerated animations and even 3D transformations. The pixels in video memory are automatically kept up to date with the underlying DOM state, and this very complex transformation process works remarkably well.

What about the transformation from data objects to user interface components? For now, I consider it “so-so,” because the techniques for updating user interfaces in response to data changes are still quite new. However, they are rapidly maturing: on the web, frameworks such as Facebook’s React,11 Angular,12 and Ember13 are enabling user interfaces that can be updated from a stream, and Functional Reactive Programming (FRP) languages such as Elm14 are in the same area. There is a lot of activity in this field, and it is heading in a good direction.

The transformation from database contents to cache entries is now the weakest link in this entire data-transformation pipeline. The problem is that a cache is request-oriented: a client can read from it, but if the data subsequently changes, the client doesn’t find out about the change (it can poll periodically, but that soon becomes inefficient).

We are now in the bizarre situation in which the UI logic and the browser rendering engine can dynamically update the pixels on the screen in response to changes in the underlying data, but the database-driven backend services don’t have a way of notifying clients about data changes. To build applications that quickly respond to user input (such as real-time collaborative apps), we need to make this pipeline work smoothly, end to end.

Fortunately, if we build materialized views that are maintained by using stream processors, as discussed in this chapter, we have the missing piece of the pipeline (Figure 5-25).

Figure 5-25. If you update materialized views by using an event stream, you can also push changes to those views to clients.

When a client reads from a materialized view, it can keep the network connection open. If that view is later updated, due to some event that appeared in the stream, the server can use this connection to notify the client about the change (for example, using a WebSocket15 or Server-Sent Events16). The client can then update its user interface accordingly.

This means that the client is not just reading the view at one point in time, but actually subscribing to the stream of changes that may subsequently happen. Provided that the client’s Internet connection remains active, the server can push any changes to the client, and the client can immediately render them. After all, why would you ever want outdated information on your screen if more recent information is available? The notion of static web pages, which are requested once and then never change, is looking increasingly anachronistic.
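
Here is a minimal sketch of a view that supports this kind of subscription. A subscription returns the view's current value, which serves as the initial read, and registers a listener that the stream processor's updates invoke later. The listener here is a plain callback, a hypothetical stand-in for writing a message to a WebSocket or Server-Sent Events connection. The class, its methods, and the key naming are all assumptions.

```python
from typing import Callable, Dict, List

# A listener stands in for an open client connection; in a real system it
# might write a message to a WebSocket or a Server-Sent Events stream.
Listener = Callable[[str, int], None]


class SubscribableView:
    """A materialized view (a count per key) that pushes changes to subscribers."""

    def __init__(self) -> None:
        self.values: Dict[str, int] = {}
        self.subscribers: Dict[str, List[Listener]] = {}

    def subscribe(self, key: str, listener: Listener) -> int:
        """Register for future changes and return the current value (the initial read)."""
        self.subscribers.setdefault(key, []).append(listener)
        return self.values.get(key, 0)

    def unsubscribe(self, key: str, listener: Listener) -> None:
        listeners = self.subscribers.get(key, [])
        if listener in listeners:
            listeners.remove(listener)

    def apply(self, key: str, delta: int) -> None:
        """Called by the stream processor for each event that changes the view."""
        new_value = self.values.get(key, 0) + delta
        self.values[key] = new_value
        for listener in list(self.subscribers.get(key, [])):
            listener(key, new_value)


received = []


def on_change(key: str, value: int) -> None:
    received.append((key, value))  # a real client would re-render its UI here


view = SubscribableView()
initial = view.subscribe("tweet:t1:retweets", on_change)
view.apply("tweet:t1:retweets", 1)
view.apply("tweet:t2:retweets", 1)  # no subscriber for this key
view.apply("tweet:t1:retweets", 1)
```

This sketch is single-threaded, so the initial read and the registration in subscribe happen atomically. A concurrent server would need to coordinate them so that no update slips in between the read and the subscription.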

However, allowing clients to subscribe to changes in data requires a big rethink of the way we write applications. The request-response model is very deeply ingrained in our thinking, in our network protocols, and in our programming languages: whether it’s a request to a RESTful service or a method call on an object, the assumption is generally that you’re going to make one request and get one response. In most APIs there is no provision for an ongoing stream of responses.

Figure 5-26. To support dynamically updated views we need to move away from request/response RPC models and use push-based publish-subscribe dataflow everywhere.

This will need to change. Instead of thinking of requests and responses, we need to begin thinking of subscribing to streams and notifying subscribers of new events (Figure 5-26). This needs to happen through all the layers of the stack—the databases, the client libraries, the application servers, the business logic, the frontends, and so on. If you want the user interface to dynamically update in response to data changes, that will only be possible if we systematically apply stream thinking everywhere so that data changes can propagate through all the layers.

Most RESTful APIs, database drivers, and web application frameworks today are based on a request/response assumption, and they will struggle to support streaming dataflow. In the future, I think we’re going to see a lot more people using stream-friendly programming models. We came across some of these in Events and Stream Processing (Figure 1-31): frameworks based on actors and channels, or reactive frameworks (ReactiveX, functional reactive programming), are a natural fit for applications that make heavy use of event streams.

I’m glad to see that some people are already working on better end-to-end support for event streams. For example, RethinkDB supports queries that notify the client if query results change.17 Meteor18 and Firebase19 are frameworks that integrate the database backend and user interface layers so as to be able to push changes into the user interface. These are excellent efforts. We need many more like them (Figure 5-27).

Figure 5-27. Event streams are a splendid idea. We should put them everywhere.

Conclusion

Application development is fairly easy if a single monolithic database can satisfy all of your requirements for data storage, access, and processing. As soon as that is no longer the case—perhaps due to scale, or complexity of data access patterns, or other reasons—there is a lack of guidance and patterns to help application developers build reliable, scalable and maintainable applications.

In this report, we explored a particular architectural style for building large-scale applications, based on streams of immutable events (event logs). Stream processing is already widely used for analytics and monitoring purposes (e.g., finding certain patterns of events for fraud detection purposes, or alerting about anomalies in time series data), but in this report we saw that stream processing is also good for situations that are traditionally considered to be in the realm of OLTP databases: maintaining indexes and materialized views.

In this world view, the event log is regarded as the system of record (source of truth), and other datastores are derived from it through stream transformations (mapping, joining, and aggregating events). Incoming data is written to the log, and read requests are served from a datastore containing some projection of the data.
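
The following hypothetical illustration of this split between the log and its projections uses shopping-cart events. The log is the only thing ever written. The projections are plain functions that replay it, so a new projection can be added later and computed over the full history. The event schema and function names are assumptions for illustration.

```python
from collections import Counter, defaultdict
from dataclasses import dataclass
from typing import DefaultDict, Dict, List


@dataclass(frozen=True)
class CartEvent:
    user: str
    item: str
    action: str  # hypothetical schema: "add" or "remove"


def cart_contents(log: List[CartEvent]) -> Dict[str, Counter]:
    """Read-optimized projection: current quantity of each item in each cart."""
    carts: DefaultDict[str, Counter] = defaultdict(Counter)
    for event in log:
        cart = carts[event.user]
        if event.action == "add":
            cart[event.item] += 1
        elif event.action == "remove" and cart[event.item] > 0:
            cart[event.item] -= 1
            if cart[event.item] == 0:
                del cart[event.item]
    return dict(carts)


def removed_items(log: List[CartEvent]) -> Counter:
    """A second projection, derivable at any time by replaying the same log."""
    return Counter(event.item for event in log if event.action == "remove")


log = [  # the system of record: immutable events, only ever appended
    CartEvent("alice", "book", "add"),
    CartEvent("alice", "pen", "add"),
    CartEvent("alice", "pen", "remove"),
]
```

On its own, the current-state projection no longer records that the pen was ever in the cart. The log still does, and that is what allows a projection such as removed_items to be built after the fact.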

The following are some of the most important observations we made about log-centric systems:

  • An event log such as Apache Kafka scales very well. Because it is such a simple data structure, it can easily be partitioned and replicated across multiple machines, and is comparatively easy to make reliable. It can achieve very high throughput on disks because its I/O is mostly sequential.

  • If all your data is available in the form of a log, it becomes much easier to integrate and synchronize data across different systems. You can easily avoid race conditions and recover from failures if all consumers see events in the same order. You can rewind the stream and re-process events to build new indexes and recover from corruption.

  • Materialized views, maintained through stream processors, are a good alternative to read-through caches. A view is fully precomputed (avoiding the cold-start problem, and allowing new views to be created easily) and kept up to date through streams of change events (avoiding race conditions and partial failures).

  • Writing data as an event log produces better-quality data than if you update a database directly. For example, if someone adds an item to their shopping cart and then removes it again, your analytics, audit, and recommendation systems might want to know. This is the motivation behind event sourcing.

  • Traditional database systems are based on the fallacy that data must be written in the same form as it is read. As we saw in Events and Stream Processing, an application’s inputs often look very different from its outputs. Materialized views allow us to write input data as simple, self-contained, immutable events, and then transform it into several different (denormalized or aggregated) representations for reading.

  • Asynchronous stream processors usually don’t have transactions in the traditional sense, but you can still guarantee integrity constraints (e.g., unique username, positive account balance) by using the ordering of the event log (Figure 2-31).

  • Change data capture is a good way of bringing existing databases into a log-centric architecture. In order to be fully useful, it must capture both a consistent snapshot of the entire database, and also the ongoing stream of writes in transaction commit order.

  • To support applications that dynamically update their user interface when underlying data changes, programming models need to move away from a request/response assumption and become friendlier to streaming dataflow.
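
The point about integrity constraints can be illustrated with a hypothetical username-registration flow. Claims are appended to a log partitioned by the requested username, so all claims for the same name land in one partition. A single sequential consumer decides them in log order: the first claim wins, and later claims for the same name are rejected, with no cross-partition transaction. The partition count, the CRC32-based partitioner, and all names here are assumptions for illustration.

```python
import zlib
from dataclasses import dataclass
from typing import Dict, List

NUM_PARTITIONS = 4  # hypothetical partition count


@dataclass(frozen=True)
class ClaimUsername:
    request_id: str
    user_id: str
    username: str


def partition_for(username: str) -> int:
    # Deterministic hash, so every claim for a given username goes to the same
    # partition and is therefore seen by one consumer in one agreed order.
    return zlib.crc32(username.encode("utf-8")) % NUM_PARTITIONS


def process_partition(claims: List[ClaimUsername]) -> Dict[str, bool]:
    """Decide claims sequentially, in log order: the first claim for a name wins."""
    owners: Dict[str, str] = {}  # username -> user ID that owns it
    decisions: Dict[str, bool] = {}  # request ID -> accepted?
    for claim in claims:
        owner = owners.setdefault(claim.username, claim.user_id)
        # A repeated claim by the current owner is accepted, so retries are harmless.
        decisions[claim.request_id] = owner == claim.user_id
    return decisions


claims = [
    ClaimUsername("r1", "u1", "martin"),
    ClaimUsername("r2", "u2", "martin"),
    ClaimUsername("r3", "u3", "jay"),
    ClaimUsername("r4", "u1", "martin"),
]
partitions: List[List[ClaimUsername]] = [[] for _ in range(NUM_PARTITIONS)]
for claim in claims:
    partitions[partition_for(claim.username)].append(claim)  # append = write to log

decisions: Dict[str, bool] = {}
for partition in partitions:
    decisions.update(process_partition(partition))
```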

We are still figuring out how to build large-scale applications well—what techniques we can use to make our systems scalable, reliable, and maintainable. However, to me, this approach of immutable events, stream processing, and materialized views seems like a very promising route forward. I am optimistic that this kind of application architecture will help us to build better software faster.

Fortunately, this is not science fiction—it’s happening now. People are working on various parts of the problem and finding good solutions. The tools at our disposal are rapidly becoming better. It’s an exciting time to be building software.

1. Phil Karlton: “There are only two hard things in Computer Science: cache invalidation and naming things.” Quoted on martinfowler.com.

2. David Heinemeier Hansson: “How Basecamp Next got to be so damn fast without using much client-side UI,” signalvnoise.com, 18 February 2012.

3. Raffi Krikorian: “Timelines at Scale,” at QCon San Francisco, November 2012.

4. Martin Kleppmann: “Samza newsfeed demo,” github.com, September 2014.

5. Raffi Krikorian: “Timelines at Scale,” at QCon San Francisco, November 2012.

6. Jay Kreps: “The Log: What every software engineer should know about real-time data’s unifying abstraction,” engineering.linkedin.com, 16 December 2013.

7. “Apache BookKeeper,” Apache Software Foundation, bookkeeper.apache.org.

8. Gavin Li, Jianqiu Lv, and Hang Qi: “Pistachio: co-locate the data and compute for fastest cloud compute,” yahooeng.tumblr.com, 13 April 2015.

9. Jun Rao: “The value of Apache Kafka in Big Data ecosystem,” odbms.org, 16 June 2015.

10. Neha Narkhede: “Announcing the Confluent Platform 2.0,” confluent.io, 8 December 2015.

11. “React,” Facebook Inc., facebook.github.io.

12. “AngularJS,” Google Inc., angularjs.org.

13. “Ember,” Tilde Inc., emberjs.com.

14. Evan Czaplicki: “Elm,” elm-lang.org.

15. “WebSockets,” Mozilla Developer Network, developer.mozilla.org.

16. “Server-sent events,” Mozilla Developer Network, developer.mozilla.org.

17. Slava Akhmechet: “Advancing the realtime web,” rethinkdb.com, 27 January 2015.

18. “Meteor,” Meteor Development Group, meteor.com.

19. “Firebase,” Google Inc., firebase.com.
