Chapter 4. Querying Kafka with Kafka Streams
AATD doesn’t currently have real-time insight into the number of orders being placed or the revenue being generated. The company would like to know if there are spikes or dips in the numbers of orders so that it can react more quickly in the operations part of the business.
The AATD engineering team is already familiar with Kafka Streams from other applications that they’ve built, so we’re going to create a Kafka Streams app that exposes an HTTP endpoint showing recent orders and revenue. We’ll build this app with the Quarkus framework, starting with a naive version. Then we’ll apply some optimizations. We’ll conclude with a summary of the limitations of using a stream processor to query streaming data. Figure 4-1 shows what we’ll be building in this chapter.
What Is Kafka Streams?
Kafka Streams is a library for building streaming applications that transform input Kafka topics into output Kafka topics. It is an example of the stream processor component of the real-time analytics stack described in Chapter 2.
Kafka Streams is often used for joining, filtering, and transforming streams, but in this chapter we’re going to use it to query an existing stream.
At the heart of a Kafka Streams application is a topology, which defines the stream processing logic of the application. A topology describes how data is consumed from input streams (source) and then transformed into something that can be produced to output streams (sink).
More specifically, Jacek Laskowski, author of The Internals of Kafka Streams, defines a topology as follows:
A directed acyclic graph of stream processing nodes that represents the stream processing logic of a Kafka Streams application.
In this graph, the nodes are the processing work, and the relationships are streams. Through this topology, we can create powerful streaming applications that can handle even the most complex data processing tasks. You can see an example topology in Figure 4-2.
Kafka Streams provides a domain-specific language (DSL) that simplifies the building of these topologies.
Let’s go through the definitions of some Kafka Streams abstractions that we’ll be using in this section. The following definitions are from the official documentation:
- KStream: A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded dataset. The data records in a KStream are interpreted as “INSERT” operations, where each record adds a new entry to an append-only ledger. In other words, each record represents a new piece of data that is added to the stream without replacing any existing data with the same key.
- KTable: A KTable is an abstraction of a change log stream, where each data record represents an update. Each record in a KTable represents an update to the previous value for a specific record key, if any exists. If a corresponding key doesn’t exist yet, the update is treated as an “INSERT” operation. In other words, each record in a KTable represents an update to the existing data with the same key or the addition of a new record with a new key-value pair.
- State Store: State Stores are storage engines for managing the state of stream processors. They can store the state in memory or in a database like RocksDB. When stateful functions like aggregation or windowing functions are called, intermediate data is stored in the State Store. This data can then be queried by the read side of a stream processing application to generate output streams or tables. State Stores are an efficient way to manage the state of stream processors and enable the creation of powerful stream processing applications in Kafka Streams.
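To make these abstractions concrete, here is a minimal sketch (not part of AATD’s application) showing how a KStream, a KTable, and a named state store relate. The topic names, store name, and package are hypothetical and chosen purely for illustration:

package pizzashop.examples; // hypothetical package, for illustration only

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;

public class WordCountTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // KStream: each record on the (hypothetical) `words` topic is an independent event
        KStream<String, String> words =
            builder.stream("words", Consumed.with(Serdes.String(), Serdes.String()));

        // KTable: grouping and counting by key produces a changelog of the latest
        // count per word, materialized into a queryable state store
        KTable<String, Long> counts = words
            .groupByKey()
            .count(Materialized.as("WordCountStore"));

        // Emit the changelog to a sink topic
        counts.toStream().to("word-counts", Produced.with(Serdes.String(), Serdes.Long()));

        return builder.build();
    }
}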
What Is Quarkus?
Quarkus is a Java framework optimized for building cloud native applications that are deployed on Kubernetes. Developed by Red Hat’s engineering team and released in 2019, Quarkus offers a modern, lightweight approach to building Java applications that is ideally suited to the needs of cloud native development.
The framework includes a wide range of extensions for popular technologies, including Camel, Hibernate, MongoDB, Kafka Streams, and more. These extensions provide a simple and efficient way to integrate these tools into your microservices architecture, speeding up development time and streamlining the process of building complex distributed systems.
The native Kafka Streams integration in particular makes it a great choice for us.
Quarkus Application
Now that we’ve got the definitions out of the way, it’s time to start building our Kafka Streams app.
Installing the Quarkus CLI
The Quarkus CLI is a powerful tool that lets us create and manage Quarkus applications from the command line. With the Quarkus CLI, we can quickly scaffold new applications, generate code, run tests, and deploy our applications to various environments. There are many ways to install the CLI, so you can almost certainly find one that you prefer.
I’m a big fan of SDKMAN, so I’m going to install it using that. SDKMAN makes it easy to install and manage software development kits (SDKs). It has lots of useful features, including automated updates, environment management, and support for multiple platforms. I use it to run different Java versions on my machine.
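If you don’t already have SDKMAN on your machine, it can be installed with a couple of shell commands (check the SDKMAN website for the current instructions, as these may change):

curl -s "https://get.sdkman.io" | bash
source "$HOME/.sdkman/bin/sdkman-init.sh"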
We can install Quarkus with SDKMAN by running the following command:
sdk install quarkus
We can check that it’s installed by running the following command:
quarkus --version
You should see output similar to Example 4-1.
Example 4-1. Quarkus version
2.13.1.Final
Note
The Quarkus CLI isn’t mandatory, but having it installed does make the development process much smoother, so we suggest installing it!
Creating a Quarkus Application
Now that we’ve got that installed, we can run the following command to create our pizza shop app:
quarkus create app pizzashop --package-name pizzashop
cd pizzashop
This command will create a Maven application with most of the dependencies that we’ll need and a skeleton structure to get us started.
The only thing missing is Kafka Streams, which we can add using the kafka-streams extension:
quarkus extension add 'kafka-streams'
We’re now ready to start building our application.
Creating a Topology
The first thing we need to do is create a Kafka Streams topology. A Quarkus application can define a single topology, in which we’ll define all our stream operations. This could include joining streams together to create a new stream, filtering a stream, creating a key-value store based on a stream, and more.
Once we have our topology class, we’ll create a couple of window stores that keep track of the total orders and revenue generated in the last couple of minutes. This will allow us to create an HTTP endpoint that returns a summary of the latest orders based on the contents of these stores.
Create the file src/main/java/pizzashop/streams/Topology.java and add this:
package pizzashop.streams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import pizzashop.deser.JsonDeserializer;
import pizzashop.deser.JsonSerializer;
import pizzashop.models.Order;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import java.time.Duration;

@ApplicationScoped
public class Topology {
    @Produces
    public org.apache.kafka.streams.Topology buildTopology() {
        final Serde<Order> orderSerde = Serdes.serdeFrom(
            new JsonSerializer<>(), new JsonDeserializer<>(Order.class));

        // Create a stream over the `orders` topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));

        // Defining the window size of our state store
        Duration windowSize = Duration.ofSeconds(60);
        Duration advanceSize = Duration.ofSeconds(1);
        Duration gracePeriod = Duration.ofSeconds(60);
        TimeWindows timeWindow = TimeWindows.ofSizeAndGrace(windowSize, gracePeriod)
            .advanceBy(advanceSize);

        // Create an OrdersCountStore that keeps track of the
        // number of orders over the last two minutes
        orders.groupBy(
                (key, value) -> "count",
                Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(timeWindow)
            .count(Materialized.as("OrdersCountStore"));

        // Create a RevenueStore that keeps track of the amount of revenue
        // generated over the last two minutes
        orders.groupBy(
                (key, value) -> "count",
                Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(timeWindow)
            .aggregate(
                () -> 0.0,
                (key, value, aggregate) -> aggregate + value.price,
                Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("RevenueStore")
                    .withValueSerde(Serdes.Double()));

        return builder.build();
    }
}
In this code, we first create a KStream based on the orders topic, before creating the OrdersCountStore and RevenueStore, which store a one-minute rolling window of the number of orders and revenue generated.
The grace period is usually used to capture late-arriving events, but we’re using it so that we have two minutes’ worth of windows kept around, which we’ll need later on.
We also have the following model classes that represent events in the orders stream:
package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;
import java.util.List;

@RegisterForReflection
public class Order {
    public Order() { }

    public String id;
    public String userId;
    public String createdAt;
    public double price;
    public double deliveryLat;
    public double deliveryLon;
    public List<OrderItem> items;
}

package pizzashop.models;

public class OrderItem {
    public String productId;
    public int quantity;
    public double price;
}
Querying the Key-Value Store
Next, we’ll create the class src/main/java/pizzashop/streams/OrdersQueries.java, which will abstract our interactions with the OrdersCountStore and RevenueStore. Querying state stores like these uses a feature of Kafka Streams called interactive queries:
package pizzashop.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.*;
import pizzashop.models.*;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.time.Instant;

@ApplicationScoped
public class OrdersQueries {
    @Inject
    KafkaStreams streams;

    public OrdersSummary ordersSummary() {
        KStreamsWindowStore<Long> countStore =
            new KStreamsWindowStore<>(ordersCountsStore());
        KStreamsWindowStore<Double> revenueStore =
            new KStreamsWindowStore<>(revenueStore());

        Instant now = Instant.now();
        Instant oneMinuteAgo = now.minusSeconds(60);
        Instant twoMinutesAgo = now.minusSeconds(120);

        long recentCount = countStore.firstEntry(oneMinuteAgo, now);
        double recentRevenue = revenueStore.firstEntry(oneMinuteAgo, now);

        long previousCount = countStore.firstEntry(twoMinutesAgo, oneMinuteAgo);
        double previousRevenue = revenueStore.firstEntry(twoMinutesAgo, oneMinuteAgo);

        TimePeriod currentTimePeriod = new TimePeriod(recentCount, recentRevenue);
        TimePeriod previousTimePeriod = new TimePeriod(previousCount, previousRevenue);

        return new OrdersSummary(currentTimePeriod, previousTimePeriod);
    }

    private ReadOnlyWindowStore<String, Double> revenueStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                    "RevenueStore", QueryableStoreTypes.windowStore()));
            } catch (InvalidStateStoreException e) {
                System.out.println("e = " + e);
            }
        }
    }

    private ReadOnlyWindowStore<String, Long> ordersCountsStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                    "OrdersCountStore", QueryableStoreTypes.windowStore()));
            } catch (InvalidStateStoreException e) {
                System.out.println("e = " + e);
            }
        }
    }
}
Both ordersCountsStore and revenueStore return data from window stores that hold the order count and the amount of revenue generated, respectively.
The reason for the while(true) { try {} catch {} } block in both functions is that the store might not be available if we call this code before the stream thread is in a RUNNING state. Assuming we don’t have any bugs in our code, we will eventually get to the RUNNING state; it just might take a bit longer than it takes for the HTTP endpoint to start up.
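If you’d rather not rely on catching exceptions in a loop, one alternative is to check the Kafka Streams client’s state directly before querying a store. The following is a sketch of a hypothetical helper (not part of the book’s code); streams.state() and KafkaStreams.State.RUNNING are standard Kafka Streams APIs:

    // Hypothetical helper: block until the stream threads reach RUNNING
    // before querying a state store, instead of retrying on
    // InvalidStateStoreException
    private void waitUntilRunning() throws InterruptedException {
        while (streams.state() != KafkaStreams.State.RUNNING) {
            Thread.sleep(100);
        }
    }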
ordersSummary calls those two functions to get the number of orders for the last minute and the minute before that, as well as the total revenue for the last minute and the minute before that.
KStreamsWindowStore.java is defined here:
package pizzashop.models;

import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import java.time.Instant;

public class KStreamsWindowStore<T> {
    private final ReadOnlyWindowStore<String, T> store;

    public KStreamsWindowStore(ReadOnlyWindowStore<String, T> store) {
        this.store = store;
    }

    public T firstEntry(Instant from, Instant to) {
        try (WindowStoreIterator<T> iterator = store.fetch("count", from, to)) {
            if (iterator.hasNext()) {
                return iterator.next().value;
            }
        }
        throw new RuntimeException(
            "No entries found in store between " + from + " and " + to);
    }
}
The firstEntry method finds the first entry in the window store in the provided date range and returns its value. If no entries exist, it throws an error.
OrdersSummary.java is defined here:
package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class OrdersSummary {
    private TimePeriod currentTimePeriod;
    private TimePeriod previousTimePeriod;

    public OrdersSummary(TimePeriod currentTimePeriod, TimePeriod previousTimePeriod) {
        this.currentTimePeriod = currentTimePeriod;
        this.previousTimePeriod = previousTimePeriod;
    }

    public TimePeriod getCurrentTimePeriod() {
        return currentTimePeriod;
    }

    public TimePeriod getPreviousTimePeriod() {
        return previousTimePeriod;
    }
}
This class is a data object that keeps track of orders and revenue for the current and previous time periods.
TimePeriod.java is defined here:
package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class TimePeriod {
    private int orders;
    private double totalPrice;

    public TimePeriod(long orders, double totalPrice) {
        // The window store returns a long count; narrow it to fit the int field
        this.orders = (int) orders;
        this.totalPrice = totalPrice;
    }

    public int getOrders() {
        return orders;
    }

    public double getTotalPrice() {
        return totalPrice;
    }
}
This class is a data object that keeps track of orders and revenue.
Creating an HTTP Endpoint
Finally, let’s create the HTTP endpoint that exposes the summary data to our users. Create the file src/main/java/pizzashop/rest/OrdersResource.java and add this:
package pizzashop.rest;

import pizzashop.models.OrdersSummary;
import pizzashop.streams.OrdersQueries;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@ApplicationScoped
@Path("/orders")
public class OrdersResource {
    @Inject
    OrdersQueries ordersQueries;

    @GET
    @Path("/overview")
    public Response overview() {
        OrdersSummary ordersSummary = ordersQueries.ordersSummary();
        return Response.ok(ordersSummary).build();
    }
}
Running the Application
Now that we’ve created all our classes, it’s time to run the application. We can do this by running the following command:
QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS=localhost:29092 quarkus dev
We pass in the QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS environment variable so that Quarkus can connect to our Kafka broker.
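Equivalently, this setting can live in src/main/resources/application.properties alongside the application id and input topics that the Quarkus Kafka Streams extension expects. The property keys below are the extension’s standard ones; the values are illustrative and should match your own broker and topic names:

# Illustrative values; adjust to your broker and topics
quarkus.kafka-streams.bootstrap-servers=localhost:29092
quarkus.kafka-streams.application-id=pizzashop
quarkus.kafka-streams.topics=orders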
Querying the HTTP Endpoint
Now we can query the HTTP endpoint to see how many orders our online service is receiving.
The endpoint is available on port 8080 at /orders/overview:
curl http://localhost:8080/orders/overview 2>/dev/null | jq '.'
The results of this command are shown in Example 4-2.
Example 4-2. Latest orders state
{
  "currentTimePeriod": {
    "orders": 994,
    "totalPrice": 4496973
  },
  "previousTimePeriod": {
    "orders": 985,
    "totalPrice": 4535117
  }
}
Success! We can see the number of orders and the total revenue in the current and previous time periods.
Limitations of Kafka Streams
While this approach for querying streams has been successful in many cases, certain factors could impact its efficacy for our particular use case. In this section, we will take a closer look at these limitations to better understand how they could affect the performance of this approach.
The underlying database used by Kafka Streams is RocksDB, a key-value store that allows you to store and retrieve data using key-value pairs. This fork of Google’s LevelDB is optimized for write-heavy workloads with large datasets.
One of its constraints is that we can create only one index per key-value store. This means that if we decide to query the data along another dimension, we’ll need to update the topology to create another key-value store. If we do a non-key search, RocksDB will do a full scan to find the matching records, leading to high query latency.
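For example, if AATD later wanted to query revenue along a different dimension, say per user, the topology itself would have to change to materialize another store. The following is a hypothetical addition (not code from this chapter) that reuses the topology’s existing orders stream, orderSerde, and timeWindow; the store name RevenuePerUserStore is made up for illustration:

        // Hypothetical: a second materialized window store keyed by userId,
        // needed just to support queries along another dimension
        orders.groupBy(
                (key, value) -> value.userId,
                Grouped.with(Serdes.String(), orderSerde))
            .windowedBy(timeWindow)
            .aggregate(
                () -> 0.0,
                (key, value, aggregate) -> aggregate + value.price,
                Materialized.<String, Double, WindowStore<Bytes, byte[]>>as("RevenuePerUserStore")
                    .withValueSerde(Serdes.Double()));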
Our key-value stores are also capturing only events that happened in the last one minute and the minute before that. If we wanted to capture data going further back, we’d need to update the topology to capture more events. In AATD’s case, we could imagine a future use case where we’d want to compare the sales numbers from right now with the numbers from this same time last week or last month. This would be difficult to do in Kafka Streams because we’d need to store historical data, which would take up a lot of memory.
So although we can use Kafka Streams to write real-time analytics queries and it will do a reasonable job, we probably need to find a tool that better fits the problem.
Summary
In this chapter, we looked at how to build an HTTP API on top of the orders stream so that we can get an aggregate view of what’s happening with orders in the business.
We built this solution using Kafka Streams, but we realized that this might not be the most appropriate tool for the job.
In the next chapter, we’ll learn why we need a serving layer to build a scalable real-time analytics application.