Chapter 4. Streaming Data Products

In a streaming data mesh, domains own their data. This creates a decentralized data platform that helps resolve the agility and scalability issues found in data lakes and warehouses. Domains now have to serve their data to other domains, so it's important that they treat their data as products with high quality and trust.

Currently, data engineers are very used to the idea that all their data is in a central data store like a data lake or warehouse. They are used to finding ways to “boil the ocean” (in this case, lake) when working with data. A streaming data mesh allows us to evaporate that idea. In this chapter, we will outline the requirements for streaming data products.

In our careers as data engineers, we have found ourselves writing many wrappers for Apache Spark, a widely used analytics engine for large-scale data processing. Only in the past few years did we fully understand why companies asked us to do this.

Big data tools like Apache Spark, Apache Flink, and Apache Kafka Streams were inaccessible to many engineers who were tasked to solve big data problems. Referring back to Chapter 1, breaking up the monolithic role of a data engineer is a side effect of a data mesh.

This is a very important point because a second side effect is making complex data engineering tools like Spark, Flink, and Kafka Streams more accessible to generalist engineers so they can solve their big data problems. It’s the reason these companies asked us to wrap large-scale data processing engines. Table 4-1 shows a list of projects we were involved with to help specific engineers query big data stored in data lakes.

Table 4-1. Apache Spark wrapper projects and the engineers each project supports
The project | The engineer
Big Data Integrator—an Apache Spark UI wrapper with drag-and-drop features | Integration engineers and business analysts needing to process data to deliver reports in a business intelligence tool
Sparknado—an Apache Spark wrapper that used Airflow syntax to build Spark DAGs | Airflow engineers building Spark applications to move data into Snowflake
Apache Envelope—a YAML-based Spark configuration | Engineers who want to define a Spark DAG without needing to know how to code in Python or Scala
Splunk SPL to Spark—a pipe syntax that resembles Splunk's Search Processing Language (SPL) | Security threat hunters who are familiar with Splunk and need to hunt for threats in network logs stored in a data lake

Having done this so many times validated the need to make data tools more accessible to generalist (and sometimes specialist) engineers, not so much to reduce the workload of data engineers. Accessibility lets these engineers react faster to anomalous issues and provide immediate results to customers.

This is important to understand when building data products: giving domains access to data tools enables them to solve complex data problems that were previously out of reach.

In the following sections, we will go through the requirements of defining and building data products to be published in a streaming data mesh. We will try to keep the scope of the details to building streaming data products, and we will refer to details regarding self-services and data governance as they relate to streaming data products in their corresponding chapters.

Defining Data Product Requirements

This section summarizes the requirements for data products listed in Table 4-2. The goal of these requirements is to give domain consumers a smooth and pleasant data mesh experience, since the health of the data mesh is determined by the domain consumer's experience with data products. These requirements will help you design and implement a data product that meets the needs of its consumers.

Table 4-2. Suggestions for data product requirements
Requirements Implementation considerations

Data products should be of high quality.

  • The data should be transformed to the standards defined by all the domains in the data mesh.

  • Bad records should be filtered out as required.

Data products must be secure and trustworthy.

  • All personally identifiable information (PII) or protected health information (PHI) should be obfuscated by either tokenization or encryption.

  • Authorization and authentication are defined as ACL or RBAC rules.

Data products must be interoperable.

  • Data products should provide schemas that represent the final data product. Schemas serve as a contract between the producing domain and the consuming domain.

  • Data products should be in a format that supports consumers written in multiple programming languages (polyglot). Some examples are JSON, Apache Avro, and protobuf.

Data products must be easily consumable.

  • The data product must be quickly and easily accessible through simple technical means.

  • The data product must be optimized for reading and serving. Some ways to optimize data are partitioning, indexing, bucketing, and compaction.

Data products should preserve lineage.

  • The data lineage must be derived from the metadata recursively through the other domains. For example, if the data product uses another domain’s data product, the lineage from the derivative data product is preserved and can be parsed.

Data products should be easily searchable and self-describing.

  • The data product should be discoverable using a search engine and descriptive enough for consumers to trust it.

Data products should provide historical data.

  • The data product should support serving historical data, not just the latest changes.

Retroactively adding data product requirements can be costly in technical debt. Lineage, for example, would be hard to add retroactively without the referable metadata needed to build it, so it is important to think through these complex requirements early.

When other domains request data products, ensure that they adhere to these requirements. Keep them in mind when you identify the sources you’ll need to compose your data products.

Warning

Some have confused data products with mastered data. Data mastering is the process of building an undisputed master version of a record, called a golden record. Master data is a feature of a data product and not necessarily a data product requirement. If your data product is required to provide mastered employee records, for example, then a proper master data management (MDM) tool will be required within the domain to perform this work.

Identifying Data Product Derivatives

Data products are derived from the data sources within a domain. They can also be enriched with data products from other domains. Remember that data products are owned by the domain experts who produce them. If the data product you are building requires enrichment with data from another domain, you'll need to ingest that data into your own domain to enrich your own data products. We define data derivatives as both data within the domain and data sourced from other domains. Identifying these derivatives and understanding their integration points will help in defining solutions to begin ingesting them into the streaming platform.

There are two types of data: at rest and in motion. Ingesting data derivatives involves getting data at rest into motion and keeping data that is already in motion in motion. It is important to think about optimizing data early in the ingestion process so that any downstream components can take advantage of this optimization. Start with partitioning the data in the source topics in the streaming platform. Having enough partitions will efficiently serve data products to consumers and create balanced processing in the data pipeline.

Derivatives from Other Domains

Derivatives that originate from other domains as data products need to be referable so that a full lineage picture can be generated. This could include multiple domains traversed recursively throughout the mesh. A preserved snapshot of a data product's lineage will eventually become stale as the data product derivatives evolve in quality, scalability, and structure (such as schemas). In later chapters, we will discuss data governance and schema evolution as a centralized component in the data mesh. Techniques for preserving lineage are discussed more in Chapter 6.

Consuming data products from other domains and enriching your own is the true essence of working in a data mesh. The experience involves requesting access to the data product and then subscribing to its real-time stream. Afterward, a topic should appear in the streaming platform that represents the real-time streaming data product originating from the other domain. The subscription covers not only the data but also its metadata, and it is this metadata that enables lineage spanning multiple domains.

Ingesting Data Product Derivatives with Kafka Connect

After identifying the data product derivatives for our streaming data products, we need to get them into a streaming platform. The simplest way to get data into or out of a Kafka-compliant streaming platform is to leverage Kafka connectors. (Other platforms such as Spark or Flink also have their own connectors.) Kafka Connect is a framework that provides implementations for reading data from a data source into a streaming platform and, conversely, for writing data from the streaming platform to a data sink. See Figure 4-1.

Kafka Connect
Figure 4-1. A data pipeline that illustrates a Kafka source connector writing to Kafka, and a Kafka sink connector reading from Kafka

Kafka Connect also provides a low-code experience, which means that no coding is required from domain engineers. We will use Kafka Connect as our way of ingesting source data into the streaming platform. There are other ways of ingesting data into a streaming platform, but not many support the change data capture (CDC) connectors that we need. We will talk about CDC in “Debezium Connectors”.

The Kafka Connect open source framework allows for simple ingress and egress of data in a scalable way, which is one of the reasons we've chosen to discuss this solution. Connectors can connect to specific data sources as well as to Kafka-compatible streaming platforms, such as Redpanda, to stream data. Other products, such as Apache Pulsar, allow Kafka clients to produce and consume messages against their platform.

Kafka connectors do not run by themselves. They run in a Kafka Connect cluster, which makes them distributable and highly available. Each node in the connect cluster is called a connect worker. Every connector has a configuration property called tasks.max. Tasks are the main processes that stream data in a connector. When configured with multiple tasks, the connect cluster can distribute them across its workers to run in parallel, enabling scalability. If one of the connect workers in the cluster suffers an outage, its tasks are redistributed across the remaining workers (see Figure 4-2). The tasks.max property defines the maximum number of tasks that should be created for the connector in a connect cluster; the connector may create fewer tasks if it cannot achieve this level of parallelism.

Kafka Connect Tasks
Figure 4-2. A single source connector whose three tasks are distributed across three connect workers in a Kafka Connect cluster; connect workers can be deployed into separate machines for high availability and scalability

Using Kafka Connect and the many prebuilt connectors makes building the streaming data mesh a lot faster and delegates development and support to third-party vendors. Connectors for popular databases and cloud data stores have already been built; many are developed by the vendors themselves and come with support plans. If a connector is not available, the centralized data mesh team should have the skill set to build one and provide it to the domains in the data mesh for use.

Another reason Kafka Connect is a good solution is that it standardizes ingestion, simplifying connector deployment and management. The Kafka Connect cluster provides a RESTful interface and integrates easily with CI/CD and infrastructure tooling such as Jenkins, Kubernetes, Ansible, or Terraform.

Kafka Connect has some transformation capabilities called single message transforms (SMTs), but these are limited to simple transformations. That means no joins or aggregations. Data transformation is best consolidated in the streaming processes where both simple and complex transformations are supported. Transformations will be covered in “Transforming Data Derivatives to Data Products”.

Tip

It is best practice to perform transformations in a stream processor, which can hold state, and not in connectors, which cannot. Also, capturing transformations in a connector to preserve lineage can be difficult, while stream processors that build directed acyclic graphs (DAGs) representing the processing workflow can serialize their DAGs for lineage preservation.

When ingesting the data product derivatives, we need to think early about the eventual streaming data product and how to ensure its ease of consumption. Also keep in mind that whether the data will be ingested asynchronously or synchronously is a factor that will impact how the domain will consume and use your streaming data product.

Consumability

Consumability is a very important requirement because it directly affects the experience domain consumers have in a streaming data mesh. If other domains cannot easily consume streaming data products, they may opt out of the streaming data mesh and build their own integrations by hand, bypassing any issues they encounter with the data mesh. Some factors to consider when ingesting data derivatives that will affect consumability for other domains are as follows:

  • Lack of scalability

  • Lack of interoperability

Scalability

When thinking about ingesting data derivatives into the streaming platform, it is important to know the scale at which you will need to ingest the data. In the streaming platform, you will need to make sure that the number of partitions can support the throughput (or rate) at which the data is expected to be streamed. The more partitions in a Kafka topic, the more throughput it can support, because partitions are how Kafka enables parallelism. In other streaming platforms, you will need to configure their topic equivalents similarly.

A rough formula for determining partition count, proposed by Jun Rao (an original developer of Apache Kafka), is based on throughput. The formula takes the maximum of these two values:

  • The desired throughput (t) divided by the producer throughput you can achieve on a single partition (p)

  • The desired throughput (t) divided by the consumer throughput you can achieve on a single partition (c)

The formula is as follows:

  • max(t/p, t/c)

The following example assumes a desired throughput of 3 MBps (megabytes per second). The producer can produce at 1 MBps per partition. Let's assume 3 consumers want to subscribe to the data at 1 MBps each, for a combined consumer throughput of 3 MBps. The result is 3 partitions. In this example, the count is actually low for most Kafka use cases; you may want to prepare for future increases in throughput by increasing this count to 5 or 6:

  • max(3 MBps/1 MBps, 3 MBps/3 MBps) = max(3, 1) = 3 partitions

Other factors can help with getting the throughput desired but are beyond the scope of this book.

After determining the number of partitions, understanding how to distribute the data evenly across all the partitions is important to achieve balanced parallelism. In Apache Kafka, data is sent as a key and value pair. The key is hashed, and the hash determines which partition the record is assigned to. This formula illustrates how this works:

  • hash(key) % p = partition assignment

This hashing algorithm works well when the key is evenly distributed across all records in the data sent to the Kafka topic. In other words, the key should have high cardinality and even distribution in the entire data set. High cardinality and even distribution creates good data balance among all the topic partitions. Low cardinality or an uneven distribution creates imbalanced parallelism. Imbalanced parallelism at ingestion creates imbalance for all downstream components in the data pipeline. In the Kafka topics, this manifests as hot spotting, where only one or a few partitions are doing most of the work, causing your entire data pipeline to run slowly. It would be beneficial to profile your data to get a sense of your cardinality and distribution of the keys in the data. Defining keys with high cardinality and even distribution is an important step in distributed processing because most distributed systems distribute workload to its workers by key.
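
As a preview of the SQL tooling covered later in this chapter, the following sketch shows one way to fix a poorly keyed topic by re-keying it on a higher-cardinality column. It assumes ksqlDB syntax, and the stream names (clicks_by_country, clicks_by_session) and columns are hypothetical:

-- Hypothetical example: events keyed by country (low cardinality) are
-- re-keyed by session_id (high cardinality) to avoid hot partitions.
CREATE STREAM clicks_by_session
  WITH (KAFKA_TOPIC = 'clicks_by_session', PARTITIONS = 6) AS
  SELECT *
  FROM clicks_by_country
  PARTITION BY session_id
  EMIT CHANGES;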

Another way of improving scalability is by using a proper data serialization format, which we’ll discuss next.

Interoperability and data serialization

Keeping scalability and interoperability in mind is critical as we start to talk about how to actually ingest the data into a streaming platform. Interoperability is the ability to exchange information or work seamlessly with other systems. In the case of a streaming data mesh, this can be accomplished by creating schemas that define the domain objects or models and choosing a proper data serialization format. The schemas will help domains in the streaming data mesh easily exchange information and work seamlessly with other domains, and the data serialization format will allow information exchange between systems that normally are incompatible with each other.

Interoperability and data serialization are data mesh requirements that fall under the data governance pillar, discussed in Chapter 5, so we’ll go into more detail there. But it’s critical that we start thinking about them at data ingestion because this affects all the systems and data pipeline components downstream. First, we need to define the schema that represents the data derivative. A schema is basically a definition of how the data will be structured when it moves through the streaming data pipeline. For example, let’s say you’re ingesting COVID-19 global statistics. The shape of your data may look like Table 4-3, which provides only two countries. (Note that Table 4-4 is a continuation of the data in Table 4-3.)

Table 4-3. COVID-19 mock data in a table format
Country | CountryCode | Date | ID
United States of America | USA | 2022-04-07T19:47:08.275Z | 2291245c-5bf8-460f-af77-05ce78cc60c9
Philippines | PH | 2022-04-07T19:47:08.275Z | 82aa04f7-05a1-4caa-b309-0e7cfbfae5ea

Table 4-4. COVID-19 mock data in a table format (continued)
NewConfirmed | NewDeaths | NewRecovered | TotalConfirmed | TotalDeaths | TotalRecovered
40176 | 1241 | 0 | 80248986 | 983817 | 0
0 | 0 | 0 | 3680244 | 59422 | 0

The COVID-19 data source we are reading serves JSON records, so we would use JSON as the data serialization format. In Example 4-1 we show a JSON schema definition that matches the structure of Table 4-3.

Example 4-1. JSON schema that defines the table structure in Table 4-3
{
  "$schema": "http://json-schema.org/draft-04/schema#",
  "type": "object",
  "properties": { 1
    "ID": {
      "type": "string"
    },
    "Country": {
      "type": "string"
    },
    "CountryCode": {
      "type": "string"
    },
    "NewConfirmed": {
      "type": "integer"
    },
    "TotalConfirmed": {
      "type": "integer"
    },
    "NewDeaths": {
      "type": "integer"
    },
    "TotalDeaths": {
      "type": "integer"
    },
    "NewRecovered": {
      "type": "integer"
    },
    "TotalRecovered": {
      "type": "integer"
    },
    "Date": {
      "type": "string"
    }
  },
  "required": [ 2
    "ID",
    "Country",
    "CountryCode",
    "NewConfirmed",
    "TotalConfirmed",
    "NewDeaths",
    "TotalDeaths",
    "NewRecovered",
    "TotalRecovered",
    "Date"
  ]
}
1

Lists the fields and their data types

2

Lists the fields that are required

Table 4-5 shows some data serialization formats to choose from that are suited for streaming data. Many other serialization formats exist, like Parquet and ORC, but those aren’t suited for streaming data. They are more suited for data at rest in data lakes.

Table 4-5. Data serialization formats supported in streaming
Name | Maintainer | Binary | Human readable
JSON | Douglas Crockford | No | Yes
Apache Avro | Apache Software Foundation | Yes | Partial
Protocol Buffers (protobuf) | Google | Yes | Partial

Many data serialization formats, like Parquet and ORC, help queries run more efficiently in a data lake. Others, like Avro and protobuf, improve performance in a service mesh (microservice intercommunication). JSON is probably the most used data serialization format because of its ease of use and human readability.

All the options in Table 4-5 can be defined by a schema. We will talk more about how these schemas create contracts between producing and consuming domains and their roles in supporting data governance in the streaming data mesh in Chapter 5.

Synchronous Data Sources

At the beginning of this section we spoke about how domains will consume streaming data products—synchronously or asynchronously. Let’s first describe synchronous data consumption in terms of client and server, where the client is requesting data and the server serves the data.

Synchronous (also referred to as batch) data consumption means the consumer (the client) of the data follows a request-and-response relationship with the server. The client requests data from a data service, and the service quickly returns a snapshot of the result from the source.

When requesting data this way, you typically use frameworks like ODBC or JDBC to connect and retrieve batches from a data source. This approach forces you to capture snapshots of data that follow batching semantics; each snapshot is considered a batch.

Figure 4-3 shows the client requesting the initial snapshot of the data from the database. When the client requests another snapshot, you have to subtract the initial snapshot from the second to find only the incremental changes. Alternatively, you can overwrite the initial snapshot with the second snapshot, but you'll lose what changed. If database changes need to trigger an action, you will need to find the incremental changes.

Paginating table
Figure 4-3. Determining incremental changes from a database snapshot
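
A hedged sketch of this incremental comparison in standard SQL appears below. The snapshot tables (customers_snap_1, customers_snap_2) and columns are hypothetical, and the query only surfaces inserts and updates; deletes require the reverse comparison, and changes reverted between snapshots are never visible in either result:

-- Rows that are new or changed since the initial snapshot
SELECT s2.*
FROM customers_snap_2 s2
LEFT JOIN customers_snap_1 s1
  ON s1.id = s2.id
WHERE s1.id IS NULL        -- inserted since the initial snapshot
   OR s1.name <> s2.name;  -- updated since the initial snapshot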

Additionally, if changes occur between client snapshots, you will miss changes that revert to their original values. In Table 4-6, the events that changed the name from Robert to Bob and then back to Robert are lost.

Table 4-6. Changes lost between snapshots
Time | Events
12:00 | Initial snapshot
12:01 | Name changed from Robert to Bob
12:03 | Name changed back from Bob to Robert
12:05 | Second snapshot

This example illustrates how synchronous data APIs force clients requesting data into batching semantics that could lead to some data loss, as seen with incremental snapshots. Some event-driven application systems will try to use synchronous APIs and adapt them to real-time data streams. We see this quite often because many systems don’t have support for streaming, so we are left to emulate a data stream emitting from those data sources.

Consumers of this data will need to know that the results are taken from snapshots and should expect some data loss.

Asynchronous Data Sources and Change Data Capture

Asynchronous data sources follow a different approach: clients subscribe to changes to the data instead of taking snapshots. Anytime a change is made, an entry is written to the change log, and the clients subscribed to it are notified and can react to the change. This is called change data capture (CDC).

Kafka connectors that support CDC read the database commit log and capture the changes to a database table. These changes are real-time streams of database table changes, including inserts, updates, and deletes. This means you don’t lose the changes described in Table 4-6.

Tip

If possible, ingest data derivatives by CDC and not by snapshots to ensure that all transactions are captured.

Referring back to “Data Divide”, the goal was to move data from the operational databases to the analytical plane. Those operational databases hold transactions that power the business. Losing transactions between snapshots could be a critical problem in many use cases. It is best to utilize a Kafka connector that can perform CDC from the operational database and stream it to the streaming platform.

Debezium Connectors

A set of CDC connectors called Debezium connectors capture changes in a database from a change log and send them to a streaming platform. The most commonly used connectors are as follows:

  • MySQL

  • MongoDB

  • PostgreSQL

  • Oracle

  • SQL Server

  • IBM Db2

  • Apache Cassandra

  • Vitess

For non-Kafka streaming platforms, a Debezium server can be used as an alternative to running a Kafka Connect cluster. Figure 4-4 illustrates how the Debezium server can replace the Kafka Connect cluster to send to other streaming platforms. Change events can be serialized to different formats like JSON or Apache Avro, and then sent to one of a variety of messaging systems.

Debezium Server
Figure 4-4. A Debezium server replacing a Kafka Connect cluster to serve connectors to alternative streaming platforms: Redpanda and Apache Pulsar

The Debezium server can also serve other streaming platforms that do not have a commit log. Those platforms do not appear in Figure 4-4. It is important to remember that only streaming platforms that hold data in a commit log can best support Kappa architectures and ultimately a streaming data mesh, as mentioned in Chapter 2.

Thinking about consumability early will save you from complaints from domain consumers later. Making clear how the streaming data products are retrieved (synchronously or asynchronously) will help consumers understand what to expect.

The result of ingesting data into a streaming platform will be a topic for each data derivative. We will transform and enrich these derivatives to compose the final streaming data product.

Transforming Data Derivatives to Data Products

In the previous section, we ingested the data derivatives so that our streaming data products would be consumable, focusing on scalability and interoperability. This section will focus on transforming data derivatives to ensure that our streaming data products are of high quality and secure.

We will also suggest some easy tools for transforming data derivatives into streaming data products. These tools leverage SQL to transform data. SQL is a skill that is ubiquitous among engineers, including domain engineers, which makes it the preferred way of enabling domain engineers to build streaming data products.

Data Standardization

It is a best practice to establish format standards when sharing data with other domains. These standards are part of a set of data governance policies enforced for all domains (we will cover data governance in Chapter 5). Standardizing data creates consistency across your domains and makes the data easy for them to use. Otherwise, every domain will need to know how to convert the formats of every other domain that doesn't follow the standard. So as part of transforming data derivatives, we need to transform data to adhere to these standards. For example, data governance policies may require all phone numbers to be in a standard format like 1-234-567-8900, but the original source data may not provide the numbers in that format. We need to make sure all the formatting standards are applied to the data before it is published as a streaming data product.
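
The following is a minimal sketch of this kind of standardization in SQL, assuming a ksqlDB-style REGEXP_REPLACE function; the stream names (customers_raw, customers_standardized) and columns are hypothetical:

-- Standardize US phone numbers such as "(234) 567-8900" or "234.567.8900"
-- into the governed format 1-234-567-8900.
CREATE STREAM customers_standardized AS
  SELECT
    customer_id,
    REGEXP_REPLACE(
      REGEXP_REPLACE(phone, '[^0-9]', ''),           -- strip non-digits
      '^1?([0-9]{3})([0-9]{3})([0-9]{4})$',          -- capture area/exchange/line
      '1-$1-$2-$3') AS phone
  FROM customers_raw
  EMIT CHANGES;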

Protecting Sensitive Information

You must also ensure that sensitive information is obfuscated by tokenizing, encrypting, or omitting it. For example, protected health information (PHI) and personally identifiable information (PII) are considered sensitive data, and both are subject to regulatory rules like the Health Insurance Portability and Accountability Act (HIPAA) and the General Data Protection Regulation (GDPR). Regulations beyond HIPAA and GDPR are outside the scope of this book.

The HIPAA privacy rule protects all individually identifiable health information held or transmitted by a covered entity or its business associate, in any form or media, whether electronic, paper, or oral. The privacy rule calls this information “protected health information (PHI).” Individually identifiable health information is information, including demographic data, that relates to any of the following:1

  • The individual’s past, present, or future physical or mental health or condition

  • The provision of health care to the individual

  • The past, present, or future payment for the provision of health care to the individual

GDPR requires companies across the EU to protect the privacy of, and safeguard the data they keep on, their employees, customers, and third-party vendors. Companies are now under legal obligation to keep this PII safe and secure.2

Table 4-7 shows a few methods for obfuscating sensitive data such as PHI and PII to keep within the regulations that protect them.

Table 4-7. Examples of obfuscating sensitive data
Method Purpose

Tokenization

Replaces the data with a token. Later, the token can be looked up to get the original value as long as the system looking up the token has permission to retrieve it. Often, tokenizing data maintains its original format and exposes a partial value. For example, a credit card number may show the last four digits: xxxx-xxxx-xxxx-1234

Encryption

Replaces the value with an encrypted value. The value can be decrypted with a key. A system can request to decrypt the data to get its original value as long as it has the key. The format is not preserved in this method. Here is a credit card example: 1234-5678-9012-3456 encrypted to MTIzNDEyMzQxMjM0MTIzNAo=

Filtering

This method omits the sensitive information entirely.

The SQL language can be extended with user-defined functions to allow for these obfuscation methods, which we will talk more about in the next section.
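
As a sketch of what this looks like in practice, ksqlDB ships with built-in masking functions such as MASK and MASK_KEEP_RIGHT, while tokenization and encryption would typically require custom UDFs. The stream names (patients_raw, patients_protected), columns, and the TOKENIZE function mentioned below are hypothetical:

-- Obfuscate PII/PHI before publishing the data product.
CREATE STREAM patients_protected AS
  SELECT
    patient_id,
    MASK(ssn) AS ssn,                               -- fully mask the value
    MASK_KEEP_RIGHT(credit_card, 4) AS credit_card, -- keep last four digits visible
    diagnosis_code
    -- TOKENIZE(ssn) would be a custom UDF, as described later in this chapter
  FROM patients_raw
  EMIT CHANGES;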

SQL

As mentioned before, SQL is the language of choice for streaming data product transformations because it's accessible to many domain engineers. Therefore, we have to choose a stream processing platform that supports SQL, and at the time of this writing, not many do. We will go over two options: a SaaS stream processor and ksqlDB.

SaaS stream processor

A SaaS stream processor is a cloud SaaS product that uses SQL to transform data consumed from a streaming platform like Kafka. These products tend to be implemented on Apache Flink, which offers truly native streaming and processes events with lower latency than Apache Spark Structured Streaming, which emulates streaming with low-latency microbatches.

Apache Flink itself is not exposed in these user interfaces; a SQL interface is shown to the user instead. Data product engineers can consume from a streaming platform like Kafka, perform stateful transformations, and then write the output to a sink or another streaming platform, as shown in Figure 4-5.

Flink's model for stream processing includes a component called connectors that acts as data sources and sinks (similar to Kafka connectors), a component called stream that represents the streaming data itself, and a component called pipeline that can join and aggregate streams using SQL to create new streams. Together, these components create a simple, easy-to-use data processing tool for building streaming data pipelines.

SaaS Stream Processor
Figure 4-5. SaaS stream processor joining/enriching data derivatives, one of which is a data product from another domain, and publishing back to the mesh
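
The following Flink SQL sketch maps these components onto the pipeline in Figure 4-5. It assumes Flink's Kafka SQL connector with JSON records, and the topic, broker, table, and column names (orders, customers, orders_enriched, broker:9092) are hypothetical; a SaaS stream processor would typically expose similar SQL:

-- Source connector: a table backed by a Kafka topic
CREATE TABLE orders (
  order_id    STRING,
  customer_id STRING,
  amount      DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- Another source: a data product ingested from another domain
CREATE TABLE customers (
  customer_id STRING,
  country     STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'customers',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- Sink connector: the enriched stream published back to the mesh
CREATE TABLE orders_enriched (
  order_id STRING,
  amount   DOUBLE,
  country  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders_enriched',
  'properties.bootstrap.servers' = 'broker:9092',
  'format' = 'json'
);

-- Pipeline: a continuous query that joins and enriches the streams
INSERT INTO orders_enriched
SELECT o.order_id, o.amount, c.country
FROM orders AS o
JOIN customers AS c ON o.customer_id = c.customer_id;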

Apache Flink also enables domains to replicate data from the producing domain into their own, ultimately building the mesh, as shown in Figure 4-6.

Apache Flink Replication
Figure 4-6. Apache Flink replicating data from the producing to the consuming domain

Each stream in Flink represents streaming data that can be treated as a new streaming data product and consumed by many other domains.

A great advantage of Flink is that it can consume from many Kafka clusters. It can also consume from alternative streaming platforms, like Redpanda and Apache Pulsar, and join them in a single streaming pipeline. In addition, it can mix in streaming platforms (such as RabbitMQ or Kinesis) that do not utilize a commit log, providing a fully agnostic stream processing solution, as shown in Figure 4-7.

A SaaS stream processor built on Flink can also provide self-services to easily provision and author streaming data pipelines, removing the infrastructure provisioning and coding tasks that require specialized skills.

Agnostic
Figure 4-7. A streaming pipeline implemented as SQL that can join multiple streaming platforms, even ones that do not utilize a commit log, to enable event sourcing

ksqlDB

ksqlDB is another stream processing tool that provides a SQL interface for building data pipelines on top of Kafka. Its SQL-like language abstracts away the underlying Kafka Streams implementation. ksqlDB creates table-like structures from Kafka topics and can join and aggregate data across topics, providing Kappa architecture capabilities.
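
Here is a minimal ksqlDB sketch of those table-like structures; the topic, stream, and column names (pageviews, pageviews_per_user, user_id) are hypothetical:

-- Declare a stream over an existing Kafka topic
CREATE STREAM pageviews (
    user_id  VARCHAR KEY,
    page     VARCHAR,
    viewtime BIGINT
  ) WITH (
    KAFKA_TOPIC = 'pageviews',
    VALUE_FORMAT = 'JSON'
  );

-- Continuously aggregate the stream into a table backed by another topic
CREATE TABLE pageviews_per_user AS
  SELECT user_id,
         COUNT(*) AS views
  FROM pageviews
  GROUP BY user_id
  EMIT CHANGES;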

ksqlDB follows the widely recognized SQL-92 standard, the third revision of the SQL standard, which is maintained by the American National Standards Institute (ANSI).

A ksqlDB deployment is limited to a single Kafka cluster and cannot combine multiple streaming platforms. It does, however, provide a way to provision connectors, which allows domain engineers to stay entirely within a single tool to build streaming data pipelines.

Provisioning connectors in ksqlDB

ksqlDB not only executes stateful transformations but also has the unique ability to create Kafka connectors to ingress and egress data, so domain engineers no longer have to leave the tool to import and export data. From a single ksqlDB command-line interface (CLI), domain engineers can build an entire data pipeline from sources to published data products, completely within the Kappa architecture pattern.

Example 4-2 shows how to create a source Kafka connector. In this example, a Debezium CDC source connector will be connecting to a MySQL database. The connector will read all operations in the order in which they are committed to the database: inserts, updates, and deletes.

Example 4-2. ksqlDB statement that creates a Kafka source connector to bring data into Kafka; a stream/table could then be created from the resulting topic
/* creates a connector that reads from MySQL */
CREATE SOURCE CONNECTOR app_data WITH ( 1
    'connector.class' = 'io.debezium.connector.mysql.MySqlConnector', 2
    'tasks.max' = '1',  3
    'database.hostname' = 'mysql',  4
    'database.port' = '3306',
    'database.user' = 'debezium',
    'database.password' = 'dbz',
    'database.server.id' = '184054',
    'database.server.name' = 'dbserver1',
    'database.include.list' = 'inventory',
    'database.history.kafka.bootstrap.servers' = 'kafka:9092',  5
    'database.history.kafka.topic' = 'schema-changes.inventory'
);
1

SOURCE keyword for source connectors.

2

io.debezium.connector.mysql.MySqlConnector is the class name of the connector. If building your own connector, its class name will go here.

3

The maximum number of tasks that the connector may create.

4

The hostname of the database.

5

The Kafka bootstrap server.

Similarly, in Example 4-3, you can create a sink connector to take data out of the streaming platform. In this case, the statement creates a sink connector taking data from a Kafka topic and writing it into an Amazon S3 sink.

Example 4-3. ksqlDB statement creating a Kafka sink connector that reads from a topic and writes to a destination
/* creates a connector that writes to a data lake */
CREATE SINK CONNECTOR training WITH ( 1
    'connector.class' = 'S3_SINK', 2
    'tasks.max' = '1',
    'aws.access.key.id' = '$AWS_ACCESS_KEY_ID', 3
    'aws.secret.access.key' = '$AWS_SECRET_ACCESS_KEY', 4
    's3.bucket.name' = '$S3_BUCKET', 5
    'time.interval' = 'HOURLY', 6
    'data.format' = 'BYTES',
    'topics' = '$KAFKA_TOPIC_NAME_OUT2'
);
1

SINK keyword for sink connectors.

2

S3_SINK is an alias to the class name of the S3 sink connector.

3

The AWS access key.

4

The AWS secret.

5

The S3 bucket.

6

Sets how your messages are grouped in the S3 bucket. Valid entries are DAILY or HOURLY.

You can define transformations between the source and sink statements to create a streaming data pipeline, and you can save the SQL statements in a file to be executed all at once. This can be used to promote your streaming data pipeline from development to staging and finally to production.
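
As a hedged sketch of such an intermediate transformation, the stream below could sit between the source connector in Example 4-2 and the sink connector in Example 4-3. The topic name, column names, and the assumption that the CDC events have been unwrapped into flat records are all hypothetical:

-- Stream over a topic produced by the source connector in Example 4-2
CREATE STREAM orders_raw (
    order_id     VARCHAR KEY,
    order_status VARCHAR,
    total        DOUBLE
  ) WITH (
    KAFKA_TOPIC = 'dbserver1.inventory.orders',
    VALUE_FORMAT = 'JSON'
  );

-- Transformation step: keep only completed orders;
-- the sink connector in Example 4-3 would read from COMPLETED_ORDERS
CREATE STREAM completed_orders AS
  SELECT order_id,
         total
  FROM orders_raw
  WHERE order_status = 'COMPLETED'
  EMIT CHANGES;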

User-defined functions in ksqlDB

In ksqlDB, you are limited to the SQL language when defining streaming data transformations, and SQL cannot represent complex logic the way imperative programming languages like C++, Java, or Python can. To perform more complex logic, a user-defined function (UDF) can be written in Java to hold the logic that cannot be expressed in SQL. That UDF can then be called in ksqlDB without breaking the SQL grammar that ksqlDB uses.

Example 4-4 shows an example of a ksqlDB UDF that multiplies two numbers together. The annotations applied to the Java source enable ksqlDB class loaders to register this UDF as an available function to use.

Example 4-4. A ksqlDB user-defined function that is loaded by ksqlDB and can be used as a function in a ksqlDB statement
package com.example;

import io.confluent.ksql.function.udf.Udf;
import io.confluent.ksql.function.udf.UdfDescription;
import io.confluent.ksql.function.udf.UdfParameter;

import java.util.Map;

@UdfDescription(name = "Mul", 1
                author = "example user",
                version = "1.0.2",
                description = "Multiplies 2 numbers together")
public class MulUdf {

    @Udf(description = "Multiplies 2 integers together.") 2
    public long formula(@UdfParameter int v1, @UdfParameter int v2) { 3
        return (v1 * v2);
    }

    @Udf(description = "Multiplies 2 doubles together")
    public long formula(@UdfParameter double v1, @UdfParameter double v2) {
        return ((int) (Math.ceil(v1) * Math.ceil(v2)));
    }

}
1

@UdfDescription provides a description of the class.

2

@Udf identifies the UDF.

3

@UdfParameter identifies the parameters in the function.

This UDF uses method overloading to support multiple data types. Example 4-5 shows how to invoke it in a ksqlDB statement.

Example 4-5. An example of how to use the UDF in a ksqlDB statement
select formula(col1, col2) as product 1
from mytable
emit changes; 2
1

Invocation of the formula function.

2

Specifies a push query with a continuous output. Push queries are unbounded, or in other words, they do not end because results are continuously being pushed to the consumer. Conversely, pull queries are bounded or eventually end.

Note

Authoring UDFs in Java may not be a skill accessible to domain engineers. In this case, the centralized data mesh team, which has the necessary skills, should code the UDFs. The data engineers who previously developed and maintained the data pipelines in the data lake will now be part of the central data mesh team.

We covered two solutions for transforming data derivatives in this section, both of which use SQL. We transform data to ensure that our streaming data products are of high quality and trustworthy. Another reason for transforming data is to add more information to the data product so that it can be more useful to domain consumers. In the next section, we will talk about how to enrich data using data warehousing concepts.

Extract, Transform, and Load

Now, let’s talk about transforming data in an extract, transform, and load (ETL) process. Traditionally, this process extracts data from a data source, transforms it to a format that can be consumed by its consumers, and then loads it into a system where these consumers can read it (such as a data lake or data warehouse). ETL is a pattern typically used to collate data from multiple disparate systems into a single centralized system like a data warehouse, as shown in Figure 4-8.

ETL Warehouse
Figure 4-8. An ETL data pipeline that extracts data from multiple sources, writes it into a data lake for staging and transformation, and then loads it into a data warehouse for consumption

The staging area in Figure 4-8 is where transformations are performed. The staging area will typically be a data lake because it can hold a large amount of data. The transformation is executed by massively parallel processing (MPP) applications as well as batch jobs, which then send the data to a data warehouse.

The operational database is the source of information for the data warehouse. It includes detailed information used to run the day-to-day operations of the business. Data frequently changes in the operational database as updates are made to reflect the current value of the last business transactions. To accomplish this, online transaction processing (OLTP) databases are used to manage the changing nature of data in real time.

Data warehouse systems serve as data stores that empower users and decision makers to make business decisions. These systems can organize and present information in specific formats to accommodate the diverse needs of various users. These systems are often referred to as online analytical processing (OLAP) systems.

Operational databases and data warehouses are both relational databases, but each serves different purposes. Operational database systems handle both operational and transactional data. Operational data consists of data contained in the operations of a particular system or business unit. For example, in retail or e-tail, an operational database handles discrete purchases of items in real time, as well as keeps track of item inventory. A data warehouse, on the other hand, holds the historical record of transactions that occur over large amounts of time. For instance, if a large online retailer wants to track the performance of a particular brand or item over the last 10 years, this can be accomplished by querying the data warehouse. This may provide useful information such as performance of ad campaigns, show seasonal trends in buying behavior, or even help to better understand overall brand market share.

The major difference between an operational database and a data warehouse is that, while an operational database is volatile and subject to constant updating, the data warehouse holds historical snapshots of the state of transactions over time.

Not all data within a data warehouse changes at the same rate. Keeping with the current retail example, a customer’s total basket may change many times, and the state of their shopping cart will change with every addition and removal of an item until the customer decides to check out and finalize a purchase. Product characteristics, such as brand, size, color, flavor, or weight, change much more slowly than a set of sales transactions. Information about the customer, such as location, age, interests, along with other demographic and firmographic data, may not change at all or may change only when the customer informs us of such a change or when we receive a feed of matched demographics from a third-party source.

Maintaining data warehouse concepts

Even though a data mesh architecture is intended to decompose and decentralize the analytical plane like the data lake or warehouse, the concepts that make these systems successful should not be lost or compromised. The concept of a star schema (a model that resembles a star by separating fact from dimensional data), the way transformations are defined, and the structure of the data make the data model easy to understand and implement. These same concepts can be used outside the data warehouse to help design streaming ETL data pipelines and provide more usable data products.

As we introduced in Chapter 1, data in an enterprise is divided between operational and analytical planes. Domains that reside in the analytical plane differ greatly from domains that reside in the operational plane. Data that is published as data products, sourced from the operational domain, is typically immutable timestamped snapshots of operational data over time. Historically, changes to data products in the analytical plane evolve more slowly than their operational counterparts. Because of this, a data domain in the analytical plane is responsible for efficiently serving and providing data access to large bodies of data to consumers. Data domains provide a view of data to the outside world—views that are published standards for data access. Behind-the-scenes processes, such as ETL, that are used to create the domain are not exposed to downstream consumers.

Data warehousing basics

In today’s business climate, organizations need to have reliable reporting and analysis of large amounts of data. Data needs to be consolidated and integrated for different levels of aggregation for various reporting purposes without impacting the organization’s operational systems. The data warehouse makes this possible by creating a repository of an organization’s electronically stored data, extracted from the operational systems, and making it available for ad hoc queries and scheduled reporting through ETL processes.

Many approaches to building a data warehouse exist, each with its own set of pros and cons. This book focuses on the star schema approach to data warehousing and its applications to data mesh and streaming data mesh. While we are aware of other data warehouse approaches and their potential application to data mesh, such as Data Vault 2.0, this book does not offer specifics for those approaches.

The star schema is the simplest form of a dimensional model used in business intelligence and data warehousing. The star schema consists of one or more fact tables that reference any number of dimension tables. As its name indicates, the star schema’s physical model resembles a star shape, with a fact table at its center and dimension tables surrounding it, representing the points of a star.

A fact table contains all the primary keys of each dimension, and facts or measures associated with those dimensions. Dimension tables provide descriptive information for all measurements recorded in the fact table. As dimensions are informational and change much more slowly than fact tables, dimensions are relatively small compared to the fact table. Commonly used dimensions are people, products, place (or geography), and most importantly, time (see Figure 4-9).

Separating fact and dimensional data is extremely important in order to scale a data warehouse. This separation allows attributes about facts to change over time without the need to re-key the entire fact table. Suppose, for instance, we are tracking sales of a product, and the owner of a brand changes over time. In a single table model (models that flatten fact and dimensional data into a single table), expensive updates or truncating/loading would be required to update the entire table to reflect the proper brand owner. In a dimensional model, only the brand owner attribute of the product dimension needs to change. This change is then reflected to downstream reporting and analytic applications without much fanfare.

Simple Star Schema
Figure 4-9. A simple star schema without slowly changing dimensions
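
To make the separation concrete, here is a minimal DDL sketch of the star schema in Figure 4-9, written in generic SQL; the table and column names are illustrative:

-- Dimension tables: descriptive, slowly changing attributes
CREATE TABLE dim_product (
  product_key  INT PRIMARY KEY,
  product_name VARCHAR(100),
  brand        VARCHAR(100),
  brand_owner  VARCHAR(100)
);

CREATE TABLE dim_customer (
  customer_key  INT PRIMARY KEY,
  customer_name VARCHAR(100),
  city          VARCHAR(100)
);

CREATE TABLE dim_date (
  date_key      INT PRIMARY KEY,
  full_date     DATE,
  month_of_year INT,
  calendar_year INT
);

-- Fact table: foreign keys to each dimension plus the measures
CREATE TABLE fact_sales (
  product_key  INT REFERENCES dim_product (product_key),
  customer_key INT REFERENCES dim_customer (customer_key),
  date_key     INT REFERENCES dim_date (date_key),
  quantity     INT,
  sale_amount  DECIMAL(10, 2)
);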

As operational data is transformed into the star schema, careful consideration must be taken into account when choosing what is fact versus dimensional data. Design considerations of the database will have direct impact on ETL that feeds the data warehouse. For instance, data can be added to a warehouse at the transaction level, or it can be imported, transformed, and inserted in batch. Any of these approaches require ETL and proper denormalizing to fulfill data warehouse requirements—the ability to query data quickly and fulfill data requests.

Dimensional versus fact data in a streaming context

Fact and dimensional data both change state over time, but at different rates. To fully understand a customer’s behavior during an online shopping instance, the data warehouse must be able to track each interaction a customer has with products, perhaps even keeping track of what the customer added to their basket, what they removed, and in what order. For a traditional data warehouse system, this poses a challenge, since each batch insert of data to the warehouse accounts for only the point in time when a snapshot was taken. But, with the advent of the Kappa architecture and the use of tiered storage, the complexities of providing real-time actionable data become simplified and achievable. Insights can now be delivered at near real time, and the data warehouse can take advantage of this in terms of data ingestion.

Dimensional data, as stated before, also changes over time. This is an important topic that often gets overlooked when building a star schema data model. These so-called slowly changing dimensions also require attention to detail, because for accurate analytics, it is important to be able to view customers, products, and locations as they were known at the time of a transaction or set of transactions. Understanding the attributes of a particular product today is important, but it is also important to understand what that product was six months or even a year ago. Understanding a customer’s demographic profile at time of purchase is important to understanding and predicting how other customers may also behave.

Slowly changing dimensions, too, have suffered from the historical type of data warehousing ingestion that we saw in the past. Kappa architecture again simplifies the slowly changing dimension definition. Rather than materializing point-in-time information about dimensions on certain intervals, understanding what a dimension looks like at a particular point in time becomes a matter of looking at a point in time in a stream and determining its characteristics. To take this concept even further, in a streaming data mesh, the dimensions of a data model become data products, along with the fact data. This allows the data product engineer to publish a standardized interface to dimensional data that allows for point-in-time lookups. Rather than creating views based on some sort of SCD Type-6 setup, and querying this view, business logic to create point-in-time lookups now becomes encapsulated in the data product itself.

Materialized views in streams

In the simplest terms, a materialized view is a preprocessed query result stored on disk. The preprocessing of the query is always running, so at any time a user can query the materialized view and expect to get the latest results. Conversely, a traditional view is a query that is not preprocessed; it runs at the time the view is queried, and its results are not stored on disk.

Both materialized and traditional views will return the same result, except the materialized view will run faster since the results are already precomputed, while the traditional view needs to process the view first before returning the result. Since the preprocessing in a materialized view is happening in the background, it has asynchronous characteristics. Conversely, since a traditional view processes the query only on request and responds with the result, it has synchronous characteristics.
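
A hedged ksqlDB sketch of both behaviors follows; the topic, stream, table, and key values (transactions, account_balances, acct-42) are hypothetical:

-- Stream over the raw transaction topic
CREATE STREAM transactions (
    account_id VARCHAR KEY,
    amount     DOUBLE
  ) WITH (KAFKA_TOPIC = 'transactions', VALUE_FORMAT = 'JSON');

-- Asynchronous: a materialized view continuously maintained in the background
CREATE TABLE account_balances AS
  SELECT account_id,
         SUM(amount) AS balance
  FROM transactions
  GROUP BY account_id
  EMIT CHANGES;

-- Synchronous: a pull query that reads the precomputed result on request
SELECT balance FROM account_balances WHERE account_id = 'acct-42';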

Let’s take these concepts further. In “Ingesting Data Product Derivatives with Kafka Connect”, we discussed the differences between synchronous and asynchronous data sources. The primary difference is that synchronous data sources follow batching semantics, while asynchronous data sources follow streaming semantics.

So far we have described materialized views in the context of a single database, but they are not limited to a single database. The same materialized view semantics of preprocessing data exist when replicating data from an active instance of a database to a passive instance, as seen in Figure 4-10.

In this illustration of a disaster recovery solution, the application uses the “active” database and fails over to the “passive” database in case the active database crashes. The data passes over the write-ahead log (WAL) and “materializes” in the passive database. Every transaction that occurred in the active database is recorded in the WAL and is executed in the passive database in the background. This replication of data therefore happens asynchronously and is an example of a materialized view involving two databases.

Database Replication
Figure 4-10. Database replication using a write-ahead log creating a materialized view in the passive database

The Debezium connector that we previously discussed reads the WAL of the databases it supports to capture changes, but instead of sending them to another instance of the same database, it sends them to Kafka (see Figure 4-11).

Database Replication
Figure 4-11. Database replication using a WAL to populate Kafka with CDC transactions

From this point, you can build multiple materialized views. As shown in Figure 4-12, you can create a materialized view in ksqlDB or in another passive database using Flink.

Multiple materialized views.
Figure 4-12. Building multiple materialized views from a single Debezium CDC connector reading from the WAL

CDC is typically used for the models or entities that are the outcomes of domain-driven design (DDD). These entities do not change often; they change slowly. In ETL pipelines, these entities are the dimensional data used to enrich fact data.

Streaming ETL with domain-driven design

Let's now tie this information back to streaming ETL. In summary, dimensional data is sourced from materialized views, which are sourced from CDC streams, which are sourced from the WALs of the original source databases. We now have a fully streaming data pipeline for dimensional data that will be used to enrich fact data. It is also our solution for fully streaming ETL, where both the dimensional data and the fact data are backed by streams.

This is the goal we want: to enable streaming ETL in all domains. To accomplish this, we need two types of data products: dimensional and fact. If we were to make them streaming data products, they would be different types of streaming data products—a CDC stream for dimensional data and an append-only stream for fact data. CDC streams contain only changes captured from WALs in an operational (transactional) source database, and append-only streams contain fact data.

In DDD, the domain model defines the entities, their interrelationships, and their interactions based on business logic. Fact data is these interaction events between entities bound with time and state. Dimensional data is the create, update, and delete events related to entities and their interrelationships.

For example, a visitor to a website doesn't change their name often, so this is a slowly changing dimension. But they may be clicking many things around the site, adding and removing items in their cart, for example. This is fact data that arrives fast and is associated with time. Joining the fact data with the dimensional data is an ETL process that enriches fact data with dimensional data so that analysts can know who clicked and can infer why, when, and where the user clicked to improve their experience. Model training requires capturing dimensional state not from dimensional tables but from the enriched fact data, so that it captures the state of the dimension at the time of the click event.
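
A hedged ksqlDB sketch of this streaming enrichment follows; the topics, stream, table, and column names (clicks, customers_cdc, clicks_enriched) are hypothetical:

-- Dimension: latest customer state, materialized from a CDC-fed topic
CREATE TABLE customers_dim (
    customer_id   VARCHAR PRIMARY KEY,
    customer_name VARCHAR,
    segment       VARCHAR
  ) WITH (KAFKA_TOPIC = 'customers_cdc', VALUE_FORMAT = 'JSON');

-- Fact: append-only clickstream events
CREATE STREAM clicks (
    customer_id VARCHAR KEY,
    page        VARCHAR,
    click_time  BIGINT
  ) WITH (KAFKA_TOPIC = 'clicks', VALUE_FORMAT = 'JSON');

-- Streaming ETL: enrich each fact event with the dimension state
-- as it is known at the time of the click
CREATE STREAM clicks_enriched AS
  SELECT c.customer_id,
         c.page,
         c.click_time,
         d.customer_name,
         d.segment
  FROM clicks c
  JOIN customers_dim d ON c.customer_id = d.customer_id
  EMIT CHANGES;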

Publishing Data Products with AsyncAPI

We've defined the streaming data product requirements based on the requirements provided by other domains. We also identified the data product derivatives (and subscribed to other domains' data products if necessary). We then extracted and transformed the data to create a new data product. At this point, the data product is ready to be published into the data mesh. We will use AsyncAPI to define the consumption point of the data product.

AsyncAPI is an open source project created to simplify and standardize definitions of streaming data sources. AsyncAPI is an interface definition language (IDL) that lets applications written in one language interface with applications written in other languages. In this case, AsyncAPI is an IDL that defines an asynchronous API, allowing other applications to create integrations with the streaming data product regardless of programming language. The process of publishing a data product involves creating an AsyncAPI YAML document and registering it with the streaming data mesh.

Registering the Streaming Data Product

AsyncAPI documents are written in YAML, a machine-readable document format that is also human-readable and can easily be edited by a domain engineer. When we register a data product, we will create an AsyncAPI YAML document and register it with a streaming data catalog, which we will talk more about in Chapter 5. For now, the streaming data catalog will hold all the data products in the streaming data mesh so that data product shoppers have a single place to search for streaming data products and subscribe to them.

AsyncAPI extends OpenAPI, formerly known as Swagger (see Figure 4-13 for details). OpenAPI is also a YAML-based IDL, but it describes synchronous APIs. Today, synchronous APIs are registered in API gateways, like Kong and Apigee, where API shoppers can browse and search for specific APIs based on their use cases. The goal of AsyncAPI is to apply that simple approach to asynchronous data sources as well. AsyncAPI provides us a simple way to enable the self-services needed to create a good experience for all users and consumers of the streaming data mesh.

The AsyncAPI YAML document will enable us to specifically define how applications can consume the streaming data product so that we can build self-services to seamlessly create integrations between domains. This ultimately builds the mesh in the streaming data mesh. The AsyncAPI YAML document will also enable us to perform searches in the streaming data catalog, which we will cover in Chapter 5.

The AsyncAPI YAML documents are parsed by applications to generate client consumer code for domains in any programming language. These applications can do other things like generate HTML pages that can be served by a streaming data catalog. We will demonstrate this in Chapter 5. In Chapter 6, we will show how an AsyncAPI YAML service can invoke a REST API that will provision a Kafka connector to read from the Kafka topic and write to Amazon S3.

Figure 4-13. Differences between OpenAPI and AsyncAPI

Building an AsyncAPI YAML Document

Creating the AsyncAPI YAML document is the last step in publishing a data product into a streaming data mesh. Example 4-6 shows a skeleton AsyncAPI YAML document; we’ve removed the details. We will populate this YAML document with the metadata needed to define a streaming data product to be published to the data mesh. In an AsyncAPI document, the fields are referred to as objects. For instance, in Example 4-6, asyncapi, externalDocs, info, etc., are all considered objects, as are subobjects like covid, messages, schemas, etc. We will refer to them as objects when speaking in the context of YAML.

Example 4-6. An AsyncAPI YAML document skeleton
asyncapi: '2.2.0'
externalDocs:
info:
tags:
servers:
defaultContentType:
channels:
  covid:
components:
  messages:
    covidapi:
  schemas:
    covidapi:
  securitySchemes:
    user-password:
      type: userPassword
  messageTraits:
  operationTraits:
Note

The AsyncAPI example uses Confluent Cloud as the streaming platform. Confluent Cloud provides a fully managed Apache Kafka and Schema Registry, which we will talk more about in later sections of this chapter.

Objects asyncapi, externalDocs, info, and tags

Let’s now build an AsyncAPI YAML document defining a streaming data product that provides COVID-19 global statistics. In Example 4-7 we populate all the descriptive information for the data product. This covers the top four sections of the YAML in Example 4-6.

Example 4-7. AsyncAPI YAML informational sections
asyncapi: '2.2.0' 1
externalDocs: 2
  description: The source of the COVID-19 global statistics that is provided
  as a real-time data stream.
  url: https://covid19api.com/
info: 3
  title: COVID-19 Global Statistics AsyncAPI Spec
  version: '0.0.1'
  description: |
    This AsyncAPI provides pub/sub information for clients to pub/sub COVID
    data to Kafka

  license:
    name: Apache 2.0
    url: https://www.apache.org/licenses/LICENSE-2.0
  contact:
    name: API Support
    url: http://www.asyncapi.com/support
    email: info@asyncapi.io
  x-twitter: '@AsyncAPISpec'

tags: 4
  - name: root-tag1
    externalDocs:
      description: External docs description 1
      url: https://www.asyncapi.com/
  - name: root-tag2
    description: Description 2
    externalDocs:
      url: "https://www.asyncapi.com/"
  - name: root-tag3
  - name: root-tag4
    description: Description 4
  - name: root-tag5
    externalDocs:
      url: "https://www.asyncapi.com/"
1

The AsyncAPI version 2.2.0.

2

externalDocs provides a description of the data product and a URL where users can find more information on the data product.

3

The info section of the YAML provides additional details on the data product, including version and license information.

4

The optional tags section provides hashtag-like labels that relate to the data product.

Notice that in Example 4-7 we could place URLs in several places so that users shopping for data products can do further research; it’s beneficial to add them and provide all the information necessary to understand the data product.

Tags in this section could be used for relating data products together. This will become more useful when we start talking about knowledge graphs in Chapter 5. Knowledge graphs enable building semantic multidimensional relationships between data and metadata, making data more valuable to users.

Servers and security section

In Example 4-8 we add an important block to the YAML that provides connectivity and security information. In this case, the data product is published in a Kafka topic. AsyncAPI supports any type of streaming platform, so it needs to be configured with a protocol for parsers to understand how to build the integration between the streaming data product and the component that will subscribe to it.

Example 4-8. An AsyncAPI defining a data product for the data mesh
servers: 1
  kafka-aws-useast2: 2
    url: https://kafka.us-east-2.aws.confluent.cloud:9092 3
    protocol: kafka 4
    description: Kafka cluster Confluent cloud AWS US-EAST-2
    security:
      - user-password: [] 5

defaultContentType: application/json 6
1

List of servers that define the connection to the data product.

2

kafka-aws-useast2 is a custom property that identifies a specific Kafka server and its connection and security requirements.

3

The URL to use to connect to the streaming platform—in this case, Apache Kafka.

4

protocol identifies the type of streaming platform the data product is being served on. This field informs the application reading this YAML to include specific libraries to enable connectivity to the streaming platform—in this case, Apache Kafka.

5

user-password references the securityScheme that informs the application reading this YAML that the security mechanism to use is SASL_SSL, which ensures that the communication is encrypted and authenticated using SASL/PLAIN.

6

The defaultContentType property informs the reading application that the content of the data product is JSON. Alternative types could be Apache Avro or Protobuf.

For the security section in Example 4-8, we have multiple options. Some of these are as follows:

  • User and password

  • Certificates

  • API keys

  • OAuth 2

  • OpenID

The security section can contain multiple security options, but in this case there is only user-password. Most security configurations make use of user/password or certificates. In AsyncAPI, security extends OpenAPI to add other security mechanisms like OAuth 2 and OpenID that are supported by streaming platforms. We will not go over each implementation in detail because it is beyond the scope of this book (it would be a whole other book). For the purpose of this book, we will use user-password as the security mechanism. Later in this chapter we will show how we provide details for this security configuration.
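To make these options concrete, here is a minimal sketch—not part of the COVID-19 example—of how alternative mechanisms such as certificates or OAuth 2 could be declared as additional entries under components/securitySchemes; the scheme names, token URL, and scope are hypothetical placeholders:

components:
  securitySchemes:
    certificate-auth:
      type: X509                  # mutual TLS using client certificates
      description: Connect with a client certificate issued by the producing domain.
    oauth2-auth:
      type: oauth2                # OAuth 2 client credentials flow
      description: Obtain a token from the (hypothetical) authorization server below.
      flows:
        clientCredentials:
          tokenUrl: https://auth.example.com/oauth/token   # hypothetical URL
          scopes:
            "covid:read": Read access to the covid topic

A server entry would then reference certificate-auth or oauth2-auth in its security list, just as Example 4-8 references user-password.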

Channels and topic section

Example 4-9 shows the channels block in the AsyncAPI, where a lot of the details of the streaming data product reside. Under channels is another level labeled covid that corresponds to the topic name in Apache Kafka, the topic from which the streaming data product is served. In this case, the streaming data product is again global COVID-19 statistics.

Example 4-9. channels section
channels:
  covid: # topic name 1
    x-confluent-cloud-security: 2
      $ref: '#/components/securitySchemes/user-password'
    description: Publishes/Subscribes to the COVID topic for new statistics.

    subscribe: 3
      summary: Subscribe to global COVID-19 Statistics.
      description: |
        The schema that this service follows: https://api.covid19api.com/
      operationId: receiveNewCovidInfo
      tags: 4
        - name: covid19api
          externalDocs:
            description: COVID-19 API
            url: https://api.covid19api.com/
        - name: covid
          description: covid service
      traits: 5
        - $ref: '#/components/operationTraits/covid'
      message: 6
        $ref: '#/components/messages/covidapi'
1

The name of the channel that corresponds to the name of the Kafka topic.

2

The security implementation. The $ref property is a reference to another part of the AsyncAPI document that defines the security implementation in more detail, covered in “Security schemes section”.

3

Indicates how the client will subscribe to the covid topic. This section is for subscribers, not producers.

4

tags allows for more relationships to be built in the knowledge graph in the streaming data catalog. It provides more metadata about the streaming data product and improves searchability.

5

traits provides more information for the client to configure itself. In this case, AsyncAPI is referencing another part of the document that will provide more information on how the client subscriber/consumer will need to configure itself. We will go over these details in “Traits section”.

6

message is yet another $ref that references the schema of the streaming data product. The reference points to another part of the AsyncAPI document that will give details on how the message is structured so that the client consumer can parse and process it.

The channels section of the AsyncAPI document can provide both a subscribe and a publish section. We omitted the publish section because this AsyncAPI document is meant to describe streaming data products, so other domains should not have the information needed to produce to the Apache Kafka topic. This AsyncAPI document should have only subscribers—the other domains in the streaming data mesh.
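For contrast, the following minimal sketch shows what a publish operation for the same channel could look like if the producing domain kept a private copy of the document for its own producers; the operationId is a hypothetical name, and this section would never be registered in the streaming data catalog:

channels:
  covid:
    publish:                        # internal-only; not exposed to other domains
      operationId: sendNewCovidInfo # hypothetical operation name
      summary: Produce new COVID-19 statistics to the covid topic.
      message:
        $ref: '#/components/messages/covidapi'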

Components section

The components object contains five subsections that hold reusable objects for different parts of the AsyncAPI specification (see Example 4-10). All objects defined within the components object will have no effect on the API unless they are explicitly referenced from properties outside the components object. In the previous sections, the AsyncAPI examples referenced many of the subsections in the components object. Let’s go over each of the subsections in detail.

Example 4-10. components holds the messages and schemas details
components:
  messages:
  schemas:
  securitySchemes:
  messageTraits:
  operationTraits:

Messages section

You may recall from Example 4-9 that the channels/covid/message section of the AsyncAPI document referenced a message object at components/messages/covidapi. That message is defined under the components section of the AsyncAPI document. Two of its subsections are relevant here: messages describes the envelope of the payload (see Example 4-11), and schemas describes the payload itself.

Example 4-11. messages describes the envelope that holds the payload
components:
  messages:
    covidapi: 1
      name: covidapi
      title: covid api
      summary: covidapi from https://api.covid19api.com/
      correlationId: 2
        description: |
          You can correlate / join with other data using the
          CountryCode field.
        location: $message.payload#/CountryCode
      tags: 3
        - name: message-tag1
          externalDocs:
            description: External docs description 1
            url: https://www.asyncapi.com/
        - name: message-tag2
          description: Description 2
          externalDocs:
            url: "https://www.asyncapi.com/"
      headers: 4
        type: object
        properties:
          my-custom-app-header:
            type: string
          correlationId:
            type: string
      payload: 5
        $ref: "#/components/schemas/covidapi"
      bindings: 6
        kafka:
          key:
            type: object
            properties:
              id:
                type: string
              type:
                type: string
          bindingVersion: '0.1.0'
1

The name of the message component. This is the element that is being referenced in Example 4-9.

2

The correlationId references the payload (callout 5) to identify the field to be used as a correlation ID, an identifier used in message tracing. In Kafka, this will most likely be the key used to assign the partition in a topic.

3

Again, tags can be used to build relationships between other data products or domains.

4

The headers section describes the headers of the message from the streaming platform. It also includes the correlationId in case it is provided in the header.

5

The payload references another section of the AsyncAPI that contains the schema of the message located in components/schemas/covidapi in the same components object.

6

A free-form map where the keys describe the name of the protocol (in this case, the Kafka protocol), and the values describe protocol-specific definitions for the server (Kafka).

The schemas subsection shown in Example 4-12 defines the schema of the payload inline. Inline means that the schema is defined directly in the AsyncAPI YAML document. You can alternatively define the schema outside the AsyncAPI YAML document by providing a $ref, which is a URL to the schema (see Example 4-13).

Example 4-12. Schema describes the payload that is the streaming data product itself
  schemas:
      covidapi:
        type: object
        required:
          - CountryCode
        properties:
          Country:
            type: string
          CountryCode:
            type: string
            description: correlationId
          Date:
            type: string
          ID:
            type: string
          NewConfirmed:
            type: integer
          NewDeaths:
            type: integer
          NewRecovered:
            type: integer
          Premium:
            type: object
          Slug:
            type: string
          TotalConfirmed:
            type: integer
          TotalDeaths:
            type: integer
          TotalRecovered:
            type: integer

Using a tool like a schema registry to register and manage schemas is the preferred approach. The schema registry keeps track of schema versions and sometimes checks for compatibility with previous versions. Schemas are the “contract” between the producing domain and the consuming domain for streaming data products. This protects applications from changes to the schema that could break data processing in the consuming domain. It also forces producing domains to evolve their streaming data products in a way that doesn’t break compatibility with older versions. A schema registry falls under federated data governance in the streaming data mesh, so we will go into more detail in Chapter 5. In the AsyncAPI YAML document, it’s important to know that instead of defining your schema inline, you can reference it remotely in a schema registry (see Example 4-13).

Example 4-13. Referencing a schema managed externally in a schema registry
messages:
    covidapi:
      name: covidapi
      title: covidapi
      summary: COVID 19 global statistics
      contentType: avro/binary
      schemaFormat: application/vnd.apache.avro+json;version=1.9.0
      payload:
        $ref: 'http://schema-registry:8081/subjects/topic/versions/1/#covidapi' 1
1

The payload schema is an external reference to a schema registry.

Security schemes section

AsyncAPI provides specific security information in the securitySchemes object of the YAML document. Example 4-14 shows how to connect to the streaming platform—in this case Kafka, as defined in the servers section of the AsyncAPI YAML. The securityScheme has a description property whose content is provided to the consuming application, giving the streaming data product client more detailed information. Authors of the AsyncAPI YAML document can format the content of the description to provide information that the AsyncAPI specification itself doesn’t capture.

Example 4-14. Security schemes section of components that shows more details in the description
securitySchemes:
    user-password:
      type: userPassword
      description: |
        Provide your Confluent KEY as the user and SECRET as the password.

        ```prop
        # Kafka
        bootstrap.servers=kafka.us-east-2.aws.confluent.cloud:9092
        security.protocol=SASL_SSL
        sasl.mechanisms=PLAIN
        sasl.username={{ CLUSTER_API_KEY }} 1
        sasl.password={{ CLUSTER_API_SECRET }} 2

        # Best practice for higher availability in librdkafka clients prior to 1.7
        session.timeout.ms=45000

        # Confluent Cloud Schema Registry
        schema.registry.url=https://schema-registry.westus2.azure.confluent.cloud
        basic.auth.credentials.source=USER_INFO
        basic.auth.user.info={{ SR_API_KEY }}:{{ SR_API_SECRET }} 3

        ```

        Copy the above YAML replacing the KEY/SECRETS for both the cluster and
        schema registry and use in your Kafka clients.
1

{{ CLUSTER_API_KEY }} is the key or user in user-password to use to connect to Kafka in Confluent Cloud.

2

{{ CLUSTER_API_SECRET }} is the secret or password in user-password to use to connect to Kafka in Confluent Cloud.

3

{{ SR_API_KEY }}:{{ SR_API_SECRET }} is the key/secret or user-password to use when requesting schemas from the Schema Registry in Confluent Cloud.

When domain consumers want to consume a streaming data product, such as the COVID-19 global statistics data described in this AsyncAPI, they have to request access to it. Then the streaming data product manager needs to approve the requesting domain. Part of this approval may involve sending the credentials to the requesting domain so that it can put that information in its configuration to start consuming the streaming data product. In Example 4-14, this would correspond to the parameters listed in (1), (2), and (3). These parameters will need to be replaced with the credentials provided by the producing domain to gain access to the streaming data products.

The description in Example 4-14 shows specifically how to configure a consumer to read from Confluent Cloud. The client only needs to replace the parameters with the credentials, read this configuration into the application, and pass it to the Kafka client libraries to configure the client to read from Kafka. In later chapters we will demonstrate how to use the AsyncAPI YAML document to build Apache Kafka connectors instead of writing applications that read from Apache Kafka.

Warning

It’s important to remember that this example uses the user/password methodology for security. As mentioned earlier, other supported security methodologies have their own types of credentials and may not use the user/password approach. Those other security methodologies are outside the scope of this book.

Traits section

In AsyncAPI, traits provide additional information that can be applied to an object in the YAML document. Traits are usually used only when the application parsing the AsyncAPI YAML document is trying to generate client code in a specific language. Example 4-9 had an object called operationId with the value receiveNewCovidInfo. AsyncAPI Generator, an application that reads an AsyncAPI YAML document, can be downloaded from the AsyncAPI website. It will generate Java Spring client code for domains to compile and deploy; this generated application can consume the streaming data product from the streaming platform defined in the servers section of the AsyncAPI YAML—in this case, Apache Kafka. AsyncAPI Generator will use the value in operationId as the method name in the source code. The traits in both messageTraits and operationTraits in Example 4-15 are used to assign values such as the groupId or clientId to help with generating client code.

Example 4-15. Traits are used by code generators to assign values to properties, name classes, and methods in the generated code
  messageTraits:
    commonHeaders:
      headers:
        type: object
        properties: 1
          my-app-header:
            type: integer
            minimum: 0
            maximum: 100
          correlationId:
            type: string

  operationTraits:
    covid:
      bindings:
        kafka: 2
          groupId: my-app-group-id-pub
          clientId: my-app-client-id-pub
          bindingVersion: '0.1.0'
1

Provides more information on what headers could be used by the client for processing the streaming data product

2

Provides binding information to Kafka that will help identify the consumer groups and elastically scale the client application if needed

Assigning Data Tags

Data tags are a simple way of providing the consuming domains with more information about the streaming data product: how it was built and what to expect when consuming it. Many streaming data characteristics, like quality and security, are hard to measure, so it’s sometimes tough to provide that important information to the consuming domain. Instead of providing a number or a score, we can provide tags that represent levels of quality and security. In this section we’ll assign data tags to the data.

Tags can provide information about the quality or security of the streaming data product. Sometimes consuming domains want streaming data products that are unchanged (in raw form) from the original source. Other consuming domains may want that same streaming data product to be of the highest quality, satisfying format standards and security requirements. These would end up being two distinct streaming data products. Tags give us an easy way to present streaming data products to the consuming domains.

Quality

Quality is a hard characteristic to score, but we could use tags like those defined in Table 4-8.

Table 4-8. Possible quality tags
Tags Definition

RAW

Raw data from the original source

STANDARD

Transformation to meet format standards

ENRICHED

Transformation to format standards and enrichment

For the two streaming data products originating from the same source data but with differing quality, we could assign RAW as the data quality tag for the streaming data product that serves raw data and ENRICHED as the data quality tag for the second streaming data product, where consumers expect enrichment. Consuming domains could then easily identify which streaming data product to request access to.

These quality levels could be assigned as tags in the AsyncAPI document, and domain consumers could click into them to get the tag definitions, as in Example 4-16.

Example 4-16. Adding quality tags to provide additional information
    covidapi:
      name: covidapi
      title: covid api
      tags:
        - name: quality.RAW 1
          externalDocs:
            description: Provides raw source data
            url: https://somewhere/quality/raw
1

quality.RAW corresponds to raw data. The URL would direct the domain consumer to more information on how the quality was implemented.
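For the enriched counterpart of the same source data, a second message component could carry the ENRICHED tag. The following sketch follows the same pattern as Example 4-16; the component name and URL are hypothetical placeholders:

    covidapi-enriched:
      name: covidapi-enriched
      title: covid api (enriched)
      tags:
        - name: quality.ENRICHED
          externalDocs:
            description: Transformed to format standards and enriched
            url: https://somewhere/quality/enriched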

Security

Security in this context of informational data tags would involve protecting sensitive information in the streaming data product. Similarly to quality, security tags could also be defined as in Table 4-9.

Table 4-9. Possible security tags
Tags Definition

FILTERED

The sensitive data was filtered or selected out of the streaming data product, or there was no sensitive information in the payload.

TOKENIZED

The sensitive data was tokenized and can be retrieved via a lookup mechanism.

ENCRYPTED

The sensitive data was encrypted; decrypting the data to its original value requires a key.

Security tags are added in the same way as the tags informing quality. You can add multiple tags to the AsyncAPI YAML document for both quality and security (Example 4-17).

Example 4-17. Adding security tags to provide additional information
    covidapi:
      name: covidapi
      title: covid api
      tags:
        - name: security.FILTERED 1
          externalDocs:
            description: Sensitive data was filtered out of this data product
            url: https://somewhere/security/filtered
1

security.FILTERED means that if any of the information was sensitive, it was filtered or omitted from the final streaming data product.

Example 4-17 shows how we can inform the consuming domain about what was done to secure the data. The url is an additional resource that could provide more information about what was filtered out and why.
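Quality and security tags can also be combined on a single message component. The following sketch, with hypothetical placeholder URLs, tags a standardized and tokenized variant of the data product:

    covidapi:
      name: covidapi
      title: covid api
      tags:
        - name: quality.STANDARD
          externalDocs:
            description: Transformation to meet format standards
            url: https://somewhere/quality/standard
        - name: security.TOKENIZED
          externalDocs:
            description: Sensitive data was tokenized and can be retrieved via lookup
            url: https://somewhere/security/tokenized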

Throughput

Providing the throughput of the streaming data product gives the consuming domain very important scalability information. Throughput can be measured in megabytes per second (MBps). It is an indicator of how fast data arrives at a consuming domain. Some streaming data products can be slow, like the slowly changing dimensional data discussed in “Dimensional versus fact data in a streaming context”. Other streaming data can be really fast, like clickstream data from a web application or a Twitter feed.

Similarly to quality and security, in Example 4-18 you could provide the throughput as a description in a throughput tag and a URL that will provide additional information about how the Apache Kafka topic is configured to enable that level of throughput, like the number of partitions.

Example 4-18. Describing throughput in AsyncAPI
subscribe:
      summary: Subscribe to global COVID 19 Statistics.
      description: |
        The schema that this service follows: https://api.covid19api.com/
      operationId: receiveNewCovidInfo
      tags:
        - name: throughput 1
          externalDocs:
            description: 10 MBps 2
            url: https://localhost/covid119/throughput 3
1

Throughput tag

2

The tag description that provides the throughput value

3

A URL where consuming domains can find out more about how the throughput is implemented

Versioning

It is important to provide the version of the streaming data product. This allows consuming domains to understand whether the streaming data product is ready for their production workloads or whether a major version change has occurred that they could take advantage of.

Example 4-19 shows how AsyncAPI can provide detailed versioning information to consuming domains that they can use when managing their own application development.

Example 4-19. Versioning information in the info section of the AsyncAPI YAML document
info:
  title: COVID 19 Global Statistics AsyncAPI Spec
  version: '0.0.1'
  description: |
    This streaming data product is in preview. DISCLAIMER - this streaming data
    product could implement breaking changes. Do not use this for your production
    applications in your domain.
  contact:
    name: API Support
    url: http://www.asyncapi.com/support

It may also be beneficial to provide a change log for updated versions, either in the description or at a URL. This includes any changes to the ETL data pipeline that produces the data product and any changes in the original source.

Data derivatives that originate from other domains’ streaming data products should be identified as well, in tags or URLs, so that consuming domains can recursively drill down into the sources that compose the final streaming data product. In later chapters, we will go over ways to do this.
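One way to express both ideas in the AsyncAPI YAML document is sketched below; the version number, change log URL, upstream product name, and catalog URL are hypothetical placeholders:

info:
  title: COVID 19 Global Statistics AsyncAPI Spec
  version: '1.1.0'
  description: |
    Change log: https://somewhere/covid/changelog

tags:
  - name: derived-from.health-domain.covid-raw   # hypothetical upstream data product
    externalDocs:
      description: The upstream streaming data product this product is derived from
      url: https://somewhere/catalog/health-domain/covid-raw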

Monitoring

Monitoring information about the streaming data product is also critical for consuming domains to see. This again could be provided as yet another URL or tag in the AsyncAPI YAML document. Table 4-10 shows some of the important information that consuming domains would want to know.

Table 4-10. Information that could be learned from monitoring streaming data products
Information Insight

Number of consumers

  • Indicates the popularity of the streaming data product.

  • To see which other domains are consuming this data.

Error count/SLA

  • To get the current status of the streaming data product in case the consuming domain is experiencing issues. This will answer questions like:

    • Is it active?

    • Is there a current outage?

    • When is the streaming data product expected to be back online?

  • To see if the streaming data product is meeting uptime SLA.

Throughput/bandwidth

  • To see if the streaming data product is at maximum capacity.

If the consuming domain requires 99.999% uptime for its own applications and the streaming data product provides only 99.9%, it may want to request higher SLA guarantees, which may result in a separate streaming data product.

Consuming domains will want to set up alerts on these metrics so that they can react to potential issues. You can provide a better data mesh experience if you provide monitoring for all your streaming data products and make that monitoring programmatically consumable or alertable by the consuming domains.
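A simple way to expose monitoring, sketched below with a hypothetical placeholder URL, is another tag on the subscribe operation that points to a monitoring dashboard or metrics endpoint:

    subscribe:
      operationId: receiveNewCovidInfo
      tags:
        - name: monitoring
          externalDocs:
            description: Consumer count, error rate, and SLA status for this data product
            url: https://somewhere/covid/monitoring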

Summary

In this chapter, we outlined all the necessary steps to build a streaming data product: defining requirements, ingestion, transformation, and ultimately publishing an AsyncAPI YAML document. We did all this with the skill set of generalist engineers that we expect from domain engineers: JSON, SQL, and YAML. This AsyncAPI document will allow us to build a streaming data mesh. In Chapter 5, we will talk about how we can use the AsyncAPI YAML document to populate a streaming data catalog. We will also use AsyncAPI applications (tools) that generate HTML pages in a streaming data catalog application and see how we can extend the catalog to add a streaming data mesh workflow. In later chapters, we will continue to use the AsyncAPI document to build self-services that can build integrations and retrieve metadata recursively, such as data lineage.

