Capítulo 4. Consulta de Kafka con Kafka Streams

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

La AATD no dispone actualmente de información en tiempo real sobre el número de pedidos que se realizan ni sobre los ingresos que se generan. La empresa desearía saber si se producen picos o caídas en el número de pedidos para poder reaccionar más rápidamente en la parte operativa del negocio.

El equipo de ingenieros de la AATD ya está familiarizado con Kafka Streams gracias a otras aplicaciones que han construido, así que vamos a crear una aplicación Kafka Streams que exponga un punto final HTTP que muestre los pedidos e ingresos recientes. Construiremos esta aplicación con el framework Quarkus, empezando con una versión ingenua. Luego aplicaremos algunas optimizaciones. Concluiremos con un resumen de las limitaciones de utilizar un procesador de flujo para consultar datos de flujo.La Figura 4-1 muestra lo que construiremos en este capítulo.

bras 0401
Figura 4-1. Arquitectura de Kafka Streams

¿Qué es Kafka Streams?

Kafka Streams es una biblioteca para construir aplicaciones de streaming que transforman temas Kafka de entrada en temas Kafka de salida. Es un ejemplo del componente procesador de streaming de la pila de análisis en tiempo real descrita en el Capítulo 2.

Kafka Streams se utiliza a menudo para unir, filtrar y transformar flujos, pero en este capítulo vamos a utilizarlo para consultar un flujo existente.

En el corazón de una aplicación Kafka Streams hay una topología, que define la lógica de procesamiento de flujos de la aplicación. Una topología describe cómo se consumen los datos de los flujos de entrada (fuente) y luego se transforman en algo que pueda producirse a los flujos de salida (sumidero).

Más concretamente, Jacek Laskowski, autor de The Internals of Kafka Streams, define una topología del siguiente modo:

Un grafo acíclico dirigido de nodos de procesamiento de flujos que representa la lógica de procesamiento de flujos de una aplicación Kafka Streams.

En este gráfico, los nodos son el trabajo de procesamiento, y las relaciones son flujos. Mediante esta topología, podemos crear potentes aplicaciones de flujo que pueden manejar incluso las tareas de procesamiento de datos más complejas. Puedes ver un ejemplo de topología en la Figura 4-2.

Kafka Streams proporciona un lenguaje específico de dominio (DSL) que simplifica la construcción de estas topologías.

bras 0402
Figura 4-2. Topología de Kafka Streams

Repasemos las definiciones de algunas abstracciones de Kafka Streams que utilizaremos en esta sección. Las siguientes definiciones proceden de la documentación oficial:

KStream

Un KStream es una abstracción de un flujo de registros , en el que cada registro de datos representa un dato autónomo en el conjunto de datos no acotado. Los registros de datos de un KStream se interpretan como operaciones "INSERT", en las que cada registro añade una nueva entrada a un libro de contabilidad de sólo añadir datos. En otras palabras, cada registro representa un nuevo dato que se añade al flujo sin sustituir ningún dato existente con la misma clave.

KTable

Una KTable es una abstracción de un flujo de registro de cambios de , donde cada registro de datos representa una actualización. Cada registro de una KTable representa una actualización del valor anterior para una clave de registro específica, si existe. Si aún no existe una clave correspondiente, la actualización se trata como una operación "INSERT". En otras palabras, cada registro de una KTable representa una actualización de los datos existentes con la misma clave o la adición de un nuevo registro con un nuevo par clave-valor.

Tienda estatal

Los almacenes de estado son motores de almacenamiento para gestionar el estado de los procesadores de flujo. Pueden almacenar el estado en memoria o en una base de datos como RocksDB.

Cuando se llama a funciones con estado, como funciones de agregación o de ventana, los datos intermedios se almacenan en el Almacén de Estado. Estos datos pueden ser consultados por el lado de lectura de una aplicación de procesamiento de flujos para generar flujos o tablas de salida. Los Almacenes de Estado son una forma eficaz de gestionar el estado de los procesadores de flujos y permiten la creación de potentes aplicaciones de procesamiento de flujos en Kafka Streams.

¿Qué es Quarkus?

Quarkus es un framework Java optimizado para crear aplicaciones nativas de la nube que se implementan en Kubernetes. Desarrollado por el equipo de ingeniería de Red Hat y lanzado en 2019, Quarkus ofrece un enfoque moderno y ligero para crear aplicaciones Java que se adapta perfectamente a las necesidades deldesarrollo nativo de la nube.

El marco incluye una amplia gama de extensiones para tecnologías populares, como Camel, Hibernate, MongoDB, Kafka Streams, etc. Estas extensiones proporcionan una forma sencilla y eficaz de integrar estas herramientas en tu arquitectura de microservicios, acelerando el tiempo de desarrollo y agilizando el proceso de construcción de sistemas distribuidos complejos.

La integración nativa de Kafka Streams, en particular, hace que sea una gran elección para nosotros.

Aplicación Quarkus

Ahora que ya tenemos las definiciones, es hora de empezar a construir nuestra aplicación Kafka Streams.

Instalación de la CLI de Quarkus

La CLI de Quarkus es una potente herramienta que nos permite crear y gestionar aplicaciones Quarkus desde la línea de comandos. Con la CLI de Quarkus, podemos crear rápidamente nuevas aplicaciones, generar código, ejecutar pruebas y desplegar nuestras aplicaciones en varios entornos Hay muchas formas de instalar la CLI, así que casi seguro que encuentras la que prefieres.

Soy un gran fan de SDKMAN, así que voy a instalarlo con él. SDKMAN facilita la instalación y gestión de kits de desarrollo de software (SDK). Tiene muchas funciones útiles, como actualizaciones automáticas, gestión de entornos y compatibilidad con varias plataformas. Yo lo utilizo para ejecutar distintas versiones de Java en mi máquina.

Podemos instalar Quarkus con SDKMAN ejecutando el siguiente comando:

sdk install quarkus

Podemos comprobar que está instalado ejecutando el siguiente comando:

quarkus --version

Deberías ver una salida similar a la del Ejemplo 4-1.

Ejemplo 4-1. Versión de Quarkus
2.13.1.Final
Nota

La CLI de Quarkus no es obligatoria, pero tenerla instalada hace que el proceso de desarrollo sea mucho más fluido, ¡así que te sugerimos que la instales!

Crear una aplicación Quarkus

Ahora que lo tenemos instalado, podemos ejecutar el siguiente comando en para crear nuestra aplicación de pizzería:

quarkus create app pizzashop --package-name pizzashop
cd pizzashop

Este comando creará una aplicación Maven con la mayoría de las dependencias que necesitaremos y una estructura esquelética para empezar. Lo único que falta es Kafka Streams, que podemos añadir utilizando la extensión kafka-streams:

quarkus extension add 'kafka-streams'

Ya estamos listos para empezar a construir nuestra aplicación.

Crear una topología

Lo primero que tenemos que hacer es crear una topología de Kafka Streams. Una aplicación Quarkus puede definir una única topología, en la que definiremos todas nuestras operaciones de flujo, como unir flujos para crear un nuevo flujo, filtrar un flujo, crear un almacén de valores clave basado en un flujo, etc.

Una vez que tengamos nuestra clase de topología, crearemos un par de almacenes de ventana que lleven la cuenta del total de pedidos e ingresos generados en los últimos minutos. Esto nos permitirá crear un endpoint HTTP que devuelva un resumen de los últimos pedidos basándose en el contenido de estos almacenes.

Crea el archivo src/main/java/pizzashop/streams/Topology.java y añade esto:

package pizzashop.streams;

import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.WindowStore;
import pizzashop.deser.JsonDeserializer;
import pizzashop.deser.JsonSerializer;
import pizzashop.models.Order;

import javax.enterprise.context.ApplicationScoped;
import javax.enterprise.inject.Produces;
import java.time.Duration;

@ApplicationScoped
public class Topology {
    @Produces
    public org.apache.kafka.streams.Topology buildTopology() {
        final Serde<Order> orderSerde = Serdes.serdeFrom(new JsonSerializer<>(),
            new JsonDeserializer<>(Order.class));

        // Create a stream over the `orders` topic
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, Order> orders = builder.stream("orders",
            Consumed.with(Serdes.String(), orderSerde));

        // Defining the window size of our state store
        Duration windowSize = Duration.ofSeconds(60);
        Duration advanceSize = Duration.ofSeconds(1);
        Duration gracePeriod = Duration.ofSeconds(60);
        TimeWindows timeWindow = TimeWindows.ofSizeAndGrace(
            windowSize, gracePeriod).advanceBy(advanceSize);

        // Create an OrdersCountStore that keeps track of the
        // number of orders over the last two minutes
        orders.groupBy(
                (key, value) -> "count",
                Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(timeWindow)
                .count(Materialized.as("OrdersCountStore")
        );

        // Create a RevenueStore that keeps track of the amount of revenue
        // generated over the last two minutes
        orders.groupBy(
                (key, value) -> "count",
                Grouped.with(Serdes.String(), orderSerde))
                .windowedBy(timeWindow)
                .aggregate(
                    () -> 0.0,
                    (key, value, aggregate) -> aggregate + value.price,
                    Materialized.
                        <String, Double, WindowStore<Bytes, byte[]>>
                        as("RevenueStore")
                        .withValueSerde(Serdes.Double())
                );

        return builder.build();
    }
}

En este código, primero creamos un KStream basado en el tema orders, antes de crear los temas OrdersCountStore y RevenueStore, que almacenan una ventana móvil de un minuto del número de pedidos e ingresos generados. El periodo de gracia se utiliza normalmente para capturar eventos que llegan tarde, pero nosotros lo utilizamos para tener ventanas de dos minutos guardadas, que necesitaremos más adelante.

También tenemos las siguientes clases modelo que representan eventos en el flujo orders:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;
import java.util.List;

@RegisterForReflection
public class Order {
    public Order() { }

    public String id;
    public String userId;
    public String createdAt;
    public double price;
    public double deliveryLat;
    public double deliveryLon;
    public List<OrderItem> items;
}
package pizzashop.models;

public class OrderItem {
    public String productId;
    public int quantity;
    public double price;
}

Consulta del almacén clave-valor

A continuación, crearemos la clase src/main/java/pizzashop/streams/OrdersQuer⁠ies​.java, que abstraerá nuestras interacciones con el OrdersStore. La consulta de almacenes de estado (como OrdersStore) utiliza una función de Kafka Streams llamada consultas interactivas:

package pizzashop.streams;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.state.*;

import pizzashop.models.*;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import java.time.Instant;

@ApplicationScoped
public class OrdersQueries {
    @Inject
    KafkaStreams streams;

    public OrdersSummary ordersSummary() {
        KStreamsWindowStore<Long> countStore = new KStreamsWindowStore<>(
            ordersCountsStore());
        KStreamsWindowStore<Double> revenueStore = new KStreamsWindowStore<>(
            revenueStore());

        Instant now = Instant.now();
        Instant oneMinuteAgo = now.minusSeconds(60);
        Instant twoMinutesAgo = now.minusSeconds(120);

        long recentCount = countStore.firstEntry(oneMinuteAgo, now);
        double recentRevenue = revenueStore.firstEntry(oneMinuteAgo, now);

        long  previousCount = countStore.firstEntry(twoMinutesAgo, oneMinuteAgo);
        double previousRevenue = revenueStore.firstEntry(
            twoMinutesAgo, oneMinuteAgo);

        TimePeriod currentTimePeriod = new TimePeriod
          (recentCount, recentRevenue);
        TimePeriod previousTimePeriod = new TimePeriod
          (previousCount, previousRevenue);
        return new OrdersSummary(currentTimePeriod, previousTimePeriod);
    }

    private ReadOnlyWindowStore<String, Double> revenueStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        "RevenueStore", QueryableStoreTypes.windowStore()
                ));
            } catch (InvalidStateStoreException e) {
                System.out.println("e = " + e);
            }
        }
    }

    private ReadOnlyWindowStore<String, Long> ordersCountsStore() {
        while (true) {
            try {
                return streams.store(StoreQueryParameters.fromNameAndType(
                        "OrdersCountStore", QueryableStoreTypes.windowStore()
                ));
            } catch (InvalidStateStoreException e) {
                System.out.println("e = " + e);
            }
        }
    }
}

Tanto ordersCountsStore como revenueStore están devolviendo datos de almacenes de ventanas que contienen el recuento de pedidos y la cantidad de ingresos generados, respectivamente. La razón del bloque de código while(true) { try {} catch {} } en ambas funciones es que el almacén podría no estar disponible si llamamos a este código antes de que el hilo de flujo esté en el estado RUN⁠NING. Suponiendo que no tengamos ningún error en nuestro código, al final llegaremos al estado RUNNING; sólo que podría tardar un poco más de lo que tarda en iniciarse el punto final HTTP.

ordersSummary llama a esas dos funciones para obtener el número de pedidos del último minuto y del minuto anterior, así como los ingresos totales del último minuto y del minuto anterior.

KStreamsWindowStore.java se define aquí:

package pizzashop.models;

import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

import java.time.Instant;

public class KStreamsWindowStore<T> {
    private final ReadOnlyWindowStore<String, T> store;

    public KStreamsWindowStore(ReadOnlyWindowStore<String, T> store) {
        this.store = store;
    }

    public T firstEntry(Instant from, Instant to) {
        try (WindowStoreIterator<T> iterator = store.fetch("count", from, to)) {
            if (iterator.hasNext()) {
                return iterator.next().value;
            }
        }
        throw new RuntimeException(
            "No entries found in store between " + from + " and " + to);
    }
}

El método firstEntry busca la primera entrada en el almacén de ventanas en el intervalo de fechas proporcionado y devuelve el valor. Si no existe ninguna entrada, lanzará un error.

OrdersSummary.java se define aquí:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class OrdersSummary {
    private TimePeriod currentTimePeriod;
    private TimePeriod previousTimePeriod;

    public OrdersSummary(
        TimePeriod currentTimePeriod, TimePeriod previousTimePeriod) {
        this.currentTimePeriod = currentTimePeriod;
        this.previousTimePeriod = previousTimePeriod;
    }

    public TimePeriod getCurrentTimePeriod() { return currentTimePeriod; }
    public TimePeriod getPreviousTimePeriod() { return previousTimePeriod; }
}

Esta clase es un objeto de datos que realiza un seguimiento de los pedidos e ingresos del periodo actual y de los anteriores.

TimePeriod.java se define aquí:

package pizzashop.models;

import io.quarkus.runtime.annotations.RegisterForReflection;

@RegisterForReflection
public class TimePeriod {
    private int orders;
    private double totalPrice;

    public TimePeriod(long orders, double totalPrice) {
        this.orders = orders;
        this.totalPrice = totalPrice;
    }

    public int getOrders() { return orders; }
    public double getTotalPrice() { return totalPrice; }
}

Esta clase es un objeto de datos que mantiene un seguimiento de los pedidos y los ingresos.

Crear un punto final HTTP

Por último, vamos a crear el punto final HTTP que expone los datos del resumen a nuestros usuarios . Crea el archivo src/main/java/pizzashop/rest/OrdersResource.java y añade esto:

package pizzashop.rest;

import pizzashop.models.OrdersSummary;
import pizzashop.streams.InteractiveQueries;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;

@ApplicationScoped
@Path("/orders")
public class OrdersResource {
    @Inject
    OrdersQueries ordersQueries;

    @GET
    @Path("/overview")
    public Response overview() {
        OrdersSummary ordersSummary = ordersQueries.ordersSummary();
        return Response.ok(ordersSummary).build();
    }
}

Ejecutar la aplicación

Ahora que hemos creado todas nuestras clases, es el momento de ejecutar la aplicación. Podemos hacerlo ejecutando el siguiente comando:

QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS=localhost:29092 quarkus dev

Pasamos la variable de entorno QUARKUS_KAFKA_STREAMS_BOOTSTRAP_SERVERS para que Quarkus pueda conectarse a nuestro broker Kafka.

Consulta del punto final HTTP

Ahora podemos consultar el punto final HTTP para ver cuántos pedidos está recibiendo nuestro servicio online. El punto final está disponible en el puerto 8080 en /orders/overview:

curl http://localhost:8080/orders/overview 2>/dev/null | jq '.'

Los resultados de este comando se muestran en el Ejemplo 4-2.

Ejemplo 4-2. Estado de los últimos pedidos
{
  "currentTimePeriod": {
    "orders": 994,
    "totalPrice": 4496973
  },
  "previousTimePeriod": {
    "orders": 985,
    "totalPrice": 4535117
  }
}

Éxito! Podemos ver el número de pedidos y los ingresos totales en los periodos de tiempo actual y anterior.

Limitaciones de Kafka Streams

Aunque este enfoque para consultar flujos ha tenido éxito en muchos casos, ciertos factores podrían afectar a su eficacia para nuestro caso de uso particular. En esta sección, examinaremos más detenidamente estas limitaciones para comprender mejor cómo podrían afectar al rendimiento de este enfoque.

La base de datos subyacente utilizada por Kafka Streams es RocksDB, un almacén clave-valor que te permite almacenar y recuperar datos utilizando pares clave-valor. Esta bifurcación deLevelDB de Google está optimizada para cargas de trabajo de escritura intensiva con grandes conjuntos de datos.

Una de sus limitaciones es que sólo podemos crear un índice por almacén de valores clave. Esto significa que si decidimos consultar los datos a lo largo de otra dimensión, tendremos que actualizar la topología para crear otro almacén de valores clave. Si realizamos una búsqueda sin claves, RocksDB hará un escaneo completo para encontrar los registros coincidentes, lo que provocará una elevada latencia de consulta.

Además, nuestros almacenes de valores clave sólo capturan los sucesos ocurridos en el último minuto y en el minuto anterior. Si quisiéramos capturar datos que se remontan más atrás, tendríamos que actualizar la topología para capturar más sucesos. En el caso de la AATD, podríamos imaginar un caso de uso futuro en el que quisiéramos comparar las cifras de ventas de ahora mismo con las cifras de esta misma hora de la semana pasada o del mes pasado. Esto sería difícil de hacer en Kafka Streams porque necesitaríamos almacenar datos históricos, lo que ocuparía mucha memoria.

Así que, aunque podemos utilizar Kafka Streams para escribir consultas analíticas en tiempo real y hará un trabajo razonable, probablemente necesitemos encontrar una herramienta que se adapte mejor al problema.

Resumen

En este capítulo, vimos cómo construir una API HTTP sobre el flujo orders para que podamos obtener una visión agregada de lo que está ocurriendo con los pedidos en la empresa. Construimos esta solución utilizando Kafka Streams, pero nos dimos cuenta de que ésta podría no ser la herramienta más adecuada para el trabajo. En el próximo capítulo, aprenderemos por qué necesitamos una capa de servicio para construir una aplicación de análisis en tiempo real escalable.

Get Creación de sistemas de análisis en tiempo real now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.