Using Logs to Build a Solid Data Infrastructure

In Events and Stream Processing, we explored the idea of representing data as a series of events. This idea is useful not only when you want to keep track of things that happened (e.g., page views in an analytics application); we also saw that events work well for describing changes to a database (event sourcing).

However, so far we have been a bit vague about what the stream should look like. In this chapter, we will explore the answer in detail: a stream should be implemented as a log; that is, an append-only sequence of events in a fixed order. (This is what Apache Kafka does.)

It turns out that the ordering of events is really important, and many systems (such as AMQP or JMS message queues) do not provide a fixed ordering. In this chapter, we will go on a few digressions outside of stream processing, to look at logs appearing in other places: in database storage engines, in database replication, and even in distributed consensus systems.

Then, we will take what we have learned from those other areas of computing and apply it to stream processing. Those lessons will help us build applications that are operationally robust, reliable, and that perform well.

But before we get into logs, we will begin this chapter with a motivating example: the sprawling complexity of data integration in a large application. If you work on a non-trivial application—something with more than just one database—you’ll probably find these ideas very useful. (Spoiler: the solution involves a log.)

Case Study: Web Application Developers Driven to Insanity

To begin, let’s assume that you’re working on a web application. In the simplest case, it probably has the stereotypical three-tier architecture (Figure 2-1): you have some clients (which may be web browsers, or mobile apps, or both), which make requests to a web application running on your servers. The web application is where your application code or business logic lives.

Figure 2-1. One web app, one database: life is simple.

Whenever the application wants to remember something for the future, it stores it in a database. Accordingly, whenever the application wants to look up something that it stored previously, it queries the database. This approach is simple to understand and works pretty well.

Figure 2-2. Things usually don’t stay so simple for long.

However, things usually don’t stay so simple for long (Figure 2-2). Perhaps you get more users, making more requests, your database becomes too slow, and you add a cache to speed it up—perhaps memcached or Redis, for example. Perhaps you need to add full-text search to your application, and the basic search facility built into your database is not good enough, so you set up a separate indexing service such as Elasticsearch or Solr.

Perhaps you need to do some graph operations that are not efficient on a relational or document database—for example for social features or recommendations—so you add a separate graph index to your system. Perhaps you need to move some expensive operations out of the web request flow and into an asynchronous background process, so you add a message queue that lets you send jobs to your background workers.

And it gets worse... (Figure 2-3)

Figure 2-3. As the features and the business requirements of an application grow, we see a proliferation of different tools being used in combination with one another.

By now, other parts of the system are becoming slow again, so you add another cache. More caches always make things faster, right? But now you have a lot of systems and services, so you need to add metrics and monitoring so that you can see whether they are actually working. Of course, the metrics system is another system in its own right.

Next, you want to send notifications, such as email or push notifications to your users, so you chain a notification system off the side of the job queue for background workers, and perhaps it needs some kind of database of its own to keep track of stuff. However, now you’re generating a lot of data that needs to be analyzed, and you can’t have your business analysts running big expensive queries on your main database, so you add Hadoop or a data warehouse and load the data from the database into it.

Now that your business analytics are working, you find that your search system is no longer keeping up... but you realize that because you have all the data in HDFS anyway, you could actually build your search indexes in Hadoop and push them out to the search servers. All the while, the system just keeps growing more and more complicated.

The result is complete and utter insanity (Figure 2-4).

Figure 2-4. A system with many interdependent components becomes very complex and difficult to manage and understand.

How did we get to that state? How did we end up with such complexity, where everything is calling everything else and nobody understands what is going on?

It’s not that any particular decision we made along the way was bad. There is no one database or tool that can do everything that our application requires.1 We use the best tool for the job, and for an application with a variety of features that implies using a variety of tools.

Also, as a system grows, you need a way of decomposing it into smaller components in order to keep it manageable. That’s what microservices are all about (see The Unix Philosophy of Distributed Data). But, if your system becomes a tangled mess of interdependent components, that’s not manageable either.

Simply having many different storage systems is not a problem in and of itself: if they were all independent from one another, it wouldn’t be a big deal. The real trouble here is that many of them end up containing the same data, or related data, but in different form (Figure 2-5).

Figure 2-5. Denormalization, caching, indexes, and aggregations are various kinds of redundant data: keeping the same data in a different representation in order to speed up reads.

For example, the documents in your full-text indexes are typically also stored in a database because search indexes are not intended to be used as systems of record. The data in your caches is a duplicate of data in some database (perhaps joined with other data, or rendered into HTML, or something)—that’s the definition of a cache.

Also, denormalization is just another form of duplicating data, similar to caching—if some value is too expensive to recompute on reads, you can store that value somewhere, but now you need to also keep it up-to-date when the underlying data changes. Materialized aggregates, such as those in the analytics example in Events and Stream Processing, are again a form of redundant data.

I’m not saying that this duplication of data is bad—far from it. Caching, indexing, and other forms of redundant data are often essential for achieving good performance on reads. However, keeping the data synchronized between all these various different representations and storage systems becomes a real challenge (Figure 2-6).

Figure 2-6. The problem of data integration: keeping data systems synchronized.

For lack of a better term, I’m going to call this the problem of “data integration.” With that I really just mean making sure that the data ends up in all the right places. Whenever a piece of data changes in one place, it needs to change correspondingly in all the other places where there is a copy or derivative of that data.

So, how do we keep these different data systems synchronized? There are a few different techniques.

Dual Writes

A popular approach is called dual writes (Figure 2-7). The dual-writes technique is simple: it’s the responsibility of your application code to update data in all the appropriate places. For example, if a user submits some data to your web app, there’s some code in the web app that first writes the data to your database, then invalidates or refreshes the appropriate cache entries, then re-indexes the document in your full-text search index, and so on. (Or, maybe it does those things in parallel—that doesn’t matter for our purposes.)

Figure 2-7. With dual writes, your application code is responsible for writing data to all the appropriate places.
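To make the pattern concrete, here is a minimal sketch in Python of what dual writes look like in application code. The dictionaries standing in for the database, the cache, and the search index are purely illustrative; in a real application these would be client libraries for your actual datastores.

    def save_document(db, cache, search_index, doc_id, doc):
        # Dual writes: the application itself updates every datastore.
        db[doc_id] = doc                 # 1. write to the database
        cache.pop(doc_id, None)          # 2. invalidate the cache entry
        search_index[doc_id] = doc       # 3. re-index for full-text search
        # If any step fails, or two requests interleave, the stores can diverge.

    database, cache, search_index = {}, {}, {}
    save_document(database, cache, search_index, "doc-1", {"title": "hello"})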

The dual-writes approach is popular because it’s easy to build, and it more or less works at first. But I’d like to argue that it’s a really bad idea, because it has some fundamental problems. The first problem is race conditions.

Figure 2-8 shows two clients making dual writes to two datastores. Time flows from left to right, following the black arrows.

Figure 2-8. Timing diagram showing two different clients concurrently writing to the same key, using dual writes.

Here, the first client (teal) is setting the key X to be some value A. It first makes a request to the first datastore—perhaps that’s the database, for example—and sets X=A. The datastore responds by saying the write was successful. Then, the client makes a request to the second datastore—perhaps that’s the search index—and also sets X=A.

Simultaneously, another client (red) is also active. It wants to write to the same key X, but it wants to set the key to a different value B. The client proceeds in the same way: it first sends a request, X=B, to the first datastore and then sends a request, X=B, to the second datastore.

All of these writes are successful. However, look at what value is stored in each database over time (Figure 2-9).

Figure 2-9. A race condition with dual writes leads to perpetual inconsistency between two datastores.

In the first datastore, the value is first set to A by the teal client, and then set to B by the red client, so the final value is B.

In the second datastore, the requests arrive in a different order: the value is first set to B and then set to A, so the final value is A. Now, the two datastores are inconsistent with each other, and they will permanently remain inconsistent until sometime later when someone comes and overwrites X again.

The worst thing is this: you probably won’t even notice that your database and your search indexes have become inconsistent because no errors occurred. You’ll probably only realize six months later, while you’re doing something completely different, that your database and your indexes don’t match up, and you’ll have no idea how that could have happened. This is not a problem of eventual consistency—it’s perpetual inconsistency.
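The interleaving is easy to reproduce. The following toy simulation (plain Python dictionaries standing in for the two datastores) applies the writes in the arrival orders shown in Figure 2-9 and ends with the two stores disagreeing:

    datastore_1 = {}   # e.g., the database
    datastore_2 = {}   # e.g., the search index

    # Order in which the requests happen to arrive at each datastore:
    datastore_1["X"] = "A"   # teal client reaches datastore 1 first
    datastore_1["X"] = "B"   # red client overwrites it
    datastore_2["X"] = "B"   # red client reaches datastore 2 first
    datastore_2["X"] = "A"   # teal client overwrites it

    assert datastore_1["X"] != datastore_2["X"]   # perpetual inconsistency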

That alone should be enough to put anyone off dual writes.

But wait, there’s more...

Denormalized data

Let’s look at denormalized data. Suppose, for example, that you have an application with which users can send each other messages or emails, and you have an inbox for each user. When a new message is sent, you want to do two things: add the message to the list of messages in the user’s inbox, and also increment the user’s count of unread messages (Figure 2-10).

Figure 2-10. A counter of unread messages, which needs to be kept up-to-date when a new message comes in.

You keep a separate counter because you display it in the user interface all the time, and it would be too slow to query the number of unread messages by scanning over the list of messages every time you need to display the number. However, this counter is denormalized information: it’s derived from the actual messages in the inbox, and whenever the messages change, you also need to update the counter accordingly.

Let’s keep this one simple: one client, one database. Think about what happens over time: first, the client inserts the new message into the recipient’s inbox. Then, the client makes a request to increment the unread counter.

However, just in that moment, something goes wrong—perhaps the database goes down, or a process crashes, or the network is interrupted, or someone unplugs the wrong network cable (Figure 2-11). Whatever the reason, the update to the unread counter fails.

Figure 2-11. One write succeeds; the other write fails. What now?

Now, your database is inconsistent: the message has been added to the inbox, but the counter hasn’t been updated. And unless you periodically recompute all your counter values from scratch, or undo the insertion of the message, it will forever remain inconsistent. Such problems are not hypothetical—they do occur in practice.2

Of course, you could argue that this problem was solved decades ago by transactions: atomicity, the “A” in “ACID,” means that if you make several changes within one transaction, they either all happen or none happen (Figure 2-12).

Figure 2-12. Transaction atomicity means that if you make several changes, they either all happen or none happen.

The purpose of atomicity is to solve precisely this issue—if something goes wrong during your writes, you don’t need to worry about a half-finished set of changes making your data inconsistent.
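As a concrete illustration of the inbox example, here is a sketch using Python’s built-in sqlite3 module as a stand-in for whatever transactional database you use (the table layout is made up for the example). The message insert and the counter increment are wrapped in one transaction, so a failure between the two statements rolls both back:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE messages (recipient TEXT, body TEXT)")
    conn.execute("CREATE TABLE unread (recipient TEXT PRIMARY KEY, count INTEGER)")
    conn.execute("INSERT INTO unread VALUES ('alice', 0)")
    conn.commit()

    def deliver_message(recipient, body):
        try:
            with conn:  # one transaction: both writes commit, or neither does
                conn.execute("INSERT INTO messages VALUES (?, ?)", (recipient, body))
                conn.execute("UPDATE unread SET count = count + 1 WHERE recipient = ?",
                             (recipient,))
        except sqlite3.Error:
            pass  # rolled back: the inbox and the counter remain consistent with each other

    deliver_message("alice", "hello!")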

The traditional approach of wrapping the two writes in a transaction works fine in databases that support it, but many of the new generation of databases (“NoSQL”) don’t, so you’re on your own.

Also, if the denormalized information is stored in a different database—for example, if you keep your emails in a database but your unread counters in Redis—you lose the ability to tie the writes together into a single transaction. If one write succeeds and the other fails, you’re going to have a difficult time clearing up the inconsistency.

Some systems support distributed transactions, based on 2-phase commit, for example.3 However, many datastores nowadays don’t support it, and even if they did, it’s not clear whether distributed transactions are a good idea in the first place.4 So, we must assume that with dual writes the application needs to deal with partial failure, which is difficult.

Making Sure Data Ends Up in the Right Places

So, back to our original question: how do we make sure that all the data ends up in all the right places (Figure 2-6)? How do we get a copy of the same data to appear in several different storage systems, and keep them all consistently synchronized as the data changes?

As we saw, dual writes isn’t the solution, because it can introduce inconsistencies due to race conditions and partial failures. Then, how can we do better?

I’m a fan of stupidly simple solutions. The great thing about simple solutions is that you have a chance of understanding them and convincing yourself that they’re correct. In this case, the simplest solution I can see is to store all your writes in a fixed order, and apply them in that fixed order to the various places they need to go (Figure 2-13).

Figure 2-13. A totally ordered, persistently stored sequence of events, also known as a log.

If you do all your writes sequentially, without any concurrency, you have removed the potential for race conditions. Moreover, if you write down the order in which you make your writes, it becomes much easier to recover from partial failures, as I will show later.

So, the stupidly simple solution that I propose looks like this: whenever anyone wants to write some data, we append that write to the end of a sequence of records. That sequence is totally ordered, it’s append-only (we never modify existing records, only ever add new records at the end), and it’s persistent (we store it durably on disk).

Figure 2-13 shows an example of such a data structure: moving left to right, it records that we first wrote X=5, then we wrote Y=8, then we wrote X=6, and so on. That data structure has a name: we call it a log.
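A minimal in-memory version of this data structure might look like the following Python sketch (a real log would of course be persisted to disk): the only way to modify it is to append, and each record is identified by its position in the sequence.

    class Log:
        """An append-only, totally ordered sequence of records."""

        def __init__(self):
            self._records = []

        def append(self, record):
            # Existing records are never modified; we only ever add at the end.
            self._records.append(record)
            return len(self._records) - 1   # the record's offset (log position)

        def read_from(self, offset):
            # Return all records at or after the given offset, in order.
            return self._records[offset:]

    log = Log()
    log.append(("X", 5))
    log.append(("Y", 8))
    log.append(("X", 6))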

The Ubiquitous Log

The interesting thing about logs is that they pop up in many different areas of computing. Although it might seem like a stupidly simple idea that can’t possibly work, it actually turns out to be incredibly powerful.

When I say “logs”, the first thing you probably think of is textual application logs of the style you might get from Log4j or Syslog. Sure, that’s one kind of log, but when I talk about logs here I mean something more general. I mean any kind of data structure of totally ordered records that is append-only and persistent—any kind of append-only file.

How Logs Are Used in Practice

Throughout the rest of this chapter, I’ll run through a few examples of how logs are used in practice (Figure 2-14). It turns out that logs are already present in the databases and systems you likely use every day. When we understand how logs are used in various different systems, we’ll be in a better position to understand how they can help us solve the problem of data integration.

Figure 2-14. Four areas of computing that use logs; we will look at each of them in turn.

The first area we’ll discuss is the internals of database storage engines.

1) Database Storage Engines

Do you remember B-Trees5 from your algorithms classes (Figure 2-15)? They are a very widely used data structure for storage engines—almost all relational databases, and many non-relational databases, use them.

Figure 2-15. The upper levels of a B-Tree.

To summarize briefly: a B-Tree consists of pages, which are fixed-size blocks on disk, typically 4 or 8 KB in size. When you want to look up a particular key, you start with one page, which is at the root of the tree. The page contains pointers to other pages, and each pointer is tagged with a range of keys. For example, if your key is between 0 and 100, you follow the first pointer; if your key is between 100 and 300, you follow the second pointer; and so on.

The pointer takes you to another page, which further breaks down the key range into sub-ranges. Eventually you end up at the page containing the particular key for which you’re looking.
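As a rough illustration of that descent (deliberately simplified: real storage engines lay pages out as fixed-size blocks on disk and handle insertions and splits, which this sketch omits), here each page is a list of key boundaries with child pointers, and the lookup walks from the root down to a leaf:

    import bisect

    class Page:
        def __init__(self, boundaries, children=None, values=None):
            self.boundaries = boundaries   # sorted keys delimiting the key ranges
            self.children = children       # child pages (internal pages only)
            self.values = values           # key -> value mapping (leaf pages only)

    def lookup(page, key):
        while page.children is not None:                    # descend until we reach a leaf
            i = bisect.bisect_right(page.boundaries, key)    # which key range contains key?
            page = page.children[i]
        return page.values.get(key)

    leaf_low = Page([], values={42: "a"})
    leaf_high = Page([], values={250: "b"})
    root = Page([100], children=[leaf_low, leaf_high])       # keys below 100 go left, others right
    assert lookup(root, 250) == "b"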

Now what happens if you need to insert a new key/value pair into a B-Tree? You have to insert it into the page whose key range contains the key you’re inserting. If there is enough spare space in that page, no problem. But, if the page is full, it needs to be split into two separate pages (Figure 2-16).

Figure 2-16. Splitting a full B-Tree page into two sibling pages (red outline). Page pointers in the parent (black outline, red fill) need to be updated, too.

When you split a page, you need to write at least three pages to disk: the two pages that are the result of the split, and the parent page (to update the pointers to the split pages). However, these pages might be stored at various different locations on disk.

This raises the question: what happens if the database crashes (or the power goes out, or something else goes wrong) halfway through the operation, after only some of those pages have been written to disk? In that case, you have the old (pre-split) data in some pages, and the new (post-split) data in other pages, and that’s bad news. You’re most likely going to end up with dangling pointers or pages to which nobody is pointing. In other words, you’ve got a corrupted index.

Now, storage engines have been doing this for decades, so how do they make B-Trees reliable? The answer is that they use a write-ahead log.6

Write-ahead log

A write-ahead log (WAL) is a particular kind of log. Whenever the storage engine wants to make any kind of change to the B-Tree, it must first write the change that it intends to make to the WAL, which is an append-only file on disk. Only after the change has been written to the WAL, and durably written to disk, is the storage engine allowed to modify the actual B-Tree pages on disk.

This makes the B-Tree reliable: if the database crashes while data was being appended to the WAL, no problem, because the B-Tree hasn’t been touched yet. And if it crashes while the B-Tree is being modified, no problem, because the WAL contains the information about what changes were about to happen. When the database comes back up after the crash, it can use the WAL to repair the B-Tree and get it back into a consistent state.
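The following sketch shows the essence of the protocol; it is a hypothetical illustration, not how any particular database implements its WAL. Every intended change is appended and flushed to an append-only file before the main data structure is touched, so that recovery can replay the WAL after a crash:

    import json, os

    class WriteAheadLog:
        def __init__(self, path):
            self.file = open(path, "a+")

        def log_change(self, change):
            # Append the intended change and force it to disk *before*
            # the storage engine is allowed to modify any B-Tree pages.
            self.file.write(json.dumps(change) + "\n")
            self.file.flush()
            os.fsync(self.file.fileno())

        def replay(self):
            # On recovery, re-read every logged change so the engine can
            # repair pages that were only partially updated.
            self.file.seek(0)
            return [json.loads(line) for line in self.file if line.strip()]

    wal = WriteAheadLog("btree.wal")
    wal.log_change({"op": "split_page", "page": 7})
    # ...only now may the engine modify the actual pages on disk...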

This was our first example showing that logs are a really neat idea.

Log-structured storage

Storage engines didn’t stop with B-Trees. Some clever folks realized that if we’re writing everything to a log anyway, we might as well use the log as the primary storage medium. This is known as log-structured storage,7 which is used in HBase8 and Cassandra,9 and a variant appears in Riak.10

Figure 2-17. In log-structured storage, writes are appended to log segments, and periodically merged/compacted in the background.

In log-structured storage we don’t always keep appending to the same file, because it would become too large and it would be too difficult to find the key we’re looking for. Instead, the log is broken into segments, and from time to time the storage engine merges segments and discards duplicate keys, as illustrated in Figure 2-17. Segments can also be internally sorted by key, which can make it easier to find the key you’re looking for and also simplifies merging. However, these segments are still logs: they are only written sequentially, and they are immutable after they have been written.
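A toy sketch of the merging step, assuming each segment is simply an ordered list of (key, value) writes: the merge keeps only the most recent value for each key, and the output is written as a new, immutable segment.

    def compact(segments):
        # `segments` is a list of log segments, oldest first; each segment is a
        # list of (key, value) pairs in the order they were written.
        latest = {}
        for segment in segments:            # oldest segment first...
            for key, value in segment:      # ...so later writes overwrite earlier ones
                latest[key] = value
        # Emit the merged segment sorted by key, which simplifies future merges.
        return sorted(latest.items())

    old_segments = [[("X", 5), ("Y", 8)], [("X", 6), ("Z", 3)]]
    print(compact(old_segments))   # [('X', 6), ('Y', 8), ('Z', 3)]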

As you can see, logs play an important role in storage engines.

2) Database Replication

Let’s move on to the second example where logs are used: database replication.

Replication is a feature that you find in many databases: it allows you to keep a copy of the same data on several different nodes. That can be useful for spreading the load, and it also means that if one node dies, you can failover to another one.

There are a few different ways of implementing replication, but a common choice is to designate one node as the leader (also known as primary or master), and the other replicas as followers (also known as standby or slave) (Figure 2-18). I don’t like the master/slave terminology, so I’m going to stick with leader/follower.

Figure 2-18. In leader-based replication, the leader processes writes, and uses a replication log to tell followers about writes.

Whenever a client wants to write something to the database, it needs to talk to the leader. Read-only clients can use either the leader or a follower (although replication to followers is typically asynchronous, so a follower might return slightly out-of-date information if the latest writes haven’t yet been applied to it).

When clients write data to the leader, how does that data get to the followers? Big surprise: they use a log! They use a replication log, which may in fact be the same as the write-ahead log (this is what Postgres does, for example), or it may be a separate replication log (MySQL does this).

The replication log works as follows: whenever some data is written to the leader, it is also appended to the replication log. The followers read that log in the order in which it was written, and apply each of the writes to their own copy of the data. As a result, each follower processes the same writes in the same order as the leader, and thus it ends up with a copy of the same data (Figure 2-19).

Figure 2-19. The follower applies writes in the order in which they appear in the replication log.

Even if the writes happen concurrently on the leader, the log still contains the writes in a total order. Thus, the log actually removes the concurrency from the writes—it “squeezes all the non-determinism out of the stream of writes,”11 and on the follower there’s no doubt about the order in which the writes happened.

So, what about the dual-writes race condition we discussed earlier (Figure 2-9)?

This race condition cannot happen with leader-based replication, because clients don’t write directly to the followers. The only writes processed by followers are the ones they receive from the replication log. And because the log fixes the order of those writes, there is no ambiguity regarding which one happened first.

Moreover, all followers are guaranteed to see the log in the same order, so if two overwrites occur in quick succession, that’s no problem: all followers apply writes in that same order, and as a result they all end up in the same final state.

But, what about the second problem with dual writes that we discussed earlier, namely that one write could succeed and another could fail (Figure 2-11)? This could still happen: a follower could successfully process the first write from a transaction, but fail to process the second write from the transaction (perhaps because the disk is full or the network is interrupted, as illustrated in Figure 2-20).

Figure 2-20. A network interruption causes the follower to stop applying writes from the log, but it can easily resume replication when the network is repaired.

If the network between the leader and the follower is interrupted, the replication log cannot flow from the leader to the follower. This could lead to an inconsistent replica, as we discussed previously. How does database replication recover from such errors and avoid becoming inconsistent?

Notice that the log has a very nice property: because the leader only ever appends to it, we can give each record in the log a sequential number that is always increasing (which we might call log position or offset). Furthermore, followers only process it in sequential order (from left to right; i.e., in order of increasing log position), so we can describe a follower’s current state with a single number: the position of the latest record it has processed.

When you know a follower’s current position in the log, you can be sure that all the prior records in the log have already been processed, and none of the subsequent records have been processed.

This is great, because it makes error recovery quite simple. If a follower becomes disconnected from the leader, or it crashes, the follower just needs to store the log position up to which it has processed the replication log. When the follower recovers, it reconnects to the leader, and asks for the replication log beginning from the last offset that it previously processed. Thus, the follower can catch up on all the writes that it missed while it was disconnected, without losing any data or receiving duplicates.
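In code, the follower’s recovery amounts to remembering an offset and asking for everything after it. Here is a self-contained sketch, with a plain Python list standing in for the leader’s replication log:

    replication_log = [("X", "A"), ("Y", "B"), ("X", "C")]   # appended to by the leader

    class Follower:
        def __init__(self):
            self.data = {}        # the follower's copy of the data
            self.position = 0     # offset of the next log record to apply

        def catch_up(self, log):
            # Resume from the stored position: everything before it has already
            # been applied, nothing after it has, so no update is lost or repeated.
            for key, value in log[self.position:]:
                self.data[key] = value
                self.position += 1

    follower = Follower()
    follower.catch_up(replication_log)     # applies all three writes, in log order
    replication_log.append(("Z", "D"))     # the leader keeps writing...
    follower.catch_up(replication_log)     # ...and the follower resumes from offset 3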

The fact that the log is totally ordered makes this recovery much simpler than if you had to keep track of every write individually.

3) Distributed Consensus

The third example of logs in practice is in a different area: distributed consensus.

Achieving consensus is one of the well-known and often-discussed problems in distributed systems. It is important, but it is also surprisingly difficult to solve.

An example of consensus in the real world would be trying to get a group of friends to agree on where to go for lunch (Figure 2-21). This is a distinctive feature of a sophisticated civilization12 and can be a surprisingly difficult problem, especially if some of your friends are easily distractible (so they don’t always respond to your questions) or if they are fussy eaters.

Figure 2-21. Consensus is useful if you don’t want to stay hungry, and don’t want to lose data.

Closer to our usual domain of computers, an example of where you might want consensus is in a distributed database system: for instance, you might require all your database nodes to agree on which node is the leader for a particular partition (shard) of the database.

It’s pretty important that they all agree on who the leader is: if two different nodes both think they are leader, they might both accept writes from clients. Later, when one of them finds out that it was wrong and it wasn’t leader after all, the writes that it accepted might be lost. This situation is known as split brain, and it can cause nasty data loss.13

There are a few different algorithms for implementing consensus. Paxos14 is perhaps the most well-known, but there are also Zab15 (used by ZooKeeper16), Raft,17 and others.18 These algorithms are quite tricky and have some non-obvious subtleties.19 In this report, I will very briefly sketch one part of the Raft algorithm (Figure 2-22).

Figure 2-22. Raft consensus protocol: a value X=8 is proposed, and nodes vote on it.

In a consensus system, there are a number of nodes (three in Figure 2-22) that are in charge of agreeing on what the value of a particular variable should be. A client proposes a value, for example X=8 (which might mean that node X is the leader for partition 8), by sending it to one of the Raft nodes. That node collects votes from the other nodes. If a majority of nodes agree that the value should be X=8, the first node is allowed to commit the value.

When that value is committed, what happens? In Raft, that value is appended to the end of a log. Thus, what Raft is doing is not just getting the nodes to agree on one particular value, it’s actually building up a log of values that have been agreed over time. All Raft nodes are guaranteed to have exactly the same sequence of committed values in their log, and clients can consume this log (Figure 2-23).

Figure 2-23. The Raft protocol provides consensus not just for a single value, but a log of agreed values.

After the newly agreed value has been committed, appended to the log, and replicated to the other nodes, the client that originally proposed the value X=8 is sent a response saying that the system succeeded in reaching consensus, and that the proposed value is now part of the Raft log.
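The sketch below shows only the majority-commit idea from that description; it is a toy, not Raft itself, and it omits terms, leader election, and the log-matching checks that make the real algorithm safe.

    class Node:
        def __init__(self):
            self.log = []            # committed values, in the agreed order

        def vote(self, value):
            return True              # a real node may refuse, e.g. if it has seen a newer term

    def propose(nodes, value):
        votes = sum(1 for node in nodes if node.vote(value))
        if votes > len(nodes) // 2:  # a majority of nodes must agree
            for node in nodes:
                node.log.append(value)   # commit: append the value to every node's log
            return True
        return False

    cluster = [Node(), Node(), Node()]
    propose(cluster, ("X", 8))       # every node's log now ends with ('X', 8)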

(As a theoretical aside, the problems of consensus and atomic broadcast—that is, creating a log with exactly-once delivery—are reducible to each other.20 This means Raft’s use of a log is not just a convenient implementation detail, but also reflects a fundamental property of the consensus problem it is solving.)

4) Kafka

We’ve seen that logs are a recurring theme in surprisingly many areas of computing: storage engines, database replication, and consensus. As the fourth and final example, we’ll cover Apache Kafka—another system that is built around the idea of logs. The interesting thing about Kafka is that it doesn’t hide the log from you. Rather than treating the log as an implementation detail, Kafka exposes it to you so that you can build applications around it.

Figure 2-24. Kafka is typically used as a message broker for publish-subscribe event streams.

The typical use of Kafka is as a message broker (message queue), as illustrated in Figure 2-24—so it is somewhat comparable to AMQP (e.g. RabbitMQ), JMS (e.g. ActiveMQ or HornetQ), and other messaging systems. Kafka has two types of clients: producers or publishers (which send messages to Kafka) and consumers or subscribers (which read the streams of messages in Kafka).

For example, producers can be your web servers or mobile apps, and the types of messages they send to Kafka might be logging information—that is, events that indicate which user clicked which link at which point in time. The consumers are various processes that need to find out about stuff that is happening; for example, to generate analytics, to monitor for unusual activity, to generate personalized recommendations for users, and so on.

The thing that makes Kafka interestingly different from other message brokers is that it is structured as a log. In fact, it somewhat resembles a log file in the sense of Log4j or Syslog: when a producer sends a message to Kafka, it is literally appended to the end of a file on disk. Thus, Kafka’s internal data files are just a sequence of log messages, as illustrated in Figure 2-25. (While application log files typically use a newline character to delimit records, Kafka uses a binary format with checksums and a bit of useful metadata. But the principle is very similar.)

Figure 2-25. A message in Kafka is appended as a log record to the end of a file.

If Kafka wrote everything sequentially to a single file, its throughput would be limited to the sequential write throughput of a disk—which is perhaps tens of megabytes per second, but that’s not enough. In order to make Kafka scalable, a stream of messages—a topic—is split into partitions (Figure 2-26). Each partition is a log, that is, a totally ordered sequence of messages. However, different partitions are completely independent from one another, so there is no ordering guarantee across different partitions. This allows different partitions to be handled on different servers, and so Kafka can scale horizontally.

Figure 2-26. Data streams in Kafka are split into partitions.

Each partition is stored on disk and replicated across several machines, so it is durable and can tolerate machine failure without data loss. Producing and consuming these partition logs is very similar to what we saw previously in the context of database replication:

  • Every message that is sent to Kafka is appended to the end of a partition. That is the only write operation supported by Kafka: appending to the end of a log. It’s not possible to modify past messages.

  • Within each partition, messages have a monotonically increasing offset (log position). To consume messages from Kafka, a client reads messages sequentially, beginning from a particular offset, as indicated by the violet arrow in Figure 2-26. That offset is managed by the consumer.
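To make the append-and-consume model concrete, here is a minimal sketch using the third-party kafka-python client; the broker address, topic name, and consumer group are assumptions for illustration. The producer appends each message to the end of one partition (chosen by hashing the key, so all events with the same key stay in one ordered sequence), and the consumer reads messages in increasing offset order:

    from kafka import KafkaProducer, KafkaConsumer

    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    # Messages with the same key are hashed to the same partition, preserving their order.
    producer.send("page-views", key=b"user-123", value=b'{"url": "/home"}')
    producer.flush()

    consumer = KafkaConsumer(
        "page-views",
        bootstrap_servers="localhost:9092",
        group_id="analytics",
        auto_offset_reset="earliest")   # start from the beginning of each partition
    for message in consumer:
        print(message.partition, message.offset, message.value)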

We said previously that Kafka is a message broker somewhat like AMQP or JMS messaging systems. However, the similarity is superficial—although they all allow messages to be relayed from producers to consumers, the implementation under the hood is very different.

The biggest difference is in how the system ensures that consumers process every message, without dropping messages in case of failure. With AMQP and JMS-based queues, the consumer acknowledges every individual message after it has been successfully processed. The broker keeps track of the acknowledgement status of every message; if a consumer dies without acknowledging a message, the broker retries delivery, as shown in Figure 2-27.

Figure 2-27. AMQP and JMS message brokers use per-message acknowledgements to keep track of which messages were successfully consumed, and redeliver any messages on which the consumer failed.

A consequence of this redelivery behavior is that messages can be delivered out-of-order: a consumer does not necessarily see messages in exactly the same order as the producer sent the messages. AMQP and JMS are designed for situations in which the exact ordering of messages is not important, and so this redelivery behavior is desirable.

However, in situations like database replication, the ordering of messages is critical. For example, in Figure 2-13 it matters that X is first set to 5 and then to 6, so the final value is 6. If the replication system were allowed to reorder messages, they would no longer mean the same thing.

Kafka maintains a fixed ordering of messages within one partition, and always delivers those messages in the same order. For that reason, Kafka doesn’t need to keep track of acknowledgements for every single message: instead, it is sufficient to keep track of the latest message offset that a consumer has processed in each partition. Because the order of messages is fixed, we know that all messages prior to the current offset have been processed, and all messages after the current offset have not yet been processed.

Kafka’s model has the advantage that it can be used for database-like applications where the order of messages is important. On the other hand, because only a per-partition offset is tracked, the messages within a partition must be processed sequentially, on a single thread. Thus, we can distinguish two different families of messaging systems (Figure 2-28).

Figure 2-28. AMQP and JMS are good for job queues; Kafka is good for event logs.

On the one hand, message brokers that keep track of acknowledgements for every individual message are well suited for job queues, where one service needs to ask another service to perform some task (e.g. sending an email, charging a credit card) on its behalf. For these situations, the ordering of messages is not important, but it is important to be able to easily use a pool of threads to process jobs in parallel and retry any failed jobs.

On the other hand, Kafka shines when it comes to logging events (e.g. the fact that a user viewed a web page, or that a customer purchased some product). When subscribers process these events, it is normally a very lightweight operation (such as storing the event in a database, or incrementing some counters), so it is feasible to process all of the events in one Kafka partition on a single thread. For parallelism—using multiple threads on multiple machines—you simply spread the data across more partitions, with each consumer thread processing its own partitions.

Different tools are good for different purposes, and so it is perfectly reasonable to use both Kafka and a JMS or AMQP messaging system in the same application.

Solving the Data Integration Problem

Let’s return to the data integration problem from the beginning of this chapter. Suppose that you have a tangle of different datastores, caches, and indexes that need to be synchronized with each other (Figure 2-3).

Now that we have seen a bunch of examples of practical applications of logs, can we use what we’ve learned to figure out how to solve data integration in a better way?

Stop doing dual writes—it leads to inconsistent data.
Figure 2-29. Stop doing dual writes—it leads to inconsistent data.

First, we need to stop doing dual writes (Figure 2-29). As discussed, it’s probably going to make your data inconsistent, unless you have very carefully thought about the potential race conditions and partial failures that can occur in your application.

Note that this inconsistency isn’t just the kind of “eventual consistency” that is often quoted in asynchronous systems. What I’m talking about here is permanent inconsistency—if you’ve written two different values to two different datastores, due to a race condition or partial failure, that difference won’t simply resolve itself. You’d have to take explicit actions to search for data mismatches and resolve them (which is difficult because the data is constantly changing).

What I propose is this: rather than having the application write directly to the various datastores, the application only appends the data to a log (such as Kafka). All the different representations of this data—your databases, your caches,21 your indexes—are constructed by consuming the log in sequential order (Figure 2-30).

Figure 2-30. Have your application append data only to a log, and construct all databases, indexes, and caches by reading sequentially from the log.

Each datastore that needs to be kept in sync is an independent consumer of the log. Every consumer takes the data in the log, one record at a time, and writes it to its own datastore. The log guarantees that the consumers all see the records in the same order; by applying the writes in the same order, the problem of race conditions is gone. This looks very much like the database replication we saw earlier!

However, what about the problem of partial failure (Figure 2-11)? What if one of your stores has a problem and can’t accept writes for a while?

That problem is also solved by the log: each consumer keeps track of the log position up to which it has processed the log. When the error in the datastore-writing consumer is resolved, it can resume processing records in the log from its last position, and catch up on everything that happened. That way, a datastore won’t lose any updates, even if it’s offline for a while. This is great for decoupling parts of your system: even if there is a problem in one datastore, the rest of the system remains unaffected.
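Here is a sketch of that architecture in miniature, with plain Python objects standing in for the log and the datastores: there is a single write path (appending to the log), every consumer applies the same totally ordered records, and each consumer remembers its own position so it can resume after being offline.

    log = []                                   # the totally ordered, append-only log

    def write(record):
        log.append(record)                     # the *only* way to write into the system

    class StoreConsumer:
        def __init__(self, store):
            self.store = store                 # e.g. database, cache, or search index
            self.position = 0                  # how far into the log this consumer has read

        def poll(self):
            for key, value in log[self.position:]:
                self.store[key] = value        # apply writes in log order
                self.position += 1

    database, cache, search_index = {}, {}, {}
    consumers = [StoreConsumer(database), StoreConsumer(cache), StoreConsumer(search_index)]

    write(("X", 5))
    write(("X", 6))
    for consumer in consumers:                 # every store ends up with X = 6; no race conditions
        consumer.poll()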

You can even use the log to bootstrap a completely new cache or index when required. We discuss how this works in Integrating Databases and Kafka with Change Data Capture.

A log is such a stupidly simple idea: put your writes in a total order and show them to all consumers in the same order. As we saw, this simple idea turns out to be very powerful.

Transactions and Integrity Constraints

Just one problem remains: the consumers of the log all update their datastores asynchronously, so they are eventually consistent. This is not sufficient if you want to guarantee that your data meets certain constraints, for example that each username in your database must be unique, or that a user cannot spend more money than their account balance.

There are a few approaches for solving this issue. One is called change data capture, and we will discuss it in Integrating Databases and Kafka with Change Data Capture. Another, fairly simple approach is illustrated in Figure 2-31.

Figure 2-31. Validating that usernames are unique, while still making all writes through a log.

Suppose that you want to ensure that usernames are unique. You can check whether a username is already taken when a user tries to register, but that still allows the race condition of two people trying to claim the same username at just the same time. Traditionally, in a relational database, you’d use transactions and a unique constraint on the username column to prevent this.

In an architecture in which you can only append to a log, you can solve this problem as a two-step process. First, when a user wants to claim a username, you send an event to a “username claims” stream. This event doesn’t yet guarantee uniqueness; it merely establishes an ordering of claims. (If you’re using a partitioned stream like a Kafka topic, you need to ensure that all claims to the same username go to the same partition. You can do this by using the username as the Kafka partitioning key.)

A stream processor consumes this stream, checks a database for uniqueness, writes the new username to the database, and then writes the outcome (“successfully registered” or “username already taken”) to a separate “registrations” event stream. This validation processor can handle one event at a time, in a single-threaded fashion. To get more parallelism, use more Kafka partitions, each of which is processed independently—this approach scales to millions of events per second. As the messages in each partition are processed serially, there are no concurrency problems, and conflicting registrations are sure to be found.
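The following sketch shows the shape of that validation step, with Python lists standing in for the “username claims” partitions and the “registrations” stream, and a dictionary standing in for the processor’s database; the names and the partition count are made up for the example. Hashing the username chooses the partition (as a Kafka partitioning key would), so competing claims for the same name land in the same partition and are checked one at a time:

    NUM_PARTITIONS = 4
    claim_partitions = [[] for _ in range(NUM_PARTITIONS)]   # the "username claims" streams
    registrations = []                                       # the "registrations" stream
    taken = {}                                               # the validator's database

    def claim_username(username, user_id):
        partition = hash(username) % NUM_PARTITIONS          # same name -> same partition
        claim_partitions[partition].append((username, user_id))

    def validate_partition(partition):
        # Each partition is processed serially, so two claims for the
        # same username can never race with each other.
        for username, user_id in claim_partitions[partition]:
            if username in taken:
                registrations.append((user_id, "username already taken"))
            else:
                taken[username] = user_id
                registrations.append((user_id, "successfully registered"))

    claim_username("martin", 1)
    claim_username("martin", 2)
    validate_partition(hash("martin") % NUM_PARTITIONS)      # first claim wins; the second is rejected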

How does the user find out whether their username registration was successful? One option is that the server that submitted the claim can consume the “registrations” stream, and wait for the outcome of the uniqueness check to be reported. With a fast stream processor like Samza, this should only take a few milliseconds.

If conflicts are sufficiently rare, it might even be acceptable to tell the user “ok” as soon as the claim has been submitted. In the rare case that their registration failed, you can assign them a temporary random username, send them an email notification to apologize, and ask them to choose a new username.

The same approach can be used to make sure that an account balance does not go negative. For more complex situations, you can layer a transaction protocol on top of Kafka, such as the Tango project from Microsoft Research.22

Conclusion: Use Logs to Make Your Infrastructure Solid

To close this chapter, I’d like to leave you with a thought experiment (Figure 2-32).

Figure 2-32. What if the only way to modify data in your service was to append an event to a log?

Most APIs we work with have endpoints for both reading and writing. In RESTful terms, GET is for reading (i.e., side-effect-free operations) and POST, PUT, and DELETE are for writing. These endpoints for writing are ok if you only have one system you’re writing to, but if you have more than one such system, you quickly end up with dual writes and all their aforementioned problems.

Imagine a system with an API in which you eliminate all the endpoints for writing. Imagine that you keep all the GET requests but prohibit any POST, PUT, or DELETE. Instead, the only way you can send writes into the system is by appending them to a log, and having the system consume that log. (The log must be outside of the system to accommodate several consumers for the same log.)

For example, imagine a variant of Elasticsearch in which you cannot write documents through the REST API, but only write documents by sending them to Kafka. Elasticsearch would internally include a Kafka consumer that takes documents and adds them to the index. This would actually simplify some of the internals of Elasticsearch because it would no longer need to worry about concurrency control, and replication would be simpler to implement. And it would sit neatly alongside other tools that might be consuming the same log.

In this world view, the log is the authoritative source of what has happened, and consumers of the log present that information in various different ways (Figure 2-33). Similar ideas appear at many different levels of the stack: from wear leveling on SSDs to database storage engines and file systems.23 We expand on this idea in Turning the Database Inside Out.

Figure 2-33. The idea of using the log as source of truth appears in various different places.

This is in fact very similar to the Event Sourcing approach we saw in Events and Stream Processing, presented slightly differently. The lesson from this chapter is simple: to make an event-sourced approach work, you need to fix the ordering of the events using a log, because reordering events might lead to a different outcome (e.g., a different person getting the desired username).

In this chapter, we saw that logs are a good way of solving the data integration problem: ensuring that the same data ends up in several different places, without introducing inconsistencies. Kafka is a good implementation of a log. In the next chapter we will look into the issue of integrating Kafka with your existing databases, so that you can begin integrating them in a log-centric architecture.

Further Reading

Many of the ideas in this chapter were previously laid out by Jay Kreps in his blog post “The Log.”24 An edited version was published as an ebook by O’Reilly Media.25

Confluent’s vision of a Kafka-based stream data platform for data integration closely matches the approach we discussed in this chapter, as described in two blog posts by Jay Kreps.26,27

1Michael Stonebraker and Uğur Çetintemel: “‘One Size Fits All’: An Idea Whose Time Has Come and Gone,” at 21st International Conference on Data Engineering (ICDE), April 2005.

2Martin Kleppmann: “Eventual consistency? More like perpetual inconsistency,” twitter.com, 17 November 2014.

3Henry Robinson: “Consensus Protocols: Two-Phase Commit,” the-paper-trail.org, 27 November 2008.

4Pat Helland: “Life beyond Distributed Transactions: an Apostate’s Opinion,” at 3rd Biennial Conference on Innovative Data Systems Research (CIDR), pages 132–141, January 2007.

5Goetz Graefe: “Modern B-Tree Techniques,” Foundations and Trends in Databases, volume 3, number 4, pages 203–402, August 2011. doi:10.1561/1900000028

6C Mohan, Don Haderle, Bruce G Lindsay, Hamid Pirahesh, and Peter Schwarz: “ARIES: A Transaction Recovery Method Supporting Fine-Granularity Locking and Partial Rollbacks Using Write-Ahead Logging,” ACM Transactions on Database Systems (TODS), volume 17, number 1, pages 94–162, March 1992. doi:10.1145/128765.128770

7Patrick O’Neil, Edward Cheng, Dieter Gawlick, and Elizabeth O’Neil: “The Log-Structured Merge-Tree (LSM-Tree),” Acta Informatica, volume 33, number 4, pages 351–385, June 1996. doi:10.1007/s002360050048

8Matteo Bertozzi: “Apache HBase I/O – HFile,” blog.cloudera.com, 29 June 2012.

9Jonathan Hui: “How Cassandra Read, Persists Data and Maintain Consistency,” jonathanhui.com.

10Justin Sheehy and David Smith: “Bitcask: A Log-Structured Hash Table for Fast Key/Value Data,” Basho Technologies, April 2010.

11Jay Kreps: “The Log: What every software engineer should know about real-time data’s unifying abstraction,” engineering.linkedin.com, 16 December 2013.

12Douglas Adams: The Restaurant at the End of the Universe. Pan Books, 1980. ISBN: 9780330262132

13Kyle Kingsbury: “Call me maybe: MongoDB,” aphyr.com, 18 May 2013.

14Tushar Deepak Chandra, Robert Griesemer, and Joshua Redstone: “Paxos Made Live - An Engineering Perspective,” at 26th ACM Symposium on Principles of Distributed Computing (PODC), June 2007.

15Flavio P Junqueira, Benjamin C Reed, and Marco Serafini: “Zab: High-performance broadcast for primary-backup systems,” at 41st IEEE International Conference on Dependable Systems and Networks (DSN), pages 245–256, June 2011. doi:10.1109/DSN.2011.5958223

16“Apache ZooKeeper,” Apache Software Foundation, zookeeper.apache.org.

17Diego Ongaro and John K Ousterhout: “In Search of an Understandable Consensus Algorithm (Extended Version),” at USENIX Annual Technical Conference (USENIX ATC), June 2014.

18Robbert van Renesse, Nicolas Schiper, and Fred B Schneider: “Vive La Différence: Paxos vs. Viewstamped Replication vs. Zab,” IEEE Transactions on Dependable and Secure Computing, volume 12, number 4, pages 472–484, September 2014. doi:10.1109/TDSC.2014.2355848

19Heidi Howard, Malte Schwarzkopf, Anil Madhavapeddy, and Jon Crowcroft: “Raft Refloated: Do We Have Consensus?,” ACM SIGOPS Operating Systems Review, volume 49, number 1, pages 12–21, January 2015. doi:10.1145/2723872.2723876

20Tushar Deepak Chandra and Sam Toueg: “Unreliable Failure Detectors for Reliable Distributed Systems,” Journal of the ACM, volume 43, number 2, pages 225–267, March 1996. doi:10.1145/226643.226647

21Jason Sobel: “Scaling Out,” facebook.com, 20 August 2008.

22Mahesh Balakrishnan, Dahlia Malkhi, Ted Wobber, et al.: “Tango: Distributed Data Structures over a Shared Log,” at 24th ACM Symposium on Operating Systems Principles (SOSP), pages 325–340, November 2013. doi:10.1145/2517349.2522732

23Pat Helland: “Immutability Changes Everything,” at 7th Biennial Conference on Innovative Data Systems Research (CIDR), January 2015.

24Jay Kreps: “The Log: What every software engineer should know about real-time data’s unifying abstraction,” engineering.linkedin.com, 16 December 2013.

25Jay Kreps: I Heart Logs. O’Reilly Media, September 2014. ISBN: 978-1-4919-0932-4

26Jay Kreps: “Putting Apache Kafka to use: A practical guide to building a stream data platform (Part 1),” confluent.io, 24 February 2015.

27Jay Kreps: “Putting Apache Kafka to use: A practical guide to building a stream data platform (Part 2),” confluent.io, 24 February 2015.
