Chapter 4. Table Deletes, Updates, and Merges

Since Delta Lake adds a transactional layer to classic data lakes, we can perform classic DML operations, such as updates, deletes, and merges. When you perform a DELETE operation on a Delta table, the operation is performed at the data file level, removing and adding data files as needed. Removed data files are no longer part of the current version of the Delta table, but they are not physically deleted right away, since you might want to revert to an older version of the table with time travel (time travel is covered in Chapter 6). The same is true when you run an UPDATE operation: data files will be added and removed from your Delta table as required.

The most powerful Delta Lake DML operation is the MERGE operation, which allows you to perform an “upsert”: a mix of UPDATE, DELETE, and INSERT operations on your Delta table. You join a source and a target table, write a match condition, and then specify what should happen with the records that either match or don’t match.

Deleting Data from a Delta Table

We will start with a clean taxidb.YellowTaxis table. This table is created by the “Chapter Initialization” script for Chapter 4.1 It has 9,999,995 rows:

%sql
SELECT  
    COUNT(*)
FROM    
    taxidb.YellowTaxis

Output:

+----------+
| count(1) |
+----------+
| 9999995  |
+----------+

Table Creation and DESCRIBE HISTORY

The taxidb.YellowTaxis Delta table was created in the “Chapter Initialization” script and copied into our /chapter04 folder. Let’s look at DESCRIBE HISTORY for the table:2

%sql
DESCRIBE HISTORY taxidb.YellowTaxis

Output (only relevant portions shown):

+-----------+--------------------------------+---------------------+
| operation | operationParameters            | operationMetrics    |
+-----------+--------------------------------+---------------------+
| WRITE     | [('mode', 'Overwrite'), (...)] | [('numFiles', '2'), |
|           |                                |  ('numOutputRows',  | 
|           |                                |  '9999995'), ...]   |
+-----------+--------------------------------+---------------------+

We can see that we have one transaction containing a WRITE operation, writing two data files for a total of 9,999,995 rows. Let’s find out some details about both of those files.
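The same history is available programmatically. Here is a minimal sketch using the DeltaTable API from the delta-spark package, which returns the table history as a DataFrame:

from delta.tables import DeltaTable

# Return the table history as a DataFrame and keep only the relevant columns
history = DeltaTable.forName(spark, "taxidb.YellowTaxis").history()
history.select("version", "operation",
               "operationParameters", "operationMetrics").show(truncate=False)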

In Chapter 2 you learned how you can use the transaction log to see the add and remove file actions. Let’s take a look at the _delta_log directory:

%sh
ls /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/*.json

As expected, we see only one transaction log entry:

/dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/….0000.json

This log entry should have two add file actions, since the numFiles entry in DESCRIBE HISTORY was two. Again, let’s use our grep command to find those sections:

%sh
grep \"add\" /dbfs/…/chapter04/YellowTaxisDelta/_delta_log/..0000.json |
  sed -n 1p > /tmp/commit.json
python -m json.tool < /tmp/commit.json

One variation from the command used in Chapter 2 is that since the log entry now contains two add actions, we need to use the sed command to extract the right one.

Tip

You can pipe the grep command output to the sed3 command. sed is a stream editor that performs basic text transformations on an input stream and writes the result to an output stream. The -n option suppresses normal output, and the 1p command prints only the first line of the input. To find the next add entry, you can simply use sed -n 2p, which outputs the second line.

Produced output (only relevant portions shown):

{
    "add": {
        "path": "part-00000-...-c000.snappy.parquet",
        …
        "stats": "{\"numRecords\":5530100,...}}",
        "tags": {
            …
        }
    }
}

Here, we see the name of the first data file created for our table, and the number of records in that file. We can use the same command with sed -n 2p to get the second add action, which contains the second data file:

{
    "add": {
        "path": "part-00001-...-c000.snappy.parquet",
        …
        "stats": "{\"numRecords\":4469895,...}}",
        "tags": {
            …
        }
    }
}
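As an alternative to the grep and sed commands, you can parse the commit file directly with Python. This is a minimal sketch, assuming the DBFS mount path from the earlier listing and the standard zero-padded name of the first commit file; each line of a commit file is a separate JSON document:

import json

# First commit of the table; path assumed from the earlier directory listing
log_file = ("/dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/"
            "_delta_log/00000000000000000000.json")

with open(log_file) as f:
    for line in f:
        action = json.loads(line)
        if "add" in action:
            # "stats" is itself a JSON string inside the add action
            stats = json.loads(action["add"]["stats"])
            print(action["add"]["path"], stats["numRecords"])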

Now we know that our table has the following data files:

Table 4-1. Parquet files created

Parquet filename                                                       Number of records
part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet    5,530,100
part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet    4,469,895

These files correspond with our directory listing, so the transaction log and the directory listing are consistent:

%sh
ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta

drwxrwxrwx 2 _delta_log
-rwxrwxrwx 1 part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet
-rwxrwxrwx 1 part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet

Performing the DELETE Operation

Let’s assume that we want to delete a single record, in this case the record with RideId = 100000. First, we should make sure that the record is indeed still in the table with a SQL SELECT:4

%sql
-- First, show that you have data for RideId = 100000
SELECT  
    RideId,
    VendorId,
    CabNumber,
    TotalAmount
FROM    
    taxidb.YellowTaxis
WHERE  
    RideId = 100000

Output:

+--------+----------+-----------+-------------+
| RideId | VendorId | CabNumber | TotalAmount |
+--------+----------+-----------+-------------+
| 100000 | 2        | T478827C  | 7.56        |
+--------+----------+-----------+-------------+

To delete this row, we can use a simple SQL DELETE. We can use the DELETE command to selectively delete rows based upon a predicate, or filtering, condition:

%sql
DELETE FROM
    taxidb.YellowTaxis
WHERE RideId = 100000

Output:

+------------------+
|num_affected_rows |
+------------------+
|  1               |
+------------------+
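The same delete can also be expressed through the DataFrame API using the DeltaTable class from the delta-spark package. This is a minimal sketch of the equivalent operation, shown for reference only; running it after the SQL DELETE above would simply affect zero rows:

from delta.tables import DeltaTable

# Get a reference to the Delta table by its catalog name
deltaTable = DeltaTable.forName(spark, "taxidb.YellowTaxis")

# Delete all rows matching the predicate; like the SQL version, this rewrites
# the affected data file and tombstones the old one
deltaTable.delete("RideId = 100000")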

We can confirm that we did in fact delete one row. When we use the DESCRIBE HISTORY command to look at the different operations on the table, we get the following for version 1 (the output of the row is pivoted for readability):

version:             1
timestamp:           2022-12-14T17:50:23.000+0000
operation:           DELETE
operationParameters: [('predicate', 
                     '["(spark_catalog.taxidb.YellowTaxis.RideId = 100000)"]')]
operationMetrics:    [('numRemovedFiles', '1'),
                      ('numCopiedRows', '5530099'),
                      ('numAddedChangeFiles', '0'),
                      ('executionTimeMs', '32534'),
                      ('numDeletedRows', '1'),
                      ('scanTimeMs', '1524'),
                      ('numAddedFiles', '1'),
                      ('rewriteTimeMs', '31009')]

We can see the operation was a DELETE and the predicate we used for the deletion was WHERE RideId = 100000. Delta Lake removed one file (numRemovedFiles = 1) and added one new file (numAddedFiles = 1). If we use our trusted grep command to find out the details, things look as follows:

Table 4-2. Result of DELETE operation

Action   Filename                                                              # of records
Add      part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet   5,530,099
Remove   part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet   5,530,100

Figure 4-1 illustrates Delta Lake’s actions when we deleted the record.

YellowTaxis Delta table before and after the DELETE operation
Figure 4-1. YellowTaxis Delta table before and after the DELETE operation

Delta Lake performs the following actions as part of the DELETE operation:

  1. Delta Lake makes a first scan of the data to identify any data files containing rows matching the predicate condition. In this case, that is the e229aa1d5616 data file, which contains the record with RideId = 100000.

  2. In a second scan, Delta Lake reads the matching data files into memory. At this point Delta Lake deletes the rows in question before writing out a new clean data file to storage. This new data file is the 0d3e0e77c4c1 data file. Since Delta Lake deleted one record, this new data file contains 5,530,099 records (5,530,100 – 1).

  3. As Delta Lake completes the DELETE operation, the data file e229aa1d5616 is removed from the Delta table by adding a remove action to the transaction log. This process is called “tombstoning.” However, it is important to note that this old data file is not physically deleted, because you might still need it to time travel back to an earlier version of the table. You can use the VACUUM command to delete files older than a certain time period. Time travel and the VACUUM command are covered in detail in Chapter 6.

  4. The data file fedaa0f6623d remains part of the Delta table, since no changes were applied to it.

We can see the one data file (0d3e0e77c4c1) that has been added to the directory in our directory listing:

%sh
ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/

drwxrwxrwx _delta_log
-rwxrwxrwx part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet
-rwxrwxrwx part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet
-rwxrwxrwx part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet

The data file e229aa1d5616 was not physically deleted.

The most important message to take away from this is that the delete transaction occurs at the data file level. Delta Lake will write new data files and record add file and remove file actions in the transaction log as needed. Chapter 6 will cover the VACUUM command and other ways to clean up tombstoned data files that are no longer required.

DELETE Performance Tuning Tips

The main way to improve the performance of a DELETE operation on Delta Lake is to add more predicates to narrow the search scope. For example, if you have partitioned data and know the partition that the to-be-deleted records belong to, you can add that partition to the DELETE predicate.
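For illustration, here is a hedged sketch of what that could look like. The YellowTaxis table in this chapter is not partitioned, so the PickupDate partition column below is purely hypothetical:

# Hypothetical example: if taxidb.YellowTaxis were partitioned by a
# PickupDate column, adding the partition to the predicate lets Delta Lake
# skip every data file outside that partition.
spark.sql("""
    DELETE FROM taxidb.YellowTaxis
    WHERE PickupDate = '2022-03-01'   -- hypothetical partition column
      AND RideId = 100000
""")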

Delta Lake also provides a number of other optimization conditions, such as data skipping and z-order optimization. Z-ordering reorganizes the layout of each data file so that similar column values are strategically colocated near one another for maximum efficiency. Please refer to Chapter 5 for more details.

Updating Data in a Table

Now that you have seen the impact of a DELETE operation on the YellowTaxis table, let’s take a quick look at an UPDATE operation. You can use the UPDATE operation to selectively update any rows matching a filtering condition, also known as a predicate.

Use Case Description

Let’s assume there was an error with the DropLocationId for the record where RideId = 9999994. First, let’s ensure this record is present in our table with the following SELECT:

SELECT
    INPUT_FILE_NAME(),
    RideId,
    VendorId,
    DropLocationId
FROM    
    taxidb.YellowTaxis
WHERE
    RideId = 9999994

The Spark SQL INPUT_FILE_NAME() function is a handy function that gives us the name of the data file in which each record is located:

+---------------------------+---------+----------+----------------+
| input_file_name()         | RideId  | VendorId | DropLocationId |
+---------------------------+---------+----------+----------------+
| .../part-00001-...parquet | 9999994 | 1        | 243            |
+---------------------------+---------+----------+----------------+

The INPUT_FILE_NAME function shows that our record is located in the fedaa0f6623d data file, which makes sense: since it is one of the last records inserted, it is logically located in the last-created data file. We can see that the existing DropLocationId is currently 243, and we want to update it to a value of 250. The same lookup is sketched below with the DataFrame API; after that, we’ll perform the actual UPDATE operation.
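A minimal sketch of the same lookup with the DataFrame API, using the input_file_name function from pyspark.sql.functions:

from pyspark.sql.functions import col, input_file_name

# Same lookup as the SQL query above, expressed with the DataFrame API
(spark.read.format("delta")
    .table("taxidb.YellowTaxis")
    .where(col("RideId") == 9999994)
    .select(input_file_name(), "RideId", "VendorId", "DropLocationId")
    .show(truncate=False))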

Updating Data in a Table

We can now write the UPDATE SQL statement as follows:

%sql

UPDATE
    taxidb.YellowTaxis
SET
    DropLocationId = 250
WHERE
    RideId = 9999994

We see that we updated a single row:

+-------------------+
| num_affected_rows |
+-------------------+
|   1               |
+-------------------+
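The same update can also be performed with the DeltaTable API. Here is a minimal sketch, for reference only, since the SQL UPDATE above has already modified the row:

from delta.tables import DeltaTable

deltaTable = DeltaTable.forName(spark, "taxidb.YellowTaxis")

# Update DropLocationId for the matching ride; the set argument maps
# column names to SQL expression strings
deltaTable.update(
    condition = "RideId = 9999994",
    set = {"DropLocationId": "250"}
)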

Let’s first verify that we updated the table successfully:

%sql
SELECT  
    RideId,
    DropLocationId
FROM
    taxidb.YellowTaxis
WHERE
    RideId = 9999994

+---------+----------------+
| RideId  | DropLocationId |
+---------+----------------+
| 9999994 | 250            |
+---------+----------------+

The output shows the record was updated successfully. When we use the DESCRIBE HISTORY command on the table, we see the UPDATE operation on version 3 of the table (output pivoted for clarity):

version:             3
timestamp:           2022-12-23 17:20:45+00:00
operation:           UPDATE
operationParameters: [('predicate', '(RideId = 9999994)')]
operationMetrics:    [('numRemovedFiles', '1'),
                      ('numCopiedRows', '4469894'),
                      ('numAddedChangeFiles', '0'),
                      ('executionTimeMs', '25426'),
                      ('scanTimeMs', '129'),
                      ('numAddedFiles', '1'),
                      ('numUpdatedRows', '1'),
                      ('rewriteTimeMs', '25293')]

One file was removed ('numRemovedFiles', '1'), and one was added ('numAddedFiles', '1'). We can also see our UPDATE predicate [('predicate', '(RideId = 9999994)')]. If we use the grep command to find out the details, things look as follows:

Table 4-3. Actions taken as a result of the UPDATE operation

Action   Filename                                                              # of records
Add      part-00000-da1ef656-46e-4de5-a189-50807db851f6-c000.snappy.parquet    4,469,895
Remove   part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet   4,469,895

Figure 4-2 illustrates the actions that Delta Lake took when we updated the record.

Before and after an UPDATE operation
Figure 4-2. Before and after an UPDATE operation

Delta Lake performs an UPDATE on a table in two steps:

  1. It finds and selects the data files containing data that match the predicate and therefore need to be updated. Delta Lake uses data skipping whenever possible to speed up this process. In this case, that is the fedaa0f6623d data file. We could also verify that with the INPUT_FILE_NAME() SQL function.

  2. Next, Delta Lake reads each matching file into memory, updates the relevant rows, and writes out the result as a new data file, in this case the 50807db851f6 file. It contains all the records of the fedaa0f6623d file, with the update applied for RideId = 9999994, so it still holds 4,469,895 records. Once Delta Lake has executed the UPDATE successfully, it adds an add file action for the new data file.

Since it is no longer required, the data file fedaa0f6623d is removed from the Delta table with a remove commit action in the transaction log. However, like the DELETE operation, the file is not physically deleted, in case we might want to look at an old version of the table with time travel.

The data file 0d3e0e77c4c1 was unaffected by our update, so it remains part of the Delta table and continues to hold 5,530,099 records.

UPDATE Performance Tuning Tips

Like the DELETE operation, the main way to improve the performance of an UPDATE command on Delta Lake is to add more predicates to narrow the search scope. The more specific the search, the fewer files Delta Lake needs to scan and/or modify.

As mentioned in the previous section, other Delta Lake features, such as Z-ordering, can be used to speed up UPDATE operations further. See Chapter 5 for details on Delta Lake optimization.

Upsert Data Using the MERGE Operation

The Delta Lake MERGE command allows you to perform upserts on your data. An upsert is a mix of an UPDATE and an INSERT command. To understand upserts, let’s assume that we have an existing table (the target table) and a source table that contains a mix of new records and updates to existing records. Here is how an upsert actually works:

  1. When a record from the source table matches a preexisting record in the target table, Delta Lake updates the record.

  2. When there is no such match, Delta Lake inserts the new record.

Use Case Description

Let’s apply a MERGE operation to our YellowTaxis table. First, let’s perform a count of the table:

%sql
SELECT
    COUNT(*)
FROM
    taxidb.YellowTaxis

We see that we have 9,999,994 records.

+----------+
| count(1) |
+----------+
| 9999994  |
+----------+

We want to reinsert the record with RideId = 100000 that we deleted in the DELETE section of this chapter. So, in our source data, we need one record with a RideId set to 100000.

For this example, let’s assume we also want to update the records with RideIds 9999991 through 9999995 because their VendorId was inserted incorrectly and needs to be updated to 1 (VendorId = 1) for these five records. Finally, we want to bring the record count to an even 10,000,000, so we have five more records, with RideIds ranging from 9999996 through 10000000.

The MERGE Dataset

In our companion source data files for the book, we have a file named YellowTaxisMergeData.csv, which contains these records. Since we need to supply a schema when reading a CSV file, we first read the schema from our existing table:

df = spark.read.format("delta").table("taxidb.YellowTaxis")
yellowTaxiSchema = df.schema
print(yellowTaxiSchema)

Once we have loaded the schema, we can load our merge data CSV file:

from pyspark.sql.functions import col

yellowTaxisMergeDataFrame = spark      \
            .read                      \
            .option("header", "true")  \
            .schema(yellowTaxiSchema)  \
            .csv("/mnt/datalake/book/chapter04/YellowTaxisMergeData.csv") \
            .sort(col("RideId"))

yellowTaxisMergeDataFrame.show()

A partial output is shown here:

+----------+----------+------------------------------+
| RideId   | VendorId | PickupTime                   |
+----------+----------+------------------------------+
|   100000 | 2        | 2022-03-01T00:00:00.000+0000 |
|  9999991 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999992 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999993 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999994 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999995 | 1        | 2022-04-04T20:54:04.000+0000 |
|  9999996 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999997 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999998 | 3        | 2022-03-01T00:00:00.000+0000 |
|  9999999 | 3        | 2022-03-01T00:00:00.000+0000 |
| 10000000 | 3        | 2022-03-01T00:00:00.000+0000 |
+----------+----------+------------------------------+

We can see our record with RideId = 100000, the five records (9999991 through 9999995) with their new VendorId of 1, and the five new records, starting at 9999996.

We want to write our MERGE statement in SQL, so we need to have our DataFrame available in SQL. The DataFrame class has a handy method called createOrReplaceTempView, which does exactly that:

# Create a Temporary View on top of our DataFrame, making it
# accessible to the SQL MERGE statement below
yellowTaxisMergeDataFrame.createOrReplaceTempView("YellowTaxiMergeData")

We can now just use the view name in SQL:

%sql
SELECT
    *
FROM    
    YellowTaxiMergeData

This shows exactly the same output as the DataFrame’s show() method.

The MERGE Statement

You can now write your MERGE statement as follows:

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId

-- You need to update the VendorId if the records
-- matched
WHEN MATCHED                                      
    THEN
        -- If you want to update all columns,
        -- you can say "SET *"
        UPDATE SET target.VendorId = source.VendorId
WHEN NOT MATCHED
    THEN
        -- If all columns match, you can also do a "INSERT *"
        INSERT(RideId, VendorId, PickupTime, DropTime, PickupLocationId,
               DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount,
               TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount,
               Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)
        VALUES(RideId, VendorId, PickupTime, DropTime, PickupLocationId,
               DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount,
               TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount,
               Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)

Let’s analyze this statement:

  1. We are going to MERGE INTO the YellowTaxis Delta table. Notice that we give the table an alias of target.

  2. Using the USING clause we specify the source dataset, which in this case is the view YellowTaxiMergeData, and give it an alias of source.

  3. Define the join condition between the source and target datasets. In our case, we simply want to join on the RideId. If you have partitioned data and want to target a partition, you might want to add that condition here with an AND clause.

  4. Specify the action when the RideId matches between the source and target. In this use case, we want to update the target with the VendorId of the source, which is set to 1. Here, we are only updating one column, but if we need to, we can supply a column list, separated by commas. If we want to update all columns, we simply say UPDATE SET *.

  5. Define the action when the record exists in the source, but not in the target. We do not have any additional condition with the WHEN NOT MATCHED, but you can add any additional clauses if the use case calls for it. Most of the time you will provide an INSERT statement as an action. Since our source and target column names are identical, we could have also used a simple INSERT *.
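
For reference, the same upsert can be expressed with the Python DeltaTable merge builder. This is a minimal sketch equivalent to the SQL statement above, where whenNotMatchedInsertAll plays the role of the explicit column list because the source and target share the same schema:

from delta.tables import DeltaTable

targetTable = DeltaTable.forName(spark, "taxidb.YellowTaxis")

(targetTable.alias("target")
    .merge(
        yellowTaxisMergeDataFrame.alias("source"),
        "target.RideId = source.RideId")
    # Matched records: update the VendorId from the source
    .whenMatchedUpdate(set = {"VendorId": "source.VendorId"})
    # Unmatched records: insert all columns from the source
    .whenNotMatchedInsertAll()
    .execute())

Either form results in the same transaction in the Delta log.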

When we execute this MERGE statement, we get the following output:

+-------------------+------------------+------------------+-------------------+
| num_affected_rows | num_updated_rows | num_deleted_rows | num_inserted_rows |
+-------------------+------------------+------------------+-------------------+
|        11         |        5         |        0         |         6         |
+-------------------+------------------+------------------+-------------------+

This output is exactly what you expected:

  • We update five rows (RideIds 9999991 through 9999995)

  • We insert six rows:

    • One row with a RideId of 100000

    • The five rows at the end (9999996 through 10000000)

We can see the updates on the first five rows:

%sql
-- Make sure that the VendorId has been updated
-- for the records with RideId between
-- 9999991 and 9999995
SELECT
    RideId,
    VendorId
FROM
    taxidb.YellowTaxis
WHERE RideId BETWEEN 9999991 and 9999995

+---------+----------+
| RideId  | VendorId |
+---------+----------+
| 9999991 | 1        |
| 9999992 | 1        |
| 9999993 | 1        |
| 9999994 | 1        |
| 9999995 | 1        |
+---------+----------+

All rows now have the source VendorId of 1.

We can see the inserted record with RideId = 100000:

%sql
--Make sure that you have a record with RideId = 100000
SELECT
    *
FROM    
    taxidb.YellowTaxis
WHERE
    RideId = 100000

Output (partial output shown):

+--------+----------+---------------------------+---------------------------+
| RideId | VendorId | PickupTime                | DropTime                  |
+--------+----------+---------------------------+---------------------------+
| 100000 | 2        | 2022-03-01 00:00:00+00:00 | 2022-03-01 00:12:01+00:00 |
+--------+----------+---------------------------+---------------------------+

And finally, we can see the new rows with RideId > 9999995:

%sql
SELECT  
    *
FROM
    taxidb.YellowTaxis
WHERE
    RideId > 9999995

+----------+----------+---------------------------+
| RideId   | VendorId | PickupTime                |
+----------+----------+---------------------------+
| 9999996  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999997  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999998  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999999  | 3        | 2022-03-01 00:00:00+00:00 |
| 10000000 | 3        | 2022-03-01 00:00:00+00:00 |
+----------+----------+---------------------------+

And a grand total of 10 million records:

%sql
SELECT  
    COUNT(RideId)
FROM    
    taxidb.YellowTaxis

+----------+
| count(1) |
+----------+
| 10000000 |
+----------+

Modifying unmatched rows using MERGE

An important addition to the Delta Lake MERGE operation is the recently released WHEN NOT MATCHED BY SOURCE clause. This clause can be used to UPDATE or DELETE records in the target table that do not have corresponding records in the source table. This can be useful for deleting target records that no longer exist in the source, or for flagging such records as deleted or inactive while still keeping them in the target table (i.e., a soft delete).

Note

WHEN NOT MATCHED BY SOURCE clauses are supported by the Scala, Python, and Java Delta Lake APIs in Delta 2.3 and above. SQL is supported in Delta 2.4 and above.

To delete records that exist in the target table but not in the source table (i.e., a hard delete), use the WHEN NOT MATCHED BY SOURCE clause, as seen in the following code example:

Note

The WHEN NOT MATCHED BY SOURCE code is for demonstration purposes only and is not meant to be executed in sequence with the earlier code examples. Please note that if you execute the WHEN NOT MATCHED BY SOURCE code examples, then the remaining code outputs in this chapter will not align with the examples and expected outputs in this chapter.

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId
WHEN MATCHED
    THEN
        UPDATE SET *
WHEN NOT MATCHED
    THEN
        INSERT *
-- DELETE records in the target that are not matched by the source
WHEN NOT MATCHED BY SOURCE
    THEN
        DELETE

If you wish to flag records in the target table that no longer exist in the source table (i.e., a soft delete) and that satisfy a certain condition, you can specify a MERGE condition and an UPDATE:

%sql
MERGE INTO taxidb.YellowTaxis AS target
    USING YellowTaxiMergeData AS source
        ON target.RideId = source.RideId
WHEN MATCHED
    THEN
        UPDATE SET *
WHEN NOT MATCHED
    THEN
        INSERT *
-- Set target.status = 'inactive' when records in the target table
-- don’t exist in the source table and the condition is met
WHEN NOT MATCHED BY SOURCE
    AND target.PickupTime >= (current_date() - INTERVAL '5' DAY)
    THEN
        UPDATE SET target.status = 'inactive'

It is best practice to add an optional MERGE condition when you add the WHEN NOT MATCHED BY SOURCE clause to UPDATE or DELETE target rows. Without a condition, a large number of target rows can be modified. Therefore, for best performance, apply a MERGE condition to the WHEN NOT MATCHED BY SOURCE clause (e.g., target.PickupTime >= (current_date() - INTERVAL '5' DAY) in the previous code example) to limit the number of target rows being updated or deleted; a target row is then modified only if the condition is true for that row.

You can also add multiple WHEN NOT MATCHED BY SOURCE clauses to a MERGE operation. When there are multiple clauses, they are evaluated in the order they are specified and all WHEN NOT MATCHED BY SOURCE clauses, except the last one, must have conditions.
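For reference, the same soft-delete pattern is available on the Python merge builder (Delta 2.3 and above) through whenNotMatchedBySourceUpdate and whenNotMatchedBySourceDelete. This is a hedged sketch, again for demonstration only and assuming the target table has a status column:

from delta.tables import DeltaTable

targetTable = DeltaTable.forName(spark, "taxidb.YellowTaxis")

(targetTable.alias("target")
    .merge(
        yellowTaxisMergeDataFrame.alias("source"),
        "target.RideId = source.RideId")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    # Soft delete: flag unmatched target rows that satisfy the condition
    .whenNotMatchedBySourceUpdate(
        condition = "target.PickupTime >= current_date() - INTERVAL '5' DAY",
        set = {"status": "'inactive'"})
    .execute())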

Analyzing the MERGE operation with DESCRIBE HISTORY

When we run DESCRIBE HISTORY on the YellowTaxis table, we can see our MERGE predicate in the operationParameters section of the output:

operation: MERGE

[('predicate',
  '(target.RideId = source.RideId)'),
   ('matchedPredicates', '[{"actionType":"update"}]'),
   ('notMatchedPredicates', '[{"actionType":"insert"}]')]

We can see the join condition (target.RideId = source.RideId), the matchedPredicate that specifies an update, and the notMatchedPredicate, which specifies an insert.

The operationMetrics output sections show the details of the different actions:

 [('numTargetRowsCopied', '4469890'),
  ('numTargetRowsDeleted', '0'),
  ('numTargetFilesAdded', '4'),
  ('executionTimeMs', '91902'),
  ('numTargetRowsInserted', '6'),
  ('scanTimeMs', '8452'),
  ('numTargetRowsUpdated', '5'),
  ('numOutputRows', '4469901'),
  ('numTargetChangeFilesAdded', '0'),
  ('numSourceRows', '11'),
  ('numTargetFilesRemoved', '1'),
  ('rewriteTimeMs', '16782')]

Here, we can again see that six rows were inserted (numTargetRowsInserted), and five rows were updated (numTargetRowsUpdated). Four new data files were added to our Delta table, and one data file was removed.

Inner Workings of the MERGE Operation

Internally, Delta Lake completes a MERGE operation like this in two steps:

  1. It first performs an inner join between the target table and the source table to select all data files containing matches. This prevents the operation from unnecessarily shuffling data that can be safely ignored.

  2. Next, it performs an outer join between the selected files in the target and source tables, and applies the appropriate INSERT, DELETE, or UPDATE clause as specified by the user.

The main way that a MERGE differs from an UPDATE or a DELETE under the hood is that Delta Lake uses joins to complete a MERGE. This allows you to use some unique strategies when trying to improve performance.
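One such strategy, when the source dataset is small, is to broadcast it so that the joins backing the MERGE avoid shuffling the large target table. This is a hedged sketch of that idea, reusing the merge builder from earlier; whether broadcasting actually helps depends on the size of your source data:

from delta.tables import DeltaTable
from pyspark.sql.functions import broadcast

targetTable = DeltaTable.forName(spark, "taxidb.YellowTaxis")

(targetTable.alias("target")
    # Broadcast the small source DataFrame to every executor
    .merge(
        broadcast(yellowTaxisMergeDataFrame).alias("source"),
        "target.RideId = source.RideId")
    .whenMatchedUpdate(set = {"VendorId": "source.VendorId"})
    .whenNotMatchedInsertAll()
    .execute())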

Conclusion

DML operations like DELETE, UPDATE, and MERGE are essential for any table format and for ETL workloads, and in Delta Lake they are all enabled through the transaction log. By leveraging these operations, you can start efficiently handling data changes and maintaining data integrity in your data platform.

In this chapter you read that, similar to tables in a traditional RDBMS, Delta tables support DELETE, UPDATE, and MERGE operations, and that you can apply these operations using either SQL or the DataFrame API. More importantly, you learned what happens under the hood in Delta Lake with the underlying files in the Delta table directory, and how the transaction log records and tracks these different types of entries. Using the DESCRIBE HISTORY command, we can view details about a table’s transactions. Each of these operations can also leverage predicates to reduce the number of files scanned and improve performance. Beyond using predicates during operations, there are other performance tuning techniques that can be applied to Delta tables, which you will learn about in the following chapter.

1 GitHub repo location: /chapter04/00 - Chapter Initialization

2 GitHub repo location: /chapter04/01 - Delete Operations

3 Manual page for sed

4 GitHub repo location: /chapter04/01 - Delete Operations
