Capítulo 4. Procesamiento por estados
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
En el capítulo anterior, aprendimos a realizar transformaciones sin estado de flujos de registros utilizando la abstracción KStream
y un rico conjunto de operadores sin estado que están disponibles en Kafka Streams. Puesto que las transformaciones sin estado no requieren ninguna memoria de los eventos vistos previamente, son fáciles de razonar y utilizar. Tratamos cada evento como un hecho inmutable y lo procesamos independientemente de otros eventos.
Sin embargo, Kafka Streams también nos ofrece la posibilidad de capturar y recordar información sobre los eventos que consumimos. La información capturada, o estado, nos permite realizar operaciones de procesamiento de flujos más avanzadas, como unir y agregar datos. En este capítulo exploraremos en detalle el procesamiento de flujos con estado. Algunos de los temas que trataremos son:
-
Las ventajas del procesamiento de flujos con estado
-
Las diferencias entre hechos y comportamientos
-
¿Qué tipos de operadores con estado están disponibles en Kafka Streams?
-
Cómo se captura y consulta el estado en Kafka Streams
-
Cómo puede utilizarse la abstracción
KTable
para representar el estado local particionado -
Cómo puede utilizarse la abstracción
GlobalKTable
para representar el estado global replicado -
Cómo realizar operaciones con estado, incluyendo unir y agregar datos
-
Cómo utilizar consultas interactivas para exponer el estado
Al igual que en el capítulo anterior, exploraremos estos conceptos utilizando un enfoque basado en tutoriales. El tutorial de este capítulo está inspirado en la industria de los videojuegos, y construiremos una tabla de clasificación en tiempo real que nos obligará a utilizar muchos de los operadores con estado de Kafka Streams. Además, dedicaremos mucho tiempo a hablar de las uniones, ya que es una de las formas más comunes de enriquecimiento de datos en las aplicaciones con estado. Pero antes de empezar con el tutorial, veamos algunas de las ventajas del procesamiento con estado.
Ventajas del procesamiento por estados
El procesamiento con estados nos ayuda a comprender las relaciones entre los eventos y a aprovechar estas relaciones para casos de uso más avanzados del procesamiento de flujos. Cuando somos capaces de entender cómo se relaciona un evento con otros eventos, podemos:
-
Reconocer patrones y comportamientos en nuestros flujos de eventos
-
Realiza agregaciones
-
Enriquece los datos de forma más sofisticada utilizando uniones
Otra ventaja del procesamiento de flujos con estado es que nos proporciona una abstracción adicional para representar datos. Al reproducir un flujo de eventos de uno en uno, yguardar el último estado de cada clave en un almacén de valores clave integrado, podemos construir una representación puntual de flujos de registros continuos e ilimitados. Estas representaciones puntuales, o instantáneas, se denominan tablas, y Kafka Streams incluye distintos tipos de abstracciones de tablas que conoceremos en este capítulo.
Las tablas no sólo son el núcleo del procesamiento de flujos con estado, sino que, cuando se materializan, también se pueden consultar. Esta capacidad de consultar una instantánea en tiempo real de un flujo de eventos en rápido movimiento es lo que convierte a Kafka Streams en una plataforma de procesamiento relacional de flujos,1 y nos permite no sólo crear aplicaciones de procesamiento de flujos, sino también microservicios de baja latencia basados en eventos.
Por último, el procesamiento de flujos con estado nos permite comprender nuestros datos utilizando modelos mentales más sofisticados. Un punto de vista especialmente interesante procede de Neil Avery, que analiza las diferencias entre hechos y comportamientos en su debate sobre el pensamiento "primero los acontecimientos":
Un acontecimiento representa un hecho, algo que ha ocurrido; es inmutable...
Las aplicaciones sin estado, como las que hemos analizado en el capítulo anterior, se basan en hechos. Cada evento se trata como un hecho independiente y atómico, que puedeprocesarse utilizando una semántica inmutable (piensa en inserciones en un flujo interminable), y luego olvidarse.
Sin embargo, además de aprovechar los operadores sin estado para filtrar, bifurcar, combinar y transformar hechos, podemos hacer preguntas aún más avanzadas a nuestros datos si aprendemos a modelar comportamientos utilizando operadores con estado. ¿Qué son los comportamientos? Según Neil
La acumulación de hechos capta el comportamiento.
Verás, los acontecimientos (o hechos) rara vez ocurren de forma aislada en el mundo real. Todo está interconectado, y al capturar y recordar hechos, podemos empezar a comprender su significado. Esto es posible comprendiendo los hechos en su contexto histórico más amplio, o mirando otros hechos relacionados que hayan sido capturados y almacenados por nuestra aplicación.
Un ejemplo popular es el abandono de la cesta de la compra, que es un comportamiento compuesto por múltiples hechos: un usuario añade uno o más artículos a una cesta de la compra y, a continuación, se termina la sesión, ya sea manualmente (por ejemplo, el usuario cierra la sesión) o automáticamente (por ejemplo, debido a un largo período de inactividad). Procesar cualquiera de los dos hechos de forma independiente nos dice muy poco sobre en qué punto del proceso de compra se encuentra el usuario. Sin embargo, recopilar, recordar y analizar cada uno de los hechos (que es lo que permite el procesamiento de estados) nos permite reconocer el comportamiento y reaccionar ante él, y proporciona un valor empresarial mucho mayor que ver el mundo como una serie de hechos no relacionados.
Ahora que entendemos las ventajas del procesamiento de flujos con estado y las diferencias entre hechos y comportamientos, vamos a ver un avance de los operadores con estado de Kafka Streams.
Vista previa de los Operadores con Estado
Kafka Streams incluye varios operadores de estado que podemos utilizar en nuestras topologías de procesador . La Tabla 4-1 incluye un resumen de varios operadores con los que trabajaremos en este libro.
Caso práctico | Propósito | Operarios |
---|---|---|
Unir datos |
|
|
Agregación de datos |
|
|
Datos de la ventana |
Agrupa acontecimientos que tienen una proximidad temporal cercana |
|
Además, en podemos combinar operadores de estado en Kafka Streams para comprender relaciones/comportamientos aún más complejos entre eventos. Por ejemplo, realizar una unión en ventana nos permite comprender cómo se relacionan flujos de eventos discretos durante un determinado periodo de tiempo. Como veremos en el próximo capítulo, las agregaciones por ventanas son otra forma útil de combinar operadores con estado.
Ahora bien, en comparación con los operadores sin estado que vimos en el capítulo anterior, los operadores con estado son más complejos y tienen requisitos adicionales de cálculo y almacenamiento.2 adicionales. Por esta razón, pasaremos algún tiempo aprendiendo sobre el funcionamiento interno del procesamiento con estado en Kafka Streams antes de empezar a utilizar los operadores con estado enumerados en la Tabla 4-1.
Quizá el punto de partida más importante sea ver cómo se almacena y consulta el estado en Kafka Streams.
Tiendas estatales
Ya hemos establecido en que las operaciones con estado requieren que nuestra aplicación mantenga cierta memoria de los eventos vistos anteriormente. Por ejemplo, una aplicación que cuente el número de registros de errores que ve necesita mantener un único número para cada clave: un recuento rodante que se actualiza cada vez que se consume un nuevo registro de errores. Este recuento representa el contexto histórico de un registro y, junto con la clave del registro, pasa a formar parte del estado de la aplicación.
Para soportar las operaciones con estado, necesitamos una forma de almacenar y recuperar los datos recordados, o estado, requeridos por cada operador con estado de nuestra aplicación (por ejemplo, count
, aggregate
, join
, etc.). La abstracción de almacenamiento que aborda estas necesidades en Kafka Streams se denomina almacén de estado, y puesto que una sola aplicación Kafka Streams puede aprovechar muchos operadores con estado, una sola aplicación puede contener varios almacenes de estado.
Nota
Esta sección proporciona información de bajo nivel sobre cómo se captura y almacena el estado en Kafka Streams. Si estás impaciente por empezar con el tutorial, no dudes en saltar a "Presentación de nuestro tutorial: Tabla de clasificación de videojuegos" y volver a visitar esta sección más adelante.
Hay muchas implementaciones de almacenes de estado y posibilidades de configuración disponibles en Kafka Streams, cada una con ventajas, compensaciones y casos de uso específicos. Siempre que utilices un operador con estado en tu aplicación Kafka Streams, es útil considerar qué tipo de almacén de estado necesita el operador, y también cómo configurar el almacén de estado en función de tus criterios de optimización (por ejemplo, ¿estás optimizando un alto rendimiento, simplicidad operativa, tiempos de recuperación rápidos en caso de fallo, etc.). En la mayoría de los casos, Kafka Streams elegirá un valor predeterminado razonable si no especificas explícitamente un tipo de almacén de estado o anulas las propiedades de configuración de un almacén de estado.
Dado que la variación en los tipos y configuraciones de los almacenes de estado hace que éste sea un tema bastante profundo, centraremos inicialmente nuestra discusión en las características comunes de todas las implementaciones de almacenes de estado por defecto, y luego echaremos un vistazo a las dos grandes categorías de almacenes de estado: los almacenes persistentes y los almacenes en memoria. En el Capítulo 6 trataremos más en profundidad los almacenes de estado, y a medida que vayamos encontrando temas específicos en nuestrotutorial.
Características comunes
Las implementaciones predeterminadas de almacenes de estado incluidas en Kafka Streams comparten algunas propiedades comunes. Discutiremos estos puntos en común en esta sección para tener una mejor idea de cómo funcionan los almacenes de estado.
Embedded
Las implementaciones predeterminadas de almacenes de estado que se incluyen en Kafka Streams están incrustadas dentro de tu aplicación Kafka Streams a nivel de tarea (hablamos por primera vez de las tareas en "Tareas e hilos de flujo"). La ventaja de los almacenes de estado incrustados, frente al uso de un motor de almacenamiento externo, es que este último requeriría una llamada a la red cada vez que fuera necesario acceder al estado, y por tanto introduciría latencia y cuellos de botella de procesamiento innecesarios. Además, como los almacenes de estado están integrados en el nivel de tarea, se elimina toda una clase de problemas de concurrencia para acceder al estado compartido.
Además, si los almacenes de estado fueran remotos, tendrías que preocuparte de la disponibilidad del sistema remoto por separado de tu aplicación Kafka Streams. Permitir que Kafka Streams gestione un almacén de estado local garantiza que siempre estará disponible y reduce bastante la superficie de error. Un almacén remoto centralizado sería aún peor, ya que se convertiría en un único punto de fallo para todas las instancias de tu aplicación. Por tanto, la estrategia de Kafka Streams de colocar el estado de una aplicación junto a la propia aplicación no sólo mejora el rendimiento (como se ha comentado en el párrafo anterior), sino también la disponibilidad.
Todos los almacenes de estado predeterminados utilizan RocksDB. RocksDB es un almacén de valores clave rápido e integrado, desarrollado originalmente en Facebook. Como admite flujos arbitrarios de bytes para almacenar pares clave-valor, funciona bien con Kafka, que también desacopla la serialización del almacenamiento. Además, tanto las lecturas como las escrituras son extremadamente rápidas, gracias a un rico conjunto de optimizaciones realizadas en el código bifurcado LevelDB.3
Múltiples modos de acceso
Los almacenes de estado admiten múltiples modos de acceso y patrones de consulta. Las topologías de procesadores requieren acceso de lectura y escritura a los almacenes de estado. Sin embargo, cuando se construyen microservicios utilizando la función de consultas interactivas de Kafka Streams , de la que hablaremos más adelante en "Consultas interactivas", los clientes sólo necesitan acceso de lectura al estado subyacente. Esto garantiza que el estado nunca sea mutable fuera de la topología del procesador, y se consigue mediante una envoltura dedicada de sólo lectura que los clientes pueden utilizar para consultar de forma segura el estado de una aplicación Kafka Streams.
Tolerante a fallos
Por defecto, los almacenes de estado están respaldados por temas de registro de cambios en Kafka.4 En caso de fallo, los almacenes de estado pueden restaurarse reproduciendo los eventos individuales del tema de registro de cambios subyacente para reconstruir el estado de una aplicación. Además, Kafka Streams permite a los usuarios habilitar réplicas en espera para reducir el tiempo que se tarda en reconstruir el estado de una aplicación. Estas réplicas en espera (a veces llamadas copias en la sombra) hacen que los almacenes de estado sean redundantes, lo cual es una característica importante de los sistemas de alta disponibilidad. Además, las aplicaciones que permiten que se consulte su estado pueden confiar en las réplicas en espera para servir el tráfico de consulta cuando otras instancias de la aplicación se caen, lo que también contribuye a la alta disponibilidad.
Basado en claves
Las operaciones que aprovechan los almacenes de estado se basan en claves. La clave de un registro define la relación entre el evento actual y otros eventos. La estructura de datos subyacente variará en función del tipo de almacén de estado que decidas utilizar,5 pero cada implementación puede conceptualizarse como alguna forma de almacén clave-valor, donde las claves pueden ser simples, o incluso compuestas (es decir, multidimensionales) en algunos casos.6
Nota
Para complicar un poco las cosas, Kafka Streams se refiere explícitamente a ciertos tipos de almacenes de estado como almacenes clave-valor, aunque todos los almacenes de estado por defecto están basados en claves. Cuando nos referimos a almacenes de valor-clave en este capítulo y en otras partes de este libro, nos referimos a almacenes de estado sin ventana (los almacenes de estado con ventana se tratarán en el capítulo siguiente).
Ahora que entendemos los puntos en común entre los almacenes de estado predeterminados en Kafka Streams, veamos dos grandes categorías de almacenes de estado para comprender las diferencias entre determinadas implementaciones.
Almacenes persistentes frente a almacenes en memoria
Uno de los diferenciadores más importantes entre las distintas implementaciones de almacenes de estado es si el almacén de estado es persistente o no, o si simplemente almacena la información recordada en memoria (RAM ). Los almacenes de estado persistentes vuelcan el estado al disco de forma asíncrona (a un directorio de estado configurable), lo que tiene dos ventajas principales:
-
El estado puede superar el tamaño de la memoria disponible.
-
En caso de fallo, los almacenes persistentes pueden restaurarse más rápidamente que los almacenes en memoria.
Para aclarar el primer punto, un almacén de estado persistente puede mantener parte de su estado en memoria, mientras escribe en disco cuando el tamaño del estado se hace demasiado grande (esto se denomina derramar en disco) o cuando el búfer de escritura supera un valor configurado. En segundo lugar, como el estado de la aplicación persiste en el disco, Kafka Streams no necesita reproducir todo el tema para reconstruir el almacén de estado cuando éste se pierde (por ejemplo, debido a un fallo del sistema, una migración de instancia, etc.). Sólo necesita reproducir los datos que falten entre el momento en que la aplicación dejó de funcionar y el momento en que volvió a funcionar.
Consejo
El directorio del almacén de estados utilizado para los almacenes persistentes puede establecerse utilizando la propiedad StreamsConfig.STATE_DIR_CONFIG
. La ubicación predeterminada es /tmp/kafka-streams, pero se recomienda encarecidamente que la sustituyas por un directorio fuera de /tmp.
El inconveniente es que los almacenes de estado persistentes son operativamente más complejos y pueden ser más lentos que un almacén en memoria puro, que siempre extrae datos de la RAM. La complejidad operativa adicional proviene del requisito de almacenamiento secundario (es decir, almacenamiento basado en disco) y, si necesitas ajustar el almacén de estado, de la comprensión de RocksDB y sus configuraciones (esto último puede no ser un problema para la mayoría de las aplicaciones).
En cuanto a las ganancias de rendimiento de un almacén de estado en memoria, puede que no sean lo suficientemente drásticas como para justificar su uso (ya que la recuperación de fallos lleva más tiempo). Añadir más particiones para paralelizar el trabajo siempre es una opción si necesitas exprimir más el rendimiento de tu aplicación. Por tanto, mi recomendación es que empieces con almacenes persistentes y sólo cambies a almacenes en memoria si has medido una mejora notable del rendimiento y, cuando se trate de una recuperación rápida (por ejemplo, en caso de que se pierda el estado de tu aplicación), utilizas réplicas en espera para reducir el tiempo de recuperación.
Ahora que ya sabemos qué son los almacenes de estado y cómo permiten el procesamiento basado en el estado/comportamiento, echemos un vistazo al tutorial de este capítulo y veamos algunas de estas ideas en acción.
Presentamos nuestro tutorial: Tabla de clasificación de videojuegos
En este capítulo, aprenderemos sobre el procesamiento de estados implementando una tabla de clasificación de videojuegos con Kafka Streams. La industria del videojuego es un excelente ejemplo en el que destaca el procesamiento de flujos, ya que tanto los jugadores como los sistemas de juego requieren un procesamiento de baja latencia y una respuesta inmediata. Esta es una de las razones por las que empresas como Activision (la empresa que está detrás de juegos como Call of Duty y remasterizaciones de Crash Bandicoot y Spyro) utilizan Kafka Streams para procesar la telemetría de los videojuegos.7
La tabla de clasificación que vamos a construir requerirá que modelemos los datos de formas que aún no hemos explorado. En concreto, veremos cómo utilizar las abstracciones de tabla de Kafka Streams para modelar los datos como una secuencia de actualizaciones. Luego, nos sumergiremos en los temas de unir y agregar datos, que son útiles siempre que necesites comprender o calcular la relación entre varios eventos. Estos conocimientos te ayudarán a resolver problemas empresariales más complicados con Kafka Streams.
Una vez que hayamos creado nuestra tabla de clasificación en tiempo real utilizando un nuevo conjunto de operadores con estado, demostraremos cómo consultar Kafka Streams para obtener la información más reciente de la tabla de clasificación mediante consultas interactivas. Nuestro debate sobre esta función te enseñará a construir microservicios basados en eventos con Kafka Streams, lo que a su vez ampliará el tipo de clientes con los que podemos compartir datos desde nuestras aplicaciones de procesamiento de flujos.8
Sin más preámbulos, echemos un vistazo a la arquitectura de nuestra tabla de clasificación de videojuegos. La Figura 4-1 muestra el diseño de la topología que implementaremos en este capítulo. Después del diagrama se incluye información adicional sobre cada paso.
Nuestro clúster Kafka contiene tres temas:
El tema
score-events
contiene los resultados de los partidos. Los registros no tienen clave y, por tanto, se distribuyen de forma circular por las particiones del tema.El tema
players
contiene perfiles de jugadores. Cada registro tiene un identificador de jugador.El tema
products
contiene información sobre productos de varios videojuegos. Cada registro tiene un identificador de producto.
Necesitamos enriquecer nuestros datos de eventos de puntuación con información detallada de los jugadores. Podemos conseguirlo mediante una unión.
Una vez que hemos enriquecido los datos de
score-events
con los datos de los jugadores, tenemos que añadir información detallada sobre los productos al flujo resultante. Esto también puede hacerse mediante una unión.Como la agrupación de datos es un requisito previo para la agregación, necesitamos agrupar el flujo enriquecido.
Necesitamos calcular las tres puntuaciones más altas de cada juego. Para ello, podemos utilizar los operadores de agregación de Kafka Streams.
Por último, necesitamos exponer externamente las puntuaciones más altas de cada juego. Para ello, construiremos un microservicio RESTful utilizando la función de consultas interactivas de Kafka Streams.
Con nuestro diseño de topología en la mano, ahora podemos pasar a la configuración del proyecto.
Configuración del proyecto
El código de este capítulo se encuentra enhttps://github.com/mitch-seymour/mastering-kafka-streams-and-ksqldb.git.
Si quieres hacer referencia al código a medida que avanzamos por cada paso de la topología, clona el repositorio y cambia al directorio que contiene el tutorial de este capítulo. El siguiente comando será suficiente:
$ git clone git@github.com:mitch-seymour/mastering-kafka-streams-and-ksqldb.git $ cd mastering-kafka-streams-and-ksqldb/chapter-04/video-game-leaderboard
Puedes construir el proyecto en cualquier momento ejecutando el siguiente comando:
$ ./gradlew build --info
Ahora que nuestro proyecto está configurado, vamos a empezar a implementar nuestra tabla de clasificación del videojuego.
Modelos de datos
Como siempre, empezaremos definiendo nuestros modelos de datos. Dado que los temas fuente contienen datos JSON, definiremos nuestros modelos de datos utilizando clases de datos POJO, que serializaremos y deserializaremos utilizando la biblioteca de serialización JSON de nuestra elección (a lo largo de este libro, utilizamos Gson, pero podrías utilizar fácilmente Jackson u otra biblioteca). 9
Me gusta agrupar mis modelos de datos en un paquete dedicado de mi proyecto, por ejemplo, com.magicalpipelines.model
. Aquí se muestra una vista del sistema de archivos donde se encuentran las clases de datos de este tutorial:
src/ └── main └── java └── com └── magicalpipelines └── model ├── ScoreEvent.java ├── Player.java └── Product.java
La clase de datos
ScoreEvent.java
se utilizará para representar los registros del temascore-events
.La clase de datos
Player.java
se utilizará para representar los registros del temaplayers
.La clase de datos
Product.java
se utilizará para representar los registros del temaproducts
.
Ahora que sabemos qué clases de datos tenemos que implementar, vamos a crear una clase de datos para cada tema. La Tabla 4-2 muestra los POJOs resultantes que hemos implementado para este tutorial.
Tema Kafka | Ejemplo de registro | Clase de datos |
---|---|---|
|
{ "score": 422, "product_id": 6, "player_id": 1 } |
public class ScoreEvent { private Long playerId; private Long productId; private Double score; } |
|
{ "id": 2, "name": "Mitch" } |
public class Player { private Long id; private String name; } |
|
{ "id": 1, "name": "Super Smash Bros" } |
public class Product { private Long id; private String name; } |
Nota
Ya hemos hablado en detalle de la serialización y deserialización en "Serialización/Deserialización". En el tutorial de ese capítulo, implementamos nuestro propio serializador, deserializador y Serdes personalizados. No dedicaremos más tiempo a eso aquí, pero puedes consultar el código de este tutorial para ver cómo hemos implementado los Serdes para cada una de las clases de datos que se muestran en la Tabla 4-2.
Añadir los procesadores de origen
Una vez definidas nuestras clases de datos, podemos configurar nuestros procesadores fuente. En esta topología, necesitamos tres de ellos, ya que leeremos de tres temas fuente. Lo primero que tenemos que hacer al añadir un procesador fuente es determinar qué abstracción de Kafka Streams debemos utilizar para representar los datos en el tema subyacente.
Hasta ahora, sólo hemos trabajado con la abstracción KStream
, que se utiliza para representar flujos de registros sin estado. Sin embargo, nuestra topología requiere que utilicemos los temas products
y players
como búsquedas, por lo que esto es un buen indicio de que una abstracción tipo tabla puede ser apropiada para estos temas.10 Antes de empezar a asignar nuestros temas a las abstracciones de Kafka Streams, revisemos primero la diferencia entre las representaciones KStream
, KTable
y GlobalKTable
de un tema Kafka. A medida que revisemos cada abstracción, rellenaremos la abstracción apropiada para cada tema en la Tabla 4-3.
Tema Kafka | Abstracción |
---|---|
|
??? |
|
??? |
|
??? |
KStream
Al decidir qué abstracción utilizar, ayuda determinar la naturaleza del tema, la configuración del tema y el espacio de claves de los registros en el tema fuente subyacente. Aunque las aplicaciones Kafka Streams con estado utilizan una o más abstracciones de tabla, también es muy común utilizar KStreams
sin estado junto a KTable
o GlobalKTable
cuando no se necesita la semántica de tabla mutable para una o más fuentes de datos.
En este tutorial, nuestro tema score-events
contiene eventos de puntuación brutos, que están sin clave (y, por tanto, distribuidos de forma rotatoria) en un tema sin compactar. Dado que las tablas se basan en claves, esto es un fuerte indicio de que deberíamos utilizar un KStream
para nuestro tema score-events
sin clave. Podríamos cambiar nuestra estrategia de claves en sentido ascendente (es decir, en cualquier aplicación que esté produciendo nuestro tema fuente), pero eso no siempre es posible. Además, nuestra aplicación se interesa por la puntuación más alta de cada jugador, no por la puntuación más reciente, por lo que la semántica de la tabla (es decir, retener sólo el registro más reciente para una clave determinada) no se traduce bien en la forma en que pretendemos utilizar el tema score-events
, aunque tuviera clave.
Por lo tanto, utilizaremos un KStream
para el tema score-events
, así que actualicemos la tabla de la Tabla 4-3 para reflejar esta decisión del siguiente modo:
Tema Kafka | Abstracción |
---|---|
|
|
|
??? |
|
??? |
Los dos temas restantes, players
y products
, tienen clave, y sólo nos interesa el último registro de cada clave única del tema. Por lo tanto, la abstracción KStream
no es ideal para estos temas. Así pues, sigamos adelante y veamos si la abstracción KTable
es adecuada para alguno de estos temas.
KTable
El tema players
es un tema compactado que contiene perfiles de jugadores, y cada registro tiene como clave el ID del jugador. Como sólo nos interesa el último estado de un jugador, tiene sentido representar este tema mediante una abstracción basada en tablas (ya sea KTable
oGlobalKTable
).
Algo importante que hay que tener en cuenta al decidir entre utilizar un KTable
o un GlobalKTable
es el espacio de claves. Si el espacio de claves es muy grande (es decir, tiene una cardinalidad alta/muchas claves únicas), o se espera que crezca hasta convertirse en un espacio de claves muy grande, entonces tiene más sentido utilizar un KTable
para poder distribuir fragmentos de todo el estado entre todas las instancias de tu aplicación en ejecución. Particionando el estado de este modo, podemos reducir la sobrecarga de almacenamiento local para cada instancia individual de Kafka Streams.
Quizás una consideración más importante a la hora de elegir entre un KTable
o un GlobalKTable
es si necesitas o no un procesamiento sincronizado en el tiempo. Un KTable
está sincronizado en el tiempo, por lo que cuando Kafka Streams esté leyendo de múltiples fuentes (por ejemplo, en el caso de una unión), mirará la marca de tiempo para determinar qué registro procesar a continuación. Esto significa que una unión reflejará lo que habría sido el registro combinado en un momento determinado, y esto hace que el comportamiento de la unión sea más predecible. Por otra parte, GlobalKTables
no está sincronizado en el tiempo, y está "completamente poblado antes de que se realice cualquier procesamiento".11
Por tanto, las uniones siempre se hacen contra la versión más actualizada de un GlobalKTable
, lo que cambia la semántica del programa.
En este caso, no vamos a centrarnos demasiado en la segunda consideración, ya que hemos reservado el próximo capítulo para nuestra discusión sobre el tiempo y el papel que desempeña en Kafka Streams. En cuanto al espacio de claves, players
contiene un registro por cada jugador único de nuestro sistema. Aunque esto puede ser pequeño dependiendo de dónde nos encontremos en el ciclo de vida de nuestra empresa o producto, es un número que esperamos que crezca significativamente con el tiempo, por lo que utilizaremos una abstracción de KTable
para este tema.
La Figura 4-2 muestra cómo el uso de KTable
hace que el estado subyacente se distribuya entre varias instancias de aplicación en ejecución.
Nuestra tabla de abstracción actualizada tiene ahora este aspecto:
Tema Kafka | Abstracción |
---|---|
|
|
|
|
|
??? |
Nos queda un tema: el tema products
. Este tema es relativamente pequeño, por lo que deberíamos poder replicar el estado en su totalidad en todas las instancias de nuestra aplicación. Echemos un vistazo a la abstracción que nos permite hacerlo: GlobalKTable
.
GlobalKTable
El tema products
es similar al tema players
en cuanto a su configuración (está compactado) y su espacio de claves delimitado (mantenemos el último registro para cada ID de producto único, y sólo hay un número fijo de productos de los que hacemos un seguimiento). Sin embargo, el tema products
tiene una cardinalidad mucho menor (es decir, menos claves únicas) que el temaplayers
e incluso si nuestra tabla de clasificación registrara las puntuaciones más altas de varios cientos de juegos, esto se traduciría en un espacio de estados lo suficientemente pequeño como para caber por completo en la memoria.
Además de ser más pequeños, los datos del tema products
también son relativamente estáticos. Los videojuegos tardan mucho en construirse, así que no esperamos muchas actualizaciones de nuestroproducts
tema.
Estas dos características (datos pequeños y estáticos) son para las que se diseñó GlobalKTables
. Por lo tanto, utilizaremos un GlobalKTable
para nuestro tema products
. Como resultado, cada una de nuestras instancias de Kafka Streams almacenará una copia completa de la información del producto, lo que, como veremos más adelante, facilita mucho la realización de uniones.
La Figura 4-3 muestra cómo cada instancia de Kafka Streams mantiene una copia completa de la tablaproducts
tabla.
Ahora podemos hacer la actualización final de nuestro mapeo tema-abstracción:
Tema Kafka | Abstracción |
---|---|
|
|
|
|
|
|
Ahora que hemos decidido qué abstracción utilizar para cada uno de nuestros temas fuente, podemos registrar los flujos y las tablas.
Registrar flujos y tablas
Registrar flujos y tablas es sencillo. El siguiente bloque de código muestra cómo utilizar el DSL de alto nivel para crear un KStream
, KTable
, y GlobalKTable
utilizando los métodos constructores adecuados:
StreamsBuilder builder = new StreamsBuilder(); KStream<byte[], ScoreEvent> scoreEvents = builder.stream( "score-events", Consumed.with(Serdes.ByteArray(), JsonSerdes.ScoreEvent())); KTable<String, Player> players = builder.table( "players", Consumed.with(Serdes.String(), JsonSerdes.Player())); GlobalKTable<String, Product> products = builder.globalTable( "products", Consumed.with(Serdes.String(), JsonSerdes.Product()));
Utiliza un
KStream
para representar datos en el temascore-events
, que actualmente no tiene clave.Crea una tabla particionada (o fragmentada) para el tema
players
, utilizando la abstracciónKTable
.Crea un
GlobalKTable
para el temaproducts
, que se replicará íntegramente en cada instancia de la aplicación.
Al registrar los temas fuente, ya hemos implementado el primer paso de nuestra topología de tablas de clasificación (ver Figura 4-1). Pasemos al siguiente paso: unir flujos y tablas.
Únete a
El método más común para combinar conjuntos de datos en el mundo relacional es mediante uniones.12 En los sistemas relacionales, los datos suelen ser muy dimensionales y estar dispersos en muchas tablas diferentes. Es habitual ver estos mismos patrones también en Kafka, ya sea porque los eventos proceden de varias ubicaciones, porque los desarrolladores se sienten cómodos o están acostumbrados a los modelos de datos relacionales, o porque ciertas integraciones de Kafka (por ejemplo, el conector JDBC Kafka, Debezium, Maxwell, etc.) traen consigo tanto los datos en bruto como los modelos de datos de los sistemas de origen.
Independientemente de cómo se dispersen los datos en Kafka, poder combinar datos en flujos y tablas separados basándose en relaciones abre la puerta a oportunidades de enriquecimiento de datos más avanzadas en Kafka Streams. Además, el método de unión para combinar conjuntos de datos es muy diferente de la simple fusión de flujos, como vimos en la Figura 3-6. Cuando utilizamos el operador merge
en Kafka Streams, los registros de ambos lados de la fusión se combinan incondicionalmente en un único flujo. Las operaciones de fusión simples son, por tanto, sin estado, ya que no necesitan contexto adicional sobre los eventos que se están fusionando.
Las uniones, sin embargo, pueden considerarse como un tipo especial de fusión condicional que se preocupa por la relación entre los eventos, y en la que los registros no se copian textualmente en el flujo de salida, sino que se combinan. Además, estas relaciones deben capturarse, almacenarse y referenciarse en el momento de la fusión para facilitar la unión, lo que hace que la unión sea una operación con estado. La Figura 4-4 muestra una representación simplificada de cómo funciona un tipo de unión (hay varios tipos de uniones, como veremos en la Tabla 4-5).
Al igual que los sistemas relacionales, Kafka Streams incluye soporte para múltiples tipos de uniones. Así que, antes de aprender a unir nuestro flujo score-events
con nuestra tabla players
, familiaricémonos primero con los distintos operadores de unión que tenemos a nuestra disposición para poder seleccionar la mejor opción para nuestro caso de uso particular.
Unir Operarios
Kafka Streams incluye tres operadores de unión diferentes para unir flujos y tablas. Cada operador se detalla en la Tabla 4-4.
Nota
Cuando hablamos de las diferencias entre los operadores join, nos referimos a los distintos lados de la unión. Recuerda que el lado derecho de la unión siempre se pasa como parámetro al operador join correspondiente. Por ejemplo
KStream<String, ScoreEvent> scoreEvents = ...; KTable<String, Player> players = ...; scoreEvents.join(players, ...);
Veamos ahora el tipo de uniones que podemos crear con estos operadores.
Tipos de unión
Kafka Streams admite muchos tipos diferentes de uniones, como se muestra en la Tabla 4-5. La columnade coparticionamiento se refiere a algo que trataremos en "Coparticionamiento". Por ahora, basta con entender que la co-partición es simplemente un conjunto adicional de requisitos que se necesitan para realizar realmente la unión.
Tipo | Ventana | Operarios | Se requiere copartición |
---|---|---|---|
|
Sía |
|
Sí |
|
No |
|
Sí |
|
No |
|
Sí |
|
No |
|
No |
a Una cosa clave que hay que tener en cuenta es que las uniones de |
Los dos tipos de uniones que tenemos que realizar en este capítulo son:
-
KStream-KTable
a unir elscore-events
KStream
y elplayers
KTable
-
KStream-GlobalKTable
para unir la salida de la unión anterior con laproducts
GlobalKTable
Utilizaremos una unión interna, utilizando el operador join
para cada una de las uniones, ya que sólo queremos que la unión se active cuando haya una coincidencia en ambos lados. Sin embargo, antes de hacerlo, podemos ver que la primera unión que vamos a crear (KStream-KTable
) muestra que es necesaria la co-partición. Echemos un vistazo a lo que eso significa antes de escribir más código.
Copartición
Si un árbol cae en un bosque y no hay nadie cerca para oírlo, ¿hace ruido?
Aforismo
Este famoso experimento mental plantea una pregunta sobre qué papel tiene un observador en la ocurrencia de un evento (en este caso, el sonido que se produce en un bosque). Del mismo modo, en Kafka Streams, siempre debemos ser conscientes del efecto que tiene un observador en el procesamiento de un evento.
En "Tareas e hilos de flujo", aprendimos que cada partición se asigna a una única tarea Kafka Streams, y estas tareas actuarán como observadores en nuestra analogía, ya que son las responsables de consumir y procesar realmente los eventos. Como no hay garantía de que los eventos de diferentes particiones sean gestionados por la misma tarea Kafka Streams, tenemos un problema potencial de observabilidad.
La Figura 4-5 muestra el problema básico de la observabilidad. Si los eventos relacionados son procesados por tareas diferentes, no podremos determinar con precisión la relación entre los eventos porque tenemos dos observadores separados. Dado que el objetivo de unir datos es combinar eventos relacionados, un problema de observabilidad hará que nuestras uniones fallen cuando, de otro modo, deberían tener éxito.
Para comprender la relación entre los sucesos (mediante uniones) o calcular agregaciones sobre una secuencia de sucesos, necesitamos asegurarnos de que los sucesos relacionados se dirigen a la misma partición, y por tanto son tratados por la misma tarea.
Para garantizar que los eventos relacionados se enrutan a la misma partición, debemos asegurarnos de que se cumplen los siguientes requisitos de co-partición:
-
Los registros de ambos lados deben tener como clave el mismo campo, y deben particionarse con esa clave utilizando la misma estrategia de partición.
-
Los temas de entrada de ambos lados de la unión deben contener el mismo número de particiones. (Éste es el único requisito que se comprueba al inicio. Si no se cumple este requisito, se lanzará un
TopologyBuilderException
).
En este tutorial, cumplimos todos los requisitos para realizar una unión KTable-KTable
excepto el primero. Recuerda que los registros del tema score-events
no tienen clave, pero que nos uniremos con el tema players
KTable
, que tiene clave por ID de jugador. Por lo tanto, antes de realizar la unión, tenemos que volver a codificar los datos de score-events
por ID de jugador. Esto puede hacerse utilizando el operador selectKey
, como se muestra en el Ejemplo 4-1.
Ejemplo 4-1. El operador selectKey
nos permite reintroducir registros; esto suele ser necesario para cumplir los requisitos de co-partición para realizar ciertos tipos de uniones
KStream<String, ScoreEvent> scoreEvents = builder .stream( "score-events", Consumed.with(Serdes.ByteArray(), JsonSerdes.ScoreEvent())) .selectKey((k, v) -> v.getPlayerId().toString());
selectKey
se utiliza para cambiar la clave de los registros. En este caso, nos ayuda a cumplir el primer requisito de la co-partición, que consiste en garantizar que los registros de ambos lados de la unión (los datos descore-events
y los deplayers
) estén codificados por el mismo campo.
En la Figura 4-6 se muestra una visualización de cómo se reintroducen los registros.
Nota
Cuando añadimos un operador de cambio de clave a nuestra topología, los datos subyacentes se marcarán para su repartición. Esto significa que en cuanto añadamos un operador descendente que lea la nueva clave, Kafka Streams lo hará:
-
Envía los datos reescritos a un tema interno de repartición
-
Vuelve a leer los datos recién recodificados en Kafka Streams
Este proceso garantiza que los registros relacionados (es decir, los registros que comparten la misma clave) serán procesados por la misma tarea en pasos de topología posteriores. Sin embargo, el viaje por la red necesario para redirigir los datos a un tema de repartición especial significa que las operaciones de cambio de clave pueden ser caras.
¿Qué ocurre con nuestro KStream-GlobalKTable
para unirnos al tema products
? Como se muestra en la Tabla 4-5, la co-partición no es necesaria para las uniones GlobalKTable
, ya que el estado está totalmente replicado en cada instancia de nuestra aplicación Kafka Streams. Por lo tanto, nunca nos encontraremos con este tipo de problema de observabilidad con una unión a GlobalKTable
.
Ya casi estamos listos para unir nuestros flujos y tablas. Pero antes, veamos cómo se combinan realmente los registros durante una operación de unión.
Ensambladoras de valor
Al realizar una unión mediante SQL tradicional, sólo tenemos que utilizar el operador de unión junto con una cláusula SELECT
para especificar la forma (o proyección) del registro de unión combinado. Por ejemplo
SELECT a.customer_name, b.purchase_amount FROM customers a LEFT JOIN purchases b ON a.customer_id = b.customer_id
Sin embargo, en Kafka Streams, necesitamos utilizar un ValueJoiner
para especificar cómo deben combinarse los distintos registros. Un ValueJoiner
simplemente toma cada registro que participa en la unión y produce un nuevo registro combinado. Si nos fijamos en la primera unión, en la que necesitamos unir el score-events
KStream
con el players
KTable
, el comportamiento del juntador de valores podría expresarse utilizando el siguiente pseudocódigo:
(scoreEvent, player) -> combine(scoreEvent, player);
Pero podemos hacerlo mucho mejor. Es más típico tener una clase de datos dedicada que haga una de las siguientes cosas:
-
Envuelve cada uno de los valores implicados en la unión
-
Extrae los campos relevantes de cada lado de la unión, y guarda los valores extraídos en propiedades de clase
A continuación exploraremos ambos enfoques. En primer lugar, empecemos con una clase envolvente sencilla para la unión score-events -> players
. El Ejemplo 4-2 muestra una implementación sencilla de una clase de datos que envuelve el registro de cada lado de la unión.
Ejemplo 4-2. La clase de datos que utilizaremos para construir el registro unido score-events -> players
public class ScoreWithPlayer { private ScoreEvent scoreEvent; private Player player; public ScoreWithPlayer(ScoreEvent scoreEvent, Player player) { this.scoreEvent = scoreEvent; this.player = player; } // accessors omitted from brevity }
El constructor contiene un parámetro para cada lado de la unión. El lado izquierdo de la unión contiene
ScoreEvent
, y el lado derecho contiene unPlayer
.Simplemente guardamos una referencia a cada registro implicado en la unión dentro de nuestra clase envoltorio.
Podemos utilizar nuestra nueva clase envoltorio como tipo de retorno en nuestro ValueJoiner
. El Ejemplo 4-3 muestra un ejemplo de implementación de un ValueJoiner
que combinaun ScoreEvent
(del score-events
KStream
) y un Player
(del players
KTable
) en una instancia ScoreWithPlayer
.
Ejemplo 4-3. El ValueJoiner
para combinar score-events
y players
ValueJoiner<ScoreEvent, Player, ScoreWithPlayer> scorePlayerJoiner = (score, player) -> new ScoreWithPlayer(score, player);
También podríamos utilizar aquí simplemente una referencia a un método estático, como
ScoreWithPlayer::new
.
Pasemos a la segunda unión. Esta unión necesita combinar un ScoreWithPlayer
(de la salida de la primera unión) con un Product
(del products
GlobalKTable
). Podríamos reutilizar de nuevo el patrón envoltorio, pero también podríamos simplemente extraer las propiedades que necesitamos de cada lado de la unión, y descartar el resto.
El siguiente bloque de código muestra una implementación de una clase de datos que sigue el segundo patrón. Simplemente extraemos los valores que queremos y los guardamos en las propiedades apropiadas de la clase:
public class Enriched { private Long playerId; private Long productId; private String playerName; private String gameName; private Double score; public Enriched(ScoreWithPlayer scoreEventWithPlayer, Product product) { this.playerId = scoreEventWithPlayer.getPlayer().getId(); this.productId = product.getId(); this.playerName = scoreEventWithPlayer.getPlayer().getName(); this.gameName = product.getName(); this.score = scoreEventWithPlayer.getScoreEvent().getScore(); } // accessors omitted from brevity }
Con esta nueva clase de datos, podemos construir nuestro ValueJoiner
para la unión KStream-GlobalKTable
utilizando el código que se muestra en el Ejemplo 4-4.
Ejemplo 4-4. Un ValueJoiner, expresado como una lambda, que utilizaremos para la unión
ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner = (scoreWithPlayer, product) -> new Enriched(scoreWithPlayer, product);
Ahora que le hemos dicho a Kafka Streams cómo combinar nuestros registros de unión, podemos realizar las uniones.
Unión de KStream a KTable (unión de reproductores)
Es hora de unir nuestro score-events
KStream
con nuestro players
KTable
. Como sólo queremos activar la unión cuando el registro ScoreEvent
pueda coincidir con un registro Player
(utilizando la clave de registro), realizaremos una unión interna utilizando el operador join
, como se muestra aquí:
Joined<String, ScoreEvent, Player> playerJoinParams = Joined.with( Serdes.String(), JsonSerdes.ScoreEvent(), JsonSerdes.Player() ); KStream<String, ScoreWithPlayer> withPlayers = scoreEvents.join( players, scorePlayerJoiner, playerJoinParams );
Los parámetros de la unión definen cómo deben serializarse las claves y los valores de los registros de la unión.
El operador
join
realiza una unión interna.Este es el
ValueJoiner
que creamos en el Ejemplo 4-3. Se crea un nuevo valorScoreWithPlayer
a partir de los dos registros de unión. Echa un vistazo a la clase de datosScoreWithPlayer
del Ejemplo 4-2 para ver cómo se pasan los valores izquierdo y derecho de la unión al constructor.
Así de sencillo. Además, si ejecutaras el código en este punto y luego listaras todos los temas disponibles en tu clúster de Kafka, verías que Kafka Streams creó dos nuevos temas internos para nosotros.
Estos temas son:
-
Un tema de repartición para gestionar la operación de reescritura que realizamos en el Ejemplo 4-1.
-
Un tema de registro de cambios para respaldar el almacén de estados, que utiliza el operador de unión. Esto forma parte del comportamiento de tolerancia a fallos que comentamos inicialmente en "Tolerancia a fallos".
Puedes verificarlo con el script de consola kafka-topics
:13
$ kafka-topics --bootstrap-server kafka:9092 --list players products score-events dev-KSTREAM-KEY-SELECT-0000000001-repartition dev-players-STATE-STORE-0000000002-changelog
Un tema de reparto interno creado por Kafka Streams. Lleva como prefijo el ID de aplicación de nuestra aplicación Kafka Streams (
dev
).Un tema de registro de cambios interno creado por Kafka Streams. Al igual que el tema de repartición, este tema de registro de cambios también lleva como prefijo el ID de aplicación de nuestra aplicación Kafka Streams.
Bien, estamos listos para pasar a la segunda unión.
Unión de KStream a GlobalKTable (unión de productos)
Como hemos comentado en los requisitos de la co-partición, los registros de ambos lados de una unión GlobalKTable
no tienen por qué compartir la misma clave. Como la tarea local tiene una copia completa de la tabla, en realidad podemos realizar una unión utilizando algún atributo del propio valor del registro en el lado del flujo de la unión,14 lo que es más eficaz que tener que volver a teclear los registros a través de un tema de repartición sólo para garantizar que los registros relacionados sean manejados por la misma tarea.
Para realizar una unión KStream-GlobalKTable
, necesitamos crear algo llamado KeyValueMapper
, cuyo propósito es especificar cómo mapear un registro KStream
a un registro GlobalKTable
.Para este tutorial, podemos simplemente extraer el ID del producto del valor ScoreWithPlayer
para mapear estos registros a un Product
, como se muestra aquí:
KeyValueMapper<String, ScoreWithPlayer, String> keyMapper = (leftKey, scoreWithPlayer) -> { return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId()); };
Con nuestro KeyValueMapper
en su sitio, y también el ValueJoiner
que creamos en el Ejemplo 4-4, ya podemos realizar la unión:
KStream<String, Enriched> withProducts = withPlayers.join(products, keyMapper, productJoiner);
Esto completa el segundo y tercer paso de nuestra topología de tabla de clasificación (ver Figura 4-1). Lo siguiente que tenemos que hacer es agrupar los registros enriquecidos para poder realizar una agregación.
Agrupar registros
Antes de realizar cualquier agregación de flujos o tablas en Kafka Streams, primero debes agrupar las KStream
o KTable
que piensas agregar. El propósito de la agrupación es el mismo que el de reagrupar registros antes de la unión: garantizar que los registros relacionados sean procesados por el mismo observador, o tarea de Kafka Streams.
Hay algunas pequeñas diferencias entre agrupar flujos y tablas, así que echaremos un vistazo a cada una de ellas.
Agrupar flujos
Hay dos operadores que pueden utilizarse para agrupar un KStream
:
-
groupBy
-
groupByKey
Utilizar groupBy
es similar al proceso de reincorporación de un flujo utilizando selectKey
, ya que este operador es un operador de cambio de clave y hace que Kafka Streams marque el flujo para su repartición. Si se añade un operador descendente que lea la nueva clave, Kafka Streams creará automáticamente un tema de repartición y enrutará los datos de vuelta a Kafka para completar el proceso de recodificación.
El ejemplo 4-5 muestra cómo utilizar el operador groupBy
para agrupar un KStream
.
Ejemplo 4-5. Utiliza el operador groupBy
para volver a teclear y agrupar un KStream
al mismo tiempo
KGroupedStream<String, Enriched> grouped = withProducts.groupBy( (key, value) -> value.getProductId().toString(), Grouped.with(Serdes.String(), JsonSerdes.Enriched()));
Podemos utilizar una lambda para seleccionar la nueva clave, ya que el operador
groupBy
espera unaKeyValueMapper
, que resulta ser una interfaz funcional.Grouped
nos permite pasar algunas opciones adicionales para la agrupación, incluidos los Serdes de clave y valor a utilizar al serializar los registros.
Sin embargo, si tus registros de no necesitan ser reparticionados, entonces es preferible utilizar el operador groupByKey
en su lugar. groupByKey
no marcará el flujo para reparticionarlo y, por tanto, tendrá un mayor rendimiento, ya que evita las llamadas de red adicionales asociadas al envío de datos de vuelta a Kafka para reparticionarlos. Aquí se muestra la implementación de groupByKey
:
KGroupedStream<String, Enriched> grouped = withProducts.groupByKey( Grouped.with(Serdes.String(), JsonSerdes.Enriched()));
Como queremos calcular las puntuaciones altas de cada ID de producto, y como nuestro flujo enriquecido está actualmente codificado por ID de jugador, utilizaremos la variación groupBy
que se muestra en el Ejemplo 4-5 en la topología de la tabla de clasificación.
Independientemente del operador que utilices para agrupar un flujo, Kafka Streams devolverá un nuevo tipo del que no hemos hablado antes: KGroupedStream
. KGroupedStream
no es más que una representación intermedia de un flujo que nos permite realizar agregaciones. Veremos las agregaciones en breve, pero antes, veamos cómo agrupar KTables
.
Tablas de agrupación
A diferencia de la agrupación de flujos, sólo hay un operador disponible para agrupar tablas: groupBy
. Además, en lugar de devolver un KGroupedStream
, invocar groupBy
sobre un KTable
devuelve una representación intermedia diferente: KGroupedTable
. Por lo demás, el proceso de agrupar KTables
es idéntico al de agrupar un KStream
. Por ejemplo, si quisiéramos agrupar el players
KTable
para poder realizar posteriormente alguna agregación (por ejemplo, contar el número de jugadores), podríamos utilizar el siguiente código:
KGroupedTable<String, Player> groupedPlayers = players.groupBy( (key, value) -> KeyValue.pair(key, value), Grouped.with(Serdes.String(), JsonSerdes.Player()));
El bloque de código anterior no es necesario para este tutorial, ya que no necesitamos agrupar la tabla players
, pero lo mostramos aquí para demostrar el concepto. Ahora ya sabemos cómo agrupar flujos y tablas, y hemos completado el paso 4 de nuestra topología de procesadores (ver Figura 4-1). A continuación, aprenderemos a realizar agregaciones en Kafka Streams.
Agregaciones
Uno de los pasos finales necesarios para nuestra topología de tabla de clasificación es calcular las puntuaciones más altas de cada juego. Kafka Streams nos proporciona un conjunto de operadores que hace que realizar este tipo de agregaciones sea muy fácil:
-
aggregate
-
reduce
-
count
A alto nivel, las agregaciones no son más que una forma de combinar varios valores de entrada en un único valor de salida. Tendemos a pensar en las agregaciones como operaciones matemáticas, pero no tienen por qué serlo. Mientras que count
es una operación matemática que calcula el número de eventos por clave, tanto el operador aggregate
como reduce
son más genéricos, y pueden combinar valores utilizando cualquier lógica combinacional que especifiques.
Nota
reduce
es muy similar a aggregate
. La diferencia radica en el tipo de retorno. El operador reduce
requiere que la salida de una agregación sea del mismo tipo que la entrada, mientras que el operador aggregate
puede especificar un tipo diferente para el registro de salida.
Además, las agregaciones pueden aplicarse tanto a flujos como a tablas. La semántica es un poco diferente en cada uno de ellos, ya que los flujos son inmutables mientras que las tablas son mutables. Esto se traduce en versiones ligeramente diferentes de losoperadores aggregate
y reduce
, aceptando la versión de flujos dos parámetros: un inicializador y un sumador, y la versión de tablas tres parámetros: un inicializador, un sumador yun restador.15
Veamos cómo agregar flujos creando nuestra agregación de puntuaciones altas.
Agregación de flujos
En esta sección, aprenderemos a aplicar agregaciones a flujos de registros, lo que implica crear una función para inicializar un nuevo valor agregado (llamada inicializador) y una función para realizar agregaciones posteriores a medida que llegan nuevos registros para una clave determinada (llamada función sumadora ). En primer lugar, aprenderemos sobre los inicializadores.
Inicializador
Cuando nuestra topología Kafka Streams ve una nueva clave, necesitamos alguna forma de inicializar la agregación. La interfaz que nos ayuda con esto es Initializer
, y como muchas de las clases de la API de Kafka Streams, Initializer
es una interfaz funcional (es decir, contiene un único método), y por tanto puede definirse como una lambda.
Por ejemplo, si echaras un vistazo a las partes internas de la agregación count
, verías un inicializador que establece el valor inicial de la agregación en 0
:
Initializer<Long> countInitializer = () -> 0L;
El inicializador se define como una lambda, ya que la interfaz
Initializer
es una interfaz funcional.
Para agregaciones más complejas, puedes proporcionar tu propio inicializador personalizado. Por ejemplo, para implementar una tabla de clasificación de videojuegos, necesitamos alguna forma de calcular las tres puntuaciones más altas de un juego determinado. Para ello, podemos crear una clase independiente que incluya la lógica para rastrear las tres puntuaciones más altas, y proporcionar una nueva instancia de esta clase cada vez que sea necesario inicializar una agregación.
En este tutorial, crearemos una clase personalizada llamada HighScores
para que actúe como nuestra clase de agregación. Esta clase necesitará alguna estructura de datos subyacente para contener las tres puntuaciones más altas de un determinado videojuego. Un enfoque es utilizar un TreeSet
, que es un conjunto ordenado incluido en la biblioteca estándar de Java, y por lo tanto es bastante conveniente para contener puntuaciones altas (que son inherentemente ordenadas).
Aquí se muestra una implementación inicial de nuestra clase de datos que utilizaremos para la agregación de puntuaciones altas:
public class HighScores { private final TreeSet<Enriched> highScores = new TreeSet<>(); }
Ahora tenemos que decirle a Kafka Streams cómo inicializar nuestra nueva clase de datos. Inicializar una clase es sencillo; sólo tenemos que instanciarla:
Initializer<HighScores> highScoresInitializer = HighScores::new;
Una vez que tenemos un inicializador para nuestra agregación, tenemos que implementar la lógica para realizar realmente la agregación (en este caso, llevar la cuenta de las tres puntuaciones más altas de cada videojuego).
Adder
Lo siguiente que tenemos que hacer para construir un agregador de flujos es definir la lógica para combinar dos agregados. Para ello se utiliza la interfaz Aggregator
, que, al igual que Initializer
, es una interfaz funcional que puede implementarse mediante una lambda. La función de implementación debe aceptar tres parámetros:
-
La clave de registro
-
El valor del registro
-
El valor agregado actual
Podemos crear nuestro agregador de puntuaciones altas con el siguiente código:
Aggregator<String, Enriched, HighScores> highScoresAdder = (key, value, aggregate) -> aggregate.add(value);
Ten en cuenta que aggregate
es una instancia de HighScores
, y como nuestro agregador invoca el método HighScores.add
, sólo tenemos que implementarlo en nuestra clase HighScores
. Como puedes ver en el siguiente bloque de código, el código es extremadamente sencillo, con el método add
simplemente añadiendo una nueva puntuación alta a la interna TreeSet
, y luego eliminando la puntuación más baja si tenemos más de tres puntuaciones altas:
public class HighScores { private final TreeSet<Enriched> highScores = new TreeSet<>(); public HighScores add(final Enriched enriched) { highScores.add(enriched); if (highScores.size() > 3) { highScores.remove(highScores.last()); } return this; } }
Cada vez que nuestro método sumador (
HighScores.add
) es llamado por Kafka Streams, simplemente añadimos el nuevo registro al subyacenteTreeSet
, que ordenará cada entrada automáticamente.Si tenemos más de tres puntuaciones altas en nuestro
TreeSet
, elimina la puntuación más baja.
Para que TreeSet
sepa cómo ordenar los objetos Enriched
(y, por tanto, pueda identificar el registro Enriched
con la puntuación más baja para eliminarlo cuando nuestro agregado highScores
supere los tres valores), implementaremos la interfaz Comparable
, como se muestra en el Ejemplo 4-6.
Ejemplo 4-6. La clase Enriched
actualizada, que implementa la interfaz Comparable
public class Enriched implements Comparable<Enriched> { @Override public int compareTo(Enriched o) { return Double.compare(o.score, score); } // omitted for brevity }
Actualizaremos nuestra clase
Enriched
para que implementeComparable
, ya que determinar las tres puntuaciones más altas implicará comparar un objetoEnriched
con otro.Nuestra implementación del método
compareTo
utiliza la propiedadscore
como método para comparar dos objetosEnriched
diferentes.
Ahora que tenemos nuestro inicializador y nuestra función sumadora, podemos realizar la agregación utilizando el código del Ejemplo 4-7.
Ejemplo 4-7. Utilizar el operador de agregación de Kafka Streams para realizar nuestra agregación de puntuaciones altas
KTable<String, HighScores> highScores = grouped.aggregate(highScoresInitializer, highScoresAdder);
Tablas de agregación
El proceso de agregar tablas es bastante similar al de agregar flujos. Necesitamos un inicializador y una función sumadora. Sin embargo, las tablas son mutables, y necesitan poder actualizar un valor agregado siempre que se elimine una clave.16 También necesitamos un tercer parámetro, llamado función sustractora.
Sustractor
Aunque esto no es necesario para el ejemplo de la tabla de clasificación, supongamos que queremos contar el número de jugadores en nuestro players
KTable
. Podríamos utilizar el operador count
, pero para demostrar cómo construir una función sustractora, construiremos nuestra propia función agregadora que es esencialmente equivalente al operador count
. Aquí se muestra una implementación básica de un agregado que utiliza una función sustractora (así como una función inicializadora y sumadora, que es necesaria para los agregados KStream
y KTable
):
KGroupedTable<String, Player> groupedPlayers = players.groupBy( (key, value) -> KeyValue.pair(key, value), Grouped.with(Serdes.String(), JsonSerdes.Player())); groupedPlayers.aggregate( () -> 0L, (key, value, aggregate) -> aggregate + 1L, (key, value, aggregate) -> aggregate - 1L);
La función inicializadora inicializa el agregado a 0.
La función sumador incrementa la cuenta actual cuando se ve una nueva tecla.
La función sustractor disminuye la cuenta actual cuando se elimina una tecla.
Ya hemos completado el paso 5 de nuestra topología de tabla de clasificación(Figura 4-1). Hemos escrito una cantidad decente de código, así que veamos cómo encajan los fragmentos de código individuales en la siguiente sección.
Ponerlo todo junto
Ahora que hemos construido los pasos de procesamiento individuales de nuestra topología de tabla de clasificación, vamos a unirlo todo. El Ejemplo 4-8 muestra cómo se unen los pasos de la topología que hemos creado hasta ahora.
Ejemplo 4-8. Topología del procesador para nuestra aplicación de clasificación de videojuegos
// the builder is used to construct the topology StreamsBuilder builder = new StreamsBuilder(); // register the score events stream KStream<String, ScoreEvent> scoreEvents = builder .stream( "score-events", Consumed.with(Serdes.ByteArray(), JsonSerdes.ScoreEvent())) .selectKey((k, v) -> v.getPlayerId().toString()); // create the partitioned players table KTable<String, Player> players = builder.table("players", Consumed.with(Serdes.String(), JsonSerdes.Player())); // create the global product table GlobalKTable<String, Product> products = builder.globalTable( "products", Consumed.with(Serdes.String(), JsonSerdes.Product())); // join params for scoreEvents - players join Joined<String, ScoreEvent, Player> playerJoinParams = Joined.with(Serdes.String(), JsonSerdes.ScoreEvent(), JsonSerdes.Player()); // join scoreEvents - players ValueJoiner<ScoreEvent, Player, ScoreWithPlayer> scorePlayerJoiner = (score, player) -> new ScoreWithPlayer(score, player); KStream<String, ScoreWithPlayer> withPlayers = scoreEvents.join(players, scorePlayerJoiner, playerJoinParams); // map score-with-player records to products KeyValueMapper<String, ScoreWithPlayer, String> keyMapper = (leftKey, scoreWithPlayer) -> { return String.valueOf(scoreWithPlayer.getScoreEvent().getProductId()); }; // join the withPlayers stream to the product global ktable ValueJoiner<ScoreWithPlayer, Product, Enriched> productJoiner = (scoreWithPlayer, product) -> new Enriched(scoreWithPlayer, product); KStream<String, Enriched> withProducts = withPlayers.join(products, keyMapper, productJoiner); // Group the enriched product stream KGroupedStream<String, Enriched> grouped = withProducts.groupBy( (key, value) -> value.getProductId().toString(), Grouped.with(Serdes.String(), JsonSerdes.Enriched())); // The initial value of our aggregation will be a new HighScores instance Initializer<HighScores> highScoresInitializer = HighScores::new; // The logic for aggregating high scores is implemented in the HighScores.add method Aggregator<String, Enriched, HighScores> highScoresAdder = (key, value, aggregate) -> aggregate.add(value); // Perform the aggregation, and materialize the underlying state store for querying KTable<String, HighScores> highScores = grouped.aggregate( highScoresInitializer, highScoresAdder);
Lee el
score-events
en unKStream
.Vuelve a codificar los mensajes para cumplir los requisitos de co-partición necesarios para la unión.
Lee el tema
players
en unKTable
, ya que el espacio de claves es grande (lo que nos permite repartir el estado entre varias instancias de la aplicación) y porque queremos un procesamiento sincronizado en el tiempo para la uniónscore-events -> players
.Lee el tema
products
comoGlobalKTable
, ya que el espacio de claves es pequeño y no necesitamos un procesamiento sincronizado en el tiempo.Une el flujo
score-events
y la tablaplayers
.Une la
score-events
enriquecida con la tablaproducts
.Agrupa el flujo enriquecido. Este es un requisito previo para la agregación.
Agrega el flujo agrupado. La lógica de agregación reside en la clase
HighScores
.
Añadamos la configuración necesaria para nuestra aplicación e iniciemos el streaming:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "dev"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); KafkaStreams streams = new KafkaStreams(builder.build(), props); streams.start();
Llegados a este punto, nuestra aplicación está lista para empezar a recibir registros y calcular puntuaciones altas para nuestra tabla de clasificación. Sin embargo, aún nos queda un último paso que abordar para exponer los resultados de la tabla de clasificación a clientes externos. Pasemos al último paso de nuestra topología de procesadores, y aprendamos a exponer el estado de nuestra aplicación Kafka Streams mediante consultas interactivas.
Consultas interactivas
Una de las características definitorias de Kafka Streams es su capacidad para exponer el estado de la aplicación, tanto localmente como al mundo exterior. Esto último facilita la construcción de microservicios basados en eventos con una latencia extremadamente baja. En este tutorial, podemos utilizar consultas interactivas para exponer nuestras agregaciones de alta puntuación.
Para ello, necesitamos materializar el almacén de estados. Aprenderemos a hacerlo en la siguiente sección.
Almacenes materializados
Ya sabemos que los operadores con estado, como aggregate
, count
, reduce
, etc., aprovechan los almacenes de estado para gestionar el estado interno. Sin embargo, si te fijas bien en nuestro método para agregar puntuaciones altas del Ejemplo 4-7, no verás ninguna mención a un almacén de estado. Esta variante del método aggregate
utiliza un almacén de estado interno al que sólo accede la topología del procesador.
Si queremos permitir el acceso de sólo lectura al almacén de estado subyacente para consultas ad hoc, podemos utilizar uno de los métodos sobrecargados para forzar la materialización del almacén de estado localmente. Los almacenes de estado materializados difieren de los almacenes de estado internos en que se nombran explícitamente y se pueden consultar fuera de la topología del procesador. Aquí es donde resulta útil la clase Materialized
. El Ejemplo 4-9 muestra cómo materializar un almacén persistente de valores clave utilizando la clase Materialized
, lo que nos permitirá consultar el almacén mediante consultas interactivas.
Ejemplo 4-9. Almacén de estado materializado con una configuración mínima
KTable<String, HighScores> highScores = grouped.aggregate( highScoresInitializer, highScoresAdder, Materialized.<String, HighScores, KeyValueStore<Bytes, byte[]>> as("leader-boards") .withKeySerde(Serdes.String()) .withValueSerde(JsonSerdes.HighScores()));
Esta variación del método
Materialized.as
incluye tres genéricos:-
El tipo de clave del almacén (en este caso,
String
) -
El tipo de valor del almacén (en este caso,
HighScores
) -
El tipo de almacén de estado (en este caso, utilizaremos un almacén simple de clave-valor, representado por
KeyValueStore<Bytes, byte[]>
)
-
Proporciona un nombre explícito al almacén para que pueda consultarse fuera de la topología del procesador.
Podemos personalizar el almacén de estado materializado utilizando diversos parámetros, como los Serdes de clave y valor, así como otras opciones que exploraremos en el Capítulo 6.
Una vez que hemos materializado nuestro almacén de estado leader-boards
, estamos casi listos para exponer estos datos mediante consultas ad hoc. Sin embargo, lo primero que tenemos que hacer es recuperar el almacén de Kafka Streams.
Acceder a almacenes de estado de sólo lectura
Cuando necesitamos acceder a un almacén de estado en modo sólo lectura, necesitamos dosdatos:
-
El nombre del almacén estatal
-
El tipo de almacén estatal
Como vimos en el Ejemplo 4-9, el nombre de nuestro almacén de estados es leader-boards
. Tenemos que recuperar la envoltura de sólo lectura adecuada para nuestro almacén de estado subyacente utilizando la clase de fábrica QueryableStoreTypes
. Hay varios almacenes de estado compatibles, entre ellos
-
QueryableStoreTypes.keyValueStore()
-
QueryableStoreTypes.timestampedKeyValueStore()
-
QueryableStoreTypes.windowStore()
-
QueryableStoreTypes.timestampedWindowStore()
-
QueryableStoreTypes.sessionStore()
En nuestro caso, estamos utilizando un almacén simple de clave-valor, por lo que necesitamos el método QueryableStoreType.keyValueStore()
. Con el nombre del almacén de estado y el tipo de almacén de estado, podemos instanciar una instancia de un almacén de estado consultable para utilizarlo en consultas interactivas, mediante utilizando el método KafkaStreams.store()
, como se muestra en el Ejemplo 4-10.
Ejemplo 4-10. Instanciar un almacén clave-valor que pueda utilizarse para realizar consultas interactivas
ReadOnlyKeyValueStore<String, HighScores> stateStore = streams.store( StoreQueryParameters.fromNameAndType( "leader-boards", QueryableStoreTypes.keyValueStore()));
Cuando tengamos nuestra instancia de almacén de estados, podremos consultarla. En la siguiente sección se analizan los distintos tipos de consulta disponibles en los almacenes clave-valor.
Consulta de almacenes clave-valor sin ventanas
Cada tipo de almacén de estado admite distintos tipos de consultas. Por ejemplo, los almacenes de ventana (por ejemplo, ReadOnlyWindowStore
) admiten búsquedas de claves utilizando rangos temporales, mientras que los almacenes simples de clave-valor (ReadOnlyKeyValueStore
) admiten búsquedas de puntos, escaneos de rangos y consultas de recuento.
Hablaremos de los almacenes de estado con ventanas en el próximo capítulo, así que, de momento, vamos a demostrar los tipos de consultas que podemos hacer a nuestro almacén leader-boards
.
La forma más sencilla de determinar qué tipos de consulta están disponibles para tu tipo de almacén de estado es comprobar la interfaz subyacente. Como podemos ver en la definición de la interfaz del siguiente fragmento, los almacenes simples de clave-valor admiten varios tipos diferentes de consultas:
public interface ReadOnlyKeyValueStore<K, V> { V get(K key); KeyValueIterator<K, V> range(K from, K to); KeyValueIterator<K, V> all(); long approximateNumEntries(); }
Echemos un vistazo a cada uno de estos tipos de consulta, empezando por el primero: las búsquedas puntuales (get()
).
Búsqueda de puntos
Quizás el tipo de consulta más común, las búsquedas puntuales simplemente implican consultar el almacén de estado para una clave individual. Para realizar este tipo de consulta, podemos utilizar el método get
para recuperar el valor de una clave determinada. Por ejemplo:
HighScores highScores = stateStore.get(key);
Ten en cuenta que una búsqueda puntual devolverá una instancia deserializada del valor (en este caso, un objeto HighScores
, ya que es lo que estamos almacenando en nuestro almacén de estado) o null
si no se encuentra la clave.
Exploraciones de alcance
Los almacenes simples de clave-valor también admiten consultas de escaneo de rango. Los escaneos de rango devuelven un iterador para un rango inclusivo de claves. Es muy importante cerrar el iterador una vez que hayas terminado con él para evitar fugas de memoria.
El siguiente bloque de código muestra cómo ejecutar una consulta de rango, iterar sobre cada resultado y cerrar el iterador:
KeyValueIterator<String, HighScores> range = stateStore.range(1, 7); while (range.hasNext()) { KeyValue<String, HighScores> next = range.next(); String key = next.key; HighScores highScores = next.value; // do something with high scores object } range.close();
Devuelve un iterador que puede utilizarse para iterar por cada clave del rango seleccionado.
Obtener el siguiente elemento de la iteración.
El valor
HighScores
está disponible en la propiedadnext.value
.Es muy importante cerrar el iterador para evitar fugas de memoria. Otra forma de cerrarlo es utilizar una sentencia try-con-recursos al obtener el iterador.
Todas las entradas
Similar a un escaneo de rangos, la consulta all()
devuelve un iterador de pares clave-valor, y es similar a una consulta SELECT *
sin filtrar. Sin embargo, este tipo de consulta devolverá un iterador para todas las entradas de nuestro almacén de estados, en lugar de sólo las que estén dentro de un rango de claves específico. Al igual que con las consultas de rango, es importante cerrar el iterador una vez que hayas terminado con él para evitar fugas de memoria. El siguiente código muestra cómo ejecutar una consulta all()
. Iterar por los resultados y cerrar el iterador es lo mismo que la consulta de escaneo de rango, así que hemos omitido esa lógica por brevedad:
KeyValueIterator<String, HighScores> range = stateStore.all();
Número de entradas
Finalmente, el último tipo de consulta es similar a una consulta COUNT(*)
, y devuelve el número aproximado de entradas en el almacén de estado subyacente.
Nota
Cuando se utilizan almacenes persistentes RocksDB, el valor devuelto es aproximado, ya que calcular un recuento preciso de puede resultar caro y, cuando se trata de almacenes respaldados por RocksDB, también complicado. Tomado de las FAQ de RocksDB:
Obtener un número exacto de claves [en] bases de datos LSM como RocksDB es un problema difícil, ya que tienen claves duplicadas y entradas de borrado (es decir, lápidas) que requerirán una compactación completa para obtener un número exacto de claves. Además, si la base de datos RocksDB contiene operadores de fusión, también hará que el número estimado de claves sea menos preciso.
En cambio, si utilizas un almacén en memoria, el recuento será exacto.
Para ejecutar este tipo de consulta contra un almacén simple de clave-valor, podríamos ejecutar elsiguiente código:
long approxNumEntries = stateStore.approximateNumEntries();
Ahora que sabemos cómo consultar almacenes clave-valor sencillos, veamos desde dónde podemos ejecutar esas consultas.
Consultas locales
Cada instancia de una aplicación Kafka Streams puede consultar su propio estado local. Sin embargo, es importante recordar que, a menos que estés materializando un GlobalKTable
o ejecutando una única instancia de tu aplicación Kafka Streams,17 el estado local sólo representará una vista parcial de todo el estado de la aplicación (ésta es la naturaleza de un KTable
, como se explica en "KTable").
Por suerte para nosotros, Kafka Streams proporciona algunos métodos adicionales que facilitan la conexión de almacenes de estado distribuidos, y la ejecución de consultas remotas, que nos permiten consultar todo el estado de nuestra aplicación. A continuación aprenderemos sobre las consultas remotas.
Consultas remotas
Para consultar el estado completo de nuestra aplicación, necesitamos
-
Descubre qué instancias contienen los distintos fragmentos del estado de nuestra aplicación
-
Añade una llamada a procedimiento remoto (RPC) o un servicio REST para exponer el estado local a otras instancias de aplicación en ejecución18
-
Añade un cliente RPC o REST para consultar almacenes de estado remotos desde una instancia de aplicación en ejecución
En cuanto a los dos últimos puntos, tienes mucha flexibilidad para elegir qué componentes del servidor y del cliente quieres utilizar para la comunicación entre instancias. En este tutorial, utilizaremos Javalin para implementar un servicio REST debido a su sencilla API. También utilizaremos OkHttp, desarrollado por Square, para nuestro cliente REST por su facilidad de uso. Vamos a añadir estas dependencias a nuestra aplicación actualizando nuestro archivo build.gradle con lo siguiente:
dependencies { // required for interactive queries (server) implementation 'io.javalin:javalin:3.12.0' // required for interactive queries (client) implementation 'com.squareup.okhttp3:okhttp:4.9.0' // other dependencies }
Ahora abordemos la cuestión del descubrimiento de instancias. Necesitamos alguna forma de difundir qué instancias se están ejecutando en un momento dado y dónde se están ejecutando. Esto último puede conseguirse utilizando el parámetroAPPLICATION_SERVER_CONFIG
para especificar un par de host y puerto en Kafka Streams, como se muestra aquí:
Properties props = new Properties(); props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "myapp:8080"); // other Kafka Streams properties omitted for brevity KafkaStreams streams = new KafkaStreams(builder.build(), props);
Configura un punto final. Esto se comunicará a otras instancias de la aplicación en ejecución a través del protocolo de grupo de consumidores de Kafka. Es importante que utilices un par de IP y puerto que otras instancias puedan utilizar para comunicarse con tu aplicación (es decir,
localhost
no funcionaría, ya que se resolvería a IP diferentes en función de la instancia).
Ten en cuenta que establecer el parámetro APPLICATION_SERVER_CONFIG
config no le dice realmente a Kafka Streams que empiece a escuchar en el puerto que configures. De hecho, Kafka Streams no incluye un servicio RPC incorporado. Sin embargo, esta información de host/puerto se transmite a otras instancias en ejecución de tu aplicación Kafka Streams y está disponible a través de métodos API dedicados, de los que hablaremos más adelante. Pero primero, vamos a configurar nuestro servicio REST para que empiece a escuchar en el puerto apropiado (8080
en este ejemplo).
En términos de mantenimiento del código, tiene sentido definir nuestro servicio REST de la tabla de clasificación en un archivo específico, separado de la definición de la topología. El siguiente bloque de código muestra una implementación sencilla del servicio de tabla de clasificación:
class LeaderboardService { private final HostInfo hostInfo; private final KafkaStreams streams; LeaderboardService(HostInfo hostInfo, KafkaStreams streams) { this.hostInfo = hostInfo; this.streams = streams; } ReadOnlyKeyValueStore<String, HighScores> getStore() { return streams.store( StoreQueryParameters.fromNameAndType( "leader-boards", QueryableStoreTypes.keyValueStore())); } void start() { Javalin app = Javalin.create().start(hostInfo.port()); app.get("/leaderboard/:key", this::getKey); } }
HostInfo
es una simple clase envolvente de Kafka Streams que contiene un nombre de host y un puerto. Veremos cómo instanciarla en breve.Necesitamos hacer un seguimiento de la instancia local de Kafka Streams. Utilizaremos algunos métodos de la API en esta instancia en el siguiente bloque de código.
Añade un método dedicado para recuperar el almacén de estado que contiene las agregaciones de la tabla de clasificación. Sigue el mismo método para recuperar un almacén de estado de sólo lectura que vimos en el Ejemplo 4-10.
Inicia el servicio web basado en Javalin en el puerto configurado.
Añadir puntos finales con Javalin es fácil. Simplemente asignamos una ruta URL a un método, que implementaremos en breve. Los parámetros de ruta, que se especifican con dos puntos delante (por ejemplo,
:key
), nos permiten crear puntos finales dinámicos. Esto es ideal para una consulta de búsqueda de puntos.
Ahora, vamos a implementar el punto final /leaderboard/:key
, que mostrará las puntuaciones altas de una clave determinada (que en este caso es un ID de producto). Como hemos aprendido recientemente, podemos utilizar una búsqueda puntual para recuperar un único valor de nuestro almacén de estados. A continuación se muestra una implementación ingenua:
void getKey(Context ctx) { String productId = ctx.pathParam("key"); HighScores highScores = getStore().get(productId); ctx.json(highScores.toList()); }
Utiliza una búsqueda puntual para recuperar un valor del almacén de estado local.
Nota: el método
toList()
está disponible en el código fuente.
Por desgracia, esto no es suficiente. Considera el ejemplo en el que tenemos dos instancias en ejecución de nuestra aplicación Kafka Streams. Dependiendo de qué instancia consultemos y de cuándo emitamos la consulta (el estado puede moverse siempre que haya un reequilibrio del consumidor), es posible que no podamos recuperar el valor solicitado. La Figura 4-7 muestra este enigma.
Afortunadamente, Kafka Streams proporciona un método llamado queryMetadataForKey
,19 que nos permite descubrir la instancia de aplicación (local o remota) en la que vive una clave concreta. En el Ejemplo 4-11 se muestra una implementación mejorada de nuestro método getKey
.
Ejemplo 4-11. Una implementación actualizada del método getKey
, que aprovecha las consultas remotas para extraer datos de distintas instancias de la aplicación
void getKey(Context ctx) { String productId = ctx.pathParam("key"); KeyQueryMetadata metadata = streams.queryMetadataForKey( "leader-boards", productId, Serdes.String().serializer()); if (hostInfo.equals(metadata.activeHost())) { HighScores highScores = getStore().get(productId); if (highScores == null) { // game was not found ctx.status(404); return; } // game was found, so return the high scores ctx.json(highScores.toList()); return; } // a remote instance has the key String remoteHost = metadata.activeHost().host(); int remotePort = metadata.activeHost().port(); String url = String.format( "http://%s:%d/leaderboard/%s", remoteHost, remotePort, productId); OkHttpClient client = new OkHttpClient(); Request request = new Request.Builder().url(url).build(); try (Response response = client.newCall(request).execute()) { ctx.result(response.body().string()); } catch (Exception e) { ctx.status(500); } }
queryMetadataForKey
nos permite encontrar en qué host debe vivir una clave concreta.Si la instancia local tiene la clave, basta con consultar el almacén de estado local.
El método
queryMetadataForKey
no comprueba realmente si la clave existe. Utiliza el particionador de flujo por defecto20 para determinar dónde existiría la clave , si existiera. Por lo tanto, comprobamos si existe null (que se devuelve si no se encuentra la clave) y devolvemos una respuesta404
si no existe.Devuelve una respuesta formateada con las puntuaciones más altas.
Si hemos llegado hasta aquí, entonces la clave existe en un host remoto, si es que existe. Por tanto, construye una URL utilizando los metadatos, que incluya el host y el puerto de la instancia de Kafka Streams que contendría la clave especificada.
Invoca la petición y devuelve el resultado si se ha realizado correctamente.
Para ayudar a visualizar lo que ocurre aquí, la Figura 4-8 muestra cómo pueden conectarse los almacenes de estados distribuidos utilizando una combinación de descubrimiento de instancias y un servicio RPC/REST.
Pero, ¿y si necesitas ejecutar una consulta que no opere sobre una única clave? Por ejemplo, ¿y si necesitas contar el número de entradas en todos tus almacenes de estado distribuidos? El queryMetadataForKey
no funcionaría bien en este caso, ya que requiere que especifiquemos una única clave. En su lugar, aprovecharíamos otro método de Kafka Streams, llamado allMetadataForStore
, que devuelve el punto final de cada aplicación Kafka Streams en ejecución que comparta el mismo ID de aplicación y tenga al menos una partición activa para el nombre de almacén proporcionado.
Vamos a añadir un nuevo punto final a nuestro servicio de clasificación que muestre el número de registros de puntuaciones altas en todas las instancias de la aplicación en ejecución:
app.get("/leaderboard/count", this::getCount);
Ahora, implementaremos el método getCount
al que se hace referencia en el código anterior, que aprovecha el método allMetadataForStore
para obtener el número total de registros de cada almacén de estado remoto:
void getCount(Context ctx) { long count = getStore().approximateNumEntries(); for (StreamsMetadata metadata : streams.allMetadataForStore("leader-boards")) { if (!hostInfo.equals(metadata.hostInfo())) { continue; } count += fetchCountFromRemoteInstance( metadata.hostInfo().host(), metadata.hostInfo().port()); } ctx.json(count); }
Inicializa la cuenta con el número de entradas del almacén de estado local.
En la línea siguiente, utilizamos el método
allMetadataForStore
para recuperar los pares de host/puerto de cada instancia de Kafka Streams que contiene un fragmento del estado que queremos consultar.Si los metadatos corresponden al host actual, continúa con el bucle, puesto que ya hemos extraído el recuento de entradas del almacén de estado local.
Si los metadatos no pertenecen a la instancia local, entonces recupera el recuento de la instancia remota. Hemos omitido los detalles de implementación de
fetchCountFromRemoteInstance
en este texto, ya que es similar a lo que vimos en el Ejemplo 4-11, en el que instanciamos un cliente REST y emitimos una solicitud contra una instancia de aplicación de eliminación. Si te interesan los detalles de la implementación, consulta el código fuente de este capítulo.
Esto completa el último paso de nuestra topología de tabla de clasificación (ver Figura 4-1). Ahora podemos ejecutar nuestra aplicación, generar algunos datos ficticios y consultar nuestroservicio de clasificación.
Los datos ficticios para cada uno de los temas fuente se muestran en el Ejemplo 4-12.
Nota
Para los temas con clave (players
y products
), la clave del registro se formatea como<key>|<value>
. Para el tema score-events
, los registros ficticios se formatean simplemente como <value>
.
Ejemplo 4-12. Registros ficticios que produciremos para nuestros temas fuente
# players 1|{"id": 1, "name": "Elyse"} 2|{"id": 2, "name": "Mitch"} 3|{"id": 3, "name": "Isabelle"} 4|{"id": 4, "name": "Sammy"} # products 1|{"id": 1, "name": "Super Smash Bros"} 6|{"id": 6, "name": "Mario Kart"} # score-events {"score": 1000, "product_id": 1, "player_id": 1} {"score": 2000, "product_id": 1, "player_id": 2} {"score": 4000, "product_id": 1, "player_id": 3} {"score": 500, "product_id": 1, "player_id": 4} {"score": 800, "product_id": 6, "player_id": 1} {"score": 2500, "product_id": 6, "player_id": 2} {"score": 9000.0, "product_id": 6, "player_id": 3} {"score": 1200.0, "product_id": 6, "player_id": 4}
Si producimos estos datos ficticios en los temas apropiados y luego consultamos nuestro servicio de tabla de clasificación, veremos que nuestra aplicación Kafka Streams no sólo procesa las puntuaciones más altas, sino que ahora expone los resultados de nuestras operaciones con estado. En el siguiente bloque de código se muestra un ejemplo de respuesta a una consulta interactiva:
$ curl -s localhost:7000/leaderboard/1 | jq '.' [ { "playerId": 3, "productId": 1, "playerName": "Isabelle", "gameName": "Super Smash Bros", "score": 4000 }, { "playerId": 2, "productId": 1, "playerName": "Mitch", "gameName": "Super Smash Bros", "score": 2000 }, { "playerId": 1, "productId": 1, "playerName": "Elyse", "gameName": "Super Smash Bros", "score": 1000 } ]
Resumen
En este capítulo, has aprendido cómo Kafka Streams captura información sobre los eventos que consume, y cómo aprovechar la información recordada (estado) para realizar tareas de procesamiento de flujos más avanzadas, entre ellas:
-
Realizar una unión
KStream-KTable
-
Reencriptación de mensajes para cumplir los requisitos de coparticipación de determinados tipos de uniones
-
Realizar una unión
KStream-GlobalKTable
-
Agrupar los registros en representaciones intermedias (
KGroupedStream
,KGroupedTable
) para preparar nuestros datos para la agregación -
Agregación de flujos y tablas
-
Utilizar las consultas interactivas para exponer el estado de nuestra aplicación mediante consultas locales y remotas
En el próximo capítulo, hablaremos de otro aspecto de la programación con estado que tiene que ver no sólo con los eventos que ha visto nuestra aplicación, sino con cuándo se han producido. El tiempo desempeña un papel clave en el procesamiento con estado, por lo que comprender las distintas nociones de tiempo y también las diversas abstracciones basadas en el tiempo de la biblioteca Kafka Streams nos ayudará a ampliar aún más nuestros conocimientos sobre el procesamiento con estado.
1 Para más información sobre las plataformas de procesamiento relacional de flujos, consulta la entrada del blog de Robert Yokota de 2018 sobre el tema.
2 En memoria, en disco o en una combinación de ambos.
3 LevelDB se escribió en Google, pero cuando los ingenieros de Facebook empezaron a utilizarlo, descubrieron que era demasiado lento para sus flujos de trabajo integrados. Al cambiar el proceso de compactación monohilo de LevelDB por un proceso de compactación multihilo, y al aprovechar los filtros bloom para las lecturas, se mejoró drásticamente el rendimiento tanto de lectura como de escritura.
4 Hemos mencionado que los almacenes de estado son altamente configurables, e incluso la tolerancia a fallos puede desactivarse desactivando el comportamiento de registro de cambios.
5 Por ejemplo, inMemoryKeyValueStore
utiliza un Java TreeMap
, que se basa en un árbol rojo-negro, mientras que todos los almacenes persistentes de clave-valor utilizan RocksDB.
6 Por ejemplo, los almacenes de ventanas son almacenes clave-valor, pero las claves también incluyen la hora de la ventana además de la clave del registro.
7 Tim Berglund y Yaroslav Tkachenko hablan del caso de uso de Activision en el podcastStreaming Audio.
8 Ya hemos visto cómo Kafka Streams puede escribir directamente en temas de salida, lo que nos permite enviar datos procesados/enriquecidos a aplicaciones posteriores. Sin embargo, los clientes que deseen realizar consultas ad hoc contra una aplicación Kafka Streams pueden utilizar consultas interactivas.
9 Como se mencionó en el Capítulo 3, si nuestros temas contuvieran datos Avro, podríamos definir nuestro modelo de datos en un archivo de esquema Avro.
10 También podemos utilizar KStream
s para operaciones de búsqueda/unión, pero se trata siempre de una operación en ventana, por lo que hemos reservado la discusión de este tema hasta el próximo capítulo.
11 Florian Trossbach y Matthias J. Sax profundizan mucho más en este tema en su artículo "Crossing the Streams: Joins in Apache Kafka".
12 Las consultas UNION
son otro método para combinar conjuntos de datos en el mundo relacional. El comportamiento del operador merge
en Kafka Streams está más relacionado con el funcionamiento de una consulta UNION
.
13 Si no utilizas la Plataforma Confluent, el script es kafka-topics.sh.
14 El lado GlobalKTable
de la unión seguirá utilizando la clave de registro para la búsqueda.
15 Los flujos son sólo apéndices, por lo que no necesitan un sustractor.
16 Aún no hemos hablado de borrar claves, pero trataremos este tema en el Capítulo 6, cuando hablemos de la limpieza de los almacenes de estado.
17 Esto último no es aconsejable. Ejecutar una única aplicación Kafka Streams consolidaría todo el estado de la aplicación en una única instancia, pero Kafka Streams está pensado para ejecutarse de forma distribuida para maximizar el rendimiento y la tolerancia a fallos.
18 Y otros clientes si se desea, por ejemplo, humanos.
19 Que sustituye al método metadataForKey
, muy utilizado en versiones < 2.5, pero oficialmente obsoleto en esa versión.
20 Existe una versión sobrecargada del método queryMetadataForKey
que también acepta un StreamPartitioner
personalizado.
Get Dominar Kafka Streams y ksqlDB now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.