Chapter 4. Designing Effective Data Pipelines

In this chapter, you will learn how to build resilient and effective data pipelines using Kafka Connect. We explain the key concepts and decision points that data engineers and architects have to understand when assembling the components we introduced in Chapter 3.

In the first half of this chapter, we look at how to choose connector plug-ins for your pipelines. You need a connector, a converter, and, optionally, some transformations and predicates. We discuss how to evaluate connectors and identify the one that satisfies your production requirements among the hundreds that are available in the community. Then we discuss how to model your data as it flows through the pipeline and the formatting options that you have available.

The second half of this chapter is focused on the resiliency characteristics of Kafka Connect. Before building your pipeline, you need to identify the semantics you require based on your use cases. For example, do you need to guarantee that every piece of data is delivered, or is it acceptable to lose some data in favor of increased throughput? We first dive into the inner workings of Kafka Connect, explaining why it is a robust environment that is able to handle failures. Then we look at the semantics that sink and source pipelines can achieve and the different configuration options and trade-offs available to target your specific use cases.

Choosing a Connector

When building a data pipeline that uses Kafka Connect, you first need to decide which connector to install. Since Kafka is a very popular technology, there are many existing connectors for you to choose from. Rather than reinventing the wheel, it is often better to use an existing connector, but only if it fulfills your requirements. Here are some things to consider when choosing whether to use a specific connector as part of your pipeline:

  • Pipeline direction (source or sink)

  • Licensing and support

  • Connector features

Pipeline Direction

First, verify that the connector flows data in the right direction. Is it a source connector that produces data to Kafka or a sink connector that consumes from Kafka? Most connectors include this detail as part of the name, and it is usually clear from the documentation. If not, you can install the connector in a Kafka Connect environment and use the REST API to retrieve its type.

$ curl localhost:8083/connector-plugins
[
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "3.5.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "3.5.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "3.5.0"
  }
]

The type field indicates whether the connector plug-in is a source or a sink.

Some projects provide a single download that includes both a source and sink connector, but other projects may provide only one or the other.

Licensing and Support

Before using a connector, make sure to check what its license permits. Just because a connector’s source code is public or freely available to download doesn’t mean the license is permissive. You should also consider the level of maintenance and support you expect. The Kafka community works hard to make sure that older connectors are compatible with newer versions of the runtime; however, connectors are not all maintained or updated with the same regularity. Whatever connector you choose, whether it’s open source or proprietary, make sure you know how often the connector is updated with the latest Kafka APIs and how the developers address security vulnerabilities.

The level of support you get for a particular connector varies greatly. Many companies offer paid-for support for connectors, whether proprietary or open source. This normally includes a dedicated communication channel if you have problems and access to industry experts for advice on configuration. That being said, many open source communities also respond to bug reports quickly and provide their own dedicated communication channels, which, depending on your use cases, may be an alternative to paid support.

Note

Since a single connector is often used for many different use cases, you may find there isn’t one perfectly suited to your needs. If that is the case, instead of writing one from scratch, we would encourage you to see if there is an open source connector that you could contribute to. You still need to get your changes accepted, but most open source projects accept new contributors.

Connector Features

Once you have identified potential connectors for your pipelines, you need to take a closer look at the features offered by those connectors. To start with, does the connector support the type of connection you need? For example, your external system might require an encrypted connection, some form of authentication, or for the data to be in a specific format. You should also check if the connector is suitable for production use. For example, does it provide metrics for monitoring its status and logging to help you debug problems? Look over the documentation—and, for an open source connector, the code—to see how the connector works and assess the features it provides.

In Chapter 3, we introduced the common configuration options for all connectors: topics for sink connectors and tasks.max for both source and sink. Most connectors offer additional options to configure their specific features. For a particular connector, you can use the REST API to list all of the available configuration options and validate your configuration before starting the connector.

Using the REST API is especially useful if the code is not available, but be aware that this relies on the developer documenting their configuration correctly. Some fields might be incorrectly marked as optional or required. Similarly, the validation is useful for verifying the connector configuration will be accepted, but a successful validation request does not guarantee that your connector will work.

Use the GET /connector-plugins/<CONNECTOR_PLUGIN>/config endpoint to list the configuration options and the PUT /connector-plugins/<CONNECTOR_PLUGIN>/config/validate endpoint to validate a specific configuration.
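
For example, the following requests list and validate a configuration for the FileStreamSinkConnector that ships with Kafka. This is only a sketch: the topic and file values are illustrative, and the exact response depends on the connector.

$ curl localhost:8083/connector-plugins/org.apache.kafka.connect.file.FileStreamSinkConnector/config

$ curl -X PUT -H "Content-Type: application/json" \
  localhost:8083/connector-plugins/org.apache.kafka.connect.file.FileStreamSinkConnector/config/validate \
  -d '{
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "my-topic",
    "file": "/tmp/sink.out"
  }'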

We describe the REST API endpoints in detail in Chapter 7.

Defining Data Models

No two pipelines are identical. Even if they fulfill a similar use case or use the same components, the actual data and how that data evolves varies from pipeline to pipeline. When you are designing your pipeline, you need to consider when and how each individual data entry will change, as well as how the individual entries relate to each other. How you group or split (shard) your data will affect how well you can scale your pipeline as the amount of data it is processing increases. To examine these ideas in more detail, we first discuss when to apply data transformation in Kafka Connect using transformations and predicates, and then discuss techniques for mapping data between Kafka Connect and other systems.

Data Transformation

There are two common patterns that are used to evolve data as it flows through a pipeline: ETL (Extract-Transform-Load) and ELT (Extract-Load-Transform). In these patterns, the word “Transform” doesn’t just refer to updating the format. Transformation could include cleaning the data to remove sensitive information, collating the data with other data streams, or performing more advanced analysis.

Both approaches have their advantages and disadvantages. In systems where storage is restricted, it is better to use the ETL approach and transform the data before loading it into storage. This makes it easy to query the data because it has already been prepared for analysis. However, it can be difficult to update the pipeline if a new use case that requires a different transformation is discovered. In contrast, ELT keeps the data as generic as possible for as long as possible, giving the opportunity for the data to be reused for other purposes. The ELT pattern has been gaining popularity, and there are now many dedicated data processing and analysis tools that are built to support it. Some examples of these tools are Kafka Streams, Apache Spark, Apache Flink, Apache Druid, and Apache Pinot.

So where do Kafka Connect transformations fit in this flow? In Kafka Connect, there is a rich set of transformations that you can perform on your data while it is in flight, which fits naturally into the ETL pattern. Using Kafka Connect for your transformations removes the need for a separate tool to transform the data before loading it. Since you choose the specific set of transformations to apply and Kafka Connect allows you to plug in custom ones, the possibilities are endless. However, Kafka Connect transformations do have their limitations, because they are applied to each piece of data independently. This means you can’t perform more advanced processing, like merging two streams of data or aggregating data over time. Instead, you should use one of the dedicated stream processing technologies for these kinds of operations.

Even if you decide to use a dedicated technology for the bulk of your data processing and analysis, you can still make use of Kafka Connect transformations. Some particular transformations that you might want to consider are the ones that remove or rename fields and can drop records. These are very useful for ensuring that sensitive data isn’t sent further down the pipeline and for removing data that could cause processing problems later. If you have multiple different sources that need to be aggregated in subsequent steps, you can also use Kafka Connect transformations to first align the data to have common fields. Figure 4-1 shows this sort of flow.

Figure 4-1. Data pipeline using Kafka Connect transformations for removing sensitive data and Kafka Streams to perform further processing
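
As an illustration, the transformation section of a connector configuration implementing the first step of such a flow might look like the following sketch. The field names are hypothetical, but the ReplaceField and Filter transformations and the RecordIsTombstone predicate all ship with Apache Kafka:

{
  "transforms": "dropSecrets,dropTombstones",
  "transforms.dropSecrets.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.dropSecrets.exclude": "ssn,credit_card",
  "transforms.dropTombstones.type": "org.apache.kafka.connect.transforms.Filter",
  "transforms.dropTombstones.predicate": "isTombstone",
  "predicates": "isTombstone",
  "predicates.isTombstone.type": "org.apache.kafka.connect.transforms.predicates.RecordIsTombstone"
}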

Mapping Data Between Systems

We have discussed how you can transform individual data entries, but what about the overall structure of your data as it is passed along the pipeline?

One of the hardest things to reason through when building a data pipeline is how to map data structures between different systems. More than just the format of individual entries, how the data should be grouped and stored, what ordering is required, and what happens when the pipeline needs to be scaled are all structural considerations. In Kafka Connect, a lot of these decisions are made for you by the developer who wrote the connector, but you should still be aware of the mechanisms that are available for connectors to use when mapping data between Kafka and other systems. If you understand these mechanisms, you are better equipped to assess a connector you want to use and configure it correctly for your use case.

To understand how connectors can group and map data, you need to consider the interaction between Kafka Connect tasks and Kafka partitions. In Chapter 3, we introduced tasks as the mechanism that Kafka Connect uses to do the actual work of transferring data from one place to another. In Chapter 2, we talked about partitions and highlighted the fact that Kafka provides ordering guarantees within a single partition. Both mechanisms provide a way to shard data.

Let’s first look at the impact of tasks on source connectors. When a source connector reads data from an external system, each task is reading data in parallel. It is up to the connector to decide how to split this data among the available tasks to ensure there are no duplicates. A simple connector could run a single task and avoid the problem of sharding the data that’s in the external system. This is actually how FileStreamSourceConnector, which is packaged with Kafka, works. See Figure 4-2 for an example.

Figure 4-2. A single task in FileStreamSourceConnector reads the file line by line

Even if you increase the tasks.max setting, it still only runs a single task because it doesn't have a sensible mechanism to shard the data. Most connectors are more advanced than FileStreamSourceConnector and have built-in mechanisms to assign the data across the tasks. Figure 4-3 shows an example of such a connector that allows different tasks to read different rows of a table.

Figure 4-3. Multiple tasks that each read a subset of the data, preventing duplicates in Kafka

Let’s consider partitions. An individual source connector can either choose which records should go to which partitions or rely on the configured partitioning strategy. Many connectors use keys to identify the data that needs to be sent to the same partitions. For example, status updates that apply to a particular entity might use the entity ID as the key. Figure 4-4 shows an example of tasks sending data to partitions.

Figure 4-4. Source tasks can send their data to one or more partitions

How a source connector partitions its data affects the next stage of the pipeline, whether that next stage is a sink connector or just a Kafka consumer, due to the way that Kafka distributes partitions among both sink tasks and consumers from a group. Each partition can only be assigned to a single sink task of a particular sink connector, and likewise a single consumer within a particular group, so any data that needs to be read by a single task or consumer needs to be sent to the same partition by the source connector.

Now let's consider sink connectors. Sink tasks also run in parallel, which can affect the order in which data is sent to the external system. You can be sure that each task will write its own data in order, but there is no ordering coordination between tasks. The way that sink tasks interact with partitions also limits the number of sink tasks you can usefully run: if you have one partition and two tasks, only one task will receive any data. When creating a data pipeline with a sink connector, make sure you are mindful of the number of partitions on the topics the connector is reading from. Figure 4-5 shows two sink tasks reading data from three partitions.

Figure 4-5. Each partition can only be read by one sink task for a specific connector

As you can imagine, the combination of tasks and partitions means that there are multiple ways that the data can be grouped and ordered as it flows through the system. When you are designing your Kafka Connect data pipeline, make sure you think about these options and don’t leave the tasks.max and partitions configuration options as an afterthought.
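
For example, if a sink connector is configured with "tasks.max": "3", the topics it reads from need at least three partitions for all three tasks to receive data. With the kafka-topics.sh tool that ships with Kafka, you could create such a topic as follows (the topic name and counts are illustrative):

$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 \
  --create --topic orders \
  --partitions 3 \
  --replication-factor 3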

Now that we have looked at how data at a high level can be transformed and mapped between systems, let’s look at how you can control the specific format of data in a Kafka Connect pipeline.

Formatting Data

In Chapter 3, we talked about converters and how they serialize and deserialize data as it goes into and out of Kafka. We also briefly covered why you need to align your converters with the serializer and deserializer of the producers and consumers that are also interacting with the data. Here we discuss in more detail the differences among converters, transformations, and connectors, and how they impact the data format throughout the pipeline. We also look at how you can enforce this structure with schemas and a schema registry.

Data Formats

In a Kafka Connect pipeline, the format of the data and how it evolves depends on the connector, any configured transformations, and the converter. Let’s look at each of these and how they affect the data format.

First, let’s consider the connector. In a source flow, the connector runs first; it reads the data from the external system and creates a Java object called a ConnectRecord. The connector decides which parts of the record data should be kept and how to map them to the ConnectRecord. The specifics of this mapping can differ between connectors, even if they are for the same system, so make sure that the connector you choose keeps the parts of the data that are important to you.

In a sink flow, the connector runs last rather than first. It takes ConnectRecord objects and translates them into data objects that it can send to the external system. This means that a sink connector has the last say in what data makes it to the external system.

Now let’s look at the difference between converters and transformations when it comes to their input and output:

  • Transformations have ConnectRecord objects as both their input and output.

  • Converters convert between ConnectRecord objects and the raw bytes that Kafka sends and receives. They run last in source pipelines and first in sink pipelines.

Figure 4-6 shows the different data types that are passed between connectors, transformations, and converters.

Figure 4-6. Data types in source and sink flows

Transformations and converters are separate steps to enable the composability that Kafka Connect offers. You could write a JSON converter that manipulates the contents of the record before sending it to Kafka. If you wanted a pipeline that manipulated the data in the same way but used a format like Avro, you would need a new converter. It would be better to create a transformation that manipulates the data and then use two converters, one for JSON and one for Avro.

Transformations can be chained, so rather than needing to write a custom transformation, you can run multiple simple transformations to fulfill your requirements. If you can’t find transformations or converters that fit your requirements, you can write your own (see Chapter 12).
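
For example, here is a sketch of chaining two of the simple transformations that ship with Kafka: one casts a hypothetical customer_id field to a string, and the other adds a static field recording where the data came from. The chain runs in the order listed in transforms:

{
  "transforms": "castId,addOrigin",
  "transforms.castId.type": "org.apache.kafka.connect.transforms.Cast$Value",
  "transforms.castId.spec": "customer_id:string",
  "transforms.addOrigin.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.addOrigin.static.field": "origin",
  "transforms.addOrigin.static.value": "orders-db"
}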

Now that you understand the roles of connectors, transformations, and converters, and the orders they can run in, you can make a better-informed decision about which libraries to use for your pipeline in order to get the exact data format you need at each stage. As with choosing connectors, make sure you take into account licensing and support requirements when choosing transformations and converters.

Schemas

A schema provides a blueprint for the shape of the data. For example, a schema can specify which fields are required and what types should be present. Using schemas is important when you are building a data pipeline because most data is complex and contains multiple fields of different types. Without schemas to give context to the data, it is very difficult for applications to reliably perform the steps to process and analyze it.

Almost all data management systems have a mechanism to define schemas. Your systems' specific schemas will vary, but here's how Kafka Connect pipelines generally make use of schemas. As we saw in the previous section, data transitions between two formats as it passes through Kafka Connect: ConnectRecord objects and raw bytes. Each format has its own schema mechanism.

Kafka Connect record schemas

A ConnectRecord contains an optional Schema object for both the key and the value. Schema is a Java class that is part of the Kafka Connect API and is used by connectors, transformations, and converters as the data travels through Kafka Connect. Let’s look at how the Schema is used in both source and sink pipelines.

A source connector is responsible for constructing the initial ConnectRecord object and has control over the Schema that is added. How the Schema is defined depends on the connector. FileStreamSourceConnector always uses the STRING_SCHEMA, no matter what format the file is using. You can see this in the source code:

private static final Schema VALUE_SCHEMA = Schema.STRING_SCHEMA;

@Override
public List<SourceRecord> poll() throws InterruptedException {
    ...
    records.add(new SourceRecord(offsetKey(filename),
        offsetValue(streamOffset),
        topic,
        null,
        null,
        null,
        VALUE_SCHEMA,
        line,
        System.currentTimeMillis()));
    ...
}

Most connectors are more complex than FileStreamSourceConnector and make use of schemas provided by the system to construct the Schema object. For example, the Debezium connectors that read database change logs take note of database schema changes and use that information to construct the ConnectRecord. The ConnectRecord and included Schema objects are then passed to any transformations and to the converter. Transformations and converters can use the Schema to parse the ConnectRecord and do their respective work.

In a sink pipeline, it is the converter that constructs the ConnectRecord, and therefore the Schema; the transformations use this information to parse the contents. Sink connectors use the ConnectRecord to construct the object that is sent to the external system, which means that they can choose how to interpret the Schema that is included in the ConnectRecord. For example, FileStreamSinkConnector ignores the Schema completely, but that is only because it is writing to a file. Most sink connectors use the Schema information to construct the external system data.

Kafka record schemas

Kafka Connect pipelines can also use schemas to describe the data stored in Kafka. These schemas are used by converters to understand how to serialize and deserialize the data sent to and from Kafka. In a sink pipeline, the schema Kafka Connect uses to deserialize data is the same one that was used by the applications to produce that data. In a source pipeline, the schema Kafka Connect uses to serialize the data is also used by consuming applications or sink connectors to deserialize it further down the pipeline.

The ConnectRecord object has built-in support for schemas, but since records in Kafka are raw bytes, you need to decide on a mechanism to include the schema. The naïve approach is to put the schema alongside the payload in the value of the record. This is what the JsonConverter does by default.

Suppose you run FileStreamSourceConnector with JsonConverter against a file with the following contents:

This is a string
Another string
A third string
The final string

The JsonConverter uses the String schema that the connector provides and constructs Kafka records with the following values:

{"schema":{"type":"string","optional":false},"payload":"This is a string"}
{"schema":{"type":"string","optional":false},"payload":"Another string"}
{"schema":{"type":"string","optional":false},"payload":"A third string"}
{"schema":{"type":"string","optional":false},"payload":"The final string"}

Although this makes it easy to pass a schema along for consumers, it means that every single record has to include the schema. The example here is simple, so the schema is small, but the more complex the schema, the bigger the overhead for each record.
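
If that overhead is a concern and the consuming applications do not need schema information, the JsonConverter can be told not to embed the schema. For example, you could set the following in the connector configuration (schemas.enable is a standard JsonConverter option):

{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}

The downside is that downstream applications then receive no schema information at all.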

A better approach is to only include a small identifier for the schema in each record and store the schemas elsewhere. This gives you the benefits of having schemas throughout your pipeline with only a negligible overhead. This is what most of the existing converter, serializer, and deserializer tools do, and they typically put the schema ID in one of two places: in a record header or at the beginning of the serialized value.

If you are building a source flow, make sure you choose a converter that will store the ID in a place that is expected by the downstream applications that will consume the record. Similarly, if you are building a sink flow, choose a converter that knows where in the record to look for the ID. You could build your own system to store schemas and make them retrievable by your applications, but this is exactly what schema registries are built for.

A schema registry typically consists of two parts: a server that stores schemas and exposes APIs to retrieve and administer them, and serializer/deserializer/converter libraries to use in your clients. Schema registries often include additional features to help you manage your schemas. For example, many registries perform compatibility enforcement and allow you to control the lifecycle of your schemas. This is useful for applications, as it can prevent breaking changes from being introduced and allows administrators to inform application developers when a schema has been deprecated.

The two schema registries most commonly used with Kafka are the Confluent Schema Registry and the Apicurio Registry. Both allow you to use Kafka as the backing store for the registry, removing the need for a separate database or other storage system. They also both support the most common schema formats that are used with Kafka: Avro, JSON Schema, and Protobuf.

A detailed comparison of the available schema formats and schema registries for Kafka is outside the scope of this book, but we can give some pointers. To choose a format, make sure you consider the tools and libraries that go along with each one. For example, do they support the language you want and provide code generation options? If you are using a schema registry, make sure your chosen converter and application serializer/deserializer are compatible with the registry. The Confluent Schema Registry only works with the Confluent libraries, whereas Apicurio Registry comes with a compatibility API, which means that you can use either the dedicated Apicurio Registry libraries or the Confluent ones.
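
The exact configuration properties differ between registries and converter libraries. As an illustration, with the Confluent Avro converter, pointing a connector at a schema registry looks roughly like this (the URL is ours):

{
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}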

Exploring Kafka Connect Internals

In order to understand how Kafka Connect in distributed mode can withstand failures, you first need to know how it stores its state using a mix of internal topics and group membership. Second, you should be familiar with the rebalance protocol that Kafka Connect uses to spread tasks across workers and detect worker failures.

Internal Topics

As mentioned in Chapter 3, Kafka Connect in distributed mode uses three topics to store its state:

  • Configuration topic, specified via config.storage.topic

  • Offset topic, specified via offset.storage.topic

  • Status topic, specified via status.storage.topic

In the configuration topic, Kafka Connect stores the configuration of all the connectors and tasks that have been started by users. Each time users update the configuration of a connector or a connector requests a reconfiguration (for example, when it detects it can start more tasks), a record is emitted to this topic. This topic is compacted, so it always keeps the last state for each entity while ensuring that it does not use a lot of storage.

In the offset topic, Kafka Connect stores offsets of source connectors. (This topic is compacted for the same reasons.) By default, Kafka Connect creates this topic with several partitions, as each source task uses it regularly to write its position. Offsets for sink connectors are stored using regular Kafka consumer groups.

In the status topic, Kafka Connect stores the current state of connectors and tasks. This topic is the central place for the data queried by users of the REST API. It allows users to query any worker and get the status of all running plug-ins. It is also compacted and should also have multiple partitions.

At startup, Kafka Connect automatically creates these topics if they don’t already exist. All workers in a Kafka Connect cluster must use the same topics, but if you are running multiple Kafka Connect clusters, each cluster needs its own separate topics. Data within all three topics is stored in JSON, so it can be viewed using a regular consumer.
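
For reference, the relevant part of a distributed worker configuration might look like the following sketch; the topic names, partition counts, and replication factors are illustrative choices:

group.id=connect-cluster
config.storage.topic=connect-configs
config.storage.replication.factor=3
offset.storage.topic=connect-offsets
offset.storage.partitions=25
offset.storage.replication.factor=3
status.storage.topic=connect-status
status.storage.partitions=5
status.storage.replication.factor=3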

For example, with the kafka-console-consumer.sh tool, here’s how you can view the contents of the status topic:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic connect-status \
  --from-beginning \
  --property print.key=true
status-connector-file-source {"state":"RUNNING","trace":null,
"worker_id":"192.168.1.12:8083","generation":5}

In this example, the runtime has status.storage.topic set to connect-status. Records in this topic show the status for a connector named file-source, and Kafka Connect uses that name to derive the key, status-connector-file-source, for records related to the connector.

Group Membership

In addition to topics, Kafka Connect makes extensive use of Kafka’s group membership API.

First, for each sink connector, the Kafka Connect runtime runs a regular consumer group that consumes records from Kafka to pass them to the connector. By default, the groups are named after the connector name; for example, for a connector named file-sink, the group is connect-file-sink. Each consumer in the group provides records to a single task. These groups and their offsets can be retrieved using regular consumer groups tools, such as kafka-consumer-groups.sh.
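
For example, you could describe the group for the file-sink connector mentioned above with:

$ ./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
  --describe --group connect-file-sink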

In addition, Kafka Connect uses the group membership API to assign connectors and tasks to workers and to ensure each user partition is only consumed by a single sink task per connector. At startup, Kafka Connect creates a group using the group.id value from its configuration. This group is not directly visible to the consumer groups tools, as it's not a consumer group, but it works in essentially the same way. This is why all workers with the same group.id value become part of the same Kafka Connect cluster.

To be a member of a group, workers, just like regular consumers, have to send heartbeats regularly. A heartbeat is a request that contains the group name, the member ID, and a few more fields to identify the sender. It is sent at regular intervals (specified by heartbeat.interval.ms, with a default value of three seconds) by all workers to the group coordinator. If a worker stops sending heartbeats, the coordinator detects it, removes the worker from the group, and triggers a rebalance. During a rebalance, tasks are assigned to workers using a rebalance protocol.

Rebalance Protocols

The specifics of rebalance (or rebalancing) protocols are generally hard to comprehend. Thankfully, to use Kafka Connect effectively, it’s enough to understand the high-level process described in this section.

Kafka Connect wants to ensure that all tasks are running, that each task is run by a single worker, and that tasks are spread evenly across all workers. The distribution of tasks has to be updated anytime the resources that are managed by Kafka Connect change, such as when a worker joins or leaves the group, or when tasks from a connector are added or removed. When resources change, Kafka Connect has to rebalance tasks across the workers.

The mechanism that Kafka Connect uses for rebalances has changed over time. Until Kafka 2.3, during a rebalance, Kafka Connect simply stopped all tasks and reassigned them all onto the available workers. This is called the eager rebalance protocol, also called “stop the world.” The main issue with this protocol is that Kafka Connect can run a set of independent connectors, and each time one of these connectors decides to create or delete tasks, all connectors and tasks are stopped, then reassigned to workers, then restarted. In a busy Kafka Connect cluster, this can cause long and repetitive pauses in data processing. It also makes rolling restarts very expensive, as each worker causes two rebalances to happen: one when it shuts down and another one when it restarts.

In Kafka 2.3, Connect introduced an incremental cooperative rebalance protocol called compatible. The idea is to avoid stopping all connectors and tasks each time a rebalance happens and instead only rebalance the resources that need to be rebalanced (incrementally, if possible). For example, if a worker disappears, Kafka Connect waits a short duration before rebalancing anything. This is because workers usually don’t experience destructive failures and will restart immediately. If a worker rejoins quickly, it keeps the tasks that it owned before and no rebalance is needed. If the worker does not rejoin quickly enough—the duration is specified via scheduled.rebalance.max.delay.ms, with a default of five minutes—then the tasks it used to run are reassigned to available workers.

Since Kafka 2.4, the default rebalance protocol is sessioned. In terms of rebalancing behavior, it works exactly the same way as compatible, but it also ensures that intra-cluster communications are secured. Like compatible, sessioned is only active if all workers support it; otherwise, it defaults to the common protocol shared by all workers.

The rebalance protocol used by Kafka Connect is specified by the connect.protocol configuration. Users should keep the default value for the version they use and only consider downgrading to eager if they rely on its specific behavior.
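
In a worker configuration, these settings look like the following sketch; the values shown match the defaults described above:

connect.protocol=sessioned
scheduled.rebalance.max.delay.ms=300000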

Note

For more details on the history behind each protocol, you can read the respective KIPs. The compatible rebalance protocol was introduced by KIP-415. The sessioned rebalance protocol was introduced by KIP-507.

Handling Failures in Kafka Connect

Now that you understand how Kafka Connect manages its state, let’s take a look at the most common types of failures and how to handle them.

In order to build a resilient pipeline, it’s key to understand how all components in your system handle failures. In this section, we focus on Kafka Connect and how it handles failures, ignoring other components such as the operating system, execution and deployment environment, or hardware.

We cover the following failures:

  • Worker failure

  • Connector/task failure

  • Kafka/external systems failure

We also discuss how you can use dead letter queues to deal with unprocessable records.

Worker Failure

In distributed mode, Kafka Connect can run across multiple workers. We recommend using at least two workers to be resilient to worker failure.

For example, if we have three workers that are running two connectors (C1 and C2), the different tasks could be spread out like in Figure 4-7.

Figure 4-7. Example of a Kafka Connect cluster with three workers. Connector C1 has three tasks (T1, T2, T3) and C2 has two tasks (T1 and T2).

In this case, if worker2 is taken offline—either because it crashed or for maintenance—Kafka no longer receives its heartbeats. After a short interval, the group coordinator automatically kicks worker2 out of the group, which forces Kafka Connect to rebalance all running tasks onto the remaining workers.

After the rebalance, the task assignment may look like Figure 4-8.

Figure 4-8. Kafka Connect has reassigned all tasks onto the remaining workers

While the rebalance is happening, the tasks that were on worker2 are not run. This mechanism triggers and completes within about five minutes. The actual time it takes depends mostly on the following configurations:

  • session.timeout.ms is the maximum duration between two consecutive heartbeats from workers

  • rebalance.timeout.ms is the maximum duration workers can take to rejoin the group when a rebalance happens

  • scheduled.rebalance.max.delay.ms is the time to wait before re-allocating the connectors and tasks of workers that have fallen out of the group since the last rebalance

When a worker is not stopped cleanly, it’s possible that its tasks did not commit offsets for all records they were processing. So, upon restarting, some tasks may reprocess some records. We discuss this problem later in this chapter.

In order for Kafka Connect to handle worker failures, you need to make sure that you have enough capacity to accommodate task redistribution. Kafka Connect has no mechanism to limit the number of tasks that can be assigned to a worker during a rebalance. If a worker is assigned too many tasks, its performance degrades and eventually tasks won’t make any progress. At minimum, you should always have enough capacity to handle a single worker failure in order to reliably handle rolling worker restarts.

Connector/Task Failure

Another common type of failure is a crash of one of the connectors or one of its tasks. Until now, we’ve simplified what exactly happens when Kafka Connect runs a connector. In reality, it has to run one instance of the connector and zero or more task instances. Kafka Connect tracks the health of both and associates them with a state, which can be:

UNASSIGNED

A connector or task has not yet been assigned to a worker

RUNNING

A connector or task is correctly running on a worker

PAUSED

A connector or task has been paused by a user via the REST API

FAILED

A connector or task has encountered an error and crashed

RESTARTING

A connector or task is actively restarting or is expected to restart soon

STOPPED

A connector has been stopped by a user via the REST API

The state of connectors and tasks can be retrieved via the REST API. Figure 4-9 depicts the most common transitions between the different states.

Figure 4-9. Most common state transitions for connectors and tasks
Tip

Kafka Connect emits detailed metrics tracking the time spent in each state by each connector. See Chapter 9 for details on how to retrieve and monitor metrics.

The statuses of the connector and tasks are determined independently. For example, some connectors may perform extra logic, such as connecting to their target system to discover resources when they start up. While this happens, the connector will be in RUNNING state, but no tasks will be created.

Each task can also encounter an error (and be marked FAILED) separately from the connector. By default, if a task has a problem, Kafka Connect lets it crash, marks it as FAILED, and does not attempt to restart it automatically. Kafka Connect emits metrics for the state of tasks, which administrators have to monitor to identify failures. A task failure does not trigger a rebalance.

In case of a one-off failure, administrators can restart tasks via the REST API. The REST API can also be used to retrieve the exception that crashed the task and its stack trace. In case of a systematic failure, such as a record that is impossible to process, Kafka Connect offers the option to skip it (and optionally emit a detailed log message) instead of failing the task. This can be configured per connector using the errors.tolerance configuration.
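
For example, for a hypothetical connector named file-sink, you could retrieve the failure details and restart the failed task with requests like these:

$ curl localhost:8083/connectors/file-sink/status

$ curl -X POST localhost:8083/connectors/file-sink/tasks/0/restart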

Kafka/External Systems Failure

As Kafka Connect flows data between Kafka and external systems, failures in either can impact Kafka Connect.

As detailed in Chapter 2, Kafka can be very resilient. To have a resilient production deployment, Kafka clusters must have multiple brokers and be configured to offer maximum availability. In addition, Kafka Connect must be configured to create its topics with multiple replicas so that it won't be negatively impacted by the failure of a single broker. This is important for the topics that connectors read from or write to, the internal Kafka Connect topics, and the __consumer_offsets and __transaction_state topics.

On the other hand, an external system failure has to be handled by the connector. Depending on the system and the implementation of the connector, it may be handled automatically or it may crash tasks and require manual intervention to recover.

Before building a Kafka Connect pipeline, it’s important to read the connector documentation and understand the failure modes of the external system to gauge the pipeline’s resiliency. Sometimes there are multiple community implementations for the same connectors and you will need to pick the one that satisfies your needs. Then you need to perform resiliency testing to determine whether the connector provides the required resiliency for your use cases. Finally, it’s important to monitor the appropriate metrics and logs from both the external system and the connector, which are described in Chapter 9.

Dead Letter Queues

For sink connectors, when dealing with an unprocessable record, Kafka Connect can use a dead letter queue rather than having to skip the record or fail the task. A dead letter queue, often abbreviated DLQ, is a concept from traditional messaging systems—basically a place to store records that can't be processed or delivered. In Kafka Connect, the dead letter queue is a topic (specified via errors.deadletterqueue.topic.name in the connector configuration) where unprocessable records are written. Kafka Connect does not provide a similar mechanism for source connectors, however, because it can't convert an undeliverable record from the external system into a Kafka record.

Note

Support for dead letter queues for sink connectors was first introduced via KIP-298 and further improved in Kafka 2.6 via KIP-610.

Let's look at an example of using a dead letter queue. When running the Confluent S3 sink connector, the Kafka Connect runtime reads records from a Kafka topic before passing them to the connector. As the topic is expected to contain Avro records, we configure the connector with an Avro converter. However, if a single record in the topic is not in the Avro format, the converter is not able to deserialize it. Instead of failing the task or losing this record, Kafka Connect can forward it to a dead letter queue and keep processing the other records in the topic. The connector configuration would contain the following settings:

{
  "connector.class": "io.confluent.connect.s3.S3SinkConnector",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "errors.tolerance": "all",
  "errors.deadletterqueue.topic.name": "my-dlq"
}

This allows the contents of the dead letter queue topic to be processed by another mechanism, for example another connector or a consumer application.
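
If you also set "errors.deadletterqueue.context.headers.enable": "true" in the connector configuration, Kafka Connect adds headers describing each failure to the records it forwards, and you can inspect them with a regular consumer, for example:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
  --topic my-dlq \
  --from-beginning \
  --property print.headers=true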

Figure 4-10 shows an example of using a dead letter queue.

Figure 4-10. Unprocessable records are sent to a dead letter queue to be processed by another mechanism

The flow starts with the S3 sink connector configured with Avro receiving records from the input topic (1). Avro records are correctly processed and sent to S3 (2). If a record can’t be processed due to failures in the converter, transformation, or sink task delivery phase, it is sent to the dead letter queue configured for the connector (3). In this example, another application receives records from the dead letter queue (4), processes them, and reports errors (5).

Understanding Processing Semantics

Processing semantics define the type of guarantees that are made when a message flows through a Kafka Connect pipeline. The semantics can be one of these three types:

At-least-once

A message entering the pipeline reaches the target system at least once as one or multiple copies. Extra copies of a message are called duplicates.

At-most-once

A message entering the pipeline may not reach the target system, and it will never be duplicated.

Exactly-once

A message entering the pipeline is processed by downstream readers exactly once.

The exact semantics that a Kafka Connect pipeline provides depend on several aspects, including the connector being used, how it's configured, and the configuration of the runtime.

Let’s look at each type of connector and see how to understand the semantics that can be provided.

Sink Connectors

To recap, these are the steps that constitute a sink pipeline:

  1. The runtime consumes records from the Kafka topic

  2. Records are passed to the configured converter

  3. Records are passed to the configured transformations

  4. Records are passed to the sink connector that writes them to the sink system

Figure 4-11 shows this flow.

Figure 4-11. Steps in a sink pipeline

To determine the semantics of a sink pipeline, there are a few different elements to consider:

  • The value of errors.tolerance

  • Whether you are using a DLQ

  • The behavior of the connector

  • The characteristics of the target system

Let’s first consider errors.tolerance. This configuration tells the Kafka Connect runtime what to do if the converter, transformations, or connector reports an error. The default value is none, which causes the task to fail. In this case, the task fails without committing offsets for the record that caused the error, so when it restarts, that record is consumed again by the task. This setup always provides at-least-once semantics; whether it provides exactly-once depends on the specific behavior of the connector (we discuss those considerations later in this section).

Sometimes it is unnecessary for the task to fail if it encounters an error. If a particular record is unprocessable (known as a poison pill), it can cause the task to fail repeatedly. Instead, you can set errors.tolerance to all so that the runtime skips records that cause failures. This is good for keeping your tasks running, but unhelpful if you are aiming for exactly-once or at-least-once semantics.

If you are using errors.tolerance set to all and you want exactly-once or at-least-once semantics, one option is to configure a dead letter queue. With a DLQ, if there is a failure, the runtime ensures no records are lost and it automatically forwards the affected records to the DLQ. This approach allows you to get at-least-once semantics in most cases, although it’s worth noting that Kafka Connect does not retry when sending records to the DLQ.
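
Putting these options together, the error-handling section of a sink connector configuration aimed at at-least-once delivery might look like the following sketch. The topic name is illustrative; the errors.log settings simply record details of each failure in the worker logs:

{
  "errors.tolerance": "all",
  "errors.log.enable": "true",
  "errors.log.include.messages": "true",
  "errors.deadletterqueue.topic.name": "my-dlq",
  "errors.deadletterqueue.topic.replication.factor": "3"
}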

Finally, you need to consider the connector itself in combination with the characteristics of the target system. The difference between at-least-once and exactly-once semantics often comes down to how data is sent to the target system. Because records in Kafka are immutable, if the same records are processed twice, the connector can emit the exact same records to the target system. Systems that offer idempotent writes—for example, by storing records based on their key—can discard the duplicates and keep only a single copy, effectively achieving exactly-once semantics.

The behavior of the connector also determines whether you have at-most-once or at-least-once semantics. Each time a task starts up, it restarts from the last committed offset. By default, the runtime only commits offsets for records that have been successfully passed to the connector and did not result in an error. This means if the connector writes data to the external system asynchronously, it must utilize the hooks provided to influence which offsets the runtime can commit. Otherwise, if the task were to crash before the connector successfully wrote the record and after the offset was committed, then this record would effectively be skipped when the task restarted.

To summarize, the errors.tolerance configuration is important when determining the semantics of a sink pipeline. If it is set to all, the runtime skips unprocessable records. If you want to avoid losing any records and maximize the availability of a sink pipeline, consider using a dead letter queue, but take into account the additional operational cost it requires. Also, while some steps may cause duplicates, some external systems are able to handle them and effectively provide end-to-end exactly-once semantics for sink pipelines.

Source Connectors

As a quick recap, these are the steps that constitute a source pipeline, as shown in Figure 4-12:

  1. The connector consumes records from the external system

  2. Records are passed to the configured transformations

  3. Records are passed to the configured converter

  4. Records are passed to the runtime that produces them to a Kafka topic

Figure 4-12. Steps in a source pipeline

To determine semantics for a source pipeline, you need to consider the following:

  • Use of source connector offsets

  • The value of errors.tolerance

  • The behavior of the producer

  • Whether exactly-once support is enabled

Similarly to the consumer fetching records in a sink pipeline, in a source pipeline the connector has to decide which data to retrieve from the external system. Not all external systems have a mechanism like offsets in Kafka that enables them to directly identify a record. For that reason, source connectors can associate an arbitrary mapping of keys to values—the sourceOffset field in SourceRecord objects—to express their current position. This arbitrary object can be retrieved by tasks as needed. It is the connector’s responsibility to ensure that this object contains the appropriate information to correctly retrieve records from the external system. The Kafka Connect runtime automatically stores this object in the offset topic and it also provides a mechanism to let connectors know when it is committing offsets in case they want to do their own offset tracking in the target system. In most source connector pipelines, the offsets are committed after producing records to Kafka, so it’s possible for a worker to successfully produce records to Kafka but fail before it’s able to commit their offsets. Depending on how the connector works, this step may cause some record reprocessing, resulting in at-least-once semantics.
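
To make this concrete, here is a minimal, hypothetical sketch of how a source task reading a database table might attach its position to the records it returns; the keys used in the partition and offset maps are entirely up to the connector:

// Hypothetical excerpt from a source task's poll() method
Map<String, String> sourcePartition = Map.of("table", "orders"); // what is being read
Map<String, Long> sourceOffset = Map.of("last_id", 42L);         // how far the task has read

SourceRecord record = new SourceRecord(
    sourcePartition,        // stored by the runtime in the offset topic
    sourceOffset,
    "orders-topic",         // target Kafka topic
    Schema.STRING_SCHEMA,   // value schema
    "order 42");            // value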

If there are any errors in the transformations, the converter, or the producer used by the runtime, the value of the errors.tolerance setting determines whether the task is marked as FAILED or the failing record is skipped. Source tasks can't rely on dead letter queues, so tolerating errors means these steps only provide at-most-once semantics.

Records in a source pipeline are pushed to Kafka via a producer from the runtime. As of Kafka 3.1.0, producers are configured to offer at-least-once by default, but this can be overridden in the connector configuration.

Support for exactly-once in source connectors was added via KIP-618 in Kafka 3.3. When enabled, the runtime uses a transactional producer to commit offsets and produce to Kafka as part of a single transaction. In Chapter 8, we explain the worker and connector configurations required to enable this feature. Not all source connectors support exactly-once, so make sure you check the connector documentation. We also explain how to write a connector that supports exactly-once in Chapter 11. You can use this information to write your own connector or to evaluate the semantics of an existing one.
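
As a preview of the configuration covered in Chapter 8, the worker-level switch is the exactly.once.source.support property:

exactly.once.source.support=enabled

Connectors that support the feature can then be configured with settings such as the following; treat the values as illustrative:

{
  "exactly.once.support": "required",
  "transaction.boundary": "poll"
}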

Warning

Exactly-once semantics for source connectors is not available in standalone mode.

In summary, how source connectors handle offsets and how those are committed to Kafka are key factors in source pipeline semantics. As with sink pipelines, the errors.tolerance configuration also plays a role; however, unlike with sink pipelines, you cannot make use of dead letter queues to catch skipped records.

Summary

In this chapter, we looked at the different aspects that need to be considered in order to build resilient data pipelines with Kafka Connect.

We first looked at selecting the right connectors from the hundreds of connectors built by the Kafka community. You need to consider the pipeline direction, whether it fulfills your feature requirements, and whether it comes with an appropriate level of support.

Then we focused on data models and formats and the options you have for mapping data between systems. Whatever choice you make, you need to understand the structure of your data at each stage in the pipeline and make conscious transformation and formatting decisions. These decisions inform your choice of converter, transformations, and predicates. We also highlighted the benefits of using schemas and a schema registry to properly define, enforce, and manage the structure of the data.

We then examined the challenges in handling the many kinds of failures that can arise, from worker crashes to task errors. Although Kafka Connect is generally considered to be resilient, it cannot recover from all failures, so you should understand the levers we discussed and how to use them in response to failures.

Finally, we detailed how all the decisions taken regarding data models, error handling, runtime, and connector configurations directly impact the processing semantics that can be achieved by Kafka Connect pipelines. For sink pipelines, dead letter queues are a powerful feature to avoid losing data, and it’s possible to achieve exactly-once semantics with capable downstream systems. Since Kafka 3.3, source pipelines can also achieve exactly-once with connectors that support this feature.
