Kapitel 4. Streaming-Daten: Veröffentlichung und Ingest mit Pub/Sub und Dataflow
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
In Kapitel 3, , haben wir ein Dashboard entwickelt, um ein auf einer Kontingenztabelle basierendes Modell zu erklären, das vorschlägt, ob ein Treffen abgesagt werden soll. Dem von uns entwickelten Dashboard fehlte jedoch die Unmittelbarkeit, da es nicht mit dem Kontext der Nutzer verknüpft war. Da die Nutzerinnen und Nutzer ein Dashboard aufrufen und die Informationen sehen müssen, die für sie zu diesem Zeitpunkt relevant sind, müssen wir ein Echtzeit-Dashboard mit Ortsangaben entwickeln.
Wie würden wir unserem Dashboard Kontext hinzufügen? Wir müssen Karten mit Verspätungen in Echtzeit anzeigen. Dazu brauchen wir die Standorte der Flughäfen und Echtzeitdaten. Die Standorte der Flughäfen können beim US Bureau of Transportation Statistics (BTS; dieselbe Behörde, von der wir unsere historischen Flugdaten erhalten haben) abgefragt werden. Flugdaten in Echtzeit sind jedoch ein kommerzielles Produkt. Wenn wir ein Geschäft mit der Vorhersage von Flugankünften aufbauen wollten, würden wir diese Daten kaufen. Für die Zwecke dieses Buches wollen wir es aber nur simulieren.
Die Simulation der Erstellung eines Echtzeit-Feeds aus historischen Daten hat den Vorteil, dass wir beide Seiten einer Streaming-Pipeline (Produktion und Verbrauch) sehen können. Im folgenden Abschnitt sehen wir uns an, wie wir Daten in die Datenbank streamen könnten, wenn wir sie in Echtzeit empfangen würden.
Alle Codeschnipsel in diesem Kapitel sind im Ordner 04_streaming im GitHub-Repository des Buches verfügbar. In der Datei README.md in diesem Verzeichnis findest du Anweisungen, wie du die in diesem Kapitel beschriebenen Schritte ausführst.
Gestaltung des Event Feeds
Nehmen wir an, dass wir einen Ereignis-Feed erstellen wollen, der nicht alle 100 Felder des BTS-Rohdatensatzes enthält, sondern nur die wenigen Felder, die wir in Kapitel 3 als relevant für das Problem der Flugverspätungsvorhersage ausgewählt haben (siehe Abbildung 4-1).
Um einen Echtzeit-Stream der in Abbildung 4-1 gezeigten Fluginformationen zu simulieren, können wir zunächst die historischen Daten in der Ansicht flights
in BigQuery verwenden, müssen sie aber weiter transformieren. Welche Arten von Transformationen werden benötigt?
Erforderliche Transformationen
Beachte, dass FL_DATE
ein Date
ist, während DEP_TIME
ein STRING
ist. Das liegt daran, dass FL_DATE
in der Form 2015-07-03
für den 3. Juli 2015 ist, während DEP_DATE
in der Form 1406
für 14:06 Uhr Ortszeit ist. Das ist unglücklich. Ich mache mir keine Sorgen über die Trennung von Datum und Uhrzeit in zwei Spalten - das können wir ändern. Bedauerlich ist nur, dass die Abfahrtszeit nicht mit einer Zeitzonenverschiebung verbunden ist. So kann in diesem Datensatz eine Abflugzeit von 1406
in verschiedenen Zeilen je nach Zeitzone des Ausgangsflughafens eine andere Zeit sein.
Die Zeitzonenverschiebungen (es gibt zwei, eine für den Start- und eine für den Zielflughafen) sind in den Daten nicht enthalten. Da die Zeitverschiebung vom Standort des Flughafens abhängt, müssen wir einen Datensatz finden, der die Zeitzonenverschiebung jedes Flughafens enthält, und dann diese Daten mit diesem Datensatz zusammenführen.1 Um die nachgelagerte Analyse zu vereinfachen, werden wir dann alle Zeiten in den Daten in eine gemeinsame Zeitzone einordnen - die koordinierte Weltzeit (UTC) ist die traditionelle Wahl für eine gemeinsame Zeitzone für Datensätze. Wir können jedoch nicht auf die Ortszeit verzichten - wir brauchen die Ortszeit, um Analysen durchzuführen, z. B. die typische Verspätung von Morgen- und Abendflügen. Obwohl wir die Ortszeit in UTC umwandeln, speichern wir auch die Zeitzonenverschiebung (z. B. -3.600 Minuten), um die Ortszeit bei Bedarf abrufen zu können.
Deshalb führen wir zwei Transformationen des Originaldatensatzes durch. Erstens konvertieren wir alle Zeitfelder im Rohdatensatz in UTC. Zweitens fügen wir zusätzlich zu den Feldern in den Rohdaten drei Felder für den Herkunftsflughafen und die gleichen drei Felder für den Zielflughafen hinzu: den Breitengrad, den Längengrad und die Zeitzonenverschiebung. Diese Felder werden benannt:
DEP_AIRPORT_LAT, DEP_AIRPORT_LON, DEP_AIRPORT_TZOFFSET ARR_AIRPORT_LAT, ARR_AIRPORT_LON, ARR_AIRPORT_TZOFFSET
Die dritte Änderung, die wir vornehmen müssen, ist, dass wir für jede Zeile im historischen Datensatz mehrere Ereignisse veröffentlichen müssen. Denn wenn wir warten, bis das Flugzeug angekommen ist, um ein einziges Ereignis mit allen Zeilendaten zu veröffentlichen, wäre es zu spät. Wenn wir dies zum Zeitpunkt des Abflugs des Flugzeugs tun, verletzen unsere Modelle die Kausalitätsbeschränkungen. Stattdessen müssen wir für jeden Zustand, in dem sich das Flugzeug befindet, ein Ereignis aussenden. Wir entscheiden uns dafür, fünf Ereignisse für jeden Flug zu senden: wenn der Flug zum ersten Mal geplant wird, wenn der Flug das Gate verlässt, wenn der Flug abhebt, wenn der Flug landet und wenn der Flug ankommt. Diese fünf Ereignisse können nicht alle mit denselben Daten verknüpft werden, weil sich die Erkennbarkeit der Spalten während des Fluges ändert. Wenn wir zum Beispiel ein Ereignis zur Abflugzeit senden, kennen wir die Ankunftszeit nicht. Der Einfachheit halber können wir dieselbe Struktur melden, aber wir müssen sicherstellen, dass nicht wissbare Daten durch ein null
und nicht durch den tatsächlichen Datenwert gekennzeichnet sind.
Architektur
Tabelle 4-1 zeigt, wann diese Ereignisse gesendet werden können und welche Felder in jedem Ereignis enthalten sind.
Veranstaltung | Gesendet um (UTC) | In der Ereignismeldung enthaltene Felder |
---|---|---|
Geplant | CRS_DEP_TIME minus 7 Tage |
FL_DATE , UNIQUE_CARRIER , ORIGIN_AIRPORT_SEQ_ID , ORIGIN , DEST_AIRPORT_SEQ_ID , DEST , CRS_DEP_TIME [nulls] , CRS_ARR_TIME [nulls] , DISTANCE |
Abgereist | DEP_TIME |
Alle Felder, die in der geplanten Nachricht verfügbar sind, plus:
|
Wheelsoff | WHEELS_OFF |
Alle Felder, die in der Abwesenheitsnachricht verfügbar sind, plus: TAXI_OUT und WHEELS_OFF |
Wheelson | WHEELS_ON |
Alle Felder, die in der Abrollnachricht verfügbar sind, plus:
|
Angekommen | ARR_TIME |
Alle Felder, die in der wheelson-Nachricht verfügbar sind, plus: ARR_TIME und ARR_DELAY |
Wir führen die erforderlichen Umwandlungen durch und speichern die umgewandelten Daten dann in einer Datenbank, damit sie für den Ereignissimulationscode zur Verfügung stehen. Abbildung 4-2 zeigt die Schritte, die wir in unserer ETL-Pipeline (Extract-Transform-Load) durchführen werden, sowie die nachfolgenden Schritte, um aus diesen Ereignissen einen Ereignisstrom zu simulieren und anschließend ein Echtzeit-Dashboard aus dem simulierten Ereignisstrom zu erstellen.
Informationen zum Flughafen erhalten
Um die Zeitkorrektur vorzunehmen, brauchen wir den Breiten- und Längengrad jedes Flughafens. Der BTS verfügt über einen Datensatz, der diese Informationen enthält und den wir für die Suche verwenden können. Der Einfachheit halber habe ich die Daten heruntergeladen und sie unter gs://data-science-on-gcp/edition2/raw/airports.csv öffentlich zugänglich gemacht.
Untersuchen wir die Daten, um herauszufinden, wie wir die Längen- und Breitengrade der Flughäfen erhalten. In Kapitel 2, als ich die Daten von flights
untersuchen musste, um das erste Verspätungsmodell zu erstellen, habe ich die Daten in BigQuery geladen.
Müssen wir alle Daten, die uns zur Verfügung gestellt werden, in unser BigQuery-Dataset importieren, um sie zu untersuchen? Nein, natürlich nicht. Wir können BigQuery-Datensätze in anderen Projekten abfragen, ohne dass wir unsere eigenen Kopien der Daten erstellen müssen. In der FROM
Klausel der BigQuery-Abfrage müssen wir nur den Namen des Projekts angeben, in dem sich das Dataset befindet:
SELECT airline, AVG(departure_delay) AS avg_dep_delay FROM `bigquery-samples.airline_ontime_data.flights` GROUP BY airline ORDER by avg_dep_delay DESC
Was aber, wenn uns jemand eine CSV-Datei (Comma-Separated Values) zur Verfügung stellt? Müssen wir die Daten in BigQuery laden, um zu sehen, was in der Datei steht? Nein.
BigQuery ermöglicht die Abfrage von Daten in der Cloud Speicherung durch seine föderierten Abfragefunktionen. Das ist die Fähigkeit von BigQuery, Daten abzufragen, die nicht innerhalb des Data Warehouse-Produkts gespeichert sind, sondern auf Datenquellen wie Google Sheets (eine Tabellenkalkulation auf Google Drive) oder Dateien auf Cloud Storage zugreifen. So könnten wir die Dateien als CSV in der Cloud Speicherung belassen, eine Tabellenstruktur darauf definieren und die CSV-Dateien direkt abfragen. Erinnere dich daran, dass wir die Verwendung von Cloud Storage vorgeschlagen haben, wenn dein Hauptanalyseverfahren die Arbeit mit deinen Daten auf der Ebene von Flat Files beinhaltet.
Der erste Schritt besteht darin, das Schema dieser Dateien zu ermitteln. Schauen wir uns die erste Zeile an:
gsutil cat gs://data-science-on-gcp/edition2/raw/airports.csv | head -1
Wir bekommen:
"AIRPORT_SEQ_ID","AIRPORT_ID","AIRPORT","DISPLAY_AIRPORT_NAME", "DISPLAY_AIRPORT_CITY_NAME_FULL","AIRPORT_WAC_SEQ_ID2","AIRPORT_WAC", "AIRPORT_COUNTRY_NAME","AIRPORT_COUNTRY_CODE_ISO","AIRPORT_STATE_NAME", "AIRPORT_STATE_CODE","AIRPORT_STATE_FIPS","CITY_MARKET_SEQ_ID","CITY_MARKET_ID", "DISPLAY_CITY_MARKET_NAME_FULL","CITY_MARKET_WAC_SEQ_ID2","CITY_MARKET_WAC", "LAT_DEGREES","LAT_HEMISPHERE","LAT_MINUTES","LAT_SECONDS","LATITUDE", "LON_DEGREES","LON_HEMISPHERE","LON_MINUTES","LON_SECONDS","LONGITUDE", "UTC_LOCAL_TIME_VARIATION","AIRPORT_START_DATE","AIRPORT_THRU_DATE", "AIRPORT_IS_CLOSED","AIRPORT_IS_LATEST"
CAST
Verwende diese Kopfzeile, um einen BigQuery-Schema-String des Formats zu schreiben (gib STRING
für jede Spalte an, bei der du dir nicht sicher bist, da du sie bei der Abfrage der Daten jederzeit in das passende Format umwandeln kannst):
AIRPORT_SEQ_ID:INTEGER,AIRPORT_ID:STRING,AIRPORT:STRING, ...
Wenn du einen ähnlichen Datensatz herumliegen hast, kannst du auch mit dessen Schema beginnen und es bearbeiten:
bq show --format=prettyjson dsongcp.sometable > starter.json
Sobald wir das Schema der GCS-Dateien haben, können wir eine Tabellendefinition für die föderierte Quelle erstellen:2
bq mk --external_table_definition= \ ./airport_schema.json@CSV=gs://data-science-on-gcp/edition2/raw/airports.csv \ dsongcp.airports_gcs
Wenn du jetzt die BigQuery-Webkonsole aufrufst, solltest du eine neue Tabelle im dsongcp
Dataset sehen (lade die Seite ggf. neu). Dabei handelt es sich um eine föderierte Datenquelle, deren Speicherung in der CSV-Datei auf Cloud Storage erfolgt. Du kannst sie jedoch wie jede andere BigQuery-Tabelle abfragen:
SELECT AIRPORT_SEQ_ID, AIRPORT_ID, AIRPORT, DISPLAY_AIRPORT_NAME, LAT_DEGREES, LAT_HEMISPHERE, LAT_MINUTES, LAT_SECONDS, LATITUDE FROM dsongcp.airports_gcs WHERE DISPLAY_AIRPORT_NAME LIKE '%Seattle%'
In der vorangegangenen Abfrage versuche ich, in der Datei herauszufinden, welche Spalte für den Flughafen und welche für den Breitengrad ich verwenden muss. Das Ergebnis zeigt, dass AIRPORT
und LATITUDE
die Spalten sind, die mich interessieren, aber dass es mehrere Zeilen gibt, die dem Flughafen SEA entsprechen:
Row |
AIRPORT_SEQ_ID |
AIRPORT_ID |
AIRPORT |
DISPLAY_AIRPORT_NAME |
LAT_DEGREES |
LAT_HEMISPHERE |
LAT_MINUTES |
LAT_SECONDS |
LATITUDE |
---|---|---|---|---|---|---|---|---|---|
1 |
1247701 |
12477 |
JFB |
Seattle 1st National.Bank Helipad |
47 |
N |
36 |
25 |
47.60694444 |
2 |
1474701 |
14747 |
SEA |
Seattle International |
47 |
N |
26 |
50 |
47.44722222 |
3 |
1474702 |
14747 |
SEA |
Seattle/Tacoma International |
47 |
N |
26 |
57 |
47.44916667 |
4 |
1474703 |
14747 |
SEA |
Seattle/Tacoma International |
47 |
N |
27 |
0 |
47.45 |
Zum Glück gibt es eine Spalte, die anzeigt, welche Zeile die neueste Information ist:
SELECT AIRPORT, LATITUDE, LONGITUDE FROM dsongcp.airports_gcs WHERE AIRPORT_IS_LATEST = 1 AND AIRPORT = 'DFW'
Lass dich aber nicht von föderierten Abfragen hinreißen. Am besten eignen sich föderierte Quellen für häufig wechselnde, relativ kleine Datensätze, die mit großen Datensätzen in nativen BigQuery-Tabellen verbunden werden müssen. Da die spaltenbasierte Speicherung in BigQuery so wichtig für die Leistung ist, werden wir die meisten Daten in das native Format von BigQuery laden.
Daten teilen
Jetzt, wo wir die airports.csv in der Cloud Speicherung und den Datensatz der Flughäfen in BigQuery haben, ist es sehr wahrscheinlich, dass unsere Kollegen diese Daten auch nutzen wollen. Einer der Vorteile, wenn du deine Daten in die Cloud (oder genauer gesagt in ein Data Warehouse) bringst, ist, dass du Datensätze über Unternehmensgrenzen hinweg zusammenführen kannst. Wenn es also keinen triftigen Grund gibt, der dagegen spricht, wie z. B. Sicherheitsvorkehrungen, solltest du versuchen, deine Daten weithin zugänglich zu machen.
Die Kosten für die Abfrage werden von der Person getragen, die die Abfrage an die BigQuery-Engine stellt. Du brauchst dir also keine Sorgen zu machen, dass dir dadurch zusätzliche Kosten für deine Abteilung entstehen. Es ist möglich, einen GCS-Bucket "requester-pays" zu machen, um die gleiche Art der Abrechnungstrennung für Daten in der Cloud Speicherung zu erhalten.
Einen Datensatz der Cloud Speicherung teilen
Um einige Daten in der Cloud Speicherung freizugeben, verwende gsutil
:
gsutil -m acl ch -r -u abc@xyz.com:R gs://$BUCKET/data
In dem vorangegangenen Befehl gibt das -m
den Multithreading-Modus an, das -r
ermöglicht den Zugriff rekursiv, beginnend mit dem angegebenen Top-Level-Verzeichnis, und das -u
zeigt an, dass es sich um einen Benutzer handelt, dem Lese (:R
) Zugriff erhält.
Wir könnten der gesamten Organisation oder einer Google-Gruppe Lesezugriff gewähren, indem wir -g
:
gsutil -m acl ch -r -g xyz.com:R gs://$BUCKET/data
Freigabe eines BigQuery-Datensatzes
Die Freigabe von BigQuery kann auf der Granularität einer Spalte, einer Tabelle oder eines Datensatzes erfolgen. Keine unserer BigQuery-Tabellen enthält persönlich identifizierbare oder vertrauliche Informationen. Daher gibt es keinen zwingenden Grund für die Zugriffskontrolle, die Weitergabe von Flugdaten auf Spalten- oder Tabellenebene zu kontrollieren. Wir können also den in Kapitel 2 erstellten Datensatz dsongcp
freigeben und alle Mitarbeiter des Unternehmens, die an diesem Projekt arbeiten, zu bigquery.user
machen, damit sie Abfragen auf diesem Datensatz durchführen können. Das kannst du in der BigQuery-Webkonsole über das Menü Dataset tun.
In manchen Fällen kann es vorkommen, dass dein Datensatz oder deine Tabelle bestimmte Spalten enthält, die persönliche oder vertrauliche Informationen enthalten. Möglicherweise musst du den Zugriff auf diese Spalten einschränken, während der Rest der Tabelle für ein breiteres Publikum zugänglich bleibt.3 Wenn du den Zugriff auf eine Teilmenge einer Tabelle in BigQuery ermöglichen musst (egal, ob es sich um bestimmte Spalten oder bestimmte Zeilen handelt), kannst du Views verwenden. Lege die Tabelle selbst in einem Dataset ab, das nur einer sehr kleinen Gruppe von Nutzern zugänglich ist. Dann erstellst du eine Ansicht auf diese Tabelle, die die relevanten Spalten und Zeilen herauszieht, und speicherst diese Ansicht in einem separaten Dataset, das für eine größere Gruppe von Nutzern zugänglich ist. Deine Nutzer/innen werden nur diese Ansicht abfragen, und da die personenbezogenen oder vertraulichen Informationen nicht einmal in der Ansicht enthalten sind, ist die Wahrscheinlichkeit eines unbeabsichtigten Datenverlusts geringer.
Eine weitere Möglichkeit, den Zugriff auf der Ebene einer BigQuery-Tabelle zu beschränken, ist die Verwendung von Cloud IAM. Um den Zugriff auf der Ebene einer Spalte zu kontrollieren, verwendest du Richtlinien-Tags und den Data Catalog.
Dataplex und Analytics Hub
Sobald du dir angewöhnt hast, Daten in großem Umfang zu teilen, kann die Verwaltung problematisch werden. Es ist besser, wenn du die Daten in der gesamten Cloud Speicherung einheitlich verwalten und die Herkunft der Daten verfolgen kannst. Dafür gibt es Dataplex.
Es kann ziemlich mühsam sein, Tabellen und Datensätze einzeln mit einem Nutzer oder einer Gruppe zu teilen. Um die Freigabe in großem Umfang zu implementieren und Statistiken darüber zu erhalten, wie Personen die von dir freigegebenen Daten nutzen, verwende Analytics Hub.
Zeitkorrektur
Das Korrigieren der in Ortszeit gemeldeten Zeiten in UTC ist kein einfaches Unterfangen. Es gibt mehrere Schritte:
-
Die Ortszeit hängt von dem Ort ab, an dem du dich befindest. Die Flugdaten, die uns vorliegen, enthalten nur den Namen des Flughafens (z.B. ALB für Albany). Wir müssen also den Längen- und Breitengrad anhand eines Flughafencodes ermitteln. Die BTS verfügt über einen Datensatz, der diese Informationen enthält und den wir für die Suche verwenden können.
-
Bei einem Paar aus Breiten- und Längengrad müssen wir die Zeitzone auf einer Karte der globalen Zeitzonen nachschlagen. Wenn wir zum Beispiel den Breiten- und Längengrad des Flughafens in Albany angeben, müssen wir auf
America/New_York
zurückkommen. Es gibt mehrere Webdienste, die dies tun, aber das Python-Pakettimezonefinder
ist eine effizientere Option, weil es komplett offline arbeitet. Der Nachteil ist, dass dieses Paket keine ozeanischen Gebiete und einige historische Zeitzonenänderungen verarbeiten kann,4 aber das ist ein Kompromiss, den wir vorerst eingehen können. -
Die Zeitzonenverschiebung (gegenüber der Greenwich Mean Time [GMT/UTC]) an einem Ort ändert sich im Laufe des Jahres aufgrund von Sommerzeitkorrekturen. In New York zum Beispiel liegt sie im Sommer sechs Stunden und im Winter fünf Stunden hinter der UTC. Mit der Zeitzone (
America/New_York
) brauchen wir also auch das örtliche Abreisedatum und die Uhrzeit (z.B. 13. Januar 2015, 14:08 Uhr), um die entsprechende Zeitzonenverschiebung zu finden. Das Python-Paketpytz
bietet diese Möglichkeit, indem es das zugrunde liegende Betriebssystem nutzt.
Das Problem der uneindeutigen Zeiten bleibt bestehen - jeder Zeitpunkt zwischen 01:00 und 02:00 Uhr Ortszeit tritt zweimal an dem Tag auf, an dem die Uhr von Sommerzeit auf Winterzeit umgestellt wird. Wenn unser Datensatz also einen Flug enthält, der um 01:30 Uhr ankommt, müssen wir entscheiden, welche Zeit das ist. In einer realen Situation würdest du dir die typische Dauer des Fluges ansehen und diejenige wählen, die am wahrscheinlichsten ist. In diesem Buch gehe ich immer von der Winterzeit aus ( is_dst
ist False
), weil dies die Standardzeitzone für diesen Ort ist.
Die Komplexität dieser Schritte sollte dich hoffentlich davon überzeugen, bei der Speicherung von Zeit bewährte Methoden anzuwenden.
Apache Beam/Cloud Dataflow
Der klassische Weg, um Datenpipelines auf der Google Cloud Platform zu erstellen, ist die Verwendung von Cloud Dataflow. Cloud Dataflow ist eine Externalisierung der Technologien Flume und MillWheel, die bei Google schon seit einigen Jahren weit verbreitet sind. Es verwendet ein Programmiermodell, das sowohl Batch- als auch Streaming-Daten auf einheitliche Weise verarbeitet und so die Möglichkeit bietet, dieselbe Codebasis sowohl für die Batch- als auch für die kontinuierliche Stream-Verarbeitung zu nutzen. Der Code selbst wird in Apache Beam geschrieben, entweder in Java, Python oder Go,5 und ist portabel, d. h. er kann in verschiedenen Ausführungsumgebungen ausgeführt werden, darunter Apache Flink und Apache Spark. Auf GCP bietet Cloud Dataflow einen vollständig verwalteten (serverlosen) Dienst, der Beam-Pipelines ausführen kann. Die Ressourcen werden bei Bedarf zugewiesen und skalieren automatisch, um sowohl minimale Latenzzeiten als auch eine hohe Ressourcenauslastung zu erreichen.
Bei der Beam-Programmierung wird eine Pipeline (eine Reihe von Datenumwandlungen) erstellt, die an einen Runner übergeben wird. Der Runner baut einen Graphen auf und leitet die Daten dann durch ihn hindurch. Jeder Eingangsdatensatz kommt von einer Quelle und jeder Ausgangsdatensatz wird an eine Senke gesendet. Abbildung 4-3 zeigt die Beam-Pipeline, die wir gleich aufbauen werden.
Vergleiche die Schritte in Abbildung 4-2 mit dem Blockdiagramm der ETL-Pipeline (extract-transform-load) in Abbildung 4-3. Lass uns die Datenpipeline Stück für Stück aufbauen.
Parsing von Flughafendaten
Du kannst Informationen über die Lage der Flughäfen von der BTS-Website herunterladen. Ich habe alle Felder ausgewählt, die CSV-Datei auf meine lokale Festplatte heruntergeladen, sie extrahiert und mit gzip
komprimiert. Die gepackte Flughafendatei ist im GitHub-Repository für dieses Buch verfügbar.
Um Apache Beam von der Cloud Shell aus nutzen zu können, müssen wir es in unserer Python-Umgebung installieren. Installiere jetzt auch die Zeitzonenpakete:6
virtualenv ~/beam_env source ~/beam_env/bin/activate python3 -m pip install --upgrade \ timezonefinder pytz \ 'apache-beam[gcp]'
Die Read
Transformation in der folgenden Beam-Pipeline liest die Flughafendatei Zeile für Zeile ein:7
with beam.Pipeline('DirectRunner') as pipeline: airports = (pipeline | beam.io.ReadFromText('airports.csv.gz') | beam.Map(lambda line: next(csv.reader([line]))) | beam.Map(lambda fields: (fields[0], (fields[21], fields[26]))) )
Nehmen wir zum Beispiel an, dass eine der Eingabezeilen, die aus der Textdatei source ausgelesen wird, die folgende ist:
1000401,10004,"04A","Lik Mining Camp","Lik, AK",101,1,"United States","US","Alaska","AK","02",3000401,30004,"Lik, AK",101,1,68,"N",5,0,68.08333333,163,"W",10,0,-163.16666667,"",2007-07-01,,0,1,
Die erste Map
nimmt diese Zeile und gibt sie an einen CSV-Leser weiter, der sie parst (und dabei Felder wie Lik, AK
berücksichtigt, die Kommas enthalten) und die Felder als eine Liste von Strings herauszieht. Diese Felder werden dann an die nächste Transformation weitergegeben. Die zweite Map
nimmt die Felder als Eingabe und gibt ein Tupel der Form aus (die extrahierten Felder sind im vorherigen Beispiel fett gedruckt):
(1000401, (68.08333333,-163.16666667))
Die erste Zahl ist der eindeutige Flughafencode (wir verwenden diesen anstelle des dreibuchstabigen Codes des Flughafens, weil sich die Lage des Flughafens im Laufe der Zeit ändern kann), und die nächsten beiden Zahlen sind das Paar aus Breiten- und Längengrad für die Lage des Flughafens. Die Variable airports
, die das Ergebnis dieser drei Transformationen ist, ist keine einfache speicherinterne Liste dieser Tupel. Stattdessen handelt es sich um eine unveränderliche Sammlung, die PCollection
genannt wird, und die du aus dem Speicher nehmen und verteilen kannst.
Wir können den Inhalt von PCollection
in eine Textdatei schreiben, um zu überprüfen, ob sich die Pipeline richtig verhält:
(airports | beam.Map(lambda airport_data: '{},{}'.format(airport_data[0], ',' \ .join(airport_data[1])) ) | beam.io.WriteToText('extracted_airports') )
Probiere es aus: Der Code in 04_streaming/transform/df01.py ist einfach ein Python-Programm, das du von der Kommandozeile aus starten kannst. Installiere zuerst das Apache Beam-Paket, falls du das noch nicht getan hast, und führe dann das Programm df01.py aus, während du dich in dem Verzeichnis befindest, das das GitHub-Repository dieses Buches enthält:
cd 04_streaming/simulate ./install_packages.sh python3 ./df01.py
Dadurch wird der Code in df01.py lokal ausgeführt. Später werden wir die Pipeline-Zeile ändern in:
with beam.Pipeline('DataflowRunner') as pipeline:
und die Pipeline auf der Google Cloud Platform mit dem Cloud Dataflow Service laufen lassen. Mit dieser Änderung wird die Datenpipeline einfach durch die Ausführung des Python-Programms auf mehreren Workern in der Cloud gestartet. Wie bei vielen verteilten Systemen wird die Ausgabe von Cloud Dataflow potenziell in eine oder mehrere Dateien gesplittet. Du erhältst eine Datei, deren Name mit "extracted_airports" beginnt (bei mir war es extracted_airports-00000-of-00001), von der einige Zeilen etwa so aussehen könnten:
1000101,58.10944444,-152.90666667 1000301,65.54805556,-161.07166667
Die Spalten sind AIRPORT_SEQ_ID
, LATITUDE
und LONGITUDE
- die Reihenfolge der Zeilen hängt davon ab, welcher der parallelen Arbeiter zuerst fertig geworden ist, sie kann also unterschiedlich sein.
Hinzufügen von Zeitzoneninformationen
Ändern wir nun den Code, um die Zeitzone zu bestimmen, die einem Breitengrad/Längengrad-Paar entspricht. In unserer Pipeline geben wir nicht einfach das Paar Breitengrad/Längengrad aus, sondern eine Liste mit drei Elementen: Breitengrad, Längengrad und Zeitzone:
airports = (pipeline | beam.Read(beam.io.ReadFromText('airports.csv.gz')) | beam.Map(lambda line: next(csv.reader([line]))) | beam.Map(lambda fields: (fields[0], addtimezone(fields[21], fields[26]))) )
Das Schlüsselwort lambda
in Python setzt eine anonyme Funktion ein. Im Fall der ersten Verwendung von lambda
im vorangegangenen Schnipsel nimmt diese Methode einen Parameter (line
) und gibt das zurück, was auf den Doppelpunkt folgt. Wir können die Zeitzone mit Hilfe des Pakets timezonefinder
bestimmen:8
def addtimezone(lat, lon): import timezonefinder tf = timezonefinder.TimezoneFinder() lat = float(lat) lon = float(lon) return (lat, lon, tf.timezone_at(lng=lon, lat=lat))
Die Position der Import-Anweisung im vorangegangenen Beispiel mag seltsam aussehen (die meisten Python-Importe befinden sich am Anfang der Datei), aber dieses Import-innerhalb-der-Funktion-Muster wird von Cloud Dataflow empfohlen, damit,9 damit bei der Übermittlung an die Cloud nicht auch die importierten Pakete in die Hauptsitzung importiert werden.10
Für den Moment werden wir diese Datei(df02.py) jedoch lokal ausführen. Das wird eine Weile dauern, weil die Berechnung der Zeitzone eine große Anzahl von Polygonüberschneidungen erfordert und weil wir sie lokal ausführen und (noch!) nicht in der Cloud verteilen. Um die Berechnung zu beschleunigen, fügen wir einen Filter hinzu, um die Anzahl der Flughäfen zu reduzieren, die wir nachschlagen müssen:
| beam.io.ReadFromText('airports.csv.gz') | beam.Filter(lambda line: "United States" in line and line[-2:] == '1,')
Die BTS-Flugverspätungsdaten beziehen sich nur auf Inlandsflüge in den USA, daher brauchen wir die Zeitzonen von Flughäfen außerhalb der Vereinigten Staaten nicht. Der Grund für die zweite Überprüfung ist, dass sich die Standorte der Flughäfen im Laufe der Zeit ändern, wir uns aber nur für den aktuellen Standort des Flughafens interessieren. Hier sind zum Beispiel die Flughafenstandorte für ORD (oder Chicago):
1393001,...,"ORD",...,41.97805556,...,-87.90611111,...,1950-01-01,2011-06-30,0,0, 1393002,...,"ORD",...,41.98166667,...,-87.90666667,...,2011-07-01,2013-09-30,0,0, 1393003,...,"ORD",...,41.97944444,...,-87.90750000,...,2013-10-01,2015-09-30,0,0, 1393004,...,"ORD",...,41.97722222,...,-87.90805556,...,2015-10-01,,0,1,
Die erste Reihe erfasst die Lage des Flughafens von Chicago zwischen 1950 und dem 30. Juni 2011.11 Die zweite Zeile ist vom 1. Juli 2011 bis zum 30. September 2013 gültig. Die letzte Zeile hingegen ist der aktuelle Standort, was dadurch gekennzeichnet ist, dass die letzte Spalte (das Feld AIRPORT_IS_LATEST
) eine 1 ist.
Das ist aber nicht die einzige Zeile, die uns interessiert! Bei Flügen vor dem 01.10.2015 wird die ID der vorletzten Zeile gemeldet. Wir könnten eine Prüfung dafür einbauen, aber das sieht für eine kleine Optimierung ziemlich riskant aus. Ich entferne also diese letzte Prüfung, so dass wir nur noch:
| beam.io.ReadFromText('airports.csv.gz') | beam.Filter(lambda line: "United States" in line)
Wenn ich dies tue und df02.py ausführe, sehen die extrahierten Informationen für die Flughäfen wie folgt aus:
1672301,62.03611111,-151.45222222,America/Anchorage 1672401,43.87722222,-73.41305556,America/New_York 1672501,40.75722222,-119.21277778,America/Los_Angeles
Die letzte Spalte der extrahierten Informationen enthält die Zeitzone, die aus dem Breiten- und Längengrad jedes Flughafens ermittelt wurde.
Zeiten in UTC umwandeln
Jetzt, da wir die Zeitzone für jeden Flughafen kennen, können wir uns daran machen, die Zeiten in den Daten von flights
in UTC umzuwandeln. Zu dem Zeitpunkt, an dem wir das Programm entwickeln, möchten wir nicht alle Monate, die wir in BigQuery haben, verarbeiten - das Warten auf die Abfrage bei jedem Programmstart wäre lästig. Stattdessen werden wir eine kleine Stichprobe der flights
Daten in BigQuery erstellen, mit denen wir unseren Code entwickeln:12
SELECT * FROM dsongcp.flights WHERE RAND() < 0.001
Dies ergibt etwa 6.000 Zeilen. Wir können die BigQuery-Web-UI verwenden, um die Ergebnisse als JSON-Datei (JavaScript Object Notation) zu speichern. Ich ziehe es jedoch vor, die Dinge per Skript zu erledigen:13
bq query --destination_table dsongcp.flights_sample \ --replace --nouse_legacy_sql \ 'SELECT * FROM dsongcp.flights WHERE RAND() < 0.001' bq extract --destination_format=NEWLINE_DELIMITED_JSON \ dsongcp.flights_sample \ gs://${BUCKET}/flights/ch4/flights_sample.json gsutil cp gs://${BUCKET}/flights/ch4/flights_sample.json
Dadurch wird eine Datei mit dem Namen flight_sample.json erstellt, von der eine Zeile ähnlich aussieht wie diese:
{"FL_DATE":"2015-04-28","UNIQUE_CARRIER":"EV","ORIGIN_AIRPORT_SEQ_ID":"1013503", "ORIGIN":"ABE","DEST_AIRPORT_SEQ_ID":"1039705","DEST":"ATL", "CRS_DEP_TIME":"1600","DEP_TIME":"1555","DEP_DELAY":-5,"TAXI_OUT":7, "WHEELS_OFF":"1602","WHEELS_ON":"1747","TAXI_IN":4,"CRS_ARR_TIME":"1809", "ARR_TIME":"1751","ARR_DELAY":-18,"CANCELLED":false,"DIVERTED":false, "DISTANCE":"692.00"}
Das Lesen der Flugdaten beginnt ähnlich wie das Lesen der Flughafendaten:14
flights = (pipeline | 'flights:read' >> beam.io.ReadFromText('flights_sample.json') | 'flights:parse' >> beam.Map(lambda line: json.loads(line))
Dies ist derselbe Code wie beim Einlesen der Datei airports.csv.gz, nur dass ich diesem Transformationsschritt auch einen Namen gebe (flights:read
) und einen JSON-Parser statt eines CSV-Parsers verwende. Beachte die Syntax hier:
| 'name-of-step' >> transform_function()
Der nächste Schritt ist jedoch anders, da er zwei PCollections
beinhaltet. Wir müssen die Flugdaten mit den Flughafendaten verknüpfen, um die Zeitzone für jeden Flug zu finden. Dazu machen wir die Flughäfen PCollection
zu einem "Side-Input". Side-Inputs in Beam sind wie Sichten auf das Original PCollection
und sind entweder Listen oder Dicts (Wörterbücher). In diesem Fall erstellen wir ein Diktat, das die Flughafen-ID den Informationen über die Flughäfen zuordnet:
flights = (pipeline |'flights:read' >> beam.io.ReadFromText('flights_sample.json') | 'flights:parse' >> beam.Map(lambda line: json.loads(line)) |'flights:tzcorr' >> beam.FlatMap(tz_correct, beam.pvalue.AsDict(airports)) )
Die Tatsache, dass PCollection
eine Python-Liste oder ein Python-Dict sein muss, bedeutet, dass die Seiteneingaben klein genug sein müssen, um in den Speicher zu passen. Wenn du zwei große PCollections
verbinden musst, die nicht in den Speicher passen, verwende eine CoGroupByKey.
Die Methode FlatMap()
ruft eine Methode tz_correct()
auf, die den geparsten Inhalt einer Zeile aus flights_sample.json (mit den Informationen zu einem einzelnen Flug) und ein Python-Wörterbuch (mit allen Zeitzoneninformationen der Flughäfen) entgegennimmt:
def tz_correct(fields, airport_timezones): try: # convert all times to UTC # ORIGIN_AIRPORT_SEQ_ID is the name of JSON attribute dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"] arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"] # airport_id is the key to airport_timezones dict # and the value is a tuple (lat, lon, timezone) dep_timezone = airport_timezones[dep_airport_id][2] arr_timezone = airport_timezones[arr_airport_id][2] for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]: fields[f] = as_utc(fields["FL_DATE"], fields[f], dep_timezone) for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]: fields[f] = as_utc(fields["FL_DATE"], fields[f], arr_timezone) yield json.dumps(fields) except KeyError as e: logging.exception(" Ignoring " + line + " because airport is not known")
Warum FlatMap()
anstelle von Map
, um tz_correct()
aufzurufen? Eine Map
ist eine 1-zu-1-Beziehung zwischen Eingabe und Ausgabe, während eine FlatMap()
0-N
Ausgaben pro Eingabe zurückgeben kann. Dies geschieht mit einer Python-Generatorfunktion (d.h. mit dem Schlüsselwort yield
- stell dir yield
als Rückgabe vor, die ein Element nach dem anderen zurückgibt, bis keine Daten mehr zurückgegeben werden können). Die Verwendung von FlatMap
ermöglicht es uns, Fluginformationen zu unbekannten Flughäfen zu ignorieren - auch wenn dies in den historischen Daten, die wir verarbeiten, nicht vorkommt, kann ein bisschen defensive Programmierung nicht schaden.
Der Code tz_correct()
holt sich die ID des Abflughafens aus den Flugdaten und sucht dann die Zeitzone für diese Flughafen-ID aus den Flughafendaten heraus. Nachdem er die Zeitzone ermittelt hat, ruft er die Methode as_utc()
auf, um jede der in der Zeitzone des Flughafens gemeldeten Uhrzeiten in UTC umzuwandeln:
def as_utc(date, hhmm, tzone): try: if len(hhmm) > 0 and tzone is not None: import datetime, pytz loc_tz = pytz.timezone(tzone) loc_dt = loc_tz.localize(datetime.datetime.strptime(date,'%Y-%m-%d'), is_dst=False) loc_dt += datetime.timedelta(hours=int(hhmm[:2]), minutes=int(hhmm[2:])) utc_dt = loc_dt.astimezone(pytz.utc) return utc_dt.strftime('%Y-%m-%d %H:%M:%S') else: return '' # empty string corresponds to canceled flights except ValueError as e: print('{} {} {}'.format(date, hhmm, tzone)) raise e
Wie bisher kannst du es lokal ausführen. Führe dazu df03.py
aus. Eine Zeile, die ursprünglich (in den Rohdaten) so aussah:
{"FL_DATE":"2015-11-05","UNIQUE_CARRIER":"DL","ORIGIN_AIRPORT_SEQ_ID":"1013503", "ORIGIN":"ABE","DEST_AIRPORT_SEQ_ID":"1039705","DEST":"ATL", "CRS_DEP_TIME":"0600","DEP_TIME":"0556","DEP_DELAY":-4,"TAXI_OUT":12, "WHEELS_OFF":"0608","WHEELS_ON":"0749","TAXI_IN":10,"CRS_ARR_TIME":"0818", "ARR_TIME":"0759","ARR_DELAY":-19,"CANCELLED":false, "DIVERTED":false,"DISTANCE":"692.00"}
wird jetzt:
{"FL_DATE": "2015-11-05", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", "DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", "CRS_DEP_TIME": "2015-11-05 11:00:00", "DEP_TIME": "2015-11-05 10:56:00", "DEP_DELAY": -4, "TAXI_OUT": 12, "WHEELS_OFF": "2015-11-05 11:08:00", "WHEELS_ON": "2015-11-05 12:49:00", "TAXI_IN": 10, "CRS_ARR_TIME": "2015-11-05 13:18:00", "ARR_TIME": "2015-11-05 12:59:00", "ARR_DELAY": -19, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "692.00"}
Alle Zeiten wurden in UTC umgerechnet. Zum Beispiel wurde die 0759
Ankunftszeit in Atlanta in UTC umgewandelt und lautet nun 12:59:00.
Daten korrigieren
Schau dir die folgende Zeile über einen Flug von Honolulu (HNL) nach Dallas-Fort Worth (DFW) genau an. Fällt dir etwas Merkwürdiges auf?
{"FL_DATE": "2015-03-06", "UNIQUE_CARRIER": "AA", "ORIGIN_AIRPORT_SEQ_ID": "1217302", "ORIGIN": "HNL", "DEST_AIRPORT_SEQ_ID": "1129803", "DEST": "DFW", "CRS_DEP_TIME": "2015-03-07 05:30:00", "DEP_TIME": "2015-03-07 05:22:00", "DEP_DELAY": -8, "TAXI_OUT": 40, "WHEELS_OFF": "2015-03-07 06:02:00", "WHEELS_ON": "2015-03-06 12:32:00", "TAXI_IN": 7, "CRS_ARR_TIME": "2015-03-06 12:54:00", "ARR_TIME": "2015-03-06 12:39:00", "ARR_DELAY": -15, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "3784.00"}
Schau dir die Abflugzeit in Honolulu und die Ankunftszeit in Dallas an - der Flug kommt am Tag vor dem Abflug an! Das liegt daran, dass das Flugdatum (2015-03-06) das Datum des Abflugs in Ortszeit ist. Wenn dann noch ein Zeitunterschied zwischen den Flughäfen hinzukommt, ist es gut möglich, dass es nicht das Ankunftsdatum ist. Wir suchen nach solchen Situationen und fügen bei Bedarf 24 Stunden hinzu. Das ist natürlich ein ziemlicher Hack (habe ich schon erwähnt, dass die Zeiten in UTC gespeichert werden sollten?!):
def add_24h_if_before(arrtime, deptime): import datetime if len(arrtime) > 0 and len(deptime) > 0 and arrtime < deptime: adt = datetime.datetime.strptime(arrtime, '%Y-%m-%d %H:%M:%S') adt += datetime.timedelta(hours=24) return adt.strftime('%Y-%m-%d %H:%M:%S') else: return arrtime
Der 24-Stunden-Hack wird kurz vor dem Ertrag in tz_correct
aufgerufen.15 Jetzt, da wir neue Daten über die Flughäfen haben, ist es wahrscheinlich sinnvoll, sie zu unserem Datensatz hinzuzufügen. Außerdem wollen wir, wie bereits erwähnt, die Abweichung der Zeitzone von der UTC im Auge behalten, da für einige Analysen die Kenntnis der Ortszeit erforderlich sein könnte. Der neue Code tz_correct
lautet also wie folgt:
def tz_correct(line, airport_timezones): fields = json.loads(line) try: # convert all times to UTC dep_airport_id = fields["ORIGIN_AIRPORT_SEQ_ID"] arr_airport_id = fields["DEST_AIRPORT_SEQ_ID"] dep_timezone = airport_timezones[dep_airport_id][2] arr_timezone = airport_timezones[arr_airport_id][2] for f in ["CRS_DEP_TIME", "DEP_TIME", "WHEELS_OFF"]: fields[f], deptz = as_utc(fields["FL_DATE"], fields[f], dep_timezone) for f in ["WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]: fields[f], arrtz = as_utc(fields["FL_DATE"], fields[f], arr_timezone) for f in ["WHEELS_OFF", "WHEELS_ON", "CRS_ARR_TIME", "ARR_TIME"]: fields[f] = add_24h_if_before(fields[f], fields["DEP_TIME"]) fields["DEP_AIRPORT_TZOFFSET"] = deptz fields["ARR_AIRPORT_TZOFFSET"] = arrtz yield json.dumps(fields) except KeyError as e: logging.exception(" Ignoring " + line + " because airport is not known")
Wenn ich df04.py ausführe, auf die diese Änderungen angewendet wurden, wird der Flug von Honolulu nach Dallas:
{"FL_DATE": "2015-03-06", "UNIQUE_CARRIER": "AA", "ORIGIN_AIRPORT_SEQ_ID": "1217302", "ORIGIN": "HNL", "DEST_AIRPORT_SEQ_ID": "1129803", "DEST": "DFW", "CRS_DEP_TIME": "2015-03-07 05:30:00", "DEP_TIME": "2015-03-07 05:22:00", "DEP_DELAY": -8, "TAXI_OUT": 40, "WHEELS_OFF": "2015-03-07 06:02:00", "WHEELS_ON": "2015-03-07 12:32:00", "TAXI_IN": 7, "CRS_ARR_TIME": "2015-03-07 12:54:00", "ARR_TIME": "2015-03-07 12:39:00", "ARR_DELAY": -15, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "3784.00", "DEP_AIRPORT_TZOFFSET": -36000.0, "ARR_AIRPORT_TZOFFSET": -21600.0}
Wie du sehen kannst, wurden die Daten jetzt korrigiert (siehe die fettgedruckten Teile).
Ereignisse schaffen
Nachdem wir unsere zeitkorrigierten Daten haben, können wir mit der Erstellung von Ereignissen für die Veröffentlichung in Pub/Sub fortfahren. Für den Moment beschränken wir uns auf die Nachrichten departed
und arrived
- wir können die Pipeline wiederholen, um die zusätzlichen Ereignisse zu erstellen, wenn wir bei der Modellierung andere Ereignisse verwenden:
def get_next_event(fields): if len(fields["DEP_TIME"]) > 0: event = dict(fields) # copy event["EVENT_TYPE"] = "departed" event["EVENT_TIME"] = fields["DEP_TIME"] for f in ["TAXI_OUT", "WHEELS_OFF", "WHEELS_ON", "TAXI_IN", "ARR_TIME", "ARR_DELAY", "DISTANCE"]: event.pop(f, None) # not knowable at departure time yield event if len(fields["ARR_TIME"]) > 0: event = dict(fields) event["EVENT_TYPE"] = "arrived" event["EVENT_TIME"] = fields["ARR_TIME"] yield event
Im Wesentlichen nehmen wir die Abfahrtszeit und erstellen ein departed
Ereignis zu dieser Zeit, nachdem wir sichergestellt haben, dass wir die Felder (wie die Ankunftsverspätung), die wir zur Abfahrtszeit nicht kennen können, entfernen. In ähnlicher Weise verwenden wir die Ankunftszeit, um ein arrived
Ereignis zu erstellen, wie in Abbildung 4-4 dargestellt.
In der Pipeline wird der Code zur Ereigniserzeugung auf flights PCollection
aufgerufen, nachdem die Umstellung auf UTC erfolgt ist:
flights = (pipeline |'flights:read' >> beam.io.ReadFromText('flights_sample.json') |'flights:tzcorr' >> beam.FlatMap(tz_correct, beam.pvalue.AsDict(airports)) ) events = flights | beam.FlatMap(get_next_event)
Wenn wir nun die Pipeline laufen lassen,16 werden wir zwei Ereignisse für jeden Flug sehen:
{"FL_DATE": "2015-04-28", "UNIQUE_CARRIER": "EV", "ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", "DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", "CRS_DEP_TIME": "2015-04-28 20:00:00", "DEP_TIME": "2015-04-28 19:55:00", "DEP_DELAY": -5, "CRS_ARR_TIME": "2015-04-28 22:09:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "departed", "EVENT_TIME": "2015-04-28 19:55:00"} {"FL_DATE": "2015-04-28", "UNIQUE_CARRIER": "EV", "ORIGIN_AIRPORT_SEQ_ID": "1013503", "ORIGIN": "ABE", "DEST_AIRPORT_SEQ_ID": "1039705", "DEST": "ATL", "CRS_DEP_TIME": "2015-04-28 20:00:00", "DEP_TIME": "2015-04-28 19:55:00", "DEP_DELAY": -5, "TAXI_OUT": 7, "WHEELS_OFF": "2015-04-28 20:02:00", "WHEELS_ON": "2015-04-28 21:47:00", "TAXI_IN": 4, "CRS_ARR_TIME": "2015-04-28 22:09:00", "ARR_TIME": "2015-04-28 21:51:00", "ARR_DELAY": -18, "CANCELLED": false, "DIVERTED": false, "DISTANCE": "692.00", "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "arrived", "EVENT_TIME": "2015-04-28 21:51:00"}
Das erste Ereignis ist ein departed
Ereignis und soll zur Abfahrtszeit veröffentlicht werden, während das zweite Ereignis ein arrived
Ereignis ist und zur Ankunftszeit veröffentlicht werden soll. Das Ereignis departed
enthält eine Reihe fehlender Felder für Daten, die zu diesem Zeitpunkt noch nicht bekannt sind.
Sobald dieser Code funktioniert, fügen wir ein drittes Ereignis hinzu, das gesendet wird, wenn das Flugzeug abhebt:
if len(fields["WHEELS_OFF"]) > 0: event = dict(fields) # copy event["EVENT_TYPE"] = "wheelsoff" event["EVENT_TIME"] = fields["WHEELS_OFF"] for f in ["WHEELS_ON", "TAXI_IN", "ARR_TIME", "ARR_DELAY", "DISTANCE"]: event.pop(f, None) # not knowable at departure time yield event
Zu diesem Zeitpunkt haben wir noch kein Abroll-Ereignis erstellt.
Lesen und Schreiben in der Cloud
Bis jetzt haben wir lokale Dateien gelesen und geschrieben. Sobald wir jedoch unseren Code in einer serverlosen Umgebung in Produktion gehen lassen, macht das Konzept eines lokalen Laufwerks keinen Sinn mehr. Wir müssen von der Cloud Speicherung lesen und schreiben. Da es sich um strukturierte Daten handelt, ist es außerdem besser, sie in BigQuery zu lesen und zu schreiben - erinnerst du dich daran, dass wir in Kapitel 2 unseren kompletten Datensatz in BigQuery geladen haben? Jetzt möchten wir auch die umgewandelten (zeitkorrigierten) Daten dort ablegen.
Glücklicherweise muss dabei nur die Quelle oder die Senke geändert werden. Der Rest der Pipeline bleibt unverändert. Im vorigen Abschnitt (siehe 04_streaming/transform/df05.py) haben wir die Datei airports.csv.gz zum Beispiel so gelesen:
| 'airports:read' >> beam.io.ReadFromText('airports.csv.gz')
Um die entsprechende Datei aus der Cloud Speicherung zu lesen, ändern wir den entsprechenden Code in 04_streaming/transform/df06.py wie folgt:
airports_filename = 'gs://{}/flights/airports/airports.csv.gz'.format( bucket) ... | 'airports:read' >> beam.io.ReadFromText(airports_filename)
Natürlich müssen wir dafür sorgen, dass die Datei in die Cloud Speicherung hochgeladen wird und von demjenigen, der den Code ausführt, gelesen werden kann. Die Datendatei in unserem GitHub-Repository zur Verfügung zu stellen, würde ohnehin nicht skalieren - Cloud Storage (oder BigQuery) ist der richtige Ort für die Daten.
In df05.py musste ich eine lokale Datei lesen, die den JSON-Export eines intelligenten Teils des Datensatzes enthielt, und einen JSON-Parser verwenden, um ein Diktat zu erhalten:
| 'flights:read' >> beam.io.ReadFromText('flights_sample.json') | 'flights:parse' >> beam.Map(lambda line: json.loads(line))
In df06.py wird der entsprechende Code einfacher, weil der BigQuery-Reader ein dict zurückgibt, in dem die Spaltennamen der Ergebnismenge die Schlüssel sind:
'flights:read' >> beam.io.ReadFromBigQuery( query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001', use_standard_sql=True)
Wenn wir die Abfrage wirklich ausführen, werden wir natürlich die Stichproben entfernen (rand() < 0.001
), damit wir den gesamten Datensatz verarbeiten können.
Ähnlich verhält es sich, wenn wir vorher in eine lokale Datei geschrieben haben:
| 'flights:tostring' >> beam.Map(lambda fields: json.dumps(fields)) | 'flights:out' >> beam.io.textio.WriteToText('all_flights')
ändern wir den Code, um in die Cloud Speicherung zu schreiben:
flights_output = 'gs://{}/flights/tzcorr/all_flights'.format(bucket) ... | 'flights:tostring' >> beam.Map(lambda fields: json.dumps(fields)) | 'flights:gcsout' >> beam.io.textio.WriteToText(flights_output)
Wir können die gleichen Daten auch in eine BigQuery-Tabelle schreiben:
flights_schema = \ 'FL_DATE:date,UNIQUE_CARRIER:string,...CANCELLED:boolean' ... | 'flights:bqout' >> beam.io.WriteToBigQuery( 'dsongcp.flights_tzcorr', schema=flights_schema, write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED )
Beachte, dass wir ein Schema angeben müssen, wenn wir in BigQuery schreiben, und angeben müssen, was zu tun ist, wenn die Tabelle bereits existiert (wir bitten darum, dass die Tabelle gekürzt und der Inhalt ersetzt wird) und wenn die Tabelle noch nicht existiert (wir bitten darum, dass die Tabelle erstellt wird).
Wir können versuchen, diesen Code auszuführen, aber die Pipeline benötigt ein paar zusätzliche Parameter. Wo wir also vorher hatten:
with beam.Pipeline('DirectRunner') as pipeline:
die wir jetzt brauchen:
argv = [ '--project={0}'.format(project), '--staging_location=gs://{0}/flights/staging/'.format(bucket), '--temp_location=gs://{0}/flights/temp/'.format(bucket), '--runner=DirectRunner' ] with beam.Pipeline(argv=argv) as pipeline:
Der Grund dafür ist, dass wir, wenn wir aus BigQuery lesen, eine Abfrage bereitstellen:
'flights:read' >> beam.io.ReadFromBigQuery( query='SELECT * FROM dsongcp.flights WHERE rand() < 0.001', use_standard_sql=True)
Wir müssen also das Projekt angeben, das abgerechnet werden soll. Darüber hinaus - und das ist ein Detail der Implementierung - müssen einige temporäre Daten in der Cloud-Speicherung bereitgestellt und zwischengespeichert werden, und wir müssen der Pipeline einen Speicherort für diese temporären Daten zur Verfügung stellen - wir können nie sicher sein, welche Vorgänge eine Bereitstellung oder Zwischenspeicherung erfordern.
Wir können df06.py ausführen und dann überprüfen, ob neue Tabellen in BigQuery erstellt werden. Bis jetzt haben wir den Code lokal ausgeführt, entweder auf deinem Laptop oder in der Cloud Shell.
Als Nächstes schauen wir uns an, wie wir dies in Cloud Dataflow, dem GCP-Managed Service für Apache Beam-Pipelines, ausführen können.
Betrieb der Pipeline in der Cloud
Dieser letzte Durchlauf hat auf der lokalen virtuellen Maschine (VM) ein paar Minuten gedauert, und wir haben nur tausend Zeilen verarbeitet! Ändern wir den Code (siehe df07.py), um alle Zeilen in der BigQuery-Ansicht zu verarbeiten:
'flights:read' >> beam.io.ReadFromBigQuery( query='SELECT * FROM dsongcp.flights', use_standard_sql=True)
Jetzt, da wir viel mehr Daten haben, müssen wir die Arbeit verteilen. Dazu ändern wir den Runner von DirectRunner
(der lokal läuft) zu DataflowRunner
(der den Auftrag in die Cloud verlagert und skaliert):
argv = [ '--project={0}'.format(project), '--job_name=ch04timecorr', '--save_main_session', '--staging_location=gs://{0}/flights/staging/'.format(bucket), '--temp_location=gs://{0}/flights/temp/'.format(bucket), '--setup_file=./setup.py', '--max_num_workers=8', '--region={}'.format(region), '--runner=DataflowRunner' ] pipeline = beam.Pipeline(argv=argv)
Beachte, dass es jetzt ein paar zusätzliche Parameter gibt:
-
Der Auftragsname ist der Name, unter dem der Auftrag in der GCP-Konsole aufgeführt wird. So können wir bei Bedarf eine Fehlersuche für den Auftrag durchführen.
-
Wir bitten den Dataflow-Übermittlungscode, unsere Hauptsitzung zu speichern. Das ist immer dann notwendig, wenn wir globale Variablen in unserem Python-Programm haben.
-
Die Datei setup.py sollte die Python-Pakete auflisten, die wir nach und nach installieren müssen (
timezonefinder
undpytz
) - Cloud Dataflow muss diese Pakete auf den Compute Engine-Instanzen installieren, die es im Hintergrund startet:REQUIRED_PACKAGES = [ 'timezonefinder', 'pytz' ]
-
Standardmäßig skaliert Dataflow die Anzahl der Arbeiter auf der Grundlage des Durchsatzes - je mehr Zeilen wir in unseren Eingabedateien haben, desto mehr Arbeiter brauchen wir. Dies wird als horizontale Autoskalierung bezeichnet. Um die automatische Skalierung auszuschalten, kannst du
--autoscaling_algorithm=NONE
angeben und um sie etwas einzuschränken, kannst du die maximale Anzahl der Arbeiter angeben. -
Wir legen die Region fest, in der die Dataflow-Pipeline laufen soll.
-
Der Runner ist nicht mehr
DirectRunner
(der lokal läuft). Er ist jetztDataflowRunner
.
Wenn du das Python-Programm ausführst, wird der Auftrag an die Cloud übermittelt. Cloud Dataflow skaliert jeden Schritt der Pipeline automatisch auf Basis des Durchsatzes und streamt die Ereignisdaten in BigQuery (siehe Abbildung 4-3). Du kannst den laufenden Auftrag in der Cloud Platform Console im Bereich Cloud Dataflow überwachen.
Noch während die Ereignisdaten geschrieben werden, können wir sie abfragen, indem wir die BigQuery-Konsole aufrufen und Folgendes eingeben:
SELECT ORIGIN, DEP_TIME, DEST, ARR_TIME, ARR_DELAY, EVENT_TIME, EVENT_TYPE FROM dsongcp.flights_simevents WHERE (DEP_DELAY > 15 and ORIGIN = 'SEA') or (ARR_DELAY > 15 and DEST = 'SEA') ORDER BY EVENT_TIME ASC LIMIT 5
Das gibt zurück:
Row |
ORIGIN |
DEP_TIME |
DEST |
ARR_TIME |
ARR_DELAY |
EVENT_TIME |
EVENT_TYPE |
---|---|---|---|---|---|---|---|
1 |
SEA |
2015-01-01 08:21:00 UTC |
IAD |
null |
null |
2015-01-01 08:21:00 UTC |
departed |
2 |
SEA |
2015-01-01 08:21:00 UTC |
IAD |
null |
null |
2015-01-01 08:38:00 UTC |
wheelsoff |
3 |
SEA |
2015-01-01 08:21:00 UTC |
IAD |
2015-01-01 12:48:00 UTC |
22.0 |
2015-01-01 12:48:00 UTC |
arrived |
4 |
KOA |
2015-01-01 10:11:00 UTC |
SEA |
2015-01-01 15:45:00 UTC |
40.0 |
2015-01-01 15:45:00 UTC |
arrived |
5 |
SEA |
2015-01-01 16:43:00 UTC |
PSP |
null |
null |
2015-01-01 16:43:00 UTC |
departed |
Wie erwartet, gibt es für den Flug SEA-IAD drei Ereignisse: eines beim Abflug, das nächste beim Abrollen und das dritte bei der Ankunft. Die Ankunftsverspätung ist nur bei der Ankunft bekannt.
BigQuery ist eine spaltenorientierte Datenbank, also eine Abfrage, die alle Felder auswählt:
SELECT * FROM dsongcp.flights_simevents ORDER BY EVENT_TIME ASC
ineffizient sein wird. Wir brauchen jedoch alle Ereignisdaten, um Ereignisbenachrichtigungen zu versenden. Deshalb haben wir einen Kompromiss zwischen Speicherung und Geschwindigkeit gefunden, indem wir eine zusätzliche Spalte namens EVENT_DATA
zu unserer BigQuery-Tabelle hinzugefügt und sie wie folgt in unsere Dataflow-Pipeline eingefügt haben:
def create_event_row(fields): featdict = dict(fields) # copy featdict['EVENT_DATA'] = json.dumps(fields) return featdict
Dann könnte unsere Abfrage zum Abrufen der Ereignisse einfach wie folgt lauten:
SELECT EVENT_TYPE, EVENT_TIME, EVENT_DATA FROM dsongcp.flights_simevents WHERE EVENT_TIME >= TIMESTAMP('2015-05-01 00:00:00 UTC') AND EVENT_TIME < TIMESTAMP('2015-05-03 00:00:00 UTC') ORDER BY EVENT_TIME ASC LIMIT 2
Das Ergebnis sieht so aus:
Row |
EVENT_TYPE |
EVENT_TIME |
EVENT_DATA |
---|---|---|---|
1 |
wheelsoff |
2015-05-01 00:00:00 UTC |
{"FL_DATE": "2015-04-30", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1295302", "ORIGIN": "LGA", "DEST_AIRPORT_SEQ_ID": "1330303", "DEST": "MIA", "CRS_DEP_TIME": "2015-04-30T23:29:00", "DEP_TIME": "2015-04-30T23:35:00", "DEP_DELAY": 6.0, "TAXI_OUT": 25.0, "WHEELS_OFF": "2015-05-01T00:00:00", "CRS_ARR_TIME": "2015-05-01T02:53:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "wheelsoff", "EVENT_TIME": "2015-05-01T00:00:00"} |
2 |
departed |
2015-05-01 00:00:00 UTC |
{"FL_DATE": "2015-04-30", "UNIQUE_CARRIER": "DL", "ORIGIN_AIRPORT_SEQ_ID": "1295302", "ORIGIN": "LGA", "DEST_AIRPORT_SEQ_ID": "1320402", "DEST": "MCO", "CRS_DEP_TIME": "2015-04-30T23:55:00", "DEP_TIME": "2015-05-01T00:00:00", "DEP_DELAY": 5.0, "CRS_ARR_TIME": "2015-05-01T02:45:00", "CANCELLED": false, "DIVERTED": false, "DEP_AIRPORT_TZOFFSET": -14400.0, "ARR_AIRPORT_TZOFFSET": -14400.0, "EVENT_TYPE": "departed", "EVENT_TIME": "2015-05-01T00:00:00"} |
Diese Tabelle wird als Quelle für unsere Ereignisse dienen; aus einer solchen Abfrage werden wir die Streaming-Flugdaten simulieren.
Veröffentlichen eines Ereignisstroms in Cloud Pub/Sub
Jetzt, da wir die Quell-Ereignisse aus den rohen Flugdaten haben, können wir den Stream simulieren. Streaming-Daten werden auf der Google Cloud Platform in der Regel über Cloud Pub/Sub veröffentlicht, einen serverlosen Echtzeit-Nachrichtendienst. Cloud Pub/Sub sorgt für eine zuverlässige Zustellung und kann auf mehr als eine Million Nachrichten pro Sekunde skaliert werden. Sofern du nicht Cloud Pub/Sub Lite verwendest (ein Dienst mit nur einer Zone, der für einen kostengünstigen Betrieb ausgelegt ist), speichert Pub/Sub Kopien der Nachrichten in mehreren Zonen, um eine garantierte Zustellung an die Abonnenten zu gewährleisten, und es kann viele gleichzeitige Abonnenten geben.
Unser Simulator liest aus der Ereignistabelle in BigQuery (die im vorherigen Abschnitt erstellt wurde) und veröffentlicht Nachrichten an Cloud Pub/Sub. Im Wesentlichen werden wir die Flugereignisdatensätze durchgehen, die Benachrichtigungszeit von jedem abrufen und die Veröffentlichung dieser Ereignisse simulieren, sobald sie eintreten.
Speed-Up-Faktor
Wir verwenden aber auch eine Zuordnung zwischen der Zeit der Ereignismeldung (Ankunfts- oder Abfahrtszeit je nach Ereignis) und der aktuellen Systemzeit. Warum? Weil es ineffizient ist, die Flugereignisse immer in Echtzeit zu simulieren. Stattdessen kann es sein, dass wir die Flugdaten eines Tages in einer Stunde durchgehen wollen (solange der Code, der diese Ereignisse verarbeitet, die erhöhte Datenrate bewältigen kann). Zu anderen Zeiten kann es sein, dass wir unseren ereignisverarbeitenden Code in einer langsameren Debugging-Umgebung ausführen und die Simulation deshalb verlangsamen wollen. Ich bezeichne dieses Verhältnis zwischen der tatsächlichen Zeit und der Simulationszeit als Beschleunigungsfaktor - derBeschleunigungsfaktor ist größer als 1, wenn die Simulation schneller als die Echtzeit sein soll, und kleiner als 1, wenn sie langsamer als die Echtzeit sein soll.
Basierend auf dem Beschleunigungsfaktor müssen wir eine lineare Transformation der Ereigniszeit in die Systemzeit vornehmen. Wenn der Beschleunigungsfaktor 1 ist, sollte eine 60-minütige Differenz zwischen dem Start der Simulation in der Ereigniszeit und dem Zeitstempel des aktuellen Datensatzes 60 Minuten nach dem Start der Simulation auftreten. Wenn der Beschleunigungsfaktor 60 ist, entspricht ein 60-minütiger Unterschied in der Ereigniszeit einem 1-minütigen Unterschied in der Systemzeit, so dass der Datensatz eine Minute später veröffentlicht werden sollte. Wenn die Ereigniszeit der Systemzeit voraus ist, schlafen wir die notwendige Zeit, damit die Simulation aufholen kann.
Die Simulation besteht aus vier Schritten (siehe auch Abbildung 4-5):
-
Führe die Abfrage aus, um die zu veröffentlichenden Flugereignisdatensätze zu erhalten.
-
Iteriere durch die Abfrageergebnisse.
-
Sammle Ereignisse, um sie in einem Stapel zu veröffentlichen.
-
Veröffentliche angesammelte Ereignisse und schlafe bei Bedarf.
Obwohl es sich hier um eine ETL-Pipeline handelt, ist diese ETL-Pipeline aufgrund der Notwendigkeit, die Datensätze in strikter Reihenfolge zu verarbeiten und dazwischen zu pausieren, schlecht für Cloud Dataflow geeignet. Stattdessen werden wir sie als reines Python-Programm implementieren. Das Problem bei dieser Wahl ist, dass der Simulationscode nicht fehlertolerant ist - wenn die Simulation fehlschlägt, wird sie nicht automatisch neu gestartet und fängt definitiv nicht beim letzten erfolgreich gemeldeten Ereignis an.
Der Simulationscode, den wir schreiben, ist nur für schnelle Experimente mit Streaming-Daten gedacht. Daher werde ich nicht den zusätzlichen Aufwand betreiben, um ihn fehlertolerant zu machen. Wenn es nötig wäre, könnten wir die Simulation fehlertolerant machen, indem wir mit einer BigQuery-Abfrage beginnen, die durch einen Zeitbereich begrenzt ist, dessen Beginn automatisch aus dem zuletzt gemeldeten Datensatz in Cloud Pub/Sub abgeleitet wird. Dann könnten wir das Simulationsskript von einem Docker-Container aus starten und Cloud Run oder Google Kubernetes Engine verwenden, um die Simulation automatisch neu zu starten, wenn der Simulationscode fehlschlägt.
Datensätze zur Veröffentlichung erhalten
Die BigQuery-Abfrage wird durch die Start- und Endzeit der Simulation parametrisiert und kann über die Google Cloud API für Python aufgerufen werden (siehe 04_streaming/simulate/simulate.py im GitHub-Repository):
bqclient = bq.Client(args.project) querystr = """ SELECT EVENT_TYPE, EVENT_TIME AS NOTIFY_TIME, EVENT_DATA FROM dsongcp.flights_simevents WHERE EVENT_TIME >= TIMESTAMP('{}') AND EVENT_TIME < TIMESTAMP('{}') ORDER BY EVENT_TIME ASC """ rows = bqclient.query(querystr.format(args.startTime, args.endTime))
Das ist jedoch eine schlechte Idee. Verstehst du, warum?
Das liegt daran, dass wir die Start- und Endzeit aus der Befehlszeile des Simulationsskripts abrufen und direkt an BigQuery weitergeben. Das nennt man SQL-Injection, die zu Sicherheitsproblemen führen kann.17 Ein besserer Ansatz ist es, parametrisierte Abfragenzu verwenden - dieBigQuery-Abfrage enthält die Parameter, die als @startTime
usw. gekennzeichnet sind, und die Python-Abfragefunktion übernimmt die Definitionen über den Job-Konfigurationsparameter:
bqclient = bq.Client(args.project) querystr = """ SELECT EVENT_TYPE, EVENT_TIME AS NOTIFY_TIME, EVENT_DATA FROM dsongcp.flights_simevents WHERE EVENT_TIME >= @startTime AND EVENT_TIME < @endTime ORDER BY EVENT_TIME ASC """ job_config = bq.QueryJobConfig( query_parameters=[ bq.ScalarQueryParameter("startTime", "TIMESTAMP", args.startTime), bq.ScalarQueryParameter("endTime", "TIMESTAMP", args.endTime), ] ) rows = bqclient.query(querystr, job_config=job_config)
Die Abfragefunktion gibt ein Objekt zurück (im vorangegangenen Schnipsel rows
genannt), das wir durchlaufen können:
for row in rows: # do something
Was müssen wir für jede der Zeilen tun? Wir müssen durch die Datensätze iterieren, einen Stapel von Ereignissen erstellen und jeden Stapel veröffentlichen. Schauen wir uns an, wie jeder dieser Schritte abläuft.
Wie viele Themen?
Während wir die Abfrageergebnisse durchgehen, müssen wir Ereignisse in Cloud Pub/Sub veröffentlichen. Wir haben drei Möglichkeiten, was die Architektur angeht:
-
Wir könnten alle Ereignisse in einem einzigen Thema veröffentlichen. Das kann jedoch eine Verschwendung von Netzwerkbandbreite sein, wenn wir einen Abonnenten haben, der nur an dem Ereignis
wheelsoff
interessiert ist. Ein solcher Abonnent muss das eingehende Ereignis parsen, dieEVENT_TYPE
Datei im JSON dekodieren und Ereignisse, an denen er nicht interessiert ist, verwerfen. -
Wir könnten alle Ereignisse in einem einzigen Thema veröffentlichen, aber jeder Nachricht Attribute hinzufügen. Um zum Beispiel ein Ereignis mit zwei Attributen zu veröffentlichen
event_type
undcarrier
zu veröffentlichen, würden wir Folgendes tun:publisher.publish(topic, event_data, event_type='departed', carrier='AA')
Dann könnte der Abonnent bei der Erstellung des Abonnements eine serverseitige Filterung nach einem Attribut oder einer Kombination von Attributen verlangen :
subscriber.create_subscription(request={..., "filter": "attributes.carrier='AS' AND attributes.event_type='arrived'"})
-
Erstelle für jeden Ereignistyp ein eigenes Thema (d.h. ein
arrived
Thema, eindeparted
Thema und einwheelsoff
Thema).
Option 1 ist die einfachste und sollte deine Standardwahl sein, es sei denn, du hast viele Abonnenten, die nur an Teilmengen des Ereignisstroms interessiert sind. Wenn du viele Abonnenten hast, die sich nur für Teilmengen des Ereignisstroms interessieren, wähle zwischen Option 2 und 3.
Option 2 erhöht die Komplexität der Software. Option 3 erhöht die Komplexität der Infrastruktur. Ich schlage vor, Option 3 zu wählen, wenn du nur ein Attribut hast und dieses Attribut nur eine Handvoll Optionen hat. Das begrenzt die Komplexität der Infrastruktur und hält den Code für Publisher und Subscriber einfach. Wähle Option 2, wenn du viele Attribute mit vielen möglichen Werten hast, weil Option 3 in einem solchen Szenario zu einer explosionsartigen Zunahme der Anzahl der Themen führen würde.
Iteration durch Datensätze
Wir entscheiden uns dafür, für jede Ereignisart ein eigenes Thema zu erstellen (d. h. ein arrived
Thema, ein departed
Thema und ein wheelsoff
Thema), also erstellen wir drei Themen:18
for event_type in ['wheelsoff', 'arrived', 'departed']: topics[event_type] = publisher.topic_path(args.project, event_type) try: publisher.get_topic(topic=topics[event_type]) logging.info("Already exists: {}".format(topics[event_type])) except: logging.info("Creating {}".format(topics[event_type])) publisher.create_topic(name=topics[event_type])
Nachdem wir die Themen erstellt haben, rufen wir die Methode notify()
auf und geben die aus BigQuery gelesenen Zeilen weiter:
# notify about each row in the dataset programStartTime = datetime.datetime.utcnow() simStartTime = datetime.datetime.strptime(args.startTime, TIME_FORMAT).replace(tzinfo=pytz.UTC) notify(publisher, topics, rows, simStartTime, programStartTime, args.speedFactor)
Erstellen eines Stapels von Ereignissen
Die Methode notify()
besteht darin, die Zeilen in Stapeln zu sammeln, einen Stapel zu veröffentlichen und zu schlafen, bis es Zeit ist, den nächsten Stapel zu veröffentlichen:
def notify(publisher, topics, rows, simStartTime, programStart, speedFactor): # sleep computation def compute_sleep_secs(notify_time): time_elapsed = (datetime.datetime.utcnow() - programStart).seconds sim_time_elapsed = (notify_time - simStartTime).seconds / speedFactor to_sleep_secs = sim_time_elapsed - time_elapsed return to_sleep_secs tonotify = {} for key in topics: tonotify[key] = list() for row in rows: event, notify_time, event_data = row # how much time should we sleep? if compute_sleep_secs(notify_time) > 1: # notify the accumulated tonotify publish(publisher, topics, tonotify, notify_time) for key in topics: tonotify[key] = list() # recompute sleep, since notification takes a while to_sleep_secs = compute_sleep_secs(notify_time) if to_sleep_secs > 0: logging.info('Sleeping {} seconds'.format(to_sleep_secs)) time.sleep(to_sleep_secs) tonotify[event].append(event_data) # left-over records; notify again publish(publisher, topics, tonotify, notify_time)
Hier gibt es ein paar Punkte zu beachten. Erstens arbeiten wir komplett in UTC, damit die Zeitdifferenzberechnungen einen Sinn ergeben. Zweitens berechnen wir immer die Zeitdifferenz seit dem Start der Simulation, um zu sehen, ob wir schlafen sollen. Wenn wir den Zeiger einfach immer weiter nach vorne schieben, häufen sich die Fehler in der Zeit. Außerdem prüfen wir beim ersten Mal, ob die Schlafzeit mehr als eine Sekunde beträgt, damit die Aufzeichnungen Zeit haben, sich zu sammeln. Wenn du beim Ausführen des Programms keinen Ruhezustand siehst, ist dein Beschleunigungsfaktor zu hoch für die Leistungsfähigkeit des Rechners, auf dem der Simulationscode läuft, und für das Netzwerk zwischen diesem Rechner und der Google Cloud Platform. Verlangsame die Simulation, besorge dir einen größeren Rechner oder führe sie hinter der Google Firewall aus (z. B. in der Cloud Shell oder auf einer Compute Engine Instanz).
Veröffentlichen eines Stapels von Veranstaltungen
Die Methode notify()
, die wir im vorherigen Codebeispiel gesehen haben, hat die Ereignisse zwischen den Sleep-Aufrufen angesammelt. Auch wenn es so aussieht, als würden wir ein Ereignis nach dem anderen veröffentlichen, verwaltet der Verleger tatsächlich einen separaten Stapel für jedes Thema:
def publish(publisher, topics, allevents): for key in topics: # 'departed', 'arrived', etc. topic = topics[key] events = allevents[key] logging.info('Publishing {} {} events'.format(len(events), key)) for event_data in events: publisher.publish(topic, event_data.encode())
Beachte, dass Cloud Pub/Sub nicht garantiert, dass die Nachrichten in der richtigen Reihenfolge zugestellt werden, insbesondere wenn der Abonnent einen großen Rückstand aufbaut. Es kommt vor, dass Nachrichten nicht in der richtigen Reihenfolge zugestellt werden, und die nachgelagerten Abonnenten müssen sich darum kümmern. Cloud Pub/Sub garantiert die Zustellung "mindestens einmal" und sendet die Nachricht erneut, wenn der Abonnent eine Nachricht nicht rechtzeitig bestätigt. Ich werde Cloud Dataflow für den Ingest von Cloud Pub/Sub verwenden, und Cloud Dataflow kümmert sich transparent um diese beiden Probleme (außer der Reihe und Duplikation).
Wir können die Simulation ausprobieren, indem wir das Folgende eingeben:
python3 simulate.py --startTime '2015-05-01 00:00:00 UTC' \ --endTime '2015-05-04 00:00:00 UTC' --speedFactor=60
Damit werden drei Tage Flugdaten (die Endzeit ist exklusiv) mit 60-facher Echtzeitgeschwindigkeit simuliert und die Ereignisse in drei Topics auf Cloud Pub/Sub gestreamt.19 Da die Simulation von einer BigQuery-Abfrage ausgeht, ist es ganz einfach, die simulierten Ereignisse auf einen einzigen Flughafen oder auf Flughäfen innerhalb eines Längen- und Breitengrads zu beschränken.
In diesem Abschnitt haben wir uns angesehen, wie man einen Ereignisstrom erzeugt und diese Ereignisse in Echtzeit veröffentlicht. In diesem Buch können wir diesen Simulator und diese Themen nutzen, um zu erproben, wie wir Streaming-Daten nutzen und Echtzeitanalysen durchführen können.
Stream-Verarbeitung in Echtzeit
Da wir nun eine Quelle für Streaming-Daten mit Standortinformationen haben, wollen wir uns ansehen, wie wir ein Echtzeit-Dashboard erstellen können. Abbildung 4-6 zeigt die Referenzarchitektur für viele Lösungen auf der Google Cloud Platform.20
Im vorherigen Abschnitt haben wir einen Echtzeit-Ereignisstrom in Cloud Pub/Sub eingerichtet, den wir in Cloud Dataflow aggregieren und in BigQuery schreiben können. Data Studio kann sich mit BigQuery verbinden und ein interaktives Dashboard in Echtzeit bereitstellen. Legen wir los.
Streaming im Datenfluss
Als wir die Zeitkorrektur der Rohflugdaten durchführten, arbeiteten wir mit einer vollständigen BigQuery-Flugtabelle, verarbeiteten sie in Cloud Dataflow und schrieben die Ereignistabelle in BigQuery. Die Verarbeitung eines endlichen, begrenzten Eingabedatensatzes wird als Stapelverarbeitung bezeichnet.
Hier müssen wir jedoch Ereignisse in Cloud Pub/Sub verarbeiten, die in Strömen hereinströmen. Die Datenmenge ist unbegrenzt. Die Verarbeitung eines unbegrenzten Datensatzes wird Stream Processing genannt. Zum Glück ist der Code für die Stream-Verarbeitung in Apache Beam identisch mit dem Code für die Stapelverarbeitung.
Wir könnten die Ereignisse einfach von Cloud Pub/Sub empfangen, ähnlich wie wir Daten aus einer CSV-Datei lesen:21
topic_name = "projects/{}/topics/arrived".format(project) events = (pipeline | 'read' >> beam.io.ReadFromPubSub(topic=topic_name) | 'parse' >> beam.Map(lambda s: json.loads(s)) )
Die einzige Änderung, die wir vornehmen müssen, ist, das Streaming-Flag in den Dataflow-Optionen zu aktivieren:
argv = [ ... '--streaming', ]
Wir können die eingelesenen Ereignisse an BigQuery streamen, indem wir einen ähnlichen Code wie bei der Stapelverarbeitung verwenden:
schema = 'FL_DATE:date,...,EVENT_TYPE:string,EVENT_TIME:timestamp' (events | 'bqout' >> beam.io.WriteToBigQuery( 'dsongcp.streaming_events', schema=schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED ) )
Im vorangegangenen Code abonnieren wir ein Thema in Cloud Pub/Sub und beginnen, es zu lesen. Sobald eine Nachricht eintrifft, analysieren wir sie, wandeln sie in eine TableRow in BigQuery um und schreiben sie dann aus. Wenn das alles ist, was wir brauchen, können wir einfach die von Google bereitgestellte Dataflow-Vorlage verwenden, die von Pub/Sub zu BigQuery führt.
Aber nehmen wir an, dass wir sowohl die angekommenen als auch die abgegangenen Ereignisse lesen und in dieselbe BigQuery-Tabelle schreiben wollen. Das können wir ganz einfach in Beam machen:
events = {} for event_name in ['arrived', 'departed']: topic_name = "projects/{}/topics/{}".format(project, event_name) events[event_name] = (pipeline | 'read:{}'.format(event_name) >> beam.io.ReadFromPubSub(topic=topic_name) | 'parse:{}'.format(event_name) >> beam.Map( lambda s: json.loads(s)) ) all_events = (events['arrived'], events['departed']) | beam.Flatten()
Durch das Flattening werden die beiden Ereignisgruppen zu einer einzigen Sammlung verkettet. Dann schreiben wir all_events
an BigQuery aus.
Um diesen Code auszuprobieren, müssen wir den Simulator starten, den wir im vorherigen Abschnitt geschrieben haben, damit der Simulator Ereignisse in den Pub/Sub-Themen veröffentlichen kann. Um die Simulation zu starten, startest du den Python-Simulator, den wir im vorigen Abschnitt entwickelt haben:
python simulate.py --startTime '2015-05-01 00:00:00 UTC' --endTime '2015-05-04 00:00:00 UTC' --speedFactor 30
Der Simulator sendet Ereignisse vom 1. Mai 2015 bis zum 3. Mai 2015 mit 30-facher Echtzeitgeschwindigkeit, so dass eine Stunde Daten in zwei Minuten an Cloud Pub/Sub gesendet wird. Du kannst dies von der Cloud Shell oder von deinem lokalen Laptop aus tun. (Falls nötig, führe install_packages.sh
aus, um die erforderlichen Python-Pakete zu installieren, und gcloud auth application-default login
, um der Anwendung die nötigen Anmeldedaten für die Ausführung von Abfragen zu geben).
Starte in einem anderen Terminal avg01.py, um den Strom von Ereignissen zu lesen und sie in BigQuery zu schreiben. Dann können wir den Datensatz in BigQuery abfragen, während die Ereignisse hereinströmen. Die BigQuery-Benutzeroberfläche zeigt diese Streaming-Tabelle vielleicht noch nicht an, aber sie kann abgefragt werden:
SELECT * FROM dsongcp.streaming_events ORDER BY EVENT_TIME DESC LIMIT 5
Fensterung einer Pipeline
Obwohl wir nur einen einfachen Datentransfer machen könnten, würde ich gerne mehr tun. Wenn ich ein Echtzeit-Dashboard mit Flugverspätungen auffülle, möchte ich, dass die Informationen über ein angemessenes Intervall aggregiert werden. Ich möchte zum Beispiel einen gleitenden Durchschnitt der Flugverspätungen und die Gesamtzahl der Flüge der letzten 60 Minuten an jedem Flughafen. Anstatt die von Cloud Pub/Sub erhaltenen Daten einfach an BigQuery weiterzuleiten, möchte ich die Daten in dem Moment analysieren, in dem ich sie erhalte, und diese Analysen in BigQuery schreiben.22 Cloud Dataflow kann uns dabei helfen.
Auch wenn wir einen Durchschnitt über 60 Minuten bilden, wie oft sollten wir diesen 60-Minuten-Durchschnitt berechnen? Es könnte zum Beispiel von Vorteil sein, ein gleitendes Fenster zu verwenden und diesen 60-Minuten-Durchschnitt alle fünf Minuten zu berechnen.
Streaming-Aggregation
Der Hauptunterschied zwischen Batch-Aggregation und Streaming-Aggregation ist die Unbegrenztheit der Daten bei der Stream-Verarbeitung. Was bedeutet eine Operation wie "max", wenn die Daten unbegrenzt sind? Unabhängig davon, wie hoch der Maximalwert zu diesem Zeitpunkt ist, kann zu einem späteren Zeitpunkt ein größerer Wert im Datenstrom auftauchen.
Ein Schlüsselkonzept bei der Aggregation von Streaming-Daten ist das eines Fensters, das den Rahmen für alle Aggregationen bildet. Hier wenden wir ein zeitbasiertes gleitendes Fenster auf die Pipeline an. Von nun an finden alle Gruppierungen, Aggregationen usw. innerhalb dieses Zeitfensters statt, und in jedem Zeitfenster gibt es ein eigenes Maximum, einen Durchschnitt usw:
stats = (all_events | 'byairport' >> beam.Map(by_airport) | 'window' >> beam.WindowInto( beam.window.SlidingWindows(60 * 60, 5 * 60)) | 'group' >> beam.GroupByKey() | 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1])) )
Gehen wir den vorangegangenen Codeschnipsel genau durch.
Als erstes nehmen wir alle Ereignisse und wenden die by_airport
Transformation auf die Ereignisse an:
| 'byairport' >> beam.Map(by_airport)
Dabei wird der Herkunftsflughafen für abfliegende Ereignisse und der Zielflughafen für ankommende Ereignisse herausgezogen:
def by_airport(event): if event['EVENT_TYPE'] == 'departed': return event['ORIGIN'], event else: return event['DEST'], event
Als Nächstes wenden wir ein gleitendes Fenster auf den Ereignisstrom an. Das Fenster hat eine Dauer von 60 Minuten und wird alle 5 Minuten angewendet:
| 'window' >> beam.WindowInto( beam.window.SlidingWindows(60 * 60, 5 * 60))
Dann wenden wir eine GroupByKey
an:
| 'group' >> beam.GroupByKey()
Was ist der Schlüssel?
In der zuvor erwähnten Funktion by_airport
haben wir den Flughafen zum Schlüssel und das gesamte Ereignisobjekt zum Wert gemacht. Die GroupByKey
gruppiert die Ereignisse also nach Flughafen.
Aber die GroupByKey
ist nicht nur nach Flughafen geordnet. Da wir bereits ein gleitendes Fenster angewendet haben, wird für jedes Zeitfenster eine eigene Gruppe erstellt. Jede Gruppe besteht also aus 60 Minuten Flugereignissen, die an einem bestimmten Flughafen angekommen oder abgeflogen sind.
Für diese Gruppen rufen wir die Funktion compute_stats
im letzten Map
des Snippets auf:
| 'stats' >> beam.Map(lambda x: compute_stats(x[0], x[1]))
Die Funktion compute_stats
nimmt den Flughafen und die Liste der Ereignisse an diesem Flughafen und berechnet dann einige Statistiken:
def compute_stats(airport, events): arrived = [event['ARR_DELAY'] for event in events if event['EVENT_TYPE'] == 'arrived'] avg_arr_delay = float(np.mean(arrived)) if len(arrived) > 0 else None departed = [event['DEP_DELAY'] for event in events if event['EVENT_TYPE'] == 'departed'] avg_dep_delay = float(np.mean(departed)) if len(departed) > 0 else None num_flights = len(events) start_time = min([event['EVENT_TIME'] for event in events]) latest_time = max([event['EVENT_TIME'] for event in events]) return { 'AIRPORT': airport, 'AVG_ARR_DELAY': avg_arr_delay, 'AVG_DEP_DELAY': avg_dep_delay, 'NUM_FLIGHTS': num_flights, 'START_TIME': start_time, 'END_TIME': latest_time }
Im vorangegangenen Code ziehen wir die eingetroffenen Ereignisse heraus und berechnen die durchschnittliche Ankunftsverspätung. Genauso berechnen wir die durchschnittliche Abflugverspätung für die abgeflogenen Ereignisse. Außerdem berechnen wir die Anzahl der Flüge in dem Zeitfenster an diesem Flughafen und geben alle diese Statistiken zurück.
Die Statistiken werden dann mithilfe eines Codes, der dir inzwischen bekannt vorkommen sollte, an BigQuery übermittelt:
stats_schema = ','.join( ['AIRPORT:string,AVG_ARR_DELAY:float,AVG_DEP_DELAY:float', 'NUM_FLIGHTS:int64,START_TIME:timestamp,END_TIME:timestamp']) (stats | 'bqout' >> beam.io.WriteToBigQuery( 'dsongcp.streaming_delays', schema=stats_schema, create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED ) )
Wie im vorherigen Abschnitt können wir den Simulator starten und dann avg02.py starten. Als ich das tat, wurden die Aggregationen alle 5 Minuten erstellt, aber innerhalb jedes 5-Minuten-Zeitraums deckten die gemeldeten Ereignisse einen Bereich von 150 Minuten ab (weil die 30x-Simulation 150 Minuten Daten in 5 Minuten verarbeitet).
Die Stream-Processing-Engine wendet die Schiebefenster auf der Grundlage der Zeit auf einer Wanduhr an. Wir möchten jedoch, dass das Fenster auf der Grundlage des Zeitstempels in den Bildern angewendet wird.
Wie machen wir das?
Zeitstempel für Ereignisse verwenden
Wir müssen ein Attribut hinzufügen, wenn wir die Ereignisse veröffentlichen (in simulate.py):
publisher.publish(topic, event_data.encode(), EventTimeStamp=timestamp)
Dann sollten wir in unserer Beam-Pipeline beim Lesen aus Pub/Sub die Veröffentlichungszeit in Pub/Sub ignorieren und stattdessen dieses Attribut der Nachricht als Zeitstempel verwenden:
| 'read:{}'.format(event_name) >> beam.io.ReadFromPubSub( topic=topic_name, timestamp_attribute='EventTimeStamp')
Mit dieser Änderung, wenn ich die Abfrage ausführe:
SELECT * FROM dsongcp.streaming_delays WHERE AIRPORT = 'ATL' ORDER BY END_TIME DESC
Ich bekomme Reihen im Abstand von etwa 5 Minuten, wie erwartet:
Row |
AIRPORT |
AVG_ARR_DELAY |
AVG_DEP_DELAY |
NUM_FLIGHTS |
START_TIME |
END_TIME |
---|---|---|---|---|---|---|
1 |
ATL |
35.72222222222222 |
13.666666666666666 |
48 |
2015-05-01 02:24:00 UTC |
2015-05-01 03:21:00 UTC |
2 |
ATL |
35.25 |
8.717948717948717 |
59 |
2015-05-01 02:15:00 UTC |
2015-05-01 03:12:00 UTC |
3 |
ATL |
38.666666666666664 |
9.882352941176471 |
52 |
2015-05-01 02:19:00 UTC |
2015-05-01 03:12:00 UTC |
4 |
ATL |
38.473684210526315 |
5.916666666666667 |
55 |
2015-05-01 02:15:00 UTC |
2015-05-01 03:08:00 UTC |
5 |
ATL |
35.111111111111114 |
5.53125 |
50 |
2015-05-01 02:15:00 UTC |
2015-05-01 03:03:00 UTC |
Die gemeldeten Zeiten liegen nicht genau 5 Minuten auseinander, weil die gemeldeten Zeiten dem frühesten/ spätesten Flug in Atlanta innerhalb des Zeitfensters entsprechen. Beachte auch, dass die Länge des Zeitfensters ungefähr eine Stunde beträgt.
Es ist jedoch wahrscheinlich, dass Cloud Shell oder dein lokaler Laptop Schwierigkeiten haben werden, mit dem Ereignisstrom Schritt zu halten. Wir müssen diese Pipeline in Dataflow auf serverlose Weise ausführen.
Ausführen der Stream-Verarbeitung
Um die Beam-Pipeline in Cloud Dataflow auszuführen, muss ich nur den Runner ändern (siehe avg03.py im GitHub-Repository):
argv = [ '--project={0}'.format(project), '--job_name=ch04avgdelay', '--streaming', ... '--runner=DataflowRunner' ]
Bevor wir mit dieser Pipeline beginnen, sollten wir jedoch die Zeilen löschen, die avg02.py im vorherigen Abschnitt bereits in die BigQuery-Tabelle geschrieben hat. Das geht am einfachsten, indem du den folgenden SQL DML-Befehl ausführst, um die Tabelle abzuschneiden:
TRUNCATE TABLE dsongcp.streaming_delays
Wenn du avg03.py ausführst, wird ein Dataflow-Job gestartet. Wenn du jetzt in der Cloud Platform-Konsole den Abschnitt Cloud Dataflow aufrufst, siehst du, dass ein neuer Streaming-Job gestartet wurde und dass die Pipeline wie in Abbildung 4-7 aussieht.
Die Pipeline verarbeitet die Flugereignisse, während sie in Pub/Sub eingehen, fasst sie in Zeitfenstern zusammen und überträgt die daraus resultierenden Statistiken in BigQuery.
Analysieren von Streaming-Daten in BigQuery
Zwei Minuten nach dem Start deines Programms,23 wird der erste Datensatz in BigQuery gespeichert. Du kannst die Statistiken für einen bestimmten Flughafen über die BigQuery-Konsole abfragen, indem du die gleiche Abfrage wie zuvor verwendest:
SELECT * FROM dsongcp.streaming_delays WHERE AIRPORT = 'ATL' ORDER BY END_TIME DESC
Das Tolle daran ist, dass wir diese Abfrage sogar während des Datenstroms durchführen können! Wie würden wir die neuesten Daten für alle Flughäfen bekommen? Wir könnten alle Daten für jeden Flughafen abrufen, sie nach Zeit sortieren und die neuesten Daten nehmen:
SELECT AIRPORT, ARRAY_AGG( STRUCT(AVG_ARR_DELAY, AVG_DEP_DELAY, NUM_FLIGHTS, END_TIME) ORDER BY END_TIME DESC LIMIT 1) AS a FROM dsongcp.streaming_delays d GROUP BY AIRPORT
Die Ergebnisse sehen in etwa so aus:
Row |
AIRPORT |
a.AVG_ARR_DELAY |
a.AVG_DEP_DELAY |
a.NUM_FLIGHTS |
a.END_TIME |
---|---|---|---|---|---|
1 |
BUR |
-6.8 |
-5.666666666666667 |
8 |
2015-05-01 03:26:00 UTC |
2 |
HNL |
17.11111111111111 |
-3.7777777777777777 |
18 |
2015-05-01 03:46:00 UTC |
3 |
CVG |
-7.75 |
null |
4 |
2015-05-01 03:48:00 UTC |
4 |
PHL |
5.636363636363637 |
16.5 |
13 |
2015-05-01 03:48:00 UTC |
5 |
IND |
40.6 |
null |
5 |
2015-05-01 03:45:00 UTC |
Abfragen wie diese auf Streaming-Daten werden nützlich sein, wenn wir mit dem Aufbau unseres Dashboards beginnen. Mit der ersten Abfrage können wir zum Beispiel ein Zeitreihendiagramm der Verspätungen an einem bestimmten Flughafen erstellen. Mit der zweiten Abfrage können wir eine Karte mit den durchschnittlichen Verspätungen im ganzen Land erstellen.
Real-Time Dashboard
Da wir nun über Streaming-Daten in BigQuery und eine Möglichkeit zur Analyse der Daten verfügen, können wir ein Dashboard erstellen, das Abflug- und Ankunftsverspätungen im Kontext zeigt. Zwei Karten können dabei helfen, den Endnutzern unser auf einer Kontingenztabelle basierendes Modell zu erklären: die aktuellen Verspätungen bei der Ankunft im ganzen Land und die aktuellen Verspätungen beim Abflug im ganzen Land.
Um die Daten für diese Diagramme zu erhalten, müssen wir in Data Studio eine BigQuery-Datenquelle hinzufügen. Obwohl Data Studio die Angabe der Abfrage direkt in der Benutzeroberfläche unterstützt, ist es viel besser, eine Ansicht in BigQuery zu erstellen und diese Ansicht als Datenquelle in Data Studio zu verwenden. BigQuery-Ansichten haben einige Vorteile gegenüber Abfragen, die du in Data Studio eingibst: Sie sind in der Regel in allen Berichten und Visualisierungstools wiederverwendbar, es gibt nur eine Stelle, an der du sie ändern musst, wenn ein Fehler auftritt, und BigQuery-Ansichten lassen sich besser den Zugriffsrechten (Cloud Identity and Access Management-Rollen) zuordnen, je nachdem, auf welche Spalten sie zugreifen müssen.
Hier ist die Abfrage, die ich zum Erstellen der Ansicht verwendet habe:
CREATE OR REPLACE VIEW dsongcp.airport_delays AS WITH delays AS ( SELECT d.*, a.LATITUDE, a.LONGITUDE FROM dsongcp.streaming_delays d JOIN dsongcp.airports a USING(AIRPORT) WHERE a.AIRPORT_IS_LATEST = 1 ) SELECT AIRPORT, CONCAT(LATITUDE, ',', LONGITUDE) AS LOCATION, ARRAY_AGG( STRUCT(AVG_ARR_DELAY, AVG_DEP_DELAY, NUM_FLIGHTS, END_TIME) ORDER BY END_TIME DESC LIMIT 1) AS a FROM delays GROUP BY AIRPORT, LONGITUDE, LATITUDE
Diese Abfrage unterscheidet sich geringfügig von der zweiten Abfrage im vorherigen Abschnitt, da sie auch den Standort des Flughafens hinzufügt, indem sie mit der Flughafentabelle verknüpft wird.
Nachdem wir die Ansicht in BigQuery gespeichert haben, können wir in Data Studio eine Datenquelle für die Ansicht erstellen, so wie wir es im vorherigen Kapitel getan haben:
-
Besuche https://datastudio.google.com.
-
Erstelle eine BigQuery-Datenquelle, verweise sie auf die Ansicht
airport_delays
und verbinde dich mit ihr. -
Ändere das Ortsfeld von Text in Geo | Breitengrad, Längengrad und klicke dann auf Bericht erstellen.
-
Füge dem Bericht ein Geodiagramm hinzu.
-
Gib das Ortsfeld als Geodimension an (siehe Abbildung 4-8).
-
Gib die durchschnittliche Abflugverspätung als Dimension und die Vereinigten Staaten als Zoomstufe an.
-
Ändere den Stil so, dass der Farbbalken alle Bereiche umfasst.
-
Wiederhole den Vorgang für die Ankunftsverspätung.
Es lohnt sich, darüber nachzudenken, was wir in diesem Abschnitt getan haben. Wir verarbeiteten Streaming-Daten in Cloud Dataflow und erstellten gleitende 60-Minuten-Durchschnitte, die wir in BigQuery einspeisten. Dann haben wir in BigQuery eine Ansicht erstellt, die die neuesten Daten für jeden Flughafen anzeigt, auch wenn sie gerade hereinströmen. Diese haben wir mit einem Dashboard in Data Studio verbunden. Jedes Mal, wenn das Dashboard aktualisiert wird, zieht es neue Daten aus der Ansicht, die wiederum dynamisch die neuesten Daten in BigQuery widerspiegelt.
Zusammenfassung
In diesem Kapitel haben wir besprochen, wie man eine Echtzeit-Analyse-Pipeline aufbaut, um Streaming-Analysen durchzuführen und Echtzeit-Dashboards zu bestücken. In diesem Buch verwenden wir einen Datensatz, der nicht in Echtzeit verfügbar ist. Deshalb haben wir die Erstellung eines Echtzeit-Feeds simuliert, um zu zeigen, wie man eine Streaming-Ingest-Pipeline aufbaut. Der Aufbau der Simulation gibt uns auch ein praktisches Testwerkzeug an die Hand - wir müssen nicht mehr warten, bis ein interessantes Ereignis eintritt. Wir können einfach ein aufgezeichnetes Ereignis abspielen!
Bei der Entwicklung der Simulation haben wir festgestellt, dass die Handhabung der Zeit im ursprünglichen Datensatz problematisch war. Deshalb haben wir den Umgang mit der Zeit in den Originaldaten verbessert und einen neuen Datensatz mit UTC-Zeitstempeln und lokalen Versätzen erstellt. Das ist der Datensatz, den wir in Zukunft verwenden werden.
Wir haben uns auch die Referenzarchitektur für die Verarbeitung von Streaming-Daten in Google Cloud Platform angesehen. Zuerst empfängst du deine Daten in Cloud Pub/Sub, damit die Nachrichten asynchron empfangen werden können. Verarbeite die Cloud Pub/Sub-Nachrichten in Cloud Dataflow, berechne je nach Bedarf Aggregationen der Daten und streame entweder die Rohdaten oder die aggregierten Daten (oder beides) zu BigQuery. Wir haben mit allen drei Google Cloud-Produkten (Cloud Pub/Sub, Cloud Dataflow und BigQuery) gearbeitet und dabei die Google Cloud-Client-Bibliotheken in Python verwendet. In keinem dieser Fälle mussten wir jedoch jemals selbst eine virtuelle Maschine erstellen - es handelt sich um serverlose und automatisch skalierende Angebote. So konnten wir uns auf das Schreiben von Code konzentrieren und die Plattform den Rest erledigen lassen.
Empfohlene Ressourcen
Auf der Apache Beam-Website findest du interaktive Programmierübungen, sogenannte Katas, mit denen du Streaming-Konzepte und deren Umsetzung mit Beam spielerisch erlernen kannst.
Dataflow-Vorlagen sind vorgefertigte Apache Beam-Pipelines, die für die Datenmigration nützlich sind. In diesem Kapitel haben wir die Dataflow-Vorlage für das Einlesen von Daten aus Pub/Sub in BigQuery erwähnt. Dataflow Templates gibt es auch für andere Quellen als Google. So gibt es zum Beispiel einen Dataflow-Konnektor von SAP HANA zu BigQuery, der in dem Google-Blogbeitrag "Using Apache Beam and Cloud Dataflow to Integrate SAP HANA and BigQuery" von Babu Prasad Elumalai und Mark Shalda aus dem Jahr 2017 beschrieben wird. Dieser spezielle Connector ist in Java geschrieben.
In diesem Tutorial erfährst du, wie du deine eigene Dataflow-Vorlage erstellst. Jede Dataflow-Pipeline kann in eine Vorlage umgewandelt werden, um sie einfach zu teilen und bequem zu starten.
In dem Artikel "Processing Billions of Events in Real Time at Twitter" (Verarbeitung von Milliarden von Ereignissen in Echtzeit bei Twitter) aus dem Jahr 2021 beschreiben die Twitter-Ingenieure Lu Zhang und Chukwudiuto Malife, wie Twitter 400 Milliarden Ereignisse in Echtzeit mit Dataflow verarbeitet.
1 Das ist eine häufige Situation. Erst wenn du anfängst, einen Datensatz zu erforschen, entdeckst du, dass du zusätzliche Datensätze brauchst. Hätte ich das vorher gewusst, hätte ich beide Datensätze importiert. Aber du folgst ja meinem Arbeitsablauf, und zu diesem Zeitpunkt wusste ich zwar, dass ich einen Datensatz mit Zeitzonenverschiebungen brauche, aber ich hatte noch nicht danach gesucht!
2 Siehe 04_streaming/design/mktbl.sh für die eigentliche Syntax; wir haben hier Anpassungen für den Druck vorgenommen.
3 Oder erstelle eine Kopie oder eine Ansicht der Tabelle mit anonymisierten Spaltenwerten - wir behandeln den Schutz von personenbezogenen Daten in Kapitel 7 und im Anhang.
4 Zum Beispiel wurde die Zeitzone von Sewastopol 2014 nach der Annexion der Krim durch die Russische Föderation von osteuropäischer Zeit (UTC+2) auf Moskauer Zeit (UTC+4) geändert.
5 Die Java-API ist viel ausgereifter und leistungsfähiger, aber Python ist einfacher und übersichtlicher. In diesem Buch werden wir Python verwenden.
6 Wenn du eine ephemere Shell wie Cloud Shell verwendest, musst du die Zeile activate jedes Mal ausführen, wenn du eine neue Sitzung startest. Dadurch wird die virtuelle Umgebung geladen, die du zuvor verwendet hast. Auf diese Weise musst du die Python-Pakete nicht jedes Mal neu installieren.
7 Dieser Code befindet sich in 04_streaming/transform/df01.py aus dem GitHub-Repository dieses Buches.
8 Dieser Code befindet sich in 04_streaming/transform/df02.py aus dem GitHub-Repository dieses Buches.
9 Siehe die Antwort auf die Frage "Wie gehe ich mit NameErrors
um?" in der Google-Dokumentation.
10 Das Speichern von Python-Objekten wird Pickling genannt.
11 Der Flughafen von Chicago ist am 30. Juni nicht zusammengepackt und umgezogen. Höchstwahrscheinlich wurde zu diesem Zeitpunkt ein neues Terminal oder eine neue Start- und Landebahn eröffnet, wodurch sich die Lage des Schwerpunkts der Luftausdehnung des Flughafens änderte. Beachte, dass die Veränderung nur 0,0036 Breitengrade beträgt. Bei Chicagos Breitengrad entspricht das etwa 400 Metern.
12 Normalerweise wird empfohlen, eine BigQuery-Tabelle zu sampeln ( SELECT * FROM dsongcp.flights WHERE TABLESAMPLE SYSTEM (0.001)
), weil das Tabellensampling nicht zwischengespeichert wird, sodass wir jedes Mal andere Ergebnisse erhalten. Zum Zeitpunkt der Erstellung dieses Artikels funktioniert das Table Sampling jedoch nur bei Tabellen und Flights ist eine View. Außerdem ist es uns in unserem aktuellen Anwendungsfall egal, ob wir jedes Mal, wenn wir die Abfrage ausführen, unterschiedliche Stichproben erhalten oder nicht. Deshalb verwende ich rand()
.
13 Siehe die Datei 04_streaming/transform/bqsample.sh.
14 Dieser Code befindet sich in 04_streaming/transform/df03.py aus dem GitHub-Repository dieses Buches.
15 Dieser Code befindet sich in 04_streaming/transform/df04.py aus dem GitHub-Repository dieses Buches.
16 Dieser Code befindet sich in 04_streaming/transform/df05.py aus dem GitHub-Repository dieses Buches.
17 Es öffnet jemandem die Tür, der Abfragen einreicht, die zum Beispiel eine Tabelle löschen können. Dieser XKCD-Cartoon ist berühmt dafür, dass er das Problem hervorhebt.
18 Siehe 04_streaming/simulate/simulate.py im GitHub-Repository.
19 Bei 60-facher Echtzeitgeschwindigkeit dauert es über eine Stunde, bis die Flugdaten von 3 Tagen vollständig vorliegen. Hoffentlich ist das genug Zeit, um den Rest des Kapitels abzuschließen. Wenn nicht, starte den Simulator einfach neu. Wenn du zu früh fertig bist, drücke Strg-C, um den Simulator anzuhalten.
20 Ein Beispiel dafür findest du in der Referenzarchitektur zur Analyse von Spielen auf mobilen Geräten.
21 Siehe 04_streaming/realtime/avg01.py im GitHub-Repository.
22 Wenn du die empfangenen Rohdaten in BigQuery schreiben möchtest, kannst du das natürlich auch tun - so wie im vorherigen Codeschnipsel gezeigt. In diesem Abschnitt gehe ich davon aus, dass wir nur die aggregierten Statistiken der letzten Stunde benötigen.
23 Erinnere dich daran, dass wir alle 5 Minuten Aggregate über 60 Minuten berechnen. Cloud Dataflow geht davon aus, dass das erste "volle" Fenster 65 Minuten nach Beginn der Simulation stattfindet. Da wir mit 30-facher Geschwindigkeit simulieren, sind das zwei Minuten auf deiner Uhr.
Get Data Science auf der Google Cloud Platform, 2. now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.