Kapitel 4. Dateneingabe: Daten extrahieren
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Wie in Kapitel 3 erläutert, ist das ELT-Muster das ideale Design für Datenpipelines, die für Datenanalyse, Data Science und Datenprodukte entwickelt werden. Die ersten beiden Schritte des ELT-Patterns, das Extrahieren und das Laden, werden zusammenfassend als Dateneingabe bezeichnet. In diesem Kapitel geht es darum, wie du deine Entwicklungsumgebung und Infrastruktur für beides einrichtest und wie du Daten aus verschiedenen Quellsystemen extrahierst. Kapitel 5 befasst sich mit dem Laden der resultierenden Datensätze in ein Data Warehouse.
Hinweis
Die Codebeispiele zum Extrahieren und Laden in diesem Kapitel sind vollständig voneinander entkoppelt. Die Koordinierung der beiden Schritte zur Datenübernahme ist ein Thema, das in Kapitel 7 behandelt wird.
Wie in Kapitel 2 erläutert, gibt es zahlreiche Arten von Quellsystemen, aus denen Daten extrahiert werden können, und zahlreiche Zielsysteme, in die sie geladen werden müssen. Außerdem liegen die Daten in vielen verschiedenen Formen vor, die alle unterschiedliche Herausforderungen für die Datenaufnahme mit sich bringen.
Dieses und das nächste Kapitel enthalten Codebeispiele für den Export und die Aufnahme von Daten aus und in gängige Systeme. Der Code ist stark vereinfacht und enthält nur eine minimale Fehlerbehandlung. Jedes Beispiel ist als leicht verständlicher Ausgangspunkt für Datenübernahmen gedacht, aber voll funktionsfähig und erweiterbar für skalierbarere Lösungen.
Hinweis
Die Codebeispiele in diesem Kapitel schreiben die extrahierten Daten in CSV-Dateien, die dann in das Ziel-Datawarehouse geladen werden. Manchmal ist es sinnvoller, die extrahierten Daten vor dem Laden in einem anderen Format, z. B. JSON, zu speichern. Dort, wo es sinnvoll ist, weise ich darauf hin, wo du eine solche Anpassung vornehmen solltest.
In Kapitel 5 werden auch einige Open-Source-Frameworks besprochen, auf denen du aufbauen kannst, sowie kommerzielle Alternativen, die Dateningenieuren und -analysten "Low-Code"-Optionen für das Ingesting von Daten bieten.
Einrichten deiner Python-Umgebung
Alle folgenden Code-Beispiele sind in Python und SQL geschrieben und verwenden Open-Source-Frameworks, die heute im Bereich der Datentechnik üblich sind. Der Einfachheit halber ist die Anzahl der Quellen und Ziele begrenzt. Wo es möglich ist, gebe ich jedoch Hinweise, wie man sie für ähnliche Systeme anpassen kann.
Um den Beispielcode auszuführen, brauchst du eine physische oder virtuelle Maschine mit Python 3.x. Außerdem musst du ein paar Bibliotheken installieren und importieren.
Wenn du Python nicht auf deinem Rechner installiert hast, kannst du die Distribution und den Installer für dein Betriebssystem direkt von ihnen beziehen.
Hinweis
Die folgenden Befehle sind für die Kommandozeile von Linux oder Macintosh geschrieben. Unter Windows musst du eventuell die ausführbare Datei Python 3 zu deinem PATH hinzufügen.
Bevor du die in diesem Kapitel verwendeten Bibliotheken installierst, ist es am besten, eine virtuelle Umgebung zu erstellen, in der du sie installieren kannst. Dazu kannst du ein Tool namens virtualenv
verwenden. virtualenv
ist hilfreich bei der Verwaltung von Python-Bibliotheken für verschiedene Projekte und Anwendungen. Damit kannst du die Python-Bibliotheken nicht global, sondern nur innerhalb eines bestimmten Bereichs deines Projekts installieren. Erstelle zunächst eine virtuelle Umgebung namens env.
$ python -m venv env
Nachdem du deine virtuelle Umgebung erstellt hast, aktiviere sie mit dem folgenden Befehl:
$ source env/bin/activate
Du kannst auf zwei Arten überprüfen, ob deine virtuelle Umgebung aktiviert ist. Erstens wirst du feststellen, dass der Eingabeaufforderung der Name der Umgebung vorangestellt ist:
(env) $
Du kannst auch den Befehl which python
verwenden, um zu überprüfen, wo Python nach Bibliotheken sucht. Du solltest etwa so etwas sehen, das den Pfad des Verzeichnisses der virtuellen Umgebung anzeigt:
(env) $ which python env/bin/python
Jetzt kannst du die Bibliotheken installieren, die du für die folgenden Codebeispiele brauchst.
Hinweis
Auf einigen Betriebssystemen (OS) musst du python3
anstelle von python
verwenden, um die ausführbare Datei Python 3.x auszuführen. Ältere Betriebssysteme verwenden möglicherweise standardmäßig Python 2.x. Du kannst herausfinden, welche Python-Version dein Betriebssystem verwendet, indem du python --version
eingibst.
In diesem Kapitel wirst du pip
verwenden, um die in den Codebeispielen verwendeten Bibliotheken zu installieren. pip
ist ein Tool, das in den meisten Python-Distributionen enthalten ist.
Die erste Bibliothek, die du mit pip
installierst, ist configparser
, mit der du Konfigurationsinformationen auslesen kannst, die du später in eine Datei einträgst.
(env) $ pip install configparser
Als Nächstes erstellst du eine Datei namens pipeline.conf in demselben Verzeichnis wie die Python-Skripte, die du in den folgenden Abschnitten erstellen wirst. Lass die Datei vorerst leer. In den Codebeispielen in diesem Kapitel wirst du sie ergänzen müssen. In Linux- und Mac-Betriebssystemen kannst du die leere Datei auf der Kommandozeile mit dem folgenden Befehl erstellen:
(env) $ touch pipeline.conf
Einrichten der Cloud-Speicherung
Für jedes Beispiel in diesem Kapitel wirst du einen Amazon Simple Storage Service (Amazon S3 oder einfach S3) Bucket für die Speicherung von Dateien verwenden. S3 wird auf AWS gehostet und ist, wie der Name schon sagt, eine einfache Möglichkeit, Dateien zu speichern und darauf zuzugreifen. Außerdem ist es sehr kosteneffizient. Derzeit bietet AWS mit einem neuen AWS-Konto 5 GB kostenlose S3-Speicherung für 12 Monate an und berechnet danach weniger als 3 Cent pro Monat und Gigabyte für die Standard-S3-Speicherung. Angesichts der Einfachheit der Beispiele in diesem Kapitel kannst du die benötigten Daten kostenlos in S3 speichern, wenn du dich noch in den ersten 12 Monaten nach der Einrichtung eines AWS-Kontos befindest, oder für weniger als 1 USD pro Monat danach.
Um die Beispiele in diesem Kapitel auszuführen, brauchst du einen S3-Bucket. Zum Glück ist es ganz einfach, einen S3-Bucket zu erstellen, und die neuesten Anleitungen findest du in der AWS-Dokumentation. Die Einrichtung der richtigen Zugriffskontrolle für den S3-Bucket hängt davon ab, welches Data Warehouse du verwendest. Im Allgemeinen ist es am besten, AWS Identity and Access Management (IAM) Rollen für die Zugriffsverwaltung zu verwenden. Detaillierte Anweisungen zum Einrichten eines solchen Zugriffs für ein Amazon Redshift- und ein Snowflake-Datawarehouse findest du in den folgenden Abschnitten, aber befolge zunächst die Anleitung zum Erstellen eines neuen Buckets. Benenne ihn, wie du willst. Ich empfehle, die Standardeinstellungen zu verwenden und den Bucket privat zu halten.
Jedes Extraktionsbeispiel extrahiert Daten aus dem angegebenen Quellsystem und speichert die Ausgabe im S3-Bucket. Jedes Ladebeispiel in Kapitel 5 lädt diese Daten aus dem S3-Bucket in das Zielsystem. Dies ist ein gängiges Muster in Datenpipelines. Jeder große öffentliche Cloud-Provider hat einen ähnlichen Dienst wie S3. Äquivalente in anderen öffentlichen Clouds sind Azure Storage in Microsoft Azure und Google Cloud Storage (GCS) in GCP.
Es ist auch möglich, jedes Beispiel so zu ändern, dass es eine lokale oder lokale Speicherung verwendet. Es ist jedoch zusätzlicher Aufwand erforderlich, um Daten aus einer Speicherung außerhalb des Cloud-Providers in dein Data Warehouse zu laden. Unabhängig davon gelten die in diesem Kapitel beschriebenen Muster unabhängig davon, welchen Cloud-Provider du nutzt oder ob du deine Dateninfrastruktur vor Ort hostest.
Bevor ich zu den einzelnen Beispielen übergehe, musst du noch eine weitere Python-Bibliothek installieren, damit deine Skripte zum Extrahieren und Laden mit deinem S3-Bucket interagieren können. Boto3 ist das AWS SDK für Python. Stelle sicher, dass die virtuelle Umgebung, die du im vorherigen Abschnitt eingerichtet hast, aktiv ist, und benutze pip
, um sie zu installieren:
(env) $ pip install boto3
In den folgenden Beispielen wirst du aufgefordert, boto3
wie folgt in deine Python-Skripte zu importieren:
import
boto3
Da du die boto3
Python-Bibliothek verwendest, um mit deinem S3-Bucket zu interagieren, musst du auch einen IAM-Benutzer anlegen, Zugriffsschlüssel für diesen Benutzer erstellen und die Schlüssel in einer Konfigurationsdatei speichern, aus der deine Python-Skripte lesen können. Das alles ist notwendig, damit deine Skripte die Berechtigung haben, Dateien in deinem S3-Bucket zu lesen und zu schreiben.
Zuerst erstellst du den IAM-Benutzer:
-
Navigiere im Menü Services in der AWS-Konsole (oder in der oberen Navigationsleiste) zu IAM.
-
Klicke im Navigationsbereich auf Benutzer und dann auf "Benutzer hinzufügen". Gib den Benutzernamen für den neuen Benutzer ein. In diesem Beispiel nennst du den Benutzer data_pipeline_readwrite.
-
Klicke auf die Art des Zugriffs für diesen IAM-Benutzer. Klicke auf "programmatischer Zugriff", da dieser Benutzer sich nicht bei der AWS-Konsole anmelden muss, sondern über Python-Skripte programmatisch auf AWS-Ressourcen zugreift.
-
Klicke auf Weiter: Berechtigungen.
-
Auf der Seite "Berechtigungen festlegen" klickst du auf die Option "Vorhandene Richtlinien direkt an den Benutzer anhängen". Füge die Richtlinie AmazonS3FullAccess hinzu.
-
Klicke auf Weiter: Tags. Es ist eine bewährte Methode in AWS, verschiedene Objekte und Services mit Tags zu versehen, damit du sie später wiederfinden kannst. Dies ist jedoch optional.
-
Klicke auf Weiter: Überprüfen, um deine Einstellungen zu überprüfen. Wenn alles gut aussieht, klicke auf "Benutzer erstellen".
-
Du musst die Zugangsschlüssel-ID und den geheimen Zugangsschlüssel für den neuen IAM-Benutzer speichern. Dazu klickst du auf Download.csv und speicherst die Datei an einem sicheren Ort, damit du sie gleich verwenden kannst.
Zum Schluss fügst du der Datei pipeline.conf einen Abschnitt namens [aws_boto_credentials]
hinzu, in dem du die Zugangsdaten für den IAM-Benutzer und die Informationen zum S3-Bucket speicherst. Die ID deines AWS-Kontos findest du, indem du auf deinen Kontonamen oben rechts auf einer beliebigen Seite klickst, wenn du auf der AWS-Website angemeldet bist. Verwende den Namen des S3-Buckets, den du zuvor erstellt hast, für den Wert bucket_name
. Der neue Abschnitt in der pipline.conf sieht dann so aus:
[aws_boto_credentials] access_key = ijfiojr54rg8er8erg8erg8 secret_key = 5r4f84er4ghrg484eg84re84ger84 bucket_name = pipeline-bucket account_id = 4515465518
Daten aus einer MySQL-Datenbank extrahieren
Das Extrahieren von Daten aus einer MySQL-Datenbank kann auf zwei Arten erfolgen:
-
Vollständige oder inkrementelle Extraktion mit SQL
-
Binär-Log (binlog) Replikation
Die vollständige oder inkrementelle Extraktion mit SQL ist viel einfacher zu implementieren, aber auch weniger skalierbar für große Datensätze mit häufigen Änderungen. Außerdem gibt es Kompromisse zwischen vollständiger und inkrementeller Extraktion, auf die ich im folgenden Abschnitt eingehe.
Die binäre Log-Replikation ist zwar komplexer zu implementieren, eignet sich aber besser für Fälle, in denen das Datenvolumen der Änderungen in den Quelltabellen hoch ist oder in denen häufigere Dateneingaben aus der MySQL-Quelle erforderlich sind.
Hinweis
Die Binlog-Replikation ist auch ein Weg, um einen Streaming Data Ingestion zu erstellen. Im Abschnitt "Batch versus Stream Ingestion" in diesem Kapitel findest du weitere Informationen zum Unterschied zwischen den beiden Ansätzen und zu den Implementierungsmustern.
Dieser Abschnitt ist für diejenigen Leser relevant, die eine MySQL-Datenquelle haben, aus der sie Daten extrahieren müssen. Wenn du jedoch eine einfache Datenbank einrichten möchtest, um die Codebeispiele auszuprobieren, hast du zwei Möglichkeiten. Erstens kannst du MySQL kostenlos auf deinem lokalen Rechner oder deiner virtuellen Maschine installieren. Ein Installationsprogramm für dein Betriebssystem findest du auf der MySQL-Downloadseite.
Alternativ kannst du auch eine vollständig verwaltete Amazon RDS for MySQL-Instanz in AWS erstellen. Ich finde diese Methode unkomplizierter, und es ist schön, dass ich auf meinem lokalen Rechner kein unnötiges Chaos anrichten muss!
Warnung
Wenn du den verlinkten Anweisungen folgst, um eine MySQL RDS-Datenbankinstanz einzurichten, wirst du aufgefordert, deine Datenbank als öffentlich zugänglich einzustellen. Das ist für das Lernen und Arbeiten mit Beispieldaten genau richtig. Es macht es sogar viel einfacher, sich von dem Rechner aus zu verbinden, auf dem du die Beispiele in diesem Abschnitt ausführst. Für mehr Sicherheit in einer Produktionsumgebung empfehle ich dir jedoch, die bewährten Methoden für Amazon RDS zu befolgen.
Beachte, dass genau wie bei den oben erwähnten S3-Preisen Kosten anfallen, wenn du nicht mehr für den kostenlosen Tier von AWS in Frage kommst. Ansonsten ist das Einrichten und Betreiben kostenlos! Erinnere dich nur daran, deine RDS-Instanz zu löschen, wenn du fertig bist, damit du es nicht vergisst und dir Gebühren entstehen, wenn dein kostenloses Angebot ausläuft.
Die Codebeispiele in diesem Abschnitt sind recht einfach und beziehen sich auf eine Tabelle namens Orders
in einer MySQL-Datenbank. Sobald du eine MySQL-Instanz zur Verfügung hast, kannst du die Tabelle erstellen und einige Beispielzeilen einfügen, indem du die folgenden SQL-Befehle ausführst:
CREATE
TABLE
Orders
(
OrderId
int
,
OrderStatus
varchar
(
30
),
LastUpdated
timestamp
);
INSERT
INTO
Orders
VALUES
(
1
,
'Backordered'
,
'2020-06-01 12:00:00'
);
INSERT
INTO
Orders
VALUES
(
1
,
'Shipped'
,
'2020-06-09 12:00:25'
);
INSERT
INTO
Orders
VALUES
(
2
,
'Shipped'
,
'2020-07-11 3:05:00'
);
INSERT
INTO
Orders
VALUES
(
1
,
'Shipped'
,
'2020-06-09 11:50:00'
);
INSERT
INTO
Orders
VALUES
(
3
,
'Shipped'
,
'2020-07-12 12:00:00'
);
Vollständige oder inkrementelle MySQL-Tabellenextraktion
Wenn du entweder alle oder eine Teilmenge von Spalten aus einer MySQL-Tabelle in ein Data Warehouse oder einen Data Lake übernehmen willst, kannst du entweder die vollständige oder die inkrementelle Extraktion verwenden.
Bei einer vollständigen Extraktion wird jeder Datensatz der Tabelle bei jedem Durchlauf des Extraktionsauftrags extrahiert. Dies ist die am wenigsten komplexe Methode, kann aber bei Tabellen mit hohem Datenaufkommen sehr lange dauern. Wenn du z. B. eine vollständige Extraktion für eine Tabelle namens Orders
durchführen möchtest, sieht das SQL, das in der MySQL-Quelldatenbank ausgeführt wird, wie folgt aus:
SELECT
*
FROM
Orders
;
Bei einer inkrementellen Extraktion werden nur die Datensätze aus der Quelltabelle extrahiert, die sich seit dem letzten Lauf des Auftrags geändert haben oder neu hinzugekommen sind. Der Zeitstempel der letzten Extraktion kann entweder in einer Extraktionsauftragsprotokolltabelle im Data Warehouse gespeichert oder durch Abfrage des maximalen Zeitstempels in einer LastUpdated
Spalte in der Zieltabelle im Warehouse abgerufen werden. Am Beispiel der fiktiven Tabelle Orders
sieht die SQL-Abfrage, die in der MySQL-Quelldatenbank ausgeführt wird, wie folgt aus:
SELECT
*
FROM
Orders
WHERE
LastUpdated
>
{{
last_extraction_run
}
}
;
Hinweis
Für Tabellen, die unveränderliche Daten enthalten (d.h. Datensätze können eingefügt, aber nicht aktualisiert werden), kannst du anstelle einer LastUpdated
Spalte den Zeitstempel verwenden, der angibt, wann der Datensatz erstellt wurde.
Die Variable {{ last_extraction_run }}
ist ein Zeitstempel, der den letzten Lauf des Extraktionsauftrags angibt. In der Regel wird sie von der Zieltabelle im Data Warehouse abgefragt. In diesem Fall würde das folgende SQL im Data Warehouse ausgeführt und der daraus resultierende Wert für {{ last_extraction_run }}
verwendet:
SELECT
MAX
(
LastUpdated
)
FROM
warehouse
.
Orders
;
Obwohl die inkrementelle Extraktion ideal für eine optimale Leistung ist, gibt es einige Nachteile und Gründe warum sie für eine bestimmte Tabelle nicht möglich ist.
Erstens: Bei dieser Methode werden gelöschte Zeilen nicht erfasst. Wenn eine Zeile aus der MySQL-Quelltabelle gelöscht wird, erfährst du das nicht und sie bleibt in der Zieltabelle, als hätte sich nichts geändert.
Zweitens muss die Quelltabelle einen verlässlichen Zeitstempel haben, wann sie zuletzt aktualisiert wurde (die Spalte LastUpdated
im vorherigen Beispiel). Es ist nicht ungewöhnlich, dass in den Tabellen des Quellsystems eine solche Spalte fehlt oder dass sie nicht zuverlässig aktualisiert wird. Nichts hindert Entwickler daran, Datensätze in der Quelltabelle zu aktualisieren und dabei zu vergessen, den Zeitstempel LastUpdated
zu aktualisieren.
Die inkrementelle Extraktion macht es jedoch einfacher, aktualisierte Zeilen zu erfassen. Wenn in den folgenden Codebeispielen eine bestimmte Zeile in der Tabelle Orders
aktualisiert wird, bringen sowohl die vollständige als auch die inkrementelle Extraktion die neueste Version der Zeile zurück. Bei der vollständigen Extraktion gilt das für alle Zeilen in der Tabelle, da die Extraktion eine vollständige Kopie der Tabelle abruft. Bei der inkrementellen Extraktion werden nur die Zeilen abgerufen, die sich geändert haben.
Wenn es Zeit für den Ladeschritt ist, werden Vollextrakte in der Regel geladen, indem zunächst die Zieltabelle abgeschnitten und die neu extrahierten Daten geladen werden. In diesem Fall steht dir nur die letzte Version der Zeile im Data Warehouse zur Verfügung.
Wenn du Daten aus einer inkrementellen Extraktion lädst, werden die resultierenden Daten an die Daten in der Zieltabelle angehängt. In diesem Fall hast du sowohl den ursprünglichen Datensatz als auch die aktualisierte Version. Beides kann bei der Umwandlung und Analyse von Daten nützlich sein, wie ich in Kapitel 6 erkläre.
Tabelle 4-1 zeigt zum Beispiel den ursprünglichen Datensatz für OrderId
1 in der MySQL-Datenbank. Als der Kunde die Bestellung aufgegeben hat, war sie im Rückstand. Tabelle 4-2 zeigt den aktualisierten Datensatz in der MySQL-Datenbank. Wie du siehst, wurde die Bestellung aktualisiert, weil sie am 2020-06-09 ausgeliefert wurde.
OrderId | OrderStatus | LastUpdated |
---|---|---|
1 |
Rückständige Bestellungen |
2020-06-01 12:00:00 |
OrderId | OrderStatus | LastUpdated |
---|---|---|
1 |
Versandt |
2020-06-09 12:00:25 |
Wenn eine vollständige Extraktion durchgeführt wird, wird die Zieltabelle im Data Warehouse zunächst abgeschnitten und dann mit der Ausgabe der Extraktion geladen. Das Ergebnis für OrderId
1 ist der in Tabelle 4-2 gezeigte einzelne Datensatz. Bei einer inkrementellen Extraktion hingegen wird die Ausgabe der Extraktion einfach an die Zieltabelle im Data Warehouse angehängt. Das Ergebnis ist, dass sich sowohl die ursprünglichen als auch die aktualisierten Datensätze für OrderId
1 im Data Warehouse befinden, wie in Tabelle 4-3 dargestellt.
OrderId | OrderStatus | LastUpdated |
---|---|---|
1 |
Rückständige Bestellungen |
2020-06-01 12:00:00 |
1 |
Versandt |
2020-06-09 12:00:25 |
Mehr über das Laden von vollständigen und inkrementellen Extraktionen erfährst du in Kapitel 5, u.a. unter "Laden von Daten in ein Redshift Warehouse".
Warnung
Gehe nie davon aus, dass eine LastUpdated
Spalte in einem Quellsystem zuverlässig aktualisiert wird. Erkundige dich beim Eigentümer des Quellsystems und bestätige dies, bevor du dich für eine inkrementelle Extraktion auf diese Spalte verlässt.
Sowohl vollständige als auch inkrementelle Extraktionen aus einer MySQL-Datenbank können mit SQL-Abfragen implementiert werden, die in der Datenbank ausgeführt, aber von Python-Skripten ausgelöst werden. Zusätzlich zu den Python-Bibliotheken, die in den vorherigen Abschnitten installiert wurden, musst du die Bibliothek PyMySQL
mit pip
installieren:
(env) $ pip install pymysql
Du musst auch einen neuen Abschnitt in die Datei pipeline.conf einfügen, um die Verbindungsinformationen für die MySQL-Datenbank zu speichern:
[mysql_config] hostname = my_host.com port = 3306 username = my_user_name password = my_password database = db_name
Erstelle nun ein neues Python-Skript namens extract_mysql_full.py. Du musst mehrere Bibliotheken importieren, z. B. pymysql
, die eine Verbindung zur MySQL-Datenbank herstellt, und die Bibliothek csv
, damit du die extrahierten Daten strukturieren und in eine flache Datei schreiben kannst, die sich im Ladeschritt des Ingestion leicht in ein Data Warehouse importieren lässt. Importiere auch boto3
, damit du die resultierende CSV-Datei in deinen S3-Bucket hochladen kannst, um sie später in das Data Warehouse zu laden:
import
pymysql
import
csv
import
boto3
import
configparser
Jetzt kannst du eine Verbindung zur MySQL-Datenbank initialisieren:
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
hostname
=
parser
.
get
(
"mysql_config"
,
"hostname"
)
port
=
parser
.
get
(
"mysql_config"
,
"port"
)
username
=
parser
.
get
(
"mysql_config"
,
"username"
)
dbname
=
parser
.
get
(
"mysql_config"
,
"database"
)
password
=
parser
.
get
(
"mysql_config"
,
"password"
)
conn
=
pymysql
.
connect
(
host
=
hostname
,
user
=
username
,
password
=
password
,
db
=
dbname
,
port
=
int
(
port
))
if
conn
is
None
:
(
"Error connecting to the MySQL database"
)
else
:
(
"MySQL connection established!"
)
Führe eine vollständige Extraktion der Tabelle Orders
aus dem vorherigen Beispiel aus. Der folgende Code extrahiert den gesamten Inhalt der Tabelle und schreibt ihn in eine CSV-Datei mit Pipe-Begrenzung. Für die Extraktion wird ein cursor
Objekt aus der pymysql
Bibliothek verwendet, um die SELECT-Abfrage auszuführen:
m_query
=
"SELECT * FROM Orders;"
local_filename
=
"order_extract.csv"
m_cursor
=
conn
.
cursor
()
m_cursor
.
execute
(
m_query
)
results
=
m_cursor
.
fetchall
()
with
open
(
local_filename
,
'w'
)
as
fp
:
csv_w
=
csv
.
writer
(
fp
,
delimiter
=
'|'
)
csv_w
.
writerows
(
results
)
fp
.
close
()
m_cursor
.
close
()
conn
.
close
()
Nachdem die CSV-Datei lokal geschrieben wurde, muss sie in den S3-Bucket hochgeladen werden, damit sie später in das Data Warehouse oder ein anderes Ziel geladen werden kann. Wie im Abschnitt "Einrichten der Cloud-Dateispeicherung" beschrieben, hast du für die Boto3-Bibliothek einen IAM-Benutzer eingerichtet, der für die Authentifizierung gegenüber dem S3-Bucket verwendet wird. Außerdem hast du die Anmeldedaten im Abschnitt aws_boto_credentials
der Datei pipeline.conf gespeichert. Hier ist der Code zum Hochladen der CSV-Datei in deinen S3-Bucket:
# load the aws_boto_credentials values
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
access_key
=
parser
.
get
(
"aws_boto_credentials"
,
"access_key"
)
secret_key
=
parser
.
get
(
"aws_boto_credentials"
,
"secret_key"
)
bucket_name
=
parser
.
get
(
"aws_boto_credentials"
,
"bucket_name"
)
s3
=
boto3
.
client
(
's3'
,
aws_access_key_id
=
access_key
,
aws_secret_access_key
=
secret_key
)
s3_file
=
local_filename
s3
.
upload_file
(
local_filename
,
bucket_name
,
s3_file
)
Du kannst das Skript wie folgt ausführen:
(env) $ python extract_mysql_full.py
Wenn das Skript ausgeführt wird, befindet sich der gesamte Inhalt der Tabelle Orders
in einer CSV-Datei im S3-Bucket und wartet darauf, in das Data Warehouse oder einen anderen Datenspeicher geladen zu werden. In Kapitel 5 erfährst du mehr über das Laden in einen Datenspeicher deiner Wahl.
Wenn du die Daten inkrementell extrahieren willst, musst du ein paar Änderungen am Skript vornehmen. Ich schlage vor, eine Kopie von extract_mysql_full.py mit dem Namen extract_mysql_incremental.py zu erstellen.
Finde zunächst den Zeitstempel des letzten Datensatzes, der aus der Quelltabelle Orders
extrahiert wurde. Dazu fragst du den Wert MAX(LastUpdated)
aus der Tabelle Orders
im Data Warehouse ab. In diesem Beispiel verwende ich ein Redshift Data Warehouse (siehe "Konfigurieren eines Amazon Redshift Warehouse als Ziel"), aber du kannst die gleiche Logik auch mit einem Warehouse deiner Wahl anwenden.
Um mit deinem Redshift-Cluster zu interagieren, installiere die psycopg2
Bibliothek, falls du das nicht schon getan hast.
(env) $ pip install psycopg2
Hier ist der Code, um eine Verbindung zum Redshift-Cluster herzustellen und den Wert MAX(LastUpdated)
aus der Tabelle Orders
abzufragen:
import
psycopg2
# get db Redshift connection info
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
dbname
=
parser
.
get
(
"aws_creds"
,
"database"
)
user
=
parser
.
get
(
"aws_creds"
,
"username"
)
password
=
parser
.
get
(
"aws_creds"
,
"password"
)
host
=
parser
.
get
(
"aws_creds"
,
"host"
)
port
=
parser
.
get
(
"aws_creds"
,
"port"
)
# connect to the redshift cluster
rs_conn
=
psycopg2
.
connect
(
"dbname="
+
dbname
+
" user="
+
user
+
" password="
+
password
+
" host="
+
host
+
" port="
+
port
)
rs_sql
=
"""SELECT COALESCE(MAX(LastUpdated),
'1900-01-01')
FROM Orders;"""
rs_cursor
=
rs_conn
.
cursor
()
rs_cursor
.
execute
(
rs_sql
)
result
=
rs_cursor
.
fetchone
()
# there's only one row and column returned
last_updated_warehouse
=
result
[
0
]
rs_cursor
.
close
()
rs_conn
.
commit
()
Ändere die Extraktionsabfrage in der MySQL-Datenbank mit dem in last_updated_warehouse
gespeicherten Wert so ab, dass nur die Datensätze aus der Tabelle Orders
abgerufen werden, die seit dem letzten Lauf des Extraktionsauftrags aktualisiert worden sind. Die neue Abfrage enthält einen Platzhalter, der durch %s
für den Wert last_updated_warehouse
dargestellt wird. Der Wert wird dann als Tupel (ein Datentyp, der zum Speichern von Datensammlungen verwendet wird) an die Funktion .execute()
des Cursors übergeben. Dies ist der richtige und sichere Weg, um einer SQL-Abfrage Parameter hinzuzufügen und so eine mögliche SQL-Injection zu vermeiden. Hier ist der aktualisierte Codeblock für die Ausführung der SQL-Abfrage in der MySQL-Datenbank:
m_query
=
"""SELECT *
FROM Orders
WHERE LastUpdated >
%s
;"""
local_filename
=
"order_extract.csv"
m_cursor
=
conn
.
cursor
()
m_cursor
.
execute
(
m_query
,
(
last_updated_warehouse
,))
Das gesamte Skript extract_mysql_incremental.py für die inkrementelle Extraktion (unter Verwendung eines Redshift-Clusters für den Wert last_updated
) sieht wie folgt aus:
import
pymysql
import
csv
import
boto3
import
configparser
import
psycopg2
# get db Redshift connection info
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
dbname
=
parser
.
get
(
"aws_creds"
,
"database"
)
user
=
parser
.
get
(
"aws_creds"
,
"username"
)
password
=
parser
.
get
(
"aws_creds"
,
"password"
)
host
=
parser
.
get
(
"aws_creds"
,
"host"
)
port
=
parser
.
get
(
"aws_creds"
,
"port"
)
# connect to the redshift cluster
rs_conn
=
psycopg2
.
connect
(
"dbname="
+
dbname
+
" user="
+
user
+
" password="
+
password
+
" host="
+
host
+
" port="
+
port
)
rs_sql
=
"""SELECT COALESCE(MAX(LastUpdated),
'1900-01-01')
FROM Orders;"""
rs_cursor
=
rs_conn
.
cursor
()
rs_cursor
.
execute
(
rs_sql
)
result
=
rs_cursor
.
fetchone
()
# there's only one row and column returned
last_updated_warehouse
=
result
[
0
]
rs_cursor
.
close
()
rs_conn
.
commit
()
# get the MySQL connection info and connect
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
hostname
=
parser
.
get
(
"mysql_config"
,
"hostname"
)
port
=
parser
.
get
(
"mysql_config"
,
"port"
)
username
=
parser
.
get
(
"mysql_config"
,
"username"
)
dbname
=
parser
.
get
(
"mysql_config"
,
"database"
)
password
=
parser
.
get
(
"mysql_config"
,
"password"
)
conn
=
pymysql
.
connect
(
host
=
hostname
,
user
=
username
,
password
=
password
,
db
=
dbname
,
port
=
int
(
port
))
if
conn
is
None
:
(
"Error connecting to the MySQL database"
)
else
:
(
"MySQL connection established!"
)
m_query
=
"""SELECT *
FROM Orders
WHERE LastUpdated >
%s
;"""
local_filename
=
"order_extract.csv"
m_cursor
=
conn
.
cursor
()
m_cursor
.
execute
(
m_query
,
(
last_updated_warehouse
,))
results
=
m_cursor
.
fetchall
()
with
open
(
local_filename
,
'w'
)
as
fp
:
csv_w
=
csv
.
writer
(
fp
,
delimiter
=
'|'
)
csv_w
.
writerows
(
results
)
fp
.
close
()
m_cursor
.
close
()
conn
.
close
()
# load the aws_boto_credentials values
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
access_key
=
parser
.
get
(
"aws_boto_credentials"
,
"access_key"
)
secret_key
=
parser
.
get
(
"aws_boto_credentials"
,
"secret_key"
)
bucket_name
=
parser
.
get
(
"aws_boto_credentials"
,
"bucket_name"
)
s3
=
boto3
.
client
(
's3'
,
aws_access_key_id
=
access_key
,
aws_secret_access_key
=
secret_key
)
s3_file
=
local_filename
s3
.
upload_file
(
local_filename
,
bucket_name
,
s3_file
)
Warnung
Hüte dich davor, dass große Extraktionsaufträge - egal ob vollständig oder inkrementell - die MySQL-Quelldatenbank belasten und sogar die Ausführung von Produktionsabfragen blockieren. Sprich mit dem Eigentümer der Datenbank und ziehe in Erwägung, ein Replikat für die Extraktion einzurichten, anstatt aus der primären Quelldatenbank zu extrahieren.
Binäre Log-Replikation von MySQL-Daten
Obwohl die Implementierung komplexer ist, ist das Einlesen von Daten aus einer MySQL-Datenbank unter Verwendung des Inhalts des MySQL-Binlogs zur Replikation von Änderungen effizient, wenn große Mengen an Daten eingelesen werden müssen.
Hinweis
Die Binlog-Replikation ist eine Form der Änderungsdatenerfassung (CDC). Viele Quelldatenspeicher verfügen über eine Form von CDC, die du nutzen kannst.
Das MySQL binlog ist ein Protokoll, das jede Operation in der Datenbank aufzeichnet. Je nachdem, wie es konfiguriert ist, protokolliert es zum Beispiel die Einzelheiten jeder Tabellenerstellung oder -änderung sowie jede INSERT
, UPDATE
und DELETE
Operation. Obwohl es ursprünglich dazu gedacht war, Daten auf andere MySQL-Instanzen zu replizieren, ist es nicht schwer zu verstehen, warum der Inhalt des binlog für Dateningenieure, die Daten in ein Data Warehouse einspeisen wollen, so interessant ist.
Da es sich bei deinem Data Warehouse wahrscheinlich nicht um eine MySQL-Datenbank handelt, ist es nicht möglich, einfach die eingebauten MySQL-Replikationsfunktionen zu nutzen. Um das binlog für die Dateneingabe in eine Nicht-MySQL-Quelle zu nutzen, sind einige Schritte erforderlich:
-
Aktiviere und konfiguriere das binlog auf dem MySQL-Server.
-
Führe eine erste vollständige Tabellenextraktion und -ladung durch.
-
Auszug aus dem Binlog auf kontinuierlicher Basis.
-
Übersetze und lade Binlog-Auszüge in das Data Warehouse.
Hinweis
Schritt 3 wird nicht im Detail besprochen, aber um das Binlog für die Aufnahme zu verwenden, musst du zunächst die Tabellen im Data Warehouse mit dem aktuellen Stand der MySQL-Datenbank auffüllen und dann das Binlog verwenden, um nachfolgende Änderungen aufzunehmen. Dazu musst du die Tabellen, die du extrahieren möchtest, mit LOCK
verknüpfen, mysqldump
ausführen und das Ergebnis von mysqldump
in das Warehouse laden, bevor du die binlog-Ingestion aktivierst.
Obwohl es am besten ist, in der aktuellen MySQL binlog-Dokumentation nachzulesen, wie man das binäre Logging aktiviert und konfiguriert, werde ich die wichtigsten Konfigurationswerte durchgehen.
Es gibt zwei wichtige Einstellungen in der MySQL-Datenbank, die bei der binlog-Konfiguration beachtet werden müssen.
Stelle zunächst sicher, dass die binäre Protokollierung aktiviert ist. Normalerweise ist sie standardmäßig aktiviert, aber du kannst das überprüfen, indem du die folgende SQL-Abfrage in der Datenbank ausführst (die genaue Syntax kann je nach MySQL-Distribution variieren):
SELECT
variable_value
as
bin_log_status
FROM
performance_schema
.
global_variables
WHERE
variable_name
=
'log_bin'
;
Wenn das binäre Logging aktiviert ist, siehst du Folgendes. Wenn der zurückgegebene Status OFF
lautet, musst du die MySQL-Dokumentation für die entsprechende Version konsultieren, um sie zu aktivieren.
+ — — — — — — — — — — — — — — — — — — -+ | bin_log_status :: | + — — — — — — — — — — — — — — — — — — -+ | ON | + — — — — — — — — — — — — — — — — — — -+ 1 row in set (0.00 sec)
Vergewissere dich als Nächstes, dass das binäre Logging-Format richtig eingestellt ist. Es gibt drei Formate, die in der aktuellen Version von MySQL unterstützt werden:
-
STATEMENT
-
ROW
-
MIXED
Das Format STATEMENT
protokolliert jede SQL-Anweisung, die eine Zeile im binlog einfügt oder ändert. Wenn du Daten von einer MySQL-Datenbank zu einer anderen replizieren möchtest, ist dieses Format nützlich. Um die Daten zu replizieren, könntest du einfach alle Anweisungen ausführen, um den Zustand der Datenbank zu reproduzieren. Da die extrahierten Daten jedoch wahrscheinlich für ein Data Warehouse bestimmt sind, das auf einer anderen Plattform läuft, sind die in der MySQL-Datenbank erzeugten SQL-Anweisungen möglicherweise nicht mit deinem Data Warehouse kompatibel.
Im Format ROW
wird jede Änderung einer Tabellenzeile in einer Zeile des binlogs nicht als SQL-Anweisung dargestellt, sondern als die Daten in der Zeile selbst. Dies ist das bevorzugte Format, das verwendet wird.
Das Format MIXED
protokolliert sowohl STATEMENT
- als auch ROW
-formatierte Datensätze im binlog. Obwohl es möglich ist, später nur die ROW
Daten auszusieben, ist es nicht notwendig, MIXED
zu aktivieren, wenn das binlog nicht für andere Zwecke verwendet wird, da es zusätzlichen Speicherplatz beansprucht.
Du kannst das aktuelle binlog-Format überprüfen, indem du die folgende SQL-Abfrage ausführst:
SELECT
variable_value
as
bin_log_format
FROM
performance_schema
.
global_variables
WHERE
variable_name
=
'binlog_format'
;
Die Anweisung gibt das Format zurück, das gerade aktiv ist:
+ — — — — — — — — — — — — — — — — — — — -+ | bin_log_format :: | + — — — — — — — — — — — — — — — — — — — -+ | ROW | + — — — — — — — — — — — — — — — — — — — -+ 1 row in set (0.00 sec)
Das binlog-Format und andere Konfigurationseinstellungen werden normalerweise in der Datei my.cnf für die MySQL-Datenbankinstanz festgelegt. Wenn du die Datei öffnest, wirst du eine Zeile wie die folgende sehen:
[mysqld] binlog_format=row ........
Auch hier ist es am besten, den Eigentümer der MySQL-Datenbank oder die aktuelle MySQL-Dokumentation zu konsultieren, bevor du irgendwelche Konfigurationen änderst.
Jetzt, da die binäre Protokollierung in einem ROW
Format aktiviert ist, kannst du einen Prozess erstellen, um die relevanten Informationen daraus zu extrahieren und in einer Datei zu speichern, die in dein Data Warehouse geladen wird.
Es gibt drei verschiedene Arten von ROW
-formatierten Ereignissen, die du aus dem binlog ziehen willst. Für dieses Ingestion-Beispiel kannst du andere Ereignisse, die du im Log findest, ignorieren. Bei fortgeschritteneren Replikationsstrategien ist es jedoch auch sinnvoll, Ereignisse zu extrahieren, die die Struktur einer Tabelle verändern. Die Ereignisse, mit denen du arbeiten wirst, sind die folgenden
-
WRITE_ROWS_EVENT
-
UPDATE_ROWS_EVENT
-
DELETE_ROWS_EVENT
Als Nächstes ist es an der Zeit, die Ereignisse aus dem binlog abzurufen. Zum Glück gibt es einige Open-Source-Python-Bibliotheken, mit denen du loslegen kannst. Eine der beliebtesten ist das Projekt python-mysql-replication
, das du auf GitHub findest. Um loszulegen, installiere es mit pip
:
(env) $ pip install mysql-replication
Um eine Vorstellung davon zu bekommen, wie die Ausgabe des binlogs aussieht, kannst du dich mit der Datenbank verbinden und das binlog auslesen. In diesem Beispiel verwende ich die MySQL-Verbindungsinformationen, die in der Datei pipeline.conf für das Beispiel der vollständigen und inkrementellen Erfassung weiter oben in diesem Abschnitt hinzugefügt wurden.
Hinweis
Das folgende Beispiel liest aus der Standard-Binlog-Datei des MySQL-Servers. Der Standard-Binlog-Dateiname und der Pfad werden in der Variable log_bin
festgelegt, die in der Datei my.cnf für die MySQL-Datenbank gespeichert ist. In manchen Fällen werden die binlogs im Laufe der Zeit rotiert (vielleicht täglich oder stündlich). In diesem Fall musst du den Dateipfad auf der Grundlage der vom MySQL-Administrator gewählten Methode der Log-Rotation und des Dateinamensschemas bestimmen und ihn als Wert an den Parameter log_file
übergeben, wenn du die Instanz BinLogStreamReader
erstellst. Weitere Informationen findest du in der Dokumentation für die Klasse BinLogStreamReader
.
from
pymysqlreplication
import
BinLogStreamReader
from
pymysqlreplication
import
row_event
import
configparser
import
pymysqlreplication
# get the MySQL connection info
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
hostname
=
parser
.
get
(
"mysql_config"
,
"hostname"
)
port
=
parser
.
get
(
"mysql_config"
,
"port"
)
username
=
parser
.
get
(
"mysql_config"
,
"username"
)
password
=
parser
.
get
(
"mysql_config"
,
"password"
)
mysql_settings
=
{
"host"
:
hostname
,
"port"
:
int
(
port
),
"user"
:
username
,
"passwd"
:
password
}
b_stream
=
BinLogStreamReader
(
connection_settings
=
mysql_settings
,
server_id
=
100
,
only_events
=
[
row_event
.
DeleteRowsEvent
,
row_event
.
WriteRowsEvent
,
row_event
.
UpdateRowsEvent
]
)
for
event
in
b_stream
:
event
.
dump
()
b_stream
.
close
()
Es gibt ein paar Dinge über das BinLogStreamReader
Objekt zu beachten, das im Codebeispiel instanziiert wird. Zunächst stellt es eine Verbindung zur MySQL-Datenbank her, die in der Datei pipeline.conf angegeben ist, und liest aus einer bestimmten binlog-Datei. Als Nächstes wird durch die Kombination aus der Einstellung resume_stream=True
und dem Wert log_pos
festgelegt, dass das Lesen des Binlogs an einer bestimmten Stelle beginnt. In diesem Fall ist das die Position 1400. Schließlich weise ich BinLogStreamReader
an, nur die Ereignisse DeleteRowsEvent
, WriteRowsEvent
und UpdateRowsEvent
zu lesen, indem ich den Parameter only_events
verwende.
Als Nächstes durchläuft das Skript alle Ereignisse und gibt sie in einem für Menschen lesbaren Format aus. Für deine Datenbank mit der Tabelle Orders
siehst du in etwa die folgende Ausgabe:
=== WriteRowsEvent === Date: 2020-06-01 12:00:00 Log position: 1400 Event size: 30 Read bytes: 20 Table: orders Affected columns: 3 Changed rows: 1 Values: -- * OrderId : 1 * OrderStatus : Backordered * LastUpdated : 2020-06-01 12:00:00 === UpdateRowsEvent === Date: 2020-06-09 12:00:25 Log position: 1401 Event size: 56 Read bytes: 15 Table: orders Affected columns: 3 Changed rows: 1 Affected columns: 3 Values: -- * OrderId : 1 => 1 * OrderStatus : Backordered => Shipped * LastUpdated : 2020-06-01 12:00:00 => 2020-06-09 12:00:25
Wie du sehen kannst, gibt es zwei Ereignisse, die die INSERT
und UPDATE
von OrderId
1 darstellen, die in Tabelle 4-3 gezeigt wurden. In diesem fiktiven Beispiel liegen die beiden aufeinanderfolgenden binlog-Ereignisse Tage auseinander, aber in der Realität lägen zahlreiche Ereignisse dazwischen, die alle Änderungen in der Datenbank darstellen.
Hinweis
Der Wert von log_pos
, der BinLogStreamReader
mitteilt, wo er beginnen soll, ist ein Wert, den du irgendwo in einer eigenen Tabelle speichern musst, damit du weißt, wo du bei der nächsten Extraktion weitermachen musst. Ich finde es am besten, den Wert in einer Log-Tabelle im Data Warehouse zu speichern, aus der er gelesen werden kann, wenn die Extraktion beginnt, und in die er geschrieben werden kann, zusammen mit dem Positionswert des letzten Ereignisses, wenn sie beendet ist.
Das Codebeispiel zeigt zwar, wie die Ereignisse in einem für Menschen lesbaren Format aussehen, aber damit die Ausgabe einfach in das Data Warehouse geladen werden kann, sind noch ein paar weitere Schritte erforderlich:
-
Analysiere die Daten und schreibe sie in ein anderes Format. Um das Laden zu vereinfachen, wird im nächsten Codebeispiel jedes Ereignis in eine Zeile einer CSV-Datei geschrieben.
-
Schreibe eine Datei pro Tabelle, die du extrahieren und laden möchtest. Das Beispiel-Binlog enthält zwar nur Ereignisse, die sich auf die Tabelle
Orders
beziehen, aber es ist sehr wahrscheinlich, dass in einem echten Binlog auch Ereignisse enthalten sind, die sich auf andere Tabellen beziehen.
Um die erste Änderung vorzunehmen, werde ich statt der Funktion .dump()
die Ereignisattribute auslesen und in eine CSV-Datei schreiben. Bei der zweiten Änderung schreibe ich der Einfachheit halber nur die Ereignisse der Tabelle Orders
in eine Datei namens orders_extract.csv, anstatt für jede Tabelle eine eigene Datei zu erstellen. Bei einer vollständig implementierten Extraktion musst du dieses Codebeispiel ändern, um die Ereignisse nach Tabellen zu gruppieren und mehrere Dateien zu schreiben, eine für jede Tabelle, für die du Änderungen aufnehmen willst. Im letzten Schritt des finalen Codebeispiels wird die CSV-Datei in den S3-Bucket hochgeladen, damit sie in das Data Warehouse geladen werden kann, wie in Kapitel 5 ausführlich beschrieben:
from
pymysqlreplication
import
BinLogStreamReader
from
pymysqlreplication
import
row_event
import
configparser
import
pymysqlreplication
import
csv
import
boto3
# get the MySQL connection info
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
hostname
=
parser
.
get
(
"mysql_config"
,
"hostname"
)
port
=
parser
.
get
(
"mysql_config"
,
"port"
)
username
=
parser
.
get
(
"mysql_config"
,
"username"
)
password
=
parser
.
get
(
"mysql_config"
,
"password"
)
mysql_settings
=
{
"host"
:
hostname
,
"port"
:
int
(
port
),
"user"
:
username
,
"passwd"
:
password
}
b_stream
=
BinLogStreamReader
(
connection_settings
=
mysql_settings
,
server_id
=
100
,
only_events
=
[
row_event
.
DeleteRowsEvent
,
row_event
.
WriteRowsEvent
,
row_event
.
UpdateRowsEvent
]
)
order_events
=
[]
for
binlogevent
in
b_stream
:
for
row
in
binlogevent
.
rows
:
if
binlogevent
.
table
==
'orders'
:
event
=
{}
if
isinstance
(
binlogevent
,
row_event
.
DeleteRowsEvent
):
event
[
"action"
]
=
"delete"
event
.
update
(
row
[
"values"
]
.
items
())
elif
isinstance
(
binlogevent
,
row_event
.
UpdateRowsEvent
):
event
[
"action"
]
=
"update"
event
.
update
(
row
[
"after_values"
]
.
items
())
elif
isinstance
(
binlogevent
,
row_event
.
WriteRowsEvent
):
event
[
"action"
]
=
"insert"
event
.
update
(
row
[
"values"
]
.
items
())
order_events
.
append
(
event
)
b_stream
.
close
()
keys
=
order_events
[
0
]
.
keys
()
local_filename
=
'orders_extract.csv'
with
open
(
local_filename
,
'w'
,
newline
=
''
)
as
output_file
:
dict_writer
=
csv
.
DictWriter
(
output_file
,
keys
,
delimiter
=
'|'
)
dict_writer
.
writerows
(
order_events
)
# load the aws_boto_credentials values
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
access_key
=
parser
.
get
(
"aws_boto_credentials"
,
"access_key"
)
secret_key
=
parser
.
get
(
"aws_boto_credentials"
,
"secret_key"
)
bucket_name
=
parser
.
get
(
"aws_boto_credentials"
,
"bucket_name"
)
s3
=
boto3
.
client
(
's3'
,
aws_access_key_id
=
access_key
,
aws_secret_access_key
=
secret_key
)
s3_file
=
local_filename
s3
.
upload_file
(
local_filename
,
bucket_name
,
s3_file
)
Nach der Ausführung sieht die Datei orders_extract.csv wie folgt aus:
insert|1|Backordered|2020-06-01 12:00:00 update|1|Shipped|2020-06-09 12:00:25
Wie ich in Kapitel 5 erkläre, ist das Format der resultierenden CSV-Datei für schnelles Laden optimiert. Die Aufbereitung der extrahierten Daten ist eine Aufgabe für den Transformationsschritt in einer Pipeline, die in Kapitel 6 ausführlich beschrieben wird.
Daten aus einer PostgreSQL-Datenbank extrahieren
Wie bei MySQL gibt es auch bei PostgreSQL (allgemein als Postgres bekannt) zwei Möglichkeiten, Daten aus der Datenbank zu übernehmen: entweder durch vollständige oder inkrementelle Extraktionen mit SQL oder durch die Nutzung von Funktionen der Datenbank, die die Replikation auf andere Knotenpunkte unterstützen. Im Fall von Postgres gibt es mehrere Möglichkeiten, aber dieses Kapitel konzentriert sich auf eine Methode: die Umwandlung des Postgres Write-Ahead Log (WAL) in einen Datenstrom.
Wie der vorherige Abschnitt ist auch dieser für diejenigen gedacht, die Daten aus einer bestehenden Postgres-Datenbank einlesen müssen. Wenn du jedoch nur die Codebeispiele ausprobieren möchtest, kannst du Postgres entweder auf deinem lokalen Rechner installieren oder in AWS eine RDS-Instanz verwenden, was ich empfehle. Im vorigen Abschnitt findest du Hinweise zu den bewährten Methoden in Bezug auf Preise und Sicherheit für RDS MySQL, die auch für RDS Postgres gelten.
Die Codebeispiele in diesem Abschnitt sind recht einfach und beziehen sich auf eine Tabelle namens Orders
in einer Postgres-Datenbank. Sobald du eine Postgres-Instanz zur Verfügung hast, kannst du die Tabelle erstellen und einige Beispielzeilen einfügen, indem du die folgenden SQL-Befehle ausführst:
CREATE
TABLE
Orders
(
OrderId
int
,
OrderStatus
varchar
(
30
),
LastUpdated
timestamp
);
INSERT
INTO
Orders
VALUES
(
1
,
'Backordered'
,
'2020-06-01 12:00:00'
);
INSERT
INTO
Orders
VALUES
(
1
,
'Shipped'
,
'2020-06-09 12:00:25'
);
INSERT
INTO
Orders
VALUES
(
2
,
'Shipped'
,
'2020-07-11 3:05:00'
);
INSERT
INTO
Orders
VALUES
(
1
,
'Shipped'
,
'2020-06-09 11:50:00'
);
INSERT
INTO
Orders
VALUES
(
3
,
'Shipped'
,
'2020-07-12 12:00:00'
);
Vollständige oder inkrementelle Postgres-Tabellenextraktion
Diese Methode ähnelt den vollständigen, inkrementellen und vollständigen Extraktionen, die in "Daten aus einer MySQL-Datenbank extrahieren" gezeigt werden . Sie ist so ähnlich, dass ich hier nur auf einen Unterschied im Code eingehen werde. Wie das Beispiel in diesem Abschnitt extrahiert auch dieses Beispiel Daten aus einer Tabelle namens Orders
in einer Quelldatenbank, schreibt sie in eine CSV-Datei und lädt sie dann in einen S3-Bucket hoch.
Der einzige Unterschied in diesem Abschnitt ist die Python-Bibliothek, die ich zum Extrahieren der Daten verwenden werde. Anstelle von PyMySQL
verwende ich pyscopg2
, um eine Verbindung zu einer Postgres-Datenbank herzustellen. Wenn du sie noch nicht installiert hast, kannst du das mit pip
tun:
(env) $ pip install pyscopg2
Außerdem musst du der Datei pipeline.conf einen neuen Abschnitt mit den Verbindungsinformationen für die Postgres-Datenbank hinzufügen:
[postgres_config] host = myhost.com port = 5432 username = my_username password = my_password database = db_name
Der Code zum Ausführen der vollständigen Extraktion der Tabelle Orders
ist fast identisch mit dem Beispiel aus dem MySQL-Abschnitt, aber wie du sehen kannst, verwendet er pyscopg2
, um sich mit der Quelldatenbank zu verbinden und die Abfrage auszuführen. Hier ist der vollständige Code:
import
psycopg2
import
csv
import
boto3
import
configparser
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
dbname
=
parser
.
get
(
"postgres_config"
,
"database"
)
user
=
parser
.
get
(
"postgres_config"
,
"username"
)
password
=
parser
.
get
(
"postgres_config"
,
"password"
)
host
=
parser
.
get
(
"postgres_config"
,
"host"
)
port
=
parser
.
get
(
"postgres_config"
,
"port"
)
conn
=
psycopg2
.
connect
(
"dbname="
+
dbname
+
" user="
+
user
+
" password="
+
password
+
" host="
+
host
,
port
=
port
)
m_query
=
"SELECT * FROM Orders;"
local_filename
=
"order_extract.csv"
m_cursor
=
conn
.
cursor
()
m_cursor
.
execute
(
m_query
)
results
=
m_cursor
.
fetchall
()
with
open
(
local_filename
,
'w'
)
as
fp
:
csv_w
=
csv
.
writer
(
fp
,
delimiter
=
'|'
)
csv_w
.
writerows
(
results
)
fp
.
close
()
m_cursor
.
close
()
conn
.
close
()
# load the aws_boto_credentials values
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
access_key
=
parser
.
get
(
"aws_boto_credentials"
,
"access_key"
)
secret_key
=
parser
.
get
(
"aws_boto_credentials"
,
"secret_key"
)
bucket_name
=
parser
.
get
(
"aws_boto_credentials"
,
"bucket_name"
)
s3
=
boto3
.
client
(
's3'
,
aws_access_key_id
=
access_key
,
aws_secret_access_key
=
secret_key
)
s3_file
=
local_filename
s3
.
upload_file
(
local_filename
,
bucket_name
,
s3_file
)
Das Ändern der inkrementellen Version, die im Abschnitt MySQL gezeigt wird, ist genauso einfach. Alles, was du tun musst, ist, psycopg2
statt PyMySQL
zu verwenden.
Replikation von Daten mit dem Write-Ahead Log
Wie das MySQL-Binlog (wie im vorherigen Abschnitt beschrieben) kann das Postgres-WAL als CDC-Methode für die Extraktion verwendet werden. Wie das MySQL-Binlog ist auch die Verwendung des WAL für die Dateneingabe in eine Pipeline recht komplex.
Obwohl du einen ähnlichen, vereinfachten Ansatz wählen kannst, wie er im Beispiel mit dem MySQL-Binlog verwendet wurde, schlage ich vor, eine verteilte Open-Source-Plattform namens Debezium zu verwenden, um den Inhalt des Postgres-WAL in einen S3-Bucket oder ein Data Warehouse zu streamen.
Obwohl die Einzelheiten der Konfiguration und des Betriebs von Debezium-Diensten ein Thema sind, dem ein ganzes Buch gewidmet werden sollte, gebe ich in "Streaming Data Ingestions with Kafka and Debezium" einen Überblick über Debezium und wie es für Dateningestionen verwendet werden kann . Dort erfährst du auch mehr darüber, wie es für Postgres CDC genutzt werden kann.
Daten aus der MongoDB extrahieren
Dieses Beispiel zeigt, wie du eine Teilmenge von MongoDB-Dokumenten aus einer Sammlung extrahieren kannst. In dieser MongoDB-Beispielsammlung stellen die Dokumente Ereignisse dar, die von einem System wie z. B. einem Webserver aufgezeichnet wurden. Jedes Dokument hat einen Zeitstempel, wann es erstellt wurde, sowie eine Reihe von Eigenschaften, aus denen der Beispielcode eine Teilmenge extrahiert. Nachdem die Extraktion abgeschlossen ist, werden die Daten in eine CSV-Datei geschrieben und in einem S3-Bucket gespeichert, damit sie in einem späteren Schritt in ein Data Warehouse geladen werden können (siehe Kapitel 5).
Um eine Verbindung zur MongoDB-Datenbank herzustellen, musst du zunächst die PyMongo-Bibliothek installieren. Wie andere Python-Bibliotheken auch, kannst du sie mit pip
installieren:
(env) $ pip install pymongo
Du kannst den folgenden Beispielcode natürlich abändern, um dich mit deiner eigenen MongoDB-Instanz zu verbinden und Daten aus deinen Dokumenten zu extrahieren. Wenn du das Beispiel jedoch so ausführen möchtest, wie es ist, kannst du mit MongoDB Atlas kostenlos einen MongoDB-Cluster erstellen. Atlas ist ein vollständig verwalteter MongoDB-Dienst und umfasst eine kostenlose Stufe mit viel Speicherung und Rechenleistung für das Lernen und Ausführen von Beispielen wie dem von mir bereitgestellten. Für den produktiven Einsatz kannst du auf einen kostenpflichtigen Plan upgraden.
In dieser Anleitung erfährst du, wie du einen kostenlosen MongoDB-Cluster in Atlas erstellst, eine Datenbank einrichtest und sie so konfigurierst, dass du dich über ein Python-Skript, das auf deinem lokalen Rechner läuft, verbinden kannst.
Du musst eine weitere Python-Bibliothek mit dem Namen dnspython
installieren, um pymongo
bei der Verbindung zu deinem in MongoDB Atlas gehosteten Cluster zu unterstützen. Du kannst sie mit pip
installieren:
(env) $ pip install dnspython
Als Nächstes fügst du der Datei pipeline.conf einen neuen Abschnitt mit den Verbindungsinformationen für die MongoDB-Instanz hinzu, aus der du Daten extrahieren willst. Fülle jede Zeile mit deinen eigenen Verbindungsdaten aus. Wenn du MongoDB Atlas verwendest und dich nicht mehr an diese Werte von der Einrichtung deines Clusters erinnerst, kannst du in den Atlas-Dokumenten nachlesen, wie du sie findest.
[mongo_config] hostname = my_host.com username = mongo_user password = mongo_password database = my_database collection = my_collection
Bevor du das Extraktionsskript erstellst und ausführst, kannst du einige Beispieldaten einfügen, mit denen du arbeiten kannst. Erstelle eine Datei namens sample_mongodb.py mit dem folgenden Code:
from
pymongo
import
MongoClient
import
datetime
import
configparser
# load the mongo_config values
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
hostname
=
parser
.
get
(
"mongo_config"
,
"hostname"
)
username
=
parser
.
get
(
"mongo_config"
,
"username"
)
password
=
parser
.
get
(
"mongo_config"
,
"password"
)
database_name
=
parser
.
get
(
"mongo_config"
,
"database"
)
collection_name
=
parser
.
get
(
"mongo_config"
,
"collection"
)
mongo_client
=
MongoClient
(
"mongodb+srv://"
+
username
+
":"
+
password
+
"@"
+
hostname
+
"/"
+
database_name
+
"?retryWrites=true&"
+
"w=majority&ssl=true&"
+
"ssl_cert_reqs=CERT_NONE"
)
# connect to the db where the collection resides
mongo_db
=
mongo_client
[
database_name
]
# choose the collection to query documents from
mongo_collection
=
mongo_db
[
collection_name
]
event_1
=
{
"event_id"
:
1
,
"event_timestamp"
:
datetime
.
datetime
.
today
(),
"event_name"
:
"signup"
}
event_2
=
{
"event_id"
:
2
,
"event_timestamp"
:
datetime
.
datetime
.
today
(),
"event_name"
:
"pageview"
}
event_3
=
{
"event_id"
:
3
,
"event_timestamp"
:
datetime
.
datetime
.
today
(),
"event_name"
:
"login"
}
# insert the 3 documents
mongo_collection
.
insert_one
(
event_1
)
mongo_collection
.
insert_one
(
event_2
)
mongo_collection
.
insert_one
(
event_3
)
Wenn du sie ausführst, werden die drei Dokumente in deine MongoDB-Sammlung eingefügt:
(env) $ python sample_mongodb.py
Erstelle nun ein neues Python-Skript mit dem Namen mongo_extract.py, in das du die folgenden Codeblöcke einfügen kannst.
Importiere zunächst PyMongo und Boto3, damit du Daten aus der MongoDB-Datenbank extrahieren und die Ergebnisse in einem S3-Bucket speichern kannst. Importiere auch die csv
Bibliothek, damit du die extrahierten Daten strukturieren und in eine flache Datei schreiben kannst, die du im Ladeschritt des Ingestion in ein Data Warehouse importieren kannst. Schließlich brauchst du für dieses Beispiel noch einige datetime
Funktionen, mit denen du die Beispiel-Ereignisdaten in der MongoDB-Sammlung durchlaufen kannst:
from
pymongo
import
MongoClient
import
csv
import
boto3
import
datetime
from
datetime
import
timedelta
import
configparser
Als Nächstes verbindest du dich mit der MongoDB-Instanz, die in der Datei pipelines.conf
angegeben ist, und erstellst ein collection
Objekt, in dem die Dokumente, die du extrahieren möchtest, gespeichert werden:
# load the mongo_config values
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
hostname
=
parser
.
get
(
"mongo_config"
,
"hostname"
)
username
=
parser
.
get
(
"mongo_config"
,
"username"
)
password
=
parser
.
get
(
"mongo_config"
,
"password"
)
database_name
=
parser
.
get
(
"mongo_config"
,
"database"
)
collection_name
=
parser
.
get
(
"mongo_config"
,
"collection"
)
mongo_client
=
MongoClient
(
"mongodb+srv://"
+
username
+
":"
+
password
+
"@"
+
hostname
+
"/"
+
database_name
+
"?retryWrites=true&"
+
"w=majority&ssl=true&"
+
"ssl_cert_reqs=CERT_NONE"
)
# connect to the db where the collection resides
mongo_db
=
mongo_client
[
database_name
]
# choose the collection to query documents from
mongo_collection
=
mongo_db
[
collection_name
]
Jetzt ist es an der Zeit, die zu extrahierenden Dokumente abzufragen. Dazu rufst du die Funktion .find()
auf mongo_collection
auf, um die gesuchten Dokumente abzufragen. Im folgenden Beispiel werden alle Dokumente abgefragt, deren Wert im Feld event_timestamp
zwischen zwei im Skript definierten Daten liegt.
Hinweis
Das Extrahieren von unveränderlichen Daten wie Protokolldatensätzen oder allgemeinen "Ereignis"-Datensätzen aus einem Datenspeicher nach Datumsbereich ist ein häufiger Anwendungsfall. Obwohl der Beispielcode einen im Skript definierten Datumsbereich verwendet, ist es wahrscheinlicher, dass du dem Skript einen Datumsbereich übergibst oder das Skript dein Data Warehouse abfragen lässt, um den Zeitpunkt des letzten geladenen Ereignisses zu ermitteln und die nachfolgenden Datensätze aus dem Quelldatenspeicher zu extrahieren. Ein Beispiel dafür findest du unter "Extrahieren von Daten aus einer MySQL-Datenbank".
start_date
=
datetime
.
datetime
.
today
()
+
timedelta
(
days
=
-
1
)
end_date
=
start_date
+
timedelta
(
days
=
1
)
mongo_query
=
{
"$and"
:[{
"event_timestamp"
:
{
"$gte"
:
start_date
}},
{
"event_timestamp"
:
{
"$lt"
:
end_date
}}]
}
event_docs
=
mongo_collection
.
find
(
mongo_query
,
batch_size
=
3000
)
Hinweis
Der Parameter batch_size
ist in diesem Beispiel auf 3000
gesetzt. PyMongo macht für jeden Batch einen Roundtrip zum MongoDB-Host. Wenn result_docs Cursor
z. B. 6.000 Ergebnisse enthält, sind zwei Fahrten zum MongoDB-Host nötig, um alle Dokumente auf den Rechner zu bringen, auf dem dein Python-Skript läuft. Welchen Wert du als Stapelgröße festlegst, ist dir überlassen und hängt von dem Kompromiss ab, mehr Dokumente im Speicher des Systems zu speichern, auf dem der Extrakt ausgeführt wird, anstatt viele Fahrten zur MongoDB-Instanz zu unternehmen.
Das Ergebnis des vorangegangenen Codes ist eine Cursor
mit dem Namen event_docs
, die ich verwenden werde, um die resultierenden Dokumente zu durchlaufen. Erinnere dich daran, dass in diesem vereinfachten Beispiel jedes Dokument ein Ereignis darstellt, das von einem System wie z. B. einem Webserver erzeugt wurde. Ein Ereignis kann z. B. das Einloggen eines Benutzers, das Betrachten einer Seite oder das Absenden eines Feedback-Formulars sein. Obwohl die Dokumente Dutzende von Feldern haben könnten, um z. B. den Browser darzustellen, mit dem sich der Nutzer angemeldet hat, nehme ich für dieses Beispiel nur ein paar Felder:
# create a blank list to store the results
all_events
=
[]
# iterate through the cursor
for
doc
in
event_docs
:
# Include default values
event_id
=
str
(
doc
.
get
(
"event_id"
,
-
1
))
event_timestamp
=
doc
.
get
(
"event_timestamp"
,
None
)
event_name
=
doc
.
get
(
"event_name"
,
None
)
# add all the event properties into a list
current_event
=
[]
current_event
.
append
(
event_id
)
current_event
.
append
(
event_timestamp
)
current_event
.
append
(
event_name
)
# add the event to the final list of events
all_events
.
append
(
current_event
)
Ich füge einen Standardwert in den doc.get()
Funktionsaufruf ein (-1 oder None). Warum das? Es liegt in der Natur von unstrukturierten Dokumentdaten, dass Felder in einem Dokument fehlen können. Du kannst also nicht davon ausgehen, dass jedes der Dokumente, die du durchgehst, ein Feld "event_name" oder ein anderes Feld enthält. In solchen Fällen kannst du doc.get()
anweisen, einen None
Wert zurückzugeben, anstatt einen Fehler zu melden.
Nachdem du alle Ereignisse in event_docs
durchlaufen hast, ist die Liste all_events
bereit, in eine CSV-Datei geschrieben zu werden. Dazu verwendest du das Modul csv
, das in der Standard-Python-Distribution enthalten ist und bereits in diesem Beispiel importiert wurde:
export_file
=
"export_file.csv"
with
open
(
export_file
,
'w'
)
as
fp
:
csvw
=
csv
.
writer
(
fp
,
delimiter
=
'|'
)
csvw
.
writerows
(
all_events
)
fp
.
close
()
Nun lädst du die CSV-Datei in den S3-Bucket hoch, den du unter "Einrichten der Cloud-Dateispeicherung" konfiguriert hast . Verwende dazu die Boto3-Bibliothek:
# load the aws_boto_credentials values
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
access_key
=
parser
.
get
(
"aws_boto_credentials"
,
"access_key"
)
secret_key
=
parser
.
get
(
"aws_boto_credentials"
,
"secret_key"
)
bucket_name
=
parser
.
get
(
"aws_boto_credentials"
,
"bucket_name"
)
s3
=
boto3
.
client
(
's3'
,
aws_access_key_id
=
access_key
,
aws_secret_access_key
=
secret_key
)
s3_file
=
export_file
s3
.
upload_file
(
export_file
,
bucket_name
,
s3_file
)
Das war's! Die Daten, die du aus der MongoDB-Sammlung extrahiert hast, befinden sich jetzt im S3-Bucket und warten darauf, in das Data Warehouse oder einen anderen Datenspeicher geladen zu werden. Wenn du die mitgelieferten Beispieldaten verwendet hast, sieht der Inhalt von export_file.csv etwa so aus:
1|2020-12-13 11:01:37.942000|signup 2|2020-12-13 11:01:37.942000|pageview 3|2020-12-13 11:01:37.942000|login
In Kapitel 5 erfährst du mehr über das Laden der Daten in den Datenspeicher deiner Wahl.
Daten aus einer REST API extrahieren
REST-APIs sind eine gängige Quelle, um Daten zu extrahieren. Möglicherweise musst du Daten von einer API übernehmen, die dein Unternehmen erstellt und verwaltet, oder von einer API eines externen Dienstes oder Anbieters, den dein Unternehmen nutzt, wie Salesforce, HubSpot oder Twitter. Unabhängig von der API gibt es ein gemeinsames Muster für die Datenextraktion, das ich in dem folgenden einfachen Beispiel verwenden werde:
-
Akzeptiere die Antwort, die höchstwahrscheinlich in JSON formatiert ist.
-
Analysiere die Antwort und wandle sie in eine CSV-Datei um, die du später in dein Data Warehouse laden kannst.
Hinweis
Obwohl ich die JSON-Antwort analysiere und in einer flachen Datei (CSV) speichere, kannst du die Daten auch im JSON-Format speichern und in dein Data Warehouse laden. Der Einfachheit halber bleibe ich bei dem Muster dieses Kapitels und verwende CSV-Dateien. Weitere Informationen zum Laden von Daten in einem anderen Format als CSV findest du in Kapitel 5 oder in deiner Data Warehouse-Dokumentation.
In diesem Beispiel werde ich mich mit einer API namens Open Notify verbinden. Die API hat mehrere Endpunkte, die jeweils Daten von der NASA über Ereignisse im Weltraum zurückgeben. Ich frage den Endpunkt ab, der die nächsten fünf Vorbeiflüge der Internationalen Raumstation (ISS) an einem bestimmten Ort auf der Erde anzeigt.
Bevor ich dir den Python-Code für die Abfrage des Endpunkts zeige, kannst du sehen, wie die Ausgabe einer einfachen Abfrage aussieht, indem du die folgende URL in deinen Browser eingibst:
http://api.open-notify.org/iss-pass.json?lat=42.36&lon=71.05
Das resultierende JSON sieht wie folgt aus:
{ "message": "success", "request": { "altitude": 100, "datetime": 1596384217, "latitude": 42.36, "longitude": 71.05, "passes": 5 }, "response": [ { "duration": 623, "risetime": 1596384449 }, { "duration": 169, "risetime": 1596390428 }, { "duration": 482, "risetime": 1596438949 }, { "duration": 652, "risetime": 1596444637 }, { "duration": 624, "risetime": 1596450474 } ] }
Das Ziel dieser Extraktion ist es, die Daten in der Antwort abzurufen und sie in einer CSV-Datei mit einer Zeile für jede Zeit und Dauer jedes Überflugs, den die ISS über das Längen-/Breitengradpaar macht, zu formatieren. Die ersten beiden Zeilen der CSV-Datei lauten zum Beispiel wie folgt:
42.36,|71.05|623|1596384449 42.36,|71.05|169|1596390428
Um die API abzufragen und die Antworten in Python zu verarbeiten, musst du die Bibliothek requests
installieren. requests
macht die Arbeit mit HTTP-Anfragen und -Antworten in Python einfach. Du kannst sie mit pip
installieren:
(env) $ pip install requests
Jetzt kannst du requests
verwenden, um den API-Endpunkt abzufragen, die Antwort zurückzubekommen und das resultierende JSON auszudrucken, das so aussieht, wie du es in deinem Browser gesehen hast:
import
requests
lat
=
42.36
lon
=
71.05
lat_log_params
=
{
"lat"
:
lat
,
"lon"
:
lon
}
api_response
=
requests
.
get
(
"http://api.open-notify.org/iss-pass.json"
,
params
=
lat_log_params
)
(
api_response
.
content
)
Anstatt das JSON auszudrucken, werde ich die Antwort durchlaufen, die Werte für Dauer und Anstiegszeit auslesen, die Ergebnisse in eine CSV-Datei schreiben und die Datei in den S3-Bucket hochladen.
Um die JSON-Antwort zu parsen, importiere ich die Python-Bibliothek json
. Du brauchst sie nicht zu installieren, da sie in der Standard-Python-Installation enthalten ist. Als Nächstes importiere ich die Bibliothek csv
, die auch in der Standard-Python-Distribution enthalten ist, um die CSV-Datei zu schreiben. Schließlich verwende ich die Bibliothek configparser
, um die Anmeldeinformationen zu erhalten, die die Boto3-Bibliothek benötigt, um die CSV-Datei in den S3-Bucket hochzuladen:
import
requests
import
json
import
configparser
import
csv
import
boto3
Als Nächstes fragst du die API ab, wie du es zuvor getan hast:
lat
=
42.36
lon
=
71.05
lat_log_params
=
{
"lat"
:
lat
,
"lon"
:
lon
}
api_response
=
requests
.
get
(
"http://api.open-notify.org/iss-pass.json"
,
params
=
lat_log_params
)
Jetzt ist es an der Zeit, die Antwort zu durchlaufen, die Ergebnisse in einer Python-Datei list
namens all_passes
zu speichern und die Ergebnisse in einer CSV-Datei zu speichern. Beachte, dass ich auch den Breiten- und Längengrad aus der Anfrage speichere, obwohl sie nicht in der Antwort enthalten sind. Sie werden in jeder Zeile der CSV-Datei benötigt, damit die Durchgangszeiten beim Laden in das Data Warehouse mit dem richtigen Längen- und Breitengrad verknüpft werden:
# create a json object from the response content
response_json
=
json
.
loads
(
api_response
.
content
)
all_passes
=
[]
for
response
in
response_json
[
'response'
]:
current_pass
=
[]
#store the lat/log from the request
current_pass
.
append
(
lat
)
current_pass
.
append
(
lon
)
# store the duration and risetime of the pass
current_pass
.
append
(
response
[
'duration'
])
current_pass
.
append
(
response
[
'risetime'
])
all_passes
.
append
(
current_pass
)
export_file
=
"export_file.csv"
with
open
(
export_file
,
'w'
)
as
fp
:
csvw
=
csv
.
writer
(
fp
,
delimiter
=
'|'
)
csvw
.
writerows
(
all_passes
)
fp
.
close
()
Zum Schluss lädst du die CSV-Datei mithilfe der Boto3-Bibliothek in den S3-Bucket hoch:
# load the aws_boto_credentials values
parser
=
configparser
.
ConfigParser
()
parser
.
read
(
"pipeline.conf"
)
access_key
=
parser
.
get
(
"aws_boto_credentials"
,
"access_key"
)
secret_key
=
parser
.
get
(
"aws_boto_credentials"
,
"secret_key"
)
bucket_name
=
parser
.
get
(
"aws_boto_credentials"
,
"bucket_name"
)
s3
=
boto3
.
client
(
's3'
,
aws_access_key_id
=
access_key
,
aws_secret_access_key
=
secret_key
)
s3
.
upload_file
(
export_file
,
bucket_name
,
export_file
)
Streaming Data Ingestions mit Kafka und Debezium
Wenn es darum geht, Daten aus einem CDC-System wie MySQL-Binlogs oder Postgres-WALs zu importieren, gibt es keine einfache Lösung ohne die Hilfe eines guten Frameworks.
Debezium ist ein verteiltes System , das aus mehreren Open-Source-Diensten besteht, die Änderungen auf Zeilenebene von gängigen CDC-Systemen erfassen und diese dann als Ereignisse streamen, die von anderen Systemen genutzt werden können. Eine Debezium-Installation besteht aus drei Hauptkomponenten:
-
Apache Zookeeper verwaltet die verteilte Umgebung und übernimmt die Konfiguration der einzelnen Dienste.
-
Apache Kafka ist eine verteilte Streaming-Plattform, die häufig zum Aufbau hochskalierbarer Datenpipelines verwendet wird.
-
Apache Kafka Connect ist ein Tool, um Kafka mit anderen Systemen zu verbinden, damit die Daten einfach über Kafka gestreamt werden können. Konnektoren werden für Systeme wie MySQL und Postgres gebaut und verwandeln Daten aus deren CDC-Systemen (binlogs und WAL) in Kakfa-Themen.
Kafka tauscht Nachrichten aus, die nach Themen geordnet sind. Ein System kann ein Thema veröffentlichen, während ein oder mehrere Systeme das Thema konsumieren oder abonnieren können.
Debezium verbindet diese Systeme miteinander und enthält Konnektoren für gängige CDC-Implementierungen. Die Herausforderungen für CDC habe ich zum Beispiel in "Daten aus einer MySQL-Datenbank extrahieren" und "Daten aus einer PostgreSQL-Datenbank extrahieren" beschrieben . Glücklicherweise gibt es bereits Konnektoren, die auf das MySQL binlog und das Postgres WAL "hören". Die Daten werden dann als Datensätze in einem Topic durch Kakfa geleitet und über einen anderen Konnektor in ein Ziel wie ein S3-Bucket, Snowflake oder Redshift Data Warehouse übertragen. Abbildung 4-1 zeigt ein Beispiel für die Verwendung von Debezium und seinen einzelnen Komponenten, um die von einem MySQL-Binlog erzeugten Ereignisse an ein Snowflake-Datawarehouse zu senden.
Zum Zeitpunkt der Erstellung dieses Dokuments gibt es bereits eine Reihe von Debezium-Konnektoren für die Quellsysteme von , die du möglicherweise benötigen wirst:
-
MongoDB
-
MySQL
-
PostgreSQL
-
Microsoft SQL Server
-
Oracle
-
Db2
-
Cassandra
Außerdem gibt es Kafka Connect-Konnektoren für die gängigsten Data Warehouses und Speicherungen wie S3 und Snowflake.
Obwohl Debezium und Kafka selbst ein Thema sind, das ein eigenes Buch rechtfertigt, möchte ich auf den Wert von Debezium hinweisen, wenn du dich entscheidest, dass du CDC als Methode für die Dateneingabe verwenden möchtest. Das einfache Beispiel, das ich im Abschnitt über die MySQL-Extraktion in diesem Kapitel verwendet habe, ist funktional. Wenn du CDC jedoch in großem Maßstab einsetzen willst, empfehle ich dir dringend, etwas wie Debezium zu verwenden, anstatt selbst eine Plattform wie Debezium aufzubauen!
Tipp
Die Debezium-Dokumentation ist hervorragend und ein guter Ausgangspunkt, um das System kennenzulernen.
Get Data Pipelines Pocket Reference now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.