Chapter 4. Table Deletes, Updates, and Merges
Since Delta Lake adds a transactional layer to classic data lakes, we can perform classic DML operations, such as updates, deletes, and merges. When you perform a DELETE
operation on a Delta table, the operation is performed at the data file level, removing and adding data files as needed. Removed data files are no longer part of the current version of the Delta table, but should not be physically deleted immediately since you might want to revert to an older version of the table with time travel (time travel is covered in Chapter 6). The same is true when you run an UPDATE
operation. Data files will be added and removed from your Delta table as required.
The most powerful Delta Lake DML operation is the MERGE
operation, which allows you to perform an “upsert” operation: a mix of UPDATE
, DELETE
, and INSERT
operations, on your Delta table. You join a source and a target table, write a match condition, and then specify what should happen with the records that either match or don’t match.
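Before we dive into the details, here is the general shape of a MERGE statement. This is a minimal, hypothetical sketch (the table and column names are placeholders, not the taxidb tables used later in this chapter):

%sql
MERGE INTO target_table AS target
USING source_table AS source
  ON target.key = source.key
WHEN MATCHED THEN
  -- The source row matches an existing target row, so update it
  UPDATE SET target.value = source.value
WHEN NOT MATCHED THEN
  -- No matching target row exists, so insert the source row
  INSERT (key, value) VALUES (source.key, source.value)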
Deleting Data from a Delta Table
We will start with a clean taxidb.YellowTaxis
table. This table is created by the “Chapter Initialization” script for Chapter 4. It has 9,999,995 rows:
%sql
SELECT COUNT(*) FROM taxidb.YellowTaxis
Output:
+----------+ | count(1) | +----------+ | 9999995 | +----------+
Table Creation and DESCRIBE HISTORY
The taxidb.YellowTaxis
Delta table was created in the “Chapter Initialization” script and copied into our /chapter04 folder. Let’s look at DESCRIBE HISTORY
for the table:
%sql DESCRIBE HISTORY taxidb.YellowTaxis
Output (only relevant portions shown):
+-----------+--------------------------------+---------------------+ | operation | operationParameters | operationMetrics | +-----------+--------------------------------+---------------------+ | WRITE | [('mode', 'Overwrite'), (...)] | [('numFiles', '2'), | | | | ('numOutputRows', | | | | '9999995'), ...] | +-----------+--------------------------------+---------------------+
We can see that we have one transaction containing a WRITE
operation, writing two data files for a total of 9,999,995 rows. Let’s find out some details about both of those files.
In Chapter 2 you learned how you can use the transaction log to see the add and remove file actions. Let’s take a look at the _delta_log directory:
%sh ls /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/*.json
As expected, we see only one transaction log entry:
/dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/_delta_log/….0000.json
This log entry should have two file add actions, since the numFiles
entry in DESCRIBE HISTORY
was two. Again, let’s use our grep
command to find those sections:
%sh grep \"add\" /dbfs/…/chapter04/YellowTaxisDelta/_delta_log/..0000.json | sed -n 1p > /tmp/commit.json python -m json.tool < /tmp/commit.json
One difference from the previous grep commands is that this log entry contains two add actions, so we need to use the sed
command to extract the one we want.
Tip
You can pipe the grep
command output to the sed
command. sed
is a stream editor that performs basic text transformations on an input stream and writes the result to an output stream. The -n
option suppresses normal output, and the 1p
command prints only the first line of the input. To find the next add entry, you can simply use sed -n 2p
, which outputs the second line.
Produced output (only relevant portions shown):
{ "add": { "path": "part-00000-...-c000.snappy.parquet", … "stats": "{\"numRecords\":5530100,...}}", "tags": { … } }
Here, we see the name of the first data file created for our table, and the number of records in that file. We can use the same command with sed -n 2p
to retrieve the second add action, which describes the second data file:
{ "add": { "path": "part-00001-...-c000.snappy.parquet", "...: "” stats”: {\"numRecords\":4469895,...}}", "tags": { … } } }
Now we know that our table has the following data files:
Parquet filename | Number of records |
---|---|
part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet | 5,530,100 |
part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet | 4,469,895 |
These files correspond with our directory listing, so the transaction log and the directory listing report are consistent:
%sh
ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta

drwxrwxrwx 2 _delta_log
-rwxrwxrwx 1 part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet
-rwxrwxrwx 1 part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet
Performing the DELETE Operation
Let’s assume that we want to delete a single record, in this case the record with RideId = 100000
. First, we should make sure that the record is indeed still in the table with a SQL SELECT:
%sql
-- First, show that you have data for RideId = 100000
SELECT RideId,
       VendorId,
       CabNumber,
       TotalAmount
FROM   taxidb.YellowTaxis
WHERE  RideId = 100000
Output:
+--------+----------+-----------+-------------+ | RideId | VendorId | CabNumber | TotalAmount | +--------+----------+-----------+-------------+ | 100000 | 2 | T478827C | 7.56 | +--------+----------+-----------+-------------+
To delete this row, we can use a simple SQL DELETE
command, which can selectively delete rows based upon a predicate, or filtering, condition:
%sql
DELETE FROM taxidb.YellowTaxis
WHERE  RideId = 100000
Output:
+------------------+ |num_affected_rows | +------------------+ | 1 | +------------------+
We can confirm that we did in fact delete one row. When we use the DESCRIBE HISTORY
command to look at the different operations on the table, we get the following for version 1 (the output of the row is pivoted for readability):
version: 1 timestamp: 2022-12-14T17:50:23.000+0000 operation: DELETE operationParameters: [('predicate', '["(spark_catalog.taxidb.YellowTaxis.RideId = 100000)"]')] operationMetrics: [('numRemovedFiles', '1'), ('numCopiedRows', '5530099'), ('numAddedChangeFiles', '0'), ('executionTimeMs', '32534'), ('numDeletedRows', '1'), ('scanTimeMs', '1524'), ('numAddedFiles', '1'), ('rewriteTimeMs', '31009')]
We can see the operation was a DELETE
and the predicate we used for the deletion was WHERE RideId = 100000
. Delta Lake removed one file (numRemovedFiles
= 1
) and added one new file (numAddedFiles =
1
). If we use our trusted grep
command to find out the details, things look as follows:
Action | Filename | # of records |
---|---|---|
Add | part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet | 5,530,099 |
Remove | part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet | 5,530,100 |
Figure 4-1 illustrates Delta Lake’s actions when we deleted the record.
Delta Lake performs the following actions as part of the DELETE
operation:
1. Delta Lake made a first scan of the data to identify any files containing rows matching the predicate condition. In this case, that is the e229aa1d5616 data file; it contains the record with RideId = 100000.
2. In a second scan, Delta Lake reads the matching data files into memory. At this point Delta Lake deletes the rows in question before writing out a new, clean data file to storage. This new data file is the 0d3e0e77c4c1 data file. Since Delta Lake deleted one record, this new data file contains 5,530,099 records (5,530,100 – 1).
3. As Delta Lake completes the DELETE operation, the data file e229aa1d5616 is removed from the Delta transaction log, since it is no longer part of the Delta table. This process is called “tombstoning.” However, it is important to note that this old data file is not physically deleted, because you might still need it to time travel back to an earlier version of the table. You can use the VACUUM command to delete files older than a certain time period. Time travel and the VACUUM command are covered in detail in Chapter 6.
4. The data file fedaa0f6623d remains part of the Delta table, since no changes were applied to it.
We can see the one data file (0d3e0e77c4c1
) that has been added to the directory in our directory listing:
%sh
ls -al /dbfs/mnt/datalake/book/chapter04/YellowTaxisDelta/

drwxrwxrwx _delta_log
-rwxrwxrwx part-00000-96c2f047-99cc-4a43-b2ea-0d3e0e77c4c1-c000.snappy.parquet
-rwxrwxrwx part-00000-d39cbaa1-ea7a-4913-a416-e229aa1d5616-c000.snappy.parquet
-rwxrwxrwx part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet
The data file e229aa1d5616
was not physically deleted.
The most important message to take away from this is that the delete transaction occurs at the data file level. Delta Lake will write new data files and insert new add file and remove file actions in the transaction log, as needed. Chapter 6 will cover the VACUUM
command and other ways to clean up tombstoned data files that are no longer required.
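For reference, the same delete can also be issued through the DataFrame API instead of SQL. The following is a minimal sketch using the DeltaTable class from the delta-spark package; it assumes the same taxidb.YellowTaxis table and predicate used above:

from delta.tables import DeltaTable

# Get a handle to the Delta table by its catalog name
delta_table = DeltaTable.forName(spark, "taxidb.YellowTaxis")

# Delete all rows matching the predicate; like the SQL version, this
# rewrites the affected data file and tombstones the old one
delta_table.delete("RideId = 100000")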
DELETE Performance Tuning Tips
The main way to improve the performance of a DELETE
operation on Delta Lake is to add more predicates to narrow the search scope. For example, if you have partitioned data and know which partition the records to be deleted belong to, you can add that partition filter to the DELETE
predicate.
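As an illustration, suppose the YellowTaxis table were partitioned by a hypothetical PickupDate column (this column and the partitioning scheme are assumptions for the example, not part of the chapter's dataset). Adding the partition filter lets Delta Lake skip every file outside that partition:

%sql
-- PickupDate is a hypothetical partition column; the extra filter
-- restricts the scan to a single partition
DELETE FROM taxidb.YellowTaxis
WHERE  PickupDate = '2022-03-01'
  AND  RideId = 100000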
Delta Lake also provides a number of other optimizations, such as data skipping and Z-order optimization. Z-ordering reorganizes the layout of each data file so that similar column values are strategically colocated near one another for maximum efficiency. Please refer to Chapter 5 for more details.
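As a preview of what Chapter 5 covers, Z-ordering is applied with the OPTIMIZE command. A minimal sketch, assuming we want to cluster on the CabNumber column, might look like this:

%sql
-- Rewrites the data files so that rows with similar CabNumber values
-- are colocated, improving data skipping on that column
OPTIMIZE taxidb.YellowTaxis
ZORDER BY (CabNumber)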
Updating Data in a Table
Now that you have seen the impact of a DELETE
operation on the YellowTaxis
table, let’s take a quick look at an UPDATE
operation. You can use the UPDATE
operation to selectively update any rows matching a filtering condition, also known as a predicate.
Use Case Description
Let’s assume there was an error with the DropLocationId
for the record where RideId =
9999994
. First, let’s ensure this record is present in our table with the following SELECT
:
SELECT INPUT_FILE_NAME(),
       RideId,
       VendorId,
       DropLocationId
FROM   taxidb.YellowTaxis
WHERE  RideId = 9999994
The Spark SQL INPUT_FILE_NAME()
function is a handy function that gives us the filename in which the record is located:
+---------------------------+---------+----------+----------------+ | input_file_name() | RideId | VendorId | DropLocationId | +---------------------------+---------+----------+----------------+ | .../part-00001-...parquet | 9999994 | 1 | 243 | +---------------------------+---------+----------+----------------+
The INPUT_FILE_NAME
function shows that our record is located in the fedaa0f6623d
data file, which makes sense, since it is one of the last records, so logically it is located in the last-created data file. We can see that the existing DropLocationId
is currently 243. Let’s assume that we want to update this field to have a value of 250. We’ll take a look at the actual UPDATE
operation next.
Updating Data in a Table
We can now write the UPDATE
SQL statement as follows:
%sql
UPDATE taxidb.YellowTaxis
SET    DropLocationId = 250
WHERE  RideId = 9999994
We see that we updated a single row:
+-------------------+ | num_affected_rows | +-------------------+ | 1 | +-------------------+
Let’s first verify that we updated the table successfully:
%sql
SELECT RideId,
       DropLocationId
FROM   taxidb.YellowTaxis
WHERE  RideId = 9999994

+---------+----------------+
| RideId  | DropLocationId |
+---------+----------------+
| 9999994 | 250            |
+---------+----------------+
The output shows the record was updated successfully. When we use the DESCRIBE HISTORY
command on the table, we see the UPDATE
operation on version 3 of the table (output pivoted for clarity):
version: 3 timestamp: 2022-12-23 17:20:45+00:00 operation: UPDATE operationParameters: [('predicate', '(RideId = 9999994)')] operationMetrics: [('numRemovedFiles', '1'), ('numCopiedRows', '4469894'), ('numAddedChangeFiles', '0'), ('executionTimeMs', '25426'), ('scanTimeMs', '129'), ('numAddedFiles', '1'), ('numUpdatedRows', '1'), ('rewriteTimeMs', '25293')]
One file was removed ('numRemovedFiles', '1'
), and one was added ('numAddedFiles', '1'
). We can also see our UPDATE
predicate [('predicate', '(RideId = 9999994)')]
. If we use the grep
command to find out the details, things look as follows:
Action | Filename | # of records |
---|---|---|
Add | part-00000-da1ef656-46e-4de5-a189-50807db851f6-c000.snappy.parquet | 4,469,895 |
Remove | part-00001-947cccf8-41ae-4212-a474-fedaa0f6623d-c000.snappy.parquet | 4,469,895 |
Figure 4-2 illustrates the actions that Delta Lake took when we updated the record.
Delta Lake performs an UPDATE
on a table in two steps:
1. It finds and selects the data files containing data that matches the predicate and therefore needs to be updated. Delta Lake uses data skipping whenever possible to speed up this process. In this case, that is the fedaa0f6623d data file. We could also verify this with the INPUT_FILE_NAME() SQL function.
2. Next, Delta Lake reads each matching file into memory, updates the relevant rows, and writes out the result in a new data file. The new data file in this case is the 50807db851f6 file. It contains all the records of the fedaa0f6623d file, but with the update for RideId = 9999994 applied, so it continues to hold 4,469,895 records. Once Delta Lake has executed the UPDATE successfully, it adds an add file action for the new data file.
Since it is no longer required, the data file fedaa0f6623d
is removed from the Delta table with a remove commit action in the transaction log. However, like the DELETE
operation, the file is not physically deleted, in case we might want to look at an old version of the table with time travel.
The data file 0d3e0e77c4c1
was unaffected by our update, so it remains part of the Delta table and continues to hold 5,530,099 records.
UPDATE Performance Tuning Tips
Like the DELETE
operation, the main way to improve the performance of an UPDATE
command on Delta Lake is to add more predicates to narrow the search scope. The more specific the search, the fewer files Delta Lake needs to scan and/or modify.
As mentioned in the previous section, other Delta Lake features, such as Z-ordering, can be used to speed up UPDATE
operations further. See Chapter 5 for details on Delta Lake optimization.
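Like DELETE, an UPDATE can also be issued through the DataFrame API. The following is a minimal sketch using the DeltaTable class from the delta-spark package, mirroring the SQL statement from earlier in this section:

from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "taxidb.YellowTaxis")

# Update DropLocationId for the single matching row; the affected
# data file is rewritten and the old one is tombstoned
delta_table.update(
    condition = "RideId = 9999994",
    set = {"DropLocationId": "250"}
)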
Upsert Data Using the MERGE Operation
The Delta Lake MERGE
command allows you to perform upserts on your data. An upsert is a mix of an UPDATE
and an INSERT
command. To understand upserts, let’s assume that we have an existing table (the target table) and a source table that contains a mix of new records and updates to existing records. Here is how an upsert actually works:
When a record from the source table matches a preexisting record in the target table, Delta Lake updates the record.
When there is no such match, Delta Lake inserts the new record.
Use Case Description
Let’s apply a MERGE
operation to our YellowTaxis
table. Let’s perform a count of our YellowTaxis
table:
%sql SELECT COUNT(*) FROM taxidb.YellowTaxis
We see that we have 9,999,994 records.
+----------+ | count(1) | +----------+ | 9999994 | +----------+
We want to reinsert the record with RideId = 100000
that we deleted in the DELETE
section of this chapter. So, in our source data, we need one record with a RideId
set to 100000
.
For this example, let’s assume we also want to update the records with RideIds 9999991 through 9999995, because the VendorId was inserted incorrectly and needs to be updated to 1 (VendorId = 1) for these five records. Finally, we want to bring the record count to an even 10,000,000 records, so we have five more records, with RideIds ranging from 9999996 through 10000000.
The MERGE Dataset
In our companion source data files for the book, we have a file named YellowTaxisMergeData.csv, which has these records. Since we need to supply a schema, we first read the schema from our existing table:
df = spark.read.format("delta").table("taxidb.YellowTaxis") yellowTaxiSchema = df.schema print(yellowTaxiSchema)
Once we have loaded the schema, we can load our merge data CSV file:
from pyspark.sql.functions import col

yellowTaxisMergeDataFrame = spark       \
    .read                               \
    .option("header", "true")           \
    .schema(yellowTaxiSchema)           \
    .csv("/mnt/datalake/book/chapter04/YellowTaxisMergeData.csv") \
    .sort(col("RideId"))

yellowTaxisMergeDataFrame.show()
A partial output is shown here:
+----------+----------+------------------------------+ | RideId | VendorId | PickupTime | +----------+----------+------------------------------+ | 100000 | 2 | 2022-03-01T00:00:00.000+0000 | | 9999991 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999992 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999993 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999994 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999995 | 1 | 2022-04-04T20:54:04.000+0000 | | 9999996 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999997 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999998 | 3 | 2022-03-01T00:00:00.000+0000 | | 9999999 | 3 | 2022-03-01T00:00:00.000+0000 | | 10000000 | 3 | 2022-03-01T00:00:00.000+0000 | +----------+----------+------------------------------+
We can see our record with RideId = 100000
, the five records (9999991
through 9999995
) with their new VendorId
of 1
, and the five new records, starting at 9999996
.
We want to write our MERGE
statement in SQL, so we need to have our DataFrame available in SQL. The DataFrame
class has a handy method called createOrReplaceTempView
, which does exactly that:
# Create a temporary view on top of our DataFrame, making it
# accessible to the SQL MERGE statement below
yellowTaxisMergeDataFrame.createOrReplaceTempView("YellowTaxiMergeData")
We can now just use the view name in SQL:
%sql SELECT * FROM YellowTaxiMergeData
This shows exactly the same output as the show() method of the DataFrame.
The MERGE Statement
You can now write your MERGE
statement as follows:
%sql
MERGE INTO taxidb.YellowTaxis AS target
USING YellowTaxiMergeData AS source
  ON target.RideId = source.RideId

-- You need to update the VendorId if the records matched
WHEN MATCHED THEN
  -- If you want to update all columns, you can say "UPDATE SET *"
  UPDATE SET target.VendorId = source.VendorId

WHEN NOT MATCHED THEN
  -- If all columns match, you can also do an "INSERT *"
  INSERT (RideId, VendorId, PickupTime, DropTime, PickupLocationId,
          DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount,
          TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount,
          Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)
  VALUES (RideId, VendorId, PickupTime, DropTime, PickupLocationId,
          DropLocationId, CabNumber, DriverLicenseNumber, PassengerCount,
          TripDistance, RateCodeId, PaymentType, TotalAmount, FareAmount,
          Extra, MtaTax, TipAmount, TollsAmount, ImprovementSurCharge)
Let’s analyze this statement:
1. We are going to MERGE INTO the YellowTaxis Delta table. Notice that we give the table an alias of target.
2. Using the USING clause, we specify the source dataset, which in this case is the view YellowTaxiMergeData, and give it an alias of source.
3. We define the join condition between the source and target datasets. In our case, we simply want to join on the RideId. If you have partitioned data and want to target a partition, you might want to add that condition here with an AND clause.
4. We specify the action when the RideId matches between the source and target. In this use case, we want to update the target with the VendorId of the source, which is set to 1. Here, we are only updating one column, but if we need to, we can supply a column list separated by commas. If we want to update all columns, we simply say UPDATE SET *.
5. We define the action when the record exists in the source but not in the target. We do not have any additional condition on the WHEN NOT MATCHED clause, but you can add additional clauses if the use case calls for it. Most of the time you will provide an INSERT statement as the action. Since our source and target column names are identical, we could have also used a simple INSERT *.
When we execute this MERGE
statement, we get the following output:
+-------------------+------------------+------------------+-------------------+ | num_affected_rows | num_updated_rows | num_deleted_rows | num_inserted_rows | +-------------------+------------------+------------------+-------------------+ | 11 | 5 | 0 | 6 | +-------------------+------------------+------------------+-------------------+
This output is exactly what you expected:
We update five rows (
RideId
s9999991
through9999995
)We insert six rows:
One row with a
RideId
of100000
The five rows at the end (
9999996
through10000000
)
We can see the updates on the first five rows:
%sql
-- Make sure that the VendorId has been updated
-- for the records with RideId between
-- 9999991 and 9999995
SELECT RideId,
       VendorId
FROM   taxidb.YellowTaxis
WHERE  RideId BETWEEN 9999991 AND 9999995
+---------+----------+ | RideId | VendorId | +---------+----------+ | 9999991 | 1 | | 9999992 | 1 | | 9999993 | 1 | | 9999994 | 1 | | 9999995 | 1 | +---------+----------+
All rows now have the source VendorId
of 1.
We can see the inserted record with RideId
= 100000
:
%sql
-- Make sure that you have a record with RideId = 100000
SELECT *
FROM   taxidb.YellowTaxis
WHERE  RideId = 100000
Output (partial output shown):
+--------+----------+---------------------------+---------------------------+ | RideId | VendorId | PickupTime | DropTime | +--------+----------+---------------------------+---------------------------+ | 100000 | 2 | 2022-03-01 00:00:00+00:00 | 2022-03-01 00:12:01+00:00 | +--------+----------+---------------------------+---------------------------+
And finally, we can see the new rows with RideId >
9999995
:
%sql
SELECT *
FROM   taxidb.YellowTaxis
WHERE  RideId > 9999995

+----------+----------+---------------------------+
| RideId   | VendorId | PickupTime                |
+----------+----------+---------------------------+
| 9999996  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999997  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999998  | 3        | 2022-03-01 00:00:00+00:00 |
| 9999999  | 3        | 2022-03-01 00:00:00+00:00 |
| 10000000 | 3        | 2022-03-01 00:00:00+00:00 |
+----------+----------+---------------------------+
And a grand total of 10 million records:
%sql
SELECT COUNT(RideId)
FROM   taxidb.YellowTaxis

+----------+
| count(1) |
+----------+
| 10000000 |
+----------+
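Before moving on, note that the same upsert can also be expressed through the DataFrame API. The following is a minimal sketch using the DeltaTable merge builder from the delta-spark package; it assumes the yellowTaxisMergeDataFrame loaded earlier and relies on the source and target having identical column names:

from delta.tables import DeltaTable

target_table = DeltaTable.forName(spark, "taxidb.YellowTaxis")

(
    target_table.alias("target")
    .merge(
        yellowTaxisMergeDataFrame.alias("source"),
        "target.RideId = source.RideId"
    )
    # Matched records: update the VendorId from the source
    .whenMatchedUpdate(set = {"VendorId": "source.VendorId"})
    # Unmatched records: insert all columns from the source
    .whenNotMatchedInsertAll()
    .execute()
)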
Modifying unmatched rows using MERGE
An important addition to the Delta Lake MERGE
operation is the recently released WHEN NOT MATCHED BY SOURCE
clause. This clause can be used to UPDATE
or DELETE
records in the target table that do not have corresponding records in the source table. This is useful for deleting records in the target table that no longer exist in the source table (a hard delete), or for flagging records as deleted or inactive while still keeping them in the target table (a soft delete).
Note
WHEN NOT MATCHED BY SOURCE
clauses are supported by the Scala, Python, and Java Delta Lake APIs in Delta 2.3 and above. SQL is supported in Delta 2.4 and above.
To delete records that exist in the target table but not in the source table (i.e., a hard delete), use the WHEN NOT MATCHED BY SOURCE
clause, as seen in the following code example:
Note
The WHEN NOT MATCHED BY SOURCE
code is for demonstration purposes only and is not meant to be executed in sequence with the earlier code examples. Please note that if you execute the WHEN NOT MATCHED BY SOURCE
code examples, then the remaining code outputs will not align with the expected outputs shown in the rest of this chapter.
%sql
MERGE INTO taxidb.YellowTaxis AS target
USING YellowTaxiMergeData AS source
  ON target.RideId = source.RideId
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
-- DELETE records in the target that are not matched by the source
WHEN NOT MATCHED BY SOURCE THEN
  DELETE
If you wish to flag records in the target table that no longer exist in the source table and that satisfy a certain condition (i.e., a soft delete), you can specify a MERGE
condition and an UPDATE
:
%sql
MERGE INTO taxidb.YellowTaxis AS target
USING YellowTaxiMergeData AS source
  ON target.RideId = source.RideId
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
-- Set target.status = 'inactive' when records in the target table
-- don't exist in the source table and the condition is met
WHEN NOT MATCHED BY SOURCE
  AND target.PickupTime >= (current_date() - INTERVAL '5' DAY) THEN
  UPDATE SET target.status = 'inactive'
It is best practice to add an optional MERGE condition when you add the WHEN NOT MATCHED BY SOURCE clause to UPDATE or DELETE target rows. Without a condition, the clause can modify a large number of target rows. Therefore, for the best performance, apply a MERGE condition to the WHEN NOT MATCHED BY SOURCE clause (e.g., target.PickupTime >= (current_date() - INTERVAL '5' DAY) in the previous code example) to limit the number of target rows being updated or deleted; a target row is then only modified if that condition is true for that row.
You can also add multiple WHEN NOT MATCHED BY SOURCE
clauses to a MERGE
operation. When there are multiple clauses, they are evaluated in the order they are specified and all WHEN NOT MATCHED BY SOURCE
clauses, except the last one, must have conditions.
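In the Python API, the same behavior is exposed through the merge builder. The following is a minimal sketch (for demonstration purposes only, like the SQL examples above) that assumes Delta 2.3 or above and the yellowTaxisMergeDataFrame source; the hard delete variant is shown:

from delta.tables import DeltaTable

target_table = DeltaTable.forName(spark, "taxidb.YellowTaxis")

(
    target_table.alias("target")
    .merge(
        yellowTaxisMergeDataFrame.alias("source"),
        "target.RideId = source.RideId"
    )
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    # Hard delete target rows that no longer exist in the source
    .whenNotMatchedBySourceDelete()
    .execute()
)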
Analyzing the MERGE operation with DESCRIBE HISTORY
When we run DESCRIBE HISTORY
on the YellowTaxis
table, we can see our MERGE predicate in the operationParameters section of the output:
operation: MERGE
operationParameters: [('predicate', '(target.RideId = source.RideId)'),
  ('matchedPredicates', '[{"actionType":"update"}]'),
  ('notMatchedPredicates', '[{"actionType":"insert"}]')]
We can see the join condition (target.RideId = source.RideId
), the matchedPredicate
that specifies an update, and the notMatchedPredicate
, which specifies an insert.
The operationMetrics
output sections show the details of the different actions:
[('numTargetRowsCopied', '4469890'), ('numTargetRowsDeleted', '0'), ('numTargetFilesAdded', '4'), ('executionTimeMs', '91902'), ('numTargetRowsInserted', '6'), ('scanTimeMs', '8452'), ('numTargetRowsUpdated', '5'), ('numOutputRows', '4469901'), ('numTargetChangeFilesAdded', '0'), ('numSourceRows', '11'), ('numTargetFilesRemoved', '1'), ('rewriteTimeMs', '16782')]
Here, we can again see that six rows were inserted (numTargetRowsInserted
), and five rows were updated (numTargetRowsUpdated
). Four new data files were added to our Delta table, and one data file was removed.
Inner Workings of the MERGE Operation
Internally, Delta Lake completes a MERGE
operation like this in two steps:
1. It first performs an inner join between the target table and the source table to select all data files containing matches. This prevents the operation from unnecessarily shuffling data that can be safely ignored.
2. Next, it performs an outer join between the selected files in the target and source tables, and applies the appropriate INSERT, DELETE, or UPDATE clause as specified by the user.
The main way that a MERGE
differs from an UPDATE
or a DELETE
under the hood is that Delta Lake uses joins to complete a MERGE
. This allows you to use some unique strategies when trying to improve performance.
Conclusion
DML operations like DELETE
, UPDATE
, and MERGE
are essential operations for any table format and ETL process, and in Delta Lake they are all enabled through the transaction log. By leveraging these operations, you can efficiently handle data changes and maintain data integrity in your data platform.
Similar to tables in a traditional RDBMS, you read in this chapter that with Delta tables you can perform DELETE
, UPDATE
, and MERGE
operations, and that you can apply these operations using either SQL or the DataFrame API. More importantly, you learned what happens under the hood in Delta Lake with the underlying files in the Delta table directory, and how the transaction log records and tracks these different types of entries. Using the DESCRIBE HISTORY
command, we can view details about the output of a table’s transactions. Each of these operations can also leverage predicates to reduce the number of files scanned and improve performance. Outside of using predicates during operations, there are other performance tuning techniques that can be applied to Delta tables that you will learn about in the following chapter.