Integrating Databases and Kafka with Change Data Capture
The approach we’ve discussed in the last two chapters has been a radical departure from the way databases are traditionally used: away from transactions that query and update a database in place, and toward an ordered log of immutable events. We saw that this new approach offers many benefits, such as better integration of heterogeneous data systems, better scalability, reliability, and performance.
However, fundamentally changing the way we store and process data is a big, scary step. In reality, most of us have existing systems that we need to keep running and for which a rewrite is not an option. In this chapter, we will discuss a solution for those situations in which you already have an existing database as the system of record.
Introducing Change Data Capture
As discussed in Using Logs to Build a Solid Data Infrastructure, if you have data in a database, it’s likely that you also need a copy of that data in other places: perhaps in a full-text index (for keyword search), in Hadoop or a data warehouse (for business analytics and offline processing such as recommendation systems), and perhaps in various other caches or indexes (to make reads faster and to take load off the database).
A log is still a great way of implementing this data integration. And if the data source is an existing database, we can simply extract that log from your database. This idea is called change data capture (CDC), illustrated in Figure 3-1.
Whereas in Figure 2-30 the application appended events directly to the log, the web app in Figure 3-1 uses the database for reading and writing. If it’s a relational database, the application may insert, update, and delete rows arbitrarily, as usual.
The question is: how do we get the data in the database into a log without forcing the web app to change its behavior?
To begin, observe this: most databases have the ability to export a consistent snapshot of the entire database contents (e.g., for backup purposes). For example, MySQL has mysqldump, and PostgreSQL has pg_dump. If you want a copy of your database in a search index, you could take such a snapshot and then import it into your search server.
However, most databases never stand still: there is always someone writing to them. This means the snapshot is already outdated before you’ve even finished copying the data. But, maybe you can cope with slightly stale data; in that case you could take snapshots periodically (e.g., once a day) and update the search index with each new snapshot.
To get more up-to-date information in the search index, you could take snapshots more frequently, although this quickly becomes inefficient: on a large database, it can take hours to make a copy of the entire database and re-index it.
Typically, only a small part of the database changes between one snapshot and the next. What if you could process only a “diff” of what changed in the database since the last snapshot? That would also be a smaller amount of data, so you could take such diffs more frequently. What if you could take such a “diff” every minute? Every second? 100 times a second?
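To make the idea concrete, here is a minimal sketch of computing such a diff, with snapshots modeled as plain Python dictionaries mapping primary keys to rows (a simplification of real database tables):

```python
def snapshot_diff(old, new):
    """Compute the changes needed to turn snapshot `old` into snapshot `new`.

    Snapshots are modeled as {primary_key: row} dictionaries; a real
    implementation would compare database tables, not in-memory dicts.
    """
    diff = []
    for key, row in new.items():
        if key not in old:
            diff.append(("insert", key, row))
        elif old[key] != row:
            diff.append(("update", key, row))
    for key in old:
        if key not in new:
            diff.append(("delete", key, None))
    return diff

yesterday = {1: "alice", 2: "bob", 3: "carol"}
today     = {1: "alice", 2: "bobby", 4: "dave"}
changes = snapshot_diff(yesterday, today)
# Only three changes, no matter how large the snapshots themselves are.
```

The diff is proportional to the amount of change, not to the size of the database, which is why taking diffs more frequently is cheap.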
Database = Log of Changes
When you take it to the extreme, the changes to a database become a stream of events. Every time someone writes to the database, that is an event in the stream. If you apply those events to a database in exactly the same order as the original database committed them, you end up with an exact copy of the database.
If you think about it, this is exactly how database replication works (see Using Logs to Build a Solid Data Infrastructure, Figure 2-19). The leader database produces a replication log—that is, a stream of events that tells the followers what changes they need to make to their copy of the data in order to stay up-to-date with the leader. By continually applying this stream, they maintain a copy of the leader’s data.
We want to do the same, except that the follower isn’t another instance of the same database software, but a different technology (a search index, cache, data warehouse, etc). Although replication is a common feature in databases, most databases unfortunately consider the replication log to be an implementation detail, not a public API. This means it is often difficult to get access to the replication events in a format that an application can use.
Oracle GoldenGate,1 the MySQL binlog,2 the MongoDB oplog,3 or the CouchDB changes feed4 do something like this, but they’re not exactly easy to use correctly. More recently, a few databases such as RethinkDB5 or Firebase6 have oriented themselves toward real-time change streams.
Change Data Capture (CDC) effectively means replicating data from one storage technology to another. To make it work, we need to extract two things from the source database, in an application-readable data format:
- A consistent snapshot of the entire database contents at one point in time
- A real-time stream of changes from that point onward—every insert, update, or delete needs to be represented in a way that we can apply it to a copy of the data and ensure a consistent outcome.
At some companies, CDC has become a key building block for applications—for example, LinkedIn built Databus7 and Facebook built Wormhole8 for this purpose. Kafka 0.9 includes an API called Kafka Connect,9 designed to connect Kafka to other systems, such as databases. A Kafka connector can use CDC to bring a snapshot and stream of changes from a database into Kafka, from where it can be used for various applications. Kafka Connect draws from the lessons learnt from Databus and similar systems.
If you have a change stream that goes all the way back to the first ever write to the database, you don’t need the snapshot, because the change stream contains the entire contents of the database already. However, most databases delete transaction logs after a while, to avoid running out of disk space. In this case, you need both a one-time snapshot (at the time when you start consuming the change stream) and the change stream (from that point onward) in order to reconstruct the database contents.
Implementing the Snapshot and the Change Stream
Figure 3-2 shows one good approach for getting both the snapshot and the change stream. Users are continually reading and writing to the database, and we want to allow the change capture process to begin without interfering with this (i.e., without downtime).
Many databases can take a point-in-time snapshot of the database without locking the database for writes (this is implemented by using the MVCC mechanism in PostgreSQL, MySQL/InnoDB, and Oracle). That is, the snapshot sees the entire database in a consistent state, as it existed at one point in time, even though parts of it may be modified by other transactions while the snapshot is running. This is a great feature, because it would be very difficult to reason about a copy of the database in which some parts are older and other parts are newer.
The change stream needs to be coordinated with this snapshot so that it contains exactly those data changes that occurred since the snapshot was taken, no more and no less. Achieving this is more difficult, and depends on the particular database system you’re using. In the next section, we will discuss a particular implementation for PostgreSQL which does this.
With Kafka and Kafka Connect, we can actually unify the snapshot and the change stream into a single event log. The snapshot is translated into a log by generating an “insert row” event for every row in the database snapshot. This is then followed by the change stream, which consists of “insert row,” “update row,” or “delete row” events. Later in this chapter we will discuss how and why this works.
While the snapshot is being captured (which can take hours on a large database, as previously noted), clients continue writing to the database, as illustrated in Figure 3-2. The change events from these writes must be queued up, and sent to the log when the snapshot is complete. Finally, when the backlog is cleared, the change capture system can just pick up data change events, as and when they happen, and send them to the change log.
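The snapshot-then-stream handover can be sketched as follows. This is a toy, single-threaded model of the process in Figure 3-2 (a list stands in for the Kafka topic, and concurrency is simulated by the order of method calls):

```python
class ChangeCapture:
    """Toy model of the snapshot/stream handover: while the snapshot is
    running, incoming change events are buffered; once the snapshot rows
    have been written to the log, the backlog is flushed, and subsequent
    events go straight to the log."""

    def __init__(self):
        self.log = []          # stand-in for the Kafka topic
        self.backlog = []      # change events queued up during the snapshot
        self.snapshot_done = False

    def on_change(self, event):
        if self.snapshot_done:
            self.log.append(event)
        else:
            self.backlog.append(event)

    def run_snapshot(self, rows):
        for key, value in rows.items():
            self.log.append(("insert", key, value))  # snapshot rows as inserts
        self.log.extend(self.backlog)                # clear the backlog
        self.backlog = []
        self.snapshot_done = True

cap = ChangeCapture()
cap.on_change(("update", 1, "alice2"))   # a write arrives mid-snapshot
cap.run_snapshot({1: "alice", 2: "bob"})
cap.on_change(("insert", 3, "carol"))    # a live event after the handover
```

After the handover, the log contains the snapshot (as inserts), then the buffered writes, then live events, which is exactly the unified event log described above.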
The resulting change log has all the good properties that we discussed in Using Logs to Build a Solid Data Infrastructure, without changing the way the application uses the database. We just need to figure out how to make the change data capture work. That’s what the rest of this chapter is about.
Bottled Water: Change Data Capture with PostgreSQL and Kafka
There are many databases to choose from, and the right choice of database depends on the situation. In this section, we’ll talk specifically about PostgreSQL10 (or Postgres for short), an open source relational database that is surprisingly full-featured.11 However, you can draw lessons from the general approach described here and apply them to any other database.
Until recently, if you wanted to get a stream of changes from Postgres, you had to use triggers. This is possible, but it is fiddly, requires schema changes, and doesn’t perform very well. However, Postgres 9.4 (released in December 2014) introduced a new feature that changes everything: logical decoding.12
With logical decoding, change data capture for Postgres suddenly becomes much more feasible. So, when this feature was released, I set out to build a change data capture tool for Postgres that would take advantage of the new facilities. Confluent sponsored me to work on it (thank you, Confluent!), and we have released an alpha version of this tool as open source. It is called Bottled Water13 (Figure 3-3).
At the time of writing, Bottled Water is a standalone tool that copies a consistent snapshot and a stream of changes from Postgres to Kafka. There are plans to integrate it with the Kafka Connect framework for easier deployment.
The name “logical decoding” comes from the fact that this feature decodes the database’s write-ahead log (WAL). We encountered the WAL previously in Using Logs to Build a Solid Data Infrastructure (Figure 2-16), in the context of making B-Trees robust against crashes. Besides crash recovery, Postgres also uses the WAL for replication. Follower nodes continuously apply the changes from the WAL to their own copy of the database, as if they were constantly recovering from a crash.
This is a good way of implementing replication, but it has a downside: the log records in the WAL are very low-level, describing byte modifications to Postgres’ internal data structures. It’s not feasible for an application to decode the WAL by itself.
Enter logical decoding: this feature parses the WAL, and gives us access to row-level change events. Every time a row in a table is inserted, updated, or deleted, that’s an event. Those events are grouped by transaction, and appear in the order in which they were committed to the database. Aborted/rolled-back transactions do not appear in the stream. Thus, if you apply the change events in the same order, you end up with an exact, transactionally consistent copy of the database—precisely what we want for change capture.
The Postgres logical decoding is well-designed: it even creates a consistent snapshot that is coordinated with the change stream. You can use this snapshot to make a point-in-time copy of the entire database (without locking—you can continue writing to the database while the copy is being made) and then use the change stream to get all writes that happened since the snapshot.
Bottled Water uses these features to extract the entire contents of a database, and encode it using Avro,14 an efficient binary data format. The encoded data is sent to Kafka, where you can use it in many ways: index it in Elasticsearch, use it to populate a cache, process it with Kafka Streams or a stream processing framework, load it into HDFS with the Kafka HDFS connector,15 and so on. The nice thing is that you only need to get the data into Kafka once, and then you can have arbitrarily many subscribers, without putting any additional load on Postgres.
Kafka is best known for transporting high-volume activity events, such as web server logs, and user click events. Such events are typically retained for a certain period of time (e.g., a few days) and then discarded or archived to long-term storage. Is Kafka really a good fit for database change events? We don’t want database data to be discarded!
In fact, Kafka is a perfect fit—the key is Kafka’s log compaction feature, which was designed precisely for this purpose (Figure 3-4).
If you enable log compaction, there is no time-based expiry of data. Instead, every message has a key, and Kafka retains the latest message for a given key indefinitely. Earlier messages for a given key are eventually garbage-collected. This is quite similar to new values overwriting old values in a key-value store and is essentially the same technique as log-structured storage engines use (Figure 2-17).
Bottled Water identifies the primary key (or replica identity16) of each table in Postgres and uses that as the key of the messages sent to Kafka. The value of the message depends on the kind of event (Figure 3-5):
- For inserts and updates, the message value contains all of the row’s fields, encoded as Avro.
- For deletes, the message value is set to null. This causes Kafka to remove the message during log compaction, so its disk space is freed up.
Each table in Postgres is sent to a separate topic in Kafka. It wouldn’t necessarily have to be that way, but this approach makes log compaction work best: in SQL, a primary key uniquely identifies a row in a table, and in Kafka, a message key defines the unit of log compaction in a topic. (Tables with no primary key or replica identity are currently not well supported by logical decoding; this will hopefully be addressed in future versions of Postgres.)
The great thing about log compaction is that it blurs the distinction between the initial snapshot of the database and the ongoing change stream. Bottled Water writes the initial snapshot to Kafka by turning every single row in the database into a message, keyed by primary key, and sending them all to the Kafka brokers. When the snapshot is done, every row that is inserted, updated, or deleted similarly turns into a message.
If a row is frequently updated, there will be many messages with the same key (because each update turns into a message). Fortunately, Kafka’s log compaction will sort this out and garbage-collect the old values so that we don’t waste disk space. On the other hand, if a row is never updated or deleted, it just stays unchanged in Kafka forever—it is never garbage-collected.
This means that with log compaction, every row that exists in the database also exists in Kafka—it is only removed from Kafka after it is overwritten or deleted in the database. In other words, the Kafka topic contains a complete copy of the entire database (Figure 3-6).
Having the full database dump and the real-time stream in the same system (Kafka) is tremendously powerful because it allows you to bootstrap new consumers by loading their contents from the log in Kafka.
For example, suppose that you’re feeding a database into Kafka by using Bottled Water and you currently have a search index that you’re maintaining by consuming that Kafka topic. Now suppose that you’re working on a new application feature for which you need to support searching on a new field that you are currently not indexing.
In a traditional setup, you would need to somehow go through all of your documents and re-index them with the new field. Doing this at the same time as processing live updates is dangerous, because you might end up overwriting new data with older data.
If you have a full database dump in a log-compacted Kafka topic, this is no problem. You just create a new, completely empty index, and start your Kafka consumer from the beginning of the topic (also known as “offset 0”), as shown in Figure 3-7.
Your consumer then gradually works its way forward through the topic, sequentially processing each message in order and writing it to the new index (including the new field). While this is going on, the old index is still being maintained as usual—it is completely unaffected by the new index being built at the same time. Users’ reads are being handled by the old index.
Finally, after some time, the new index reaches the latest message in the topic (Figure 3-8). At this point, nothing special happens—it just continues consuming messages as they appear in Kafka, the same as it was doing before. However, we have done a great thing: we have created a new index that contains all the data in the topic, and thus all the data in the database!
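The reindexing steps above can be sketched as follows, with the topic as a list of messages and the indexes as dictionaries (a toy stand-in for a real search index such as Elasticsearch):

```python
def build_index(topic, index, extract_fields):
    """Replay a (compacted) topic from offset 0 into an index.

    `topic` is a list of (key, value) messages; `extract_fields` picks the
    fields to index, so a new consumer version can index fields that the
    old one ignored."""
    for key, value in topic:
        if value is None:
            index.pop(key, None)     # tombstone: remove the document
        else:
            index[key] = extract_fields(value)
    return index

topic = [
    ("1", {"title": "Kafka",    "author": "unknown"}),
    ("2", {"title": "Postgres", "author": "community"}),
]

# The old index only supports searching by title
old_index = build_index(topic, {}, lambda v: {"title": v["title"]})

# New requirement: also search by author. No migration script; just start
# a fresh consumer at offset 0 with the new indexing logic.
new_index = build_index(topic, {}, lambda v: {"title": v["title"],
                                              "author": v["author"]})
```

Because the compacted topic holds the full database contents, the new index comes out complete, while the old index was never touched.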
You now have two full indexes of the data, side by side: the old one and the new one, both being kept current with real-time updates from Kafka. Users are still reading from the old index, but as soon as you have tested the new index, you can switch users from the old index to the new one. Even this switch can be gradual, and you can always go back in case something goes wrong; the old index is still there, still being maintained.
After all users have been moved to the new index and you have assured yourself that everything is fine, you can stop updating the old index, shut it down and reclaim its resources.
This approach avoids a large, dangerous data migration, and replaces it with a gradual process that takes small steps. At each step you can always go back if something went wrong, which can give you much greater confidence about proceeding. This approach “minimizes irreversibility” (as Martin Fowler puts it17), which allows you to move faster and be more agile without breaking things.
Moreover, you can use this technique to recover from bugs. Suppose that you deploy a bad version of your application that writes incorrect data to a database. In a traditional setup, where the application writes directly to the database, it is difficult to recover (restoring from a backup would most likely incur data loss). However, if you’re going via a log and the bug is downstream from the log, you can recover by using the same technique as just described: process all the data in the log again using a bug-fixed version of the code. Being able to recover from incorrectly written data by re-processing is sometimes known as human fault-tolerance.18
The idea of maintaining a copy of your database in Kafka surprises people who are more familiar with traditional enterprise messaging and its limitations. Actually, this use case is exactly why Kafka is built around a replicated log: it makes this kind of large-scale data retention and distribution possible. Downstream systems can reload and re-process data at will without impacting the performance of the upstream database that is serving low-latency queries.
When Bottled Water extracts data from Postgres, it could be encoded as JSON, or Protocol Buffers, or Thrift, or any number of formats. However, I believe Avro is the best choice. Gwen Shapira has written about the advantages of Avro for schema management,19 and I’ve written a blog post comparing it to Protobuf and Thrift.20 The Confluent stream data platform guide21 gives some more reasons why Avro is good for data integration.
Bottled Water inspects the schema of your database tables and automatically generates an Avro schema for each table. The schemas are automatically registered with Confluent’s schema registry,22 and the schema version is embedded in the messages sent to Kafka. This means it “just works” with the stream data platform’s serializers: you can work with the data from Postgres as meaningful application objects and rich datatypes, without writing a lot of tedious parsing code.
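To give a feel for what such schema generation involves, here is a hypothetical sketch (this is not Bottled Water's actual code, and the type mapping is far from complete) of turning a table definition into an Avro record schema:

```python
# Hypothetical mapping from a few SQL column types to Avro types; a real
# implementation would cover many more types (timestamps, numerics, etc.)
SQL_TO_AVRO = {"integer": "int", "bigint": "long", "text": "string",
               "boolean": "boolean", "double precision": "double"}

def table_to_avro_schema(table_name, columns):
    """Generate an Avro record schema (as a dict) for a table, given
    `columns` as a list of (name, sql_type, nullable) tuples."""
    fields = []
    for name, sql_type, nullable in columns:
        avro_type = SQL_TO_AVRO[sql_type]
        if nullable:
            # A nullable column becomes a union with null, defaulting to null
            fields.append({"name": name, "type": ["null", avro_type],
                           "default": None})
        else:
            fields.append({"name": name, "type": avro_type})
    return {"type": "record", "name": table_name, "fields": fields}

schema = table_to_avro_schema("users",
    [("id", "bigint", False), ("email", "text", True)])
```

Deriving the schema from the table definition means that when the table schema changes, a new Avro schema version can be generated and registered automatically, and Avro's schema evolution rules handle the transition for consumers.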
The Logical Decoding Output Plug-In
Now that we’ve examined Bottled Water’s use of Kafka log compaction and Avro data encoding, let’s have a little peek into the internals of its integration with Postgres, and see how it uses the logical decoding feature.
An interesting property of Postgres’ logical decoding feature is that it does not define a wire format in which change data is sent over the network to a consumer. Instead, it defines an output plug-in API23 that receives a function call for every inserted, updated, or deleted row. Bottled Water uses this API to read data in the database’s internal format, and serializes it to Avro.
The output plug-in must be written in C, using the Postgres extension mechanism, and then loaded into the database server as a shared library (Figure 3-10). This requires superuser privileges and filesystem access on the database server, so it’s not something to be undertaken lightly. I understand that many a database administrator will be scared by the prospect of running custom code inside the database server. Unfortunately, this is the only way logical decoding can currently be used.
At the moment, the logical decoding plug-in must be installed on the leader database. In principle, it would be possible to have it run on a separate follower so that it cannot impact other clients, but the current implementation in Postgres does not allow this. This limitation will hopefully be lifted in future versions of Postgres.
The Client Daemon
Besides the plug-in (which runs inside the database server), Bottled Water consists of a client program which you can run anywhere. It connects to the Postgres server and to the Kafka brokers, receives the Avro-encoded data from the database, and forwards it to Kafka.
The client is also written in C because it’s easiest to use the Postgres client libraries that way, and because some code is shared between the plug-in and the client. It’s fairly lightweight and doesn’t need to write to disk. At the time of writing, work is underway to integrate the Bottled Water client with the Kafka Connect framework.
What happens if the client crashes or is disconnected from either Postgres or Kafka? No problem: it keeps track of which messages have been published and acknowledged by the Kafka brokers. When the client restarts after an error, it replays all messages that haven’t been acknowledged. Thus, some messages could appear twice in Kafka, but no data should be lost. Log compaction will eventually remove the duplicated messages.
One more question remains: what happens if several clients are concurrently writing to the database (Figure 3-11)? How is the result of those writes reflected in the change stream that is sent to Kafka? What happens if a transaction writes some data and then aborts before committing?
Fortunately, in the case of Bottled Water, PostgreSQL’s logical decoding API offers a simple answer: all of the writes made during a transaction are exposed to the logical decoding API at the same time, at the time the transaction commits. This means Bottled Water doesn’t need to worry about aborted transactions (it won’t even see any writes made by a transaction that subsequently aborts) or about ordering of writes.
PostgreSQL’s transaction isolation semantics ensure that if you apply writes in the order in which they were committed, you get the right result. However, the WAL may actually contain interleaved writes from several different transactions. Thus, while decoding the WAL, the logical decoding feature needs to reorder those writes so that they appear in the order of transaction commit.
Postgres makes this particular aspect of change data capture easy. If you are implementing change data capture with another database, you may need to deal with these concurrency issues yourself.
Status of Bottled Water
At present, Bottled Water is alpha-quality software. Quite a bit of care has gone into its design and implementation, but it hasn’t yet been run in any production environment. However, with some testing and tweaking it will hopefully become production-ready in the future. We released it as open source early, in the hope of getting feedback from the community; the response and the number of contributions from the community have been encouraging. When integrated with Kafka Connect, it will hopefully become a fully supported part of the Kafka ecosystem.
I’m excited about change capture because it allows you to unlock the value in the data you already have and makes your architecture more agile by reducing irreversibility. Getting data out of databases and into a stream data platform24 allows you to combine it with event streams and data from other databases in real time.
In the next chapter, we will see how this approach of building systems resembles the design of Unix, which has been successful for approximately 40 years and is still going strong.
1“Oracle GoldenGate 12c: Real-time access to real-time information.” Oracle White Paper, oracle.com, March 2015.
3Manuel Schoebel: “Meteor.js and MongoDB Replica Set for Oplog Tailing,” manuel-schoebel.com, 28 January 2014.
8Yogeshwer Sharma, Philippe Ajoux, Petchean Ang, et al.: “Wormhole: Reliable Pub-Sub to Support Geo-replicated Internet Services,” at 12th USENIX Symposium on Networked Systems Design and Implementation (NSDI), May 2015.
16Michael Paquier: “Postgres 9.4 feature highlight: REPLICA IDENTITY and logical replication,” michael.otacoo.com, 24 April 2014.
17Daniel Bryant: “Agile Architecture: Reversibility, Communication and Collaboration,” infoq.com, 4 May 2015.
20Martin Kleppmann: “Schema evolution in Avro, Protocol Buffers and Thrift,” martin.kleppmann.com, 5 December 2012.
21Jay Kreps: “Putting Apache Kafka to use: A practical guide to building a stream data platform (Part 2),” confluent.io, 24 February 2015.
24Jay Kreps: “Putting Apache Kafka to use: A practical guide to building a stream data platform (Part 2),” confluent.io, 24 February 2015.