Capítulo 4. Ventana avanzada
Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com
¡Hola de nuevo! Espero que hayas disfrutado del Capítulo 3 tanto como yo. Las marcas de agua son un tema fascinante, y Slava las conoce mejor que nadie en el planeta. Ahora que ya tenemos un conocimiento más profundo de las marcas de agua, me gustaría profundizar en algunos temas más avanzados relacionados con las preguntas qué,dónde, cuándo y cómo.
Primero examinamos las ventanas en tiempo de procesamiento, que son una interesante mezcla de dónde y cuándo, para comprender mejor su relación con las ventanas en tiempo de eventos y tener una idea de cuándo es realmente el enfoque adecuado. A continuación, nos sumergimos en algunos conceptos más avanzados de ventanas en tiempo de eventos, examinando en detalle las ventanas de sesión y, por último, defendiendo por qué las ventanas personalizadas generalizadas son un concepto útil (y sorprendentemente sencillo) explorando tres tipos diferentes de ventanas personalizadas: ventanas fijas no alineadas, ventanas fijas por clave y ventanas de sesión delimitadas.
Cuándo/Dónde: Ventanas de tiempo de procesamiento
La ventana de tiempo de procesamiento es importante por dos razones:
-
Para determinados casos de uso, como el monitoreo del uso (por ejemplo, QPS de tráfico de servicios web), para los que quieres analizar un flujo de datos entrantes a medida que se observan, la ventana de tiempo de procesamiento es absolutamente el enfoque adecuado a adoptar.
-
Para los casos de uso en los que el momento en que ocurrieron los hechos es importante (por ejemplo, analizar las tendencias de comportamiento de los usuarios, facturación, puntuación, etc.), la ventana de tiempo de procesamiento es absolutamente el enfoque equivocado que hay que adoptar, y ser capaz de reconocer estos casos es fundamental.
Por ello, merece la pena conocer a fondo las diferencias entre las ventanas en tiempo de procesamiento y las ventanas en tiempo de eventos, sobre todo teniendo en cuenta la prevalencia de las ventanas en tiempo de procesamiento en muchos sistemas de streaming actuales.
Cuando trabajes en un modelo para el que la ventana como noción de primera clase esté estrictamente basada en el tiempo de evento, como el que se presenta en este libro, hay dos métodos que puedes utilizar para conseguir la ventana en tiempo de procesamiento:
- Disparadores
-
Ignora el tiempo del evento (es decir, utiliza una ventana global que abarque todo el tiempo del evento) y utiliza disparadores para proporcionar instantáneas de esa ventana en el eje tiempo-proceso.
- Tiempo de entrada
-
Asigna los tiempos de entrada como los tiempos de evento para los datos a medida que llegan, y utiliza la ventana de tiempo de evento normal a partir de ahí. Esto es esencialmente lo que hace algo como Spark Streaming 1.x.
Observa que los dos métodos son más o menos equivalentes, aunque difieren ligeramente en el caso de las canalizaciones multietapa: en la versión de activación, un conducto multietapa troceará las "ventanas" de tiempo de procesamiento de forma independiente en cada etapa, de modo que, por ejemplo, los datos de la ventana N de una etapa podrían acabar en la ventana N-1o N+1 de la etapa siguiente; en la versión de tiempo de entrada, después de que un dato se incorpore a la ventana N, permanecerá en la ventana N mientras dure el pipeline, debido a la sincronización del progreso entre etapas mediante marcas de agua (en el caso de Cloud Dataflow), límites de microlotes (en el caso de Spark Streaming), o cualquier otro factor de coordinación que intervenga a nivel del motor.
Como he señalado hasta la saciedad, el gran inconveniente de las ventanas en tiempo de procesamiento es que el contenido de las ventanas cambia cuando cambia el orden de observación de las entradas. Para aclarar este punto de forma más concreta, vamos a examinar estos tres casos de uso: ventanas en tiempo de eventos, ventanas en tiempo de procesamiento mediante activadores y ventanas en tiempo de procesamiento mediante tiempo de entrada.
Cada una se aplicará a dos conjuntos de entradas diferentes (por tanto, seis variaciones en total). Los dos conjuntos de entradas corresponderán exactamente a los mismos acontecimientos (es decir, los mismos valores, ocurridos en los mismos momentos del acontecimiento), pero con diferentes órdenes de observación. El primer conjunto será el orden de observación que hemos visto todo el tiempo, de color gris; el segundo tendrá todos los valores desplazados en el eje del tiempo de procesamiento como en la Figura 4-1, de color morado. Puedes imaginar simplemente que el ejemplo morado es otra forma en que la realidad podría haber sucedido si los vientos hubieran soplado del este en lugar del oeste (es decir, si el conjunto subyacente de sistemas distribuidos complejos hubiera actuado en un orden ligeramente distinto).
Ventana de tiempo de evento
Para establecer una línea de base, comparemos primero la ventana fija en tiempo de evento con una marca de agua heurística sobre estos dos ordenamientos de observación. Reutilizaremos el código temprano/tarde del Ejemplo 2-7/Figura2-10 para obtener los resultados que se muestran en la Figura 4-2. La parte izquierda es esencialmente lo que vimos antes; la parte derecha son los resultados sobre el segundo orden de observación. Lo importante aquí es observar que, aunque la forma general de los resultados difiere (debido a los distintos órdenes de observación en el tiempo de procesamiento), los resultados finales para las cuatro ventanas siguen siendo los mismos: 14, 18, 4 y 12.
Ventana de tiempo de procesamiento mediante activadores
Comparemos ahora esto con los dos métodos de tiempo de procesamiento que acabamos de describir. En primer lugar, probaremos el método de los activadores. Hay tres aspectos para que la "ventana" de tiempo de procesamiento funcione de esta manera:
- Ventana
-
Utilizamos la ventana global de tiempo de eventos porque esencialmente estamos emulando ventanas de tiempo de procesamiento con paneles de tiempo de eventos.
- Activación de
-
Disparamos periódicamente en el dominio del tiempo de procesamiento en función del tamaño deseado de las ventanas de tiempo de procesamiento.
- Acumulación
-
Utilizamos el modo de descarte para mantener los paneles independientes entre sí, dejando así que cada uno de ellos actúe como una "ventana" independiente en tiempo de procesamiento.
El código correspondiente se parece al del Ejemplo 4-1; ten en cuenta que la ventana global es la predeterminada en Beam, por lo que no hay una anulación específica de la estrategia de ventana.
Ejemplo 4-1. Ventana en tiempo de proceso mediante paneles repetidos y descartados de una ventana global en tiempo de evento
PCollection<KV<Team, Integer>> totals = input .apply(Window.triggering(Repeatedly(AlignedDelay(TWO_MINUTES))) .discardingFiredPanes()) .apply(Sum.integersPerKey());
Cuando se ejecuta en un corredor de flujo contra nuestros dos ordenamientos diferentes de los datos de entrada, los resultados tienen el aspecto de la Figura 4-3. He aquí algunas notas interesantes sobre esta figura:
-
Como estamos emulando ventanas de tiempo de procesamiento mediante paneles de tiempo de evento, las "ventanas" están delineadas en el eje del tiempo de procesamiento, lo que significa que su anchura efectiva se mide en el eje y en lugar de en el eje x.
-
Como la ventana de tiempo de procesamiento es sensible al orden en que se encuentran los datos de entrada, los resultados de cada una de las "ventanas" difieren para cada uno de los dos órdenes de observación, aunque los acontecimientos en sí ocurrieron técnicamente en los mismos momentos en cada versión. A la izquierda obtenemos 12, 18, 18, mientras que a la derecha obtenemos 7, 36, 5.
Ventana de tiempo de procesamiento mediante tiempo de entrada
Por último, veamos ventana de tiempo de procesamiento que se consigue mapeando los tiempos de evento de los datos de entrada para que sean sus tiempos de entrada. Desde el punto de vista del código, hay cuatro aspectos dignos de mención:
- Contrarreloj
-
Cuando llegan elementos, sus tiempos de evento deben sobrescribirse con el tiempo de entrada. Podemos hacerlo en Beam proporcionando un nuevo
DoFn
que establezca la marca de tiempo del elemento en la hora actual mediante el métodooutputWithTimestamp
. - Ventana
-
Vuelve a utilizar la ventana fija estándar de evento-tiempo.
- Activación de
-
Como el tiempo de entrada permite calcular una marca de agua perfecta, podemos utilizar el activador por defecto, que en este caso se dispara implícitamente exactamente una vez cuando la marca de agua pasa el final de la ventana.
- Modo de acumulación
-
Como sólo tenemos una salida por ventana, el modo de acumulación es irrelevante.
Así, el código real podría parecerse al del Ejemplo 4-2.
Ejemplo 4-2. Ventana en tiempo de proceso mediante paneles repetidos y descartados de una ventana global en tiempo de evento
PCollection<String> raw = IO.read().apply(ParDo.of( new DoFn<String, String>() { public void processElement(ProcessContext c) { c.outputWithTimestmap(new Instant()); } }); PCollection<KV<Team, Integer>> input = raw.apply(ParDo.of(new ParseFn()); PCollection<KV<Team, Integer>> totals = input .apply(Window.info(FixedWindows.of(TWO_MINUTES)) .apply(Sum.integersPerKey());
La ejecución en un motor de streaming se parecería a la Figura 4-4. A medida que llegan los datos, sus tiempos de evento se actualizan para coincidir con sus tiempos de entrada (es decir, los tiempos de procesamiento a la llegada), lo que da lugar a un desplazamiento horizontal hacia la derecha sobre la línea de filigrana ideal. He aquí algunas notas interesantes sobre esta figura:
-
Al igual que con el otro ejemplo de ventana de tiempo de procesamiento, obtenemos resultados diferentes cuando cambia el orden de las entradas, aunque los valores y los tiempos de los eventos de la entrada permanezcan constantes.
-
A diferencia del otro ejemplo, las ventanas vuelven a estar delineadas en el dominio evento-tiempo (y, por tanto, a lo largo del eje x). A pesar de ello, no son auténticas ventanas evento-tiempo; simplemente hemos trasladado el tiempo de procesamiento al dominio evento-tiempo, borrando el registro original de ocurrencia de cada entrada y sustituyéndolo por uno nuevo que, en cambio, representa el momento en que el dato fue observado por primera vez por la tubería.
-
A pesar de ello, gracias a la marca de agua, los disparos siguen produciéndose exactamente al mismo tiempo que en el ejemplo anterior de tiempo de procesamiento. Además, los valores de salida producidos son idénticos a los de ese ejemplo, como se predijo: 12, 18, 18 a la izquierda, y 7, 36, 5 a la derecha.
-
Como las marcas de agua perfectas son posibles cuando se utiliza el tiempo de entrada, la marca de agua real coincide con la marca de agua ideal, ascendiendo hacia arriba y hacia la derecha con una pendiente de uno.
Aunque es interesante ver las distintas formas en que puedes aplicar la ventana de tiempo de procesamiento, lo más importante aquí es lo que he estado insistiendo desde el primer capítulo: la ventana de tiempo de evento es independiente del orden, al menos en el límite (los paneles reales a lo largo del camino pueden diferir hasta que la entrada se complete); la ventana de tiempo de procesamiento no lo es. Si te interesan los momentos en los que ocurrieron realmente tus eventos, debes utilizar la ventana evento-tiempo o tus resultados carecerán de sentido. Me bajaré del estrado.
Dónde: Ventanas de la Sesión
Basta ya de ventanas en tiempo de procesamiento. Volvamos ahora a a las ventanas en tiempo de eventos, pero ahora vamos a ver una de mis funciones favoritas: las ventanas dinámicas basadas en datos llamadas sesiones.
Las sesiones son un tipo especial de ventana que captura un periodo de actividad en los datos que finaliza con un intervalo de inactividad. Son especialmente útiles en el análisis de datos porque pueden proporcionar una visión de las actividades de un usuario concreto durante un periodo de tiempo específico en el que estuvo involucrado en alguna actividad. Esto permite correlacionar las actividades dentro de la sesión, hacer inferencias sobre los niveles de participación basándose en la duración de las sesiones, etc.
Desde el punto de vista de las ventanas, las sesiones son especialmente interesantes por dos motivos:
-
Son un ejemplo de ventana basada en datos: la ubicación y los tamaños de las ventanas son consecuencia directa de los propios datos de entrada, en lugar de basarse en algún patrón predefinido en el tiempo, como las ventanas fijas y deslizantes.
-
También son un ejemplo de ventana no alineada; es decir, una ventana que no se aplica uniformemente a todos los datos, sino sólo a un subconjunto específico de ellos (por ejemplo, por usuario). Esto contrasta con las ventanas alineadas, como las ventanas fijas y deslizantes, que suelen aplicarse uniformemente a todos los datos.
En algunos casos, es posible etiquetar por adelantado los datos de una sesión con un identificador común (por ejemplo, un reproductor de vídeo que emite pings con información sobre la calidad del servicio; para un visionado determinado, todos los pings pueden etiquetarse por adelantado con un único identificador de sesión). En este caso, las sesiones son mucho más fáciles de construir porque básicamente es una forma de agrupar por clave.
Sin embargo, en el caso más general (es decir, cuando la sesión real en sí no se conoce de antemano), las sesiones deben construirse sólo a partir de las ubicaciones de los datos en el tiempo. Cuando se trata de datos desordenados, esto resulta especialmente complicado.
La Figura 4-5 muestra un ejemplo de esto, con cinco registros independientes agrupados en ventanas de sesión con un tiempo de espera de 60 minutos. Cada registro comienza en una ventana propia de 60 minutos (una proto-sesión). Fusionando las proto-sesiones que se solapan se obtienen las dos ventanas de sesión más grandes que contienen tres y dos registros, respectivamente.
Su idea clave para proporcionar un soporte general de sesión es que una ventana de sesión completa es, por definición, una composición de un conjunto de ventanas más pequeñas y superpuestas, cada una de las cuales contiene un único registro, con cada registro de la secuencia separado del siguiente por un intervalo de inactividad no mayor que un tiempo de espera predefinido. Así, aunque observemos los datos de la sesión fuera de orden, podemos construir la sesión final simplemente fusionando las ventanas solapadas de datos individuales a medida que llegan.
Para verlo de otra forma, considera el ejemplo que hemos estado utilizando hasta ahora. Si especificamos un tiempo de espera de sesión de un minuto, esperaríamos identificar dos sesiones en los datos, delineadas en la Figura 4-6 por las líneas negras discontinuas. Cada una de esas sesiones captura una ráfaga de actividad del usuario, con cada evento de la sesión separado por menos de un minuto de al menos otro evento de la sesión.
Para ver cómo funciona la fusión de ventanas para construir estas sesiones a lo largo del tiempo según se van encontrando eventos, veámoslo en acción. Tomaremos el código temprano/tarde con retracciones activadas del Ejemplo 2-10 y actualizaremos las ventanas para construir sesiones con un tiempo de espera de un minuto de duración en su lugar. El Ejemplo 4-3 ilustra cómo es esto.
Ejemplo 4-3. Despidos anticipados/a tiempo/tarde con ventanas de sesión y retracciones
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(Sessions.withGapDuration(ONE_MINUTE)) .triggering( AfterWatermark() .withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AfterCount(1))) .accumulatingAndRetractingFiredPanes()) .apply(Sum.integersPerKey());
Ejecutado en un motor de streaming, obtendrías algo como lo que se muestra en la Figura 4-7 (fíjate en que he dejado las líneas negras discontinuas anotando las sesiones finales esperadas como referencia).
Aquí pasan muchas cosas, así que te guiaré por algunas de ellas:
-
Cuando se encuentra el primer registro con valor 5, se coloca en una única ventana de proto-sesión que comienza en la hora del evento de ese registro y abarca la anchura de la duración del intervalo de la sesión; por ejemplo, un minuto más allá del momento en que se produjo ese dato. Cualquier ventana que encontremos en el futuro y que se solape con esta ventana debe formar parte de la misma sesión y se fusionará en ella como tal.
-
El segundo registro en llegar es el 7, que igualmente se coloca en su propia ventana de proto-sesión, dado que no se solapa con la ventana del 5.
-
Mientras tanto, la marca de agua ha pasado el final de la primera ventana, por lo que el valor 5 se materializa como resultado puntual justo antes de las 12:06. Poco después, la segunda ventana también se materializa como resultado especulativo con el valor 7, justo cuando el tiempo de procesamiento llega a las 12:06.
-
A continuación observamos un par de registros 3 y 4, cuyas proto-sesiones se solapan. Como resultado, se fusionan, y para cuando se dispara la activación temprana de las 12:07, se emite una única ventana con valor 7.
-
Cuando la 8 llega poco después, se solapa con las dos ventanas con valor 7. Así, las tres se fusionan, formando una nueva sesión combinada con valor 22. Cuando la marca de agua pasa entonces por el final de esta sesión, materializa tanto la nueva sesión con valor 22 como las retracciones de las dos ventanas de valor 7 que se emitieron anteriormente, pero que luego se incorporaron a ella.
-
Una danza similar se produce cuando el 9 llega tarde, uniendo la proto-sesión con valor 5 y la sesión con valor 22 en una única sesión mayor de valor 36. La 36 y las retracciones de las ventanas 5 y 22 son emitidas inmediatamente por el activador de datos tardíos.
Esto es algo muy potente. Y lo que es realmente asombroso es lo fácil que resulta describir algo así dentro de un modelo que descompone las dimensiones del procesamiento de flujos en piezas distintas y componibles. Al final, puedes centrarte más en la interesante lógica empresarial que tienes entre manos, y menos en las minucias de dar forma a los datos para que sean utilizables.
Si no me crees, echa un vistazo a esta entrada del blog en la que se describe cómo crear sesiones manualmente en Spark Streaming 1. x (ten en cuenta que esto no se hace para señalarles con el dedo; la gente de Spark había hecho un trabajo lo suficientemente bueno con todo lo demás como para que alguien se molestara en tomarse la molestia de documentar lo que se necesita para crear una variedad específica de soporte de sesiones sobre Spark 1.x; no se puede decir lo mismo de la mayoría de los demás sistemas que existen). Es bastante complicado, y ni siquiera hacen sesiones adecuadas en tiempo de eventos, ni proporcionan disparos especulativos o tardíos, ni retracciones.
Dónde: Ventana personalizada
Hasta ahora, hemos hablado principalmente de tipos predefinidos de estrategias de ventanas: fijas, deslizantes y sesiones. Puedes sacar mucho partido de los tipos de ventanas estándar, pero hay muchos casos de uso en el mundo real para los que poder definir una estrategia de ventanas personalizada puede realmente salvarte el día (tres de los cuales vamos a ver ahora).
La mayoría de los sistemas actuales no admiten ventanas personalizadas en el grado en que las admite Beam,1 así que nos centraremos en el enfoque de Beam. En Beam, una estrategia de ventanas personalizadas consiste en dos cosas:
- Asignación de ventanas
-
Esto coloca cada elemento de en una ventana inicial. En el límite, esto permite colocar cada elemento dentro de una ventana única, lo que es muy potente.
- (Opcional) fusión de ventanas
-
Esto permite que las ventanas se fusionen en los momentos de agrupación, lo que hace posible que las ventanas evolucionen con el tiempo, lo que hemos visto antes en acción con las ventanas de sesión.
Para que te hagas una idea de lo sencillas que son en realidad las estrategias de ventanas, y también de lo útil que puede ser la compatibilidad con ventanas personalizadas, vamos a examinar en detalle las implementaciones estándar de ventanas fijas y sesiones en Beam, y luego consideraremos algunos casos de uso del mundo real que requieren variaciones personalizadas sobre esos temas. En el proceso, veremos lo fácil que es crear una estrategia de ventanas personalizada, y lo limitante que puede ser la falta de soporte para ventanas personalizadas cuando tu caso de uso no encaja del todo en los enfoques estándar.
Variaciones de las Ventanas Fijas
Para empezar, veamos la estrategia relativamente sencilla de las ventanas fijas. La implementación de las ventanas fijas es tan sencilla como puedas imaginar, y consiste en la siguiente lógica:
- Asignación
-
El elemento se coloca en la ventana fija adecuada en función de su marca de tiempo y de los parámetros de tamaño y desplazamiento de la ventana.
- Fusión
-
Ninguna.
Una versión abreviada del código tiene el aspecto del Ejemplo 4-4.
Ejemplo 4-4. Implementación abreviada de FixedWindows
public
class
FixedWindows
extends
WindowFn
<
Object
,
IntervalWindow
>
{
private
final
Duration
size
;
private
final
Duration
offset
;
public
Collection
<
IntervalWindow
>
assignWindow
(
AssignContext
c
)
{
long
start
=
c
.
timestamp
().
getMillis
()
-
c
.
timestamp
()
.
plus
(
size
)
.
minus
(
offset
)
.
getMillis
()
%
size
.
getMillis
();
return
Arrays
.
asList
(
IntervalWindow
(
new
Instant
(
start
),
size
));
}
}
Ten en cuenta que el objetivo de mostrarte el código aquí no es tanto enseñarte a escribir estrategias de ventanas (aunque está bien desmitificarlas y resaltar lo sencillas que son). En realidad, se trata de ayudar a contrastar la facilidad y la dificultad comparativas de dar soporte a algunos casos de uso relativamente básicos, con y sin ventanas personalizadas, respectivamente. Consideremos ahora dos casos de uso que son variaciones del tema de las ventanas fijas.
Ventanas fijas no alineadas
Una característica de la implementación por defecto de las ventanas fijas a la que aludimos anteriormente es que las ventanas se alinean en todos los datos. En nuestro ejemplo, la ventana que va de las 12 a las 13 horas de un equipo determinado se alinea con las ventanas correspondientes de todos los demás equipos, que también van de las 12 a las 13 horas. Y en casos de uso en los que quieras comparar ventanas similares a través de otra dimensión, como entre equipos, esta alineación es muy útil. Sin embargo, tiene un coste algo sutil. Todas las ventanas activas desde el mediodía hasta la 1 de la tarde se completan más o menos al mismo tiempo, lo que significa que una vez por hora el sistema recibe una carga masiva de ventanas para materializar.
Para ver a qué me refiero, veamos un ejemplo concreto(Ejemplo 4-5). Empezaremos con un canal de suma de puntuaciones como el que hemos utilizado en la mayoría de los ejemplos, con ventanas fijas de dos minutos y un único disparador de marca de agua.
Ejemplo 4-5. Activador de integridad de la marca de agua (igual que en el Ejemplo 2-6)
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(FixedWindows.of(TWO_MINUTES)) .triggering(AfterWatermark())) .apply(Sum.integersPerKey());
Pero en este caso, examinaremos dos claves diferentes (ver Figura 4-8) del mismo conjunto de datos en paralelo. Lo que veremos es que las salidas de esas dos claves están todas alineadas, debido a que las ventanas están alineadas en todas las claves. Como resultado, acabamos con N paneles materializándose cada vez que la marca de agua pasa el final de una ventana, donde N es el número de claves con actualizaciones en esa ventana. En este ejemplo, donde N es 2, quizá no sea demasiado doloroso. Pero cuando N empieza a ordenarse en miles, millones o más, esa ráfaga sincronizada puede volverse problemática.
En circunstancias en las que la comparación entre ventanas es innecesaria, a menudo es más deseable repartir la carga de finalización de ventanas uniformemente a lo largo del tiempo. Esto hace que la carga del sistema sea más predecible, lo que puede reducir los requisitos de aprovisionamiento para gestionar los picos de carga. Sin embargo, en la mayoría de los sistemas, las ventanas fijas no alineadas sólo están disponibles si el sistema las admite de fábrica.2 Pero con el soporte de ventanas personalizadas, es una modificación relativamente trivial de la implementación de ventanas fijas por defecto proporcionar soporte de ventanas fijas no alineadas. Lo que queremos es seguir garantizando que las ventanas de todos los elementos agrupados (es decir, los que tienen la misma clave) tengan la misma alineación, pero relajando la restricción de alineación entre claves diferentes. El código cambia a la estrategia predeterminada de ventanas fijas y se parece al Ejemplo 4-6.
Ejemplo 4-6. Implementación abreviada de UnalignedFixedWindows
public
class
Unaligned
FixedWindows
extends
WindowFn
<
KV
<
K
,
V
>
,
IntervalWindow
>
{
private
final
Duration
size
;
private
final
Duration
offset
;
public
Collection
<
IntervalWindow
>
assignWindow
(
AssignContext
c
)
{
long
perKeyShift
=
hash
(
c
.
element
(
)
.
key
(
)
)
%
size
.
getMillis
(
)
;
long
start
=
perKe
yShift
+
c
.
timestamp
(
)
.
getMillis
(
)
-
c
.
timestamp
(
)
.
plus
(
size
)
.
minus
(
offset
)
.
getMillis
(
)
%
size
.
getMillis
(
)
;
return
Arrays
.
asList
(
IntervalWindow
(
new
Instant
(
start
)
,
size
)
)
;
}
}
Con este cambio, las ventanas de todos los elementos con la misma clave están alineadas,3 pero las ventanas de los elementos con claves diferentes (normalmente) no estarán alineadas, lo que reparte la carga de finalización de la ventana a costa de que las comparaciones entre claves tengan menos sentido. Podemos cambiar nuestra cadena para que utilice nuestra nueva estrategia de ventanas, ilustrada en el Ejemplo 4-7.
Ejemplo 4-7. Ventanas fijas no alineadas con un único activador de marca de agua
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(UnalignedFixedWindows.of(TWO_MINUTES)) .triggering(AfterWatermark())) .apply(Sum.integersPerKey());
Y luego puedes ver cómo queda esto en la Figura 4-9, comparando diferentes alineaciones de ventana fija en el mismo conjunto de datos que antes (en este caso, he elegido un desplazamiento de fase máximo entre las dos alineaciones para resaltar más claramente las ventajas, dado que las fases elegidas al azar en un gran número de teclas producirán efectos similares).
Observa cómo no hay casos en los que emitamos varios paneles para varias teclas simultáneamente. En cambio, los paneles llegan individualmente con una cadencia mucho más uniforme. Éste es otro ejemplo de poder hacer concesiones en una dimensión (capacidad de comparar entre claves) a cambio de ventajas en otra dimensión (reducción de los requisitos de aprovisionamiento máximo de recursos) cuando el caso de uso lo permite. Esta flexibilidad es fundamental cuando intentas procesar cantidades masivas de datos de la forma más eficiente posible.
Veamos ahora una segunda variante de las ventanas fijas, que está más intrínsecamente ligada a los datos que se procesan.
Ventanas fijas por elemento/llave
Nuestro segundo ejemplo es cortesía de uno de los primeros adoptantes de Cloud Dataflow en . Esta empresa genera datos analíticos para sus clientes, pero cada cliente puede configurar el tamaño de la ventana sobre la que desea agregar sus métricas. En otras palabras, cada cliente puede definir el tamaño específico de sus ventanas fijas.
Dar soporte a un caso de uso como éste no es demasiado difícil siempre que el número de tamaños de ventana disponibles sea en sí mismo fijo. Por ejemplo, podrías imaginar ofrecer la opción de elegir ventanas fijas de 30 minutos, 60 minutos y 90 minutos, y luego ejecutar una canalización independiente (o una bifurcación de la canalización) para cada una de esas opciones. No es lo ideal, pero tampoco es demasiado horrible. Sin embargo, eso se vuelve rápidamente inabordable a medida que aumenta el número de opciones, y en el límite de proporcionar soporte para tamaños de ventana verdaderamente arbitrarios (que es lo que requería el caso de uso de este cliente) es totalmente impracticable.
Afortunadamente, como cada registro que procesa el cliente ya está anotado con metadatos que describen el tamaño de ventana deseado para la agregación, dar soporte a un tamaño de ventana fijo arbitrario y por usuario fue tan sencillo como cambiar un par de líneas de la implementación de ventanas fijas del stock, como se demuestra en el Ejemplo 4-8.
Ejemplo 4-8. Implementación modificada (y abreviada) de FixedWindows que admite tamaños de ventana por elemento
public class PerElementFixedWindows<T extends HasWindowSize> extends WindowFn<T, IntervalWindow> { private final Duration offset; public Collection<IntervalWindow> assignWindow(AssignContext c) { long perElementSize = c.element().getWindowSize(); long start = perKeyShift + c.timestamp().getMillis() - c.timestamp() .plus(size) .minus(offset) .getMillis() % size.getMillis(); return Arrays.asList(IntervalWindow( new Instant(start), perElementSize)); } }
Con este cambio, cada elemento se asigna a una ventana fija con el tamaño adecuado, según dicten los metadatos que lleve el propio elemento.4 Cambiar el código de la canalización para utilizar esta nueva estrategia vuelve a ser trivial, como se muestra en el Ejemplo 4-9.
Ejemplo 4-9. Tamaños de ventana fija por elemento con un único activador de marca de agua
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(new PerElementFixedWindows()) .triggering(AfterWatermark())) .apply(Sum.integersPerKey());
Y si observamos esta cadena en acción(Figura 4-10), es fácil ver que los elementos de la Clave A tienen todos dos minutos como tamaño de ventana, mientras que los elementos de la Clave B tienen tamaños de ventana de un minuto.
En realidad, no es algo que puedas esperar razonablemente que te proporcione un sistema; la naturaleza de dónde se almacenan las preferencias de tamaño de ventana es demasiado específica para cada caso de uso como para que tenga sentido intentar incorporarla a una API estándar. Sin embargo, como demuestran las necesidades de este cliente, existen casos de uso como éste. Por eso es tan poderosa la flexibilidad que proporcionan las ventanas personalizadas.
Variaciones de las Ventanas de Sesión
Para comprender realmente la utilidad de las ventanas personalizadas, veamos un último ejemplo, que es una variación de las sesiones. Las ventanas de sesión son, comprensiblemente, un poco más complejas que las ventanas fijas. Su implementación consiste en lo siguiente
- Asignación
-
Cada elemento se coloca inicialmente en una ventana de proto-sesión que comienza en la marca de tiempo del elemento y se extiende durante la duración del hueco.
- Fusión
-
En el momento de la agrupación, se ordenan todas las ventanas elegibles, tras lo cual se fusionan las ventanas que se solapan.
Una versión abreviada del código de las sesiones (fusionado a mano a partir de varias clases ayudantes) tiene un aspecto parecido al que se muestra en el Ejemplo 4-10.
Ejemplo 4-10. Implementación de Sesiones abreviadas
public
class
Sessions
extends
WindowFn
<
Object
,
IntervalWindow
>
{
private
final
Duration
gapDuration
;
public
Collection
<
IntervalWindow
>
assignWindows
(
AssignContext
c
)
{
return
Arrays
.
asList
(
new
IntervalWindow
(
c
.
timestamp
(),
gapDuration
));
}
public
void
mergeWindows
(
MergeContext
c
)
throws
Exception
{
List
<
IntervalWindow
>
sortedWindows
=
new
ArrayList
<>();
for
(
IntervalWindow
window
:
c
.
windows
())
{
sortedWindows
.
add
(
window
);
}
Collections
.
sort
(
sortedWindows
);
List
<
MergeCandidate
>
merges
=
new
ArrayList
<>();
MergeCandidate
current
=
new
MergeCandidate
();
for
(
IntervalWindow
window
:
sortedWindows
)
{
if
(
current
.
intersects
(
window
))
{
current
.
add
(
window
);
}
else
{
merges
.
add
(
current
);
current
=
new
MergeCandidate
(
window
);
}
}
merges
.
add
(
current
);
for
(
MergeCandidate
merge
:
merges
)
{
merge
.
apply
(
c
);
}
}
}
Como antes, el objetivo de ver el código no es tanto enseñarte cómo se implementan las funciones de ventanas personalizadas, ni siquiera cómo es la implementación de las sesiones; en realidad, es mostrar la facilidad con la que puedes dar soporte a nuevos usos mediante ventanas personalizadas.
Sesiones limitadas
Un caso de uso personalizado con el que me he encontrado muchas veces son las sesiones limitadas: sesiones a las que no se permite crecer más allá de un tamaño determinado, ya sea en tiempo, en número de elementos o en alguna otra dimensión. Esto puede deberse a razones semánticas, o puede ser simplemente un ejercicio de protección contra el spam. Sin embargo, dadas las variaciones en los tipos de límites (algunos casos de uso se preocupan por el tamaño total de la sesión en tiempo de evento, otros por el recuento total de elementos, otros por la densidad de elementos, etc.), es difícil proporcionar una API limpia y concisa para sesiones limitadas. Mucho más práctico es permitir que los usuarios implementen su propia lógica de ventanas personalizada, adaptada a su caso de uso específico. Un ejemplo de un caso de uso de este tipo, en el que las ventanas de sesión están limitadas en el tiempo, podría parecerse al Ejemplo 4-11 (eludiendo parte de la jerga del constructor que utilizaremos aquí).
Ejemplo 4-11. Implementación de Sesiones abreviadas
public
class
Bounded
Sessions
extends
WindowFn
<
Object
,
IntervalWindow
>
{
private
final
Duration
gapDuration
;
private
final
Duration
maxSize
;
public
Collection
<
IntervalWindow
>
assignWindows
(
AssignContext
c
)
{
return
Arrays
.
asList
(
new
IntervalWindow
(
c
.
timestamp
(
)
,
gapDuration
)
)
;
}
private
Duration
windowSize
(
IntervalWindow
window
)
{
return
window
=
=
null
?
new
Duration
(
0
)
:
new
Duration
(
window
.
start
(
)
,
window
.
end
(
)
)
;
}
public
void
mergeWindows
(
MergeContext
c
)
throws
Exception
{
List
<
IntervalWindow
>
sortedWindows
=
new
ArrayList
<
>
(
)
;
for
(
IntervalWindow
window
:
c
.
windows
(
)
)
{
sortedWindows
.
add
(
window
)
;
}
Collections
.
sort
(
sortedWindows
)
;
List
<
MergeCandidate
>
merges
=
new
ArrayList
<
>
(
)
;
MergeCandidate
current
=
new
MergeCandidate
(
)
;
for
(
IntervalWindow
window
:
sortedWindows
)
{
MergeCandidate
next
=
new
MergeCandidate
(
window
)
;
if
(
current
.
intersects
(
window
)
)
{
current
.
add
(
window
)
;
if
(
windowSize
(
current
.
union
)
<
=
(
maxSize
-
gapDuration
)
)
continue
;
// Current window exceeds bounds, so flush and move to next
next
=
new
MergeCandidate
(
)
;
}
merges
.
add
(
current
)
;
current
=
next
;
}
merges
.
add
(
current
)
;
for
(
MergeCandidate
merge
:
merges
)
{
merge
.
apply
(
c
)
;
}
}
}
Como siempre, actualizar nuestro pipeline (la versión early/on-time/late del mismo, del Ejemplo 4-3, en este caso) para utilizar esta estrategia de ventanas personalizada es trivial, como puedes ver en el Ejemplo 4-12.
Ejemplo 4-12. Despidos tempranos, a tiempo y tardíos mediante la API temprano/a tiempo/tarde
PCollection<KV<Team, Integer>> totals = input .apply(Window.into(BoundedSessions .withGapDuration(ONE_MINUTE) .withMaxSize(THREE_MINUTES)) .triggering( AfterWatermark() .withEarlyFirings(AlignedDelay(ONE_MINUTE)) .withLateFirings(AfterCount(1))) .accumulatingAndRetractingFiredPanes()) .apply(Sum.integersPerKey());
Y ejecutado sobre nuestro ejemplo en ejecución, podría entonces parecerse a la Figura 4-11.
Observa cómo la sesión grande con valor 36 que abarcaba [12:00.26, 12:05.20), o casi cinco minutos de tiempo, en la implementación de sesiones no limitadas de la Figura 4-7 ahora acaba dividida en dos sesiones más cortas de 2 minutos y 2 minutos y 53 segundos de duración.
Dado que hoy en día pocos sistemas ofrecen soporte para ventanas personalizadas, merece la pena señalar cuánto más esfuerzo requeriría implementar algo así utilizando un sistema que sólo admitiera una implementación de sesiones ilimitadas. Tu único recurso real sería escribir código a continuación de la lógica de agrupación de sesiones que mirara las sesiones generadas y las troceara si superan el límite de longitud. Esto requeriría la capacidad de descomponer una sesión a posteriori, lo que obviaría las ventajas de la agregación incremental (algo que veremos con más detalle en el Capítulo 7), aumentando el coste. También eliminaría cualquier beneficio de protección contra el spam que se pudiera esperar obtener limitando la longitud de las sesiones, porque éstas primero tendrían que crecer hasta su tamaño completo antes de ser troceadas o truncadas.
Una talla no sirve para todos
Ya hemos visto tres casos de uso reales, cada uno de los cuales era una variación sutil de de los tipos de ventanas de stock que suelen proporcionar los sistemas de procesamiento de datos: ventanas fijas no alineadas, ventanas fijas por elemento y sesiones delimitadas. En los tres casos, vimos lo sencillo que era dar soporte a esos casos de uso mediante ventanas personalizadas y lo mucho más difícil (o caro) que sería dar soporte a esos casos de uso sin ellas. Aunque el uso de ventanas personalizadas todavía no está muy extendido en el sector, es una función que proporciona una flexibilidad muy necesaria para equilibrar las ventajas y desventajas cuando se crean cadenas de procesamiento de datos que deben gestionar casos de uso complejos del mundo real con cantidades masivas de datos de la forma más eficiente posible.
Resumen
Las ventanas avanzadas son un tema complejo y variado. En este capítulo hemos tratado tres conceptos avanzados:
- Ventanas de tiempo de procesamiento
-
Hemos visto cómo se relaciona esto con la ventana evento-tiempo, señalando los lugares en los que es intrínsecamente útil y, lo que es más importante, identificando aquellos en los que no lo es, destacando específicamente la estabilidad de los resultados que nos proporciona la ventana evento-tiempo.
- Ventanas de la sesión
-
Tuvimos nuestra primera introducción a la clase dinámica de estrategias de fusión de ventanas y viendo cuánto trabajo pesado hace el sistema por nosotros al proporcionarnos una construcción tan potente que puedes colocar simplemente en su sitio.
- Ventanas a medida
-
Aquí hemos visto tres ejemplos reales de ventanas personalizadas que son difíciles o imposibles de conseguir en sistemas que sólo proporcionan un conjunto estático de estrategias de ventanas estándar, pero relativamente triviales de implementar en un sistema con soporte de ventanas personalizadas:
-
Ventanas fijasno alineadas, que proporcionan una distribución más uniforme de las salidas a lo largo del tiempo cuando se utiliza un activador de marca de agua junto con ventanas fijas.
-
Ventanas fijas por elemento, que proporcionan la flexibilidad de elegir dinámicamente el tamaño de las ventanas fijas por elemento (por ejemplo, para proporcionar tamaños de ventana personalizables por usuario o por campaña publicitaria), para una mayor personalización de la semántica de la canalización al caso de uso en cuestión.
-
Ventanas de sesión limitadas, que limitan el tamaño que puede alcanzar una sesión determinada; por ejemplo, para contrarrestar los intentos de spam o para poner un límite a la latencia de las sesiones completadas que materializa el canal.
-
Después de profundizar en las marcas de agua en el Capítulo 3 con Slava y de realizar aquí un amplio estudio de las ventanas avanzadas, hemos superado con creces los fundamentos del procesamiento robusto de flujos en múltiples dimensiones. Con ello, concluimos nuestro enfoque sobre el Modelo del Haz.
A continuación, Reuven dedica el Capítulo 5 a las garantías de coherencia, el procesamiento exactamente una vez y los efectos secundarios, tras lo cual iniciaremos nuestro viaje a la Parte II, Secuencias y tablas, con el Capítulo 6.
1 Que yo sepa, Apache Flink es el único otro sistema que admite ventanas personalizadas en la medida en que lo hace Beam. Y para ser justos, su compatibilidad va incluso más allá de la de Beam, gracias a la capacidad de proporcionar un desalojador de ventanas personalizado. Asómbrate.
2 Y, de hecho, no conozco ningún sistema de este tipo en este momento.
3 Esto implica naturalmente el uso de datos por clave, pero como de todas formas la ventana está intrínsecamente ligada a la agrupación por clave, esa restricción no es especialmente gravosa.
4 Y no es fundamental que el propio elemento conozca el tamaño de la ventana; podrías buscar y almacenar en caché con la misma facilidad el tamaño de ventana adecuado para cualquiera que sea la dimensión deseada; por ejemplo, por usuario.
Get Sistemas de streaming now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.