Capítulo 4. Flujo de datos: Publicación e Ingesta con Pub/Sub y Dataflow

Este trabajo se ha traducido utilizando IA. Agradecemos tus opiniones y comentarios: translation-feedback@oreilly.com

En el Capítulo 3, desarrollamos un cuadro de mandos para explicar un modelo basado en tablas de contingencia para sugerir si cancelar una reunión. Sin embargo, el cuadro de mando que construimos carecía de inmediatez porque no estaba vinculado al contexto de los usuarios. Dado que los usuarios necesitan poder ver un cuadro de mando y ver la información que les resulta relevante en ese momento, tenemos que construir un cuadro de mando en tiempo real con indicaciones de ubicación.

¿Cómo añadiríamos contexto a nuestro cuadro de mandos? Tendríamos que mostrar mapas de retrasos en tiempo real. Para ello, necesitaremos las ubicaciones de los aeropuertos, y necesitaremos datos en tiempo real. Las ubicaciones de los aeropuertos pueden obtenerse de la Oficina de Estadísticas de Transporte de EEUU (BTS; la misma agencia gubernamental de EEUU de la que obtuvimos nuestros datos históricos de vuelos). Sin embargo, los datos de vuelos en tiempo real son un producto comercial. Si quisiéramos crear un negocio a partir de la predicción de llegadas de vuelos, compraríamos esa fuente de datos. Sin embargo, a efectos de este libro, vamos a simularlo.

Simular la creación de un flujo en tiempo real a partir de datos históricos tiene la ventaja de permitirnos ver los dos lados de un flujo (tanto la producción como el consumo). En la sección siguiente, veremos cómo podríamos transmitir datos a la base de datos si los recibiéramos en tiempo real.

Todos los fragmentos de código de este capítulo están disponibles en la carpeta 04_streaming del repositorio GitHub del libro. Consulta el archivo README.md de ese directorio para obtener instrucciones sobre cómo realizar los pasos descritos en este capítulo.

Diseñar el feed de eventos

Supongamos que deseamos crear una fuente de eventos, no con los 100 campos del conjunto de datos BTS sin procesar, sino sólo con los pocos campos que seleccionamos en el Capítulo 3 como relevantes para el problema de la predicción del retraso de los vuelos (ver Figura 4-1).

Figura 4-1. En el Capítulo 3, creamos una vista en BigQuery con los campos relevantes para el problema de predicción de retrasos de vuelos. En este capítulo, simularemos un flujo en tiempo real de esta información.

Para simular un flujo en tiempo real de la información de vuelo mostrada en la Figura 4-1, podemos empezar utilizando los datos históricos en la vista flights de BigQuery, pero necesitaremos transformarlos aún más. ¿Qué tipo de transformaciones son necesarias?

Transformaciones necesarias

Observa que FL_DATE es un Date mientras que DEP_TIME es un STRING. Esto se debe a que FL_DATE es de la forma 2015-07-03 para el 3 de julio de 2015, mientras que DEP_DATE es de la forma 1406 para las 2:06 p.m. hora local. Esto es desafortunado. No me preocupa la separación de la fecha y la hora en dos columnas: podemos remediarlo. Lo que es lamentable es que no haya un desfase de zona horaria asociado a la hora de salida. Así, en este conjunto de datos, una hora de salida de 1406 en distintas filas puede ser horas diferentes según la zona horaria del aeropuerto de origen.

Los desfases horarios (hay dos, uno para el aeropuerto de origen y otro para el de destino) no están presentes en los datos. Como el desfase depende de la ubicación del aeropuerto, tenemos que encontrar un conjunto de datos que contenga el desfase de la zona horaria de cada aeropuerto y luego mezclar estos datos con ese conjunto de datos.1 Para simplificar el análisis posterior, pondremos todas las horas de los datos en una zona horaria común: el Tiempo Universal Coordinado (UTC) es la elección tradicional de zona horaria común para los conjuntos de datos. Sin embargo, no podemos deshacernos de la hora local: necesitaremos la hora local para realizar análisis, como el retraso típico asociado a los vuelos matutinos frente a los vespertinos. Así que, aunque convertiremos las horas locales a UTC, también almacenaremos el desfase horario (por ejemplo, -3.600 minutos) para recuperar la hora local si es necesario.

Por tanto, vamos a realizar dos transformaciones en el conjunto de datos original. En primer lugar, convertiremos todos los campos de hora del conjunto de datos brutos a UTC. En segundo lugar, además de los campos presentes en los datos brutos, añadiremos al conjunto de datos tres campos para el aeropuerto de origen y los mismos tres campos para el aeropuerto de destino: la latitud, la longitud y el desfase horario. Estos campos se denominarán

DEP_AIRPORT_LAT, DEP_AIRPORT_LON, DEP_AIRPORT_TZOFFSET
ARR_AIRPORT_LAT, ARR_AIRPORT_LON, ARR_AIRPORT_TZOFFSET

La tercera transformación que tendremos que llevar a cabo es que, por cada fila del conjunto de datos históricos, tendremos que publicar varios eventos. Esto se debe a que sería demasiado tarde si esperamos a que el avión haya llegado para enviar un único evento que contenga todos los datos de la fila. Si lo hacemos en el momento de la salida del avión, nuestros modelos violarán las restricciones de causalidad. En su lugar, tendremos que enviar sucesos correspondientes a cada estado en que se encuentre el vuelo. Elijamos enviar cinco eventos para cada vuelo: cuando el vuelo se programa por primera vez, cuando el vuelo sale de la puerta de embarque, cuando el vuelo despega, cuando el vuelo aterriza y cuando el vuelo llega. Estos cinco eventos no pueden tener asociados todos los mismos datos, porque la conocibilidad de las columnas cambia durante el vuelo. Por ejemplo, al enviar un evento a la hora de salida, no conoceremos la hora de llegada. Por simplicidad, podemos notificar la misma estructura, pero tendremos que asegurarnos de que los datos incognoscibles se marcan con un null y no con el valor real del dato.

Arquitectura

La Tabla 4-1 enumera cuándo se pueden enviar esos eventos y los campos que se incluirán en cada uno de ellos.

Tabla 4-1. Campos que se incluirán en cada uno de los cinco eventos que se publicarán. Compara el orden de los campos con los del esquema de la Figura 4-1.
Evento Enviado a las (UTC) Campos incluidos en el mensaje de suceso
Programado CRS_DEP_TIME menos 7 días FL_DATE, UNIQUE_CARRIER, ORIGIN_AIRPORT_SEQ_ID, ORIGIN, DEST_AIRPORT_SEQ_ID, DEST, CRS_DEP_TIME [nulls], CRS_ARR_TIME [nulls], DISTANCE
Salida DEP_TIME Todos los campos disponibles en el mensaje programado, más:
  • DEP_TIME, DEP_DELAY CANCELLED
  • CANCELLATION_CODE
  • DEP_AIRPORT_LAT, DEP_AIRPORT_LON, DEP_AIRPORT_TZOFFSET
Desconexión de las ruedas WHEELS_OFF Todos los campos disponibles en el mensaje de salida, más: TAXI_OUT y WHEELS_OFF
Wheelson WHEELS_ON Todos los campos disponibles en el mensaje wheelsoff, más:
  • WHEELS_ON
  • DIVERTED
  • ARR_AIRPORT_LAT, ARR_AIRPORT_LON, ARR_AIRPORT_TZOFFSET
Llegada ARR_TIME Todos los campos disponibles en el mensaje wheelson, más: ARR_TIME y ARR_DELAY

Realizaremos las transformaciones necesarias y luego almacenaremos los datos transformados en una base de datos para que estén listos para que los utilice el código de simulación de eventos. La Figura 4-2 muestra los pasos que vamos a realizar en nuestra cadena de extracción-transformación-carga (ETL) y los pasos posteriores para simular un flujo de eventos a partir de estos eventos, y luego crear un cuadro de mando en tiempo real a partir del flujo de eventos simulado.

Figura 4-2. Pasos de nuestro canal ETL (extraer-transformar-cargar) para (a) transformar los datos brutos en eventos, (b) simular el flujo de eventos y (c) procesar el flujo de eventos para rellenar un panel de control en tiempo real.

Obtener información del aeropuerto

Para hacer la corrección horaria, necesitamos obtener la latitud y longitud de cada aeropuerto. La BTS tiene un conjunto de datos que contiene esta información, que podemos utilizar para hacer la búsqueda. Para mayor comodidad, he descargado los datos y los he puesto a disposición del público en gs://data-science-on-gcp/edition2/raw/airports.csv.

Examinemos los datos para determinar cómo obtener la latitud y longitud de los aeropuertos. En el Capítulo 2, cuando necesité explorar los datos de flights para crear el primer modelo de retrasos, cargué los datos en BigQuery.

¿Tenemos que importar todos los datos que nos comparten a nuestro conjunto de datos BigQuery para poder hacer la exploración? Por supuesto que no. Podemos consultar conjuntos de datos BigQuery en otros proyectos sin tener que hacer nuestras propias copias de los datos. En la cláusula FROM de la consulta BigQuery, todo lo que tenemos que hacer es especificar el nombre del proyecto en el que se encuentra el conjunto de datos:

SELECT 
  airline,
  AVG(departure_delay) AS avg_dep_delay
 FROM `bigquery-samples.airline_ontime_data.flights`
 GROUP BY airline
 ORDER by avg_dep_delay DESC

Pero, ¿qué ocurre si alguien comparte con nosotros un archivo de valores separados por comas (CSV)? ¿Tenemos que cargar los datos en BigQuery para ver lo que contiene el archivo? No.

BigQuery nos permite consultar datos en el Almacenamiento en la Nube a través de sus capacidades de consulta federada. Se trata de la capacidad de BigQuery para consultar datos que no están almacenados en el producto de almacén de datos, sino que operan sobre fuentes de datos como Google Sheets (una hoja de cálculo en Google Drive) o archivos en Cloud Storage. Así, podríamos dejar los archivos como CSV en el Almacenamiento en la Nube, definir una estructura de tablas sobre ellos, y consultar directamente los archivos CSV. Recuerda que sugerimos utilizar Almacenamiento en la Nube si tu patrón de análisis principal implica trabajar con tus datos a nivel de archivos planos: es una forma de aplicar ocasionalmente consultas SQL a tales conjuntos de datos.

El primer paso es obtener el esquema de estos archivos. Veamos la primera línea:

gsutil cat gs://data-science-on-gcp/edition2/raw/airports.csv | head -1

Obtenemos:

"AIRPORT_SEQ_ID","AIRPORT_ID","AIRPORT","DISPLAY_AIRPORT_NAME",
"DISPLAY_AIRPORT_CITY_NAME_FULL","AIRPORT_WAC_SEQ_ID2","AIRPORT_WAC",
"AIRPORT_COUNTRY_NAME","AIRPORT_COUNTRY_CODE_ISO","AIRPORT_STATE_NAME",
"AIRPORT_STATE_CODE","AIRPORT_STATE_FIPS","CITY_MARKET_SEQ_ID","CITY_MARKET_ID",
"DISPLAY_CITY_MARKET_NAME_FULL","CITY_MARKET_WAC_SEQ_ID2","CITY_MARKET_WAC",
"LAT_DEGREES","LAT_HEMISPHERE","LAT_MINUTES","LAT_SECONDS","LATITUDE",
"LON_DEGREES","LON_HEMISPHERE","LON_MINUTES","LON_SECONDS","LONGITUDE",
"UTC_LOCAL_TIME_VARIATION","AIRPORT_START_DATE","AIRPORT_THRU_DATE",
"AIRPORT_IS_CLOSED","AIRPORT_IS_LATEST"

Utiliza esta cabecera para escribir una cadena de esquema BigQuery del formato (especifica STRING para cualquier columna de la que no estés seguro, ya que siempre puedes CAST al formato adecuado cuando consultes los datos):

AIRPORT_SEQ_ID:INTEGER,AIRPORT_ID:STRING,AIRPORT:STRING, ...

Alternativamente, si tienes por ahí un conjunto de datos similar, parte de su esquema y edítalo:

bq show --format=prettyjson dsongcp.sometable > starter.json

Una vez que tenemos el esquema de los archivos GCS, podemos hacer una definición de tabla para la fuente federada:2

bq mk --external_table_definition= \
./airport_schema.json@CSV=gs://data-science-on-gcp/edition2/raw/airports.csv \
dsongcp.airports_gcs

Si visitas ahora la consola web de BigQuery, deberías ver una nueva tabla en el conjunto de datos dsongcp (recarga la página si es necesario). Se trata de una fuente de datos federada, en el sentido de que su almacenamiento sigue siendo el archivo CSV en Cloud Storage. Sin embargo, puedes consultarla como cualquier otra tabla de BigQuery:

SELECT 
AIRPORT_SEQ_ID, AIRPORT_ID, AIRPORT, DISPLAY_AIRPORT_NAME,
LAT_DEGREES, LAT_HEMISPHERE, LAT_MINUTES, LAT_SECONDS, LATITUDE
FROM dsongcp.airports_gcs
WHERE DISPLAY_AIRPORT_NAME LIKE '%Seattle%'

En la consulta anterior, intento encontrar en el archivo qué columna de aeropuerto y qué columna de latitud debo utilizar. El resultado indica que AIRPORT y LATITUDE son las columnas de interés, pero que hay varias filas correspondientes al aeropuerto SEA:

Row AIRPORT_​SEQ_​ID AIRPORT_​ID AIRPORT DISPLAY_​AIRPORT_​NAME LAT_​DEGREES LAT_​HEMISPHERE LAT_​MINUTES LAT_​SECONDS LATITUDE
1 1247701 12477 JFB Seattle 1st National.Bank Helipad 47 N 36 25 47.60694444
2 1474701 14747 SEA Seattle International 47 N 26 50 47.44722222
3 1474702 14747 SEA Seattle/Tacoma International 47 N 26 57 47.44916667
4 1474703 14747 SEA Seattle/Tacoma International 47 N 27 0 47.45

Afortunadamente, hay una columna que indica qué fila es la información más reciente, así que lo que tengo que hacer es

SELECT 
  AIRPORT, LATITUDE, LONGITUDE
FROM dsongcp.airports_gcs
WHERE AIRPORT_IS_LATEST = 1 AND AIRPORT = 'DFW'

Sin embargo, no te dejes llevar por las consultas federadas. Los usos más apropiados de las fuentes federadas implican conjuntos de datos relativamente pequeños que cambian con frecuencia y que necesitan unirse a grandes conjuntos de datos en tablas nativas de BigQuery. Dado que el almacenamiento en columnas de BigQuery es tan fundamental para su rendimiento, cargaremos la mayoría de los datos en el formato nativo de BigQuery.

Compartir datos

Ahora que tenemos el archivo airports.csv en el Almacenamiento en la Nube y el conjunto de datos de los aeropuertos en BigQuery, es muy probable que nuestros colegas también quieran utilizar estos datos. Compartámoslos con ellos: una de las ventajas de llevar tus datos a la nube (y, más concretamente, a un almacén de datos) es que permite mezclar conjuntos de datos más allá de los límites de la organización. Así que, a menos que tengas una razón clara para no hacerlo, como las precauciones de seguridad, intenta que tus datos sean ampliamente accesibles.

Los costes de la consulta corren a cargo de la persona que envía la consulta al motor BigQuery, por lo que no debes preocuparte de estar incurriendo en costes adicionales para tu división al hacer esto. Es posible hacer que un cubo GCS sea "solicitante-pagador" para obtener el mismo tipo de separación de facturación para los datos en el Almacenamiento en la Nube.

Compartir un conjunto de datos de almacenamiento en la nube

Para compartir algunos datos en el Almacenamiento en la Nube, utiliza gsutil:

gsutil -m acl ch -r -u abc@xyz.com:R gs://$BUCKET/data

En el comando anterior, el símbolo -m indica modo multihilo, el símbolo -r proporciona acceso de forma recursiva empezando por el directorio de nivel superior especificado, y el símbolo -u indica que se trata de un usuario al que se le concede acceso de lectura (:R).

Podríamos proporcionar acceso de lectura a toda la organización o a un grupo de Google utilizando -g:

gsutil -m acl ch -r -g xyz.com:R gs://$BUCKET/data

Compartir un conjunto de datos BigQuery

La compartición en BigQuery puede producirse en la granularidad de una columna, una tabla o un conjunto de datos. Ninguna de nuestras tablas BigQuery contiene información personal identificable o confidencial. Por lo tanto, no hay ninguna razón de peso para controlar el acceso a la información sobre vuelos a nivel de columna o tabla. Por lo tanto, podemos compartir el conjunto de datos dsongcp que se creó en el Capítulo 2, y podemos hacer que todas las personas de la organización que trabajan en este proyecto sean bigquery.user para que puedan realizar consultas en este conjunto de datos. Puedes hacerlo desde la consola web de BigQuery, en el menú del conjunto de datos.

En algunos casos, puede que tu conjunto de datos o tabla contenga determinadas columnas con información de identificación personal o confidencial. Puede que necesites restringir el acceso a esas columnas dejando el resto de la tabla accesible a un público más amplio.3 Siempre que necesites proporcionar acceso a un subconjunto de una tabla en BigQuery (ya sean columnas o filas concretas), puedes utilizar vistas. Coloca la propia tabla en un conjunto de datos accesible a un conjunto muy reducido de usuarios. A continuación, crea una vista sobre esta tabla que extraiga las columnas y filas relevantes y guarda esta vista en un conjunto de datos separado que tenga una accesibilidad más amplia. Tus usuarios sólo consultarán esta vista, y como la información de identificación personal o confidencial ni siquiera está presente en la vista, se reducen las posibilidades de filtración inadvertida.

Otra forma de restringir el acceso a nivel de una tabla BigQuery es utilizar Cloud IAM. Para controlar el acceso a nivel de una columna, utilizarías etiquetas de política y Catálogo de Datos.

Dataplex y Centro de Análisis

Una vez que adquieres el hábito de compartir datos ampliamente, la gobernanza puede volverse problemática. Es mejor si puedes administrar los datos a través del Almacenamiento en la Nube de forma coherente y hacer un seguimiento del linaje, etc. Para eso está Dataplex.

Puede resultar bastante engorroso compartir tablas y conjuntos de datos de uno en uno con un usuario o un grupo a la vez. Para poner en práctica la compartición a escala y obtener estadísticas sobre cómo utiliza la gente los datos que has compartido, utiliza Analytics Hub.

Corrección horaria

Corregir las horas comunicadas en hora local a UTC no es una tarea sencilla. Hay varios pasos:

  • La hora local depende de, bueno, la ubicación. Los datos de vuelo de que disponemos sólo registran el nombre del aeropuerto (por ejemplo, ALB para Albany). Por tanto, necesitamos obtener la latitud y la longitud a partir del código del aeropuerto. El BTS tiene un conjunto de datos que contiene esta información, que podemos utilizar para hacer la búsqueda.

  • Dado un par de latitud/longitud, tenemos que buscar la zona horaria en un mapa de zonas horarias mundiales. Por ejemplo, dada la latitud y longitud del aeropuerto de Albany, necesitaríamos recuperar America/New_York. Hay varios servicios web que hacen esto, pero el paquete Python timezonefinder es una opción más eficaz porque funciona completamente fuera de línea. El inconveniente es que este paquete no maneja las zonas oceánicas y algunos cambios históricos de huso horario,4 pero es una compensación que podemos hacer por ahora.

  • El desfase horario (con respecto a la hora del meridiano de Greenwich [GMT/UTC]) en un lugar cambia durante el año debido a las correcciones del horario de verano. En Nueva York, por ejemplo, hay seis horas en verano y cinco horas en invierno de retraso respecto a la UTC. Por lo tanto, dada la zona horaria (America/New_York), también necesitamos la fecha y hora de salida local (digamos el 13 de enero de 2015, a las 14:08) para encontrar el desfase horario correspondiente. El paquete de Python pytz proporciona esta capacidad utilizando el sistema operativo subyacente.

El problema de las horas ambiguas sigue existiendo: cada instante entre la 01:00 y las 02:00 hora local ocurre dos veces el día en que el reloj cambia del horario de verano al horario estándar (horario de invierno). Por tanto, si nuestro conjunto de datos tiene un vuelo que llega a la 01:30, tenemos que elegir qué hora representa. En una situación del mundo real, mirarías la duración típica del vuelo y elegirías la que fuera más probable. A efectos de este libro, siempre asumiré la hora de invierno (es decir, is_dst es False) por la dudosa razón de que es la zona horaria estándar para ese lugar.

La complejidad de estos pasos debería, espero, convencerte de que sigas las buenas prácticas a la hora de almacenar el tiempo.

Apache Beam/Cloud Dataflow

La forma canónica de construir canalizaciones de datos en Google Cloud Platform es utilizar Cloud Dataflow. Cloud Dataflow es una externalización de las tecnologías llamadas Flume y MillWheel, de uso generalizado en Google desde hace varios años. Emplea un modelo de programación que maneja los datos por lotes y en flujo de manera uniforme, lo que permite utilizar el mismo código base para el procesamiento por lotes y en flujo continuo. El código en sí está escrito en Apache Beam, ya sea en Java, Python o Go,5 y es portable en el sentido de que puede ejecutarse en múltiples entornos de ejecución, incluidos Apache Flink y Apache Spark. En GCP, Cloud Dataflow proporciona un servicio totalmente gestionado (sin servidor) capaz de ejecutar pipelines Beam. Los recursos se asignan bajo demanda, y se autoescalan para conseguir tanto una latencia mínima como una alta utilización de los recursos.

La programación de haces consiste en construir una canalización (una serie de transformaciones de datos) que se envía a un ejecutor. El ejecutor construirá un grafo y luego transmitirá datos a través de él. Cada conjunto de datos de entrada procede de una fuente y cada conjunto de datos de salida se envía a un sumidero. La Figura 4-3 ilustra el canal Beam que vamos a construir.

Compara los pasos de la Figura 4-2 con el diagrama de bloques del canal ETL (extraer-transformar-cargar) de la Figura 4-3. Construyamos el canal de datos pieza a pieza.

Figura 4-3. El canal de flujo de datos que vamos a construir.

Análisis de datos de aeropuertos

Puedes descargar información sobre la ubicación de los aeropuertos de la página web de la BTS. Seleccioné todos los campos, descargué el archivo CSV en mi disco duro local, lo extraje y lo comprimí con gzip. El archivo gzipped de aeropuertos está disponible en el repositorio GitHub de este libro.

Para poder utilizar Apache Beam desde Cloud Shell, necesitamos instalarlo en nuestro entorno Python. Instala también en este momento los paquetes de zona horaria:6

virtualenv ~/beam_env
source ~/beam_env/bin/activate
python3 -m pip install --upgrade \
             timezonefinder pytz \
             'apache-beam[gcp]'

La transformación Read de la tubería Beam que viene a continuación lee el archivo de los aeropuertos línea por línea:7

with beam.Pipeline('DirectRunner') as pipeline:
   airports = (pipeline
      | beam.io.ReadFromText('airports.csv.gz')
      | beam.Map(lambda line: next(csv.reader([line])))
      | beam.Map(lambda fields: (fields[0], (fields[21], fields[26])))
   )

Por ejemplo, supongamos que una de las líneas de entrada leídas del archivo de texto fuente es la siguiente:

1000401,10004,"04A","Lik Mining Camp","Lik, AK",101,1,"United
States","US","Alaska","AK","02",3000401,30004,"Lik,
AK",101,1,68,"N",5,0,68.08333333,163,"W",10,0,-163.16666667,"",2007-07-01,,0,1,

El primer Map toma esta línea y la pasa a un lector CSV que la analiza (teniendo en cuenta campos como Lik, AK que tienen comas) y extrae los campos como una lista de cadenas. Estos campos se pasan a la siguiente transformación. La segunda Map toma los campos como entrada y saca una tupla de la forma (los campos extraídos aparecen en negrita en el ejemplo anterior):

(1000401, (68.08333333,-163.16666667))

El primer número es el código único del aeropuerto (lo utilizamos, en lugar del código de tres letras del aeropuerto, porque las ubicaciones de los aeropuertos pueden cambiar con el tiempo), y los dos números siguientes son el par latitud/longitud de la ubicación del aeropuerto. La variable airports, que es el resultado de estas tres transformaciones, no es una simple lista en memoria de estas tuplas. En su lugar, es una colección inmutable, denominada PCollection, que puedes sacar de la memoria y distribuir.

Podemos escribir el contenido del PCollection en un archivo de texto para verificar que la tubería se comporta correctamente:

(airports
   | beam.Map(lambda airport_data: '{},{}'.format(airport_data[0], ',' \
       .join(airport_data[1])) )
   | beam.io.WriteToText('extracted_airports')
)

Prueba esto: el código, en 04_streaming/transform/df01.py, no es más que un programa Python que puedes ejecutar desde la línea de comandos. En primer lugar, instala el paquete Apache Beam si aún no lo has hecho y, a continuación, ejecuta el programa df01. py mientras te encuentras en el directorio que contiene el repositorio de GitHub de este libro:

cd 04_streaming/simulate
./install_packages.sh
python3 ./df01.py

Esto ejecuta localmente el código de df01.py. Más adelante, cambiaremos la línea de la tubería por:

with beam.Pipeline('DataflowRunner') as pipeline:

y conseguir ejecutar la canalización en Google Cloud Platform utilizando el servicio Cloud Dataflow. Con ese cambio, basta con ejecutar el programa Python para lanzar la canalización de datos en varios trabajadores en la nube. Como ocurre con muchos sistemas distribuidos, la salida de Cloud Dataflow se fragmenta potencialmente en uno o más archivos. Obtendrás un archivo cuyo nombre empieza por "aeropuertos_extraídos" (el mío era aeropuertos_extraídos-00000-de-00001), algunas de cuyas líneas pueden tener un aspecto parecido a éste:

1000101,58.10944444,-152.90666667
1000301,65.54805556,-161.07166667

Las columnas son AIRPORT_SEQ_ID, LATITUDE, y LONGITUDE-el orden de las filas que obtienes depende de cuál de los trabajadores paralelos terminó primero, por lo que podría ser diferente.

Añadir información sobre la zona horaria

Cambiemos ahora el código para determinar la zona horaria correspondiente a un par latitud/longitud. En nuestra cadena, en lugar de emitir simplemente el par latitud/longitud, emitimos una lista de tres elementos: latitud, longitud y zona horaria:

airports = (pipeline
      | beam.Read(beam.io.ReadFromText('airports.csv.gz'))
      | beam.Map(lambda line: next(csv.reader([line])))
      | beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
   )

La palabra clave lambda en Python crea una función anónima. En el caso del primer uso de lambda en el fragmento anterior, ese método toma un parámetro (line) y devuelve lo que sigue a los dos puntos. Podemos determinar la zona horaria utilizando el paquete timezonefinder:8

def addtimezone(lat, lon):
      import timezonefinder
      tf = timezonefinder.TimezoneFinder()
      lat = float(lat)
      lon = float(lon)
      return (lat, lon, tf.timezone_at(lng=lon, lat=lat))

La ubicación de la sentencia import en el ejemplo anterior puede parecer extraña (la mayoría de las importaciones de Python suelen ir al principio del archivo), pero este patrón de importación dentro de la función lo recomienda Cloud Dataflow para que9 cuando lo enviemos a la nube, el picking de la sesión principal no acabe pickeando también los paquetes importados.10

Por ahora, sin embargo, vamos a ejecutar esto(df02.py) localmente. Esto llevará un tiempo porque el cálculo de la zona horaria implica un gran número de comprobaciones de intersección de polígonos y porque lo estamos ejecutando localmente, no (¡todavía!) distribuido en la nube. Así que vamos a acelerarlo añadiendo un filtro para reducir el número de ubicaciones de aeropuertos que tenemos que buscar:

 | beam.io.ReadFromText('airports.csv.gz')
 | beam.Filter(lambda line: "United States" in line 
                              and line[-2:] == '1,')

Los datos de retrasos de vuelos del BTS sólo corresponden a vuelos nacionales de EEUU, por lo que no necesitamos las zonas horarias de los aeropuertos de fuera de EEUU. La razón de la segunda comprobación es que las ubicaciones de los aeropuertos cambian con el tiempo, pero a nosotros sólo nos interesa la ubicación actual del aeropuerto. Por ejemplo, aquí están las ubicaciones de los aeropuertos para ORD (o Chicago):

1393001,...,"ORD",...,41.97805556,...,-87.90611111,...,1950-01-01,2011-06-30,0,0,
1393002,...,"ORD",...,41.98166667,...,-87.90666667,...,2011-07-01,2013-09-30,0,0,
1393003,...,"ORD",...,41.97944444,...,-87.90750000,...,2013-10-01,2015-09-30,0,0,
1393004,...,"ORD",...,41.97722222,...,-87.90805556,...,2015-10-01,,0,1,

La primera fila recoge la ubicación del aeropuerto de Chicago entre 1950 y el 30 de junio de 2011.11 La segunda fila es válida desde el 1 de julio de 2011 hasta el 30 de septiembre de 2013. La última fila, sin embargo, es la ubicación actual y esto viene marcado porque la última columna (el campo AIRPORT_IS_LATEST ) es 1.

Sin embargo, ¡ésa no es la única fila que nos interesa! Los vuelos anteriores a 2015-10-01 informarán del ID de la penúltima fila. Podríamos añadir una comprobación para esto, pero parece bastante arriesgado por un poco de optimización. Así que eliminaré esa última comprobación, de forma que sólo tengamos

 | beam.io.ReadFromText('airports.csv.gz')
 | beam.Filter(lambda line: "United States" in line)

Una vez hecho esto y ejecutado df02.py, la información extraída de los aeropuertos tiene este aspecto:

1672301,62.03611111,-151.45222222,America/Anchorage
1672401,43.87722222,-73.41305556,America/New_York
1672501,40.75722222,-119.21277778,America/Los_Angeles

La última columna de la información extraída contiene la zona horaria, que se determinó a partir de la latitud y longitud de cada aeropuerto.

Convertir horas a UTC

Ahora que tenemos la zona horaria de cada aeropuerto, estamos preparados para abordar la conversión de las horas de los datos de flights a UTC. En el momento en que estamos desarrollando el programa, preferiríamos no procesar todos los meses que tenemos en BigQuery: esperar la consulta cada vez que ejecutemos el programa será molesto. En lugar de eso, crearemos una pequeña muestra de los datos de flights en BigQuery con la que desarrollar nuestro código:12

SELECT *
FROM dsongcp.flights
WHERE RAND() < 0.001

Esto devuelve unas 6.000 filas. Podemos utilizar la interfaz web de BigQuery para guardar estos resultados como un archivo JSON (JavaScript Object Notation). Sin embargo, yo prefiero hacer un script:13

bq query --destination_table dsongcp.flights_sample \
   --replace --nouse_legacy_sql \
   'SELECT * FROM dsongcp.flights WHERE RAND() < 0.001'

bq extract --destination_format=NEWLINE_DELIMITED_JSON \
   dsongcp.flights_sample  \
   gs://${BUCKET}/flights/ch4/flights_sample.json

gsutil cp gs://${BUCKET}/flights/ch4/flights_sample.json

Esto crea un archivo llamado flight_sample.json, una de cuyas filas tiene un aspecto similar al siguiente:

{"FL_DATE":"2015-04-28","UNIQUE_CARRIER":"EV","ORIGIN_AIRPORT_SEQ_ID":"1013503",
"ORIGIN":"ABE","DEST_AIRPORT_SEQ_ID":"1039705","DEST":"ATL",
"CRS_DEP_TIME":"1600","DEP_TIME":"1555","DEP_DELAY":-5,"TAXI_OUT":7,
"WHEELS_OFF":"1602","WHEELS_ON":"1747","TAXI_IN":4,"CRS_ARR_TIME":"1809",
"ARR_TIME":"1751","ARR_DELAY":-18,"CANCELLED":false,"DIVERTED":false,
"DISTANCE":"692.00"}

La lectura de los datos de los vuelos comienza de forma similar a la lectura de los datos de los aeropuertos:14

flights = (pipeline
 | 'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))

Se trata del mismo código que cuando leemos el archivo aeropuertos.csv.gz, salvo que también estoy dando un nombre (flights:read) a este paso de la transformación y utilizando un analizador sintáctico JSON en lugar de un analizador sintáctico CSV. Fíjate en la sintaxis

 | 'name-of-step' >> transform_function()

El siguiente paso, sin embargo, es diferente porque implica dos PCollections. Tenemos que unir los datos de los vuelos con los datos de los aeropuertos para encontrar la zona horaria correspondiente a cada vuelo. Para ello, convertimos los aeropuertos PCollection en una "entrada lateral". Las entradas laterales en Beam son como vistas en el PCollection original, y son listas o dicts (diccionarios). En este caso, crearemos un diccionario que relacione el ID del aeropuerto con información sobre los aeropuertos:

flights = (pipeline
 |'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))
 |'flights:tzcorr' >> beam.FlatMap(tz_correct, 
                                   beam.pvalue.AsDict(airports))
)

El hecho de que el PCollection tenga que ser una lista Python o un dict Python significa que las entradas laterales tienen que ser lo suficientemente pequeñas como para caber en memoria. Si necesitas unir dos PCollections grandes que no quepan en la memoria, utiliza un CoGroupByKey.

El método FlatMap() llama a un método tz_correct(), que toma el contenido analizado de una línea de flights_sample.json (que contiene la información de un solo vuelo) y un diccionario Python (que contiene la información de la zona horaria de todos los aeropuertos):

def tz_correct(fields, airport_timezones):
   try:
      # convert all times to UTC
      # ORIGIN_AIRPORT_SEQ_ID is the name of JSON attribute
      dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"]
      arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"]
      # airport_id is the key to airport_timezones dict
      # and the value is a tuple (lat, lon, timezone)
      dep_timezone = airport_timezones[dep_airport_id][2]
      arr_timezone = airport_timezones[arr_airport_id][2]

      for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
         fields[f] = as_utc(fields["FL_DATE"], fields[f], dep_timezone)
      for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f] = as_utc(fields["FL_DATE"], fields[f], arr_timezone)

      yield json.dumps(fields)
   except KeyError as e:
      logging.exception(" Ignoring " + line + 
                        " because airport is not known")

¿Por qué FlatMap() en lugar de Map para llamar a tz_correct()? Un Map es una relación 1 a 1 entre entrada y salida, mientras que un FlatMap() puede devolver 0-N salidas por entrada. La forma en que lo hace es con una función generadora de Python (es decir, la palabra clave yield: piensa en yield como un retorno que devuelve un elemento cada vez hasta que no haya más datos que devolver). Utilizar FlatMap aquí nos permite ignorar cualquier información de vuelo correspondiente a aeropuertos desconocidos-aunque esto no ocurre en los datos históricos que estamos procesando, un poco de programación defensiva no viene mal.

El código tz_correct() obtiene el ID del aeropuerto de salida de los datos del vuelo y luego busca la zona horaria para ese ID de aeropuerto en los datos del aeropuerto. Una vez que tiene la zona horaria, llama al método as_utc() para que convierta a UTC cada una de las horas de las que se informa en la zona horaria de ese aeropuerto:

def as_utc(date, hhmm, tzone):
   try:
      if len(hhmm) > 0 and tzone is not None:
         import datetime, pytz
         loc_tz = pytz.timezone(tzone)
         loc_dt = loc_tz.localize(datetime.datetime.strptime(date,'%Y-%m-%d'),
                                  is_dst=False)
         loc_dt += datetime.timedelta(hours=int(hhmm[:2]),
                                      minutes=int(hhmm[2:]))
         utc_dt = loc_dt.astimezone(pytz.utc)
         return utc_dt.strftime('%Y-%m-%d %H:%M:%S')
      else:
         return '' # empty string corresponds to canceled flights
   except ValueError as e:
      print('{} {} {}'.format(date, hhmm, tzone))
      raise e

Como antes, puedes ejecutarlo localmente. Para ello, ejecuta df03.py. Una línea que originalmente (en los datos brutos) tenía el siguiente aspecto:

{"FL_DATE":"2015-11-05","UNIQUE_CARRIER":"DL","ORIGIN_AIRPORT_SEQ_ID":"1013503",
"ORIGIN":"ABE","DEST_AIRPORT_SEQ_ID":"1039705","DEST":"ATL",
"CRS_DEP_TIME":"0600","DEP_TIME":"0556","DEP_DELAY":-4,"TAXI_OUT":12,
"WHEELS_OFF":"0608","WHEELS_ON":"0749","TAXI_IN":10,"CRS_ARR_TIME":"0818",
"ARR_TIME":"0759","ARR_DELAY":-19,"CANCELLED":false,
"DIVERTED":false,"DISTANCE":"692.00"}

ahora se convierte en:

{"FL_DATE": "2015-11-05", "UNIQUE_CARRIER": "DL", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-11-05 11:00:00", "DEP_TIME": "2015-11-05 10:56:00", 
"DEP_DELAY": -4, "TAXI_OUT": 12, "WHEELS_OFF": "2015-11-05 11:08:00", 
"WHEELS_ON": "2015-11-05 12:49:00", "TAXI_IN": 10, 
"CRS_ARR_TIME": "2015-11-05 13:18:00", "ARR_TIME": "2015-11-05 12:59:00", 
"ARR_DELAY": -19, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "692.00"}

Todas las horas se han convertido a UTC. Por ejemplo, la hora 0759 de llegada a Atlanta se ha convertido a UTC para ser las 12:59:00.

Corrección de fechas

Observa atentamente la siguiente línea relativa a un vuelo de Honolulu (HNL) a Dallas-Fort Worth (DFW). ¿Notas algo extraño?

{"FL_DATE": "2015-03-06", "UNIQUE_CARRIER": "AA", 
"ORIGIN_AIRPORT_SEQ_ID": "1217302", "ORIGIN": "HNL", 
"DEST_AIRPORT_SEQ_ID": "1129803", "DEST": "DFW", 
"CRS_DEP_TIME": "2015-03-07 05:30:00", "DEP_TIME": "2015-03-07 05:22:00", 
"DEP_DELAY": -8, "TAXI_OUT": 40, "WHEELS_OFF": "2015-03-07 06:02:00", 
"WHEELS_ON": "2015-03-06 12:32:00", "TAXI_IN": 7, 
"CRS_ARR_TIME": "2015-03-06 12:54:00", "ARR_TIME": "2015-03-06 12:39:00", 
"ARR_DELAY": -15, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "3784.00"}

Examina la hora de salida en Honolulú y la hora de llegada en Dallas: ¡el vuelo llega el día antes de salir! Eso es porque la fecha del vuelo (2015-03-06) es la fecha de salida en hora local. Si añades una diferencia horaria entre aeropuertos, es muy posible que no sea la fecha de llegada. Buscaremos estas situaciones y añadiremos 24 horas si es necesario. Esto es, por supuesto, todo un truco (¡¿he mencionado ya que las horas deberían almacenarse en UTC?!):

def add_24h_if_before(arrtime, deptime):
   import datetime
   if len(arrtime) > 0 and len(deptime) > 0 and arrtime < deptime:
      adt = datetime.datetime.strptime(arrtime, '%Y-%m-%d %H:%M:%S')
      adt += datetime.timedelta(hours=24)
      return adt.strftime('%Y-%m-%d %H:%M:%S')
   else:
      return arrtime

El hack de 24 horas se llama justo antes del rendimiento en tz_correct.15 Ahora que tenemos nuevos datos sobre los aeropuertos, probablemente sea conveniente añadirlos a nuestro conjunto de datos. Además, como hemos señalado antes, queremos mantener un registro del desfase horario con respecto a UTC, porque algunos tipos de análisis pueden requerir el conocimiento de la hora local. Así, el nuevo código tz_correct pasa a ser el siguiente:

def tz_correct(line, airport_timezones):
   fields = json.loads(line)
   try:
      # convert all times to UTC
      dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"]
      arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"]
      dep_timezone = airport_timezones[dep_airport_id][2]
      arr_timezone = airport_timezones[arr_airport_id][2]

      for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
         fields[f], deptz = as_utc(fields["FL_DATE"], fields[f], dep_timezone)
      for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f], arrtz = as_utc(fields["FL_DATE"], fields[f], arr_timezone)

      for f in ["WHEELS_OFF", "WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f] = add_24h_if_before(fields[f], fields["DEP_TIME"])

      fields["DEP_AIRPORT_TZOFFSET"] = deptz
      fields["ARR_AIRPORT_TZOFFSET"] = arrtz
      yield json.dumps(fields)
   except KeyError as e:
      logging.exception(" Ignoring " + line + " because airport is not known")

Cuando ejecuto df04.py, al que se le han aplicado estos cambios, el vuelo de Honolulú a Dallas se convierte en:

{"FL_DATE": "2015-03-06", "UNIQUE_CARRIER": "AA", 
"ORIGIN_AIRPORT_SEQ_ID": "1217302", "ORIGIN": "HNL", 
"DEST_AIRPORT_SEQ_ID": "1129803", "DEST": "DFW", 
"CRS_DEP_TIME": "2015-03-07 05:30:00", "DEP_TIME": "2015-03-07 05:22:00", 
"DEP_DELAY": -8, "TAXI_OUT": 40, "WHEELS_OFF": "2015-03-07 06:02:00", 
"WHEELS_ON": "2015-03-07 12:32:00", "TAXI_IN": 7, 
"CRS_ARR_TIME": "2015-03-07 12:54:00", "ARR_TIME": "2015-03-07 12:39:00", 
"ARR_DELAY": -15, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "3784.00", 
"DEP_AIRPORT_TZOFFSET": -36000.0, "ARR_AIRPORT_TZOFFSET": -21600.0}

Como puedes ver, ahora se han corregido las fechas (ver las partes en negrita).

Crear eventos

Cuando tengamos nuestros datos corregidos en el tiempo, podremos pasar a crear eventos para publicarlos en Pub/Sub. Por ahora, nos limitaremos a los mensajes departed y arrived; podemos volver a ejecutar el proceso para crear los eventos adicionales cuando nuestro trabajo de modelado comience a utilizar otros eventos:

def get_next_event(fields):
    if len(fields["DEP_TIME"]) > 0:
       event = dict(fields)  # copy
       event["EVENT_TYPE"] = "departed"
       event["EVENT_TIME"] = fields["DEP_TIME"]
       for f in ["TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", 
                 "TAXI_IN", "ARR_TIME", "ARR_DELAY", "DISTANCE"]:
          event.pop(f, None)  # not knowable at departure time
       yield event
    if len(fields["ARR_TIME"]) > 0:
       event = dict(fields)
       event["EVENT_TYPE"] = "arrived"
       event["EVENT_TIME"] = fields["ARR_TIME"]
       yield event

Esencialmente, recogemos la hora de salida y creamos un evento departed a esa hora, tras asegurarnos de eliminar los campos (como el retraso de llegada) que no podemos conocer a la hora de salida. Del mismo modo, utilizamos la hora de llegada para crear un evento arrived, como se muestra en la Figura 4-4.

En la cadena, el código de creación de eventos se llama en la flights PCollection después de que se haya producido la conversión a UTC:

flights = (pipeline
  |'flights:read' >> beam.io.ReadFromText('flights_sample.json')
  |'flights:tzcorr' >> beam.FlatMap(tz_correct,
                           beam.pvalue.AsDict(airports))
)
events = flights | beam.FlatMap(get_next_event)
Figura 4-4. Eventos, cuándo se publican, y algunos de los campos presentes en esos eventos.

Si ahora ejecutamos la canalización16 veremos dos eventos para cada vuelo:

{"FL_DATE": "2015-04-28", "UNIQUE_CARRIER": "EV", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-04-28 20:00:00", "DEP_TIME": "2015-04-28 19:55:00",
"DEP_DELAY": -5, "CRS_ARR_TIME": "2015-04-28 22:09:00", "CANCELLED": false, 
"DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, 
"ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "departed", 
"EVENT_TIME": "2015-04-28 19:55:00"}
{"FL_DATE": "2015-04-28", "UNIQUE_CARRIER": "EV", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-04-28 20:00:00", "DEP_TIME": "2015-04-28 19:55:00",
"DEP_DELAY": -5, "TAXI_OUT": 7, "WHEELS_OFF": "2015-04-28 20:02:00", 
"WHEELS_ON": "2015-04-28 21:47:00", "TAXI_IN": 4, 
"CRS_ARR_TIME": "2015-04-28 22:09:00", "ARR_TIME": "2015-04-28 21:51:00", 
"ARR_DELAY": -18, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "692.00",
"DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, 
"EVENT_TYPE": "arrived", "EVENT_TIME": "2015-04-28 21:51:00"}

El primer evento es un evento departed y debe publicarse a la hora de salida, mientras que el segundo evento es un evento arrived y debe publicarse a la hora de llegada. El evento departed tiene una serie de campos que faltan y que corresponden a datos que no se conocen en ese momento.

Una vez que tengamos este código funcionando, vamos a añadir un tercer evento que se enviará cuando el avión despegue:

    if len(fields["WHEELS_OFF"]) > 0:
       event = dict(fields)  # copy
       event["EVENT_TYPE"] = "wheelsoff"
       event["EVENT_TIME"] = fields["WHEELS_OFF"]
       for f in ["WHEELS_ON", "TAXI_IN", 
                 "ARR_TIME", "ARR_DELAY", "DISTANCE"]:
          event.pop(f, None)  # not knowable at departure time
       yield event

En este momento, aún no hemos creado un evento de descenso de ruedas.

Leer y escribir en la nube

Hasta ahora, hemos estado leyendo y escribiendo archivos locales. Sin embargo, una vez que empezamos a ejecutar nuestro código en producción, en un entorno sin servidor, el concepto de unidad local deja de tener sentido. Tenemos que leer y escribir desde el Almacenamiento en la Nube. Además, como se trata de datos estructurados, es preferible leer y escribir en BigQuery -recordemos que cargamos nuestro conjunto de datos completo en BigQuery en el Capítulo 2-. Ahora, nos gustaría poner allí también los datos transformados (corregidos en el tiempo).

Afortunadamente, todo lo que esto implica es cambiar la fuente o el sumidero. El resto de la cadena permanece igual. Por ejemplo, en la sección anterior (ver 04_streaming/transform/df05.py), leemos los aeropuertos.csv.gz como:

| 'airports:read' >> beam.io.ReadFromText('airports.csv.gz')

Ahora, para leer el archivo equivalente del Almacenamiento en la Nube, cambiamos el código correspondiente en 04_streaming/transform/df06.py para que sea

airports_filename = 'gs://{}/flights/airports/airports.csv.gz'.format(
                          bucket)
...
| 'airports:read' >> beam.io.ReadFromText(airports_filename)

Por supuesto, tendremos que asegurarnos de subir el archivo a Cloud Storage y hacerlo legible para quien vaya a ejecutar este código. Tener el archivo de datos disponible en nuestro repositorio de GitHub no iba a escalar de todos modos: Cloud Storage (o BigQuery) es el lugar adecuado para los datos.

En df05.py, tuve que leer un archivo local que contenía la exportación JSON de una parte inteligente del conjunto de datos y utilizar un analizador JSON para obtener un dict:

 | 'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))

En df06.py, el código correspondiente se simplifica porque el lector BigQuery devuelve un dict en el que los nombres de las columnas del conjunto de resultados son las claves:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001', 
      use_standard_sql=True)

Por supuesto, cuando la ejecutemos de verdad, cambiaremos la consulta para eliminar el muestreo (rand() < 0.001) y poder procesar todo el conjunto de datos.

Del mismo modo, donde antes escribíamos en un archivo local utilizando:

  | 'flights:tostring' >> beam.Map(lambda fields: json.dumps(fields))
  | 'flights:out' >> beam.io.textio.WriteToText('all_flights')

cambiaremos el código para escribir en el Almacenamiento en la Nube utilizando:

 flights_output = 'gs://{}/flights/tzcorr/all_flights'.format(bucket)
... 
 | 'flights:tostring' >> beam.Map(lambda fields: json.dumps(fields))
 | 'flights:gcsout' >> beam.io.textio.WriteToText(flights_output)

También podemos escribir los mismos datos en una tabla BigQuery:

flights_schema = \
     'FL_DATE:date,UNIQUE_CARRIER:string,...CANCELLED:boolean'
...
 | 'flights:bqout' >> beam.io.WriteToBigQuery(
     'dsongcp.flights_tzcorr', schema=flights_schema,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )

Ten en cuenta que tenemos que proporcionar un esquema al escribir en BigQuery, y especificar qué hacer si la tabla ya existe (pedimos que se trunque la tabla y se sustituya el contenido) y si la tabla aún no existe (pedimos que se cree la tabla).

Podemos intentar ejecutar este código, pero la canalización requerirá algunos parámetros adicionales. Así que donde antes teníamos

  with beam.Pipeline('DirectRunner') as pipeline:

que ahora necesitamos:

  argv = [
      '--project={0}'.format(project),
      '--staging_location=gs://{0}/flights/staging/'.format(bucket),
      '--temp_location=gs://{0}/flights/temp/'.format(bucket),
      '--runner=DirectRunner'
   ]
   with beam.Pipeline(argv=argv) as pipeline:

La razón es que cuando leemos desde BigQuery, estamos proporcionando una consulta:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001', 
      use_standard_sql=True)

Por tanto, tenemos que proporcionar el proyecto que debe facturarse. Además, y esto es un detalle de implementación, algunos datos temporales necesitan ser almacenados y almacenados en caché en el Almacenamiento en la Nube, y tenemos que proporcionar a la canalización un lugar para almacenar estos datos temporales -nunca estaremos seguros de qué operaciones requerirán almacenamiento o almacenamiento en caché, por lo que es una buena idea especificar siempre una ubicación en el Almacenamiento en la Nube para este propósito.

Podemos ejecutar df06.py y comprobar entonces que se crean nuevas tablas en BigQuery. Hasta ahora, hemos estado ejecutando el código localmente, ya sea en tu portátil o en Cloud Shell.

A continuación, vamos a ver cómo ejecutar esto en Cloud Dataflow, que es el servicio gestionado de GCP para ejecutar pipelines de Apache Beam.

Ejecutar la tubería en la nube

Esa última ejecución tardó unos minutos en la máquina virtual (MV) local, ¡y sólo procesábamos mil líneas! Cambiemos el código (ver df07.py) para procesar todas las filas de la vista BigQuery:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights', 
      use_standard_sql=True)

Ahora que tenemos muchos más datos, necesitamos distribuir el trabajo, y para ello cambiaremos el corredor de DirectRunner (que se ejecuta localmente) a DataflowRunner (que envía el trabajo a la nube y lo escala):

   argv = [
      '--project={0}'.format(project),
      '--job_name=ch04timecorr',
      '--save_main_session',
      '--staging_location=gs://{0}/flights/staging/'.format(bucket),
      '--temp_location=gs://{0}/flights/temp/'.format(bucket),
      '--setup_file=./setup.py',
      '--max_num_workers=8',
      '--region={}'.format(region),
      '--runner=DataflowRunner'
   ]

   pipeline = beam.Pipeline(argv=argv)

Observa que ahora hay algunos parámetros adicionales:

  • El nombre del trabajo proporciona el nombre con el que se listará este trabajo en la consola de GCP. Esto es para que podamos solucionar los problemas del trabajo si es necesario.

  • Pedimos al código de envío del flujo de datos que guarde nuestra sesión principal. Esto es necesario siempre que tengamos variables globales en nuestro programa Python.

  • El archivo setup. py debería listar los paquetes Python que necesitamos instalar (timezonefinder y pytz) a medida que avanzamos-Cloud Dataflow necesitará instalar estos paquetes en las instancias Compute Engine que lanza entre bastidores:

    REQUIRED_PACKAGES = [
        'timezonefinder',
        'pytz'
    ]
  • Por defecto, Dataflow autoescala el número de trabajadores en función del rendimiento: cuantas más líneas tengamos en nuestros archivos de datos de entrada, más trabajadores necesitaremos. Esto se denomina Autoescalado Horizontal. Para desactivar el autoescalado, podemos especificar --autoscaling_algorithm=NONE, y para limitarlo un poco, podemos especificar el número máximo de trabajadores.

  • Especificamos la región en la que debe ejecutarse el canal de flujo de datos.

  • El corredor ya no es DirectRunner (que se ejecuta localmente). Ahora es DataflowRunner.

La ejecución del programa Python envía el trabajo a la nube. Cloud Dataflow autoescala cada paso de la canalización en función del rendimiento, y transmite los datos de los eventos a BigQuery (ver Figura 4-3). Puedes monitorizar el trabajo en ejecución en la Consola de la Plataforma en la Nube, en la sección Cloud Dataflow.

Mientras se escriben los datos de los eventos, podemos consultarlos accediendo a la consola de BigQuery y escribiendo lo siguiente:

SELECT
  ORIGIN,
  DEP_TIME,
  DEST,
  ARR_TIME,
  ARR_DELAY,
  EVENT_TIME,
  EVENT_TYPE
FROM
  dsongcp.flights_simevents
WHERE
  (DEP_DELAY > 15 and ORIGIN = 'SEA') or
  (ARR_DELAY > 15 and DEST = 'SEA')
ORDER BY EVENT_TIME ASC
LIMIT
  5

Esto vuelve:

Row ORIGIN DEP_TIME DEST ARR_TIME ARR_DELAY EVENT_TIME EVENT_TYPE
1 SEA 2015-01-01 08:21:00 UTC IAD null null 2015-01-01 08:21:00 UTC departed
2 SEA 2015-01-01 08:21:00 UTC IAD null null 2015-01-01 08:38:00 UTC wheelsoff
3 SEA 2015-01-01 08:21:00 UTC IAD 2015-01-01 12:48:00 UTC 22.0 2015-01-01 12:48:00 UTC arrived
4 KOA 2015-01-01 10:11:00 UTC SEA 2015-01-01 15:45:00 UTC 40.0 2015-01-01 15:45:00 UTC arrived
5 SEA 2015-01-01 16:43:00 UTC PSP null null 2015-01-01 16:43:00 UTC departed

Como era de esperar, vemos tres sucesos para el vuelo SEA-IAD, uno a la salida, el siguiente al despegue y el tercero a la llegada. El retraso de llegada sólo se conoce a la llegada.

BigQuery es una base de datos columnar, por lo que una consulta que selecciona todos los campos:

SELECT
  *
FROM
  dsongcp.flights_simevents
ORDER BY EVENT_TIME ASC

será ineficaz. Sin embargo, necesitamos todos los datos de los eventos para enviar notificaciones de los mismos. Por lo tanto, cambiamos almacenamiento por velocidad añadiendo una columna extra llamada EVENT_DATA a nuestra tabla BigQuery y la rellenamos en nuestra canalización de flujo de datos de la siguiente manera:

def create_event_row(fields):
   featdict = dict(fields)  # copy
   featdict['EVENT_DATA'] = json.dumps(fields)
   return featdict

Entonces, nuestra consulta para obtener los eventos podría ser simplemente la siguiente

SELECT
  EVENT_TYPE,
  EVENT_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= TIMESTAMP('2015-05-01 00:00:00 UTC')
  AND EVENT_TIME < TIMESTAMP('2015-05-03 00:00:00 UTC')
ORDER BY
  EVENT_TIME ASC
LIMIT
  2

El resultado es el siguiente:

Row EVENT_TYPE EVENT_TIME EVENT_DATA
1 wheelsoff 2015-05-01 00:00:00 UTC {"FL_DATE": "2015-04-30", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1295302", "ORIGIN": "LGA", "DEST_AIRPORT_SEQ_ID": "1330303", "DEST": "MIA", "CRS_DEP_TIME": "2015-04-30T23:29:00", "DEP_TIME": "2015-04-30T23:35:00", "DEP_DELAY": 6.0, "TAXI_OUT": 25.0, "WHEELS_OFF": "2015-05-01T00:00:00", "CRS_ARR_TIME": "2015-05-01T02:53:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "wheelsoff", "EVENT_TIME": "2015-05-01T00:00:00"}
2 departed 2015-05-01 00:00:00 UTC {"FL_DATE": "2015-04-30", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1295302", "ORIGIN": "LGA", "DEST_AIRPORT_SEQ_ID": "1320402", "DEST": "MCO", "CRS_DEP_TIME": "2015-04-30T23:55:00", "DEP_TIME": "2015-05-01T00:00:00", "DEP_DELAY": 5.0, "CRS_ARR_TIME": "2015-05-01T02:45:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "departed", "EVENT_TIME": "2015-05-01T00:00:00"}

Esta tabla servirá como fuente de nuestros eventos; a partir de ella simularemos la transmisión de datos de vuelo.

Publicar un flujo de eventos en Cloud Pub/Sub

Ahora que tenemos los eventos fuente de los datos de vuelo sin procesar, estamos listos para simular el flujo. Los datos en flujo en Google Cloud Platform se publican normalmente en Cloud Pub/Sub, un servicio de mensajería en tiempo real sin servidor. Cloud Pub/Sub proporciona una entrega fiable y puede escalar a más de un millón de mensajes por segundo. A menos que utilices Cloud Pub/Sub Lite (que es un servicio de una sola zona construido para un funcionamiento de bajo coste), Pub/Sub almacena copias de los mensajes en varias zonas para proporcionar una entrega garantizada "al menos una vez" a los suscriptores, y puede haber muchos suscriptores simultáneos.

Nuestro simulador leerá de la tabla de eventos en BigQuery (rellenada en la sección anterior) y publicará mensajes en Cloud Pub/Sub. Esencialmente, recorreremos los registros de eventos de vuelo, obteniendo la hora de notificación de cada uno, y simularemos la publicación de esos eventos a medida que se produzcan.

Factor de aceleración

Sin embargo, también utilizaremos un mapeo entre la hora de notificación del evento (hora de llegada o salida en función del evento) y la hora actual del sistema. ¿Por qué? Porque es ineficaz simular siempre los eventos de vuelo a velocidades de tiempo real. En cambio, puede que queramos ejecutar un día de datos de vuelo en una hora (siempre que el código que procesa estos eventos pueda manejar el aumento de la velocidad de los datos). En otras ocasiones, puede que estemos ejecutando nuestro código de procesamiento de eventos en un entorno de depuración que sea más lento y, por tanto, queramos ralentizar la simulación. Me referiré a esta relación entre el tiempo real y el tiempo de simulación como factor de aceleración : elfactor de aceleración será mayor que 1 si queremos que la simulación sea más rápida que el tiempo real y menor que 1 si queremos que sea más lenta que el tiempo real.

Según el factor de aceleración, tendremos que hacer una transformación lineal del tiempo del evento al tiempo del sistema. Si el factor de aceleración es 1, una diferencia de 60 minutos entre el inicio de la simulación en tiempo de evento y la marca de tiempo del registro actual debería encontrarse 60 minutos después del inicio de la simulación. Si el factor de aceleración es 60, una diferencia de 60 minutos en el tiempo del evento se traduce en una diferencia de 1 minuto en el tiempo del sistema, por lo que el registro debería publicarse un minuto más tarde. Si el reloj de la hora del evento se adelanta al reloj del sistema, dormimos el tiempo necesario para permitir que la simulación se ponga al día.

La simulación consta de cuatro pasos (ver también la Figura 4-5):

  • Ejecuta la consulta para obtener el conjunto de registros de eventos de vuelo que hay que publicar.

  • Recorre los resultados de la consulta.

  • Acumula eventos para publicarlos por lotes.

  • Publica los eventos acumulados y duerme según sea necesario.

Aunque se trata de un canal ETL, la necesidad de procesar los registros en estricto orden secuencial y de dormir entre ellos hace que este canal ETL no encaje bien en Cloud Dataflow. En su lugar, lo implementaremos como un programa Python puro. El problema de esta elección es que el código de simulación no es tolerante a fallos: si la simulación falla, no se reiniciará automáticamente y, definitivamente, no empezará desde el último evento notificado con éxito.

Figura 4-5. Los cuatro pasos de la simulación.

El código de simulación que estamos escribiendo es sólo para experimentar rápidamente con datos de flujo. Por lo tanto, no haré el esfuerzo adicional necesario para hacerlo tolerante a fallos. Si tuviéramos que hacerlo, podríamos hacer que la simulación fuera tolerante a fallos partiendo de una consulta BigQuery limitada en términos de un intervalo de tiempo cuyo inicio se dedujera automáticamente del último registro notificado en Cloud Pub/Sub. A continuación, podríamos lanzar el script de simulación desde un contenedor Docker y utilizar Cloud Run o Google Kubernetes Engine para reiniciar automáticamente la simulación si falla el código de simulación.

Conseguir Registros para Publicar

La consulta BigQuery está parametrizada por la hora de inicio y fin de la simulación y puede invocarse a través de la API de Google Cloud para Python (consulta 04_streaming/simulate/simulate.py en el repositorio de GitHub):

   bqclient = bq.Client(args.project)
   querystr = """
SELECT
  EVENT_TYPE,
  EVENT_TIME AS NOTIFY_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= TIMESTAMP('{}')
  AND EVENT_TIME < TIMESTAMP('{}')
ORDER BY
  EVENT_TIME ASC
"""
   rows = bqclient.query(querystr.format(args.startTime,
                                         args.endTime))

Esto, sin embargo, es una mala idea. ¿Ves por qué?

Se debe a que estamos obteniendo la hora de inicio y la hora de finalización de la línea de comandos del script de simulación y pasándola directamente a BigQuery. Esto se denomina inyección SQL, que puede provocar problemas de seguridad.17 Un enfoque mejor es utilizar consultas parametrizadas: laconsulta BigQuery contiene los parámetros marcados como @startTime, etc., y la función de consulta Python toma las definiciones a través del parámetro de configuración del trabajo:

   bqclient = bq.Client(args.project)
   querystr = """
SELECT
  EVENT_TYPE,
  EVENT_TIME AS NOTIFY_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= @startTime
  AND EVENT_TIME < @endTime
ORDER BY
  EVENT_TIME ASC
"""
   job_config = bq.QueryJobConfig(
       query_parameters=[
           bq.ScalarQueryParameter("startTime", "TIMESTAMP", args.startTime),
           bq.ScalarQueryParameter("endTime", "TIMESTAMP", args.endTime),
       ]
   )
   rows = bqclient.query(querystr, job_config=job_config)

La función de consulta devuelve un objeto (llamado rows en el fragmento anterior) por el que podemos iterar:

  for row in rows:
    # do something

¿Qué tenemos que hacer para cada una de las filas? Tendremos que iterar por los registros, crear un lote de eventos y publicar cada lote. Veamos cómo se realiza cada uno de estos pasos.

¿Cuántos Temas?

Mientras recorremos los resultados de la consulta, necesitamos publicar eventos en Cloud Pub/Sub. Tenemos tres opciones en cuanto a la arquitectura:

  • Podríamos publicar todos los eventos en un único tema. Sin embargo, esto puede suponer un derroche de ancho de banda de red si tenemos un suscriptor que sólo esté interesado en el evento wheelsoff. Dicho suscriptor tendrá que analizar el evento entrante, descodificar el archivo EVENT_TYPE en el JSON y descartar los eventos en los que no esté interesado.

  • Podríamos publicar todos los eventos en un único tema, pero añadiendo atributos a cada mensaje. Por ejemplo, para publicar un evento con dos atributos event_type y carrierharíamos

     publisher.publish(topic, event_data,
                       event_type='departed', carrier='AA')

    Entonces, el suscriptor podría solicitar un filtrado del lado del servidor basado en un atributo o combinación de atributos al crear la suscripción:

     subscriber.create_subscription(request={..., "filter": 
       "attributes.carrier='AS' AND attributes.event_type='arrived'"})
  • Crea un tema distinto por tipo de evento (es decir, un tema arrived, un tema departed y un tema wheelsoff ).

La Opción 1 es la más sencilla, y debería ser tu elección por defecto a menos que tengas muchos suscriptores que sólo estén interesados en subconjuntos del flujo de eventos. Si vas a tener suscriptores interesados en subconjuntos del flujo de eventos, elige entre las Opciones 2 y 3.

La opción 2 añade complejidad de software. La Opción 3 añade complejidad de infraestructura. Sugiero elegir la Opción 3 cuando sólo tengas un atributo, y ese atributo sólo tenga un puñado de opciones. Esto limita la complejidad de la infraestructura a la vez que simplifica el código del editor y del suscriptor. Elige la Opción 2 cuando tengas muchos atributos, cada uno de ellos con muchos valores posibles, porque la Opción 3 en ese caso provocará una explosión en el número de temas.

Iterar a través de registros

Elegiremos tener un tema separado por tipo de evento (es decir, un tema arrived, un tema departed tema y un tema wheelsoff ), por lo que crearemos tres temas:18

for event_type in ['wheelsoff', 'arrived', 'departed']:
  topics[event_type] = publisher.topic_path(args.project, event_type)
  try:
    publisher.get_topic(topic=topics[event_type])
    logging.info("Already exists: {}".format(topics[event_type]))
  except:
    logging.info("Creating {}".format(topics[event_type]))
    publisher.create_topic(name=topics[event_type])

Tras crear los temas, llamamos al método notify() pasando las filas leídas de BigQuery:

# notify about each row in the dataset
programStartTime = datetime.datetime.utcnow()
simStartTime = datetime.datetime.strptime(args.startTime,
                         TIME_FORMAT).replace(tzinfo=pytz.UTC)
notify(publisher, topics, rows, simStartTime, 
       programStartTime, args.speedFactor)

Crear un lote de eventos

El método notify() consiste en acumular las filas en lotes, publicar un lote y dormir hasta que llegue el momento de publicar el siguiente lote:

def notify(publisher, topics, rows, simStartTime, programStart, speedFactor):
   # sleep computation
   def compute_sleep_secs(notify_time):
        time_elapsed = (datetime.datetime.utcnow() - 
                        programStart).seconds
        sim_time_elapsed = (notify_time - simStartTime).seconds / speedFactor
        to_sleep_secs = sim_time_elapsed - time_elapsed
        return to_sleep_secs

   tonotify = {}
   for key in topics:
     tonotify[key] = list()

   for row in rows:
       event, notify_time, event_data = row

       # how much time should we sleep?
       if compute_sleep_secs(notify_time) > 1:
          # notify the accumulated tonotify
          publish(publisher, topics, tonotify, notify_time)
          for key in topics:
             tonotify[key] = list()

          # recompute sleep, since notification takes a while
          to_sleep_secs = compute_sleep_secs(notify_time)
          if to_sleep_secs > 0:
             logging.info('Sleeping {} seconds'.format(to_sleep_secs))
             time.sleep(to_sleep_secs)

       tonotify[event].append(event_data)
   # left-over records; notify again
   publish(publisher, topics, tonotify, notify_time)

Aquí hay que hacer algunas puntualizaciones. La primera es que trabajamos completamente en UTC para que los cálculos de la diferencia horaria tengan sentido. En segundo lugar, siempre calculamos si hay que dormir mirando la diferencia de tiempo desde el inicio de la simulación. Si simplemente seguimos moviendo un puntero hacia adelante, se acumularán errores en el tiempo. Por último, ten en cuenta que la primera vez comprobamos si el tiempo de sueño es superior a un segundo, para dar tiempo a que se acumulen los registros. Si, al ejecutar el programa, no ves ningún tiempo de espera, tu factor de aceleración es demasiado alto para la capacidad de la máquina que ejecuta el código de simulación y la red entre esa máquina y Google Cloud Platform. Ralentiza la simulación, consigue una máquina más grande o ejecútala detrás del cortafuegos de Google (como en Cloud Shell o en una instancia de Compute Engine).

Publicar un lote de eventos

El método notify() que vimos en el ejemplo de código anterior ha acumulado los eventos entre llamadas a dormir. Aunque parezca que estamos publicando un evento cada vez, en realidad el editor mantiene un lote distinto para cada tema:

def publish(publisher, topics, allevents):
   for key in topics:  # 'departed', 'arrived', etc.
      topic = topics[key]
      events = allevents[key]
      logging.info('Publishing {} {} events'.format(len(events), key))
      for event_data in events:
          publisher.publish(topic, event_data.encode())

Ten en cuenta que Cloud Pub/Sub no garantiza el orden en que se entregarán los mensajes, especialmente si el suscriptor deja que se acumule un gran retraso. Se producirán mensajes fuera de orden, y los suscriptores posteriores tendrán que ocuparse de ellos. Cloud Pub/Sub garantiza la entrega "al menos una vez" y reenviará el mensaje si el suscriptor no acusa recibo a tiempo. Utilizaré Cloud Dataflow para hacer la ingesta desde Cloud Pub/Sub, y Cloud Dataflow se ocupa de estos dos problemas (desorden y duplicación) de forma transparente.

Podemos probar la simulación escribiendo lo siguiente:

python3 simulate.py --startTime '2015-05-01 00:00:00 UTC' \
      --endTime '2015-05-04 00:00:00 UTC' --speedFactor=60

Esto simulará tres días de datos de vuelo (la hora de finalización es exclusiva) a 60 veces la velocidad en tiempo real y transmitirá los eventos a tres temas en Cloud Pub/Sub.19 Como la simulación se inicia a partir de una consulta BigQuery, es bastante sencillo limitar los eventos simulados a un solo aeropuerto o a aeropuertos dentro de un cuadro delimitador de latitud/longitud.

En esta sección, hemos visto cómo producir un flujo de eventos y publicar esos eventos en tiempo real. A lo largo de este libro, podemos utilizar este simulador y estos temas para experimentar cómo consumir datos de flujo y realizar análisis en tiempo real.

Procesamiento de flujos en tiempo real

Ahora que tenemos una fuente de datos de streaming que incluye información de localización, veamos cómo construir un panel de control en tiempo real. La Figura 4-6 presenta la arquitectura de referencia de muchas soluciones en Google Cloud Platform.20

Figura 4-6. Arquitectura de referencia para el procesamiento de datos en Google Cloud Platform.

En la sección anterior, configuramos un flujo de eventos en tiempo real en Cloud Pub/Sub que podemos agregar en Cloud Dataflow y escribir en BigQuery. Data Studio puede conectarse a BigQuery y proporcionar un panel interactivo en tiempo real. Vamos a empezar.

Streaming en flujo de datos

Cuando realizamos la corrección horaria de los datos de vuelo sin procesar, estábamos trabajando a partir de una tabla de vuelos completa de BigQuery, procesándolos en Cloud Dataflow y escribiendo la tabla de eventos en BigQuery. El procesamiento de un conjunto de datos de entrada finito y acotado se denomina procesamiento por lotes.

Aquí, sin embargo, tenemos que procesar eventos en Cloud Pub/Sub que están entrando en streaming. El conjunto de datos es ilimitado. Procesar un conjunto de datos ilimitado se denomina procesamiento en flujo. Afortunadamente, el código para procesar flujos en Apache Beam es idéntico al código para procesar lotes.

Podríamos simplemente recibir los eventos de Cloud Pub/Sub de forma similar a como leemos los datos de un archivo CSV:21

topic_name = "projects/{}/topics/arrived".format(project)
events = (pipeline
         | 'read' >> beam.io.ReadFromPubSub(topic=topic_name)
         | 'parse' >> beam.Map(lambda s: json.loads(s))
         )

El único cambio que tenemos que hacer es activar la bandera de streaming en las opciones de Flujo de datos:

argv = [
        ...
        '--streaming',
    ]

Podemos transmitir los eventos de lectura a BigQuery utilizando un código similar al que utilizamos en el procesamiento por lotes:

schema = 'FL_DATE:date,...,EVENT_TYPE:string,EVENT_TIME:timestamp'
(events
         | 'bqout' >> beam.io.WriteToBigQuery(
                    'dsongcp.streaming_events', schema=schema,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
)

En el código anterior, nos suscribimos a un tema en Cloud Pub/Sub y empezamos a leerlo. A medida que llega cada mensaje, lo analizamos, lo convertimos en una fila de tabla en BigQuery y, a continuación, lo escribimos. De hecho, si esto es todo lo que necesitamos, podemos utilizar simplemente la plantilla de flujo de datos proporcionada por Google que va de Pub/Sub a BigQuery.

Pero digamos que queremos leer tanto los eventos que llegan como los que se van y escribirlos en la misma tabla BigQuery. Podemos hacerlo de forma muy sencilla en Beam:

events = {}
for event_name in ['arrived', 'departed']:
  topic_name = "projects/{}/topics/{}".format(project, event_name)
  events[event_name] = (pipeline
     | 'read:{}'.format(event_name) >>  
             beam.io.ReadFromPubSub(topic=topic_name)
     | 'parse:{}'.format(event_name) >> beam.Map(
             lambda s: json.loads(s))
  )

all_events = (events['arrived'], events['departed']) | beam.Flatten()

Al aplanar los dos conjuntos de eventos, los concatenamos en una única colección. A continuación, escribimos all_events en BigQuery.

Para probar este código, necesitamos ejecutar el simulador que escribimos en la sección anterior, de modo que el simulador pueda publicar eventos en los temas Pub/Sub. Para iniciar la simulación, inicia el simulador Python que desarrollamos en la sección anterior:

python simulate.py --startTime '2015-05-01 00:00:00 UTC'
--endTime '2015-05-04 00:00:00 UTC'  --speedFactor 30

El simulador enviará los eventos del 1 de mayo de 2015 al 3 de mayo de 2015 a una velocidad 30 veces superior a la del tiempo real, de modo que una hora de datos se envía a Cloud Pub/Sub en dos minutos. Puedes hacerlo desde Cloud Shell o desde tu portátil local. (Si es necesario, ejecuta install_packages.sh para instalar los paquetes Python necesarios y gcloud auth application-default login para dar a la aplicación las credenciales necesarias para ejecutar consultas).

En otro terminal, inicia avg01.py para leer el flujo de eventos y escribirlos en BigQuery. A continuación, podemos consultar el conjunto de datos en BigQuery incluso cuando los eventos se están transmitiendo. Puede que la interfaz de usuario de BigQuery ni siquiera muestre aún esta tabla de flujo, pero se puede consultar:

SELECT * FROM dsongcp.streaming_events
ORDER BY EVENT_TIME DESC
LIMIT 5

Ventana de una tubería

Aunque podríamos hacer sólo una transferencia directa de datos, me gustaría hacer algo más. Cuando rellene un panel de control en tiempo real de los retrasos de los vuelos, me gustaría que la información se agregara a lo largo de un intervalo razonable; por ejemplo, quiero una media móvil de los retrasos de los vuelos y del número total de vuelos de los últimos 60 minutos en cada aeropuerto. Así que, en lugar de simplemente tomar la información recibida de Cloud Pub/Sub y transmitirla a BigQuery, me gustaría llevar a cabo análisis de ventana de tiempo sobre los datos a medida que los recibo y escribir esos análisis en BigQuery.22 Cloud Dataflow puede ayudarnos a hacerlo.

Aunque hagamos una media de 60 minutos, ¿con qué frecuencia debemos calcular esta media de 60 minutos? Podría ser ventajoso, por ejemplo, utilizar una ventana móvil y calcular esta media de 60 minutos cada cinco minutos.

Agregación de secuencias

La diferencia clave entre la agregación por lotes y la agregación por flujo es la naturaleza ilimitada de los datos en el procesamiento por flujo. ¿Qué significa una operación como "máx" cuando los datos son ilimitados? Al fin y al cabo, sea cual sea nuestro máximo en este momento, podría aparecer un valor grande en el flujo en un momento posterior.

Un concepto clave al agregar datos en flujo es el de una ventana que se convierte en el ámbito de todas las agregaciones. Aquí, aplicamos una ventana deslizante basada en el tiempo en la canalización. A partir de ahora, todas las agrupaciones, agregaciones, etc., se realizan dentro de esa ventana temporal y hay un máximo, una media, etc., distintos en cada ventana temporal:

stats = (all_events
   | 'byairport' >> beam.Map(by_airport)
   | 'window' >> beam.WindowInto(
                 beam.window.SlidingWindows(60 * 60, 5 * 60))
   | 'group' >> beam.GroupByKey()
   | 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1]))
)

Repasemos detenidamente el fragmento de código anterior.

Lo primero que hacemos es tomar todos los acontecimientos y aplicarles la transformación by_airport:

   | 'byairport' >> beam.Map(by_airport)

Lo que esto hace es extraer el aeropuerto de origen para los eventos de salida y el aeropuerto de destino para los eventos de llegada:

def by_airport(event):
    if event['EVENT_TYPE'] == 'departed':
        return event['ORIGIN'], event
    else:
        return event['DEST'], event

A continuación, aplicamos una ventana deslizante al flujo de eventos. La ventana tiene una duración de 60 minutos y se aplica cada 5 minutos:

   | 'window' >> beam.WindowInto(
                 beam.window.SlidingWindows(60 * 60, 5 * 60))

A continuación, aplicamos un GroupByKey:

   | 'group' >> beam.GroupByKey()

¿Cuál es la clave?

En la función by_airport mencionada anteriormente, hicimos que el aeropuerto fuera la clave y todo el objeto evento el valor. Así, el GroupByKey agrupa los eventos por aeropuerto.

Pero la GroupByKey no es sólo por aeropuerto. Como ya hemos aplicado una ventana deslizante, se ha creado un grupo distinto para cada ventana de tiempo. Así, cada grupo consta ahora de 60 minutos de eventos de vuelo que llegaron o salieron de un aeropuerto concreto.

Es sobre estos grupos sobre los que llamamos a la función compute_stats en el último Map del fragmento:

   | 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1]))

La función compute_stats toma el aeropuerto y la lista de acontecimientos en ese aeropuerto, y luego calcula algunas estadísticas:

def compute_stats(airport, events):
    arrived = [event['ARR_DELAY'] for event in events 
                if event['EVENT_TYPE'] == 'arrived']
    avg_arr_delay = float(np.mean(arrived)) 
                if len(arrived) > 0 else None

    departed = [event['DEP_DELAY'] for event in events 
                if event['EVENT_TYPE'] == 'departed']
    avg_dep_delay = float(np.mean(departed)) 
                if len(departed) > 0 else None

    num_flights = len(events)
    start_time = min([event['EVENT_TIME'] for event in events])
    latest_time = max([event['EVENT_TIME'] for event in events])

    return {
        'AIRPORT': airport,
        'AVG_ARR_DELAY': avg_arr_delay,
        'AVG_DEP_DELAY': avg_dep_delay,
        'NUM_FLIGHTS': num_flights,
        'START_TIME': start_time,
        'END_TIME': latest_time
    }

En el código anterior, extraemos los sucesos llegados y calculamos el retraso medio de llegada. Del mismo modo, calculamos el retraso medio de salida de los eventos de salida. También calculamos el número de vuelos en la ventana de tiempo en este aeropuerto y devolvemos todas estas estadísticas.

A continuación, las estadísticas se escriben en BigQuery mediante un código que ya debería resultarte familiar:

stats_schema = ','.join(
    ['AIRPORT:string,AVG_ARR_DELAY:float,AVG_DEP_DELAY:float',                          
     'NUM_FLIGHTS:int64,START_TIME:timestamp,END_TIME:timestamp'])
(stats
    | 'bqout' >> beam.io.WriteToBigQuery(
             'dsongcp.streaming_delays', schema=stats_schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
               )
)

Al igual que en la sección anterior, podemos iniciar el simulador y, a continuación, iniciar avg02.py. Cuando hice esto, las agregaciones resultantes se producían cada 5 minutos, pero dentro de cada periodo de 5 minutos, los eventos sobre los que se notificaba cubrían un rango de 150 minutos (porque la simulación 30x procesa 150 minutos de datos en 5 minutos).

El motor de procesamiento de flujos aplicaba las ventanas deslizantes basándose en la hora de un reloj de pared. Nosotros, sin embargo, queremos que aplique la ventana basándose en la marca de tiempo dentro de las imágenes.

¿Cómo lo hacemos?

Utilizar marcas de tiempo de eventos

Tenemos que añadir un atributo en el momento de publicar los eventos (en simulate.py):

publisher.publish(topic, event_data.encode(), EventTimeStamp=timestamp)

Entonces, en nuestra canalización Beam, al leer desde Pub/Sub, deberíamos decirle a la canalización que ignore la hora de publicación en Pub/Sub y que utilice en su lugar este atributo del mensaje como marca de tiempo:

| 'read:{}'.format(event_name) >> beam.io.ReadFromPubSub(
            topic=topic_name, timestamp_attribute='EventTimeStamp')

Con este cambio, cuando ejecuto la consulta

SELECT * FROM dsongcp.streaming_delays
WHERE AIRPORT = 'ATL'
ORDER BY END_TIME DESC

Obtengo filas separadas aproximadamente 5 minutos, como era de esperar:

Row AIRPORT AVG_ARR_DELAY AVG_DEP_DELAY NUM_FLIGHTS START_TIME END_TIME
1 ATL 35.72222222222222 13.666666666666666 48 2015-05-01 02:24:00 UTC 2015-05-01 03:21:00 UTC
2 ATL 35.25 8.717948717948717 59 2015-05-01 02:15:00 UTC 2015-05-01 03:12:00 UTC
3 ATL 38.666666666666664 9.882352941176471 52 2015-05-01 02:19:00 UTC 2015-05-01 03:12:00 UTC
4 ATL 38.473684210526315 5.916666666666667 55 2015-05-01 02:15:00 UTC 2015-05-01 03:08:00 UTC
5 ATL 35.111111111111114 5.53125 50 2015-05-01 02:15:00 UTC 2015-05-01 03:03:00 UTC

Las horas indicadas no están separadas exactamente por 5 minutos, porque corresponden al vuelo más temprano/más tardío en Atlanta dentro de la franja horaria. Ten en cuenta también que la duración de la ventana de tiempo es de aproximadamente una hora.

Sin embargo, es probable que Cloud Shell o tu portátil local tengan dificultades para seguir el flujo de eventos. Necesitamos ejecutar este pipeline en Dataflow de una forma sin servidor.

Ejecutar el procesamiento del flujo

Para ejecutar el pipeline Beam en Cloud Dataflow, todo lo que tengo que hacer es cambiar el runner (ver avg03.py en el repositorio de GitHub):

argv = [
        '--project={0}'.format(project),
        '--job_name=ch04avgdelay',
        '--streaming',
      ...
        '--runner=DataflowRunner'
    ]

Sin embargo, antes de iniciar este pipeline, es una buena idea eliminar las filas ya escritas en la tabla BigQuery por avg02.py en la sección anterior. La forma más sencilla de hacerlo es ejecutar el siguiente comando SQL DML para truncar la tabla:

TRUNCATE TABLE dsongcp.streaming_delays

Al ejecutar avg03. py se iniciará un trabajo de Flujo de datos. Si ahora navegas a la consola de la Plataforma Nube, a la sección Cloud Dataflow, verás que se ha iniciado un nuevo trabajo de flujo de datos y que el pipeline tiene el aspecto que se muestra en la Figura 4-7.

La canalización procesa los eventos de vuelo a medida que llegan a Pub/Sub, los agrega en ventanas de tiempo y transmite las estadísticas resultantes a BigQuery.

Figura 4-7. La cadena de flujo para calcular las estadísticas de retrasos en tiempo real en cada aeropuerto.

Analizar datos en streaming en BigQuery

Dos minutos después del lanzamiento de tu programa23 el primer conjunto de datos llegará a BigQuery. Puedes consultar las estadísticas de un aeropuerto concreto desde la consola de BigQuery utilizando la misma consulta que antes:

SELECT * FROM dsongcp.streaming_delays
WHERE AIRPORT = 'ATL'
ORDER BY END_TIME DESC

Lo mejor es que podemos hacer esta consulta incluso cuando los datos están fluyendo. ¿Cómo podríamos obtener los datos más recientes de todos los aeropuertos? Podríamos obtener todos los datos de cada aeropuerto, ordenarlos por tiempo y tomar el más reciente:

SELECT 
    AIRPORT,
    ARRAY_AGG(
        STRUCT(AVG_ARR_DELAY, AVG_DEP_DELAY, NUM_FLIGHTS, END_TIME)
        ORDER BY END_TIME DESC LIMIT 1) AS a
FROM dsongcp.streaming_delays d
GROUP BY AIRPORT

Los resultados son más o menos así:

Row AIRPORT a.AVG_ARR_DELAY a.AVG_DEP_DELAY a.NUM_FLIGHTS a.END_TIME
1 BUR -6.8 -5.666666666666667 8 2015-05-01 03:26:00 UTC
2 HNL 17.11111111111111 -3.7777777777777777 18 2015-05-01 03:46:00 UTC
3 CVG -7.75 null 4 2015-05-01 03:48:00 UTC
4 PHL 5.636363636363637 16.5 13 2015-05-01 03:48:00 UTC
5 IND 40.6 null 5 2015-05-01 03:45:00 UTC

Consultas como éstas sobre datos en flujo serán útiles cuando empecemos a construir nuestro cuadro de mando. Por ejemplo, la primera consulta nos permitirá construir un gráfico de series temporales de retrasos en un aeropuerto concreto. La segunda consulta nos permitirá construir un mapa de los retrasos medios en todo el país.

Panel de control en tiempo real

Ahora que tenemos datos en streaming en BigQuery y una forma de analizarlos a medida que entran en streaming, podemos crear un panel que muestre los retrasos de salida y llegada en contexto. Hay dos mapas que pueden ayudar a explicar a los usuarios finales nuestro modelo basado en tablas de contingencia: los retrasos actuales de las llegadas en todo el país y los retrasos actuales de las salidas en todo el país.

Para obtener los datos con los que rellenar estos gráficos, tenemos que añadir una fuente de datos BigQuery en Data Studio. Aunque Data Studio permite especificar la consulta directamente en la interfaz de usuario, es mucho mejor crear una vista en BigQuery y utilizar esa vista como fuente de datos en Data Studio. Las vistas de BigQuery tienen algunas ventajas sobre las consultas que escribes en Data Studio: tienden a ser reutilizables en informes y herramientas de visualización, sólo hay un lugar donde cambiar si se detecta un error, y las vistas de BigQuery se asignan mejor a los privilegios de acceso (roles de Cloud Identity and Access Management) en función de las columnas a las que necesitan acceder.

Esta es la consulta que he utilizado para crear la vista:

CREATE OR REPLACE VIEW dsongcp.airport_delays AS
WITH delays AS (
    SELECT d.*, a.LATITUDE, a.LONGITUDE
    FROM dsongcp.streaming_delays d
    JOIN dsongcp.airports a USING(AIRPORT) 
    WHERE a.AIRPORT_IS_LATEST = 1
)
 
SELECT 
    AIRPORT,
    CONCAT(LATITUDE, ',', LONGITUDE) AS LOCATION,
    ARRAY_AGG(
        STRUCT(AVG_ARR_DELAY, AVG_DEP_DELAY, NUM_FLIGHTS, END_TIME)
        ORDER BY END_TIME DESC LIMIT 1) AS a
FROM delays
GROUP BY AIRPORT, LONGITUDE, LATITUDE

Es ligeramente diferente de la segunda consulta del apartado anterior, ya que también añade la ubicación del aeropuerto uniéndola a la tabla de aeropuertos.

Una vez guardada la vista en BigQuery, podemos crear una fuente de datos para la vista en Data Studio, tal y como hicimos en el capítulo anterior:

  • Visita https://datastudio.google.com.

  • Crea una fuente de datos BigQuery, dirígela a la vista airport_delays y conéctate a ella.

  • Cambia el campo de ubicación de Texto a Geo | Latitud, Longitud, y haz clic en Crear Informe.

  • Añade un gráfico geográfico al informe.

  • Especifica el campo de ubicación como geo dimensión (ver Figura 4-8).

  • Especifica el retraso medio de salida como dimensión y Estados Unidos como nivel de zoom.

  • Cambia el estilo para que la barra de color incluya todas las zonas.

  • Repítelo para el retraso de llegada.

Figura 4-8. Cuadro de mandos de los últimos datos de vuelo de todo Estados Unidos.

Merece la pena reflexionar sobre lo que hicimos en esta sección. Procesamos los datos en streaming en Cloud Dataflow, creando medias móviles de 60 minutos que transmitimos en streaming a BigQuery. A continuación, creamos una vista en BigQuery que mostraría los datos más recientes de cada aeropuerto, incluso mientras se transmitían. La conectamos a un panel de control en Data Studio. Cada vez que el panel se actualiza, extrae nuevos datos de la vista, que a su vez refleja dinámicamente los datos más recientes en BigQuery.

Resumen

En este capítulo, hemos visto cómo construir una canalización de análisis en tiempo real para llevar a cabo análisis en tiempo real y rellenar cuadros de mando en tiempo real. En este libro, utilizamos un conjunto de datos que no está disponible en tiempo real. Por lo tanto, simulamos la creación de un feed en tiempo real para poder demostrar cómo construir una canalización de ingesta de flujo. Construir la simulación también nos proporciona una práctica herramienta de prueba: ya no tenemos que esperar a que ocurra un evento interesante. Basta con reproducir un evento grabado.

En el proceso de creación de la simulación, nos dimos cuenta de que el manejo de la hora en el conjunto de datos original era problemático. Por lo tanto, mejoramos el tratamiento del tiempo en los datos originales y creamos un nuevo conjunto de datos con marcas de tiempo UTC y desplazamientos locales. Éste es el conjunto de datos que utilizaremos en adelante.

También vimos la arquitectura de referencia para manejar datos en streaming en Google Cloud Platform. Primero, recibe tus datos en Cloud Pub/Sub para que los mensajes puedan recibirse de forma asíncrona. Procesa los mensajes Cloud Pub/Sub en Cloud Dataflow, calculando agregaciones sobre los datos según sea necesario, y transmite los datos en bruto o los datos agregados (o ambos) a BigQuery. Trabajamos con los tres productos de Google Cloud (Cloud Pub/Sub, Cloud Dataflow y BigQuery) utilizando las bibliotecas cliente de Google Cloud en Python. Sin embargo, en ninguno de estos casos tuvimos que crear una máquina virtual nosotros mismos, ya que se trata de ofertas sin servidor y autoescalables. Así, pudimos concentrarnos en escribir código, dejando que la plataforma gestionara el resto.

Recursos sugeridos

El sitio web de Apache Beam tiene ejercicios de codificación interactivos, llamados Katas, que proporcionan una excelente forma práctica de aprender conceptos de streaming y cómo implementarlos utilizando Beam.

Las plantillas de flujo de datos son canalizaciones preescritas de Apache Beam que resultan útiles para la migración de datos. En el capítulo mencionamos la plantilla Dataflow para ingerir datos de Pub/Sub en BigQuery. También existen plantillas Dataflow para fuentes ajenas a Google. Por ejemplo, hay un conector Dataflow de SAP HANA a BigQuery, como se describe en la entrada del blog de Google de 2017 "Using Apache Beam and Cloud Dataflow to Integrate SAP HANA and BigQuery" (Uso de Apache Beam y Cloud Dataflow para integrar SAP HANA y BigQuery), de Babu Prasad Elumalai y Mark Shalda. Ese conector en concreto está escrito en Java.

Este tutorial te guía a través del proceso de creación de tu propia plantilla Dataflow. Cualquier canalización de flujo de datos puede convertirse en una plantilla para compartirla fácilmente y lanzarla con comodidad.

En este artículo de 2021, "Procesamiento de miles de millones de eventos en tiempo real en Twitter", los ingenieros de Twitter Lu Zhang y Chukwudiuto Malife describen cómo Twitter procesa 400.000 millones de eventos en tiempo real utilizando Dataflow.

1 Se trata de una situación habitual. Sólo cuando empiezas a explorar un conjunto de datos descubres que necesitas conjuntos de datos auxiliares. Si lo hubiera sabido de antemano, habría ingestado ambos conjuntos de datos. Pero estás siguiendo mi flujo de trabajo y, en este momento, sabía que necesitaba un conjunto de datos de desfases horarios, ¡pero aún no lo había buscado!

2 Consulta 04_streaming/design/mktbl.sh para ver la sintaxis real; aquí hemos hecho ajustes a efectos de impresión.

3 O haz una copia o una vista de la tabla con los valores de las columnas anonimizados: en el Capítulo 7 y en el Apéndice tratamos la salvaguarda de la información de identificación personal.

4 Por ejemplo, el huso horario de Sebastopol cambió en 2014 de la Hora de Europa Oriental (UTC+2) a la Hora de Moscú (UTC+4) tras la anexión de Crimea por la Federación Rusa.

5 La API de Java es mucho más madura y eficaz, pero Python es más sencillo y conciso. En este libro utilizaremos Python.

6 Si utilizas un shell efímero como Cloud Shell, tendrás que ejecutar la línea de activación cada vez que inicies una nueva sesión. Esto cargará el entorno virtual que estabas utilizando anteriormente. De este modo, no tendrás que reinstalar los paquetes de Python cada vez.

7 Este código está en 04_streaming/transform/df01.py del repositorio GitHub de este libro.

8 Este código está en 04_streaming/transform/df02.py del repositorio GitHub de este libro.

9 Consulta la respuesta a la pregunta "¿Cómo gestiono NameErrors?" en la documentación de Google.

10 Guardar objetos Python se llama decapar.

11 El aeropuerto de Chicago no hizo las maletas y se mudó el 30 de junio. Lo más probable es que en ese momento se abriera una nueva terminal o pista, y esto cambiara la ubicación del centroide de la extensión aérea del aeropuerto. Observa que el cambio es de sólo 0,0036 en grados de latitud. En la latitud de Chicago, esto se traduce en unos 400 metros.

12 Normalmente, la forma recomendada de muestrear una tabla BigQuery es hacerlo en SELECT * FROM dsongcp.flights WHERE TABLESAMPLE SYSTEM (0.001) porque el muestreo de tablas no se almacena en caché, por lo que obtendremos resultados diferentes cada vez. Sin embargo, en el momento de escribir esto, el muestreo de tablas sólo funciona en tablas y los vuelos son una Vista. Además, en nuestro caso de uso actual, no nos importa si obtenemos o no muestras diferentes cada vez que ejecutamos la consulta. Por eso utilizo rand().

13 Consulta el archivo 04_streaming/transform/bqsample.sh.

14 Este código está en 04_streaming/transform/df03.py del repositorio GitHub de este libro.

15 Este código está en 04_streaming/transform/df04.py del repositorio GitHub de este libro.

16 Este código está en 04_streaming/transform/df05.py del repositorio GitHub de este libro.

17 Abre una puerta a que alguien introduzca consultas que podrían, por ejemplo, borrar una tabla. Esta viñeta de XKCD es famosa por poner de relieve este problema.

18 Consulta 04_streaming/simulate/simulate.py en el repositorio de GitHub.

19 A 60 veces la velocidad en tiempo real, los 3 días de datos de vuelo tardarán más de una hora en completarse. Con suerte, será tiempo suficiente para completar el resto del capítulo. Si no, reinicia el simulador. Si terminas antes, pulsa Ctrl-C para detener el simulador.

20 Para ver un ejemplo, consulta la arquitectura de referencia para analizar juegos en dispositivos móviles.

21 Consulta 04_streaming/realtime/avg01.py en el repositorio de GitHub.

22 Si quisieras escribir en BigQuery los datos sin procesar que se reciben, también podrías hacerlo, por supuesto; eso es lo que se muestra en el fragmento de código anterior. En esta sección, asumo que sólo necesitamos las estadísticas agregadas de la última hora.

23 Recuerda que estamos calculando agregados de más de 60 minutos cada 5 minutos. Cloud Dataflow considera que la primera ventana "completa" se produce a los 65 minutos de la simulación. Como estamos simulando a una velocidad 30 veces superior, esto equivale a dos minutos en tu reloj.

Get Ciencia de Datos en la Plataforma en la Nube de Google, 2ª Edición now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.