Kapitel 4. Löschen, Aktualisieren und Zusammenführen von Tabellen
Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com
Da Delta Lake den klassischen Data Lakes eine transaktionale Ebene hinzufügt, können wir klassische DML-Operationen wie Aktualisierungen, Löschungen und Zusammenführungen durchführen. Wenn du eine DELETE
Operation an einer Delta-Tabelle durchführst, wird die Operation auf der Ebene der Datendateien durchgeführt, wobei nach Bedarf Datendateien entfernt und hinzugefügt werden. Entfernte Datendateien sind nicht mehr Teil der aktuellen Version der Deltatabelle, sollten aber nicht sofort physisch gelöscht werden, da du eventuell zu einer älteren Version der Tabelle mit Zeitreise zurückkehren möchtest (Zeitreise wird in Kapitel 6 behandelt). Dasselbe gilt, wenn du eine UPDATE
Operation durchführst. Datendateien werden je nach Bedarf zu deiner Deltatabelle hinzugefügt oder aus ihr entfernt.
Die mächtigste DML-Operation in Delta Lake ist die Operation MERGE
, mit der du eine "Upsert"-Operation, also eine Mischung aus UPDATE
, DELETE
und INSERT
, in deiner Delta-Tabelle durchführen kannst. Du verbindest eine Quell- und eine Zieltabelle, schreibst eine Abgleichsbedingung und gibst dann an, was mit den Datensätzen geschehen soll, die entweder übereinstimmen oder nicht übereinstimmen.
Löschen von Daten aus einer Deltatabelle
Wir beginnen mit einer sauberen taxidb.YellowTaxis
Tabelle. Diese Tabelle wird durch das Skript "Kapitelinitialisierung" für Kapitel 4 erstellt.1 Sie hat 9.999.995 Millionen Zeilen:
%sql SELECT COUNT(id) FROM taxidb.YellowTaxis
Ausgabe:
+----------+ | count(1) | +----------+ | 9999995 | +----------+
Tabellenerstellung und HISTORIE BESCHREIBEN
Die Tabelle taxidb.YellowTaxis
Delta wurde im Skript "Kapitelinitialisierung" erstellt und in unseren Ordner /chapter04 kopiert. Schauen wir uns DESCRIBE HISTORY
für die Tabelle an:2
%sql DESCRIBE HISTORY taxidb.YellowTaxis
Ausgabe (nur relevante Teile werden angezeigt):
+-----------+--------------------------------+---------------------+ | operation | operationParameters | operationMetrics | +-----------+--------------------------------+---------------------+ | WRITE | [('mode', 'Overwrite'), (...)] | [('numFiles', '2'), | | | | ('numOutputRows', | | | | '9999995'), ...] | +-----------+--------------------------------+---------------------+
Wir sehen, dass wir eine Transaktion mit einer WRITE
Operation haben, die zwei Datendateien mit insgesamt 9.999.995 Zeilen schreibt. Lass uns einige Details über diese beiden Dateien herausfinden.
In Kapitel 2 hast du gelernt, wie du das Transaktionsprotokoll nutzen kannst, um die Aktionen zum Hinzufügen und Entfernen von Dateien zu sehen. Werfen wir einen Blick auf das Verzeichnis _delta_log:
%sh ls /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/*.json
Wie erwartet, sehen wir nur einen Eintrag im Transaktionsprotokoll:
/dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/….0000.json
Dieser Protokolleintrag sollte zwei Aktionen zum Hinzufügen von Dateien enthalten, da der Eintrag numfiles
in DESCRIBE HISTORY
zwei Aktionen enthielt. Benutzen wir wieder unseren grep
Befehl, um diese Abschnitte zu finden:
%sh grep \"add\" /dbfs/…/chapter04/YellowTaxisDelta/_delta_log/..0000.json | sed -n 1p > /tmp/commit.json python -m json.tool < /tmp/commit.json
Eine Abwandlung des vorherigen Befehls ist, dass wir, da du jetzt zwei Einträge hast, den Befehl sed
verwenden müssen, um den richtigen Add-Eintrag zu extrahieren.
Tipp
Du kannst die Ausgabe des Befehls grep
über die Pipeline an den Befehl sed
3 Befehl weiterleiten. sed
ist ein Stream-Editor, der grundlegende Textumwandlungen an einem Eingabestrom vornimmt und das Ergebnis in einen Ausgabestrom schreibt. Die Option -n
unterdrückt die normale Ausgabe, und der Befehl 1p
gibt nur die erste Zeile der Eingabe aus. Um den nächsten Add-Eintrag zu finden, kannst du einfach sed -n 2p
verwenden, der die zweite Zeile ausgibt.
Produzierter Output (nur relevante Teile werden angezeigt):
{ "add": { "path": "part-00000-...-c000.snappy.parquet", … "stats": "{\"numRecords\":5530100,...}}", "tags": { … } }
Hier sehen wir den Namen der ersten Datendatei, die für unsere Tabelle erstellt wurde, und die Anzahl der Datensätze in dieser Datei. Wir können den gleichen Befehl mit sed -n 2p
verwenden, um die zweite Add-Aktion zu erhalten, um die zweite Datendatei zu erhalten:
{ "add": { "path": "part-00001-...-c000.snappy.parquet", "...: "” stats”: {\"numRecords\":4469895,...}}", "tags": { … } } }
Jetzt wissen wir, dass unsere Tabelle die folgenden Datendateien enthält:
Parkett Dateiname | Anzahl der Datensätze |
---|---|
part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet | 5,530,100 |
part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet | 4,469,895 |
Diese Dateien entsprechen unserer Verzeichnisliste, so dass das Transaktionsprotokoll und der Verzeichnislistenbericht konsistent sind:
%sh ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta drwxrwxrwx 2 _delta_log -rwxrwxrwx 1 part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet -rwxrwxrwx 1 part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet
Ausführen der DELETE-Operation
Nehmen wir an, dass wir einen einzelnen Datensatz von löschen wollen, in diesem Fall den Datensatz mit RideId = 100000
. Zuerst sollten wir mit einem SQL SELECT sicherstellen, dass der Datensatz tatsächlich noch in der Tabelle ist:4
%sql -- First, show that you have data for RideId = 10000 SELECT RideId, VendorId, CabNumber, TotalAmount FROM taxidb.YellowTaxis WHERE RideId = 100000
Ausgabe:
+--------+----------+-----------+-------------+ | RideId | VendorId | CabNumber | TotalAmount | +--------+----------+-----------+-------------+ | 100000 | 2 | T478827C | 7.56 | +--------+----------+-----------+-------------+
Um diese Zeile zu löschen, können wir eine einfache SQL DELETE
verwenden. Wir können den Befehl DELETE
verwenden, um Zeilen auf der Grundlage eines Prädikats oder einer Filterbedingung selektiv zu löschen:
%sql DELETE FROM taxidb.YellowTaxis WHERE RideId = 100000
Ausgabe:
+------------------+ |num_affected_rows | +------------------+ | 1 | +------------------+
Wir können bestätigen, dass wir tatsächlich eine Zeile gelöscht haben. Wenn wir mit dem Befehl DESCRIBE HISTORY
verwenden, um die verschiedenen Operationen in der Tabelle zu betrachten, erhalten wir für Version 1 folgendes Ergebnis (die Ausgabe der Zeile ist zur besseren Lesbarkeit gedreht):
version: 1 timestamp: 2022-12-14T17:50:23.000+0000 operation: DELETE operationParameters: [('predicate', '["(spark_catalog.taxidb.YellowTaxis.RideId = 100000)"]')] operationMetrics: [('numRemovedFiles', '1'), ('numCopiedRows', '5530099'), ('numAddedChangeFiles', '0'), ('executionTimeMs', '32534'), ('numDeletedRows', '1'), ('scanTimeMs', '1524'), ('numAddedFiles', '1'), ('rewriteTimeMs', '31009')]
Wir können sehen, dass die Operation DELETE
war und das Prädikat, das wir für die Löschung verwendet haben, WHERE RideId = 100000
war. Delta Lake hat eine Datei entfernt (numRemovedFiles
= 1
) und eine neue Datei hinzugefügt (numAddedFiles =
1
). Wenn wir unseren vertrauten grep
Befehl verwenden, um die Details herauszufinden, sieht es folgendermaßen aus:
Aktion | Dateiname | # Anzahl der Datensätze |
---|---|---|
hinzufügen | part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet | 5,530,099 |
entfernen | part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet | 4,469,895 |
Abbildung 4-1 veranschaulicht die Aktionen von Delta Lake, als wir den Datensatz gelöscht haben.
Delta Lake führt im Rahmen der Operation DELETE
die folgenden Aktionen durch:
Delta Lake hat den ersten Scan der Daten durchgeführt, um alle Dateien zu identifizieren, die Zeilen enthalten, die der Prädikatsbedingung entsprechen. In diesem Fall handelt es sich um die Datei
e229aa1d5616
; sie enthält den Datensatz mitRideId = 100000
.In einem zweiten Scan liest Delta Lake die passenden Datendateien in den Speicher. Dann löscht Delta Lake die fraglichen Zeilen und schreibt eine neue, saubere Datendatei in die Speicherung. Diese neue Datendatei ist die
0d3e0e77c4c1
Datendatei. Da Delta Lake einen Datensatz gelöscht hat, enthält diese neue Datendatei 5.530.099 Datensätze (5.530.100 - 1).Wenn Delta Lake den Vorgang
DELETE
abschließt, wird die Datendateie229aa1d5616
nun aus dem Delta-Transaktionsprotokoll entfernt, da sie nicht mehr Teil der Delta-Tabelle ist. Dieser Vorgang wird "Tombstoning" genannt. Es ist jedoch wichtig zu wissen, dass diese alte Datendatei nicht gelöscht wird, da du sie vielleicht noch brauchst, um eine Zeitreise zu einer früheren Version der Tabelle zu machen. Du kannst den BefehlVACUUM
verwenden, um Dateien zu löschen, die älter als ein bestimmter Zeitraum sind. Zeitreisen und der BefehlVACUUM
werden in Kapitel 6 ausführlich behandelt.Die Datendatei
fedaa0f6623d
bleibt Teil der Delta-Tabelle, da keine Änderungen daran vorgenommen wurden.
Wir können die eine Datendatei (0d3e0e77c4c1
), die dem Verzeichnis hinzugefügt wurde, in unserer Verzeichnisliste sehen:
%sh ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/ drwxrwxrwx _delta_log -rwxrwxrwx part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet -rwxrwxrwx part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet -rwxrwxrwx part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet
Die Datendatei e229aa1d5616
wurde nicht physisch gelöscht.
Die wichtigste Botschaft, die du daraus ziehen kannst, ist, dass die Löschtransaktion auf der Ebene der Datendatei stattfindet. Delta Lake erstellt bei Bedarf neue Partitionen und fügt neue Aktionen zum Hinzufügen undEntfernen von Dateien in das Transaktionsprotokoll ein. In Kapitel 6 zur Leistungsoptimierung werden der Befehl VACUUM
und andere Möglichkeiten zum Aufräumen von nicht mehr benötigten Datendateien in Tombstones behandelt.
DELETE Leistungstuning-Tipps
Die wichtigste Möglichkeit, die Leistung einer DELETE
Operation in Delta Lake zu verbessern, besteht darin, weitere Prädikate hinzuzufügen, um das Suchspektrum einzugrenzen. Wenn du zum Beispiel partitionierte Daten hast und die Partition kennst, zu der die zu löschenden Datensätze gehören, kannst du die Partitionsklausel zum DELETE
Prädikat hinzufügen.
Delta Lake bietet außerdem eine Reihe weiterer Optimierungsbedingungen, wie das Überspringen von Daten und die Z-Order-Optimierung. Bei der Z-Reihenfolge wird das Layout jeder Datendatei so umorganisiert, dass ähnliche Spaltenwerte strategisch nahe beieinander liegen, um maximale Effizienz zu erreichen. Weitere Einzelheiten findest du in Kapitel 5.
Daten in einer Tabelle aktualisieren
Nachdem du nun gesehen hast, wie sich eine DELETE
Operation auf die Tabelle YellowTaxis
auswirkt, werfen wir einen kurzen Blick auf eine UPDATE
Operation. Mit der Operation UPDATE
kannst du selektiv alle Zeilen aktualisieren, die einer Filterbedingung entsprechen, die auch als Prädikat bezeichnet wird.
Anwendungsfall Beschreibung
Nehmen wir an, es gab einen Fehler mit dem DropLocationId
für den Datensatz, in dem RideId =
9999994
. Stellen wir zunächst sicher, dass dieser Datensatz in unserer Tabelle vorhanden ist, indem wir die folgende SELECT
verwenden:
SELECT INPUT_FILE_NAME(), RideId, VendorId, DropLocationId FROM taxidb.YellowTaxis WHERE RideId = 9999994
Die Spark SQL-Funktion INPUT_FILE_NAME()
ist eine praktische Funktion, die uns den Dateinamen liefert, in dem sich der Datensatz befindet:
+---------------------------+---------+----------+----------------+ | input_file_name() | RideId | VendorId | DropLocationId | +---------------------------+---------+----------+----------------+ | .../part-00001-...parquet | 9999994 | 1 | 243 | +---------------------------+---------+----------+----------------+
Die Funktion INPUT_FILE_NAME
zeigt, dass sich unser Datensatz in der Datei fedaa0f6623d
befindet. Das macht Sinn, da er einer der letzten Datensätze ist, also logischerweise in der zuletzt erstellten Datei liegt. Wir können sehen, dass die bestehende DropLocationId
derzeit 243 ist. Nehmen wir an, dass wir dieses Feld auf den Wert 250 aktualisieren wollen. Als Nächstes sehen wir uns die eigentliche DELETE
Operation an.
Daten in einer Tabelle aktualisieren
Wir können nun die UPDATE
SQL-Anweisung wie folgt schreiben:
%sql UPDATE taxidb.YellowTaxis SET DropLocationId = 250 WHERE RideId = 9999994
Wir sehen, dass wir eine einzelne Zeile aktualisiert haben:
+-------------------+ | num_affected_rows | +-------------------+ | 1 | +-------------------+
Überprüfen wir zunächst, ob wir die Tabelle erfolgreich aktualisiert haben:
%sql SELECT RideId, DropLocationId FROM taxidb.YellowTaxis WHERE RideId = 9999994 +---------+----------------+ | RideId | DropLocationId | +---------+----------------+ | 9999994 | 250 | +---------+----------------+
Die Ausgabe zeigt, dass der Datensatz erfolgreich aktualisiert wurde. Wenn wir den Befehl DESCRIBE HISTORY
auf die Tabelle anwenden, sehen wir die Operation UPDATE
für die Version 3 der Tabelle (die Ausgabe ist der Übersichtlichkeit halber geschwenkt):
version: 3 timestamp: 2022-12-23 17:20:45+00:00 operation: UPDATE operationParameters: [('predicate', '(RideId = 9999994)')] operationMetrics: [('numRemovedFiles', '1'), ('numCopiedRows', '4469894'), ('numAddedChangeFiles', '0'), ('executionTimeMs', '25426'), ('scanTimeMs', '129'), ('numAddedFiles', '1'), ('numUpdatedRows', '1'), ('rewriteTimeMs', '25293')]
Eine Datei wurde entfernt ('numRemovedFiles', '1'
), und eine wurde hinzugefügt ('numAddedFiles', '1'
). Wir können auch unser UPDATE
Prädikat [('predicate', '(RideId = 9999994)')]
sehen. Wenn wir den Befehl grep
verwenden, um die Details herauszufinden, sieht es folgendermaßen aus:
Aktion | Dateiname | # Anzahl der Datensätze |
---|---|---|
hinzufügen | part-00000-da1ef656-46e-4de5-a189-50807db851f6-c000.snappy.parquet | 4,469,895 |
entfernen | part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet | 4,469,895 |
Abbildung 4-2 zeigt die Aktionen, die Delta Lake durchgeführt hat, als wir den Datensatz gelöscht haben.
Delta Lake führt eine UPDATE
auf einem Tisch in zwei Schritten aus:
Es findet und wählt die Datendateien aus, die Daten enthalten, die dem Prädikat entsprechen und daher aktualisiert werden müssen. Um diesen Prozess zu beschleunigen, überspringt Delta Lake nach Möglichkeit Daten. In diesem Fall ist das die
fedaa0f6623d
Datendatei. Wir können das auch mit der SQL-FunktionINPUT_FILE_NAME()
überprüfen.Als Nächstes liest Delta Lake jede übereinstimmende Datei in den Speicher, aktualisiert die relevanten Zeilen und schreibt das Ergebnis in eine neue Datendatei. Die neue Datendatei ist in diesem Fall die Datei
50807db851f6
. Sie enthält nun alle Datensätze der Partitionfedaa0f6623d
, aber mit den angewandten Aktualisierungen, in diesem Fall die Aktualisierung fürRideId = 9999994
. Diese Datendatei ist50807db851f6
. Diese Datendatei enthält weiterhin 4.469.895 Datensätze. Sobald Delta Lake dieUPDATE
erfolgreich ausgeführt hat, fügt es eine Aktion zum Hinzufügen von Dateien für die neue Datendatei hinzu.
Da sie nicht mehr benötigt wird, wird die Datendatei fedaa0f6623d
mit einer Remove-Commit-Aktion im Transaktionsprotokoll aus der Delta-Tabelle entfernt. Wie bei der Operation DELETE
wird die Datei jedoch nicht physisch gelöscht, für den Fall, dass wir uns eine alte Version der Tabelle mit Zeitreise ansehen wollen.
Die Datendatei 0d3e0e77c4c1
war von unserer Aktualisierung nicht betroffen, sie bleibt also Teil der Delta-Tabelle und enthält weiterhin 5.530.099 Datensätze.
UPDATE Performance Tuning Tipps
Wie bei DELETE
besteht auch bei die wichtigste Möglichkeit, die Leistung eines UPDATE
Befehls in Delta Lake zu verbessern, darin, mehr Prädikate hinzuzufügen, um den Suchbereich einzugrenzen. Je spezifischer die Suche ist, desto weniger Dateien muss Delta Lake scannen und/oder ändern.
Wie im vorherigen Abschnitt erwähnt, können andere Delta Lake-Funktionen, wie z. B. die Z-Ordnung, verwendet werden, um die UPDATE
Operationen weiter zu beschleunigen. In Kapitel 5 findest du weitere Informationen zur Delta Lake-Optimierung.
Daten mit der MERGE-Operation hochladen
Mit dem Delta Lake MERGE
Befehl kannst du Upserts an deinen Daten durchführen. Ein Upsert ist eine Mischung aus einem UPDATE
und einem INSERT
Befehl. Um Upserts zu verstehen, nehmen wir an, dass wir eine bestehende Tabelle (die Zieltabelle ) und eine Quelltabelle haben, die eine Mischung aus neuen Datensätzen und Aktualisierungen bestehender Datensätze enthält. So funktioniert ein Upsert tatsächlich:
Wenn ein Datensatz aus der Quelltabelle mit einem bereits existierenden Datensatz in der Zieltabelle übereinstimmt, aktualisiert Delta Lake den Datensatz.
Wenn es keine solche Übereinstimmung gibt, fügt Delta Lake den neuen Datensatz ein.
Anwendungsfall Beschreibung
Wenden wir eine MERGE
Operation auf unsere YellowTaxis
Tabelle an. Führen wir eine Zählung unserer YellowTaxis
Tabelle durch:
%sql SELECT COUNT(*) FROM taxidb.YellowTaxis
Wir sehen, dass wir 9.999.994 Datensätze haben.
+----------+ | count(1) | +----------+ | 9999994 | +----------+
Wir wollen den Datensatz mit RideId = 100000
wieder einfügen, den wir im Abschnitt DELETE
dieses Kapitels gelöscht haben. In unseren Quelldaten brauchen wir also einen Datensatz mit einem RideId
, der auf 100000
gesetzt ist.
Für dieses Beispiel nehmen wir an, dass wir auch die Datensätze mit RideId
= 999991
aktualisieren wollen, weil VendorId
falsch eingefügt wurde und für diese fünf Datensätze auf 1 (VendorId = 1
) aktualisiert werden muss. Schließlich wollen wir die Anzahl der Datensätze auf glatte 10.000.000 Datensätze bringen, also haben wir 5 weitere Datensätze mit RideId
s von 999996
bis 10000000
.
Der MERGE-Datensatz
In unseren begleitenden Quelldatendateien für das Buch haben wir eine Datei namens YellowTaxisMergeData.csv, die diese Datensätze enthält. Da wir ein Schema angeben müssen, lesen wir zunächst das Schema aus unserer bestehenden Tabelle:
df = spark.read.format("delta").table("taxidb.YellowTaxis") yellowTaxiSchema = df.schema print(yellowTaxiSchema)
Sobald wir das Schema geladen haben, können wir unsere CSV-Datei mit den Seriendaten laden:
yellowTaxisMergeDataFrame = spark \ .read \ .option("header", "true") \ .schema(yellowTaxiSchema) \ .csv("/mnt/datalake/book/chapter04/YellowTaxisMergeData.csv") .sort(col("RideId")) yellowTaxisMergeDataFrame.show()
Hier ist ein Teil der Ausgabe zu sehen:
+----------+----------+------------------------------+ | RideId | VendorId | PickupTime | +----------+----------+------------------------------+ | 100000 | 2 | 2022-03-01T00:00:00.000+0000 | | 9999991 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999992 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999993 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999994 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999995 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999996 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999997 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999998 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999999 | 3 | 2022-03-01T00:00:00.000+0000 | | 10000000 | 3 | 2022-03-01T00:00:00.000+0000 | +----------+----------+------------------------------+
Wir sehen unseren Datensatz mit RideId = 100000
, die fünf Datensätze (9999991
bis 9999995
) mit ihren neuen VendorId
von 1
und die fünf neuen Datensätze, beginnend mit 9999996
.
Wir wollen unsere MERGE
Anweisung in SQL schreiben, also müssen wir unseren Datenrahmen in SQL zur Verfügung haben. Die Klasse DataFrame
hat eine praktische Methode namens createOrReplaceTempView
, die genau das tut:
# Create a Temporary View on top of our DataFrame, making it # accessible to the SQL MERGE statement below yellowTaxisMergeDataFrame.createOrReplaceTempView("YellowTaxiMergeData")
Wir können jetzt einfach den Namen der Ansicht in SQL verwenden:
%sql SELECT * FROM YellowTaxiMergeData
Dies zeigt genau die gleiche Ausgabe wie mit der Methode display()
des Datenrahmens.
Die MERGE-Anweisung
Du kannst nun deine MERGE
Anweisung wie folgt schreiben:
%sql MERGE INTO taxidb.YellowTaxis AS target USING YellowTaxiMergeData AS source ON target.RideId = source.RideId -- You need to update the VendorId if the records -- matched WHEN MATCHED THEN -- If you want to update all columns, -- you can say "SET *" UPDATE SET target.VendorId = source.VendorId WHEN NOT MATCHED THEN -- If all columns match, you can also do a "INSERT *" INSERT(RideId, VendorId, PickupTime, DropTime, PickupLocationId, DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount, TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount, Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge) VALUES(RideId, VendorId, PickupTime, DropTime, PickupLocationId, DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount, TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount, Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)
Lass uns diese Aussage analysieren:
Wir werden die Tabelle
YellowTaxis
Delta aufMERGE INTO
aufrufen. Beachte, dass wir der Tabelle den Alias "Quelle" geben.Mit der
USING
Klausel geben wir den Quelldatensatz an, der in diesem Fall die ViewYellowTaxiMergeData
ist, und geben ihr den Alias source.Definiere die Join-Bedingung zwischen dem Quell- und dem Zieldatensatz. In unserem Fall wollen wir einfach die
VendorId
verknüpfen. Wenn du partitionierte Daten hast und eine Partition anvisieren willst, kannst du diese Bedingung hier mit einerAND
Anweisung hinzufügen.Gib die Aktion an, wenn
RideId
zwischensource
undtarget
liegt. In diesem Anwendungsfall wollen wir die Quelle mitVendorId
aktualisieren, das auf 1 gesetzt ist. Hier aktualisieren wir nur eine Spalte, aber bei Bedarf können wir eine durch Kommas getrennte Spaltenliste angeben. Wenn wir alle Spalten aktualisieren wollen, sagen wir einfachUPDATE SET *
.Definiere die Aktion, wenn der Datensatz in der Quelle existiert, aber nicht im Ziel. Wir haben keine zusätzliche Bedingung mit der
WHEN NOT MATCHED
, aber du kannst zusätzliche Klauseln hinzufügen, wenn der Anwendungsfall es erfordert. In den meisten Fällen wirst du eineINSERT
Anweisung als Aktion angeben. Da unsere Quell- und Zielspaltennamen identisch sind, hätten wir auch eine einfacheINSERT
*
verwenden können.
Wenn wir diese MERGE
Anweisung ausführen, erhalten wir die folgende Ausgabe:
+-------------------+------------------+------------------+-------------------+ | num_affected_rows | num_updated_rows | num_deleted_rows | num_inserted_rows | +-------------------+------------------+------------------+-------------------+ | 11 | 5 | 0 | 6 | +-------------------+------------------+------------------+-------------------+
Diese Ausgabe ist genau das, was du erwartet hast:
Wir aktualisieren fünf Zeilen (
VendorId
s9999991
bis9999995
)Wir fügen sechs Zeilen ein:
Eine Reihe mit einer
RideId
von100000
Die fünf Zeilen am Ende (
9999996
bis10000000
)
Wir können die Aktualisierungen in den ersten fünf Zeilen sehen:
%sql -- Make sure that the VendorId has been updated -- for the records with RideId between -- 9999991 and 9999995 SELECT RideId, VendorId FROM taxidb.YellowTaxis WHERE RideId BETWEEN 9999991 and 9999995
+---------+----------+ | RideId | VendorId | +---------+----------+ | 9999991 | 1 | | 9999992 | 1 | | 9999993 | 1 | | 9999994 | 1 | | 9999995 | 1 | +---------+----------+
Alle Zeilen haben jetzt die Quelle VendorId
von 1.
Wir können den eingefügten Datensatz mit RideId
= 100000
sehen:
%sql --Make sure that you have a record with VendorId = 100000 SELECT * FROM taxidb.YellowTaxis WHERE RideId = 100000
Ausgabe (Teilausgabe gezeigt):
+--------+----------+---------------------------+---------------------------+ | RideId | VendorId | PickupTime | DropTime | +--------+----------+---------------------------+---------------------------+ | 100000 | 2 | 2022-03-01 00:00:00+00:00 | 2022-03-01 00:12:01+00:00 | +--------+----------+---------------------------+---------------------------+
Und schließlich können wir die neuen Zeilen mit RideId >
9999995
sehen:
%sql SELECT * FROM taxidb.YellowTaxis WHERE RideId > 9999995 +----------+----------+---------------------------+ | RideId | VendorId | PickupTime | +----------+----------+---------------------------+ | 9999996 | 3 | 2022-03-01 00:00:00+00:00 | | 9999997 | 3 | 2022-03-01 00:00:00+00:00 | | 9999998 | 3 | 2022-03-01 00:00:00+00:00 | | 9999999 | 3 | 2022-03-01 00:00:00+00:00 | | 10000000 | 3 | 2022-03-01 00:00:00+00:00 | +----------+----------+---------------------------+
Und eine Gesamtzahl von 10 Millionen Datensätzen:
%sql SELECT COUNT(RideId) FROM taxidb.YellowTaxis +----------+ | count(1) | +----------+ | 10000000 | +----------+
Ändern nicht übereinstimmender Zeilen mit MERGE
Eine wichtige Ergänzung zu der Delta Lake MERGE
Operation ist die kürzlich veröffentlichte WHEN NOT MATCHED BY SOURCE
Klausel. Diese Klausel kann verwendet werden, um UPDATE
oder DELETE
Datensätze in der Zieltabelle zu löschen, die keine entsprechenden Datensätze in der Quelltabelle haben. Dies kann eine nützliche Operation sein, um Datensätze in der Zieltabelle zu löschen, die in der Quelltabelle nicht mehr vorhanden sind, oder um Datensätze zu markieren, die darauf hinweisen, dass die Daten als gelöscht oder inaktiv betrachtet werden sollten, während die Datensätze in der Zieltabelle erhalten bleiben (d. h. Soft Delete).
Hinweis
WHEN NOT MATCHED BY SOURCE
Klauseln werden von den Scala-, Python- und Java-Delta-Lake-APIs in Delta 2.3 und höher unterstützt. SQL wird in Delta 2.4 und höher unterstützt.
Um Datensätze zu löschen, die in den Quelltabellen, aber nicht in der Zieltabelle existieren (d.h. hartes Löschen), verwendest du die WHEN NOT MATCHED BY SOURCE
Klausel, wie im folgenden Codebeispiel zu sehen ist:
Hinweis
Der Code WHEN NOT MATCHED BY SOURCE
dient nur zur Veranschaulichung und sollte nicht zusammen mit den früheren Codebeispielen ausgeführt werden. Bitte beachte, dass die übrigen Codeausgaben in diesem Kapitel nicht mit den Beispielen und erwarteten Ausgaben in diesem Kapitel übereinstimmen, wenn du die WHEN NOT MATCHED BY SOURCE
Codebeispiele ausführst.
%sql MERGE INTO taxidb.YellowTaxis AS target USING YellowTaxiMergeData AS source ON target.RideId = source.RideId WHEN MATCHED UPDATE SET * WHEN NOT MATCHED INSERT * -- DELETE records in the target that are not matched by the source WHEN NOT MATCHED BY SOURCE DELETE
Wenn du Datensätze in der Zieltabelle markieren möchtest, die in der Quelltabelle nicht mehr existieren (d.h. Soft Delete) und eine bestimmte Bedingung erfüllen, kannst du eine MERGE
Bedingung und eine UPDATE
angeben:
%sql MERGE INTO taxidb.YellowTaxis AS target USING YellowTaxiMergeData AS source ON target.RideId = source.RideId WHEN MATCHED UPDATE SET * WHEN NOT MATCHED INSERT * -- Set target.status = 'inactive' when records in the target table -- don’t exist in the source table and condition is met WHEN NOT MATCHED BY SOURCE target.PickupTime >= (current_date() - INTERVAL '5' DAY) THEN UPDATE SET target.status = 'inactive'
Es ist die bewährte Methode, eine optionale MERGE
Bedingung hinzuzufügen, wenn du die WHEN NOT MATCHED BY SOURCE
Klausel zu UPDATE
oder DELETE
Zielzeilen hinzufügst. Denn wenn keine MERGE
Bedingung angegeben ist, kann dies dazu führen, dass eine große Anzahl von Zielzeilen geändert wird. Die beste Leistung erzielst du, wenn du die WHEN NOT MATCHED BY SOURCE
Klausel mit einer MERGE
Bedingung versiehst (z. B. target.PickupTime >= (current_date() - INTERVAL '5' DAY
im vorherigen Codebeispiel), um die Anzahl der aktualisierten oder gelöschten Zielzeilen zu begrenzen, denn dann wird eine Zielzeile nur geändert, wenn die Bedingung für diese Zeile wahr ist.
Du kannst auch mehrere WHEN NOT MATCHED BY SOURCE
Klauseln zu einem MERGE
Vorgang hinzufügen. Wenn es mehrere Klauseln gibt, werden sie in der Reihenfolge ausgewertet, in der sie angegeben sind, und alle WHEN NOT MATCHED BY SOURCE
Klauseln, außer der letzten, müssen Bedingungen haben.
Analyse des MERGE-Vorgangs mit DESCRIBE HISTORY
Wenn wir DESCRIBE HISTORY
auf die Tabelle YellowTaxis
im operationsParameters
Abschnitt der Ausgabe ausführen, können wir unser MERGE
Prädikat sehen:
operation: MERGE [('predicate', '(target.RideId = source.RideId)'), ('matchedPredicates', '[{"actionType":"update"}]'), ('notMatchedPredicates', '[{"actionType":"insert"}]')]
Wir sehen die Join-Bedingung (target.RideId = source.RideId
), die matchedPredicate
das eine Aktualisierung angibt, und das notMatchedPredicate
, das eine Einfügung angibt.
Die operationMetrics
Ausgabeabschnitte zeigen die Details der verschiedenen Aktionen:
[('numTargetRowsCopied', '4469890'), ('numTargetRowsDeleted', '0'), ('numTargetFilesAdded', '4'), ('executionTimeMs', '91902'), ('numTargetRowsInserted', '6'), ('scanTimeMs', '8452'), ('numTargetRowsUpdated', '5'), ('numOutputRows', '4469901'), ('numTargetChangeFilesAdded', '0'), ('numSourceRows', '11'), ('numTargetFilesRemoved', '1'), ('rewriteTimeMs', '16782')]
Auch hier sehen wir, dass sechs Zeilen eingefügt wurden (numTargetRowsInserted
) und fünf Zeilen aktualisiert wurden (numTargetRowsUpdated
). Vier neue Datendateien wurden zu unserer Delta-Tabelle hinzugefügt und eine Datendatei wurde entfernt.
Das Innenleben der MERGE-Operation
Intern führt Delta Lake eine MERGE
Operation wie diese in zwei Schritten durch:
Er führt zunächst einen
inner join
zwischen der Zieltabelle und der Quelltabelle durch, um alle Datendateien mit Übereinstimmungen auszuwählen. Dadurch wird verhindert, dass der Vorgang unnötig Daten mischt, die sicher ignoriert werden können.Als Nächstes führt er eine
outer join
zwischen den ausgewählten Dateien in den Ziel- und Quelltabellen durch und wendet die entsprechendeINSERT
,DELETE
oderUPDATE
Klausel an, wie vom Benutzer angegeben.
Der Hauptunterschied zwischen MERGE
und UPDATE
bzw. DELETE
besteht darin, dass Delta Lake Joins verwendet, um ein MERGE
zu vervollständigen. Dadurch kann einige einzigartige Strategien anwenden, um die Leistung zu verbessern.
Fazit
DML-Operationen wie DELETE
, UPDATE
und MERGE
sind wichtige Operationen für alle Tabellenformate und ETL-Operationen, die alle über das Transaktionsprotokoll aktiviert werden. Wenn du diese Operationen nutzt, kannst du Datenänderungen effizient verarbeiten und die Datenintegrität in deiner Datenplattform wahren.
Ähnlich wie bei Tabellen in einem herkömmlichen RDBMS hast du in diesem Kapitel gelesen, dass du mit Delta-Tabellen DELETE
, UPDATE
und MERGE
Operationen durchführen kannst, aber du kannst diese Operationen auch mit SQL oder der DataFrame API anwenden. Außerdem hast du erfahren, was unter der Haube von Delta Lake mit den zugrundeliegenden Dateien im Delta-Tabellenverzeichnis passiert und wie das Transaktionsprotokoll diese verschiedenen Arten von Einträgen aufzeichnet und verfolgt. Mit dem Befehl DESCRIBE HISTORY
können wir Details über die Ausgabe der Transaktionen einer Tabelle einsehen. Jeder dieser Vorgänge kann auch Prädikate verwenden, um die Anzahl der gescannten Dateien zu reduzieren und die Leistung zu verbessern. Neben der Verwendung von Prädikaten bei Operationen gibt es noch weitere Techniken zur Leistungsoptimierung von Delta-Tabellen, die du im folgenden Kapitel kennenlernen wirst.
Get Delta Lake: Auf und davon now with the O’Reilly learning platform.
O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.