Kapitel 4. Löschen, Aktualisieren und Zusammenführen von Tabellen

Diese Arbeit wurde mithilfe von KI übersetzt. Wir freuen uns über dein Feedback und deine Kommentare: translation-feedback@oreilly.com

Da Delta Lake den klassischen Data Lakes eine transaktionale Ebene hinzufügt, können wir klassische DML-Operationen wie Aktualisierungen, Löschungen und Zusammenführungen durchführen. Wenn du eine DELETE Operation an einer Delta-Tabelle durchführst, wird die Operation auf der Ebene der Datendateien durchgeführt, wobei nach Bedarf Datendateien entfernt und hinzugefügt werden. Entfernte Datendateien sind nicht mehr Teil der aktuellen Version der Deltatabelle, sollten aber nicht sofort physisch gelöscht werden, da du eventuell zu einer älteren Version der Tabelle mit Zeitreise zurückkehren möchtest (Zeitreise wird in Kapitel 6 behandelt). Dasselbe gilt, wenn du eine UPDATE Operation durchführst. Datendateien werden je nach Bedarf zu deiner Deltatabelle hinzugefügt oder aus ihr entfernt.

Die mächtigste DML-Operation in Delta Lake ist die Operation MERGE, mit der du eine "Upsert"-Operation, also eine Mischung aus UPDATE, DELETE und INSERT, in deiner Delta-Tabelle durchführen kannst. Du verbindest eine Quell- und eine Zieltabelle, schreibst eine Abgleichsbedingung und gibst dann an, was mit den Datensätzen geschehen soll, die entweder übereinstimmen oder nicht übereinstimmen.

Löschen von Daten aus einer Deltatabelle

Wir beginnen mit einer sauberen taxidb.YellowTaxis Tabelle. Diese Tabelle wird durch das Skript "Kapitelinitialisierung" für Kapitel 4 erstellt.1 Sie hat 9.999.995 Millionen Zeilen:

%sql
SELECT  
    COUNT(id)
FROM    
    taxidb.YellowTaxis

Ausgabe:

+----------+
| count(1) |
+----------+
| 9999995  |
+----------+

Tabellenerstellung und HISTORIE BESCHREIBEN

Die Tabelle taxidb.YellowTaxis Delta wurde im Skript "Kapitelinitialisierung" erstellt und in unseren Ordner /chapter04 kopiert. Schauen wir uns DESCRIBE HISTORY für die Tabelle an:2

%sql
DESCRIBE HISTORY taxidb.YellowTaxis

Ausgabe (nur relevante Teile werden angezeigt):

+-----------+--------------------------------+---------------------+
| operation | operationParameters            | operationMetrics    |
+-----------+--------------------------------+---------------------+
| WRITE     | [('mode', 'Overwrite'), (...)] | [('numFiles', '2'), |
|           |                                |  ('numOutputRows',  | 
|           |                                |  '9999995'), ...]   |
+-----------+--------------------------------+---------------------+

Wir sehen, dass wir eine Transaktion mit einer WRITE Operation haben, die zwei Datendateien mit insgesamt 9.999.995 Zeilen schreibt. Lass uns einige Details über diese beiden Dateien herausfinden.

In Kapitel 2 hast du gelernt, wie du das Transaktionsprotokoll nutzen kannst, um die Aktionen zum Hinzufügen und Entfernen von Dateien zu sehen. Werfen wir einen Blick auf das Verzeichnis _delta_log:

%sh
ls /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/*.json

Wie erwartet, sehen wir nur einen Eintrag im Transaktionsprotokoll:

/dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/….0000.json

Dieser Protokolleintrag sollte zwei Aktionen zum Hinzufügen von Dateien enthalten, da der Eintrag numfiles in DESCRIBE HISTORY zwei Aktionen enthielt. Benutzen wir wieder unseren grep Befehl, um diese Abschnitte zu finden:

%sh
grep \"add\" /dbfs/…/chapter04/YellowTaxisDelta/_delta_log/..0000.json |
  sed -n 1p > /tmp/commit.json
python -m json.tool < /tmp/commit.json

Eine Abwandlung des vorherigen Befehls ist, dass wir, da du jetzt zwei Einträge hast, den Befehl sed verwenden müssen, um den richtigen Add-Eintrag zu extrahieren.

Tipp

Du kannst die Ausgabe des Befehls grep über die Pipeline an den Befehl sed3 Befehl weiterleiten. sed ist ein Stream-Editor, der grundlegende Textumwandlungen an einem Eingabestrom vornimmt und das Ergebnis in einen Ausgabestrom schreibt. Die Option -n unterdrückt die normale Ausgabe, und der Befehl 1p gibt nur die erste Zeile der Eingabe aus. Um den nächsten Add-Eintrag zu finden, kannst du einfach sed -n 2p verwenden, der die zweite Zeile ausgibt.

Produzierter Output (nur relevante Teile werden angezeigt):

{
    "add": {
        "path": "part-00000-...-c000.snappy.parquet",
        …
        "stats": "{\"numRecords\":5530100,...}}",
        "tags": {
            …
        }
  }

Hier sehen wir den Namen der ersten Datendatei, die für unsere Tabelle erstellt wurde, und die Anzahl der Datensätze in dieser Datei. Wir können den gleichen Befehl mit sed -n 2p verwenden, um die zweite Add-Aktion zu erhalten, um die zweite Datendatei zu erhalten:

{
    "add": {
        "path": "part-00001-...-c000.snappy.parquet",
        "...: "”
stats”: {\"numRecords\":4469895,...}}",
        "tags": {
            …
        }
    }
}

Jetzt wissen wir, dass unsere Tabelle die folgenden Datendateien enthält:

Tabelle 4-1. Erstellte Parkettdateien
Parkett Dateiname Anzahl der Datensätze
part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet 5,530,100
part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet 4,469,895

Diese Dateien entsprechen unserer Verzeichnisliste, so dass das Transaktionsprotokoll und der Verzeichnislistenbericht konsistent sind:

%sh
ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta

drwxrwxrwx 2 _delta_log
-rwxrwxrwx 1 part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet
-rwxrwxrwx 1 part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet

Ausführen der DELETE-Operation

Nehmen wir an, dass wir einen einzelnen Datensatz von löschen wollen, in diesem Fall den Datensatz mit RideId = 100000. Zuerst sollten wir mit einem SQL SELECT sicherstellen, dass der Datensatz tatsächlich noch in der Tabelle ist:4

%sql
-- First, show that you have data for RideId = 10000
SELECT  
    RideId,
    VendorId,
    CabNumber,
    TotalAmount
FROM    
    taxidb.YellowTaxis
WHERE  
    RideId = 100000

Ausgabe:

+--------+----------+-----------+-------------+
| RideId | VendorId | CabNumber | TotalAmount |
+--------+----------+-----------+-------------+
| 100000 | 2        | T478827C  | 7.56        |
+--------+----------+-----------+-------------+

Um diese Zeile zu löschen, können wir eine einfache SQL DELETE verwenden. Wir können den Befehl DELETE verwenden, um Zeilen auf der Grundlage eines Prädikats oder einer Filterbedingung selektiv zu löschen:

%sql
DELETE FROM
    taxidb.YellowTaxis
WHERE RideId = 100000

Ausgabe:

+------------------+
|num_affected_rows |
+------------------+
|  1               |
+------------------+

Wir können bestätigen, dass wir tatsächlich eine Zeile gelöscht haben. Wenn wir mit dem Befehl DESCRIBE HISTORY verwenden, um die verschiedenen Operationen in der Tabelle zu betrachten, erhalten wir für Version 1 folgendes Ergebnis (die Ausgabe der Zeile ist zur besseren Lesbarkeit gedreht):

version:             1
timestamp:           2022-12-14T17:50:23.000+0000
operation:           DELETE
operationParameters: [('predicate', 
                     '["(spark_catalog.taxidb.YellowTaxis.RideId = 100000)"]')]
operationMetrics:    [('numRemovedFiles', '1'),
                      ('numCopiedRows', '5530099'),
                      ('numAddedChangeFiles', '0'),
                      ('executionTimeMs', '32534'),
                      ('numDeletedRows', '1'),
                      ('scanTimeMs', '1524'),
                      ('numAddedFiles', '1'),
                      ('rewriteTimeMs', '31009')]

Wir können sehen, dass die Operation DELETE war und das Prädikat, das wir für die Löschung verwendet haben, WHERE RideId = 100000 war. Delta Lake hat eine Datei entfernt (numRemovedFiles = 1 ) und eine neue Datei hinzugefügt (numAdded​Files = 1 ). Wenn wir unseren vertrauten grep Befehl verwenden, um die Details herauszufinden, sieht es folgendermaßen aus:

Tabelle 4-2. Ergebnis der Operation DELETE
Aktion Dateiname # Anzahl der Datensätze
hinzufügen part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet 5,530,099
entfernen part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet 4,469,895

Abbildung 4-1 veranschaulicht die Aktionen von Delta Lake, als wir den Datensatz gelöscht haben.

YellowTaxis Delta table before and after the DELETE operation
Abbildung 4-1. YellowTaxis Delta-Tabelle vor und nach der DELETE Operation

Delta Lake führt im Rahmen der Operation DELETE die folgenden Aktionen durch:

  1. Delta Lake hat den ersten Scan der Daten durchgeführt, um alle Dateien zu identifizieren, die Zeilen enthalten, die der Prädikatsbedingung entsprechen. In diesem Fall handelt es sich um die Datei e229aa1d5616; sie enthält den Datensatz mit RideId = 100000.

  2. In einem zweiten Scan liest Delta Lake die passenden Datendateien in den Speicher. Dann löscht Delta Lake die fraglichen Zeilen und schreibt eine neue, saubere Datendatei in die Speicherung. Diese neue Datendatei ist die 0d3e0e77c4c1 Datendatei. Da Delta Lake einen Datensatz gelöscht hat, enthält diese neue Datendatei 5.530.099 Datensätze (5.530.100 - 1).

  3. Wenn Delta Lake den Vorgang DELETE abschließt, wird die Datendatei e229aa1d5616 nun aus dem Delta-Transaktionsprotokoll entfernt, da sie nicht mehr Teil der Delta-Tabelle ist. Dieser Vorgang wird "Tombstoning" genannt. Es ist jedoch wichtig zu wissen, dass diese alte Datendatei nicht gelöscht wird, da du sie vielleicht noch brauchst, um eine Zeitreise zu einer früheren Version der Tabelle zu machen. Du kannst den Befehl VACUUM verwenden, um Dateien zu löschen, die älter als ein bestimmter Zeitraum sind. Zeitreisen und der Befehl VACUUM werden in Kapitel 6 ausführlich behandelt.

  4. Die Datendatei fedaa0f6623d bleibt Teil der Delta-Tabelle, da keine Änderungen daran vorgenommen wurden.

Wir können die eine Datendatei (0d3e0e77c4c1), die dem Verzeichnis hinzugefügt wurde, in unserer Verzeichnisliste sehen:

%sh
ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/

drwxrwxrwx _delta_log
-rwxrwxrwx part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet
-rwxrwxrwx part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet
-rwxrwxrwx part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet

Die Datendatei e229aa1d5616 wurde nicht physisch gelöscht.

Die wichtigste Botschaft, die du daraus ziehen kannst, ist, dass die Löschtransaktion auf der Ebene der Datendatei stattfindet. Delta Lake erstellt bei Bedarf neue Partitionen und fügt neue Aktionen zum Hinzufügen undEntfernen von Dateien in das Transaktionsprotokoll ein. In Kapitel 6 zur Leistungsoptimierung werden der Befehl VACUUM und andere Möglichkeiten zum Aufräumen von nicht mehr benötigten Datendateien in Tombstones behandelt.

DELETE Leistungstuning-Tipps

Die wichtigste Möglichkeit, die Leistung einer DELETE Operation in Delta Lake zu verbessern, besteht darin, weitere Prädikate hinzuzufügen, um das Suchspektrum einzugrenzen. Wenn du zum Beispiel partitionierte Daten hast und die Partition kennst, zu der die zu löschenden Datensätze gehören, kannst du die Partitionsklausel zum DELETE Prädikat hinzufügen.

Delta Lake bietet außerdem eine Reihe weiterer Optimierungsbedingungen, wie das Überspringen von Daten und die Z-Order-Optimierung. Bei der Z-Reihenfolge wird das Layout jeder Datendatei so umorganisiert, dass ähnliche Spaltenwerte strategisch nahe beieinander liegen, um maximale Effizienz zu erreichen. Weitere Einzelheiten findest du in Kapitel 5.

Daten in einer Tabelle aktualisieren

Nachdem du nun gesehen hast, wie sich eine DELETE Operation auf die Tabelle YellowTaxis auswirkt, werfen wir einen kurzen Blick auf eine UPDATE Operation. Mit der Operation UPDATE kannst du selektiv alle Zeilen aktualisieren, die einer Filterbedingung entsprechen, die auch als Prädikat bezeichnet wird.

Anwendungsfall Beschreibung

Nehmen wir an, es gab einen Fehler mit dem DropLocationId für den Datensatz, in dem RideId = 9999994 . Stellen wir zunächst sicher, dass dieser Datensatz in unserer Tabelle vorhanden ist, indem wir die folgende SELECT verwenden:

SELECT
    INPUT_FILE_NAME(),
    RideId,
    VendorId,
    DropLocationId
FROM    
    taxidb.YellowTaxis
WHERE
    RideId = 9999994

Die Spark SQL-Funktion INPUT_FILE_NAME() ist eine praktische Funktion, die uns den Dateinamen liefert, in dem sich der Datensatz befindet:

+---------------------------+---------+----------+----------------+
| input_file_name()         | RideId  | VendorId | DropLocationId |
+---------------------------+---------+----------+----------------+
| .../part-00001-...parquet | 9999994 | 1        | 243            |
+---------------------------+---------+----------+----------------+

Die Funktion INPUT_FILE_NAME zeigt, dass sich unser Datensatz in der Datei fedaa0f6623d befindet. Das macht Sinn, da er einer der letzten Datensätze ist, also logischerweise in der zuletzt erstellten Datei liegt. Wir können sehen, dass die bestehende DropLocationId derzeit 243 ist. Nehmen wir an, dass wir dieses Feld auf den Wert 250 aktualisieren wollen. Als Nächstes sehen wir uns die eigentliche DELETE Operation an.

Daten in einer Tabelle aktualisieren

Wir können nun die UPDATE SQL-Anweisung wie folgt schreiben:

%sql

UPDATE
    taxidb.YellowTaxis
SET
    DropLocationId = 250
WHERE
    RideId = 9999994

Wir sehen, dass wir eine einzelne Zeile aktualisiert haben:

+-------------------+
| num_affected_rows |
+-------------------+
|   1               |
+-------------------+

Überprüfen wir zunächst, ob wir die Tabelle erfolgreich aktualisiert haben:

%sql
SELECT  
    RideId,
    DropLocationId
FROM
    taxidb.YellowTaxis
WHERE
    RideId = 9999994

+---------+----------------+
| RideId  | DropLocationId |
+---------+----------------+
| 9999994 | 250            |
+---------+----------------+

Die Ausgabe zeigt, dass der Datensatz erfolgreich aktualisiert wurde. Wenn wir den Befehl DESCRIBE HISTORY auf die Tabelle anwenden, sehen wir die Operation UPDATE für die Version 3 der Tabelle (die Ausgabe ist der Übersichtlichkeit halber geschwenkt):

version:             3
timestamp:           2022-12-23 17:20:45+00:00
operation:           UPDATE
operationParameters: [('predicate', '(RideId = 9999994)')]
operationMetrics:    [('numRemovedFiles', '1'),
                      ('numCopiedRows', '4469894'),
                      ('numAddedChangeFiles', '0'),
                      ('executionTimeMs', '25426'),
                      ('scanTimeMs', '129'),
                      ('numAddedFiles', '1'),
                      ('numUpdatedRows', '1'),
                      ('rewriteTimeMs', '25293')]

Eine Datei wurde entfernt ('numRemovedFiles', '1'), und eine wurde hinzugefügt ('numAdded​Files', '1'). Wir können auch unser UPDATE Prädikat [('predicate', '(RideId = 9999994)')] sehen. Wenn wir den Befehl grep verwenden, um die Details herauszufinden, sieht es folgendermaßen aus:

Tabelle 4-3. Maßnahmen, die als Ergebnis der UPDATE Operation ergriffen wurden
Aktion Dateiname # Anzahl der Datensätze
hinzufügen part-00000-da1ef656-46e-4de5-a189-50807db851f6-c000.snappy.parquet 4,469,895
entfernen part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet 4,469,895

Abbildung 4-2 zeigt die Aktionen, die Delta Lake durchgeführt hat, als wir den Datensatz gelöscht haben.

Before and after an UPDATE operation
Abbildung 4-2. Vor und nach einer UPDATE Operation

Delta Lake führt eine UPDATE auf einem Tisch in zwei Schritten aus:

  1. Es findet und wählt die Datendateien aus, die Daten enthalten, die dem Prädikat entsprechen und daher aktualisiert werden müssen. Um diesen Prozess zu beschleunigen, überspringt Delta Lake nach Möglichkeit Daten. In diesem Fall ist das die fedaa0f6623d Datendatei. Wir können das auch mit der SQL-Funktion INPUT_FILE_NAME() überprüfen.

  2. Als Nächstes liest Delta Lake jede übereinstimmende Datei in den Speicher, aktualisiert die relevanten Zeilen und schreibt das Ergebnis in eine neue Datendatei. Die neue Datendatei ist in diesem Fall die Datei 50807db851f6. Sie enthält nun alle Datensätze der Partition fedaa0f6623d, aber mit den angewandten Aktualisierungen, in diesem Fall die Aktualisierung für RideId = 9999994. Diese Datendatei ist 50807db851f6. Diese Datendatei enthält weiterhin 4.469.895 Datensätze. Sobald Delta Lake die UPDATE erfolgreich ausgeführt hat, fügt es eine Aktion zum Hinzufügen von Dateien für die neue Datendatei hinzu.

Da sie nicht mehr benötigt wird, wird die Datendatei fedaa0f6623d mit einer Remove-Commit-Aktion im Transaktionsprotokoll aus der Delta-Tabelle entfernt. Wie bei der Operation DELETE wird die Datei jedoch nicht physisch gelöscht, für den Fall, dass wir uns eine alte Version der Tabelle mit Zeitreise ansehen wollen.

Die Datendatei 0d3e0e77c4c1 war von unserer Aktualisierung nicht betroffen, sie bleibt also Teil der Delta-Tabelle und enthält weiterhin 5.530.099 Datensätze.

UPDATE Performance Tuning Tipps

Wie bei DELETE besteht auch bei die wichtigste Möglichkeit, die Leistung eines UPDATE Befehls in Delta Lake zu verbessern, darin, mehr Prädikate hinzuzufügen, um den Suchbereich einzugrenzen. Je spezifischer die Suche ist, desto weniger Dateien muss Delta Lake scannen und/oder ändern.

Wie im vorherigen Abschnitt erwähnt, können andere Delta Lake-Funktionen, wie z. B. die Z-Ordnung, verwendet werden, um die UPDATE Operationen weiter zu beschleunigen. In Kapitel 5 findest du weitere Informationen zur Delta Lake-Optimierung.

Daten mit der MERGE-Operation hochladen

Mit dem Delta Lake MERGE Befehl kannst du Upserts an deinen Daten durchführen. Ein Upsert ist eine Mischung aus einem UPDATE und einem INSERT Befehl. Um Upserts zu verstehen, nehmen wir an, dass wir eine bestehende Tabelle (die Zieltabelle ) und eine Quelltabelle haben, die eine Mischung aus neuen Datensätzen und Aktualisierungen bestehender Datensätze enthält. So funktioniert ein Upsert tatsächlich:

  1. Wenn ein Datensatz aus der Quelltabelle mit einem bereits existierenden Datensatz in der Zieltabelle übereinstimmt, aktualisiert Delta Lake den Datensatz.

  2. Wenn es keine solche Übereinstimmung gibt, fügt Delta Lake den neuen Datensatz ein.

Anwendungsfall Beschreibung

Wenden wir eine MERGE Operation auf unsere YellowTaxis Tabelle an. Führen wir eine Zählung unserer YellowTaxis Tabelle durch:

%sql
SELECT
    COUNT(*)
FROM
    taxidb.YellowTaxis

Wir sehen, dass wir 9.999.994 Datensätze haben.

+----------+
| count(1) |
+----------+
| 9999994  |
+----------+

Wir wollen den Datensatz mit RideId = 100000 wieder einfügen, den wir im Abschnitt DELETE dieses Kapitels gelöscht haben. In unseren Quelldaten brauchen wir also einen Datensatz mit einem RideId, der auf 100000 gesetzt ist.

Für dieses Beispiel nehmen wir an, dass wir auch die Datensätze mit RideId = 999991 aktualisieren wollen, weil VendorId falsch eingefügt wurde und für diese fünf Datensätze auf 1 (VendorId = 1) aktualisiert werden muss. Schließlich wollen wir die Anzahl der Datensätze auf glatte 10.000.000 Datensätze bringen, also haben wir 5 weitere Datensätze mit RideIds von 999996 bis 10000000.

Der MERGE-Datensatz

In unseren begleitenden Quelldatendateien für das Buch haben wir eine Datei namens YellowTaxisMergeData.csv, die diese Datensätze enthält. Da wir ein Schema angeben müssen, lesen wir zunächst das Schema aus unserer bestehenden Tabelle:

df = spark.read.format("delta").table("taxidb.YellowTaxis")
yellowTaxiSchema = df.schema
print(yellowTaxiSchema)

Sobald wir das Schema geladen haben, können wir unsere CSV-Datei mit den Seriendaten laden:

yellowTaxisMergeDataFrame = spark      \
            .read                      \
            .option("header", "true")  \
            .schema(yellowTaxiSchema)  \
            .csv("/mnt/datalake/book/chapter04/YellowTaxisMergeData.csv") 
            .sort(col("RideId"))

yellowTaxisMergeDataFrame.show()

Hier ist ein Teil der Ausgabe zu sehen:

+----------+----------+------------------------------+
| RideId   | VendorId | PickupTime                   |
+----------+----------+------------------------------+
|   100000 | 2        | 2022-03-01T00:00:00.000+0000 |
|  9999991 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999992 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999993 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999994 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999995 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999996 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999997 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999998 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999999 | 3        | 2022-03-01T00:00:00.000+0000 |
| 10000000 | 3        | 2022-03-01T00:00:00.000+0000 |
+----------+----------+------------------------------+

Wir sehen unseren Datensatz mit RideId = 100000, die fünf Datensätze (9999991 bis 9999995) mit ihren neuen VendorId von 1 und die fünf neuen Datensätze, beginnend mit 9999996.

Wir wollen unsere MERGE Anweisung in SQL schreiben, also müssen wir unseren Datenrahmen in SQL zur Verfügung haben. Die Klasse DataFrame hat eine praktische Methode namens createOrReplaceTempView, die genau das tut:

# Create a Temporary View on top of our DataFrame, making it
# accessible to the SQL MERGE statement below
yellowTaxisMergeDataFrame.createOrReplaceTempView("YellowTaxiMergeData")

Wir können jetzt einfach den Namen der Ansicht in SQL verwenden:

%sql
SELECT
    *
FROM    
    YellowTaxiMergeData

Dies zeigt genau die gleiche Ausgabe wie mit der Methode display() des Datenrahmens.

Die MERGE-Anweisung

Du kannst nun deine MERGE Anweisung wie folgt schreiben:

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId

-- You need to update the VendorId if the records
-- matched
WHEN MATCHED                                      
    THEN
        -- If you want to update all columns,
        -- you can say "SET *"
        UPDATE SET target.VendorId = source.VendorId
WHEN NOT MATCHED
    THEN
        -- If all columns match, you can also do a "INSERT *"
        INSERT(RideId, VendorId, PickupTime, DropTime, PickupLocationId,
               DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount,
               TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount,
               Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)
        VALUES(RideId, VendorId, PickupTime, DropTime, PickupLocationId,
               DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount,
               TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount,
               Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)

Lass uns diese Aussage analysieren:

  1. Wir werden die Tabelle YellowTaxis Delta auf MERGE INTO aufrufen. Beachte, dass wir der Tabelle den Alias "Quelle" geben.

  2. Mit der USING Klausel geben wir den Quelldatensatz an, der in diesem Fall die View YellowTaxiMergeData ist, und geben ihr den Alias source.

  3. Definiere die Join-Bedingung zwischen dem Quell- und dem Zieldatensatz. In unserem Fall wollen wir einfach die VendorId verknüpfen. Wenn du partitionierte Daten hast und eine Partition anvisieren willst, kannst du diese Bedingung hier mit einer AND Anweisung hinzufügen.

  4. Gib die Aktion an, wenn RideId zwischen source und target liegt. In diesem Anwendungsfall wollen wir die Quelle mit VendorId aktualisieren, das auf 1 gesetzt ist. Hier aktualisieren wir nur eine Spalte, aber bei Bedarf können wir eine durch Kommas getrennte Spaltenliste angeben. Wenn wir alle Spalten aktualisieren wollen, sagen wir einfach UPDATE SET *.

  5. Definiere die Aktion, wenn der Datensatz in der Quelle existiert, aber nicht im Ziel. Wir haben keine zusätzliche Bedingung mit der WHEN NOT MATCHED, aber du kannst zusätzliche Klauseln hinzufügen, wenn der Anwendungsfall es erfordert. In den meisten Fällen wirst du eine INSERT Anweisung als Aktion angeben. Da unsere Quell- und Zielspaltennamen identisch sind, hätten wir auch eine einfache INSERT * verwenden können.

Wenn wir diese MERGE Anweisung ausführen, erhalten wir die folgende Ausgabe:

+-------------------+------------------+------------------+-------------------+
| num_affected_rows | num_updated_rows | num_deleted_rows | num_inserted_rows |
+-------------------+------------------+------------------+-------------------+
|        11         |        5         |        0         |         6         |
+-------------------+------------------+------------------+-------------------+

Diese Ausgabe ist genau das, was du erwartet hast:

  • Wir aktualisieren fünf Zeilen (VendorIds 9999991 bis 9999995)

  • Wir fügen sechs Zeilen ein:

    • Eine Reihe mit einer RideId von 100000

    • Die fünf Zeilen am Ende (9999996 bis 10000000)

Wir können die Aktualisierungen in den ersten fünf Zeilen sehen:

%sql
-- Make sure that the VendorId has been updated
-- for the records with RideId between
-- 9999991 and 9999995
SELECT
    RideId,
    VendorId
FROM
    taxidb.YellowTaxis
WHERE RideId BETWEEN 9999991 and 9999995

+---------+----------+
| RideId  | VendorId |
+---------+----------+
| 9999991 | 1        |
| 9999992 | 1        |
| 9999993 | 1        |
| 9999994 | 1        |
| 9999995 | 1        |
+---------+----------+

Alle Zeilen haben jetzt die Quelle VendorId von 1.

Wir können den eingefügten Datensatz mit RideId = 100000 sehen:

%sql
--Make sure that you have a record with VendorId = 100000
SELECT
    *
FROM    
    taxidb.YellowTaxis
WHERE
    RideId = 100000

Ausgabe (Teilausgabe gezeigt):

+--------+----------+---------------------------+---------------------------+
| RideId | VendorId | PickupTime                | DropTime                  |
+--------+----------+---------------------------+---------------------------+
| 100000 | 2        | 2022-03-01 00:00:00+00:00 | 2022-03-01 00:12:01+00:00 |
+--------+----------+---------------------------+---------------------------+

Und schließlich können wir die neuen Zeilen mit RideId > 9999995 sehen:

%sql
SELECT  
    *
FROM
    taxidb.YellowTaxis
WHERE
    RideId > 9999995

+----------+----------+---------------------------+
| RideId   | VendorId | PickupTime                |
+----------+----------+---------------------------+
| 9999996  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999997  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999998  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999999  | 3        | 2022-03-01 00:00:00+00:00 |
| 10000000 | 3        | 2022-03-01 00:00:00+00:00 |
+----------+----------+---------------------------+

Und eine Gesamtzahl von 10 Millionen Datensätzen:

%sql
SELECT  
    COUNT(RideId)
FROM    
    taxidb.YellowTaxis

+----------+
| count(1) |
+----------+
| 10000000 |
+----------+

Ändern nicht übereinstimmender Zeilen mit MERGE

Eine wichtige Ergänzung zu der Delta Lake MERGE Operation ist die kürzlich veröffentlichte WHEN NOT MATCHED BY SOURCE Klausel. Diese Klausel kann verwendet werden, um UPDATE oder DELETE Datensätze in der Zieltabelle zu löschen, die keine entsprechenden Datensätze in der Quelltabelle haben. Dies kann eine nützliche Operation sein, um Datensätze in der Zieltabelle zu löschen, die in der Quelltabelle nicht mehr vorhanden sind, oder um Datensätze zu markieren, die darauf hinweisen, dass die Daten als gelöscht oder inaktiv betrachtet werden sollten, während die Datensätze in der Zieltabelle erhalten bleiben (d. h. Soft Delete).

Hinweis

WHEN NOT MATCHED BY SOURCE Klauseln werden von den Scala-, Python- und Java-Delta-Lake-APIs in Delta 2.3 und höher unterstützt. SQL wird in Delta 2.4 und höher unterstützt.

Um Datensätze zu löschen, die in den Quelltabellen, aber nicht in der Zieltabelle existieren (d.h. hartes Löschen), verwendest du die WHEN NOT MATCHED BY SOURCE Klausel, wie im folgenden Codebeispiel zu sehen ist:

Hinweis

Der Code WHEN NOT MATCHED BY SOURCE dient nur zur Veranschaulichung und sollte nicht zusammen mit den früheren Codebeispielen ausgeführt werden. Bitte beachte, dass die übrigen Codeausgaben in diesem Kapitel nicht mit den Beispielen und erwarteten Ausgaben in diesem Kapitel übereinstimmen, wenn du die WHEN NOT MATCHED BY SOURCE Codebeispiele ausführst.

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId
WHEN MATCHED                                      
        UPDATE SET *
WHEN NOT MATCHED
        INSERT *
-- DELETE records in the target that are not matched by the source
WHEN NOT MATCHED BY SOURCE
    DELETE

Wenn du Datensätze in der Zieltabelle markieren möchtest, die in der Quelltabelle nicht mehr existieren (d.h. Soft Delete) und eine bestimmte Bedingung erfüllen, kannst du eine MERGE Bedingung und eine UPDATE angeben:

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId
WHEN MATCHED                                      
        UPDATE SET *
WHEN NOT MATCHED
        INSERT *
-- Set target.status = 'inactive' when records in the target table 
-- don’t exist in the source table and condition is met
WHEN NOT MATCHED BY SOURCE target.PickupTime >= 
  (current_date() - INTERVAL '5' DAY) 
THEN
          UPDATE SET target.status = 'inactive'

Es ist die bewährte Methode, eine optionale MERGE Bedingung hinzuzufügen, wenn du die WHEN NOT MATCHED BY SOURCE Klausel zu UPDATE oder DELETE Zielzeilen hinzufügst. Denn wenn keine MERGE Bedingung angegeben ist, kann dies dazu führen, dass eine große Anzahl von Zielzeilen geändert wird. Die beste Leistung erzielst du, wenn du die WHEN NOT MATCHED BY SOURCE Klausel mit einer MERGE Bedingung versiehst (z. B. target.PickupTime >= (current_date() - INTERVAL '5' DAY im vorherigen Codebeispiel), um die Anzahl der aktualisierten oder gelöschten Zielzeilen zu begrenzen, denn dann wird eine Zielzeile nur geändert, wenn die Bedingung für diese Zeile wahr ist.

Du kannst auch mehrere WHEN NOT MATCHED BY SOURCE Klauseln zu einem MERGE Vorgang hinzufügen. Wenn es mehrere Klauseln gibt, werden sie in der Reihenfolge ausgewertet, in der sie angegeben sind, und alle WHEN NOT MATCHED BY SOURCE Klauseln, außer der letzten, müssen Bedingungen haben.

Analyse des MERGE-Vorgangs mit DESCRIBE HISTORY

Wenn wir DESCRIBE HISTORY auf die Tabelle YellowTaxis im operationsParameters Abschnitt der Ausgabe ausführen, können wir unser MERGE Prädikat sehen:

operation: MERGE

[('predicate',
  '(target.RideId = source.RideId)'),
   ('matchedPredicates', '[{"actionType":"update"}]'),
   ('notMatchedPredicates', '[{"actionType":"insert"}]')]

Wir sehen die Join-Bedingung (target.RideId = source.RideId), die matchedPredicate das eine Aktualisierung angibt, und das notMatchedPredicate, das eine Einfügung angibt.

Die operationMetrics Ausgabeabschnitte zeigen die Details der verschiedenen Aktionen:

 [('numTargetRowsCopied', '4469890'),
  ('numTargetRowsDeleted', '0'),
  ('numTargetFilesAdded', '4'),
  ('executionTimeMs', '91902'),
  ('numTargetRowsInserted', '6'),
  ('scanTimeMs', '8452'),
  ('numTargetRowsUpdated', '5'),
  ('numOutputRows', '4469901'),
  ('numTargetChangeFilesAdded', '0'),
  ('numSourceRows', '11'),
  ('numTargetFilesRemoved', '1'),
  ('rewriteTimeMs', '16782')]

Auch hier sehen wir, dass sechs Zeilen eingefügt wurden (numTargetRowsInserted) und fünf Zeilen aktualisiert wurden (numTargetRowsUpdated). Vier neue Datendateien wurden zu unserer Delta-Tabelle hinzugefügt und eine Datendatei wurde entfernt.

Das Innenleben der MERGE-Operation

Intern führt Delta Lake eine MERGE Operation wie diese in zwei Schritten durch:

  1. Er führt zunächst einen inner join zwischen der Zieltabelle und der Quelltabelle durch, um alle Datendateien mit Übereinstimmungen auszuwählen. Dadurch wird verhindert, dass der Vorgang unnötig Daten mischt, die sicher ignoriert werden können.

  2. Als Nächstes führt er eine outer join zwischen den ausgewählten Dateien in den Ziel- und Quelltabellen durch und wendet die entsprechende INSERT, DELETE oder UPDATE Klausel an, wie vom Benutzer angegeben.

Der Hauptunterschied zwischen MERGE und UPDATE bzw. DELETE besteht darin, dass Delta Lake Joins verwendet, um ein MERGE zu vervollständigen. Dadurch kann einige einzigartige Strategien anwenden, um die Leistung zu verbessern.

Fazit

DML-Operationen wie DELETE, UPDATE und MERGE sind wichtige Operationen für alle Tabellenformate und ETL-Operationen, die alle über das Transaktionsprotokoll aktiviert werden. Wenn du diese Operationen nutzt, kannst du Datenänderungen effizient verarbeiten und die Datenintegrität in deiner Datenplattform wahren.

Ähnlich wie bei Tabellen in einem herkömmlichen RDBMS hast du in diesem Kapitel gelesen, dass du mit Delta-Tabellen DELETE, UPDATE und MERGE Operationen durchführen kannst, aber du kannst diese Operationen auch mit SQL oder der DataFrame API anwenden. Außerdem hast du erfahren, was unter der Haube von Delta Lake mit den zugrundeliegenden Dateien im Delta-Tabellenverzeichnis passiert und wie das Transaktionsprotokoll diese verschiedenen Arten von Einträgen aufzeichnet und verfolgt. Mit dem Befehl DESCRIBE HISTORY können wir Details über die Ausgabe der Transaktionen einer Tabelle einsehen. Jeder dieser Vorgänge kann auch Prädikate verwenden, um die Anzahl der gescannten Dateien zu reduzieren und die Leistung zu verbessern. Neben der Verwendung von Prädikaten bei Operationen gibt es noch weitere Techniken zur Leistungsoptimierung von Delta-Tabellen, die du im folgenden Kapitel kennenlernen wirst.

1 GitHub Repo-Speicherort: /chapter04/00 - Kapitel Initialisierung

2 GitHub Repo-Speicherort: /chapter04/01 - Löschvorgänge

3 Handbuchseite für sed

4 GitHub Repo-Speicherort: /chapter04/01 - Löschvorgänge

Get Delta Lake: Auf und davon now with the O’Reilly learning platform.

O’Reilly members experience books, live events, courses curated by job role, and more from O’Reilly and nearly 200 top publishers.