Capítulo 4. Procesamiento por estados

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

En el capítulo anterior, aprendimos a realizar transformaciones sin estado de flujos de registros utilizando la abstracción KStream y un rico conjunto de operadores sin estado que están disponibles en Kafka Streams. Puesto que las transformaciones sin estado no requieren ninguna memoria de los eventos vistos previamente, son fáciles de razonar y utilizar. Tratamos cada evento como un hecho inmutable y lo procesamos independientemente de otros eventos.

Sin embargo, Kafka Streams también nos ofrece la posibilidad de capturar y recordar información sobre los eventos que consumimos. La información capturada, o estado, nos permite realizar operaciones de procesamiento de flujos más avanzadas, como unir y agregar datos. En este capítulo exploraremos en detalle el procesamiento de flujos con estado. Algunos de los temas que trataremos son:

  • Las ventajas del procesamiento de flujos con estado

  • Las diferencias entre hechos y comportamientos

  • ¿Qué tipos de operadores con estado están disponibles en Kafka Streams?

  • Cómo se captura y consulta el estado en Kafka Streams

  • Cómo puede utilizarse la abstracción KTable para representar el estado local particionado

  • Cómo puede utilizarse la abstracción GlobalKTable para representar el estado global replicado

  • Cómo realizar operaciones con estado, incluyendo unir y agregar datos

  • Cómo utilizar consultas interactivas para exponer el estado

Al igual que en el capítulo anterior, exploraremos estos conceptos utilizando un enfoque basado en tutoriales. El tutorial de este capítulo está inspirado en la industria de los videojuegos, y construiremos una tabla de clasificación en tiempo real que nos obligará a utilizar muchos de los operadores con estado de Kafka Streams. Además, dedicaremos mucho tiempo a hablar de las uniones, ya que es una de las formas más comunes de enriquecimiento de datos en las aplicaciones con estado. Pero antes de empezar con el tutorial, veamos algunas de las ventajas del procesamiento con estado.

Ventajas del procesamiento por estados

El procesamiento con estados nos ayuda a comprender las relaciones entre los eventos y a aprovechar estas relaciones para casos de uso más avanzados del procesamiento de flujos. Cuando somos capaces de entender cómo se relaciona un evento con otros eventos, podemos:

  • Reconocer patrones y comportamientos en nuestros flujos de eventos

  • Realiza agregaciones

  • Enriquece los datos de forma más sofisticada utilizando uniones

Otra ventaja del procesamiento de flujos con estado es que nos proporciona una abstracción adicional para representar datos. Al reproducir un flujo de eventos de uno en uno, yguardar el último estado de cada clave en un almacén de valores clave integrado, podemos construir una representación puntual de flujos de registros continuos e ilimitados. Estas representaciones puntuales, o instantáneas, se denominan tablas, y Kafka Streams incluye distintos tipos de abstracciones de tablas que conoceremos en este capítulo.

Las tablas no sólo son el núcleo del procesamiento de flujos con estado, sino que, cuando se materializan, también se pueden consultar. Esta capacidad de consultar una instantánea en tiempo real de un flujo de eventos en rápido movimiento es lo que convierte a Kafka Streams en una plataforma de procesamiento relacional de flujos,1 y nos permite no sólo crear aplicaciones de procesamiento de flujos, sino también microservicios de baja latencia basados en eventos.

Por último, el procesamiento de flujos con estado nos permite comprender nuestros datos utilizando modelos mentales más sofisticados. Un punto de vista especialmente interesante procede de Neil Avery, que analiza las diferencias entre hechos y comportamientos en su debate sobre el pensamiento "primero los acontecimientos":

Un acontecimiento representa un hecho, algo que ha ocurrido; es inmutable...

Las aplicaciones sin estado, como las que hemos analizado en el capítulo anterior, se basan en hechos. Cada evento se trata como un hecho independiente y atómico, que puedeprocesarse utilizando una semántica inmutable (piensa en inserciones en un flujo interminable), y luego olvidarse.

Sin embargo, además de aprovechar los operadores sin estado para filtrar, bifurcar, combinar y transformar hechos, podemos hacer preguntas aún más avanzadas a nuestros datos si aprendemos a modelar comportamientos utilizando operadores con estado. ¿Qué son los comportamientos? Según Neil

La acumulación de hechos capta el comportamiento.

Verás, los acontecimientos (o hechos) rara vez ocurren de forma aislada en el mundo real. Todo está interconectado, y al capturar y recordar hechos, podemos empezar a comprender su significado. Esto es posible comprendiendo los hechos en su contexto histórico más amplio, o mirando otros hechos relacionados que hayan sido capturados y almacenados por nuestra aplicación.

Un ejemplo popular es el abandono de la cesta de la compra, que es un comportamiento compuesto por múltiples hechos: un usuario añade uno o más artículos a una cesta de la compra y, a continuación, se termina la sesión, ya sea manualmente (por ejemplo, el usuario cierra la sesión) o automáticamente (por ejemplo, debido a un largo período de inactividad). Procesar cualquiera de los dos hechos de forma independiente nos dice muy poco sobre en qué punto del proceso de compra se encuentra el usuario. Sin embargo, recopilar, recordar y analizar cada uno de los hechos (que es lo que permite el procesamiento de estados) nos permite reconocer el comportamiento y reaccionar ante él, y proporciona un valor empresarial mucho mayor que ver el mundo como una serie de hechos no relacionados.

Ahora que entendemos las ventajas del procesamiento de flujos con estado y las diferencias entre hechos y comportamientos, vamos a ver un avance de los operadores con estado de Kafka Streams.

Vista previa de los Operadores con Estado

Kafka Streams incluye varios operadores de estado que podemos utilizar en nuestras topologías de procesador . La Tabla 4-1 incluye un resumen de varios operadores con los que trabajaremos en este libro.

Tabla 4-1. Operadores con estado y su finalidad
Caso práctico Propósito Operarios

Unir datos

Enriquece un evento con información adicional de o contexto que se capturó en un flujo o tabla independiente

  • join (unión interna)

  • leftJoin

  • outerJoin

Agregación de datos

Calcula una transformación matemática o combinatoria de sucesos relacionados que se actualice continuamente

  • aggregate

  • count

  • reduce

Datos de la ventana

Agrupa acontecimientos que tienen una proximidad temporal cercana

  • windowedBy

Además, en podemos combinar operadores de estado en Kafka Streams para comprender relaciones/comportamientos aún más complejos entre eventos. Por ejemplo, realizar una unión en ventana nos permite comprender cómo se relacionan flujos de eventos discretos durante un determinado periodo de tiempo. Como veremos en el próximo capítulo, las agregaciones por ventanas son otra forma útil de combinar operadores con estado.

Ahora bien, en comparación con los operadores sin estado que vimos en el capítulo anterior, los operadores con estado son más complejos y tienen requisitos adicionales de cálculo y almacenamiento.2 adicionales. Por esta razón, pasaremos algún tiempo aprendiendo sobre el funcionamiento interno del procesamiento con estado en Kafka Streams antes de empezar a utilizar los operadores con estado enumerados en la Tabla 4-1.

Quizá el punto de partida más importante sea ver cómo se almacena y consulta el estado en Kafka Streams.

Tiendas estatales

Ya hemos establecido en que las operaciones con estado requieren que nuestra aplicación mantenga cierta memoria de los eventos vistos anteriormente. Por ejemplo, una aplicación que cuente el número de registros de errores que ve necesita mantener un único número para cada clave: un recuento rodante que se actualiza cada vez que se consume un nuevo registro de errores. Este recuento representa el contexto histórico de un registro y, junto con la clave del registro, pasa a formar parte del estado de la aplicación.

Para soportar las operaciones con estado, necesitamos una forma de almacenar y recuperar los datos recordados, o estado, requeridos por cada operador con estado de nuestra aplicación (por ejemplo, count, aggregate, join, etc.). La abstracción de almacenamiento que aborda estas necesidades en Kafka Streams se denomina almacén de estado, y puesto que una sola aplicación Kafka Streams puede aprovechar muchos operadores con estado, una sola aplicación puede contener varios almacenes de estado.

Nota

Esta sección proporciona información de bajo nivel sobre cómo se captura y almacena el estado en Kafka Streams. Si estás impaciente por empezar con el tutorial, no dudes en saltar a "Presentación de nuestro tutorial: Tabla de clasificación de videojuegos" y volver a visitar esta sección más adelante.

Hay muchas implementaciones de almacenes de estado y posibilidades de configuración disponibles en Kafka Streams, cada una con ventajas, compensaciones y casos de uso específicos. Siempre que utilices un operador con estado en tu aplicación Kafka Streams, es útil considerar qué tipo de almacén de estado necesita el operador, y también cómo configurar el almacén de estado en función de tus criterios de optimización (por ejemplo, ¿estás optimizando un alto rendimiento, simplicidad operativa, tiempos de recuperación rápidos en caso de fallo, etc.). En la mayoría de los casos, Kafka Streams elegirá un valor predeterminado razonable si no especificas explícitamente un tipo de almacén de estado o anulas las propiedades de configuración de un almacén de estado.

Dado que la variación en los tipos y configuraciones de los almacenes de estado hace que éste sea un tema bastante profundo, centraremos inicialmente nuestra discusión en las características comunes de todas las implementaciones de almacenes de estado por defecto, y luego echaremos un vistazo a las dos grandes categorías de almacenes de estado: los almacenes persistentes y los almacenes en memoria. En el Capítulo 6 trataremos más en profundidad los almacenes de estado, y a medida que vayamos encontrando temas específicos en nuestrotutorial.

Características comunes

Las implementaciones predeterminadas de almacenes de estado incluidas en Kafka Streams comparten algunas propiedades comunes. Discutiremos estos puntos en común en esta sección para tener una mejor idea de cómo funcionan los almacenes de estado.

Embedded

Las implementaciones predeterminadas de almacenes de estado que se incluyen en Kafka Streams están incrustadas dentro de tu aplicación Kafka Streams a nivel de tarea (hablamos por primera vez de las tareas en "Tareas e hilos de flujo"). La ventaja de los almacenes de estado incrustados, frente al uso de un motor de almacenamiento externo, es que este último requeriría una llamada a la red cada vez que fuera necesario acceder al estado, y por tanto introduciría latencia y cuellos de botella de procesamiento innecesarios. Además, como los almacenes de estado están integrados en el nivel de tarea, se elimina toda una clase de problemas de concurrencia para acceder al estado compartido.

Además, si los almacenes de estado fueran remotos, tendrías que preocuparte de la disponibilidad del sistema remoto por separado de tu aplicación Kafka Streams. Permitir que Kafka Streams gestione un almacén de estado local garantiza que siempre estará disponible y reduce bastante la superficie de error. Un almacén remoto centralizado sería aún peor, ya que se convertiría en un único punto de fallo para todas las instancias de tu aplicación. Por tanto, la estrategia de Kafka Streams de colocar el estado de una aplicación junto a la propia aplicación no sólo mejora el rendimiento (como se ha comentado en el párrafo anterior), sino también la disponibilidad.

Todos los almacenes de estado predeterminados utilizan RocksDB. RocksDB es un almacén de valores clave rápido e integrado, desarrollado originalmente en Facebook. Como admite flujos arbitrarios de bytes para almacenar pares clave-valor, funciona bien con Kafka, que también desacopla la serialización del almacenamiento. Además, tanto las lecturas como las escrituras son extremadamente rápidas, gracias a un rico conjunto de optimizaciones realizadas en el código bifurcado LevelDB.3

Múltiples modos de acceso

Los almacenes de estado admiten múltiples modos de acceso y patrones de consulta. Las topologías de procesadores requieren acceso de lectura y escritura a los almacenes de estado. Sin embargo, cuando se construyen microservicios utilizando la función de consultas interactivas de Kafka Streams , de la que hablaremos más adelante en "Consultas interactivas", los clientes sólo necesitan acceso de lectura al estado subyacente. Esto garantiza que el estado nunca sea mutable fuera de la topología del procesador, y se consigue mediante una envoltura dedicada de sólo lectura que los clientes pueden utilizar para consultar de forma segura el estado de una aplicación Kafka Streams.

Tolerante a fallos

Por defecto, los almacenes de estado están respaldados por temas de registro de cambios en Kafka.4 En caso de fallo, los almacenes de estado pueden restaurarse reproduciendo los eventos individuales del tema de registro de cambios subyacente para reconstruir el estado de una aplicación. Además, Kafka Streams permite a los usuarios habilitar réplicas en espera para reducir el tiempo que se tarda en reconstruir el estado de una aplicación. Estas réplicas en espera (a veces llamadas copias en la sombra) hacen que los almacenes de estado sean redundantes, lo cual es una característica importante de los sistemas de alta disponibilidad. Además, las aplicaciones que permiten que se consulte su estado pueden confiar en las réplicas en espera para servir el tráfico de consulta cuando otras instancias de la aplicación se caen, lo que también contribuye a la alta disponibilidad.

Basado en claves

Las operaciones que aprovechan los almacenes de estado se basan en claves. La clave de un registro define la relación entre el evento actual y otros eventos. La estructura de datos subyacente variará en función del tipo de almacén de estado que decidas utilizar,5 pero cada implementación puede conceptualizarse como alguna forma de almacén clave-valor, donde las claves pueden ser simples, o incluso compuestas (es decir, multidimensionales) en algunos casos.6

Nota

Para complicar un poco las cosas, Kafka Streams se refiere explícitamente a ciertos tipos de almacenes de estado como almacenes clave-valor, aunque todos los almacenes de estado por defecto están basados en claves. Cuando nos referimos a almacenes de valor-clave en este capítulo y en otras partes de este libro, nos referimos a almacenes de estado sin ventana (los almacenes de estado con ventana se tratarán en el capítulo siguiente).

Ahora que entendemos los puntos en común entre los almacenes de estado predeterminados en Kafka Streams, veamos dos grandes categorías de almacenes de estado para comprender las diferencias entre determinadas implementaciones.

Almacenes persistentes frente a almacenes en memoria

Uno de los diferenciadores más importantes entre las distintas implementaciones de almacenes de estado es si el almacén de estado es persistente o no, o si simplemente almacena la información recordada en memoria (RAM ). Los almacenes de estado persistentes vuelcan el estado al disco de forma asíncrona (a un directorio de estado configurable), lo que tiene dos ventajas principales:

  • El estado puede superar el tamaño de la memoria disponible.

  • En caso de fallo, los almacenes persistentes pueden restaurarse más rápidamente que los almacenes en memoria.

Para aclarar el primer punto, un almacén de estado persistente puede mantener parte de su estado en memoria, mientras escribe en disco cuando el tamaño del estado se hace demasiado grande (esto se denomina derramar en disco) o cuando el búfer de escritura supera un valor configurado. En segundo lugar, como el estado de la aplicación persiste en el disco, Kafka Streams no necesita reproducir todo el tema para reconstruir el almacén de estado cuando éste se pierde (por ejemplo, debido a un fallo del sistema, una migración de instancia, etc.). Sólo necesita reproducir los datos que falten entre el momento en que la aplicación dejó de funcionar y el momento en que volvió a funcionar.

Consejo

El directorio del almacén de estados utilizado para los almacenes persistentes puede establecerse utilizando la propiedad StreamsConfig.STATE_DIR_CONFIG. La ubicación predeterminada es /tmp/kafka-streams, pero se recomienda encarecidamente que la sustituyas por un directorio fuera de /tmp.

El inconveniente es que los almacenes de estado persistentes son operativamente más complejos y pueden ser más lentos que un almacén en memoria puro, que siempre extrae datos de la RAM. La complejidad operativa adicional proviene del requisito de almacenamiento secundario (es decir, almacenamiento basado en disco) y, si necesitas ajustar el almacén de estado, de la comprensión de RocksDB y sus configuraciones (esto último puede no ser un problema para la mayoría de las aplicaciones).

En cuanto a las ganancias de rendimiento de un almacén de estado en memoria, puede que no sean lo suficientemente drásticas como para justificar su uso (ya que la recuperación de fallos lleva más tiempo). Añadir más particiones para paralelizar el trabajo siempre es una opción si necesitas exprimir más el rendimiento de tu aplicación. Por tanto, mi recomendación es que empieces con almacenes persistentes y sólo cambies a almacenes en memoria si has medido una mejora notable del rendimiento y, cuando se trate de una recuperación rápida (por ejemplo, en caso de que se pierda el estado de tu aplicación), utilizas réplicas en espera para reducir el tiempo de recuperación.

Ahora que ya sabemos qué son los almacenes de estado y cómo permiten el procesamiento basado en el estado/comportamiento, echemos un vistazo al tutorial de este capítulo y veamos algunas de estas ideas en acción.

Presentamos nuestro tutorial: Tabla de clasificación de videojuegos

En este capítulo, aprenderemos sobre el procesamiento de estados implementando una tabla de clasificación de videojuegos con Kafka Streams. La industria del videojuego es un excelente ejemplo en el que destaca el procesamiento de flujos, ya que tanto los jugadores como los sistemas de juego requieren un procesamiento de baja latencia y una respuesta inmediata. Esta es una de las razones por las que empresas como Activision (la empresa que está detrás de juegos como Call of Duty y remasterizaciones de Crash Bandicoot y Spyro) utilizan Kafka Streams para procesar la telemetría de los videojuegos.7

La tabla de clasificación que vamos a construir requerirá que modelemos los datos de formas que aún no hemos explorado. En concreto, veremos cómo utilizar las abstracciones de tabla de Kafka Streams para modelar los datos como una secuencia de actualizaciones. Luego, nos sumergiremos en los temas de unir y agregar datos, que son útiles siempre que necesites comprender o calcular la relación entre varios eventos. Estos conocimientos te ayudarán a resolver problemas empresariales más complicados con Kafka Streams.

Una vez que hayamos creado nuestra tabla de clasificación en tiempo real utilizando un nuevo conjunto de operadores con estado, demostraremos cómo consultar Kafka Streams para obtener la información más reciente de la tabla de clasificación mediante consultas interactivas. Nuestro debate sobre esta función te enseñará a construir microservicios basados en eventos con Kafka Streams, lo que a su vez ampliará el tipo de clientes con los que podemos compartir datos desde nuestras aplicaciones de procesamiento de flujos.8

Sin más preámbulos, echemos un vistazo a la arquitectura de nuestra tabla de clasificación de videojuegos. La Figura 4-1 muestra el diseño de la topología que implementaremos en este capítulo. Después del diagrama se incluye información adicional sobre cada paso.

Figura 4-1. La topología que implementaremos en nuestra aplicación de clasificación de videojuegos con estado
1

Nuestro clúster Kafka contiene tres temas:

  • El tema score-events contiene los resultados de los partidos. Los registros no tienen clave y, por tanto, se distribuyen de forma circular por las particiones del tema.

  • El tema players contiene perfiles de jugadores. Cada registro tiene un identificador de jugador.

  • El tema products contiene información sobre productos de varios videojuegos. Cada registro tiene un identificador de producto.

2

Necesitamos enriquecer nuestros datos de eventos de puntuación con información detallada de los jugadores. Podemos conseguirlo mediante una unión.

3

Una vez que hemos enriquecido los datos de score-events con los datos de los jugadores, tenemos que añadir información detallada sobre los productos al flujo resultante. Esto también puede hacerse mediante una unión.

4

Como la agrupación de datos es un requisito previo para la agregación, necesitamos agrupar el flujo enriquecido.

5

Necesitamos calcular las tres puntuaciones más altas de cada juego. Para ello, podemos utilizar los operadores de agregación de Kafka Streams.

6

Por último, necesitamos exponer externamente las puntuaciones más altas de cada juego. Para ello, construiremos un microservicio RESTful utilizando la función de consultas interactivas de Kafka Streams.

Con nuestro diseño de topología en la mano, ahora podemos pasar a la configuración del proyecto.

Configuración del proyecto

El código de este capítulo se encuentra enhttps://github.com/mitch-seymour/mastering-kafka-streams-and-ksqldb.git.

Si quieres hacer referencia al código a medida que avanzamos por cada paso de la topología, clona el repositorio y cambia al directorio que contiene el tutorial de este capítulo. El siguiente comando será suficiente:

$ git clone git@github.com:mitch-seymour/mastering-kafka-streams-and-ksqldb.git
$ cd mastering-kafka-streams-and-ksqldb/chapter-04/video-game-leaderboard

Puedes construir el proyecto en cualquier momento ejecutando el siguiente comando:

$ ./gradlew build --info

Ahora que nuestro proyecto está configurado, vamos a empezar a implementar nuestra tabla de clasificación del videojuego.

Modelos de datos

Como siempre, empezaremos definiendo nuestros modelos de datos. Dado que los temas fuente contienen datos JSON, definiremos nuestros modelos de datos utilizando clases de datos POJO, que serializaremos y deserializaremos utilizando la biblioteca de serialización JSON de nuestra elección (a lo largo de este libro, utilizamos Gson, pero podrías utilizar fácilmente Jackson u otra biblioteca). 9

Me gusta agrupar mis modelos de datos en un paquete dedicado de mi proyecto, por ejemplo, com.magicalpipelines.model. Aquí se muestra una vista del sistema de archivos donde se encuentran las clases de datos de este tutorial:

src/
└── main
    └── java
        └── com
            └── magicalpipelines
                └── model
                    ├── ScoreEvent.java 1
                    ├── Player.java 2
                    └── Product.java 3
1

La clase de datos ScoreEvent.java se utilizará para representar los registros del tema score-events.

2

La clase de datos Player.java se utilizará para representar los registros del tema players.

3

La clase de datos Product.java se utilizará para representar los registros del tema products.

Ahora que sabemos qué clases de datos tenemos que implementar, vamos a crear una clase de datos para cada tema. La Tabla 4-2 muestra los POJOs resultantes que hemos implementado para este tutorial.

Tabla 4-2. Ejemplos de registros y clases de datos para cada tema
Tema Kafka Ejemplo de registro Clase de datos

score-events

{
  "score": 422,
  "product_id": 6,
  "player_id": 1
}
public class ScoreEvent {
  private Long playerId;
  private Long productId;
  private Double score;
}

players

{
  "id": 2,
  "name": "Mitch"
}
public class Player {
  private Long id;
  private String name;
}

products

{
  "id": 1,
  "name": "Super Smash Bros"
}
public class Product {
  private Long id;
  private String name;
}
Nota

Ya hemos hablado en detalle de la serialización y deserialización en "Serialización/Deserialización". En el tutorial de ese capítulo, implementamos nuestro propio serializador, deserializador y Serdes personalizados. No dedicaremos más tiempo a eso aquí, pero puedes consultar el código de este tutorial para ver cómo hemos implementado los Serdes para cada una de las clases de datos que se muestran en la Tabla 4-2.

Añadir los procesadores de origen

Una vez definidas nuestras clases de datos, podemos configurar nuestros procesadores fuente. En esta topología, necesitamos tres de ellos, ya que leeremos de tres temas fuente. Lo primero que tenemos que hacer al añadir un procesador fuente es determinar qué abstracción de Kafka Streams debemos utilizar para representar los datos en el tema subyacente.

Hasta ahora, sólo hemos trabajado con la abstracción KStream, que se utiliza para representar flujos de registros sin estado. Sin embargo, nuestra topología requiere que utilicemos los temas products y players como búsquedas, por lo que esto es un buen indicio de que una abstracción tipo tabla puede ser apropiada para estos temas.10 Antes de empezar a asignar nuestros temas a las abstracciones de Kafka Streams, revisemos primero la diferencia entre las representaciones KStream, KTable y GlobalKTable de un tema Kafka. A medida que revisemos cada abstracción, rellenaremos la abstracción apropiada para cada tema en la Tabla 4-3.

Tabla 4-3. El mapeo tema-abstracción que actualizaremos en las siguientes secciones
Tema Kafka Abstracción

score-events

???

players

???

products

???

KStream

Al decidir qué abstracción utilizar, ayuda determinar la naturaleza del tema, la configuración del tema y el espacio de claves de los registros en el tema fuente subyacente. Aunque las aplicaciones Kafka Streams con estado utilizan una o más abstracciones de tabla, también es muy común utilizar KStreams sin estado junto a KTable o GlobalKTable cuando no se necesita la semántica de tabla mutable para una o más fuentes de datos.

En este tutorial, nuestro tema score-events contiene eventos de puntuación brutos, que están sin clave (y, por tanto, distribuidos de forma rotatoria) en un tema sin compactar. Dado que las tablas se basan en claves, esto es un fuerte indicio de que deberíamos utilizar un KStream para nuestro tema score-events sin clave. Podríamos cambiar nuestra estrategia de claves en sentido ascendente (es decir, en cualquier aplicación que esté produciendo nuestro tema fuente), pero eso no siempre es posible. Además, nuestra aplicación se interesa por la puntuación más alta de cada jugador, no por la puntuación más reciente, por lo que la semántica de la tabla (es decir, retener sólo el registro más reciente para una clave determinada) no se traduce bien en la forma en que pretendemos utilizar el tema score-events, aunque tuviera clave.

Por lo tanto, utilizaremos un KStream para el tema score-events, así que actualicemos la tabla de la Tabla 4-3 para reflejar esta decisión del siguiente modo:

Tema Kafka Abstracción

score-events

KStream

players

???

products

???

Los dos temas restantes, players y products, tienen clave, y sólo nos interesa el último registro de cada clave única del tema. Por lo tanto, la abstracción KStream no es ideal para estos temas. Así pues, sigamos adelante y veamos si la abstracción KTable es adecuada para alguno de estos temas.

KTable

El tema players es un tema compactado que contiene perfiles de jugadores, y cada registro tiene como clave el ID del jugador. Como sólo nos interesa el último estado de un jugador, tiene sentido representar este tema mediante una abstracción basada en tablas (ya sea KTable oGlobalKTable).

Algo importante que hay que tener en cuenta al decidir entre utilizar un KTable o un GlobalKTable es el espacio de claves. Si el espacio de claves es muy grande (es decir, tiene una cardinalidad alta/muchas claves únicas), o se espera que crezca hasta convertirse en un espacio de claves muy grande, entonces tiene más sentido utilizar un KTable para poder distribuir fragmentos de todo el estado entre todas las instancias de tu aplicación en ejecución. Particionando el estado de este modo, podemos reducir la sobrecarga de almacenamiento local para cada instancia individual de Kafka Streams.

Quizás una consideración más importante a la hora de elegir entre un KTable o un GlobalKTable es si necesitas o no un procesamiento sincronizado en el tiempo. Un KTable está sincronizado en el tiempo, por lo que cuando Kafka Streams esté leyendo de múltiples fuentes (por ejemplo, en el caso de una unión), mirará la marca de tiempo para determinar qué registro procesar a continuación. Esto significa que una unión reflejará lo que habría sido el registro combinado en un momento determinado, y esto hace que el comportamiento de la unión sea más predecible. Por otra parte, GlobalKTables no está sincronizado en el tiempo, y está "completamente poblado antes de que se realice cualquier procesamiento".11 Por tanto, las uniones siempre se hacen contra la versión más actualizada de un GlobalKTable, lo que cambia la semántica del programa.

En este caso, no vamos a centrarnos demasiado en la segunda consideración, ya que hemos reservado el próximo capítulo para nuestra discusión sobre el tiempo y el papel que desempeña en Kafka Streams. En cuanto al espacio de claves, players contiene un registro por cada jugador único de nuestro sistema. Aunque esto puede ser pequeño dependiendo de dónde nos encontremos en el ciclo de vida de nuestra empresa o producto, es un número que esperamos que crezca significativamente con el tiempo, por lo que utilizaremos una abstracción de KTable para este tema.

La Figura 4-2 muestra cómo el uso de KTable hace que el estado subyacente se distribuya entre varias instancias de aplicación en ejecución.

Figura 4-2. Debes utilizar KTable cuando quieras dividir el estado entre varias instancias de la aplicación y necesites un procesamiento sincronizado en el tiempo.

Nuestra tabla de abstracción actualizada tiene ahora este aspecto:

Tema Kafka Abstracción

score-events

KStream

players

KTable

products

???

Nos queda un tema: el tema products. Este tema es relativamente pequeño, por lo que deberíamos poder replicar el estado en su totalidad en todas las instancias de nuestra aplicación. Echemos un vistazo a la abstracción que nos permite hacerlo: GlobalKTable.

GlobalKTable

El tema products es similar al tema players en cuanto a su configuración (está compactado) y su espacio de claves delimitado (mantenemos el último registro para cada ID de producto único, y sólo hay un número fijo de productos de los que hacemos un seguimiento). Sin embargo, el tema products tiene una cardinalidad mucho menor (es decir, menos claves únicas) que el temaplayers e incluso si nuestra tabla de clasificación registrara las puntuaciones más altas de varios cientos de juegos, esto se traduciría en un espacio de estados lo suficientemente pequeño como para caber por completo en la memoria.

Además de ser más pequeños, los datos del tema products también son relativamente estáticos. Los videojuegos tardan mucho en construirse, así que no esperamos muchas actualizaciones de nuestroproducts tema.

Estas dos características (datos pequeños y estáticos) son para las que se diseñó GlobalKTables. Por lo tanto, utilizaremos un GlobalKTable para nuestro tema products. Como resultado, cada una de nuestras instancias de Kafka Streams almacenará una copia completa de la información del producto, lo que, como veremos más adelante, facilita mucho la realización de uniones.

La Figura 4-3 muestra cómo cada instancia de Kafka Streams mantiene una copia completa de la tablaproducts tabla.

Figura 4-3. Deberías utilizar GlobalKTable cuando tu espacio de claves sea pequeño, quieras evitar los requisitos decopartición de una unión (hablaremos de la copartición en"Copartición") y cuando no sea necesaria la sincronización temporal.

Ahora podemos hacer la actualización final de nuestro mapeo tema-abstracción:

Tema Kafka Abstracción

score-events

KStream

players

KTable

products

GlobalKTable

Ahora que hemos decidido qué abstracción utilizar para cada uno de nuestros temas fuente, podemos registrar los flujos y las tablas.

Registrar flujos y tablas

Registrar flujos y tablas es sencillo. El siguiente bloque de código muestra cómo utilizar el DSL de alto nivel para crear un KStream, KTable, y GlobalKTable utilizando los métodos constructores adecuados:

StreamsBuilder builder = new StreamsBuilder();

KStream<byte[], ScoreEvent> scoreEvents =
    builder.stream(
        "score-events",
        Consumed.with(Serdes.ByteArray(), JsonSerdes.ScoreEvent())); 1

KTable<String, Player> players =
    builder.table(
        "players",
        Consumed.with(Serdes.String(), JsonSerdes.Player())); 2

GlobalKTable<String, Product> products =
    builder.globalTable(
        "products",
        Consumed.with(Serdes.String(), JsonSerdes.Product())); 3
1

Utiliza un KStream para representar datos en el tema score-events, que actualmente no tiene clave.

2

Crea una tabla particionada (o fragmentada) para el tema players, utilizando la abstracción KTable.

3

Crea un GlobalKTable para el tema products, que se replicará íntegramente en cada instancia de la aplicación.

Al registrar los temas fuente, ya hemos implementado el primer paso de nuestra topología de tablas de clasificación (ver Figura 4-1). Pasemos al siguiente paso: unir flujos y tablas.

Únete a

El método más común para combinar conjuntos de datos en el mundo relacional es mediante uniones.12 En los sistemas relacionales, los datos suelen ser muy dimensionales y estar dispersos en muchas tablas diferentes. Es habitual ver estos mismos patrones también en Kafka, ya sea porque los eventos proceden de varias ubicaciones, porque los desarrolladores se sienten cómodos o están acostumbrados a los modelos de datos relacionales, o porque ciertas integraciones de Kafka (por ejemplo, el conector JDBC Kafka, Debezium, Maxwell, etc.) traen consigo tanto los datos en bruto como los modelos de datos de los sistemas de origen.

Independientemente de cómo se dispersen los datos en Kafka, poder combinar datos en flujos y tablas separados basándose en relaciones abre la puerta a oportunidades de enriquecimiento de datos más avanzadas en Kafka Streams. Además, el método de unión para combinar conjuntos de datos es muy diferente de la simple fusión de flujos, como vimos en la Figura 3-6. Cuando utilizamos el operador merge en Kafka Streams, los registros de ambos lados de la fusión se combinan incondicionalmente en un único flujo. Las operaciones de fusión simples son, por tanto, sin estado, ya que no necesitan contexto adicional sobre los eventos que se están fusionando.

Las uniones, sin embargo, pueden considerarse como un tipo especial de fusión condicional que se preocupa por la relación entre los eventos, y en la que los registros no se copian textualmente en el flujo de salida, sino que se combinan. Además, estas relaciones deben capturarse, almacenarse y referenciarse en el momento de la fusión para facilitar la unión, lo que hace que la unión sea una operación con estado. La Figura 4-4 muestra una representación simplificada de cómo funciona un tipo de unión (hay varios tipos de uniones, como veremos en la Tabla 4-5).

Al igual que los sistemas relacionales, Kafka Streams incluye soporte para múltiples tipos de uniones. Así que, antes de aprender a unir nuestro flujo score-events con nuestra tabla players, familiaricémonos primero con los distintos operadores de unión que tenemos a nuestra disposición para poder seleccionar la mejor opción para nuestro caso de uso particular.

Figura 4-4. Unir mensajes

Unir Operarios

Kafka Streams incluye tres operadores de unión diferentes para unir flujos y tablas. Cada operador se detalla en la Tabla 4-4.

Tabla 4-4. Operadores de unión
Operario Descripción
join

Unión interna. La unión se activa cuando los registros de entrada de ambos lados de la unión comparten la misma clave.

leftJoin

Unión izquierda. La semántica de la unión es diferente según el tipo de unión:

  • Para las uniones de tablas de flujo: una unión se activa cuando se recibe un registro en el lado izquierdo de la unión. Si no hay ningún registro con la misma clave en el lado derecho de la unión, el valor derecho se establece en nulo.

  • Para uniones flujo-flujo y tabla-tabla: la misma semántica que una unión flujo-izquierda, salvo que una entrada en el lado derecho de la unión también puede desencadenar una búsqueda. Si el lado derecho desencadena la unión y no hay ninguna clave coincidente en el lado izquierdo, la unión no producirá ningún resultado.

outerJoin

Unión externa. La unión se activa cuando se recibe un registro en cualquiera de los lados de la unión. Si no hay ningún registro coincidente con la misma clave en el lado opuesto de la unión, el valor correspondiente se establece como nulo.

Nota

Cuando hablamos de las diferencias entre los operadores join, nos referimos a los distintos lados de la unión. Recuerda que el lado derecho de la unión siempre se pasa como parámetro al operador join correspondiente. Por ejemplo

KStream<String, ScoreEvent> scoreEvents = ...;
KTable<String, Player> players = ...;

scoreEvents.join(players, ...); 1
1

scoreEvents es el lado izquierdo de la unión. players es el lado derecho de la unión.

Veamos ahora el tipo de uniones que podemos crear con estos operadores.

Tipos de unión

Kafka Streams admite muchos tipos diferentes de uniones, como se muestra en la Tabla 4-5. La columnade coparticionamiento se refiere a algo que trataremos en "Coparticionamiento". Por ahora, basta con entender que la co-partición es simplemente un conjunto adicional de requisitos que se necesitan para realizar realmente la unión.

Tabla 4-5. Tipos de unión
Tipo Ventana Operarios Se requiere copartición

KStream-KStream

a

  • join

  • leftJoin

  • outerJoin

KTable-KTable

No

  • join

  • leftJoin

  • outerJoin

KStream-KTable

No

  • join

  • leftJoin

KStream-GlobalKTable

No

  • join

  • leftJoin

No

a Una cosa clave que hay que tener en cuenta es que las uniones de KStream-KStream tienen ventana. Hablaremos de ello en detalle en el próximo capítulo.

Los dos tipos de uniones que tenemos que realizar en este capítulo son:

  • KStream-KTable a unir el score-events KStream y el players KTable

  • KStream-GlobalKTable para unir la salida de la unión anterior con la products GlobalKTable

Utilizaremos una unión interna, utilizando el operador join para cada una de las uniones, ya que sólo queremos que la unión se active cuando haya una coincidencia en ambos lados. Sin embargo, antes de hacerlo, podemos ver que la primera unión que vamos a crear (KStream-KTable) muestra que es necesaria la co-partición. Echemos un vistazo a lo que eso significa antes de escribir más código.

Copartición

Si un árbol cae en un bosque y no hay nadie cerca para oírlo, ¿hace ruido?

Aforismo

Este famoso experimento mental plantea una pregunta sobre qué papel tiene un observador en la ocurrencia de un evento (en este caso, el sonido que se produce en un bosque). Del mismo modo, en Kafka Streams, siempre debemos ser conscientes del efecto que tiene un observador en el procesamiento de un evento.

En "Tareas e hilos de flujo", aprendimos que cada partición se asigna a una única tarea Kafka Streams, y estas tareas actuarán como observadores en nuestra analogía, ya que son las responsables de consumir y procesar realmente los eventos. Como no hay garantía de que los eventos de diferentes particiones sean gestionados por la misma tarea Kafka Streams, tenemos un problema potencial de observabilidad.

La Figura 4-5 muestra el problema básico de la observabilidad. Si los eventos relacionados son procesados por tareas diferentes, no podremos determinar con precisión la relación entre los eventos porque tenemos dos observadores separados. Dado que el objetivo de unir datos es combinar eventos relacionados, un problema de observabilidad hará que nuestras uniones fallen cuando, de otro modo, deberían tener éxito.

Para comprender la relación entre los sucesos (mediante uniones) o calcular agregaciones sobre una secuencia de sucesos, necesitamos asegurarnos de que los sucesos relacionados se dirigen a la misma partición, y por tanto son tratados por la misma tarea.

Para garantizar que los eventos relacionados se enrutan a la misma partición, debemos asegurarnos de que se cumplen los siguientes requisitos de co-partición:

  • Los registros de ambos lados deben tener como clave el mismo campo, y deben particionarse con esa clave utilizando la misma estrategia de partición.

  • Los temas de entrada de ambos lados de la unión deben contener el mismo número de particiones. (Éste es el único requisito que se comprueba al inicio. Si no se cumple este requisito, se lanzará un TopologyBuilderException ).

Figura 4-5. Si queremos unir registros relacionados, pero estos registros no siempre son procesados por la misma tarea, entonces tenemos un problema de observabilidad

En este tutorial, cumplimos todos los requisitos para realizar una unión KTable-KTable excepto el primero. Recuerda que los registros del tema score-events no tienen clave, pero que nos uniremos con el tema players KTable , que tiene clave por ID de jugador. Por lo tanto, antes de realizar la unión, tenemos que volver a codificar los datos de score-events por ID de jugador. Esto puede hacerse utilizando el operador selectKey, como se muestra en el Ejemplo 4-1.

Ejemplo 4-1. El operador selectKey nos permite reintroducir registros; esto suele ser necesario para cumplir los requisitos de co-partición para realizar ciertos tipos de uniones
KStream<String, ScoreEvent> scoreEvents =
  builder
    .stream(
      "score-events",
      Consumed.with(Serdes.ByteArray(), JsonSerdes.ScoreEvent()))
    .selectKey((k, v) -> v.getPlayerId().toString()); 1
1

selectKey se utiliza para cambiar la clave de los registros. En este caso, nos ayuda a cumplir el primer requisito de la co-partición, que consiste en garantizar que los registros de ambos lados de la unión (los datos de score-events y los de players ) estén codificados por el mismo campo.

En la Figura 4-6 se muestra una visualización de cómo se reintroducen los registros.

Figura 4-6. Reintroducir mensajes garantiza que los registros relacionados aparezcan en la misma partición
Nota

Cuando añadimos un operador de cambio de clave a nuestra topología, los datos subyacentes se marcarán para su repartición. Esto significa que en cuanto añadamos un operador descendente que lea la nueva clave, Kafka Streams lo hará:

  • Envía los datos reescritos a un tema interno de repartición

  • Vuelve a leer los datos recién recodificados en Kafka Streams

Este proceso garantiza que los registros relacionados (es decir, los registros que comparten la misma clave) serán procesados por la misma tarea en pasos de topología posteriores. Sin embargo, el viaje por la red necesario para redirigir los datos a un tema de repartición especial significa que las operaciones de cambio de clave pueden ser caras.

¿Qué ocurre con nuestro KStream-GlobalKTable para unirnos al tema products? Como se muestra en la Tabla 4-5, la co-partición no es necesaria para las uniones GlobalKTable, ya que el estado está totalmente replicado en cada instancia de nuestra aplicación Kafka Streams. Por lo tanto, nunca nos encontraremos con este tipo de problema de observabilidad con una unión a GlobalKTable.

Ya casi estamos listos para unir nuestros flujos y tablas. Pero antes, veamos cómo se combinan realmente los registros durante una operación de unión.

Ensambladoras de valor

Al realizar una unión mediante SQL tradicional, sólo tenemos que utilizar el operador de unión junto con una cláusula SELECT para especificar la forma (o proyección) del registro de unión combinado. Por ejemplo

SELECT a.customer_name, b.purchase_amount 1
FROM customers a
LEFT JOIN purchases b
ON a.customer_id = b.customer_id
1

La proyección del registro de la unión combinada incluye dos columnas.

Sin embargo, en Kafka Streams, necesitamos utilizar un ValueJoiner para especificar cómo deben combinarse los distintos registros. Un ValueJoiner simplemente toma cada registro que participa en la unión y produce un nuevo registro combinado. Si nos fijamos en la primera unión, en la que necesitamos unir el score-events KStream con el players KTable , el comportamiento del juntador de valores podría expresarse utilizando el siguiente pseudocódigo:

(scoreEvent, player) -> combine(scoreEvent, player);

Pero podemos hacerlo mucho mejor. Es más típico tener una clase de datos dedicada que haga una de las siguientes cosas:

  • Envuelve cada uno de los valores implicados en la unión

  • Extrae los campos relevantes de cada lado de la unión, y guarda los valores extraídos en propiedades de clase

A continuación exploraremos ambos enfoques. En primer lugar, empecemos con una clase envolvente sencilla para la unión score-events -> players. El Ejemplo 4-2 muestra una implementación sencilla de una clase de datos que envuelve el registro de cada lado de la unión.

Ejemplo 4-2. La clase de datos que utilizaremos para construir el registro unido score-events -> players
public class ScoreWithPlayer {
  private ScoreEvent scoreEvent;
  private Player player;

  public ScoreWithPlayer(ScoreEvent scoreEvent, Player player) {1
    this.scoreEvent = scoreEvent; 2
    this.player = player;
  }

  // accessors omitted from brevity
}
1

El constructor contiene un parámetro para cada lado de la unión. El lado izquierdo de la unión contiene ScoreEvent, y el lado derecho contiene un Player.

2

Simplemente guardamos una referencia a cada registro implicado en la unión dentro de nuestra clase envoltorio.

Podemos utilizar nuestra nueva clase envoltorio como tipo de retorno en nuestro ValueJoiner. El Ejemplo 4-3 muestra un ejemplo de implementación de un ValueJoiner que combinaun ScoreEvent (del score-events KStream ) y un Player (del playersKTable ) en una instancia ScoreWithPlayer.

Ejemplo 4-3. El ValueJoiner para combinar score-events y players
ValueJoiner<ScoreEvent, Player, ScoreWithPlayer> scorePlayerJoiner =
        (score, player) -> new ScoreWithPlayer(score, player); 1
1

También podríamos utilizar aquí simplemente una referencia a un método estático, comoScoreWithPlayer::new.

Pasemos a la segunda unión. Esta unión necesita combinar un ScoreWithPlayer (de la salida de la primera unión) con un Product (del products GlobalKTable ). Podríamos reutilizar de nuevo el patrón envoltorio, pero también podríamos simplemente extraer las propiedades que necesitamos de cada lado de la unión, y descartar el resto.

El siguiente bloque de código muestra una implementación de una clase de datos que sigue el segundo patrón. Simplemente extraemos los valores que queremos y los guardamos en las propiedades apropiadas de la clase:

public class Enriched {
  private Long playerId;
  private Long productId;
  private String playerName;
  private String gameName;
  private Double score;

  public Enriched(ScoreWithPlayer scoreEventWithPlayer, Product product) {
    this.playerId = scoreEventWithPlayer.getPlayer().getId();
    this.productId = product.getId();
    this.playerName = scoreEventWithPlayer.getPlayer().getName();
    this.gameName = product.getName();
    this.score = scoreEventWithPlayer.getScoreEvent().getScore();
  }

  // accessors omitted from brevity
}

Con esta nueva clase de datos, podemos construir nuestro ValueJoiner para la unión KStream-GlobalKTable utilizando el código que se muestra en el Ejemplo 4-4.

Ejemplo 4-4. Un ValueJoiner, expresado como una lambda, que utilizaremos para la unión
ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner =
    (scoreWithPlayer, product) -> new Enriched(scoreWithPlayer, product);

Ahora que le hemos dicho a Kafka Streams cómo combinar nuestros registros de unión, podemos realizar las uniones.

Unión de KStream a KTable (unión de reproductores)

Es hora de unir nuestro score-events KStream con nuestro players KTable . Como sólo queremos activar la unión cuando el registro ScoreEvent pueda coincidir con un registro Player (utilizando la clave de registro), realizaremos una unión interna utilizando el operador join, como se muestra aquí:

Joined<String, ScoreEvent, Player> playerJoinParams =
  Joined.with( 1
    Serdes.String(),
    JsonSerdes.ScoreEvent(),
    JsonSerdes.Player()
  );

KStream<String, ScoreWithPlayer> withPlayers =
  scoreEvents.join( 2
    players,
    scorePlayerJoiner, 3
    playerJoinParams
  );
1

Los parámetros de la unión definen cómo deben serializarse las claves y los valores de los registros de la unión.

2

El operador join realiza una unión interna.

3

Este es el ValueJoiner que creamos en el Ejemplo 4-3. Se crea un nuevo valor ScoreWithPlayer a partir de los dos registros de unión. Echa un vistazo a la clase de datos ScoreWithPlayer del Ejemplo 4-2 para ver cómo se pasan los valores izquierdo y derecho de la unión al constructor.

Así de sencillo. Además, si ejecutaras el código en este punto y luego listaras todos los temas disponibles en tu clúster de Kafka, verías que Kafka Streams creó dos nuevos temas internos para nosotros.

Estos temas son:

  • Un tema de repartición para gestionar la operación de reescritura que realizamos en el Ejemplo 4-1.

  • Un tema de registro de cambios para respaldar el almacén de estados, que utiliza el operador de unión. Esto forma parte del comportamiento de tolerancia a fallos que comentamos inicialmente en "Tolerancia a fallos".

Puedes verificarlo con el script de consola kafka-topics:13

$ kafka-topics --bootstrap-server kafka:9092 --list

players
products
score-events
dev-KSTREAM-KEY-SELECT-0000000001-repartition 1
dev-players-STATE-STORE-0000000002-changelog 2
1

Un tema de reparto interno creado por Kafka Streams. Lleva como prefijo el ID de aplicación de nuestra aplicación Kafka Streams (dev).

2

Un tema de registro de cambios interno creado por Kafka Streams. Al igual que el tema de repartición, este tema de registro de cambios también lleva como prefijo el ID de aplicación de nuestra aplicación Kafka Streams.

Bien, estamos listos para pasar a la segunda unión.

Unión de KStream a GlobalKTable (unión de productos)

Como hemos comentado en los requisitos de la co-partición, los registros de ambos lados de una unión GlobalKTable no tienen por qué compartir la misma clave. Como la tarea local tiene una copia completa de la tabla, en realidad podemos realizar una unión utilizando algún atributo del propio valor del registro en el lado del flujo de la unión,14 lo que es más eficaz que tener que volver a teclear los registros a través de un tema de repartición sólo para garantizar que los registros relacionados sean manejados por la misma tarea.

Para realizar una unión KStream-GlobalKTable, necesitamos crear algo llamado KeyValueMapper, cuyo propósito es especificar cómo mapear un registro KStream a un registro GlobalKTable.Para este tutorial, podemos simplemente extraer el ID del producto del valor ScoreWithPlayer para mapear estos registros a un Product, como se muestra aquí:

KeyValueMapper<String, ScoreWithPlayer, String> keyMapper =
  (leftKey, scoreWithPlayer) -> {
    return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId());
  };

Con nuestro KeyValueMapper en su sitio, y también el ValueJoiner que creamos en el Ejemplo 4-4, ya podemos realizar la unión:

KStream<String, Enriched> withProducts =
  withPlayers.join(products, keyMapper, productJoiner);

Esto completa el segundo y tercer paso de nuestra topología de tabla de clasificación (ver Figura 4-1). Lo siguiente que tenemos que hacer es agrupar los registros enriquecidos para poder realizar una agregación.

Agrupar registros

Antes de realizar cualquier agregación de flujos o tablas en Kafka Streams, primero debes agrupar las KStream o KTable que piensas agregar. El propósito de la agrupación es el mismo que el de reagrupar registros antes de la unión: garantizar que los registros relacionados sean procesados por el mismo observador, o tarea de Kafka Streams.

Hay algunas pequeñas diferencias entre agrupar flujos y tablas, así que echaremos un vistazo a cada una de ellas.

Agrupar flujos

Hay dos operadores que pueden utilizarse para agrupar un KStream:

  • groupBy

  • groupByKey

Utilizar groupBy es similar al proceso de reincorporación de un flujo utilizando selectKey, ya que este operador es un operador de cambio de clave y hace que Kafka Streams marque el flujo para su repartición. Si se añade un operador descendente que lea la nueva clave, Kafka Streams creará automáticamente un tema de repartición y enrutará los datos de vuelta a Kafka para completar el proceso de recodificación.

El ejemplo 4-5 muestra cómo utilizar el operador groupBy para agrupar un KStream.

Ejemplo 4-5. Utiliza el operador groupBy para volver a teclear y agrupar un KStream al mismo tiempo
KGroupedStream<String, Enriched> grouped =
  withProducts.groupBy(
      (key, value) -> value.getProductId().toString(), 1
      Grouped.with(Serdes.String(), JsonSerdes.Enriched())); 2
1

Podemos utilizar una lambda para seleccionar la nueva clave, ya que el operador groupBy espera una KeyValueMapper, que resulta ser una interfaz funcional.

2

Grouped nos permite pasar algunas opciones adicionales para la agrupación, incluidos los Serdes de clave y valor a utilizar al serializar los registros.

Sin embargo, si tus registros de no necesitan ser reparticionados, entonces es preferible utilizar el operador groupByKey en su lugar. groupByKey no marcará el flujo para reparticionarlo y, por tanto, tendrá un mayor rendimiento, ya que evita las llamadas de red adicionales asociadas al envío de datos de vuelta a Kafka para reparticionarlos. Aquí se muestra la implementación de groupByKey:

KGroupedStream<String, Enriched> grouped =
    withProducts.groupByKey(
      Grouped.with(Serdes.String(),
      JsonSerdes.Enriched()));

Como queremos calcular las puntuaciones altas de cada ID de producto, y como nuestro flujo enriquecido está actualmente codificado por ID de jugador, utilizaremos la variación groupBy que se muestra en el Ejemplo 4-5 en la topología de la tabla de clasificación.

Independientemente del operador que utilices para agrupar un flujo, Kafka Streams devolverá un nuevo tipo del que no hemos hablado antes: KGroupedStream. KGroupedStream no es más que una representación intermedia de un flujo que nos permite realizar agregaciones. Veremos las agregaciones en breve, pero antes, veamos cómo agrupar KTables.

Tablas de agrupación

A diferencia de la agrupación de flujos, sólo hay un operador disponible para agrupar tablas: groupBy. Además, en lugar de devolver un KGroupedStream, invocar groupBy sobre un KTable devuelve una representación intermedia diferente: KGroupedTable. Por lo demás, el proceso de agrupar KTables es idéntico al de agrupar un KStream. Por ejemplo, si quisiéramos agrupar el players KTable para poder realizar posteriormente alguna agregación (por ejemplo, contar el número de jugadores), podríamos utilizar el siguiente código:

KGroupedTable<String, Player> groupedPlayers =
    players.groupBy(
        (key, value) -> KeyValue.pair(key, value),
        Grouped.with(Serdes.String(), JsonSerdes.Player()));

El bloque de código anterior no es necesario para este tutorial, ya que no necesitamos agrupar la tabla players, pero lo mostramos aquí para demostrar el concepto. Ahora ya sabemos cómo agrupar flujos y tablas, y hemos completado el paso 4 de nuestra topología de procesadores (ver Figura 4-1). A continuación, aprenderemos a realizar agregaciones en Kafka Streams.

Agregaciones

Uno de los pasos finales necesarios para nuestra topología de tabla de clasificación es calcular las puntuaciones más altas de cada juego. Kafka Streams nos proporciona un conjunto de operadores que hace que realizar este tipo de agregaciones sea muy fácil:

  • aggregate

  • reduce

  • count

A alto nivel, las agregaciones no son más que una forma de combinar varios valores de entrada en un único valor de salida. Tendemos a pensar en las agregaciones como operaciones matemáticas, pero no tienen por qué serlo. Mientras que count es una operación matemática que calcula el número de eventos por clave, tanto el operador aggregate como reduce son más genéricos, y pueden combinar valores utilizando cualquier lógica combinacional que especifiques.

Nota

reduce es muy similar a aggregate. La diferencia radica en el tipo de retorno. El operador reduce requiere que la salida de una agregación sea del mismo tipo que la entrada, mientras que el operador aggregate puede especificar un tipo diferente para el registro de salida.

Además, las agregaciones pueden aplicarse tanto a flujos como a tablas. La semántica es un poco diferente en cada uno de ellos, ya que los flujos son inmutables mientras que las tablas son mutables. Esto se traduce en versiones ligeramente diferentes de losoperadores aggregate y reduce, aceptando la versión de flujos dos parámetros: un inicializador y un sumador, y la versión de tablas tres parámetros: un inicializador, un sumador yun restador.15

Veamos cómo agregar flujos creando nuestra agregación de puntuaciones altas.

Agregación de flujos

En esta sección, aprenderemos a aplicar agregaciones a flujos de registros, lo que implica crear una función para inicializar un nuevo valor agregado (llamada inicializador) y una función para realizar agregaciones posteriores a medida que llegan nuevos registros para una clave determinada (llamada función sumadora ). En primer lugar, aprenderemos sobre los inicializadores.

Inicializador

Cuando nuestra topología Kafka Streams ve una nueva clave, necesitamos alguna forma de inicializar la agregación. La interfaz que nos ayuda con esto es Initializer, y como muchas de las clases de la API de Kafka Streams, Initializer es una interfaz funcional (es decir, contiene un único método), y por tanto puede definirse como una lambda.

Por ejemplo, si echaras un vistazo a las partes internas de la agregación count, verías un inicializador que establece el valor inicial de la agregación en 0:

Initializer<Long> countInitializer = () -> 0L; 1
1

El inicializador se define como una lambda, ya que la interfaz Initializer es una interfaz funcional.

Para agregaciones más complejas, puedes proporcionar tu propio inicializador personalizado. Por ejemplo, para implementar una tabla de clasificación de videojuegos, necesitamos alguna forma de calcular las tres puntuaciones más altas de un juego determinado. Para ello, podemos crear una clase independiente que incluya la lógica para rastrear las tres puntuaciones más altas, y proporcionar una nueva instancia de esta clase cada vez que sea necesario inicializar una agregación.

En este tutorial, crearemos una clase personalizada llamada HighScores para que actúe como nuestra clase de agregación. Esta clase necesitará alguna estructura de datos subyacente para contener las tres puntuaciones más altas de un determinado videojuego. Un enfoque es utilizar un TreeSet, que es un conjunto ordenado incluido en la biblioteca estándar de Java, y por lo tanto es bastante conveniente para contener puntuaciones altas (que son inherentemente ordenadas).

Aquí se muestra una implementación inicial de nuestra clase de datos que utilizaremos para la agregación de puntuaciones altas:

public class HighScores {

  private final TreeSet<Enriched> highScores = new TreeSet<>();

}

Ahora tenemos que decirle a Kafka Streams cómo inicializar nuestra nueva clase de datos. Inicializar una clase es sencillo; sólo tenemos que instanciarla:

Initializer<HighScores> highScoresInitializer = HighScores::new;

Una vez que tenemos un inicializador para nuestra agregación, tenemos que implementar la lógica para realizar realmente la agregación (en este caso, llevar la cuenta de las tres puntuaciones más altas de cada videojuego).

Adder

Lo siguiente que tenemos que hacer para construir un agregador de flujos es definir la lógica para combinar dos agregados. Para ello se utiliza la interfaz Aggregator, que, al igual que Initializer, es una interfaz funcional que puede implementarse mediante una lambda. La función de implementación debe aceptar tres parámetros:

  • La clave de registro

  • El valor del registro

  • El valor agregado actual

Podemos crear nuestro agregador de puntuaciones altas con el siguiente código:

Aggregator<String, Enriched, HighScores> highScoresAdder =
        (key, value, aggregate) -> aggregate.add(value);

Ten en cuenta que aggregate es una instancia de HighScores, y como nuestro agregador invoca el método HighScores.add, sólo tenemos que implementarlo en nuestra clase HighScores. Como puedes ver en el siguiente bloque de código, el código es extremadamente sencillo, con el método add simplemente añadiendo una nueva puntuación alta a la interna TreeSet, y luego eliminando la puntuación más baja si tenemos más de tres puntuaciones altas:

public class HighScores {
  private final TreeSet<Enriched> highScores = new TreeSet<>();

  public HighScores add(final Enriched enriched) {
    highScores.add(enriched); 1

    if (highScores.size() > 3) { 2
      highScores.remove(highScores.last());
    }

    return this;
  }
}
1

Cada vez que nuestro método sumador (HighScores.add) es llamado por Kafka Streams, simplemente añadimos el nuevo registro al subyacente TreeSet, que ordenará cada entrada automáticamente.

2

Si tenemos más de tres puntuaciones altas en nuestro TreeSet, elimina la puntuación más baja.

Para que TreeSet sepa cómo ordenar los objetos Enriched (y, por tanto, pueda identificar el registro Enriched con la puntuación más baja para eliminarlo cuando nuestro agregado highScores supere los tres valores), implementaremos la interfaz Comparable, como se muestra en el Ejemplo 4-6.

Ejemplo 4-6. La clase Enriched actualizada, que implementa la interfaz Comparable
public class Enriched implements Comparable<Enriched> { 1

  @Override
  public int compareTo(Enriched o) { 2
    return Double.compare(o.score, score);
  }

  // omitted for brevity
}
1

Actualizaremos nuestra clase Enriched para que implemente Comparable, ya que determinar las tres puntuaciones más altas implicará comparar un objeto Enriched con otro.

2

Nuestra implementación del método compareTo utiliza la propiedad score como método para comparar dos objetos Enriched diferentes.

Ahora que tenemos nuestro inicializador y nuestra función sumadora, podemos realizar la agregación utilizando el código del Ejemplo 4-7.

Ejemplo 4-7. Utilizar el operador de agregación de Kafka Streams para realizar nuestra agregación de puntuaciones altas
KTable<String, HighScores> highScores =
    grouped.aggregate(highScoresInitializer, highScoresAdder);

Tablas de agregación

El proceso de agregar tablas es bastante similar al de agregar flujos. Necesitamos un inicializador y una función sumadora. Sin embargo, las tablas son mutables, y necesitan poder actualizar un valor agregado siempre que se elimine una clave.16 También necesitamos un tercer parámetro, llamado función sustractora.

Sustractor

Aunque esto no es necesario para el ejemplo de la tabla de clasificación, supongamos que queremos contar el número de jugadores en nuestro players KTable . Podríamos utilizar el operador count, pero para demostrar cómo construir una función sustractora, construiremos nuestra propia función agregadora que es esencialmente equivalente al operador count. Aquí se muestra una implementación básica de un agregado que utiliza una función sustractora (así como una función inicializadora y sumadora, que es necesaria para los agregados KStream y KTable ):

KGroupedTable<String, Player> groupedPlayers =
  players.groupBy(
      (key, value) -> KeyValue.pair(key, value),
      Grouped.with(Serdes.String(), JsonSerdes.Player()));

groupedPlayers.aggregate(
    () -> 0L, 1
    (key, value, aggregate) -> aggregate + 1L, 2
    (key, value, aggregate) -> aggregate - 1L); 3
1

La función inicializadora inicializa el agregado a 0.

2

La función sumador incrementa la cuenta actual cuando se ve una nueva tecla.

3

La función sustractor disminuye la cuenta actual cuando se elimina una tecla.

Ya hemos completado el paso 5 de nuestra topología de tabla de clasificación(Figura 4-1). Hemos escrito una cantidad decente de código, así que veamos cómo encajan los fragmentos de código individuales en la siguiente sección.

Ponerlo todo junto

Ahora que hemos construido los pasos de procesamiento individuales de nuestra topología de tabla de clasificación, vamos a unirlo todo. El Ejemplo 4-8 muestra cómo se unen los pasos de la topología que hemos creado hasta ahora.

Ejemplo 4-8. Topología del procesador para nuestra aplicación de clasificación de videojuegos
// the builder is used to construct the topology
StreamsBuilder builder = new StreamsBuilder();

// register the score events stream
KStream<String, ScoreEvent> scoreEvents = 1
    builder
        .stream(
          "score-events",
          Consumed.with(Serdes.ByteArray(), JsonSerdes.ScoreEvent()))
        .selectKey((k, v) -> v.getPlayerId().toString()); 2

// create the partitioned players table
KTable<String, Player> players = 3
    builder.table("players", Consumed.with(Serdes.String(), JsonSerdes.Player()));

// create the global product table
GlobalKTable<String, Product> products = 4
    builder.globalTable(
      "products",
      Consumed.with(Serdes.String(), JsonSerdes.Product()));

// join params for scoreEvents - players join
Joined<String, ScoreEvent, Player> playerJoinParams =
    Joined.with(Serdes.String(), JsonSerdes.ScoreEvent(), JsonSerdes.Player());

// join scoreEvents - players
ValueJoiner<ScoreEvent, Player, ScoreWithPlayer> scorePlayerJoiner =
    (score, player) -> new ScoreWithPlayer(score, player);
KStream<String, ScoreWithPlayer> withPlayers =
    scoreEvents.join(players, scorePlayerJoiner, playerJoinParams); 5

// map score-with-player records to products
KeyValueMapper<String, ScoreWithPlayer, String> keyMapper =
  (leftKey, scoreWithPlayer) -> {
    return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId());
  };

// join the withPlayers stream to the product global ktable
ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner =
  (scoreWithPlayer, product) -> new Enriched(scoreWithPlayer, product);
KStream<String, Enriched> withProducts =
  withPlayers.join(products, keyMapper, productJoiner); 6

// Group the enriched product stream
KGroupedStream<String, Enriched> grouped =
  withProducts.groupBy( 7
      (key, value) -> value.getProductId().toString(),
      Grouped.with(Serdes.String(), JsonSerdes.Enriched()));

// The initial value of our aggregation will be a new HighScores instance
Initializer<HighScores> highScoresInitializer = HighScores::new;

// The logic for aggregating high scores is implemented in the HighScores.add method
Aggregator<String, Enriched, HighScores> highScoresAdder =
  (key, value, aggregate) -> aggregate.add(value);

// Perform the aggregation, and materialize the underlying state store for querying
KTable<String, HighScores> highScores =
    grouped.aggregate( 8
        highScoresInitializer,
        highScoresAdder);
1

Lee el score-events en un KStream.

2

Vuelve a codificar los mensajes para cumplir los requisitos de co-partición necesarios para la unión.

3

Lee el tema players en un KTable, ya que el espacio de claves es grande (lo que nos permite repartir el estado entre varias instancias de la aplicación) y porque queremos un procesamiento sincronizado en el tiempo para la unión score-events -> players.

4

Lee el tema products como GlobalKTable, ya que el espacio de claves es pequeño y no necesitamos un procesamiento sincronizado en el tiempo.

5

Une el flujo score-events y la tabla players.

6

Une la score-events enriquecida con la tabla products.

7

Agrupa el flujo enriquecido. Este es un requisito previo para la agregación.

8

Agrega el flujo agrupado. La lógica de agregación reside en la clase HighScores.

Añadamos la configuración necesaria para nuestra aplicación e iniciemos el streaming:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

Llegados a este punto, nuestra aplicación está lista para empezar a recibir registros y calcular puntuaciones altas para nuestra tabla de clasificación. Sin embargo, aún nos queda un último paso que abordar para exponer los resultados de la tabla de clasificación a clientes externos. Pasemos al último paso de nuestra topología de procesadores, y aprendamos a exponer el estado de nuestra aplicación Kafka Streams mediante consultas interactivas.

Consultas interactivas

Una de las características definitorias de Kafka Streams es su capacidad para exponer el estado de la aplicación, tanto localmente como al mundo exterior. Esto último facilita la construcción de microservicios basados en eventos con una latencia extremadamente baja. En este tutorial, podemos utilizar consultas interactivas para exponer nuestras agregaciones de alta puntuación.

Para ello, necesitamos materializar el almacén de estados. Aprenderemos a hacerlo en la siguiente sección.

Almacenes materializados

Ya sabemos que los operadores con estado, como aggregate, count, reduce, etc., aprovechan los almacenes de estado para gestionar el estado interno. Sin embargo, si te fijas bien en nuestro método para agregar puntuaciones altas del Ejemplo 4-7, no verás ninguna mención a un almacén de estado. Esta variante del método aggregate utiliza un almacén de estado interno al que sólo accede la topología del procesador.

Si queremos permitir el acceso de sólo lectura al almacén de estado subyacente para consultas ad hoc, podemos utilizar uno de los métodos sobrecargados para forzar la materialización del almacén de estado localmente. Los almacenes de estado materializados difieren de los almacenes de estado internos en que se nombran explícitamente y se pueden consultar fuera de la topología del procesador. Aquí es donde resulta útil la clase Materialized. El Ejemplo 4-9 muestra cómo materializar un almacén persistente de valores clave utilizando la clase Materialized, lo que nos permitirá consultar el almacén mediante consultas interactivas.

Ejemplo 4-9. Almacén de estado materializado con una configuración mínima
KTable<String, HighScores> highScores =
    grouped.aggregate(
        highScoresInitializer,
        highScoresAdder,
        Materialized.<String, HighScores, KeyValueStore<Bytes, byte[]>> 1
            as("leader-boards") 2
            .withKeySerde(Serdes.String()) 3
            .withValueSerde(JsonSerdes.HighScores()));
1

Esta variación del método Materialized.as incluye tres genéricos:

  • El tipo de clave del almacén (en este caso, String)

  • El tipo de valor del almacén (en este caso, HighScores)

  • El tipo de almacén de estado (en este caso, utilizaremos un almacén simple de clave-valor, representado por KeyValueStore<Bytes, byte[]>)

2

Proporciona un nombre explícito al almacén para que pueda consultarse fuera de la topología del procesador.

3

Podemos personalizar el almacén de estado materializado utilizando diversos parámetros, como los Serdes de clave y valor, así como otras opciones que exploraremos en el Capítulo 6.

Una vez que hemos materializado nuestro almacén de estado leader-boards, estamos casi listos para exponer estos datos mediante consultas ad hoc. Sin embargo, lo primero que tenemos que hacer es recuperar el almacén de Kafka Streams.

Acceder a almacenes de estado de sólo lectura

Cuando necesitamos acceder a un almacén de estado en modo sólo lectura, necesitamos dosdatos:

  • El nombre del almacén estatal

  • El tipo de almacén estatal

Como vimos en el Ejemplo 4-9, el nombre de nuestro almacén de estados es leader-boards. Tenemos que recuperar la envoltura de sólo lectura adecuada para nuestro almacén de estado subyacente utilizando la clase de fábrica QueryableStoreTypes. Hay varios almacenes de estado compatibles, entre ellos

  • QueryableStoreTypes.keyValueStore()

  • QueryableStoreTypes.timestampedKeyValueStore()

  • QueryableStoreTypes.windowStore()

  • QueryableStoreTypes.timestampedWindowStore()

  • QueryableStoreTypes.sessionStore()

En nuestro caso, estamos utilizando un almacén simple de clave-valor, por lo que necesitamos el método QueryableStoreType.keyValueStore(). Con el nombre del almacén de estado y el tipo de almacén de estado, podemos instanciar una instancia de un almacén de estado consultable para utilizarlo en consultas interactivas, mediante utilizando el método KafkaStreams.store(), como se muestra en el Ejemplo 4-10.

Ejemplo 4-10. Instanciar un almacén clave-valor que pueda utilizarse para realizar consultas interactivas
ReadOnlyKeyValueStore<String, HighScores> stateStore =
    streams.store(
        StoreQueryParameters.fromNameAndType(
            "leader-boards",
            QueryableStoreTypes.keyValueStore()));

Cuando tengamos nuestra instancia de almacén de estados, podremos consultarla. En la siguiente sección se analizan los distintos tipos de consulta disponibles en los almacenes clave-valor.

Consulta de almacenes clave-valor sin ventanas

Cada tipo de almacén de estado admite distintos tipos de consultas. Por ejemplo, los almacenes de ventana (por ejemplo, ReadOnlyWindowStore) admiten búsquedas de claves utilizando rangos temporales, mientras que los almacenes simples de clave-valor (ReadOnlyKeyValueStore) admiten búsquedas de puntos, escaneos de rangos y consultas de recuento.

Hablaremos de los almacenes de estado con ventanas en el próximo capítulo, así que, de momento, vamos a demostrar los tipos de consultas que podemos hacer a nuestro almacén leader-boards.

La forma más sencilla de determinar qué tipos de consulta están disponibles para tu tipo de almacén de estado es comprobar la interfaz subyacente. Como podemos ver en la definición de la interfaz del siguiente fragmento, los almacenes simples de clave-valor admiten varios tipos diferentes de consultas:

public interface ReadOnlyKeyValueStore<K, V> {

    V get(K key);

    KeyValueIterator<K, V> range(K from, K to);

    KeyValueIterator<K, V> all();

    long approximateNumEntries();
}

Echemos un vistazo a cada uno de estos tipos de consulta, empezando por el primero: las búsquedas puntuales (get()).

Búsqueda de puntos

Quizás el tipo de consulta más común, las búsquedas puntuales simplemente implican consultar el almacén de estado para una clave individual. Para realizar este tipo de consulta, podemos utilizar el método get para recuperar el valor de una clave determinada. Por ejemplo:

HighScores highScores = stateStore.get(key);

Ten en cuenta que una búsqueda puntual devolverá una instancia deserializada del valor (en este caso, un objeto HighScores, ya que es lo que estamos almacenando en nuestro almacén de estado) o null si no se encuentra la clave.

Exploraciones de alcance

Los almacenes simples de clave-valor también admiten consultas de escaneo de rango. Los escaneos de rango devuelven un iterador para un rango inclusivo de claves. Es muy importante cerrar el iterador una vez que hayas terminado con él para evitar fugas de memoria.

El siguiente bloque de código muestra cómo ejecutar una consulta de rango, iterar sobre cada resultado y cerrar el iterador:

KeyValueIterator<String, HighScores> range = stateStore.range(1, 7); 1

while (range.hasNext()) {
    KeyValue<String, HighScores> next = range.next(); 2

    String key = next.key;
    HighScores highScores = next.value; 3

    // do something with high scores object
}

range.close(); 4
1

Devuelve un iterador que puede utilizarse para iterar por cada clave del rango seleccionado.

2

Obtener el siguiente elemento de la iteración.

3

El valor HighScores está disponible en la propiedad next.value.

4

Es muy importante cerrar el iterador para evitar fugas de memoria. Otra forma de cerrarlo es utilizar una sentencia try-con-recursos al obtener el iterador.

Todas las entradas

Similar a un escaneo de rangos, la consulta all() devuelve un iterador de pares clave-valor, y es similar a una consulta SELECT * sin filtrar. Sin embargo, este tipo de consulta devolverá un iterador para todas las entradas de nuestro almacén de estados, en lugar de sólo las que estén dentro de un rango de claves específico. Al igual que con las consultas de rango, es importante cerrar el iterador una vez que hayas terminado con él para evitar fugas de memoria. El siguiente código muestra cómo ejecutar una consulta all(). Iterar por los resultados y cerrar el iterador es lo mismo que la consulta de escaneo de rango, así que hemos omitido esa lógica por brevedad:

KeyValueIterator<String, HighScores> range = stateStore.all();

Número de entradas

Finalmente, el último tipo de consulta es similar a una consulta COUNT(*), y devuelve el número aproximado de entradas en el almacén de estado subyacente.

Nota

Cuando se utilizan almacenes persistentes RocksDB, el valor devuelto es aproximado, ya que calcular un recuento preciso de puede resultar caro y, cuando se trata de almacenes respaldados por RocksDB, también complicado. Tomado de las FAQ de RocksDB:

Obtener un número exacto de claves [en] bases de datos LSM como RocksDB es un problema difícil, ya que tienen claves duplicadas y entradas de borrado (es decir, lápidas) que requerirán una compactación completa para obtener un número exacto de claves. Además, si la base de datos RocksDB contiene operadores de fusión, también hará que el número estimado de claves sea menos preciso.

En cambio, si utilizas un almacén en memoria, el recuento será exacto.

Para ejecutar este tipo de consulta contra un almacén simple de clave-valor, podríamos ejecutar elsiguiente código:

long approxNumEntries = stateStore.approximateNumEntries();

Ahora que sabemos cómo consultar almacenes clave-valor sencillos, veamos desde dónde podemos ejecutar esas consultas.

Consultas locales

Cada instancia de una aplicación Kafka Streams puede consultar su propio estado local. Sin embargo, es importante recordar que, a menos que estés materializando un GlobalKTable o ejecutando una única instancia de tu aplicación Kafka Streams,17 el estado local sólo representará una vista parcial de todo el estado de la aplicación (ésta es la naturaleza de un KTable, como se explica en "KTable").

Por suerte para nosotros, Kafka Streams proporciona algunos métodos adicionales que facilitan la conexión de almacenes de estado distribuidos, y la ejecución de consultas remotas, que nos permiten consultar todo el estado de nuestra aplicación. A continuación aprenderemos sobre las consultas remotas.

Consultas remotas

Para consultar el estado completo de nuestra aplicación, necesitamos

  • Descubre qué instancias contienen los distintos fragmentos del estado de nuestra aplicación

  • Añade una llamada a procedimiento remoto (RPC) o un servicio REST para exponer el estado local a otras instancias de aplicación en ejecución18

  • Añade un cliente RPC o REST para consultar almacenes de estado remotos desde una instancia de aplicación en ejecución

En cuanto a los dos últimos puntos, tienes mucha flexibilidad para elegir qué componentes del servidor y del cliente quieres utilizar para la comunicación entre instancias. En este tutorial, utilizaremos Javalin para implementar un servicio REST debido a su sencilla API. También utilizaremos OkHttp, desarrollado por Square, para nuestro cliente REST por su facilidad de uso. Vamos a añadir estas dependencias a nuestra aplicación actualizando nuestro archivo build.gradle con lo siguiente:

dependencies {

  // required for interactive queries (server)
  implementation 'io.javalin:javalin:3.12.0'

  // required for interactive queries (client)
  implementation 'com.squareup.okhttp3:okhttp:4.9.0'

  // other dependencies
}

Ahora abordemos la cuestión del descubrimiento de instancias. Necesitamos alguna forma de difundir qué instancias se están ejecutando en un momento dado y dónde se están ejecutando. Esto último puede conseguirse utilizando el parámetroAPPLICATION_SERVER_CONFIG para especificar un par de host y puerto en Kafka Streams, como se muestra aquí:

Properties props = new Properties();

props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "myapp:8080"); 1

// other Kafka Streams properties omitted for brevity

KafkaStreams streams = new KafkaStreams(builder.build(), props);
1

Configura un punto final. Esto se comunicará a otras instancias de la aplicación en ejecución a través del protocolo de grupo de consumidores de Kafka. Es importante que utilices un par de IP y puerto que otras instancias puedan utilizar para comunicarse con tu aplicación (es decir, localhost no funcionaría, ya que se resolvería a IP diferentes en función de la instancia).

Ten en cuenta que establecer el parámetro APPLICATION_SERVER_CONFIG config no le dice realmente a Kafka Streams que empiece a escuchar en el puerto que configures. De hecho, Kafka Streams no incluye un servicio RPC incorporado. Sin embargo, esta información de host/puerto se transmite a otras instancias en ejecución de tu aplicación Kafka Streams y está disponible a través de métodos API dedicados, de los que hablaremos más adelante. Pero primero, vamos a configurar nuestro servicio REST para que empiece a escuchar en el puerto apropiado (8080 en este ejemplo).

En términos de mantenimiento del código, tiene sentido definir nuestro servicio REST de la tabla de clasificación en un archivo específico, separado de la definición de la topología. El siguiente bloque de código muestra una implementación sencilla del servicio de tabla de clasificación:

class LeaderboardService {
  private final HostInfo hostInfo; 1
  private final KafkaStreams streams; 2

  LeaderboardService(HostInfo hostInfo, KafkaStreams streams) {
    this.hostInfo = hostInfo;
    this.streams = streams;
  }

  ReadOnlyKeyValueStore<String, HighScores> getStore() { 3
    return streams.store(
        StoreQueryParameters.fromNameAndType(
            "leader-boards",
            QueryableStoreTypes.keyValueStore()));
  }

  void start() {
    Javalin app = Javalin.create().start(hostInfo.port()); 4

    app.get("/leaderboard/:key", this::getKey); 5
  }
}
1

HostInfo es una simple clase envolvente de Kafka Streams que contiene un nombre de host y un puerto. Veremos cómo instanciarla en breve.

2

Necesitamos hacer un seguimiento de la instancia local de Kafka Streams. Utilizaremos algunos métodos de la API en esta instancia en el siguiente bloque de código.

3

Añade un método dedicado para recuperar el almacén de estado que contiene las agregaciones de la tabla de clasificación. Sigue el mismo método para recuperar un almacén de estado de sólo lectura que vimos en el Ejemplo 4-10.

4

Inicia el servicio web basado en Javalin en el puerto configurado.

5

Añadir puntos finales con Javalin es fácil. Simplemente asignamos una ruta URL a un método, que implementaremos en breve. Los parámetros de ruta, que se especifican con dos puntos delante (por ejemplo, :key), nos permiten crear puntos finales dinámicos. Esto es ideal para una consulta de búsqueda de puntos.

Ahora, vamos a implementar el punto final /leaderboard/:key, que mostrará las puntuaciones altas de una clave determinada (que en este caso es un ID de producto). Como hemos aprendido recientemente, podemos utilizar una búsqueda puntual para recuperar un único valor de nuestro almacén de estados. A continuación se muestra una implementación ingenua:

void getKey(Context ctx) {
    String productId = ctx.pathParam("key");
    HighScores highScores = getStore().get(productId); 1
    ctx.json(highScores.toList()); 2
}
1

Utiliza una búsqueda puntual para recuperar un valor del almacén de estado local.

2

Nota: el método toList() está disponible en el código fuente.

Por desgracia, esto no es suficiente. Considera el ejemplo en el que tenemos dos instancias en ejecución de nuestra aplicación Kafka Streams. Dependiendo de qué instancia consultemos y de cuándo emitamos la consulta (el estado puede moverse siempre que haya un reequilibrio del consumidor), es posible que no podamos recuperar el valor solicitado. La Figura 4-7 muestra este enigma.

Figura 4-7. Cuando el estado se reparte entre varias instancias de la aplicación, las consultas locales no son suficientes

Afortunadamente, Kafka Streams proporciona un método llamado queryMetadataForKey,19 que nos permite descubrir la instancia de aplicación (local o remota) en la que vive una clave concreta. En el Ejemplo 4-11 se muestra una implementación mejorada de nuestro método getKey.

Ejemplo 4-11. Una implementación actualizada del método getKey, que aprovecha las consultas remotas para extraer datos de distintas instancias de la aplicación
void getKey(Context ctx) {

  String productId = ctx.pathParam("key");

  KeyQueryMetadata metadata =
      streams.queryMetadataForKey(
        "leader-boards", productId, Serdes.String().serializer()); 1

  if (hostInfo.equals(metadata.activeHost())) {
    HighScores highScores = getStore().get(productId); 2

    if (highScores == null) { 3
      // game was not found
      ctx.status(404);
      return;
    }

    // game was found, so return the high scores
    ctx.json(highScores.toList()); 4
    return;
  }

  // a remote instance has the key
  String remoteHost = metadata.activeHost().host();
  int remotePort = metadata.activeHost().port();
  String url =
    String.format(
        "http://%s:%d/leaderboard/%s",
        remoteHost, remotePort, productId); 5

  OkHttpClient client = new OkHttpClient();
  Request request = new Request.Builder().url(url).build();

  try (Response response = client.newCall(request).execute()) { 6
    ctx.result(response.body().string());
  } catch (Exception e) {
    ctx.status(500);
  }
}
1

queryMetadataForKey nos permite encontrar en qué host debe vivir una clave concreta.

2

Si la instancia local tiene la clave, basta con consultar el almacén de estado local.

3

El método queryMetadataForKey no comprueba realmente si la clave existe. Utiliza el particionador de flujo por defecto20 para determinar dónde existiría la clave , si existiera. Por lo tanto, comprobamos si existe null (que se devuelve si no se encuentra la clave) y devolvemos una respuesta 404 si no existe.

4

Devuelve una respuesta formateada con las puntuaciones más altas.

5

Si hemos llegado hasta aquí, entonces la clave existe en un host remoto, si es que existe. Por tanto, construye una URL utilizando los metadatos, que incluya el host y el puerto de la instancia de Kafka Streams que contendría la clave especificada.

6

Invoca la petición y devuelve el resultado si se ha realizado correctamente.

Para ayudar a visualizar lo que ocurre aquí, la Figura 4-8 muestra cómo pueden conectarse los almacenes de estados distribuidos utilizando una combinación de descubrimiento de instancias y un servicio RPC/REST.

Figura 4-8. Las consultas remotas nos permiten consultar el estado de otras instancias de aplicación en ejecución

Pero, ¿y si necesitas ejecutar una consulta que no opere sobre una única clave? Por ejemplo, ¿y si necesitas contar el número de entradas en todos tus almacenes de estado distribuidos? El queryMetadataForKey no funcionaría bien en este caso, ya que requiere que especifiquemos una única clave. En su lugar, aprovecharíamos otro método de Kafka Streams, llamado allMetadataForStore, que devuelve el punto final de cada aplicación Kafka Streams en ejecución que comparta el mismo ID de aplicación y tenga al menos una partición activa para el nombre de almacén proporcionado.

Vamos a añadir un nuevo punto final a nuestro servicio de clasificación que muestre el número de registros de puntuaciones altas en todas las instancias de la aplicación en ejecución:

app.get("/leaderboard/count", this::getCount);

Ahora, implementaremos el método getCount al que se hace referencia en el código anterior, que aprovecha el método allMetadataForStore para obtener el número total de registros de cada almacén de estado remoto:

void getCount(Context ctx) {
  long count = getStore().approximateNumEntries(); 1
                                          2
  for (StreamsMetadata metadata : streams.allMetadataForStore("leader-boards")) {
    if (!hostInfo.equals(metadata.hostInfo())) {
      continue; 3
    }
    count += fetchCountFromRemoteInstance( 4
      metadata.hostInfo().host(),
      metadata.hostInfo().port());
  }

  ctx.json(count);
}
1

Inicializa la cuenta con el número de entradas del almacén de estado local.

2

En la línea siguiente, utilizamos el método allMetadataForStore para recuperar los pares de host/puerto de cada instancia de Kafka Streams que contiene un fragmento del estado que queremos consultar.

3

Si los metadatos corresponden al host actual, continúa con el bucle, puesto que ya hemos extraído el recuento de entradas del almacén de estado local.

4

Si los metadatos no pertenecen a la instancia local, entonces recupera el recuento de la instancia remota. Hemos omitido los detalles de implementación de fetchCountFromRemoteInstance en este texto, ya que es similar a lo que vimos en el Ejemplo 4-11, en el que instanciamos un cliente REST y emitimos una solicitud contra una instancia de aplicación de eliminación. Si te interesan los detalles de la implementación, consulta el código fuente de este capítulo.

Esto completa el último paso de nuestra topología de tabla de clasificación (ver Figura 4-1). Ahora podemos ejecutar nuestra aplicación, generar algunos datos ficticios y consultar nuestroservicio de clasificación.

Los datos ficticios para cada uno de los temas fuente se muestran en el Ejemplo 4-12.

Nota

Para los temas con clave (players y products), la clave del registro se formatea como<key>|<value>. Para el tema score-events, los registros ficticios se formatean simplemente como <value>.

Ejemplo 4-12. Registros ficticios que produciremos para nuestros temas fuente
# players
1|{"id": 1, "name": "Elyse"}
2|{"id": 2, "name": "Mitch"}
3|{"id": 3, "name": "Isabelle"}
4|{"id": 4, "name": "Sammy"}

# products
1|{"id": 1, "name": "Super Smash Bros"}
6|{"id": 6, "name": "Mario Kart"}

# score-events
{"score": 1000, "product_id": 1, "player_id": 1}
{"score": 2000, "product_id": 1, "player_id": 2}
{"score": 4000, "product_id": 1, "player_id": 3}
{"score": 500, "product_id": 1, "player_id": 4}
{"score": 800, "product_id": 6, "player_id": 1}
{"score": 2500, "product_id": 6, "player_id": 2}
{"score": 9000.0, "product_id": 6, "player_id": 3}
{"score": 1200.0, "product_id": 6, "player_id": 4}

Si producimos estos datos ficticios en los temas apropiados y luego consultamos nuestro servicio de tabla de clasificación, veremos que nuestra aplicación Kafka Streams no sólo procesa las puntuaciones más altas, sino que ahora expone los resultados de nuestras operaciones con estado. En el siguiente bloque de código se muestra un ejemplo de respuesta a una consulta interactiva:

$ curl -s localhost:7000/leaderboard/1 | jq '.'

[
  {
    "playerId": 3,
    "productId": 1,
    "playerName": "Isabelle",
    "gameName": "Super Smash Bros",
    "score": 4000
  },
  {
    "playerId": 2,
    "productId": 1,
    "playerName": "Mitch",
    "gameName": "Super Smash Bros",
    "score": 2000
  },
  {
    "playerId": 1,
    "productId": 1,
    "playerName": "Elyse",
    "gameName": "Super Smash Bros",
    "score": 1000
  }
]

Resumen

En este capítulo, has aprendido cómo Kafka Streams captura información sobre los eventos que consume, y cómo aprovechar la información recordada (estado) para realizar tareas de procesamiento de flujos más avanzadas, entre ellas:

  • Realizar una unión KStream-KTable

  • Reencriptación de mensajes para cumplir los requisitos de coparticipación de determinados tipos de uniones

  • Realizar una unión KStream-GlobalKTable

  • Agrupar los registros en representaciones intermedias (KGroupedStream, KGroupedTable) para preparar nuestros datos para la agregación

  • Agregación de flujos y tablas

  • Utilizar las consultas interactivas para exponer el estado de nuestra aplicación mediante consultas locales y remotas

En el próximo capítulo, hablaremos de otro aspecto de la programación con estado que tiene que ver no sólo con los eventos que ha visto nuestra aplicación, sino con cuándo se han producido. El tiempo desempeña un papel clave en el procesamiento con estado, por lo que comprender las distintas nociones de tiempo y también las diversas abstracciones basadas en el tiempo de la biblioteca Kafka Streams nos ayudará a ampliar aún más nuestros conocimientos sobre el procesamiento con estado.

1 Para más información sobre las plataformas de procesamiento relacional de flujos, consulta la entrada del blog de Robert Yokota de 2018 sobre el tema.

2 En memoria, en disco o en una combinación de ambos.

3 LevelDB se escribió en Google, pero cuando los ingenieros de Facebook empezaron a utilizarlo, descubrieron que era demasiado lento para sus flujos de trabajo integrados. Al cambiar el proceso de compactación monohilo de LevelDB por un proceso de compactación multihilo, y al aprovechar los filtros bloom para las lecturas, se mejoró drásticamente el rendimiento tanto de lectura como de escritura.

4 Hemos mencionado que los almacenes de estado son altamente configurables, e incluso la tolerancia a fallos puede desactivarse desactivando el comportamiento de registro de cambios.

5 Por ejemplo, inMemoryKeyValueStore utiliza un Java TreeMap, que se basa en un árbol rojo-negro, mientras que todos los almacenes persistentes de clave-valor utilizan RocksDB.

6 Por ejemplo, los almacenes de ventanas son almacenes clave-valor, pero las claves también incluyen la hora de la ventana además de la clave del registro.

7 Tim Berglund y Yaroslav Tkachenko hablan del caso de uso de Activision en el podcastStreaming Audio.

8 Ya hemos visto cómo Kafka Streams puede escribir directamente en temas de salida, lo que nos permite enviar datos procesados/enriquecidos a aplicaciones posteriores. Sin embargo, los clientes que deseen realizar consultas ad hoc contra una aplicación Kafka Streams pueden utilizar consultas interactivas.

9 Como se mencionó en el Capítulo 3, si nuestros temas contuvieran datos Avro, podríamos definir nuestro modelo de datos en un archivo de esquema Avro.

10 También podemos utilizar KStreams para operaciones de búsqueda/unión, pero se trata siempre de una operación en ventana, por lo que hemos reservado la discusión de este tema hasta el próximo capítulo.

11 Florian Trossbach y Matthias J. Sax profundizan mucho más en este tema en su artículo "Crossing the Streams: Joins in Apache Kafka".

12 Las consultas UNION son otro método para combinar conjuntos de datos en el mundo relacional. El comportamiento del operador merge en Kafka Streams está más relacionado con el funcionamiento de una consulta UNION.

13 Si no utilizas la Plataforma Confluent, el script es kafka-topics.sh.

14 El lado GlobalKTable de la unión seguirá utilizando la clave de registro para la búsqueda.

15 Los flujos son sólo apéndices, por lo que no necesitan un sustractor.

16 Aún no hemos hablado de borrar claves, pero trataremos este tema en el Capítulo 6, cuando hablemos de la limpieza de los almacenes de estado.

17 Esto último no es aconsejable. Ejecutar una única aplicación Kafka Streams consolidaría todo el estado de la aplicación en una única instancia, pero Kafka Streams está pensado para ejecutarse de forma distribuida para maximizar el rendimiento y la tolerancia a fallos.

18 Y otros clientes si se desea, por ejemplo, humanos.

19 Que sustituye al método metadataForKey, muy utilizado en versiones < 2.5, pero oficialmente obsoleto en esa versión.

20 Existe una versión sobrecargada del método queryMetadataForKey que también acepta un StreamPartitioner personalizado.

Get Dominar Kafka Streams y ksqlDB now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.