Kapitel 4. Streaming-Daten: Veröffentlichung und Ingest mit Pub/Sub und Dataflow

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

In Kapitel 3, , haben wir ein Dashboard entwickelt, um ein auf einer Kontingenztabelle basierendes Modell zu erklären, das vorschlägt, ob ein Treffen abgesagt werden soll. Dem von uns entwickelten Dashboard fehlte jedoch die Unmittelbarkeit, da es nicht mit dem Kontext der Nutzer verknüpft war. Da die Nutzerinnen und Nutzer ein Dashboard aufrufen und die Informationen sehen müssen, die für sie zu diesem Zeitpunkt relevant sind, müssen wir ein Echtzeit-Dashboard mit Ortsangaben entwickeln.

Wie würden wir unserem Dashboard Kontext hinzufügen? Wir müssen Karten mit Verspätungen in Echtzeit anzeigen. Dazu brauchen wir die Standorte der Flughäfen und Echtzeitdaten. Die Standorte der Flughäfen können beim US Bureau of Transportation Statistics (BTS; dieselbe Behörde, von der wir unsere historischen Flugdaten erhalten haben) abgefragt werden. Flugdaten in Echtzeit sind jedoch ein kommerzielles Produkt. Wenn wir ein Geschäft mit der Vorhersage von Flugankünften aufbauen wollten, würden wir diese Daten kaufen. Für die Zwecke dieses Buches wollen wir es aber nur simulieren.

Die Simulation der Erstellung eines Echtzeit-Feeds aus historischen Daten hat den Vorteil, dass wir beide Seiten einer Streaming-Pipeline (Produktion und Verbrauch) sehen können. Im folgenden Abschnitt sehen wir uns an, wie wir Daten in die Datenbank streamen könnten, wenn wir sie in Echtzeit empfangen würden.

Alle Codeschnipsel in diesem Kapitel sind im Ordner 04_streaming im GitHub-Repository des Buches verfügbar. In der Datei README.md in diesem Verzeichnis findest du Anweisungen, wie du die in diesem Kapitel beschriebenen Schritte ausführst.

Gestaltung des Event Feeds

Nehmen wir an, dass wir einen Ereignis-Feed erstellen wollen, der nicht alle 100 Felder des BTS-Rohdatensatzes enthält, sondern nur die wenigen Felder, die wir in Kapitel 3 als relevant für das Problem der Flugverspätungsvorhersage ausgewählt haben (siehe Abbildung 4-1).

Abbildung 4-1. In Kapitel 3 haben wir in BigQuery eine Ansicht mit den Feldern erstellt, die für die Vorhersage von Flugverspätungen relevant sind. In diesem Kapitel werden wir einen Echtzeitstrom dieser Informationen simulieren.

Um einen Echtzeit-Stream der in Abbildung 4-1 gezeigten Fluginformationen zu simulieren, können wir zunächst die historischen Daten in der Ansicht flights in BigQuery verwenden, müssen sie aber weiter transformieren. Welche Arten von Transformationen werden benötigt?

Erforderliche Transformationen

Beachte, dass FL_DATE ein Date ist, während DEP_TIME ein STRING ist. Das liegt daran, dass FL_DATE in der Form 2015-07-03 für den 3. Juli 2015 ist, während DEP_DATE in der Form 1406 für 14:06 Uhr Ortszeit ist. Das ist unglücklich. Ich mache mir keine Sorgen über die Trennung von Datum und Uhrzeit in zwei Spalten - das können wir ändern. Bedauerlich ist nur, dass die Abfahrtszeit nicht mit einer Zeitzonenverschiebung verbunden ist. So kann in diesem Datensatz eine Abflugzeit von 1406 in verschiedenen Zeilen je nach Zeitzone des Ausgangsflughafens eine andere Zeit sein.

Die Zeitzonenverschiebungen (es gibt zwei, eine für den Start- und eine für den Zielflughafen) sind in den Daten nicht enthalten. Da die Zeitverschiebung vom Standort des Flughafens abhängt, müssen wir einen Datensatz finden, der die Zeitzonenverschiebung jedes Flughafens enthält, und dann diese Daten mit diesem Datensatz zusammenführen.1 Um die nachgelagerte Analyse zu vereinfachen, werden wir dann alle Zeiten in den Daten in eine gemeinsame Zeitzone einordnen - die koordinierte Weltzeit (UTC) ist die traditionelle Wahl für eine gemeinsame Zeitzone für Datensätze. Wir können jedoch nicht auf die Ortszeit verzichten - wir brauchen die Ortszeit, um Analysen durchzuführen, z. B. die typische Verspätung von Morgen- und Abendflügen. Obwohl wir die Ortszeit in UTC umwandeln, speichern wir auch die Zeitzonenverschiebung (z. B. -3.600 Minuten), um die Ortszeit bei Bedarf abrufen zu können.

Deshalb führen wir zwei Transformationen des Originaldatensatzes durch. Erstens konvertieren wir alle Zeitfelder im Rohdatensatz in UTC. Zweitens fügen wir zusätzlich zu den Feldern in den Rohdaten drei Felder für den Herkunftsflughafen und die gleichen drei Felder für den Zielflughafen hinzu: den Breitengrad, den Längengrad und die Zeitzonenverschiebung. Diese Felder werden benannt:

DEP_AIRPORT_LAT, DEP_AIRPORT_LON, DEP_AIRPORT_TZOFFSET
ARR_AIRPORT_LAT, ARR_AIRPORT_LON, ARR_AIRPORT_TZOFFSET

Die dritte Änderung, die wir vornehmen müssen, ist, dass wir für jede Zeile im historischen Datensatz mehrere Ereignisse veröffentlichen müssen. Denn wenn wir warten, bis das Flugzeug angekommen ist, um ein einziges Ereignis mit allen Zeilendaten zu veröffentlichen, wäre es zu spät. Wenn wir dies zum Zeitpunkt des Abflugs des Flugzeugs tun, verletzen unsere Modelle die Kausalitätsbeschränkungen. Stattdessen müssen wir für jeden Zustand, in dem sich das Flugzeug befindet, ein Ereignis aussenden. Wir entscheiden uns dafür, fünf Ereignisse für jeden Flug zu senden: wenn der Flug zum ersten Mal geplant wird, wenn der Flug das Gate verlässt, wenn der Flug abhebt, wenn der Flug landet und wenn der Flug ankommt. Diese fünf Ereignisse können nicht alle mit denselben Daten verknüpft werden, weil sich die Erkennbarkeit der Spalten während des Fluges ändert. Wenn wir zum Beispiel ein Ereignis zur Abflugzeit senden, kennen wir die Ankunftszeit nicht. Der Einfachheit halber können wir dieselbe Struktur melden, aber wir müssen sicherstellen, dass nicht wissbare Daten durch ein null und nicht durch den tatsächlichen Datenwert gekennzeichnet sind.

Architektur

Tabelle 4-1 zeigt, wann diese Ereignisse gesendet werden können und welche Felder in jedem Ereignis enthalten sind.

Tabelle 4-1. Felder, die in jedem der fünf zu veröffentlichenden Ereignisse enthalten sein werden. Vergleiche die Reihenfolge der Felder mit den Feldern im Schema in Abbildung 4-1.
Veranstaltung Gesendet um (UTC) In der Ereignismeldung enthaltene Felder
Geplant CRS_DEP_TIME minus 7 Tage FL_DATE, UNIQUE_CARRIER, ORIGIN_AIRPORT_SEQ_ID, ORIGIN, DEST_AIRPORT_SEQ_ID, DEST, CRS_DEP_TIME [nulls], CRS_ARR_TIME [nulls], DISTANCE
Abgereist DEP_TIME Alle Felder, die in der geplanten Nachricht verfügbar sind, plus:
  • DEP_TIME, DEP_DELAY CANCELLED
  • CANCELLATION_CODE
  • DEP_AIRPORT_LAT, DEP_AIRPORT_LON, DEP_AIRPORT_TZOFFSET
Wheelsoff WHEELS_OFF Alle Felder, die in der Abwesenheitsnachricht verfügbar sind, plus: TAXI_OUT und WHEELS_OFF
Wheelson WHEELS_ON Alle Felder, die in der Abrollnachricht verfügbar sind, plus:
  • WHEELS_ON
  • DIVERTED
  • ARR_AIRPORT_LAT, ARR_AIRPORT_LON, ARR_AIRPORT_TZOFFSET
Angekommen ARR_TIME Alle Felder, die in der wheelson-Nachricht verfügbar sind, plus: ARR_TIME und ARR_DELAY

Wir führen die erforderlichen Umwandlungen durch und speichern die umgewandelten Daten dann in einer Datenbank, damit sie für den Ereignissimulationscode zur Verfügung stehen. Abbildung 4-2 zeigt die Schritte, die wir in unserer ETL-Pipeline (Extract-Transform-Load) durchführen werden, sowie die nachfolgenden Schritte, um aus diesen Ereignissen einen Ereignisstrom zu simulieren und anschließend ein Echtzeit-Dashboard aus dem simulierten Ereignisstrom zu erstellen.

Abbildung 4-2. Schritte in unserer ETL-Pipeline (extract-transform-load), um (a) die Rohdaten in Ereignisse umzuwandeln, (b) den Ereignisstrom zu simulieren und (c) den Ereignisstrom zu verarbeiten, um ein Echtzeit-Dashboard zu füllen.

Informationen zum Flughafen erhalten

Um die Zeitkorrektur vorzunehmen, brauchen wir den Breiten- und Längengrad jedes Flughafens. Der BTS verfügt über einen Datensatz, der diese Informationen enthält und den wir für die Suche verwenden können. Der Einfachheit halber habe ich die Daten heruntergeladen und sie unter gs://data-science-on-gcp/edition2/raw/airports.csv öffentlich zugänglich gemacht.

Untersuchen wir die Daten, um herauszufinden, wie wir die Längen- und Breitengrade der Flughäfen erhalten. In Kapitel 2, als ich die Daten von flights untersuchen musste, um das erste Verspätungsmodell zu erstellen, habe ich die Daten in BigQuery geladen.

Müssen wir alle Daten, die uns zur Verfügung gestellt werden, in unser BigQuery-Dataset importieren, um sie zu untersuchen? Nein, natürlich nicht. Wir können BigQuery-Datensätze in anderen Projekten abfragen, ohne dass wir unsere eigenen Kopien der Daten erstellen müssen. In der FROM Klausel der BigQuery-Abfrage müssen wir nur den Namen des Projekts angeben, in dem sich das Dataset befindet:

SELECT 
  airline,
  AVG(departure_delay) AS avg_dep_delay
 FROM `bigquery-samples.airline_ontime_data.flights`
 GROUP BY airline
 ORDER by avg_dep_delay DESC

Was aber, wenn uns jemand eine CSV-Datei (Comma-Separated Values) zur Verfügung stellt? Müssen wir die Daten in BigQuery laden, um zu sehen, was in der Datei steht? Nein.

BigQuery ermöglicht die Abfrage von Daten in der Cloud Speicherung durch seine föderierten Abfragefunktionen. Das ist die Fähigkeit von BigQuery, Daten abzufragen, die nicht innerhalb des Data Warehouse-Produkts gespeichert sind, sondern auf Datenquellen wie Google Sheets (eine Tabellenkalkulation auf Google Drive) oder Dateien auf Cloud Storage zugreifen. So könnten wir die Dateien als CSV in der Cloud Speicherung belassen, eine Tabellenstruktur darauf definieren und die CSV-Dateien direkt abfragen. Erinnere dich daran, dass wir die Verwendung von Cloud Storage vorgeschlagen haben, wenn dein Hauptanalyseverfahren die Arbeit mit deinen Daten auf der Ebene von Flat Files beinhaltet.

Der erste Schritt besteht darin, das Schema dieser Dateien zu ermitteln. Schauen wir uns die erste Zeile an:

gsutil cat gs://data-science-on-gcp/edition2/raw/airports.csv | head -1

Wir bekommen:

"AIRPORT_SEQ_ID","AIRPORT_ID","AIRPORT","DISPLAY_AIRPORT_NAME",
"DISPLAY_AIRPORT_CITY_NAME_FULL","AIRPORT_WAC_SEQ_ID2","AIRPORT_WAC",
"AIRPORT_COUNTRY_NAME","AIRPORT_COUNTRY_CODE_ISO","AIRPORT_STATE_NAME",
"AIRPORT_STATE_CODE","AIRPORT_STATE_FIPS","CITY_MARKET_SEQ_ID","CITY_MARKET_ID",
"DISPLAY_CITY_MARKET_NAME_FULL","CITY_MARKET_WAC_SEQ_ID2","CITY_MARKET_WAC",
"LAT_DEGREES","LAT_HEMISPHERE","LAT_MINUTES","LAT_SECONDS","LATITUDE",
"LON_DEGREES","LON_HEMISPHERE","LON_MINUTES","LON_SECONDS","LONGITUDE",
"UTC_LOCAL_TIME_VARIATION","AIRPORT_START_DATE","AIRPORT_THRU_DATE",
"AIRPORT_IS_CLOSED","AIRPORT_IS_LATEST"

CAST Verwende diese Kopfzeile, um einen BigQuery-Schema-String des Formats zu schreiben (gib STRING für jede Spalte an, bei der du dir nicht sicher bist, da du sie bei der Abfrage der Daten jederzeit in das passende Format umwandeln kannst):

AIRPORT_SEQ_ID:INTEGER,AIRPORT_ID:STRING,AIRPORT:STRING, ...

Wenn du einen ähnlichen Datensatz herumliegen hast, kannst du auch mit dessen Schema beginnen und es bearbeiten:

bq show --format=prettyjson dsongcp.sometable > starter.json

Sobald wir das Schema der GCS-Dateien haben, können wir eine Tabellendefinition für die föderierte Quelle erstellen:2

bq mk --external_table_definition= \
./airport_schema.json@CSV=gs://data-science-on-gcp/edition2/raw/airports.csv \
dsongcp.airports_gcs

Wenn du jetzt die BigQuery-Webkonsole aufrufst, solltest du eine neue Tabelle im dsongcp Dataset sehen (lade die Seite ggf. neu). Dabei handelt es sich um eine föderierte Datenquelle, deren Speicherung in der CSV-Datei auf Cloud Storage erfolgt. Du kannst sie jedoch wie jede andere BigQuery-Tabelle abfragen:

SELECT 
AIRPORT_SEQ_ID, AIRPORT_ID, AIRPORT, DISPLAY_AIRPORT_NAME,
LAT_DEGREES, LAT_HEMISPHERE, LAT_MINUTES, LAT_SECONDS, LATITUDE
FROM dsongcp.airports_gcs
WHERE DISPLAY_AIRPORT_NAME LIKE '%Seattle%'

In der vorangegangenen Abfrage versuche ich, in der Datei herauszufinden, welche Spalte für den Flughafen und welche für den Breitengrad ich verwenden muss. Das Ergebnis zeigt, dass AIRPORT und LATITUDE die Spalten sind, die mich interessieren, aber dass es mehrere Zeilen gibt, die dem Flughafen SEA entsprechen:

Row AIRPORT_​SEQ_​ID AIRPORT_​ID AIRPORT DISPLAY_​AIRPORT_​NAME LAT_​DEGREES LAT_​HEMISPHERE LAT_​MINUTES LAT_​SECONDS LATITUDE
1 1247701 12477 JFB Seattle 1st National.Bank Helipad 47 N 36 25 47.60694444
2 1474701 14747 SEA Seattle International 47 N 26 50 47.44722222
3 1474702 14747 SEA Seattle/Tacoma International 47 N 26 57 47.44916667
4 1474703 14747 SEA Seattle/Tacoma International 47 N 27 0 47.45

Zum Glück gibt es eine Spalte, die anzeigt, welche Zeile die neueste Information ist:

SELECT 
  AIRPORT, LATITUDE, LONGITUDE
FROM dsongcp.airports_gcs
WHERE AIRPORT_IS_LATEST = 1 AND AIRPORT = 'DFW'

Lass dich aber nicht von föderierten Abfragen hinreißen. Am besten eignen sich föderierte Quellen für häufig wechselnde, relativ kleine Datensätze, die mit großen Datensätzen in nativen BigQuery-Tabellen verbunden werden müssen. Da die spaltenbasierte Speicherung in BigQuery so wichtig für die Leistung ist, werden wir die meisten Daten in das native Format von BigQuery laden.

Daten teilen

Jetzt, wo wir die airports.csv in der Cloud Speicherung und den Datensatz der Flughäfen in BigQuery haben, ist es sehr wahrscheinlich, dass unsere Kollegen diese Daten auch nutzen wollen. Einer der Vorteile, wenn du deine Daten in die Cloud (oder genauer gesagt in ein Data Warehouse) bringst, ist, dass du Datensätze über Unternehmensgrenzen hinweg zusammenführen kannst. Wenn es also keinen triftigen Grund gibt, der dagegen spricht, wie z. B. Sicherheitsvorkehrungen, solltest du versuchen, deine Daten weithin zugänglich zu machen.

Die Kosten für die Abfrage werden von der Person getragen, die die Abfrage an die BigQuery-Engine stellt. Du brauchst dir also keine Sorgen zu machen, dass dir dadurch zusätzliche Kosten für deine Abteilung entstehen. Es ist möglich, einen GCS-Bucket "requester-pays" zu machen, um die gleiche Art der Abrechnungstrennung für Daten in der Cloud Speicherung zu erhalten.

Einen Datensatz der Cloud Speicherung teilen

Um einige Daten in der Cloud Speicherung freizugeben, verwende gsutil:

gsutil -m acl ch -r -u abc@xyz.com:R gs://$BUCKET/data

In dem vorangegangenen Befehl gibt das -m den Multithreading-Modus an, das -r ermöglicht den Zugriff rekursiv, beginnend mit dem angegebenen Top-Level-Verzeichnis, und das -u zeigt an, dass es sich um einen Benutzer handelt, dem Lese (:R) Zugriff erhält.

Wir könnten der gesamten Organisation oder einer Google-Gruppe Lesezugriff gewähren, indem wir -g:

gsutil -m acl ch -r -g xyz.com:R gs://$BUCKET/data

Freigabe eines BigQuery-Datensatzes

Die Freigabe von BigQuery kann auf der Granularität einer Spalte, einer Tabelle oder eines Datensatzes erfolgen. Keine unserer BigQuery-Tabellen enthält persönlich identifizierbare oder vertrauliche Informationen. Daher gibt es keinen zwingenden Grund für die Zugriffskontrolle, die Weitergabe von Flugdaten auf Spalten- oder Tabellenebene zu kontrollieren. Wir können also den in Kapitel 2 erstellten Datensatz dsongcp freigeben und alle Mitarbeiter des Unternehmens, die an diesem Projekt arbeiten, zu bigquery.user machen, damit sie Abfragen auf diesem Datensatz durchführen können. Das kannst du in der BigQuery-Webkonsole über das Menü Dataset tun.

In manchen Fällen kann es vorkommen, dass dein Datensatz oder deine Tabelle bestimmte Spalten enthält, die persönliche oder vertrauliche Informationen enthalten. Möglicherweise musst du den Zugriff auf diese Spalten einschränken, während der Rest der Tabelle für ein breiteres Publikum zugänglich bleibt.3 Wenn du den Zugriff auf eine Teilmenge einer Tabelle in BigQuery ermöglichen musst (egal, ob es sich um bestimmte Spalten oder bestimmte Zeilen handelt), kannst du Views verwenden. Lege die Tabelle selbst in einem Dataset ab, das nur einer sehr kleinen Gruppe von Nutzern zugänglich ist. Dann erstellst du eine Ansicht auf diese Tabelle, die die relevanten Spalten und Zeilen herauszieht, und speicherst diese Ansicht in einem separaten Dataset, das für eine größere Gruppe von Nutzern zugänglich ist. Deine Nutzer/innen werden nur diese Ansicht abfragen, und da die personenbezogenen oder vertraulichen Informationen nicht einmal in der Ansicht enthalten sind, ist die Wahrscheinlichkeit eines unbeabsichtigten Datenverlusts geringer.

Eine weitere Möglichkeit, den Zugriff auf der Ebene einer BigQuery-Tabelle zu beschränken, ist die Verwendung von Cloud IAM. Um den Zugriff auf der Ebene einer Spalte zu kontrollieren, verwendest du Richtlinien-Tags und den Data Catalog.

Dataplex und Analytics Hub

Sobald du dir angewöhnt hast, Daten in großem Umfang zu teilen, kann die Verwaltung problematisch werden. Es ist besser, wenn du die Daten in der gesamten Cloud Speicherung einheitlich verwalten und die Herkunft der Daten verfolgen kannst. Dafür gibt es Dataplex.

Es kann ziemlich mühsam sein, Tabellen und Datensätze einzeln mit einem Nutzer oder einer Gruppe zu teilen. Um die Freigabe in großem Umfang zu implementieren und Statistiken darüber zu erhalten, wie Personen die von dir freigegebenen Daten nutzen, verwende Analytics Hub.

Zeitkorrektur

Das Korrigieren der in Ortszeit gemeldeten Zeiten in UTC ist kein einfaches Unterfangen. Es gibt mehrere Schritte:

  • Die Ortszeit hängt von dem Ort ab, an dem du dich befindest. Die Flugdaten, die uns vorliegen, enthalten nur den Namen des Flughafens (z.B. ALB für Albany). Wir müssen also den Längen- und Breitengrad anhand eines Flughafencodes ermitteln. Die BTS verfügt über einen Datensatz, der diese Informationen enthält und den wir für die Suche verwenden können.

  • Bei einem Paar aus Breiten- und Längengrad müssen wir die Zeitzone auf einer Karte der globalen Zeitzonen nachschlagen. Wenn wir zum Beispiel den Breiten- und Längengrad des Flughafens in Albany angeben, müssen wir auf America/New_York zurückkommen. Es gibt mehrere Webdienste, die dies tun, aber das Python-Paket timezonefinder ist eine effizientere Option, weil es komplett offline arbeitet. Der Nachteil ist, dass dieses Paket keine ozeanischen Gebiete und einige historische Zeitzonenänderungen verarbeiten kann,4 aber das ist ein Kompromiss, den wir vorerst eingehen können.

  • Die Zeitzonenverschiebung (gegenüber der Greenwich Mean Time [GMT/UTC]) an einem Ort ändert sich im Laufe des Jahres aufgrund von Sommerzeitkorrekturen. In New York zum Beispiel liegt sie im Sommer sechs Stunden und im Winter fünf Stunden hinter der UTC. Mit der Zeitzone (America/New_York) brauchen wir also auch das örtliche Abreisedatum und die Uhrzeit (z.B. 13. Januar 2015, 14:08 Uhr), um die entsprechende Zeitzonenverschiebung zu finden. Das Python-Paket pytz bietet diese Möglichkeit, indem es das zugrunde liegende Betriebssystem nutzt.

Das Problem der uneindeutigen Zeiten bleibt bestehen - jeder Zeitpunkt zwischen 01:00 und 02:00 Uhr Ortszeit tritt zweimal an dem Tag auf, an dem die Uhr von Sommerzeit auf Winterzeit umgestellt wird. Wenn unser Datensatz also einen Flug enthält, der um 01:30 Uhr ankommt, müssen wir entscheiden, welche Zeit das ist. In einer realen Situation würdest du dir die typische Dauer des Fluges ansehen und diejenige wählen, die am wahrscheinlichsten ist. In diesem Buch gehe ich immer von der Winterzeit aus ( is_dst ist False), weil dies die Standardzeitzone für diesen Ort ist.

Die Komplexität dieser Schritte sollte dich hoffentlich davon überzeugen, bei der Speicherung von Zeit bewährte Methoden anzuwenden.

Apache Beam/Cloud Dataflow

Der klassische Weg, um Datenpipelines auf der Google Cloud Platform zu erstellen, ist die Verwendung von Cloud Dataflow. Cloud Dataflow ist eine Externalisierung der Technologien Flume und MillWheel, die bei Google schon seit einigen Jahren weit verbreitet sind. Es verwendet ein Programmiermodell, das sowohl Batch- als auch Streaming-Daten auf einheitliche Weise verarbeitet und so die Möglichkeit bietet, dieselbe Codebasis sowohl für die Batch- als auch für die kontinuierliche Stream-Verarbeitung zu nutzen. Der Code selbst wird in Apache Beam geschrieben, entweder in Java, Python oder Go,5 und ist portabel, d. h. er kann in verschiedenen Ausführungsumgebungen ausgeführt werden, darunter Apache Flink und Apache Spark. Auf GCP bietet Cloud Dataflow einen vollständig verwalteten (serverlosen) Dienst, der Beam-Pipelines ausführen kann. Die Ressourcen werden bei Bedarf zugewiesen und skalieren automatisch, um sowohl minimale Latenzzeiten als auch eine hohe Ressourcenauslastung zu erreichen.

Bei der Beam-Programmierung wird eine Pipeline (eine Reihe von Datenumwandlungen) erstellt, die an einen Runner übergeben wird. Der Runner baut einen Graphen auf und leitet die Daten dann durch ihn hindurch. Jeder Eingangsdatensatz kommt von einer Quelle und jeder Ausgangsdatensatz wird an eine Senke gesendet. Abbildung 4-3 zeigt die Beam-Pipeline, die wir gleich aufbauen werden.

Vergleiche die Schritte in Abbildung 4-2 mit dem Blockdiagramm der ETL-Pipeline (extract-transform-load) in Abbildung 4-3. Lass uns die Datenpipeline Stück für Stück aufbauen.

Abbildung 4-3. Die Dataflow-Pipeline, die wir jetzt aufbauen werden.

Parsing von Flughafendaten

Du kannst Informationen über die Lage der Flughäfen von der BTS-Website herunterladen. Ich habe alle Felder ausgewählt, die CSV-Datei auf meine lokale Festplatte heruntergeladen, sie extrahiert und mit gzip komprimiert. Die gepackte Flughafendatei ist im GitHub-Repository für dieses Buch verfügbar.

Um Apache Beam von der Cloud Shell aus nutzen zu können, müssen wir es in unserer Python-Umgebung installieren. Installiere jetzt auch die Zeitzonenpakete:6

virtualenv ~/beam_env
source ~/beam_env/bin/activate
python3 -m pip install --upgrade \
             timezonefinder pytz \
             'apache-beam[gcp]'

Die Read Transformation in der folgenden Beam-Pipeline liest die Flughafendatei Zeile für Zeile ein:7

with beam.Pipeline('DirectRunner') as pipeline:
   airports = (pipeline
      | beam.io.ReadFromText('airports.csv.gz')
      | beam.Map(lambda line: next(csv.reader([line])))
      | beam.Map(lambda fields: (fields[0], (fields[21], fields[26])))
   )

Nehmen wir zum Beispiel an, dass eine der Eingabezeilen, die aus der Textdatei source ausgelesen wird, die folgende ist:

1000401,10004,"04A","Lik Mining Camp","Lik, AK",101,1,"United
States","US","Alaska","AK","02",3000401,30004,"Lik,
AK",101,1,68,"N",5,0,68.08333333,163,"W",10,0,-163.16666667,"",2007-07-01,,0,1,

Die erste Map nimmt diese Zeile und gibt sie an einen CSV-Leser weiter, der sie parst (und dabei Felder wie Lik, AK berücksichtigt, die Kommas enthalten) und die Felder als eine Liste von Strings herauszieht. Diese Felder werden dann an die nächste Transformation weitergegeben. Die zweite Map nimmt die Felder als Eingabe und gibt ein Tupel der Form aus (die extrahierten Felder sind im vorherigen Beispiel fett gedruckt):

(1000401, (68.08333333,-163.16666667))

Die erste Zahl ist der eindeutige Flughafencode (wir verwenden diesen anstelle des dreibuchstabigen Codes des Flughafens, weil sich die Lage des Flughafens im Laufe der Zeit ändern kann), und die nächsten beiden Zahlen sind das Paar aus Breiten- und Längengrad für die Lage des Flughafens. Die Variable airports, die das Ergebnis dieser drei Transformationen ist, ist keine einfache speicherinterne Liste dieser Tupel. Stattdessen handelt es sich um eine unveränderliche Sammlung, die PCollection genannt wird, und die du aus dem Speicher nehmen und verteilen kannst.

Wir können den Inhalt von PCollection in eine Textdatei schreiben, um zu überprüfen, ob sich die Pipeline richtig verhält:

(airports
   | beam.Map(lambda airport_data: '{},{}'.format(airport_data[0], ',' \
       .join(airport_data[1])) )
   | beam.io.WriteToText('extracted_airports')
)

Probiere es aus: Der Code in 04_streaming/transform/df01.py ist einfach ein Python-Programm, das du von der Kommandozeile aus starten kannst. Installiere zuerst das Apache Beam-Paket, falls du das noch nicht getan hast, und führe dann das Programm df01.py aus, während du dich in dem Verzeichnis befindest, das das GitHub-Repository dieses Buches enthält:

cd 04_streaming/simulate
./install_packages.sh
python3 ./df01.py

Dadurch wird der Code in df01.py lokal ausgeführt. Später werden wir die Pipeline-Zeile ändern in:

with beam.Pipeline('DataflowRunner') as pipeline:

und die Pipeline auf der Google Cloud Platform mit dem Cloud Dataflow Service laufen lassen. Mit dieser Änderung wird die Datenpipeline einfach durch die Ausführung des Python-Programms auf mehreren Workern in der Cloud gestartet. Wie bei vielen verteilten Systemen wird die Ausgabe von Cloud Dataflow potenziell in eine oder mehrere Dateien gesplittet. Du erhältst eine Datei, deren Name mit "extracted_airports" beginnt (bei mir war es extracted_airports-00000-of-00001), von der einige Zeilen etwa so aussehen könnten:

1000101,58.10944444,-152.90666667
1000301,65.54805556,-161.07166667

Die Spalten sind AIRPORT_SEQ_ID, LATITUDE und LONGITUDE- die Reihenfolge der Zeilen hängt davon ab, welcher der parallelen Arbeiter zuerst fertig geworden ist, sie kann also unterschiedlich sein.

Hinzufügen von Zeitzoneninformationen

Ändern wir nun den Code, um die Zeitzone zu bestimmen, die einem Breitengrad/Längengrad-Paar entspricht. In unserer Pipeline geben wir nicht einfach das Paar Breitengrad/Längengrad aus, sondern eine Liste mit drei Elementen: Breitengrad, Längengrad und Zeitzone:

airports = (pipeline
      | beam.Read(beam.io.ReadFromText('airports.csv.gz'))
      | beam.Map(lambda line: next(csv.reader([line])))
      | beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26])))
   )

Das Schlüsselwort lambda in Python setzt eine anonyme Funktion ein. Im Fall der ersten Verwendung von lambda im vorangegangenen Schnipsel nimmt diese Methode einen Parameter (line) und gibt das zurück, was auf den Doppelpunkt folgt. Wir können die Zeitzone mit Hilfe des Pakets timezonefinder bestimmen:8

def addtimezone(lat, lon):
      import timezonefinder
      tf = timezonefinder.TimezoneFinder()
      lat = float(lat)
      lon = float(lon)
      return (lat, lon, tf.timezone_at(lng=lon, lat=lat))

Die Position der Import-Anweisung im vorangegangenen Beispiel mag seltsam aussehen (die meisten Python-Importe befinden sich am Anfang der Datei), aber dieses Import-innerhalb-der-Funktion-Muster wird von Cloud Dataflow empfohlen, damit,9 damit bei der Übermittlung an die Cloud nicht auch die importierten Pakete in die Hauptsitzung importiert werden.10

Für den Moment werden wir diese Datei(df02.py) jedoch lokal ausführen. Das wird eine Weile dauern, weil die Berechnung der Zeitzone eine große Anzahl von Polygonüberschneidungen erfordert und weil wir sie lokal ausführen und (noch!) nicht in der Cloud verteilen. Um die Berechnung zu beschleunigen, fügen wir einen Filter hinzu, um die Anzahl der Flughäfen zu reduzieren, die wir nachschlagen müssen:

 | beam.io.ReadFromText('airports.csv.gz')
 | beam.Filter(lambda line: "United States" in line 
                              and line[-2:] == '1,')

Die BTS-Flugverspätungsdaten beziehen sich nur auf Inlandsflüge in den USA, daher brauchen wir die Zeitzonen von Flughäfen außerhalb der Vereinigten Staaten nicht. Der Grund für die zweite Überprüfung ist, dass sich die Standorte der Flughäfen im Laufe der Zeit ändern, wir uns aber nur für den aktuellen Standort des Flughafens interessieren. Hier sind zum Beispiel die Flughafenstandorte für ORD (oder Chicago):

1393001,...,"ORD",...,41.97805556,...,-87.90611111,...,1950-01-01,2011-06-30,0,0,
1393002,...,"ORD",...,41.98166667,...,-87.90666667,...,2011-07-01,2013-09-30,0,0,
1393003,...,"ORD",...,41.97944444,...,-87.90750000,...,2013-10-01,2015-09-30,0,0,
1393004,...,"ORD",...,41.97722222,...,-87.90805556,...,2015-10-01,,0,1,

Die erste Reihe erfasst die Lage des Flughafens von Chicago zwischen 1950 und dem 30. Juni 2011.11 Die zweite Zeile ist vom 1. Juli 2011 bis zum 30. September 2013 gültig. Die letzte Zeile hingegen ist der aktuelle Standort, was dadurch gekennzeichnet ist, dass die letzte Spalte (das Feld AIRPORT_IS_LATEST ) eine 1 ist.

Das ist aber nicht die einzige Zeile, die uns interessiert! Bei Flügen vor dem 01.10.2015 wird die ID der vorletzten Zeile gemeldet. Wir könnten eine Prüfung dafür einbauen, aber das sieht für eine kleine Optimierung ziemlich riskant aus. Ich entferne also diese letzte Prüfung, so dass wir nur noch:

 | beam.io.ReadFromText('airports.csv.gz')
 | beam.Filter(lambda line: "United States" in line)

Wenn ich dies tue und df02.py ausführe, sehen die extrahierten Informationen für die Flughäfen wie folgt aus:

1672301,62.03611111,-151.45222222,America/Anchorage
1672401,43.87722222,-73.41305556,America/New_York
1672501,40.75722222,-119.21277778,America/Los_Angeles

Die letzte Spalte der extrahierten Informationen enthält die Zeitzone, die aus dem Breiten- und Längengrad jedes Flughafens ermittelt wurde.

Zeiten in UTC umwandeln

Jetzt, da wir die Zeitzone für jeden Flughafen kennen, können wir uns daran machen, die Zeiten in den Daten von flights in UTC umzuwandeln. Zu dem Zeitpunkt, an dem wir das Programm entwickeln, möchten wir nicht alle Monate, die wir in BigQuery haben, verarbeiten - das Warten auf die Abfrage bei jedem Programmstart wäre lästig. Stattdessen werden wir eine kleine Stichprobe der flights Daten in BigQuery erstellen, mit denen wir unseren Code entwickeln:12

SELECT *
FROM dsongcp.flights
WHERE RAND() < 0.001

Dies ergibt etwa 6.000 Zeilen. Wir können die BigQuery-Web-UI verwenden, um die Ergebnisse als JSON-Datei (JavaScript Object Notation) zu speichern. Ich ziehe es jedoch vor, die Dinge per Skript zu erledigen:13

bq query --destination_table dsongcp.flights_sample \
   --replace --nouse_legacy_sql \
   'SELECT * FROM dsongcp.flights WHERE RAND() < 0.001'

bq extract --destination_format=NEWLINE_DELIMITED_JSON \
   dsongcp.flights_sample  \
   gs://${BUCKET}/flights/ch4/flights_sample.json

gsutil cp gs://${BUCKET}/flights/ch4/flights_sample.json

Dadurch wird eine Datei mit dem Namen flight_sample.json erstellt, von der eine Zeile ähnlich aussieht wie diese:

{"FL_DATE":"2015-04-28","UNIQUE_CARRIER":"EV","ORIGIN_AIRPORT_SEQ_ID":"1013503",
"ORIGIN":"ABE","DEST_AIRPORT_SEQ_ID":"1039705","DEST":"ATL",
"CRS_DEP_TIME":"1600","DEP_TIME":"1555","DEP_DELAY":-5,"TAXI_OUT":7,
"WHEELS_OFF":"1602","WHEELS_ON":"1747","TAXI_IN":4,"CRS_ARR_TIME":"1809",
"ARR_TIME":"1751","ARR_DELAY":-18,"CANCELLED":false,"DIVERTED":false,
"DISTANCE":"692.00"}

Das Lesen der Flugdaten beginnt ähnlich wie das Lesen der Flughafendaten:14

flights = (pipeline
 | 'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))

Dies ist derselbe Code wie beim Einlesen der Datei airports.csv.gz, nur dass ich diesem Transformationsschritt auch einen Namen gebe (flights:read) und einen JSON-Parser statt eines CSV-Parsers verwende. Beachte die Syntax hier:

 | 'name-of-step' >> transform_function()

Der nächste Schritt ist jedoch anders, da er zwei PCollections beinhaltet. Wir müssen die Flugdaten mit den Flughafendaten verknüpfen, um die Zeitzone für jeden Flug zu finden. Dazu machen wir die Flughäfen PCollection zu einem "Side-Input". Side-Inputs in Beam sind wie Sichten auf das Original PCollection und sind entweder Listen oder Dicts (Wörterbücher). In diesem Fall erstellen wir ein Diktat, das die Flughafen-ID den Informationen über die Flughäfen zuordnet:

flights = (pipeline
 |'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))
 |'flights:tzcorr' >> beam.FlatMap(tz_correct, 
                                   beam.pvalue.AsDict(airports))
)

Die Tatsache, dass PCollection eine Python-Liste oder ein Python-Dict sein muss, bedeutet, dass die Seiteneingaben klein genug sein müssen, um in den Speicher zu passen. Wenn du zwei große PCollections verbinden musst, die nicht in den Speicher passen, verwende eine CoGroupByKey.

Die Methode FlatMap() ruft eine Methode tz_correct() auf, die den geparsten Inhalt einer Zeile aus flights_sample.json (mit den Informationen zu einem einzelnen Flug) und ein Python-Wörterbuch (mit allen Zeitzoneninformationen der Flughäfen) entgegennimmt:

def tz_correct(fields, airport_timezones):
   try:
      # convert all times to UTC
      # ORIGIN_AIRPORT_SEQ_ID is the name of JSON attribute
      dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"]
      arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"]
      # airport_id is the key to airport_timezones dict
      # and the value is a tuple (lat, lon, timezone)
      dep_timezone = airport_timezones[dep_airport_id][2]
      arr_timezone = airport_timezones[arr_airport_id][2]

      for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
         fields[f] = as_utc(fields["FL_DATE"], fields[f], dep_timezone)
      for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f] = as_utc(fields["FL_DATE"], fields[f], arr_timezone)

      yield json.dumps(fields)
   except KeyError as e:
      logging.exception(" Ignoring " + line + 
                        " because airport is not known")

Warum FlatMap() anstelle von Map, um tz_correct() aufzurufen? Eine Map ist eine 1-zu-1-Beziehung zwischen Eingabe und Ausgabe, während eine FlatMap() 0-N Ausgaben pro Eingabe zurückgeben kann. Dies geschieht mit einer Python-Generatorfunktion (d.h. mit dem Schlüsselwort yield - stell dir yield als Rückgabe vor, die ein Element nach dem anderen zurückgibt, bis keine Daten mehr zurückgegeben werden können). Die Verwendung von FlatMap ermöglicht es uns, Fluginformationen zu unbekannten Flughäfen zu ignorieren - auch wenn dies in den historischen Daten, die wir verarbeiten, nicht vorkommt, kann ein bisschen defensive Programmierung nicht schaden.

Der Code tz_correct() holt sich die ID des Abflughafens aus den Flugdaten und sucht dann die Zeitzone für diese Flughafen-ID aus den Flughafendaten heraus. Nachdem er die Zeitzone ermittelt hat, ruft er die Methode as_utc() auf, um jede der in der Zeitzone des Flughafens gemeldeten Uhrzeiten in UTC umzuwandeln:

def as_utc(date, hhmm, tzone):
   try:
      if len(hhmm) > 0 and tzone is not None:
         import datetime, pytz
         loc_tz = pytz.timezone(tzone)
         loc_dt = loc_tz.localize(datetime.datetime.strptime(date,'%Y-%m-%d'),
                                  is_dst=False)
         loc_dt += datetime.timedelta(hours=int(hhmm[:2]),
                                      minutes=int(hhmm[2:]))
         utc_dt = loc_dt.astimezone(pytz.utc)
         return utc_dt.strftime('%Y-%m-%d %H:%M:%S')
      else:
         return '' # empty string corresponds to canceled flights
   except ValueError as e:
      print('{} {} {}'.format(date, hhmm, tzone))
      raise e

Wie bisher kannst du es lokal ausführen. Führe dazu df03.py aus. Eine Zeile, die ursprünglich (in den Rohdaten) so aussah:

{"FL_DATE":"2015-11-05","UNIQUE_CARRIER":"DL","ORIGIN_AIRPORT_SEQ_ID":"1013503",
"ORIGIN":"ABE","DEST_AIRPORT_SEQ_ID":"1039705","DEST":"ATL",
"CRS_DEP_TIME":"0600","DEP_TIME":"0556","DEP_DELAY":-4,"TAXI_OUT":12,
"WHEELS_OFF":"0608","WHEELS_ON":"0749","TAXI_IN":10,"CRS_ARR_TIME":"0818",
"ARR_TIME":"0759","ARR_DELAY":-19,"CANCELLED":false,
"DIVERTED":false,"DISTANCE":"692.00"}

wird jetzt:

{"FL_DATE": "2015-11-05", "UNIQUE_CARRIER": "DL", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-11-05 11:00:00", "DEP_TIME": "2015-11-05 10:56:00", 
"DEP_DELAY": -4, "TAXI_OUT": 12, "WHEELS_OFF": "2015-11-05 11:08:00", 
"WHEELS_ON": "2015-11-05 12:49:00", "TAXI_IN": 10, 
"CRS_ARR_TIME": "2015-11-05 13:18:00", "ARR_TIME": "2015-11-05 12:59:00", 
"ARR_DELAY": -19, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "692.00"}

Alle Zeiten wurden in UTC umgerechnet. Zum Beispiel wurde die 0759 Ankunftszeit in Atlanta in UTC umgewandelt und lautet nun 12:59:00.

Daten korrigieren

Schau dir die folgende Zeile über einen Flug von Honolulu (HNL) nach Dallas-Fort Worth (DFW) genau an. Fällt dir etwas Merkwürdiges auf?

{"FL_DATE": "2015-03-06", "UNIQUE_CARRIER": "AA", 
"ORIGIN_AIRPORT_SEQ_ID": "1217302", "ORIGIN": "HNL", 
"DEST_AIRPORT_SEQ_ID": "1129803", "DEST": "DFW", 
"CRS_DEP_TIME": "2015-03-07 05:30:00", "DEP_TIME": "2015-03-07 05:22:00", 
"DEP_DELAY": -8, "TAXI_OUT": 40, "WHEELS_OFF": "2015-03-07 06:02:00", 
"WHEELS_ON": "2015-03-06 12:32:00", "TAXI_IN": 7, 
"CRS_ARR_TIME": "2015-03-06 12:54:00", "ARR_TIME": "2015-03-06 12:39:00", 
"ARR_DELAY": -15, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "3784.00"}

Schau dir die Abflugzeit in Honolulu und die Ankunftszeit in Dallas an - der Flug kommt am Tag vor dem Abflug an! Das liegt daran, dass das Flugdatum (2015-03-06) das Datum des Abflugs in Ortszeit ist. Wenn dann noch ein Zeitunterschied zwischen den Flughäfen hinzukommt, ist es gut möglich, dass es nicht das Ankunftsdatum ist. Wir suchen nach solchen Situationen und fügen bei Bedarf 24 Stunden hinzu. Das ist natürlich ein ziemlicher Hack (habe ich schon erwähnt, dass die Zeiten in UTC gespeichert werden sollten?!):

def add_24h_if_before(arrtime, deptime):
   import datetime
   if len(arrtime) > 0 and len(deptime) > 0 and arrtime < deptime:
      adt = datetime.datetime.strptime(arrtime, '%Y-%m-%d %H:%M:%S')
      adt += datetime.timedelta(hours=24)
      return adt.strftime('%Y-%m-%d %H:%M:%S')
   else:
      return arrtime

Der 24-Stunden-Hack wird kurz vor dem Ertrag in tz_correct aufgerufen.15 Jetzt, da wir neue Daten über die Flughäfen haben, ist es wahrscheinlich sinnvoll, sie zu unserem Datensatz hinzuzufügen. Außerdem wollen wir, wie bereits erwähnt, die Abweichung der Zeitzone von der UTC im Auge behalten, da für einige Analysen die Kenntnis der Ortszeit erforderlich sein könnte. Der neue Code tz_correct lautet also wie folgt:

def tz_correct(line, airport_timezones):
   fields = json.loads(line)
   try:
      # convert all times to UTC
      dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"]
      arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"]
      dep_timezone = airport_timezones[dep_airport_id][2]
      arr_timezone = airport_timezones[arr_airport_id][2]

      for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]:
         fields[f], deptz = as_utc(fields["FL_DATE"], fields[f], dep_timezone)
      for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f], arrtz = as_utc(fields["FL_DATE"], fields[f], arr_timezone)

      for f in ["WHEELS_OFF", "WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]:
         fields[f] = add_24h_if_before(fields[f], fields["DEP_TIME"])

      fields["DEP_AIRPORT_TZOFFSET"] = deptz
      fields["ARR_AIRPORT_TZOFFSET"] = arrtz
      yield json.dumps(fields)
   except KeyError as e:
      logging.exception(" Ignoring " + line + " because airport is not known")

Wenn ich df04.py ausführe, auf die diese Änderungen angewendet wurden, wird der Flug von Honolulu nach Dallas:

{"FL_DATE": "2015-03-06", "UNIQUE_CARRIER": "AA", 
"ORIGIN_AIRPORT_SEQ_ID": "1217302", "ORIGIN": "HNL", 
"DEST_AIRPORT_SEQ_ID": "1129803", "DEST": "DFW", 
"CRS_DEP_TIME": "2015-03-07 05:30:00", "DEP_TIME": "2015-03-07 05:22:00", 
"DEP_DELAY": -8, "TAXI_OUT": 40, "WHEELS_OFF": "2015-03-07 06:02:00", 
"WHEELS_ON": "2015-03-07 12:32:00", "TAXI_IN": 7, 
"CRS_ARR_TIME": "2015-03-07 12:54:00", "ARR_TIME": "2015-03-07 12:39:00", 
"ARR_DELAY": -15, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "3784.00", 
"DEP_AIRPORT_TZOFFSET": -36000.0, "ARR_AIRPORT_TZOFFSET": -21600.0}

Wie du sehen kannst, wurden die Daten jetzt korrigiert (siehe die fettgedruckten Teile).

Ereignisse schaffen

Nachdem wir unsere zeitkorrigierten Daten haben, können wir mit der Erstellung von Ereignissen für die Veröffentlichung in Pub/Sub fortfahren. Für den Moment beschränken wir uns auf die Nachrichten departed und arrived - wir können die Pipeline wiederholen, um die zusätzlichen Ereignisse zu erstellen, wenn wir bei der Modellierung andere Ereignisse verwenden:

def get_next_event(fields):
    if len(fields["DEP_TIME"]) > 0:
       event = dict(fields)  # copy
       event["EVENT_TYPE"] = "departed"
       event["EVENT_TIME"] = fields["DEP_TIME"]
       for f in ["TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", 
                 "TAXI_IN", "ARR_TIME", "ARR_DELAY", "DISTANCE"]:
          event.pop(f, None)  # not knowable at departure time
       yield event
    if len(fields["ARR_TIME"]) > 0:
       event = dict(fields)
       event["EVENT_TYPE"] = "arrived"
       event["EVENT_TIME"] = fields["ARR_TIME"]
       yield event

Im Wesentlichen nehmen wir die Abfahrtszeit und erstellen ein departed Ereignis zu dieser Zeit, nachdem wir sichergestellt haben, dass wir die Felder (wie die Ankunftsverspätung), die wir zur Abfahrtszeit nicht kennen können, entfernen. In ähnlicher Weise verwenden wir die Ankunftszeit, um ein arrived Ereignis zu erstellen, wie in Abbildung 4-4 dargestellt.

In der Pipeline wird der Code zur Ereigniserzeugung auf flights PCollection aufgerufen, nachdem die Umstellung auf UTC erfolgt ist:

flights = (pipeline
  |'flights:read' >> beam.io.ReadFromText('flights_sample.json')
  |'flights:tzcorr' >> beam.FlatMap(tz_correct,
                           beam.pvalue.AsDict(airports))
)
events = flights | beam.FlatMap(get_next_event)
Abbildung 4-4. Ereignisse, wann sie veröffentlicht werden und einige der Felder, die in diesen Ereignissen enthalten sind.

Wenn wir nun die Pipeline laufen lassen,16 werden wir zwei Ereignisse für jeden Flug sehen:

{"FL_DATE": "2015-04-28", "UNIQUE_CARRIER": "EV", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-04-28 20:00:00", "DEP_TIME": "2015-04-28 19:55:00",
"DEP_DELAY": -5, "CRS_ARR_TIME": "2015-04-28 22:09:00", "CANCELLED": false, 
"DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, 
"ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "departed", 
"EVENT_TIME": "2015-04-28 19:55:00"}
{"FL_DATE": "2015-04-28", "UNIQUE_CARRIER": "EV", 
"ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", 
"DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", 
"CRS_DEP_TIME": "2015-04-28 20:00:00", "DEP_TIME": "2015-04-28 19:55:00",
"DEP_DELAY": -5, "TAXI_OUT": 7, "WHEELS_OFF": "2015-04-28 20:02:00", 
"WHEELS_ON": "2015-04-28 21:47:00", "TAXI_IN": 4, 
"CRS_ARR_TIME": "2015-04-28 22:09:00", "ARR_TIME": "2015-04-28 21:51:00", 
"ARR_DELAY": -18, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "692.00",
"DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, 
"EVENT_TYPE": "arrived", "EVENT_TIME": "2015-04-28 21:51:00"}

Das erste Ereignis ist ein departed Ereignis und soll zur Abfahrtszeit veröffentlicht werden, während das zweite Ereignis ein arrived Ereignis ist und zur Ankunftszeit veröffentlicht werden soll. Das Ereignis departed enthält eine Reihe fehlender Felder für Daten, die zu diesem Zeitpunkt noch nicht bekannt sind.

Sobald dieser Code funktioniert, fügen wir ein drittes Ereignis hinzu, das gesendet wird, wenn das Flugzeug abhebt:

    if len(fields["WHEELS_OFF"]) > 0:
       event = dict(fields)  # copy
       event["EVENT_TYPE"] = "wheelsoff"
       event["EVENT_TIME"] = fields["WHEELS_OFF"]
       for f in ["WHEELS_ON", "TAXI_IN", 
                 "ARR_TIME", "ARR_DELAY", "DISTANCE"]:
          event.pop(f, None)  # not knowable at departure time
       yield event

Zu diesem Zeitpunkt haben wir noch kein Abroll-Ereignis erstellt.

Lesen und Schreiben in der Cloud

Bis jetzt haben wir lokale Dateien gelesen und geschrieben. Sobald wir jedoch unseren Code in einer serverlosen Umgebung in Produktion gehen lassen, macht das Konzept eines lokalen Laufwerks keinen Sinn mehr. Wir müssen von der Cloud Speicherung lesen und schreiben. Da es sich um strukturierte Daten handelt, ist es außerdem besser, sie in BigQuery zu lesen und zu schreiben - erinnerst du dich daran, dass wir in Kapitel 2 unseren kompletten Datensatz in BigQuery geladen haben? Jetzt möchten wir auch die umgewandelten (zeitkorrigierten) Daten dort ablegen.

Glücklicherweise muss dabei nur die Quelle oder die Senke geändert werden. Der Rest der Pipeline bleibt unverändert. Im vorigen Abschnitt (siehe 04_streaming/transform/df05.py) haben wir die Datei airports.csv.gz zum Beispiel so gelesen:

| 'airports:read' >> beam.io.ReadFromText('airports.csv.gz')

Um die entsprechende Datei aus der Cloud Speicherung zu lesen, ändern wir den entsprechenden Code in 04_streaming/transform/df06.py wie folgt:

airports_filename = 'gs://{}/flights/airports/airports.csv.gz'.format(
                          bucket)
...
| 'airports:read' >> beam.io.ReadFromText(airports_filename)

Natürlich müssen wir dafür sorgen, dass die Datei in die Cloud Speicherung hochgeladen wird und von demjenigen, der den Code ausführt, gelesen werden kann. Die Datendatei in unserem GitHub-Repository zur Verfügung zu stellen, würde ohnehin nicht skalieren - Cloud Storage (oder BigQuery) ist der richtige Ort für die Daten.

In df05.py musste ich eine lokale Datei lesen, die den JSON-Export eines intelligenten Teils des Datensatzes enthielt, und einen JSON-Parser verwenden, um ein Diktat zu erhalten:

 | 'flights:read' >> beam.io.ReadFromText('flights_sample.json')
 | 'flights:parse' >> beam.Map(lambda line: json.loads(line))

In df06.py wird der entsprechende Code einfacher, weil der BigQuery-Reader ein dict zurückgibt, in dem die Spaltennamen der Ergebnismenge die Schlüssel sind:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001', 
      use_standard_sql=True)

Wenn wir die Abfrage wirklich ausführen, werden wir natürlich die Stichproben entfernen (rand() < 0.001), damit wir den gesamten Datensatz verarbeiten können.

Ähnlich verhält es sich, wenn wir vorher in eine lokale Datei geschrieben haben:

  | 'flights:tostring' >> beam.Map(lambda fields: json.dumps(fields))
  | 'flights:out' >> beam.io.textio.WriteToText('all_flights')

ändern wir den Code, um in die Cloud Speicherung zu schreiben:

 flights_output = 'gs://{}/flights/tzcorr/all_flights'.format(bucket)
... 
 | 'flights:tostring' >> beam.Map(lambda fields: json.dumps(fields))
 | 'flights:gcsout' >> beam.io.textio.WriteToText(flights_output)

Wir können die gleichen Daten auch in eine BigQuery-Tabelle schreiben:

flights_schema = \
     'FL_DATE:date,UNIQUE_CARRIER:string,...CANCELLED:boolean'
...
 | 'flights:bqout' >> beam.io.WriteToBigQuery(
     'dsongcp.flights_tzcorr', schema=flights_schema,
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
    )

Beachte, dass wir ein Schema angeben müssen, wenn wir in BigQuery schreiben, und angeben müssen, was zu tun ist, wenn die Tabelle bereits existiert (wir bitten darum, dass die Tabelle gekürzt und der Inhalt ersetzt wird) und wenn die Tabelle noch nicht existiert (wir bitten darum, dass die Tabelle erstellt wird).

Wir können versuchen, diesen Code auszuführen, aber die Pipeline benötigt ein paar zusätzliche Parameter. Wo wir also vorher hatten:

  with beam.Pipeline('DirectRunner') as pipeline:

die wir jetzt brauchen:

  argv = [
      '--project={0}'.format(project),
      '--staging_location=gs://{0}/flights/staging/'.format(bucket),
      '--temp_location=gs://{0}/flights/temp/'.format(bucket),
      '--runner=DirectRunner'
   ]
   with beam.Pipeline(argv=argv) as pipeline:

Der Grund dafür ist, dass wir, wenn wir aus BigQuery lesen, eine Abfrage bereitstellen:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001', 
      use_standard_sql=True)

Wir müssen also das Projekt angeben, das abgerechnet werden soll. Darüber hinaus - und das ist ein Detail der Implementierung - müssen einige temporäre Daten in der Cloud-Speicherung bereitgestellt und zwischengespeichert werden, und wir müssen der Pipeline einen Speicherort für diese temporären Daten zur Verfügung stellen - wir können nie sicher sein, welche Vorgänge eine Bereitstellung oder Zwischenspeicherung erfordern.

Wir können df06.py ausführen und dann überprüfen, ob neue Tabellen in BigQuery erstellt werden. Bis jetzt haben wir den Code lokal ausgeführt, entweder auf deinem Laptop oder in der Cloud Shell.

Als Nächstes schauen wir uns an, wie wir dies in Cloud Dataflow, dem GCP-Managed Service für Apache Beam-Pipelines, ausführen können.

Betrieb der Pipeline in der Cloud

Dieser letzte Durchlauf hat auf der lokalen virtuellen Maschine (VM) ein paar Minuten gedauert, und wir haben nur tausend Zeilen verarbeitet! Ändern wir den Code (siehe df07.py), um alle Zeilen in der BigQuery-Ansicht zu verarbeiten:

'flights:read' >> beam.io.ReadFromBigQuery(
      query='SELECT * FROM dsongcp.flights', 
      use_standard_sql=True)

Jetzt, da wir viel mehr Daten haben, müssen wir die Arbeit verteilen. Dazu ändern wir den Runner von DirectRunner (der lokal läuft) zu DataflowRunner (der den Auftrag in die Cloud verlagert und skaliert):

   argv = [
      '--project={0}'.format(project),
      '--job_name=ch04timecorr',
      '--save_main_session',
      '--staging_location=gs://{0}/flights/staging/'.format(bucket),
      '--temp_location=gs://{0}/flights/temp/'.format(bucket),
      '--setup_file=./setup.py',
      '--max_num_workers=8',
      '--region={}'.format(region),
      '--runner=DataflowRunner'
   ]

   pipeline = beam.Pipeline(argv=argv)

Beachte, dass es jetzt ein paar zusätzliche Parameter gibt:

  • Der Auftragsname ist der Name, unter dem der Auftrag in der GCP-Konsole aufgeführt wird. So können wir bei Bedarf eine Fehlersuche für den Auftrag durchführen.

  • Wir bitten den Dataflow-Übermittlungscode, unsere Hauptsitzung zu speichern. Das ist immer dann notwendig, wenn wir globale Variablen in unserem Python-Programm haben.

  • Die Datei setup.py sollte die Python-Pakete auflisten, die wir nach und nach installieren müssen (timezonefinder und pytz) - Cloud Dataflow muss diese Pakete auf den Compute Engine-Instanzen installieren, die es im Hintergrund startet:

    REQUIRED_PACKAGES = [
        'timezonefinder',
        'pytz'
    ]
  • Standardmäßig skaliert Dataflow die Anzahl der Arbeiter auf der Grundlage des Durchsatzes - je mehr Zeilen wir in unseren Eingabedateien haben, desto mehr Arbeiter brauchen wir. Dies wird als horizontale Autoskalierung bezeichnet. Um die automatische Skalierung auszuschalten, kannst du --autoscaling_algorithm=NONE angeben und um sie etwas einzuschränken, kannst du die maximale Anzahl der Arbeiter angeben.

  • Wir legen die Region fest, in der die Dataflow-Pipeline laufen soll.

  • Der Runner ist nicht mehr DirectRunner (der lokal läuft). Er ist jetzt DataflowRunner.

Wenn du das Python-Programm ausführst, wird der Auftrag an die Cloud übermittelt. Cloud Dataflow skaliert jeden Schritt der Pipeline automatisch auf Basis des Durchsatzes und streamt die Ereignisdaten in BigQuery (siehe Abbildung 4-3). Du kannst den laufenden Auftrag in der Cloud Platform Console im Bereich Cloud Dataflow überwachen.

Noch während die Ereignisdaten geschrieben werden, können wir sie abfragen, indem wir die BigQuery-Konsole aufrufen und Folgendes eingeben:

SELECT
  ORIGIN,
  DEP_TIME,
  DEST,
  ARR_TIME,
  ARR_DELAY,
  EVENT_TIME,
  EVENT_TYPE
FROM
  dsongcp.flights_simevents
WHERE
  (DEP_DELAY > 15 and ORIGIN = 'SEA') or
  (ARR_DELAY > 15 and DEST = 'SEA')
ORDER BY EVENT_TIME ASC
LIMIT
  5

Das gibt zurück:

Row ORIGIN DEP_TIME DEST ARR_TIME ARR_DELAY EVENT_TIME EVENT_TYPE
1 SEA 2015-01-01 08:21:00 UTC IAD null null 2015-01-01 08:21:00 UTC departed
2 SEA 2015-01-01 08:21:00 UTC IAD null null 2015-01-01 08:38:00 UTC wheelsoff
3 SEA 2015-01-01 08:21:00 UTC IAD 2015-01-01 12:48:00 UTC 22.0 2015-01-01 12:48:00 UTC arrived
4 KOA 2015-01-01 10:11:00 UTC SEA 2015-01-01 15:45:00 UTC 40.0 2015-01-01 15:45:00 UTC arrived
5 SEA 2015-01-01 16:43:00 UTC PSP null null 2015-01-01 16:43:00 UTC departed

Wie erwartet, gibt es für den Flug SEA-IAD drei Ereignisse: eines beim Abflug, das nächste beim Abrollen und das dritte bei der Ankunft. Die Ankunftsverspätung ist nur bei der Ankunft bekannt.

BigQuery ist eine spaltenorientierte Datenbank, also eine Abfrage, die alle Felder auswählt:

SELECT
  *
FROM
  dsongcp.flights_simevents
ORDER BY EVENT_TIME ASC

ineffizient sein wird. Wir brauchen jedoch alle Ereignisdaten, um Ereignisbenachrichtigungen zu versenden. Deshalb haben wir einen Kompromiss zwischen Speicherung und Geschwindigkeit gefunden, indem wir eine zusätzliche Spalte namens EVENT_DATA zu unserer BigQuery-Tabelle hinzugefügt und sie wie folgt in unsere Dataflow-Pipeline eingefügt haben:

def create_event_row(fields):
   featdict = dict(fields)  # copy
   featdict['EVENT_DATA'] = json.dumps(fields)
   return featdict

Dann könnte unsere Abfrage zum Abrufen der Ereignisse einfach wie folgt lauten:

SELECT
  EVENT_TYPE,
  EVENT_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= TIMESTAMP('2015-05-01 00:00:00 UTC')
  AND EVENT_TIME < TIMESTAMP('2015-05-03 00:00:00 UTC')
ORDER BY
  EVENT_TIME ASC
LIMIT
  2

Das Ergebnis sieht so aus:

Row EVENT_TYPE EVENT_TIME EVENT_DATA
1 wheelsoff 2015-05-01 00:00:00 UTC {"FL_DATE": "2015-04-30", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1295302", "ORIGIN": "LGA", "DEST_AIRPORT_SEQ_ID": "1330303", "DEST": "MIA", "CRS_DEP_TIME": "2015-04-30T23:29:00", "DEP_TIME": "2015-04-30T23:35:00", "DEP_DELAY": 6.0, "TAXI_OUT": 25.0, "WHEELS_OFF": "2015-05-01T00:00:00", "CRS_ARR_TIME": "2015-05-01T02:53:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "wheelsoff", "EVENT_TIME": "2015-05-01T00:00:00"}
2 departed 2015-05-01 00:00:00 UTC {"FL_DATE": "2015-04-30", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1295302", "ORIGIN": "LGA", "DEST_AIRPORT_SEQ_ID": "1320402", "DEST": "MCO", "CRS_DEP_TIME": "2015-04-30T23:55:00", "DEP_TIME": "2015-05-01T00:00:00", "DEP_DELAY": 5.0, "CRS_ARR_TIME": "2015-05-01T02:45:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "departed", "EVENT_TIME": "2015-05-01T00:00:00"}

Diese Tabelle wird als Quelle für unsere Ereignisse dienen; aus einer solchen Abfrage werden wir die Streaming-Flugdaten simulieren.

Veröffentlichen eines Ereignisstroms in Cloud Pub/Sub

Jetzt, da wir die Quell-Ereignisse aus den rohen Flugdaten haben, können wir den Stream simulieren. Streaming-Daten werden auf der Google Cloud Platform in der Regel über Cloud Pub/Sub veröffentlicht, einen serverlosen Echtzeit-Nachrichtendienst. Cloud Pub/Sub sorgt für eine zuverlässige Zustellung und kann auf mehr als eine Million Nachrichten pro Sekunde skaliert werden. Sofern du nicht Cloud Pub/Sub Lite verwendest (ein Dienst mit nur einer Zone, der für einen kostengünstigen Betrieb ausgelegt ist), speichert Pub/Sub Kopien der Nachrichten in mehreren Zonen, um eine garantierte Zustellung an die Abonnenten zu gewährleisten, und es kann viele gleichzeitige Abonnenten geben.

Unser Simulator liest aus der Ereignistabelle in BigQuery (die im vorherigen Abschnitt erstellt wurde) und veröffentlicht Nachrichten an Cloud Pub/Sub. Im Wesentlichen werden wir die Flugereignisdatensätze durchgehen, die Benachrichtigungszeit von jedem abrufen und die Veröffentlichung dieser Ereignisse simulieren, sobald sie eintreten.

Speed-Up-Faktor

Wir verwenden aber auch eine Zuordnung zwischen der Zeit der Ereignismeldung (Ankunfts- oder Abfahrtszeit je nach Ereignis) und der aktuellen Systemzeit. Warum? Weil es ineffizient ist, die Flugereignisse immer in Echtzeit zu simulieren. Stattdessen kann es sein, dass wir die Flugdaten eines Tages in einer Stunde durchgehen wollen (solange der Code, der diese Ereignisse verarbeitet, die erhöhte Datenrate bewältigen kann). Zu anderen Zeiten kann es sein, dass wir unseren ereignisverarbeitenden Code in einer langsameren Debugging-Umgebung ausführen und die Simulation deshalb verlangsamen wollen. Ich bezeichne dieses Verhältnis zwischen der tatsächlichen Zeit und der Simulationszeit als Beschleunigungsfaktor - derBeschleunigungsfaktor ist größer als 1, wenn die Simulation schneller als die Echtzeit sein soll, und kleiner als 1, wenn sie langsamer als die Echtzeit sein soll.

Basierend auf dem Beschleunigungsfaktor müssen wir eine lineare Transformation der Ereigniszeit in die Systemzeit vornehmen. Wenn der Beschleunigungsfaktor 1 ist, sollte eine 60-minütige Differenz zwischen dem Start der Simulation in der Ereigniszeit und dem Zeitstempel des aktuellen Datensatzes 60 Minuten nach dem Start der Simulation auftreten. Wenn der Beschleunigungsfaktor 60 ist, entspricht ein 60-minütiger Unterschied in der Ereigniszeit einem 1-minütigen Unterschied in der Systemzeit, so dass der Datensatz eine Minute später veröffentlicht werden sollte. Wenn die Ereigniszeit der Systemzeit voraus ist, schlafen wir die notwendige Zeit, damit die Simulation aufholen kann.

Die Simulation besteht aus vier Schritten (siehe auch Abbildung 4-5):

  • Führe die Abfrage aus, um die zu veröffentlichenden Flugereignisdatensätze zu erhalten.

  • Iteriere durch die Abfrageergebnisse.

  • Sammle Ereignisse, um sie in einem Stapel zu veröffentlichen.

  • Veröffentliche angesammelte Ereignisse und schlafe bei Bedarf.

Obwohl es sich hier um eine ETL-Pipeline handelt, ist diese ETL-Pipeline aufgrund der Notwendigkeit, die Datensätze in strikter Reihenfolge zu verarbeiten und dazwischen zu pausieren, schlecht für Cloud Dataflow geeignet. Stattdessen werden wir sie als reines Python-Programm implementieren. Das Problem bei dieser Wahl ist, dass der Simulationscode nicht fehlertolerant ist - wenn die Simulation fehlschlägt, wird sie nicht automatisch neu gestartet und fängt definitiv nicht beim letzten erfolgreich gemeldeten Ereignis an.

Abbildung 4-5. Die vier Schritte der Simulation.

Der Simulationscode, den wir schreiben, ist nur für schnelle Experimente mit Streaming-Daten gedacht. Daher werde ich nicht den zusätzlichen Aufwand betreiben, um ihn fehlertolerant zu machen. Wenn es nötig wäre, könnten wir die Simulation fehlertolerant machen, indem wir mit einer BigQuery-Abfrage beginnen, die durch einen Zeitbereich begrenzt ist, dessen Beginn automatisch aus dem zuletzt gemeldeten Datensatz in Cloud Pub/Sub abgeleitet wird. Dann könnten wir das Simulationsskript von einem Docker-Container aus starten und Cloud Run oder Google Kubernetes Engine verwenden, um die Simulation automatisch neu zu starten, wenn der Simulationscode fehlschlägt.

Datensätze zur Veröffentlichung erhalten

Die BigQuery-Abfrage wird durch die Start- und Endzeit der Simulation parametrisiert und kann über die Google Cloud API für Python aufgerufen werden (siehe 04_streaming/simulate/simulate.py im GitHub-Repository):

   bqclient = bq.Client(args.project)
   querystr = """
SELECT
  EVENT_TYPE,
  EVENT_TIME AS NOTIFY_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= TIMESTAMP('{}')
  AND EVENT_TIME < TIMESTAMP('{}')
ORDER BY
  EVENT_TIME ASC
"""
   rows = bqclient.query(querystr.format(args.startTime,
                                         args.endTime))

Das ist jedoch eine schlechte Idee. Verstehst du, warum?

Das liegt daran, dass wir die Start- und Endzeit aus der Befehlszeile des Simulationsskripts abrufen und direkt an BigQuery weitergeben. Das nennt man SQL-Injection, die zu Sicherheitsproblemen führen kann.17 Ein besserer Ansatz ist es, parametrisierte Abfragenzu verwenden - dieBigQuery-Abfrage enthält die Parameter, die als @startTime usw. gekennzeichnet sind, und die Python-Abfragefunktion übernimmt die Definitionen über den Job-Konfigurationsparameter:

   bqclient = bq.Client(args.project)
   querystr = """
SELECT
  EVENT_TYPE,
  EVENT_TIME AS NOTIFY_TIME,
  EVENT_DATA
FROM
  dsongcp.flights_simevents
WHERE
  EVENT_TIME >= @startTime
  AND EVENT_TIME < @endTime
ORDER BY
  EVENT_TIME ASC
"""
   job_config = bq.QueryJobConfig(
       query_parameters=[
           bq.ScalarQueryParameter("startTime", "TIMESTAMP", args.startTime),
           bq.ScalarQueryParameter("endTime", "TIMESTAMP", args.endTime),
       ]
   )
   rows = bqclient.query(querystr, job_config=job_config)

Die Abfragefunktion gibt ein Objekt zurück (im vorangegangenen Schnipsel rows genannt), das wir durchlaufen können:

  for row in rows:
    # do something

Was müssen wir für jede der Zeilen tun? Wir müssen durch die Datensätze iterieren, einen Stapel von Ereignissen erstellen und jeden Stapel veröffentlichen. Schauen wir uns an, wie jeder dieser Schritte abläuft.

Wie viele Themen?

Während wir die Abfrageergebnisse durchgehen, müssen wir Ereignisse in Cloud Pub/Sub veröffentlichen. Wir haben drei Möglichkeiten, was die Architektur angeht:

  • Wir könnten alle Ereignisse in einem einzigen Thema veröffentlichen. Das kann jedoch eine Verschwendung von Netzwerkbandbreite sein, wenn wir einen Abonnenten haben, der nur an dem Ereignis wheelsoff interessiert ist. Ein solcher Abonnent muss das eingehende Ereignis parsen, die EVENT_TYPE Datei im JSON dekodieren und Ereignisse, an denen er nicht interessiert ist, verwerfen.

  • Wir könnten alle Ereignisse in einem einzigen Thema veröffentlichen, aber jeder Nachricht Attribute hinzufügen. Um zum Beispiel ein Ereignis mit zwei Attributen zu veröffentlichen event_type und carrierzu veröffentlichen, würden wir Folgendes tun:

     publisher.publish(topic, event_data,
                       event_type='departed', carrier='AA')

    Dann könnte der Abonnent bei der Erstellung des Abonnements eine serverseitige Filterung nach einem Attribut oder einer Kombination von Attributen verlangen :

     subscriber.create_subscription(request={..., "filter": 
       "attributes.carrier='AS' AND attributes.event_type='arrived'"})
  • Erstelle für jeden Ereignistyp ein eigenes Thema (d.h. ein arrived Thema, ein departed Thema und ein wheelsoff Thema).

Option 1 ist die einfachste und sollte deine Standardwahl sein, es sei denn, du hast viele Abonnenten, die nur an Teilmengen des Ereignisstroms interessiert sind. Wenn du viele Abonnenten hast, die sich nur für Teilmengen des Ereignisstroms interessieren, wähle zwischen Option 2 und 3.

Option 2 erhöht die Komplexität der Software. Option 3 erhöht die Komplexität der Infrastruktur. Ich schlage vor, Option 3 zu wählen, wenn du nur ein Attribut hast und dieses Attribut nur eine Handvoll Optionen hat. Das begrenzt die Komplexität der Infrastruktur und hält den Code für Publisher und Subscriber einfach. Wähle Option 2, wenn du viele Attribute mit vielen möglichen Werten hast, weil Option 3 in einem solchen Szenario zu einer explosionsartigen Zunahme der Anzahl der Themen führen würde.

Iteration durch Datensätze

Wir entscheiden uns dafür, für jede Ereignisart ein eigenes Thema zu erstellen (d. h. ein arrived Thema, ein departed Thema und ein wheelsoff Thema), also erstellen wir drei Themen:18

for event_type in ['wheelsoff', 'arrived', 'departed']:
  topics[event_type] = publisher.topic_path(args.project, event_type)
  try:
    publisher.get_topic(topic=topics[event_type])
    logging.info("Already exists: {}".format(topics[event_type]))
  except:
    logging.info("Creating {}".format(topics[event_type]))
    publisher.create_topic(name=topics[event_type])

Nachdem wir die Themen erstellt haben, rufen wir die Methode notify() auf und geben die aus BigQuery gelesenen Zeilen weiter:

# notify about each row in the dataset
programStartTime = datetime.datetime.utcnow()
simStartTime = datetime.datetime.strptime(args.startTime,
                         TIME_FORMAT).replace(tzinfo=pytz.UTC)
notify(publisher, topics, rows, simStartTime, 
       programStartTime, args.speedFactor)

Erstellen eines Stapels von Ereignissen

Die Methode notify() besteht darin, die Zeilen in Stapeln zu sammeln, einen Stapel zu veröffentlichen und zu schlafen, bis es Zeit ist, den nächsten Stapel zu veröffentlichen:

def notify(publisher, topics, rows, simStartTime, programStart, speedFactor):
   # sleep computation
   def compute_sleep_secs(notify_time):
        time_elapsed = (datetime.datetime.utcnow() - 
                        programStart).seconds
        sim_time_elapsed = (notify_time - simStartTime).seconds / speedFactor
        to_sleep_secs = sim_time_elapsed - time_elapsed
        return to_sleep_secs

   tonotify = {}
   for key in topics:
     tonotify[key] = list()

   for row in rows:
       event, notify_time, event_data = row

       # how much time should we sleep?
       if compute_sleep_secs(notify_time) > 1:
          # notify the accumulated tonotify
          publish(publisher, topics, tonotify, notify_time)
          for key in topics:
             tonotify[key] = list()

          # recompute sleep, since notification takes a while
          to_sleep_secs = compute_sleep_secs(notify_time)
          if to_sleep_secs > 0:
             logging.info('Sleeping {} seconds'.format(to_sleep_secs))
             time.sleep(to_sleep_secs)

       tonotify[event].append(event_data)
   # left-over records; notify again
   publish(publisher, topics, tonotify, notify_time)

Hier gibt es ein paar Punkte zu beachten. Erstens arbeiten wir komplett in UTC, damit die Zeitdifferenzberechnungen einen Sinn ergeben. Zweitens berechnen wir immer die Zeitdifferenz seit dem Start der Simulation, um zu sehen, ob wir schlafen sollen. Wenn wir den Zeiger einfach immer weiter nach vorne schieben, häufen sich die Fehler in der Zeit. Außerdem prüfen wir beim ersten Mal, ob die Schlafzeit mehr als eine Sekunde beträgt, damit die Aufzeichnungen Zeit haben, sich zu sammeln. Wenn du beim Ausführen des Programms keinen Ruhezustand siehst, ist dein Beschleunigungsfaktor zu hoch für die Leistungsfähigkeit des Rechners, auf dem der Simulationscode läuft, und für das Netzwerk zwischen diesem Rechner und der Google Cloud Platform. Verlangsame die Simulation, besorge dir einen größeren Rechner oder führe sie hinter der Google Firewall aus (z. B. in der Cloud Shell oder auf einer Compute Engine Instanz).

Veröffentlichen eines Stapels von Veranstaltungen

Die Methode notify(), die wir im vorherigen Codebeispiel gesehen haben, hat die Ereignisse zwischen den Sleep-Aufrufen angesammelt. Auch wenn es so aussieht, als würden wir ein Ereignis nach dem anderen veröffentlichen, verwaltet der Verleger tatsächlich einen separaten Stapel für jedes Thema:

def publish(publisher, topics, allevents):
   for key in topics:  # 'departed', 'arrived', etc.
      topic = topics[key]
      events = allevents[key]
      logging.info('Publishing {} {} events'.format(len(events), key))
      for event_data in events:
          publisher.publish(topic, event_data.encode())

Beachte, dass Cloud Pub/Sub nicht garantiert, dass die Nachrichten in der richtigen Reihenfolge zugestellt werden, insbesondere wenn der Abonnent einen großen Rückstand aufbaut. Es kommt vor, dass Nachrichten nicht in der richtigen Reihenfolge zugestellt werden, und die nachgelagerten Abonnenten müssen sich darum kümmern. Cloud Pub/Sub garantiert die Zustellung "mindestens einmal" und sendet die Nachricht erneut, wenn der Abonnent eine Nachricht nicht rechtzeitig bestätigt. Ich werde Cloud Dataflow für den Ingest von Cloud Pub/Sub verwenden, und Cloud Dataflow kümmert sich transparent um diese beiden Probleme (außer der Reihe und Duplikation).

Wir können die Simulation ausprobieren, indem wir das Folgende eingeben:

python3 simulate.py --startTime '2015-05-01 00:00:00 UTC' \
      --endTime '2015-05-04 00:00:00 UTC' --speedFactor=60

Damit werden drei Tage Flugdaten (die Endzeit ist exklusiv) mit 60-facher Echtzeitgeschwindigkeit simuliert und die Ereignisse in drei Topics auf Cloud Pub/Sub gestreamt.19 Da die Simulation von einer BigQuery-Abfrage ausgeht, ist es ganz einfach, die simulierten Ereignisse auf einen einzigen Flughafen oder auf Flughäfen innerhalb eines Längen- und Breitengrads zu beschränken.

In diesem Abschnitt haben wir uns angesehen, wie man einen Ereignisstrom erzeugt und diese Ereignisse in Echtzeit veröffentlicht. In diesem Buch können wir diesen Simulator und diese Themen nutzen, um zu erproben, wie wir Streaming-Daten nutzen und Echtzeitanalysen durchführen können.

Stream-Verarbeitung in Echtzeit

Da wir nun eine Quelle für Streaming-Daten mit Standortinformationen haben, wollen wir uns ansehen, wie wir ein Echtzeit-Dashboard erstellen können. Abbildung 4-6 zeigt die Referenzarchitektur für viele Lösungen auf der Google Cloud Platform.20

Abbildung 4-6. Referenzarchitektur für die Datenverarbeitung auf der Google Cloud Platform.

Im vorherigen Abschnitt haben wir einen Echtzeit-Ereignisstrom in Cloud Pub/Sub eingerichtet, den wir in Cloud Dataflow aggregieren und in BigQuery schreiben können. Data Studio kann sich mit BigQuery verbinden und ein interaktives Dashboard in Echtzeit bereitstellen. Legen wir los.

Streaming im Datenfluss

Als wir die Zeitkorrektur der Rohflugdaten durchführten, arbeiteten wir mit einer vollständigen BigQuery-Flugtabelle, verarbeiteten sie in Cloud Dataflow und schrieben die Ereignistabelle in BigQuery. Die Verarbeitung eines endlichen, begrenzten Eingabedatensatzes wird als Stapelverarbeitung bezeichnet.

Hier müssen wir jedoch Ereignisse in Cloud Pub/Sub verarbeiten, die in Strömen hereinströmen. Die Datenmenge ist unbegrenzt. Die Verarbeitung eines unbegrenzten Datensatzes wird Stream Processing genannt. Zum Glück ist der Code für die Stream-Verarbeitung in Apache Beam identisch mit dem Code für die Stapelverarbeitung.

Wir könnten die Ereignisse einfach von Cloud Pub/Sub empfangen, ähnlich wie wir Daten aus einer CSV-Datei lesen:21

topic_name = "projects/{}/topics/arrived".format(project)
events = (pipeline
         | 'read' >> beam.io.ReadFromPubSub(topic=topic_name)
         | 'parse' >> beam.Map(lambda s: json.loads(s))
         )

Die einzige Änderung, die wir vornehmen müssen, ist, das Streaming-Flag in den Dataflow-Optionen zu aktivieren:

argv = [
        ...
        '--streaming',
    ]

Wir können die eingelesenen Ereignisse an BigQuery streamen, indem wir einen ähnlichen Code wie bei der Stapelverarbeitung verwenden:

schema = 'FL_DATE:date,...,EVENT_TYPE:string,EVENT_TIME:timestamp'
(events
         | 'bqout' >> beam.io.WriteToBigQuery(
                    'dsongcp.streaming_events', schema=schema,
               create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
            )
)

Im vorangegangenen Code abonnieren wir ein Thema in Cloud Pub/Sub und beginnen, es zu lesen. Sobald eine Nachricht eintrifft, analysieren wir sie, wandeln sie in eine TableRow in BigQuery um und schreiben sie dann aus. Wenn das alles ist, was wir brauchen, können wir einfach die von Google bereitgestellte Dataflow-Vorlage verwenden, die von Pub/Sub zu BigQuery führt.

Aber nehmen wir an, dass wir sowohl die angekommenen als auch die abgegangenen Ereignisse lesen und in dieselbe BigQuery-Tabelle schreiben wollen. Das können wir ganz einfach in Beam machen:

events = {}
for event_name in ['arrived', 'departed']:
  topic_name = "projects/{}/topics/{}".format(project, event_name)
  events[event_name] = (pipeline
     | 'read:{}'.format(event_name) >>  
             beam.io.ReadFromPubSub(topic=topic_name)
     | 'parse:{}'.format(event_name) >> beam.Map(
             lambda s: json.loads(s))
  )

all_events = (events['arrived'], events['departed']) | beam.Flatten()

Durch das Flattening werden die beiden Ereignisgruppen zu einer einzigen Sammlung verkettet. Dann schreiben wir all_events an BigQuery aus.

Um diesen Code auszuprobieren, müssen wir den Simulator starten, den wir im vorherigen Abschnitt geschrieben haben, damit der Simulator Ereignisse in den Pub/Sub-Themen veröffentlichen kann. Um die Simulation zu starten, startest du den Python-Simulator, den wir im vorigen Abschnitt entwickelt haben:

python simulate.py --startTime '2015-05-01 00:00:00 UTC'
--endTime '2015-05-04 00:00:00 UTC'  --speedFactor 30

Der Simulator sendet Ereignisse vom 1. Mai 2015 bis zum 3. Mai 2015 mit 30-facher Echtzeitgeschwindigkeit, so dass eine Stunde Daten in zwei Minuten an Cloud Pub/Sub gesendet wird. Du kannst dies von der Cloud Shell oder von deinem lokalen Laptop aus tun. (Falls nötig, führe install_packages.sh aus, um die erforderlichen Python-Pakete zu installieren, und gcloud auth application-default login, um der Anwendung die nötigen Anmeldedaten für die Ausführung von Abfragen zu geben).

Starte in einem anderen Terminal avg01.py, um den Strom von Ereignissen zu lesen und sie in BigQuery zu schreiben. Dann können wir den Datensatz in BigQuery abfragen, während die Ereignisse hereinströmen. Die BigQuery-Benutzeroberfläche zeigt diese Streaming-Tabelle vielleicht noch nicht an, aber sie kann abgefragt werden:

SELECT * FROM dsongcp.streaming_events
ORDER BY EVENT_TIME DESC
LIMIT 5

Fensterung einer Pipeline

Obwohl wir nur einen einfachen Datentransfer machen könnten, würde ich gerne mehr tun. Wenn ich ein Echtzeit-Dashboard mit Flugverspätungen auffülle, möchte ich, dass die Informationen über ein angemessenes Intervall aggregiert werden. Ich möchte zum Beispiel einen gleitenden Durchschnitt der Flugverspätungen und die Gesamtzahl der Flüge der letzten 60 Minuten an jedem Flughafen. Anstatt die von Cloud Pub/Sub erhaltenen Daten einfach an BigQuery weiterzuleiten, möchte ich die Daten in dem Moment analysieren, in dem ich sie erhalte, und diese Analysen in BigQuery schreiben.22 Cloud Dataflow kann uns dabei helfen.

Auch wenn wir einen Durchschnitt über 60 Minuten bilden, wie oft sollten wir diesen 60-Minuten-Durchschnitt berechnen? Es könnte zum Beispiel von Vorteil sein, ein gleitendes Fenster zu verwenden und diesen 60-Minuten-Durchschnitt alle fünf Minuten zu berechnen.

Streaming-Aggregation

Der Hauptunterschied zwischen Batch-Aggregation und Streaming-Aggregation ist die Unbegrenztheit der Daten bei der Stream-Verarbeitung. Was bedeutet eine Operation wie "max", wenn die Daten unbegrenzt sind? Unabhängig davon, wie hoch der Maximalwert zu diesem Zeitpunkt ist, kann zu einem späteren Zeitpunkt ein größerer Wert im Datenstrom auftauchen.

Ein Schlüsselkonzept bei der Aggregation von Streaming-Daten ist das eines Fensters, das den Rahmen für alle Aggregationen bildet. Hier wenden wir ein zeitbasiertes gleitendes Fenster auf die Pipeline an. Von nun an finden alle Gruppierungen, Aggregationen usw. innerhalb dieses Zeitfensters statt, und in jedem Zeitfenster gibt es ein eigenes Maximum, einen Durchschnitt usw:

stats = (all_events
   | 'byairport' >> beam.Map(by_airport)
   | 'window' >> beam.WindowInto(
                 beam.window.SlidingWindows(60 * 60, 5 * 60))
   | 'group' >> beam.GroupByKey()
   | 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1]))
)

Gehen wir den vorangegangenen Codeschnipsel genau durch.

Als erstes nehmen wir alle Ereignisse und wenden die by_airport Transformation auf die Ereignisse an:

   | 'byairport' >> beam.Map(by_airport)

Dabei wird der Herkunftsflughafen für abfliegende Ereignisse und der Zielflughafen für ankommende Ereignisse herausgezogen:

def by_airport(event):
    if event['EVENT_TYPE'] == 'departed':
        return event['ORIGIN'], event
    else:
        return event['DEST'], event

Als Nächstes wenden wir ein gleitendes Fenster auf den Ereignisstrom an. Das Fenster hat eine Dauer von 60 Minuten und wird alle 5 Minuten angewendet:

   | 'window' >> beam.WindowInto(
                 beam.window.SlidingWindows(60 * 60, 5 * 60))

Dann wenden wir eine GroupByKey an:

   | 'group' >> beam.GroupByKey()

Was ist der Schlüssel?

In der zuvor erwähnten Funktion by_airport haben wir den Flughafen zum Schlüssel und das gesamte Ereignisobjekt zum Wert gemacht. Die GroupByKey gruppiert die Ereignisse also nach Flughafen.

Aber die GroupByKey ist nicht nur nach Flughafen geordnet. Da wir bereits ein gleitendes Fenster angewendet haben, wird für jedes Zeitfenster eine eigene Gruppe erstellt. Jede Gruppe besteht also aus 60 Minuten Flugereignissen, die an einem bestimmten Flughafen angekommen oder abgeflogen sind.

Für diese Gruppen rufen wir die Funktion compute_stats im letzten Map des Snippets auf:

   | 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1]))

Die Funktion compute_stats nimmt den Flughafen und die Liste der Ereignisse an diesem Flughafen und berechnet dann einige Statistiken:

def compute_stats(airport, events):
    arrived = [event['ARR_DELAY'] for event in events 
                if event['EVENT_TYPE'] == 'arrived']
    avg_arr_delay = float(np.mean(arrived)) 
                if len(arrived) > 0 else None

    departed = [event['DEP_DELAY'] for event in events 
                if event['EVENT_TYPE'] == 'departed']
    avg_dep_delay = float(np.mean(departed)) 
                if len(departed) > 0 else None

    num_flights = len(events)
    start_time = min([event['EVENT_TIME'] for event in events])
    latest_time = max([event['EVENT_TIME'] for event in events])

    return {
        'AIRPORT': airport,
        'AVG_ARR_DELAY': avg_arr_delay,
        'AVG_DEP_DELAY': avg_dep_delay,
        'NUM_FLIGHTS': num_flights,
        'START_TIME': start_time,
        'END_TIME': latest_time
    }

Im vorangegangenen Code ziehen wir die eingetroffenen Ereignisse heraus und berechnen die durchschnittliche Ankunftsverspätung. Genauso berechnen wir die durchschnittliche Abflugverspätung für die abgeflogenen Ereignisse. Außerdem berechnen wir die Anzahl der Flüge in dem Zeitfenster an diesem Flughafen und geben alle diese Statistiken zurück.

Die Statistiken werden dann mithilfe eines Codes, der dir inzwischen bekannt vorkommen sollte, an BigQuery übermittelt:

stats_schema = ','.join(
    ['AIRPORT:string,AVG_ARR_DELAY:float,AVG_DEP_DELAY:float',                          
     'NUM_FLIGHTS:int64,START_TIME:timestamp,END_TIME:timestamp'])
(stats
    | 'bqout' >> beam.io.WriteToBigQuery(
             'dsongcp.streaming_delays', schema=stats_schema,
       create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED
               )
)

Wie im vorherigen Abschnitt können wir den Simulator starten und dann avg02.py starten. Als ich das tat, wurden die Aggregationen alle 5 Minuten erstellt, aber innerhalb jedes 5-Minuten-Zeitraums deckten die gemeldeten Ereignisse einen Bereich von 150 Minuten ab (weil die 30x-Simulation 150 Minuten Daten in 5 Minuten verarbeitet).

Die Stream-Processing-Engine wendet die Schiebefenster auf der Grundlage der Zeit auf einer Wanduhr an. Wir möchten jedoch, dass das Fenster auf der Grundlage des Zeitstempels in den Bildern angewendet wird.

Wie machen wir das?

Zeitstempel für Ereignisse verwenden

Wir müssen ein Attribut hinzufügen, wenn wir die Ereignisse veröffentlichen (in simulate.py):

publisher.publish(topic, event_data.encode(), EventTimeStamp=timestamp)

Dann sollten wir in unserer Beam-Pipeline beim Lesen aus Pub/Sub die Veröffentlichungszeit in Pub/Sub ignorieren und stattdessen dieses Attribut der Nachricht als Zeitstempel verwenden:

| 'read:{}'.format(event_name) >> beam.io.ReadFromPubSub(
            topic=topic_name, timestamp_attribute='EventTimeStamp')

Mit dieser Änderung, wenn ich die Abfrage ausführe:

SELECT * FROM dsongcp.streaming_delays
WHERE AIRPORT = 'ATL'
ORDER BY END_TIME DESC

Ich bekomme Reihen im Abstand von etwa 5 Minuten, wie erwartet:

Row AIRPORT AVG_ARR_DELAY AVG_DEP_DELAY NUM_FLIGHTS START_TIME END_TIME
1 ATL 35.72222222222222 13.666666666666666 48 2015-05-01 02:24:00 UTC 2015-05-01 03:21:00 UTC
2 ATL 35.25 8.717948717948717 59 2015-05-01 02:15:00 UTC 2015-05-01 03:12:00 UTC
3 ATL 38.666666666666664 9.882352941176471 52 2015-05-01 02:19:00 UTC 2015-05-01 03:12:00 UTC
4 ATL 38.473684210526315 5.916666666666667 55 2015-05-01 02:15:00 UTC 2015-05-01 03:08:00 UTC
5 ATL 35.111111111111114 5.53125 50 2015-05-01 02:15:00 UTC 2015-05-01 03:03:00 UTC

Die gemeldeten Zeiten liegen nicht genau 5 Minuten auseinander, weil die gemeldeten Zeiten dem frühesten/ spätesten Flug in Atlanta innerhalb des Zeitfensters entsprechen. Beachte auch, dass die Länge des Zeitfensters ungefähr eine Stunde beträgt.

Es ist jedoch wahrscheinlich, dass Cloud Shell oder dein lokaler Laptop Schwierigkeiten haben werden, mit dem Ereignisstrom Schritt zu halten. Wir müssen diese Pipeline in Dataflow auf serverlose Weise ausführen.

Ausführen der Stream-Verarbeitung

Um die Beam-Pipeline in Cloud Dataflow auszuführen, muss ich nur den Runner ändern (siehe avg03.py im GitHub-Repository):

argv = [
        '--project={0}'.format(project),
        '--job_name=ch04avgdelay',
        '--streaming',
      ...
        '--runner=DataflowRunner'
    ]

Bevor wir mit dieser Pipeline beginnen, sollten wir jedoch die Zeilen löschen, die avg02.py im vorherigen Abschnitt bereits in die BigQuery-Tabelle geschrieben hat. Das geht am einfachsten, indem du den folgenden SQL DML-Befehl ausführst, um die Tabelle abzuschneiden:

TRUNCATE TABLE dsongcp.streaming_delays

Wenn du avg03.py ausführst, wird ein Dataflow-Job gestartet. Wenn du jetzt in der Cloud Platform-Konsole den Abschnitt Cloud Dataflow aufrufst, siehst du, dass ein neuer Streaming-Job gestartet wurde und dass die Pipeline wie in Abbildung 4-7 aussieht.

Die Pipeline verarbeitet die Flugereignisse, während sie in Pub/Sub eingehen, fasst sie in Zeitfenstern zusammen und überträgt die daraus resultierenden Statistiken in BigQuery.

Abbildung 4-7. Die Streaming-Pipeline zur Berechnung von Verspätungsstatistiken in Echtzeit an jedem Flughafen.

Analysieren von Streaming-Daten in BigQuery

Zwei Minuten nach dem Start deines Programms,23 wird der erste Datensatz in BigQuery gespeichert. Du kannst die Statistiken für einen bestimmten Flughafen über die BigQuery-Konsole abfragen, indem du die gleiche Abfrage wie zuvor verwendest:

SELECT * FROM dsongcp.streaming_delays
WHERE AIRPORT = 'ATL'
ORDER BY END_TIME DESC

Das Tolle daran ist, dass wir diese Abfrage sogar während des Datenstroms durchführen können! Wie würden wir die neuesten Daten für alle Flughäfen bekommen? Wir könnten alle Daten für jeden Flughafen abrufen, sie nach Zeit sortieren und die neuesten Daten nehmen:

SELECT 
    AIRPORT,
    ARRAY_AGG(
        STRUCT(AVG_ARR_DELAY, AVG_DEP_DELAY, NUM_FLIGHTS, END_TIME)
        ORDER BY END_TIME DESC LIMIT 1) AS a
FROM dsongcp.streaming_delays d
GROUP BY AIRPORT

Die Ergebnisse sehen in etwa so aus:

Row AIRPORT a.AVG_ARR_DELAY a.AVG_DEP_DELAY a.NUM_FLIGHTS a.END_TIME
1 BUR -6.8 -5.666666666666667 8 2015-05-01 03:26:00 UTC
2 HNL 17.11111111111111 -3.7777777777777777 18 2015-05-01 03:46:00 UTC
3 CVG -7.75 null 4 2015-05-01 03:48:00 UTC
4 PHL 5.636363636363637 16.5 13 2015-05-01 03:48:00 UTC
5 IND 40.6 null 5 2015-05-01 03:45:00 UTC

Abfragen wie diese auf Streaming-Daten werden nützlich sein, wenn wir mit dem Aufbau unseres Dashboards beginnen. Mit der ersten Abfrage können wir zum Beispiel ein Zeitreihendiagramm der Verspätungen an einem bestimmten Flughafen erstellen. Mit der zweiten Abfrage können wir eine Karte mit den durchschnittlichen Verspätungen im ganzen Land erstellen.

Real-Time Dashboard

Da wir nun über Streaming-Daten in BigQuery und eine Möglichkeit zur Analyse der Daten verfügen, können wir ein Dashboard erstellen, das Abflug- und Ankunftsverspätungen im Kontext zeigt. Zwei Karten können dabei helfen, den Endnutzern unser auf einer Kontingenztabelle basierendes Modell zu erklären: die aktuellen Verspätungen bei der Ankunft im ganzen Land und die aktuellen Verspätungen beim Abflug im ganzen Land.

Um die Daten für diese Diagramme zu erhalten, müssen wir in Data Studio eine BigQuery-Datenquelle hinzufügen. Obwohl Data Studio die Angabe der Abfrage direkt in der Benutzeroberfläche unterstützt, ist es viel besser, eine Ansicht in BigQuery zu erstellen und diese Ansicht als Datenquelle in Data Studio zu verwenden. BigQuery-Ansichten haben einige Vorteile gegenüber Abfragen, die du in Data Studio eingibst: Sie sind in der Regel in allen Berichten und Visualisierungstools wiederverwendbar, es gibt nur eine Stelle, an der du sie ändern musst, wenn ein Fehler auftritt, und BigQuery-Ansichten lassen sich besser den Zugriffsrechten (Cloud Identity and Access Management-Rollen) zuordnen, je nachdem, auf welche Spalten sie zugreifen müssen.

Hier ist die Abfrage, die ich zum Erstellen der Ansicht verwendet habe:

CREATE OR REPLACE VIEW dsongcp.airport_delays AS
WITH delays AS (
    SELECT d.*, a.LATITUDE, a.LONGITUDE
    FROM dsongcp.streaming_delays d
    JOIN dsongcp.airports a USING(AIRPORT) 
    WHERE a.AIRPORT_IS_LATEST = 1
)
 
SELECT 
    AIRPORT,
    CONCAT(LATITUDE, ',', LONGITUDE) AS LOCATION,
    ARRAY_AGG(
        STRUCT(AVG_ARR_DELAY, AVG_DEP_DELAY, NUM_FLIGHTS, END_TIME)
        ORDER BY END_TIME DESC LIMIT 1) AS a
FROM delays
GROUP BY AIRPORT, LONGITUDE, LATITUDE

Diese Abfrage unterscheidet sich geringfügig von der zweiten Abfrage im vorherigen Abschnitt, da sie auch den Standort des Flughafens hinzufügt, indem sie mit der Flughafentabelle verknüpft wird.

Nachdem wir die Ansicht in BigQuery gespeichert haben, können wir in Data Studio eine Datenquelle für die Ansicht erstellen, so wie wir es im vorherigen Kapitel getan haben:

  • Besuche https://datastudio.google.com.

  • Erstelle eine BigQuery-Datenquelle, verweise sie auf die Ansicht airport_delays und verbinde dich mit ihr.

  • Ändere das Ortsfeld von Text in Geo | Breitengrad, Längengrad und klicke dann auf Bericht erstellen.

  • Füge dem Bericht ein Geodiagramm hinzu.

  • Gib das Ortsfeld als Geodimension an (siehe Abbildung 4-8).

  • Gib die durchschnittliche Abflugverspätung als Dimension und die Vereinigten Staaten als Zoomstufe an.

  • Ändere den Stil so, dass der Farbbalken alle Bereiche umfasst.

  • Wiederhole den Vorgang für die Ankunftsverspätung.

Abbildung 4-8. Dashboard mit den neuesten Flugdaten aus den Vereinigten Staaten.

Es lohnt sich, darüber nachzudenken, was wir in diesem Abschnitt getan haben. Wir verarbeiteten Streaming-Daten in Cloud Dataflow und erstellten gleitende 60-Minuten-Durchschnitte, die wir in BigQuery einspeisten. Dann haben wir in BigQuery eine Ansicht erstellt, die die neuesten Daten für jeden Flughafen anzeigt, auch wenn sie gerade hereinströmen. Diese haben wir mit einem Dashboard in Data Studio verbunden. Jedes Mal, wenn das Dashboard aktualisiert wird, zieht es neue Daten aus der Ansicht, die wiederum dynamisch die neuesten Daten in BigQuery widerspiegelt.

Zusammenfassung

In diesem Kapitel haben wir besprochen, wie man eine Echtzeit-Analyse-Pipeline aufbaut, um Streaming-Analysen durchzuführen und Echtzeit-Dashboards zu bestücken. In diesem Buch verwenden wir einen Datensatz, der nicht in Echtzeit verfügbar ist. Deshalb haben wir die Erstellung eines Echtzeit-Feeds simuliert, um zu zeigen, wie man eine Streaming-Ingest-Pipeline aufbaut. Der Aufbau der Simulation gibt uns auch ein praktisches Testwerkzeug an die Hand - wir müssen nicht mehr warten, bis ein interessantes Ereignis eintritt. Wir können einfach ein aufgezeichnetes Ereignis abspielen!

Bei der Entwicklung der Simulation haben wir festgestellt, dass die Handhabung der Zeit im ursprünglichen Datensatz problematisch war. Deshalb haben wir den Umgang mit der Zeit in den Originaldaten verbessert und einen neuen Datensatz mit UTC-Zeitstempeln und lokalen Versätzen erstellt. Das ist der Datensatz, den wir in Zukunft verwenden werden.

Wir haben uns auch die Referenzarchitektur für die Verarbeitung von Streaming-Daten in Google Cloud Platform angesehen. Zuerst empfängst du deine Daten in Cloud Pub/Sub, damit die Nachrichten asynchron empfangen werden können. Verarbeite die Cloud Pub/Sub-Nachrichten in Cloud Dataflow, berechne je nach Bedarf Aggregationen der Daten und streame entweder die Rohdaten oder die aggregierten Daten (oder beides) zu BigQuery. Wir haben mit allen drei Google Cloud-Produkten (Cloud Pub/Sub, Cloud Dataflow und BigQuery) gearbeitet und dabei die Google Cloud-Client-Bibliotheken in Python verwendet. In keinem dieser Fälle mussten wir jedoch jemals selbst eine virtuelle Maschine erstellen - es handelt sich um serverlose und automatisch skalierende Angebote. So konnten wir uns auf das Schreiben von Code konzentrieren und die Plattform den Rest erledigen lassen.

Empfohlene Ressourcen

Auf der Apache Beam-Website findest du interaktive Programmierübungen, sogenannte Katas, mit denen du Streaming-Konzepte und deren Umsetzung mit Beam spielerisch erlernen kannst.

Dataflow-Vorlagen sind vorgefertigte Apache Beam-Pipelines, die für die Datenmigration nützlich sind. In diesem Kapitel haben wir die Dataflow-Vorlage für das Einlesen von Daten aus Pub/Sub in BigQuery erwähnt. Dataflow Templates gibt es auch für andere Quellen als Google. So gibt es zum Beispiel einen Dataflow-Konnektor von SAP HANA zu BigQuery, der in dem Google-Blogbeitrag "Using Apache Beam and Cloud Dataflow to Integrate SAP HANA and BigQuery" von Babu Prasad Elumalai und Mark Shalda aus dem Jahr 2017 beschrieben wird. Dieser spezielle Connector ist in Java geschrieben.

In diesem Tutorial erfährst du, wie du deine eigene Dataflow-Vorlage erstellst. Jede Dataflow-Pipeline kann in eine Vorlage umgewandelt werden, um sie einfach zu teilen und bequem zu starten.

In dem Artikel "Processing Billions of Events in Real Time at Twitter" (Verarbeitung von Milliarden von Ereignissen in Echtzeit bei Twitter) aus dem Jahr 2021 beschreiben die Twitter-Ingenieure Lu Zhang und Chukwudiuto Malife, wie Twitter 400 Milliarden Ereignisse in Echtzeit mit Dataflow verarbeitet.

1 Das ist eine häufige Situation. Erst wenn du anfängst, einen Datensatz zu erforschen, entdeckst du, dass du zusätzliche Datensätze brauchst. Hätte ich das vorher gewusst, hätte ich beide Datensätze importiert. Aber du folgst ja meinem Arbeitsablauf, und zu diesem Zeitpunkt wusste ich zwar, dass ich einen Datensatz mit Zeitzonenverschiebungen brauche, aber ich hatte noch nicht danach gesucht!

2 Siehe 04_streaming/design/mktbl.sh für die eigentliche Syntax; wir haben hier Anpassungen für den Druck vorgenommen.

3 Oder erstelle eine Kopie oder eine Ansicht der Tabelle mit anonymisierten Spaltenwerten - wir behandeln den Schutz von personenbezogenen Daten in Kapitel 7 und im Anhang.

4 Zum Beispiel wurde die Zeitzone von Sewastopol 2014 nach der Annexion der Krim durch die Russische Föderation von osteuropäischer Zeit (UTC+2) auf Moskauer Zeit (UTC+4) geändert.

5 Die Java-API ist viel ausgereifter und leistungsfähiger, aber Python ist einfacher und übersichtlicher. In diesem Buch werden wir Python verwenden.

6 Wenn du eine ephemere Shell wie Cloud Shell verwendest, musst du die Zeile activate jedes Mal ausführen, wenn du eine neue Sitzung startest. Dadurch wird die virtuelle Umgebung geladen, die du zuvor verwendet hast. Auf diese Weise musst du die Python-Pakete nicht jedes Mal neu installieren.

7 Dieser Code befindet sich in 04_streaming/transform/df01.py aus dem GitHub-Repository dieses Buches.

8 Dieser Code befindet sich in 04_streaming/transform/df02.py aus dem GitHub-Repository dieses Buches.

9 Siehe die Antwort auf die Frage "Wie gehe ich mit NameErrors um?" in der Google-Dokumentation.

10 Das Speichern von Python-Objekten wird Pickling genannt.

11 Der Flughafen von Chicago ist am 30. Juni nicht zusammengepackt und umgezogen. Höchstwahrscheinlich wurde zu diesem Zeitpunkt ein neues Terminal oder eine neue Start- und Landebahn eröffnet, wodurch sich die Lage des Schwerpunkts der Luftausdehnung des Flughafens änderte. Beachte, dass die Veränderung nur 0,0036 Breitengrade beträgt. Bei Chicagos Breitengrad entspricht das etwa 400 Metern.

12 Normalerweise wird empfohlen, eine BigQuery-Tabelle zu sampeln ( SELECT * FROM dsongcp.flights WHERE TABLESAMPLE SYSTEM (0.001) ), weil das Tabellensampling nicht zwischengespeichert wird, sodass wir jedes Mal andere Ergebnisse erhalten. Zum Zeitpunkt der Erstellung dieses Artikels funktioniert das Table Sampling jedoch nur bei Tabellen und Flights ist eine View. Außerdem ist es uns in unserem aktuellen Anwendungsfall egal, ob wir jedes Mal, wenn wir die Abfrage ausführen, unterschiedliche Stichproben erhalten oder nicht. Deshalb verwende ich rand().

13 Siehe die Datei 04_streaming/transform/bqsample.sh.

14 Dieser Code befindet sich in 04_streaming/transform/df03.py aus dem GitHub-Repository dieses Buches.

15 Dieser Code befindet sich in 04_streaming/transform/df04.py aus dem GitHub-Repository dieses Buches.

16 Dieser Code befindet sich in 04_streaming/transform/df05.py aus dem GitHub-Repository dieses Buches.

17 Es öffnet jemandem die Tür, der Abfragen einreicht, die zum Beispiel eine Tabelle löschen können. Dieser XKCD-Cartoon ist berühmt dafür, dass er das Problem hervorhebt.

18 Siehe 04_streaming/simulate/simulate.py im GitHub-Repository.

19 Bei 60-facher Echtzeitgeschwindigkeit dauert es über eine Stunde, bis die Flugdaten von 3 Tagen vollständig vorliegen. Hoffentlich ist das genug Zeit, um den Rest des Kapitels abzuschließen. Wenn nicht, starte den Simulator einfach neu. Wenn du zu früh fertig bist, drücke Strg-C, um den Simulator anzuhalten.

20 Ein Beispiel dafür findest du in der Referenzarchitektur zur Analyse von Spielen auf mobilen Geräten.

21 Siehe 04_streaming/realtime/avg01.py im GitHub-Repository.

22 Wenn du die empfangenen Rohdaten in BigQuery schreiben möchtest, kannst du das natürlich auch tun - so wie im vorherigen Codeschnipsel gezeigt. In diesem Abschnitt gehe ich davon aus, dass wir nur die aggregierten Statistiken der letzten Stunde benötigen.

23 Erinnere dich daran, dass wir alle 5 Minuten Aggregate über 60 Minuten berechnen. Cloud Dataflow geht davon aus, dass das erste "volle" Fenster 65 Minuten nach Beginn der Simulation stattfindet. Da wir mit 30-facher Geschwindigkeit simulieren, sind das zwei Minuten auf deiner Uhr.

Get Data Science auf der Google Cloud Platform, 2. now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.