Kapitel 4. Dateneingabe: Daten extrahieren

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Wie in Kapitel 3 erläutert, ist das ELT-Muster das ideale Design für Datenpipelines, die für Datenanalyse, Data Science und Datenprodukte entwickelt werden. Die ersten beiden Schritte des ELT-Patterns, das Extrahieren und das Laden, werden zusammenfassend als Dateneingabe bezeichnet. In diesem Kapitel geht es darum, wie du deine Entwicklungsumgebung und Infrastruktur für beides einrichtest und wie du Daten aus verschiedenen Quellsystemen extrahierst. Kapitel 5 befasst sich mit dem Laden der resultierenden Datensätze in ein Data Warehouse.

Hinweis

Die Codebeispiele zum Extrahieren und Laden in diesem Kapitel sind vollständig voneinander entkoppelt. Die Koordinierung der beiden Schritte zur Datenübernahme ist ein Thema, das in Kapitel 7 behandelt wird.

Wie in Kapitel 2 erläutert, gibt es zahlreiche Arten von Quellsystemen, aus denen Daten extrahiert werden können, und zahlreiche Zielsysteme, in die sie geladen werden müssen. Außerdem liegen die Daten in vielen verschiedenen Formen vor, die alle unterschiedliche Herausforderungen für die Datenaufnahme mit sich bringen.

Dieses und das nächste Kapitel enthalten Codebeispiele für den Export und die Aufnahme von Daten aus und in gängige Systeme. Der Code ist stark vereinfacht und enthält nur eine minimale Fehlerbehandlung. Jedes Beispiel ist als leicht verständlicher Ausgangspunkt für Datenübernahmen gedacht, aber voll funktionsfähig und erweiterbar für skalierbarere Lösungen.

Hinweis

Die Codebeispiele in diesem Kapitel schreiben die extrahierten Daten in CSV-Dateien, die dann in das Ziel-Datawarehouse geladen werden. Manchmal ist es sinnvoller, die extrahierten Daten vor dem Laden in einem anderen Format, z. B. JSON, zu speichern. Dort, wo es sinnvoll ist, weise ich darauf hin, wo du eine solche Anpassung vornehmen solltest.

In Kapitel 5 werden auch einige Open-Source-Frameworks besprochen, auf denen du aufbauen kannst, sowie kommerzielle Alternativen, die Dateningenieuren und -analysten "Low-Code"-Optionen für das Ingesting von Daten bieten.

Einrichten deiner Python-Umgebung

Alle folgenden Code-Beispiele sind in Python und SQL geschrieben und verwenden Open-Source-Frameworks, die heute im Bereich der Datentechnik üblich sind. Der Einfachheit halber ist die Anzahl der Quellen und Ziele begrenzt. Wo es möglich ist, gebe ich jedoch Hinweise, wie man sie für ähnliche Systeme anpassen kann.

Um den Beispielcode auszuführen, brauchst du eine physische oder virtuelle Maschine mit Python 3.x. Außerdem musst du ein paar Bibliotheken installieren und importieren.

Wenn du Python nicht auf deinem Rechner installiert hast, kannst du die Distribution und den Installer für dein Betriebssystem direkt von ihnen beziehen.

Hinweis

Die folgenden Befehle sind für die Kommandozeile von Linux oder Macintosh geschrieben. Unter Windows musst du eventuell die ausführbare Datei Python 3 zu deinem PATH hinzufügen.

Bevor du die in diesem Kapitel verwendeten Bibliotheken installierst, ist es am besten, eine virtuelle Umgebung zu erstellen, in der du sie installieren kannst. Dazu kannst du ein Tool namens virtualenv verwenden. virtualenv ist hilfreich bei der Verwaltung von Python-Bibliotheken für verschiedene Projekte und Anwendungen. Damit kannst du die Python-Bibliotheken nicht global, sondern nur innerhalb eines bestimmten Bereichs deines Projekts installieren. Erstelle zunächst eine virtuelle Umgebung namens env.

$ python -m venv env

Nachdem du deine virtuelle Umgebung erstellt hast, aktiviere sie mit dem folgenden Befehl:

$ source env/bin/activate

Du kannst auf zwei Arten überprüfen, ob deine virtuelle Umgebung aktiviert ist. Erstens wirst du feststellen, dass der Eingabeaufforderung der Name der Umgebung vorangestellt ist:

(env) $

Du kannst auch den Befehl which python verwenden, um zu überprüfen, wo Python nach Bibliotheken sucht. Du solltest etwa so etwas sehen, das den Pfad des Verzeichnisses der virtuellen Umgebung anzeigt:

(env) $ which python
env/bin/python

Jetzt kannst du die Bibliotheken installieren, die du für die folgenden Codebeispiele brauchst.

Hinweis

Auf einigen Betriebssystemen (OS) musst du python3 anstelle von python verwenden, um die ausführbare Datei Python 3.x auszuführen. Ältere Betriebssysteme verwenden möglicherweise standardmäßig Python 2.x. Du kannst herausfinden, welche Python-Version dein Betriebssystem verwendet, indem du python --version eingibst.

In diesem Kapitel wirst du pip verwenden, um die in den Codebeispielen verwendeten Bibliotheken zu installieren. pip ist ein Tool, das in den meisten Python-Distributionen enthalten ist.

Die erste Bibliothek, die du mit pip installierst, ist configparser, mit der du Konfigurationsinformationen auslesen kannst, die du später in eine Datei einträgst.

(env) $ pip install configparser

Als Nächstes erstellst du eine Datei namens pipeline.conf in demselben Verzeichnis wie die Python-Skripte, die du in den folgenden Abschnitten erstellen wirst. Lass die Datei vorerst leer. In den Codebeispielen in diesem Kapitel wirst du sie ergänzen müssen. In Linux- und Mac-Betriebssystemen kannst du die leere Datei auf der Kommandozeile mit dem folgenden Befehl erstellen:

(env) $ touch pipeline.conf

Einrichten der Cloud-Speicherung

Für jedes Beispiel in diesem Kapitel wirst du einen Amazon Simple Storage Service (Amazon S3 oder einfach S3) Bucket für die Speicherung von Dateien verwenden. S3 wird auf AWS gehostet und ist, wie der Name schon sagt, eine einfache Möglichkeit, Dateien zu speichern und darauf zuzugreifen. Außerdem ist es sehr kosteneffizient. Derzeit bietet AWS mit einem neuen AWS-Konto 5 GB kostenlose S3-Speicherung für 12 Monate an und berechnet danach weniger als 3 Cent pro Monat und Gigabyte für die Standard-S3-Speicherung. Angesichts der Einfachheit der Beispiele in diesem Kapitel kannst du die benötigten Daten kostenlos in S3 speichern, wenn du dich noch in den ersten 12 Monaten nach der Einrichtung eines AWS-Kontos befindest, oder für weniger als 1 USD pro Monat danach.

Um die Beispiele in diesem Kapitel auszuführen, brauchst du einen S3-Bucket. Zum Glück ist es ganz einfach, einen S3-Bucket zu erstellen, und die neuesten Anleitungen findest du in der AWS-Dokumentation. Die Einrichtung der richtigen Zugriffskontrolle für den S3-Bucket hängt davon ab, welches Data Warehouse du verwendest. Im Allgemeinen ist es am besten, AWS Identity and Access Management (IAM) Rollen für die Zugriffsverwaltung zu verwenden. Detaillierte Anweisungen zum Einrichten eines solchen Zugriffs für ein Amazon Redshift- und ein Snowflake-Datawarehouse findest du in den folgenden Abschnitten, aber befolge zunächst die Anleitung zum Erstellen eines neuen Buckets. Benenne ihn, wie du willst. Ich empfehle, die Standardeinstellungen zu verwenden und den Bucket privat zu halten.

Jedes Extraktionsbeispiel extrahiert Daten aus dem angegebenen Quellsystem und speichert die Ausgabe im S3-Bucket. Jedes Ladebeispiel in Kapitel 5 lädt diese Daten aus dem S3-Bucket in das Zielsystem. Dies ist ein gängiges Muster in Datenpipelines. Jeder große öffentliche Cloud-Provider hat einen ähnlichen Dienst wie S3. Äquivalente in anderen öffentlichen Clouds sind Azure Storage in Microsoft Azure und Google Cloud Storage (GCS) in GCP.

Es ist auch möglich, jedes Beispiel so zu ändern, dass es eine lokale oder lokale Speicherung verwendet. Es ist jedoch zusätzlicher Aufwand erforderlich, um Daten aus einer Speicherung außerhalb des Cloud-Providers in dein Data Warehouse zu laden. Unabhängig davon gelten die in diesem Kapitel beschriebenen Muster unabhängig davon, welchen Cloud-Provider du nutzt oder ob du deine Dateninfrastruktur vor Ort hostest.

Bevor ich zu den einzelnen Beispielen übergehe, musst du noch eine weitere Python-Bibliothek installieren, damit deine Skripte zum Extrahieren und Laden mit deinem S3-Bucket interagieren können. Boto3 ist das AWS SDK für Python. Stelle sicher, dass die virtuelle Umgebung, die du im vorherigen Abschnitt eingerichtet hast, aktiv ist, und benutze pip, um sie zu installieren:

(env) $ pip install boto3

In den folgenden Beispielen wirst du aufgefordert, boto3 wie folgt in deine Python-Skripte zu importieren:

import boto3

Da du die boto3 Python-Bibliothek verwendest, um mit deinem S3-Bucket zu interagieren, musst du auch einen IAM-Benutzer anlegen, Zugriffsschlüssel für diesen Benutzer erstellen und die Schlüssel in einer Konfigurationsdatei speichern, aus der deine Python-Skripte lesen können. Das alles ist notwendig, damit deine Skripte die Berechtigung haben, Dateien in deinem S3-Bucket zu lesen und zu schreiben.

Zuerst erstellst du den IAM-Benutzer:

  1. Navigiere im Menü Services in der AWS-Konsole (oder in der oberen Navigationsleiste) zu IAM.

  2. Klicke im Navigationsbereich auf Benutzer und dann auf "Benutzer hinzufügen". Gib den Benutzernamen für den neuen Benutzer ein. In diesem Beispiel nennst du den Benutzer data_pipeline_readwrite.

  3. Klicke auf die Art des Zugriffs für diesen IAM-Benutzer. Klicke auf "programmatischer Zugriff", da dieser Benutzer sich nicht bei der AWS-Konsole anmelden muss, sondern über Python-Skripte programmatisch auf AWS-Ressourcen zugreift.

  4. Klicke auf Weiter: Berechtigungen.

  5. Auf der Seite "Berechtigungen festlegen" klickst du auf die Option "Vorhandene Richtlinien direkt an den Benutzer anhängen". Füge die Richtlinie AmazonS3FullAccess hinzu.

  6. Klicke auf Weiter: Tags. Es ist eine bewährte Methode in AWS, verschiedene Objekte und Services mit Tags zu versehen, damit du sie später wiederfinden kannst. Dies ist jedoch optional.

  7. Klicke auf Weiter: Überprüfen, um deine Einstellungen zu überprüfen. Wenn alles gut aussieht, klicke auf "Benutzer erstellen".

  8. Du musst die Zugangsschlüssel-ID und den geheimen Zugangsschlüssel für den neuen IAM-Benutzer speichern. Dazu klickst du auf Download.csv und speicherst die Datei an einem sicheren Ort, damit du sie gleich verwenden kannst.

Zum Schluss fügst du der Datei pipeline.conf einen Abschnitt namens [aws_boto_credentials] hinzu, in dem du die Zugangsdaten für den IAM-Benutzer und die Informationen zum S3-Bucket speicherst. Die ID deines AWS-Kontos findest du, indem du auf deinen Kontonamen oben rechts auf einer beliebigen Seite klickst, wenn du auf der AWS-Website angemeldet bist. Verwende den Namen des S3-Buckets, den du zuvor erstellt hast, für den Wert bucket_name. Der neue Abschnitt in der pipline.conf sieht dann so aus:

[aws_boto_credentials]
access_key = ijfiojr54rg8er8erg8erg8
secret_key = 5r4f84er4ghrg484eg84re84ger84
bucket_name = pipeline-bucket
account_id = 4515465518

Daten aus einer MySQL-Datenbank extrahieren

Das Extrahieren von Daten aus einer MySQL-Datenbank kann auf zwei Arten erfolgen:

  • Vollständige oder inkrementelle Extraktion mit SQL

  • Binär-Log (binlog) Replikation

Die vollständige oder inkrementelle Extraktion mit SQL ist viel einfacher zu implementieren, aber auch weniger skalierbar für große Datensätze mit häufigen Änderungen. Außerdem gibt es Kompromisse zwischen vollständiger und inkrementeller Extraktion, auf die ich im folgenden Abschnitt eingehe.

Die binäre Log-Replikation ist zwar komplexer zu implementieren, eignet sich aber besser für Fälle, in denen das Datenvolumen der Änderungen in den Quelltabellen hoch ist oder in denen häufigere Dateneingaben aus der MySQL-Quelle erforderlich sind.

Hinweis

Die Binlog-Replikation ist auch ein Weg, um einen Streaming Data Ingestion zu erstellen. Im Abschnitt "Batch versus Stream Ingestion" in diesem Kapitel findest du weitere Informationen zum Unterschied zwischen den beiden Ansätzen und zu den Implementierungsmustern.

Dieser Abschnitt ist für diejenigen Leser relevant, die eine MySQL-Datenquelle haben, aus der sie Daten extrahieren müssen. Wenn du jedoch eine einfache Datenbank einrichten möchtest, um die Codebeispiele auszuprobieren, hast du zwei Möglichkeiten. Erstens kannst du MySQL kostenlos auf deinem lokalen Rechner oder deiner virtuellen Maschine installieren. Ein Installationsprogramm für dein Betriebssystem findest du auf der MySQL-Downloadseite.

Alternativ kannst du auch eine vollständig verwaltete Amazon RDS for MySQL-Instanz in AWS erstellen. Ich finde diese Methode unkomplizierter, und es ist schön, dass ich auf meinem lokalen Rechner kein unnötiges Chaos anrichten muss!

Warnung

Wenn du den verlinkten Anweisungen folgst, um eine MySQL RDS-Datenbankinstanz einzurichten, wirst du aufgefordert, deine Datenbank als öffentlich zugänglich einzustellen. Das ist für das Lernen und Arbeiten mit Beispieldaten genau richtig. Es macht es sogar viel einfacher, sich von dem Rechner aus zu verbinden, auf dem du die Beispiele in diesem Abschnitt ausführst. Für mehr Sicherheit in einer Produktionsumgebung empfehle ich dir jedoch, die bewährten Methoden für Amazon RDS zu befolgen.

Beachte, dass genau wie bei den oben erwähnten S3-Preisen Kosten anfallen, wenn du nicht mehr für den kostenlosen Tier von AWS in Frage kommst. Ansonsten ist das Einrichten und Betreiben kostenlos! Erinnere dich nur daran, deine RDS-Instanz zu löschen, wenn du fertig bist, damit du es nicht vergisst und dir Gebühren entstehen, wenn dein kostenloses Angebot ausläuft.

Die Codebeispiele in diesem Abschnitt sind recht einfach und beziehen sich auf eine Tabelle namens Orders in einer MySQL-Datenbank. Sobald du eine MySQL-Instanz zur Verfügung hast, kannst du die Tabelle erstellen und einige Beispielzeilen einfügen, indem du die folgenden SQL-Befehle ausführst:

CREATE TABLE Orders (
  OrderId int,
  OrderStatus varchar(30),
  LastUpdated timestamp
);

INSERT INTO Orders
  VALUES(1,'Backordered', '2020-06-01 12:00:00');
INSERT INTO Orders
  VALUES(1,'Shipped', '2020-06-09 12:00:25');
INSERT INTO Orders
  VALUES(2,'Shipped', '2020-07-11 3:05:00');
INSERT INTO Orders
  VALUES(1,'Shipped', '2020-06-09 11:50:00');
INSERT INTO Orders
  VALUES(3,'Shipped', '2020-07-12 12:00:00');

Vollständige oder inkrementelle MySQL-Tabellenextraktion

Wenn du entweder alle oder eine Teilmenge von Spalten aus einer MySQL-Tabelle in ein Data Warehouse oder einen Data Lake übernehmen willst, kannst du entweder die vollständige oder die inkrementelle Extraktion verwenden.

Bei einer vollständigen Extraktion wird jeder Datensatz der Tabelle bei jedem Durchlauf des Extraktionsauftrags extrahiert. Dies ist die am wenigsten komplexe Methode, kann aber bei Tabellen mit hohem Datenaufkommen sehr lange dauern. Wenn du z. B. eine vollständige Extraktion für eine Tabelle namens Orders durchführen möchtest, sieht das SQL, das in der MySQL-Quelldatenbank ausgeführt wird, wie folgt aus:

SELECT *
FROM Orders;

Bei einer inkrementellen Extraktion werden nur die Datensätze aus der Quelltabelle extrahiert, die sich seit dem letzten Lauf des Auftrags geändert haben oder neu hinzugekommen sind. Der Zeitstempel der letzten Extraktion kann entweder in einer Extraktionsauftragsprotokolltabelle im Data Warehouse gespeichert oder durch Abfrage des maximalen Zeitstempels in einer LastUpdated Spalte in der Zieltabelle im Warehouse abgerufen werden. Am Beispiel der fiktiven Tabelle Orders sieht die SQL-Abfrage, die in der MySQL-Quelldatenbank ausgeführt wird, wie folgt aus:

SELECT *
FROM Orders
WHERE LastUpdated > {{ last_extraction_run} };
Hinweis

Für Tabellen, die unveränderliche Daten enthalten (d.h. Datensätze können eingefügt, aber nicht aktualisiert werden), kannst du anstelle einer LastUpdated Spalte den Zeitstempel verwenden, der angibt, wann der Datensatz erstellt wurde.

Die Variable {{ last_extraction_run }} ist ein Zeitstempel, der den letzten Lauf des Extraktionsauftrags angibt. In der Regel wird sie von der Zieltabelle im Data Warehouse abgefragt. In diesem Fall würde das folgende SQL im Data Warehouse ausgeführt und der daraus resultierende Wert für {{ last_extraction_run }} verwendet:

SELECT MAX(LastUpdated)
FROM warehouse.Orders;

Obwohl die inkrementelle Extraktion ideal für eine optimale Leistung ist, gibt es einige Nachteile und Gründe warum sie für eine bestimmte Tabelle nicht möglich ist.

Erstens: Bei dieser Methode werden gelöschte Zeilen nicht erfasst. Wenn eine Zeile aus der MySQL-Quelltabelle gelöscht wird, erfährst du das nicht und sie bleibt in der Zieltabelle, als hätte sich nichts geändert.

Zweitens muss die Quelltabelle einen verlässlichen Zeitstempel haben, wann sie zuletzt aktualisiert wurde (die Spalte LastUpdated im vorherigen Beispiel). Es ist nicht ungewöhnlich, dass in den Tabellen des Quellsystems eine solche Spalte fehlt oder dass sie nicht zuverlässig aktualisiert wird. Nichts hindert Entwickler daran, Datensätze in der Quelltabelle zu aktualisieren und dabei zu vergessen, den Zeitstempel LastUpdated zu aktualisieren.

Die inkrementelle Extraktion macht es jedoch einfacher, aktualisierte Zeilen zu erfassen. Wenn in den folgenden Codebeispielen eine bestimmte Zeile in der Tabelle Orders aktualisiert wird, bringen sowohl die vollständige als auch die inkrementelle Extraktion die neueste Version der Zeile zurück. Bei der vollständigen Extraktion gilt das für alle Zeilen in der Tabelle, da die Extraktion eine vollständige Kopie der Tabelle abruft. Bei der inkrementellen Extraktion werden nur die Zeilen abgerufen, die sich geändert haben.

Wenn es Zeit für den Ladeschritt ist, werden Vollextrakte in der Regel geladen, indem zunächst die Zieltabelle abgeschnitten und die neu extrahierten Daten geladen werden. In diesem Fall steht dir nur die letzte Version der Zeile im Data Warehouse zur Verfügung.

Wenn du Daten aus einer inkrementellen Extraktion lädst, werden die resultierenden Daten an die Daten in der Zieltabelle angehängt. In diesem Fall hast du sowohl den ursprünglichen Datensatz als auch die aktualisierte Version. Beides kann bei der Umwandlung und Analyse von Daten nützlich sein, wie ich in Kapitel 6 erkläre.

Tabelle 4-1 zeigt zum Beispiel den ursprünglichen Datensatz für OrderId 1 in der MySQL-Datenbank. Als der Kunde die Bestellung aufgegeben hat, war sie im Rückstand. Tabelle 4-2 zeigt den aktualisierten Datensatz in der MySQL-Datenbank. Wie du siehst, wurde die Bestellung aktualisiert, weil sie am 2020-06-09 ausgeliefert wurde.

Tabelle 4-1. Ursprünglicher Zustand von OrderId 1
OrderId OrderStatus LastUpdated

1

Rückständige Bestellungen

2020-06-01 12:00:00

Tabelle 4-2. Aktualisierter Status von OrderId 1
OrderId OrderStatus LastUpdated

1

Versandt

2020-06-09 12:00:25

Wenn eine vollständige Extraktion durchgeführt wird, wird die Zieltabelle im Data Warehouse zunächst abgeschnitten und dann mit der Ausgabe der Extraktion geladen. Das Ergebnis für OrderId 1 ist der in Tabelle 4-2 gezeigte einzelne Datensatz. Bei einer inkrementellen Extraktion hingegen wird die Ausgabe der Extraktion einfach an die Zieltabelle im Data Warehouse angehängt. Das Ergebnis ist, dass sich sowohl die ursprünglichen als auch die aktualisierten Datensätze für OrderId 1 im Data Warehouse befinden, wie in Tabelle 4-3 dargestellt.

Tabelle 4-3. Alle Versionen von OrderId 1 im Data Warehouse
OrderId OrderStatus LastUpdated

1

Rückständige Bestellungen

2020-06-01 12:00:00

1

Versandt

2020-06-09 12:00:25

Mehr über das Laden von vollständigen und inkrementellen Extraktionen erfährst du in Kapitel 5, u.a. unter "Laden von Daten in ein Redshift Warehouse".

Warnung

Gehe nie davon aus, dass eine LastUpdated Spalte in einem Quellsystem zuverlässig aktualisiert wird. Erkundige dich beim Eigentümer des Quellsystems und bestätige dies, bevor du dich für eine inkrementelle Extraktion auf diese Spalte verlässt.

Sowohl vollständige als auch inkrementelle Extraktionen aus einer MySQL-Datenbank können mit SQL-Abfragen implementiert werden, die in der Datenbank ausgeführt, aber von Python-Skripten ausgelöst werden. Zusätzlich zu den Python-Bibliotheken, die in den vorherigen Abschnitten installiert wurden, musst du die Bibliothek PyMySQL mit pip installieren:

(env) $ pip install pymysql

Du musst auch einen neuen Abschnitt in die Datei pipeline.conf einfügen, um die Verbindungsinformationen für die MySQL-Datenbank zu speichern:

[mysql_config]
hostname = my_host.com
port = 3306
username = my_user_name
password = my_password
database = db_name

Erstelle nun ein neues Python-Skript namens extract_mysql_full.py. Du musst mehrere Bibliotheken importieren, z. B. pymysql, die eine Verbindung zur MySQL-Datenbank herstellt, und die Bibliothek csv, damit du die extrahierten Daten strukturieren und in eine flache Datei schreiben kannst, die sich im Ladeschritt des Ingestion leicht in ein Data Warehouse importieren lässt. Importiere auch boto3, damit du die resultierende CSV-Datei in deinen S3-Bucket hochladen kannst, um sie später in das Data Warehouse zu laden:

import pymysql
import csv
import boto3
import configparser

Jetzt kannst du eine Verbindung zur MySQL-Datenbank initialisieren:

parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")

conn = pymysql.connect(host=hostname,
        user=username,
        password=password,
        db=dbname,
        port=int(port))

if conn is None:
  print("Error connecting to the MySQL database")
else:
  print("MySQL connection established!")

Führe eine vollständige Extraktion der Tabelle Orders aus dem vorherigen Beispiel aus. Der folgende Code extrahiert den gesamten Inhalt der Tabelle und schreibt ihn in eine CSV-Datei mit Pipe-Begrenzung. Für die Extraktion wird ein cursor Objekt aus der pymysql Bibliothek verwendet, um die SELECT-Abfrage auszuführen:

m_query = "SELECT * FROM Orders;"
local_filename = "order_extract.csv"

m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()

with open(local_filename, 'w') as fp:
  csv_w = csv.writer(fp, delimiter='|')
  csv_w.writerows(results)

fp.close()
m_cursor.close()
conn.close()

Nachdem die CSV-Datei lokal geschrieben wurde, muss sie in den S3-Bucket hochgeladen werden, damit sie später in das Data Warehouse oder ein anderes Ziel geladen werden kann. Wie im Abschnitt "Einrichten der Cloud-Dateispeicherung" beschrieben, hast du für die Boto3-Bibliothek einen IAM-Benutzer eingerichtet, der für die Authentifizierung gegenüber dem S3-Bucket verwendet wird. Außerdem hast du die Anmeldedaten im Abschnitt aws_boto_credentials der Datei pipeline.conf gespeichert. Hier ist der Code zum Hochladen der CSV-Datei in deinen S3-Bucket:

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials", "access_key")
secret_key = parser.get("aws_boto_credentials", "secret_key")
bucket_name = parser.get("aws_boto_credentials", "bucket_name")

s3 = boto3.client('s3', aws_access_key_id=access_key, aws_secret_access_key=secret_key)

s3_file = local_filename

s3.upload_file(local_filename, bucket_name, s3_file)

Du kannst das Skript wie folgt ausführen:

(env) $ python extract_mysql_full.py

Wenn das Skript ausgeführt wird, befindet sich der gesamte Inhalt der Tabelle Orders in einer CSV-Datei im S3-Bucket und wartet darauf, in das Data Warehouse oder einen anderen Datenspeicher geladen zu werden. In Kapitel 5 erfährst du mehr über das Laden in einen Datenspeicher deiner Wahl.

Wenn du die Daten inkrementell extrahieren willst, musst du ein paar Änderungen am Skript vornehmen. Ich schlage vor, eine Kopie von extract_mysql_full.py mit dem Namen extract_mysql_incremental.py zu erstellen.

Finde zunächst den Zeitstempel des letzten Datensatzes, der aus der Quelltabelle Orders extrahiert wurde. Dazu fragst du den Wert MAX(LastUpdated) aus der Tabelle Orders im Data Warehouse ab. In diesem Beispiel verwende ich ein Redshift Data Warehouse (siehe "Konfigurieren eines Amazon Redshift Warehouse als Ziel"), aber du kannst die gleiche Logik auch mit einem Warehouse deiner Wahl anwenden.

Um mit deinem Redshift-Cluster zu interagieren, installiere die psycopg2 Bibliothek, falls du das nicht schon getan hast.

(env) $ pip install psycopg2

Hier ist der Code, um eine Verbindung zum Redshift-Cluster herzustellen und den Wert MAX(LastUpdated) aus der Tabelle Orders abzufragen:

import psycopg2

# get db Redshift connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
dbname = parser.get("aws_creds", "database")
user = parser.get("aws_creds", "username")
password = parser.get("aws_creds", "password")
host = parser.get("aws_creds", "host")
port = parser.get("aws_creds", "port")

# connect to the redshift cluster
rs_conn = psycopg2.connect(
    "dbname=" + dbname
    + " user=" + user
    + " password=" + password
    + " host=" + host
    + " port=" + port)

rs_sql = """SELECT COALESCE(MAX(LastUpdated),
        '1900-01-01')
        FROM Orders;"""
rs_cursor = rs_conn.cursor()
rs_cursor.execute(rs_sql)
result = rs_cursor.fetchone()

# there's only one row and column returned
last_updated_warehouse = result[0]

rs_cursor.close()
rs_conn.commit()

Ändere die Extraktionsabfrage in der MySQL-Datenbank mit dem in last_updated_warehouse gespeicherten Wert so ab, dass nur die Datensätze aus der Tabelle Orders abgerufen werden, die seit dem letzten Lauf des Extraktionsauftrags aktualisiert worden sind. Die neue Abfrage enthält einen Platzhalter, der durch %s für den Wert last_updated_warehouse dargestellt wird. Der Wert wird dann als Tupel (ein Datentyp, der zum Speichern von Datensammlungen verwendet wird) an die Funktion .execute() des Cursors übergeben. Dies ist der richtige und sichere Weg, um einer SQL-Abfrage Parameter hinzuzufügen und so eine mögliche SQL-Injection zu vermeiden. Hier ist der aktualisierte Codeblock für die Ausführung der SQL-Abfrage in der MySQL-Datenbank:

m_query = """SELECT *
    FROM Orders
    WHERE LastUpdated > %s;"""
local_filename = "order_extract.csv"

m_cursor = conn.cursor()
m_cursor.execute(m_query, (last_updated_warehouse,))

Das gesamte Skript extract_mysql_incremental.py für die inkrementelle Extraktion (unter Verwendung eines Redshift-Clusters für den Wert last_updated ) sieht wie folgt aus:

import pymysql
import csv
import boto3
import configparser
import psycopg2

# get db Redshift connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
dbname = parser.get("aws_creds", "database")
user = parser.get("aws_creds", "username")
password = parser.get("aws_creds", "password")
host = parser.get("aws_creds", "host")
port = parser.get("aws_creds", "port")

# connect to the redshift cluster
rs_conn = psycopg2.connect(
    "dbname=" + dbname
    + " user=" + user
    + " password=" + password
    + " host=" + host
    + " port=" + port)

rs_sql = """SELECT COALESCE(MAX(LastUpdated),
        '1900-01-01')
        FROM Orders;"""
rs_cursor = rs_conn.cursor()
rs_cursor.execute(rs_sql)
result = rs_cursor.fetchone()

# there's only one row and column returned
last_updated_warehouse = result[0]

rs_cursor.close()
rs_conn.commit()

# get the MySQL connection info and connect
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
dbname = parser.get("mysql_config", "database")
password = parser.get("mysql_config", "password")

conn = pymysql.connect(host=hostname,
        user=username,
        password=password,
        db=dbname,
        port=int(port))

if conn is None:
  print("Error connecting to the MySQL database")
else:
  print("MySQL connection established!")

m_query = """SELECT *
      FROM Orders
      WHERE LastUpdated > %s;"""
local_filename = "order_extract.csv"

m_cursor = conn.cursor()
m_cursor.execute(m_query, (last_updated_warehouse,))
results = m_cursor.fetchall()

with open(local_filename, 'w') as fp:
  csv_w = csv.writer(fp, delimiter='|')
  csv_w.writerows(results)

fp.close()
m_cursor.close()
conn.close()

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get(
    "aws_boto_credentials",
    "access_key")
secret_key = parser.get(
    "aws_boto_credentials",
    "secret_key")
bucket_name = parser.get(
    "aws_boto_credentials",
    "bucket_name")

s3 = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key)

s3_file = local_filename

s3.upload_file(
    local_filename,
    bucket_name,
    s3_file)
Warnung

Hüte dich davor, dass große Extraktionsaufträge - egal ob vollständig oder inkrementell - die MySQL-Quelldatenbank belasten und sogar die Ausführung von Produktionsabfragen blockieren. Sprich mit dem Eigentümer der Datenbank und ziehe in Erwägung, ein Replikat für die Extraktion einzurichten, anstatt aus der primären Quelldatenbank zu extrahieren.

Binäre Log-Replikation von MySQL-Daten

Obwohl die Implementierung komplexer ist, ist das Einlesen von Daten aus einer MySQL-Datenbank unter Verwendung des Inhalts des MySQL-Binlogs zur Replikation von Änderungen effizient, wenn große Mengen an Daten eingelesen werden müssen.

Hinweis

Die Binlog-Replikation ist eine Form der Änderungsdatenerfassung (CDC). Viele Quelldatenspeicher verfügen über eine Form von CDC, die du nutzen kannst.

Das MySQL binlog ist ein Protokoll, das jede Operation in der Datenbank aufzeichnet. Je nachdem, wie es konfiguriert ist, protokolliert es zum Beispiel die Einzelheiten jeder Tabellenerstellung oder -änderung sowie jede INSERT, UPDATE und DELETE Operation. Obwohl es ursprünglich dazu gedacht war, Daten auf andere MySQL-Instanzen zu replizieren, ist es nicht schwer zu verstehen, warum der Inhalt des binlog für Dateningenieure, die Daten in ein Data Warehouse einspeisen wollen, so interessant ist.

Da es sich bei deinem Data Warehouse wahrscheinlich nicht um eine MySQL-Datenbank handelt, ist es nicht möglich, einfach die eingebauten MySQL-Replikationsfunktionen zu nutzen. Um das binlog für die Dateneingabe in eine Nicht-MySQL-Quelle zu nutzen, sind einige Schritte erforderlich:

  1. Aktiviere und konfiguriere das binlog auf dem MySQL-Server.

  2. Führe eine erste vollständige Tabellenextraktion und -ladung durch.

  3. Auszug aus dem Binlog auf kontinuierlicher Basis.

  4. Übersetze und lade Binlog-Auszüge in das Data Warehouse.

Hinweis

Schritt 3 wird nicht im Detail besprochen, aber um das Binlog für die Aufnahme zu verwenden, musst du zunächst die Tabellen im Data Warehouse mit dem aktuellen Stand der MySQL-Datenbank auffüllen und dann das Binlog verwenden, um nachfolgende Änderungen aufzunehmen. Dazu musst du die Tabellen, die du extrahieren möchtest, mit LOCK verknüpfen, mysqldump ausführen und das Ergebnis von mysqldump in das Warehouse laden, bevor du die binlog-Ingestion aktivierst.

Obwohl es am besten ist, in der aktuellen MySQL binlog-Dokumentation nachzulesen, wie man das binäre Logging aktiviert und konfiguriert, werde ich die wichtigsten Konfigurationswerte durchgehen.

Es gibt zwei wichtige Einstellungen in der MySQL-Datenbank, die bei der binlog-Konfiguration beachtet werden müssen.

Stelle zunächst sicher, dass die binäre Protokollierung aktiviert ist. Normalerweise ist sie standardmäßig aktiviert, aber du kannst das überprüfen, indem du die folgende SQL-Abfrage in der Datenbank ausführst (die genaue Syntax kann je nach MySQL-Distribution variieren):

SELECT variable_value as bin_log_status
FROM performance_schema.global_variables
WHERE variable_name='log_bin';

Wenn das binäre Logging aktiviert ist, siehst du Folgendes. Wenn der zurückgegebene Status OFF lautet, musst du die MySQL-Dokumentation für die entsprechende Version konsultieren, um sie zu aktivieren.

+ — — — — — — — — — — — — — — — — — — -+
| bin_log_status :: |
+ — — — — — — — — — — — — — — — — — — -+
| ON |
+ — — — — — — — — — — — — — — — — — — -+
1 row in set (0.00 sec)

Vergewissere dich als Nächstes, dass das binäre Logging-Format richtig eingestellt ist. Es gibt drei Formate, die in der aktuellen Version von MySQL unterstützt werden:

  • STATEMENT

  • ROW

  • MIXED

Das Format STATEMENT protokolliert jede SQL-Anweisung, die eine Zeile im binlog einfügt oder ändert. Wenn du Daten von einer MySQL-Datenbank zu einer anderen replizieren möchtest, ist dieses Format nützlich. Um die Daten zu replizieren, könntest du einfach alle Anweisungen ausführen, um den Zustand der Datenbank zu reproduzieren. Da die extrahierten Daten jedoch wahrscheinlich für ein Data Warehouse bestimmt sind, das auf einer anderen Plattform läuft, sind die in der MySQL-Datenbank erzeugten SQL-Anweisungen möglicherweise nicht mit deinem Data Warehouse kompatibel.

Im Format ROW wird jede Änderung einer Tabellenzeile in einer Zeile des binlogs nicht als SQL-Anweisung dargestellt, sondern als die Daten in der Zeile selbst. Dies ist das bevorzugte Format, das verwendet wird.

Das Format MIXED protokolliert sowohl STATEMENT- als auch ROW-formatierte Datensätze im binlog. Obwohl es möglich ist, später nur die ROW Daten auszusieben, ist es nicht notwendig, MIXED zu aktivieren, wenn das binlog nicht für andere Zwecke verwendet wird, da es zusätzlichen Speicherplatz beansprucht.

Du kannst das aktuelle binlog-Format überprüfen, indem du die folgende SQL-Abfrage ausführst:

SELECT variable_value as bin_log_format
FROM performance_schema.global_variables
WHERE variable_name='binlog_format';

Die Anweisung gibt das Format zurück, das gerade aktiv ist:

+ — — — — — — — — — — — — — — — — — — — -+
| bin_log_format :: |
+ — — — — — — — — — — — — — — — — — — — -+
| ROW |
+ — — — — — — — — — — — — — — — — — — — -+
1 row in set (0.00 sec)

Das binlog-Format und andere Konfigurationseinstellungen werden normalerweise in der Datei my.cnf für die MySQL-Datenbankinstanz festgelegt. Wenn du die Datei öffnest, wirst du eine Zeile wie die folgende sehen:

[mysqld]
binlog_format=row
........

Auch hier ist es am besten, den Eigentümer der MySQL-Datenbank oder die aktuelle MySQL-Dokumentation zu konsultieren, bevor du irgendwelche Konfigurationen änderst.

Jetzt, da die binäre Protokollierung in einem ROW Format aktiviert ist, kannst du einen Prozess erstellen, um die relevanten Informationen daraus zu extrahieren und in einer Datei zu speichern, die in dein Data Warehouse geladen wird.

Es gibt drei verschiedene Arten von ROW-formatierten Ereignissen, die du aus dem binlog ziehen willst. Für dieses Ingestion-Beispiel kannst du andere Ereignisse, die du im Log findest, ignorieren. Bei fortgeschritteneren Replikationsstrategien ist es jedoch auch sinnvoll, Ereignisse zu extrahieren, die die Struktur einer Tabelle verändern. Die Ereignisse, mit denen du arbeiten wirst, sind die folgenden

  • WRITE_ROWS_EVENT

  • UPDATE_ROWS_EVENT

  • DELETE_ROWS_EVENT

Als Nächstes ist es an der Zeit, die Ereignisse aus dem binlog abzurufen. Zum Glück gibt es einige Open-Source-Python-Bibliotheken, mit denen du loslegen kannst. Eine der beliebtesten ist das Projekt python-mysql-replication, das du auf GitHub findest. Um loszulegen, installiere es mit pip:

(env) $ pip install mysql-replication

Um eine Vorstellung davon zu bekommen, wie die Ausgabe des binlogs aussieht, kannst du dich mit der Datenbank verbinden und das binlog auslesen. In diesem Beispiel verwende ich die MySQL-Verbindungsinformationen, die in der Datei pipeline.conf für das Beispiel der vollständigen und inkrementellen Erfassung weiter oben in diesem Abschnitt hinzugefügt wurden.

Hinweis

Das folgende Beispiel liest aus der Standard-Binlog-Datei des MySQL-Servers. Der Standard-Binlog-Dateiname und der Pfad werden in der Variable log_bin festgelegt, die in der Datei my.cnf für die MySQL-Datenbank gespeichert ist. In manchen Fällen werden die binlogs im Laufe der Zeit rotiert (vielleicht täglich oder stündlich). In diesem Fall musst du den Dateipfad auf der Grundlage der vom MySQL-Administrator gewählten Methode der Log-Rotation und des Dateinamensschemas bestimmen und ihn als Wert an den Parameter log_file übergeben, wenn du die Instanz BinLogStreamReader erstellst. Weitere Informationen findest du in der Dokumentation für die Klasse BinLogStreamReader .

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication import row_event
import configparser
import pymysqlreplication

# get the MySQL connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
password = parser.get("mysql_config", "password")

mysql_settings = {
    "host": hostname,
    "port": int(port),
    "user": username,
    "passwd": password
}

b_stream = BinLogStreamReader(
            connection_settings = mysql_settings,
            server_id=100,
            only_events=[row_event.DeleteRowsEvent,
                        row_event.WriteRowsEvent,
                        row_event.UpdateRowsEvent]
            )

for event in b_stream:
    event.dump()

b_stream.close()

Es gibt ein paar Dinge über das BinLogStreamReader Objekt zu beachten, das im Codebeispiel instanziiert wird. Zunächst stellt es eine Verbindung zur MySQL-Datenbank her, die in der Datei pipeline.conf angegeben ist, und liest aus einer bestimmten binlog-Datei. Als Nächstes wird durch die Kombination aus der Einstellung resume_stream=True und dem Wert log_pos festgelegt, dass das Lesen des Binlogs an einer bestimmten Stelle beginnt. In diesem Fall ist das die Position 1400. Schließlich weise ich BinLogStreamReader an, nur die Ereignisse DeleteRowsEvent, WriteRowsEvent und UpdateRowsEvent zu lesen, indem ich den Parameter only_events verwende.

Als Nächstes durchläuft das Skript alle Ereignisse und gibt sie in einem für Menschen lesbaren Format aus. Für deine Datenbank mit der Tabelle Orders siehst du in etwa die folgende Ausgabe:

=== WriteRowsEvent ===
Date: 2020-06-01 12:00:00
Log position: 1400
Event size: 30
Read bytes: 20
Table: orders
Affected columns: 3
Changed rows: 1
Values:
--
* OrderId : 1
* OrderStatus : Backordered
* LastUpdated : 2020-06-01 12:00:00

=== UpdateRowsEvent ===
Date: 2020-06-09 12:00:25
Log position: 1401
Event size: 56
Read bytes: 15
Table: orders
Affected columns: 3
Changed rows: 1
Affected columns: 3
Values:
--
* OrderId : 1 => 1
* OrderStatus : Backordered => Shipped
* LastUpdated : 2020-06-01 12:00:00 => 2020-06-09 12:00:25

Wie du sehen kannst, gibt es zwei Ereignisse, die die INSERT und UPDATE von OrderId 1 darstellen, die in Tabelle 4-3 gezeigt wurden. In diesem fiktiven Beispiel liegen die beiden aufeinanderfolgenden binlog-Ereignisse Tage auseinander, aber in der Realität lägen zahlreiche Ereignisse dazwischen, die alle Änderungen in der Datenbank darstellen.

Hinweis

Der Wert von log_pos, der BinLogStreamReader mitteilt, wo er beginnen soll, ist ein Wert, den du irgendwo in einer eigenen Tabelle speichern musst, damit du weißt, wo du bei der nächsten Extraktion weitermachen musst. Ich finde es am besten, den Wert in einer Log-Tabelle im Data Warehouse zu speichern, aus der er gelesen werden kann, wenn die Extraktion beginnt, und in die er geschrieben werden kann, zusammen mit dem Positionswert des letzten Ereignisses, wenn sie beendet ist.

Das Codebeispiel zeigt zwar, wie die Ereignisse in einem für Menschen lesbaren Format aussehen, aber damit die Ausgabe einfach in das Data Warehouse geladen werden kann, sind noch ein paar weitere Schritte erforderlich:

  • Analysiere die Daten und schreibe sie in ein anderes Format. Um das Laden zu vereinfachen, wird im nächsten Codebeispiel jedes Ereignis in eine Zeile einer CSV-Datei geschrieben.

  • Schreibe eine Datei pro Tabelle, die du extrahieren und laden möchtest. Das Beispiel-Binlog enthält zwar nur Ereignisse, die sich auf die Tabelle Orders beziehen, aber es ist sehr wahrscheinlich, dass in einem echten Binlog auch Ereignisse enthalten sind, die sich auf andere Tabellen beziehen.

Um die erste Änderung vorzunehmen, werde ich statt der Funktion .dump() die Ereignisattribute auslesen und in eine CSV-Datei schreiben. Bei der zweiten Änderung schreibe ich der Einfachheit halber nur die Ereignisse der Tabelle Orders in eine Datei namens orders_extract.csv, anstatt für jede Tabelle eine eigene Datei zu erstellen. Bei einer vollständig implementierten Extraktion musst du dieses Codebeispiel ändern, um die Ereignisse nach Tabellen zu gruppieren und mehrere Dateien zu schreiben, eine für jede Tabelle, für die du Änderungen aufnehmen willst. Im letzten Schritt des finalen Codebeispiels wird die CSV-Datei in den S3-Bucket hochgeladen, damit sie in das Data Warehouse geladen werden kann, wie in Kapitel 5 ausführlich beschrieben:

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication import row_event
import configparser
import pymysqlreplication
import csv
import boto3

# get the MySQL connection info
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mysql_config", "hostname")
port = parser.get("mysql_config", "port")
username = parser.get("mysql_config", "username")
password = parser.get("mysql_config", "password")

mysql_settings = {
    "host": hostname,
    "port": int(port),
    "user": username,
    "passwd": password
}

b_stream = BinLogStreamReader(
            connection_settings = mysql_settings,
            server_id=100,
            only_events=[row_event.DeleteRowsEvent,
                        row_event.WriteRowsEvent,
                        row_event.UpdateRowsEvent]
            )

order_events = []

for binlogevent in b_stream:
  for row in binlogevent.rows:
    if binlogevent.table == 'orders':
      event = {}
      if isinstance(
            binlogevent,row_event.DeleteRowsEvent
        ):
        event["action"] = "delete"
        event.update(row["values"].items())
      elif isinstance(
            binlogevent,row_event.UpdateRowsEvent
        ):
        event["action"] = "update"
        event.update(row["after_values"].items())
      elif isinstance(
            binlogevent,row_event.WriteRowsEvent
        ):
        event["action"] = "insert"
        event.update(row["values"].items())

      order_events.append(event)

b_stream.close()

keys = order_events[0].keys()
local_filename = 'orders_extract.csv'
with open(
        local_filename,
        'w',
        newline='') as output_file:
    dict_writer = csv.DictWriter(
                output_file, keys,delimiter='|')
    dict_writer.writerows(order_events)

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get(
                "aws_boto_credentials",
                "access_key")
secret_key = parser.get(
                "aws_boto_credentials",
                "secret_key")
bucket_name = parser.get(
                "aws_boto_credentials",
                "bucket_name")

s3 = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key)

s3_file = local_filename

s3.upload_file(
    local_filename,
    bucket_name,
    s3_file)

Nach der Ausführung sieht die Datei orders_extract.csv wie folgt aus:

insert|1|Backordered|2020-06-01 12:00:00
update|1|Shipped|2020-06-09 12:00:25

Wie ich in Kapitel 5 erkläre, ist das Format der resultierenden CSV-Datei für schnelles Laden optimiert. Die Aufbereitung der extrahierten Daten ist eine Aufgabe für den Transformationsschritt in einer Pipeline, die in Kapitel 6 ausführlich beschrieben wird.

Daten aus einer PostgreSQL-Datenbank extrahieren

Wie bei MySQL gibt es auch bei PostgreSQL (allgemein als Postgres bekannt) zwei Möglichkeiten, Daten aus der Datenbank zu übernehmen: entweder durch vollständige oder inkrementelle Extraktionen mit SQL oder durch die Nutzung von Funktionen der Datenbank, die die Replikation auf andere Knotenpunkte unterstützen. Im Fall von Postgres gibt es mehrere Möglichkeiten, aber dieses Kapitel konzentriert sich auf eine Methode: die Umwandlung des Postgres Write-Ahead Log (WAL) in einen Datenstrom.

Wie der vorherige Abschnitt ist auch dieser für diejenigen gedacht, die Daten aus einer bestehenden Postgres-Datenbank einlesen müssen. Wenn du jedoch nur die Codebeispiele ausprobieren möchtest, kannst du Postgres entweder auf deinem lokalen Rechner installieren oder in AWS eine RDS-Instanz verwenden, was ich empfehle. Im vorigen Abschnitt findest du Hinweise zu den bewährten Methoden in Bezug auf Preise und Sicherheit für RDS MySQL, die auch für RDS Postgres gelten.

Die Codebeispiele in diesem Abschnitt sind recht einfach und beziehen sich auf eine Tabelle namens Orders in einer Postgres-Datenbank. Sobald du eine Postgres-Instanz zur Verfügung hast, kannst du die Tabelle erstellen und einige Beispielzeilen einfügen, indem du die folgenden SQL-Befehle ausführst:

CREATE TABLE Orders (
  OrderId int,
  OrderStatus varchar(30),
  LastUpdated timestamp
);

INSERT INTO Orders
  VALUES(1,'Backordered', '2020-06-01 12:00:00');
INSERT INTO Orders
  VALUES(1,'Shipped', '2020-06-09 12:00:25');
INSERT INTO Orders
  VALUES(2,'Shipped', '2020-07-11 3:05:00');
INSERT INTO Orders
  VALUES(1,'Shipped', '2020-06-09 11:50:00');
INSERT INTO Orders
  VALUES(3,'Shipped', '2020-07-12 12:00:00');

Vollständige oder inkrementelle Postgres-Tabellenextraktion

Diese Methode ähnelt den vollständigen, inkrementellen und vollständigen Extraktionen, die in "Daten aus einer MySQL-Datenbank extrahieren" gezeigt werden . Sie ist so ähnlich, dass ich hier nur auf einen Unterschied im Code eingehen werde. Wie das Beispiel in diesem Abschnitt extrahiert auch dieses Beispiel Daten aus einer Tabelle namens Orders in einer Quelldatenbank, schreibt sie in eine CSV-Datei und lädt sie dann in einen S3-Bucket hoch.

Der einzige Unterschied in diesem Abschnitt ist die Python-Bibliothek, die ich zum Extrahieren der Daten verwenden werde. Anstelle von PyMySQL verwende ich pyscopg2, um eine Verbindung zu einer Postgres-Datenbank herzustellen. Wenn du sie noch nicht installiert hast, kannst du das mit pip tun:

(env) $ pip install pyscopg2

Außerdem musst du der Datei pipeline.conf einen neuen Abschnitt mit den Verbindungsinformationen für die Postgres-Datenbank hinzufügen:

[postgres_config]
host = myhost.com
port = 5432
username = my_username
password = my_password
database = db_name

Der Code zum Ausführen der vollständigen Extraktion der Tabelle Orders ist fast identisch mit dem Beispiel aus dem MySQL-Abschnitt, aber wie du sehen kannst, verwendet er pyscopg2, um sich mit der Quelldatenbank zu verbinden und die Abfrage auszuführen. Hier ist der vollständige Code:

import psycopg2
import csv
import boto3
import configparser

parser = configparser.ConfigParser()
parser.read("pipeline.conf")
dbname = parser.get("postgres_config", "database")
user = parser.get("postgres_config", "username")
password = parser.get("postgres_config",
    "password")
host = parser.get("postgres_config", "host")
port = parser.get("postgres_config", "port")

conn = psycopg2.connect(
        "dbname=" + dbname
        + " user=" + user
        + " password=" + password
        + " host=" + host,
        port = port)

m_query = "SELECT * FROM Orders;"
local_filename = "order_extract.csv"

m_cursor = conn.cursor()
m_cursor.execute(m_query)
results = m_cursor.fetchall()

with open(local_filename, 'w') as fp:
  csv_w = csv.writer(fp, delimiter='|')
  csv_w.writerows(results)

fp.close()
m_cursor.close()
conn.close()

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get(
                "aws_boto_credentials",
                "access_key")
secret_key = parser.get(
                "aws_boto_credentials",
                "secret_key")
bucket_name = parser.get(
                "aws_boto_credentials",
                "bucket_name")

s3 = boto3.client(
      's3',
      aws_access_key_id = access_key,
      aws_secret_access_key = secret_key)

s3_file = local_filename

s3.upload_file(
    local_filename,
    bucket_name,
    s3_file)

Das Ändern der inkrementellen Version, die im Abschnitt MySQL gezeigt wird, ist genauso einfach. Alles, was du tun musst, ist, psycopg2 statt PyMySQL zu verwenden.

Replikation von Daten mit dem Write-Ahead Log

Wie das MySQL-Binlog (wie im vorherigen Abschnitt beschrieben) kann das Postgres-WAL als CDC-Methode für die Extraktion verwendet werden. Wie das MySQL-Binlog ist auch die Verwendung des WAL für die Dateneingabe in eine Pipeline recht komplex.

Obwohl du einen ähnlichen, vereinfachten Ansatz wählen kannst, wie er im Beispiel mit dem MySQL-Binlog verwendet wurde, schlage ich vor, eine verteilte Open-Source-Plattform namens Debezium zu verwenden, um den Inhalt des Postgres-WAL in einen S3-Bucket oder ein Data Warehouse zu streamen.

Obwohl die Einzelheiten der Konfiguration und des Betriebs von Debezium-Diensten ein Thema sind, dem ein ganzes Buch gewidmet werden sollte, gebe ich in "Streaming Data Ingestions with Kafka and Debezium" einen Überblick über Debezium und wie es für Dateningestionen verwendet werden kann . Dort erfährst du auch mehr darüber, wie es für Postgres CDC genutzt werden kann.

Daten aus der MongoDB extrahieren

Dieses Beispiel zeigt, wie du eine Teilmenge von MongoDB-Dokumenten aus einer Sammlung extrahieren kannst. In dieser MongoDB-Beispielsammlung stellen die Dokumente Ereignisse dar, die von einem System wie z. B. einem Webserver aufgezeichnet wurden. Jedes Dokument hat einen Zeitstempel, wann es erstellt wurde, sowie eine Reihe von Eigenschaften, aus denen der Beispielcode eine Teilmenge extrahiert. Nachdem die Extraktion abgeschlossen ist, werden die Daten in eine CSV-Datei geschrieben und in einem S3-Bucket gespeichert, damit sie in einem späteren Schritt in ein Data Warehouse geladen werden können (siehe Kapitel 5).

Um eine Verbindung zur MongoDB-Datenbank herzustellen, musst du zunächst die PyMongo-Bibliothek installieren. Wie andere Python-Bibliotheken auch, kannst du sie mit pip installieren:

(env) $ pip install pymongo

Du kannst den folgenden Beispielcode natürlich abändern, um dich mit deiner eigenen MongoDB-Instanz zu verbinden und Daten aus deinen Dokumenten zu extrahieren. Wenn du das Beispiel jedoch so ausführen möchtest, wie es ist, kannst du mit MongoDB Atlas kostenlos einen MongoDB-Cluster erstellen. Atlas ist ein vollständig verwalteter MongoDB-Dienst und umfasst eine kostenlose Stufe mit viel Speicherung und Rechenleistung für das Lernen und Ausführen von Beispielen wie dem von mir bereitgestellten. Für den produktiven Einsatz kannst du auf einen kostenpflichtigen Plan upgraden.

In dieser Anleitung erfährst du, wie du einen kostenlosen MongoDB-Cluster in Atlas erstellst, eine Datenbank einrichtest und sie so konfigurierst, dass du dich über ein Python-Skript, das auf deinem lokalen Rechner läuft, verbinden kannst.

Du musst eine weitere Python-Bibliothek mit dem Namen dnspython installieren, um pymongo bei der Verbindung zu deinem in MongoDB Atlas gehosteten Cluster zu unterstützen. Du kannst sie mit pip installieren:

(env) $ pip install dnspython

Als Nächstes fügst du der Datei pipeline.conf einen neuen Abschnitt mit den Verbindungsinformationen für die MongoDB-Instanz hinzu, aus der du Daten extrahieren willst. Fülle jede Zeile mit deinen eigenen Verbindungsdaten aus. Wenn du MongoDB Atlas verwendest und dich nicht mehr an diese Werte von der Einrichtung deines Clusters erinnerst, kannst du in den Atlas-Dokumenten nachlesen, wie du sie findest.

[mongo_config]
hostname = my_host.com
username = mongo_user
password = mongo_password
database = my_database
collection = my_collection

Bevor du das Extraktionsskript erstellst und ausführst, kannst du einige Beispieldaten einfügen, mit denen du arbeiten kannst. Erstelle eine Datei namens sample_mongodb.py mit dem folgenden Code:

from pymongo import MongoClient
import datetime
import configparser

# load the mongo_config values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name = parser.get("mongo_config",
                    "database")
collection_name = parser.get("mongo_config",
                    "collection")

mongo_client = MongoClient(
                "mongodb+srv://" + username
                + ":" + password
                + "@" + hostname
                + "/" + database_name
                + "?retryWrites=true&"
                + "w=majority&ssl=true&"
                + "ssl_cert_reqs=CERT_NONE")

# connect to the db where the collection resides
mongo_db = mongo_client[database_name]

# choose the collection to query documents from
mongo_collection = mongo_db[collection_name]

event_1 = {
  "event_id": 1,
  "event_timestamp": datetime.datetime.today(),
  "event_name": "signup"
}

event_2 = {
  "event_id": 2,
  "event_timestamp": datetime.datetime.today(),
  "event_name": "pageview"
}

event_3 = {
  "event_id": 3,
  "event_timestamp": datetime.datetime.today(),
  "event_name": "login"
}

# insert the 3 documents
mongo_collection.insert_one(event_1)
mongo_collection.insert_one(event_2)
mongo_collection.insert_one(event_3)

Wenn du sie ausführst, werden die drei Dokumente in deine MongoDB-Sammlung eingefügt:

(env) $ python sample_mongodb.py

Erstelle nun ein neues Python-Skript mit dem Namen mongo_extract.py, in das du die folgenden Codeblöcke einfügen kannst.

Importiere zunächst PyMongo und Boto3, damit du Daten aus der MongoDB-Datenbank extrahieren und die Ergebnisse in einem S3-Bucket speichern kannst. Importiere auch die csv Bibliothek, damit du die extrahierten Daten strukturieren und in eine flache Datei schreiben kannst, die du im Ladeschritt des Ingestion in ein Data Warehouse importieren kannst. Schließlich brauchst du für dieses Beispiel noch einige datetime Funktionen, mit denen du die Beispiel-Ereignisdaten in der MongoDB-Sammlung durchlaufen kannst:

from pymongo import MongoClient
import csv
import boto3
import datetime
from datetime import timedelta
import configparser

Als Nächstes verbindest du dich mit der MongoDB-Instanz, die in der Datei pipelines.conf angegeben ist, und erstellst ein collection Objekt, in dem die Dokumente, die du extrahieren möchtest, gespeichert werden:

# load the mongo_config values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
hostname = parser.get("mongo_config", "hostname")
username = parser.get("mongo_config", "username")
password = parser.get("mongo_config", "password")
database_name = parser.get("mongo_config",
                    "database")
collection_name = parser.get("mongo_config",
                    "collection")

mongo_client = MongoClient(
                "mongodb+srv://" + username
                + ":" + password
                + "@" + hostname
                + "/" + database_name
                + "?retryWrites=true&"
                + "w=majority&ssl=true&"
                + "ssl_cert_reqs=CERT_NONE")

# connect to the db where the collection resides
mongo_db = mongo_client[database_name]

# choose the collection to query documents from
mongo_collection = mongo_db[collection_name]

Jetzt ist es an der Zeit, die zu extrahierenden Dokumente abzufragen. Dazu rufst du die Funktion .find() auf mongo_collection auf, um die gesuchten Dokumente abzufragen. Im folgenden Beispiel werden alle Dokumente abgefragt, deren Wert im Feld event_timestamp zwischen zwei im Skript definierten Daten liegt.

Hinweis

Das Extrahieren von unveränderlichen Daten wie Protokolldatensätzen oder allgemeinen "Ereignis"-Datensätzen aus einem Datenspeicher nach Datumsbereich ist ein häufiger Anwendungsfall. Obwohl der Beispielcode einen im Skript definierten Datumsbereich verwendet, ist es wahrscheinlicher, dass du dem Skript einen Datumsbereich übergibst oder das Skript dein Data Warehouse abfragen lässt, um den Zeitpunkt des letzten geladenen Ereignisses zu ermitteln und die nachfolgenden Datensätze aus dem Quelldatenspeicher zu extrahieren. Ein Beispiel dafür findest du unter "Extrahieren von Daten aus einer MySQL-Datenbank".

start_date = datetime.datetime.today() + timedelta(days = -1)
end_date = start_date + timedelta(days = 1 )

mongo_query = { "$and":[{"event_timestamp" : { "$gte": start_date }}, {"event_timestamp" : { "$lt": end_date }}] }

event_docs = mongo_collection.find(mongo_query, batch_size=3000)
Hinweis

Der Parameter batch_size ist in diesem Beispiel auf 3000 gesetzt. PyMongo macht für jeden Batch einen Roundtrip zum MongoDB-Host. Wenn result_docs Cursor z. B. 6.000 Ergebnisse enthält, sind zwei Fahrten zum MongoDB-Host nötig, um alle Dokumente auf den Rechner zu bringen, auf dem dein Python-Skript läuft. Welchen Wert du als Stapelgröße festlegst, ist dir überlassen und hängt von dem Kompromiss ab, mehr Dokumente im Speicher des Systems zu speichern, auf dem der Extrakt ausgeführt wird, anstatt viele Fahrten zur MongoDB-Instanz zu unternehmen.

Das Ergebnis des vorangegangenen Codes ist eine Cursor mit dem Namen event_docs, die ich verwenden werde, um die resultierenden Dokumente zu durchlaufen. Erinnere dich daran, dass in diesem vereinfachten Beispiel jedes Dokument ein Ereignis darstellt, das von einem System wie z. B. einem Webserver erzeugt wurde. Ein Ereignis kann z. B. das Einloggen eines Benutzers, das Betrachten einer Seite oder das Absenden eines Feedback-Formulars sein. Obwohl die Dokumente Dutzende von Feldern haben könnten, um z. B. den Browser darzustellen, mit dem sich der Nutzer angemeldet hat, nehme ich für dieses Beispiel nur ein paar Felder:

# create a blank list to store the results
all_events = []

# iterate through the cursor
for doc in event_docs:
    # Include default values
    event_id = str(doc.get("event_id", -1))
    event_timestamp = doc.get(
                        "event_timestamp", None)
    event_name = doc.get("event_name", None)

    # add all the event properties into a list
    current_event = []
    current_event.append(event_id)
    current_event.append(event_timestamp)
    current_event.append(event_name)

    # add the event to the final list of events
    all_events.append(current_event)

Ich füge einen Standardwert in den doc.get() Funktionsaufruf ein (-1 oder None). Warum das? Es liegt in der Natur von unstrukturierten Dokumentdaten, dass Felder in einem Dokument fehlen können. Du kannst also nicht davon ausgehen, dass jedes der Dokumente, die du durchgehst, ein Feld "event_name" oder ein anderes Feld enthält. In solchen Fällen kannst du doc.get() anweisen, einen None Wert zurückzugeben, anstatt einen Fehler zu melden.

Nachdem du alle Ereignisse in event_docs durchlaufen hast, ist die Liste all_events bereit, in eine CSV-Datei geschrieben zu werden. Dazu verwendest du das Modul csv, das in der Standard-Python-Distribution enthalten ist und bereits in diesem Beispiel importiert wurde:

export_file = "export_file.csv"

with open(export_file, 'w') as fp:
	csvw = csv.writer(fp, delimiter='|')
	csvw.writerows(all_events)

fp.close()

Nun lädst du die CSV-Datei in den S3-Bucket hoch, den du unter "Einrichten der Cloud-Dateispeicherung" konfiguriert hast . Verwende dazu die Boto3-Bibliothek:

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials",
                "access_key")
secret_key = parser.get("aws_boto_credentials",
                "secret_key")
bucket_name = parser.get("aws_boto_credentials",
                "bucket_name")

s3 = boto3.client('s3',
        aws_access_key_id=access_key,
        aws_secret_access_key=secret_key)

s3_file = export_file

s3.upload_file(export_file, bucket_name, s3_file)

Das war's! Die Daten, die du aus der MongoDB-Sammlung extrahiert hast, befinden sich jetzt im S3-Bucket und warten darauf, in das Data Warehouse oder einen anderen Datenspeicher geladen zu werden. Wenn du die mitgelieferten Beispieldaten verwendet hast, sieht der Inhalt von export_file.csv etwa so aus:

1|2020-12-13 11:01:37.942000|signup
2|2020-12-13 11:01:37.942000|pageview
3|2020-12-13 11:01:37.942000|login

In Kapitel 5 erfährst du mehr über das Laden der Daten in den Datenspeicher deiner Wahl.

Daten aus einer REST API extrahieren

REST-APIs sind eine gängige Quelle, um Daten zu extrahieren. Möglicherweise musst du Daten von einer API übernehmen, die dein Unternehmen erstellt und verwaltet, oder von einer API eines externen Dienstes oder Anbieters, den dein Unternehmen nutzt, wie Salesforce, HubSpot oder Twitter. Unabhängig von der API gibt es ein gemeinsames Muster für die Datenextraktion, das ich in dem folgenden einfachen Beispiel verwenden werde:

  1. Sende eine HTTP-GET-Anfrage an den API-Endpunkt.

  2. Akzeptiere die Antwort, die höchstwahrscheinlich in JSON formatiert ist.

  3. Analysiere die Antwort und wandle sie in eine CSV-Datei um, die du später in dein Data Warehouse laden kannst.

Hinweis

Obwohl ich die JSON-Antwort analysiere und in einer flachen Datei (CSV) speichere, kannst du die Daten auch im JSON-Format speichern und in dein Data Warehouse laden. Der Einfachheit halber bleibe ich bei dem Muster dieses Kapitels und verwende CSV-Dateien. Weitere Informationen zum Laden von Daten in einem anderen Format als CSV findest du in Kapitel 5 oder in deiner Data Warehouse-Dokumentation.

In diesem Beispiel werde ich mich mit einer API namens Open Notify verbinden. Die API hat mehrere Endpunkte, die jeweils Daten von der NASA über Ereignisse im Weltraum zurückgeben. Ich frage den Endpunkt ab, der die nächsten fünf Vorbeiflüge der Internationalen Raumstation (ISS) an einem bestimmten Ort auf der Erde anzeigt.

Bevor ich dir den Python-Code für die Abfrage des Endpunkts zeige, kannst du sehen, wie die Ausgabe einer einfachen Abfrage aussieht, indem du die folgende URL in deinen Browser eingibst:

http://api.open-notify.org/iss-pass.json?lat=42.36&lon=71.05

Das resultierende JSON sieht wie folgt aus:

{
  "message": "success",
  "request": {
    "altitude": 100,
    "datetime": 1596384217,
    "latitude": 42.36,
    "longitude": 71.05,
    "passes": 5
  },
  "response": [
    {
      "duration": 623,
      "risetime": 1596384449
    },
    {
      "duration": 169,
      "risetime": 1596390428
    },
    {
      "duration": 482,
      "risetime": 1596438949
    },
    {
      "duration": 652,
      "risetime": 1596444637
    },
    {
      "duration": 624,
      "risetime": 1596450474
    }
  ]
}

Das Ziel dieser Extraktion ist es, die Daten in der Antwort abzurufen und sie in einer CSV-Datei mit einer Zeile für jede Zeit und Dauer jedes Überflugs, den die ISS über das Längen-/Breitengradpaar macht, zu formatieren. Die ersten beiden Zeilen der CSV-Datei lauten zum Beispiel wie folgt:

42.36,|71.05|623|1596384449
42.36,|71.05|169|1596390428

Um die API abzufragen und die Antworten in Python zu verarbeiten, musst du die Bibliothek requests installieren. requests macht die Arbeit mit HTTP-Anfragen und -Antworten in Python einfach. Du kannst sie mit pip installieren:

(env) $ pip install requests

Jetzt kannst du requests verwenden, um den API-Endpunkt abzufragen, die Antwort zurückzubekommen und das resultierende JSON auszudrucken, das so aussieht, wie du es in deinem Browser gesehen hast:

import requests

lat = 42.36
lon = 71.05
lat_log_params = {"lat": lat, "lon": lon}

api_response = requests.get(
    "http://api.open-notify.org/iss-pass.json", params=lat_log_params)

print(api_response.content)

Anstatt das JSON auszudrucken, werde ich die Antwort durchlaufen, die Werte für Dauer und Anstiegszeit auslesen, die Ergebnisse in eine CSV-Datei schreiben und die Datei in den S3-Bucket hochladen.

Um die JSON-Antwort zu parsen, importiere ich die Python-Bibliothek json. Du brauchst sie nicht zu installieren, da sie in der Standard-Python-Installation enthalten ist. Als Nächstes importiere ich die Bibliothek csv, die auch in der Standard-Python-Distribution enthalten ist, um die CSV-Datei zu schreiben. Schließlich verwende ich die Bibliothek configparser, um die Anmeldeinformationen zu erhalten, die die Boto3-Bibliothek benötigt, um die CSV-Datei in den S3-Bucket hochzuladen:

import requests
import json
import configparser
import csv
import boto3

Als Nächstes fragst du die API ab, wie du es zuvor getan hast:

lat = 42.36
lon = 71.05
lat_log_params = {"lat": lat, "lon": lon}

api_response = requests.get(
    "http://api.open-notify.org/iss-pass.json", params=lat_log_params)

Jetzt ist es an der Zeit, die Antwort zu durchlaufen, die Ergebnisse in einer Python-Datei list namens all_passes zu speichern und die Ergebnisse in einer CSV-Datei zu speichern. Beachte, dass ich auch den Breiten- und Längengrad aus der Anfrage speichere, obwohl sie nicht in der Antwort enthalten sind. Sie werden in jeder Zeile der CSV-Datei benötigt, damit die Durchgangszeiten beim Laden in das Data Warehouse mit dem richtigen Längen- und Breitengrad verknüpft werden:

# create a json object from the response content
response_json = json.loads(api_response.content)

all_passes = []
for response in response_json['response']:
    current_pass = []

    #store the lat/log from the request
    current_pass.append(lat)
    current_pass.append(lon)

    # store the duration and risetime of the pass
    current_pass.append(response['duration'])
    current_pass.append(response['risetime'])

    all_passes.append(current_pass)

export_file = "export_file.csv"

with open(export_file, 'w') as fp:
	csvw = csv.writer(fp, delimiter='|')
	csvw.writerows(all_passes)

fp.close()

Zum Schluss lädst du die CSV-Datei mithilfe der Boto3-Bibliothek in den S3-Bucket hoch:

# load the aws_boto_credentials values
parser = configparser.ConfigParser()
parser.read("pipeline.conf")
access_key = parser.get("aws_boto_credentials",
                "access_key")
secret_key = parser.get("aws_boto_credentials",
                "secret_key")
bucket_name = parser.get("aws_boto_credentials",
                "bucket_name")

s3 = boto3.client(
    's3',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key)

s3.upload_file(
    export_file,
    bucket_name,
    export_file)

Streaming Data Ingestions mit Kafka und Debezium

Wenn es darum geht, Daten aus einem CDC-System wie MySQL-Binlogs oder Postgres-WALs zu importieren, gibt es keine einfache Lösung ohne die Hilfe eines guten Frameworks.

Debezium ist ein verteiltes System , das aus mehreren Open-Source-Diensten besteht, die Änderungen auf Zeilenebene von gängigen CDC-Systemen erfassen und diese dann als Ereignisse streamen, die von anderen Systemen genutzt werden können. Eine Debezium-Installation besteht aus drei Hauptkomponenten:

  • Apache Zookeeper verwaltet die verteilte Umgebung und übernimmt die Konfiguration der einzelnen Dienste.

  • Apache Kafka ist eine verteilte Streaming-Plattform, die häufig zum Aufbau hochskalierbarer Datenpipelines verwendet wird.

  • Apache Kafka Connect ist ein Tool, um Kafka mit anderen Systemen zu verbinden, damit die Daten einfach über Kafka gestreamt werden können. Konnektoren werden für Systeme wie MySQL und Postgres gebaut und verwandeln Daten aus deren CDC-Systemen (binlogs und WAL) in Kakfa-Themen.

Kafka tauscht Nachrichten aus, die nach Themen geordnet sind. Ein System kann ein Thema veröffentlichen, während ein oder mehrere Systeme das Thema konsumieren oder abonnieren können.

Debezium verbindet diese Systeme miteinander und enthält Konnektoren für gängige CDC-Implementierungen. Die Herausforderungen für CDC habe ich zum Beispiel in "Daten aus einer MySQL-Datenbank extrahieren" und "Daten aus einer PostgreSQL-Datenbank extrahieren" beschrieben . Glücklicherweise gibt es bereits Konnektoren, die auf das MySQL binlog und das Postgres WAL "hören". Die Daten werden dann als Datensätze in einem Topic durch Kakfa geleitet und über einen anderen Konnektor in ein Ziel wie ein S3-Bucket, Snowflake oder Redshift Data Warehouse übertragen. Abbildung 4-1 zeigt ein Beispiel für die Verwendung von Debezium und seinen einzelnen Komponenten, um die von einem MySQL-Binlog erzeugten Ereignisse an ein Snowflake-Datawarehouse zu senden.

dppr 0401
Abbildung 4-1. Verwendung von Komponenten von Debezium für CDC von MySQL zu Snowflake.

Zum Zeitpunkt der Erstellung dieses Dokuments gibt es bereits eine Reihe von Debezium-Konnektoren für die Quellsysteme von , die du möglicherweise benötigen wirst:

  • MongoDB

  • MySQL

  • PostgreSQL

  • Microsoft SQL Server

  • Oracle

  • Db2

  • Cassandra

Außerdem gibt es Kafka Connect-Konnektoren für die gängigsten Data Warehouses und Speicherungen wie S3 und Snowflake.

Obwohl Debezium und Kafka selbst ein Thema sind, das ein eigenes Buch rechtfertigt, möchte ich auf den Wert von Debezium hinweisen, wenn du dich entscheidest, dass du CDC als Methode für die Dateneingabe verwenden möchtest. Das einfache Beispiel, das ich im Abschnitt über die MySQL-Extraktion in diesem Kapitel verwendet habe, ist funktional. Wenn du CDC jedoch in großem Maßstab einsetzen willst, empfehle ich dir dringend, etwas wie Debezium zu verwenden, anstatt selbst eine Plattform wie Debezium aufzubauen!

Tipp

Die Debezium-Dokumentation ist hervorragend und ein guter Ausgangspunkt, um das System kennenzulernen.

Get Data Pipelines Pocket Reference now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.