Chapter 4. Optimizing the Performance of Iceberg Tables
As you saw in Chapter 3, Apache Iceberg tables provide a layer of metadata that allows the query engine to create smarter query plans for better performance. However, this metadata is only the beginning of how you can optimize the performance of your data.
You have various optimization levers at your disposal, including reducing the number of datafiles, data sorting, table partitioning, row-level update handling, metrics collection, and external factors. These levers play a vital role in enhancing data performance, and this chapter explores each of them, addressing potential slowdowns and providing acceleration insights. Implementing robust monitoring with preferred tools is crucial for identifying optimization needs, including the use of Apache Iceberg metadata tables, which we will cover in Chapter 10.
Compaction
Every procedure or process comes at a cost in terms of time, meaning longer queries and higher compute costs. Stated differently, the more steps you need to take to do something, the longer it will take for you to do it. When you are querying your Apache Iceberg tables, you need to open and scan each file and then close the file when you’re done. The more files you have to scan for a query, the greater the cost these file operations will put on your query. This problem is magnified in the world of streaming or “real-time” data, where data is ingested as it is created, generating lots of files with only a few records in each.
In contrast, batch ingestion, where you may ingest a whole day’s worth or a week’s worth of records in one job, allows you to more efficiently plan how to write the data as better-organized files. Even with batch ingestion, it is possible to run into the “small files problem,” where too many small files have an impact on the speed and performance of your scans because you’re doing more file operations, have a lot more metadata to read (there is metadata on each file), and have to delete more files when doing cleanup and maintenance operations. Figure 4-1 depicts both of these scenarios.
Essentially, when it comes to reading data, there are fixed costs you can’t avoid and variable costs you can avoid by using different strategies. Fixed costs include reading the particular data relevant to your query; you can’t avoid having to read the data to process it. Variable costs, on the other hand, include the file operations required to access that data; by using many of the strategies we discuss throughout this chapter, you can reduce those variable costs as much as possible. After applying these strategies, you’ll be using only the necessary compute to get your job done more cheaply and more quickly (and finishing more quickly has the benefit of being able to terminate compute clusters earlier, reducing their costs).
The solution to this problem is to periodically take the data in all these small files and rewrite it into fewer larger files (you may also want to rewrite manifests if there are too many manifests relative to the number of datafiles you have). This process is called compaction, as you are compacting many files into a few. Compaction is illustrated in Figure 4-2.
Hands-on with Compaction
You may be thinking that while the solution sounds simple, it will involve you having to write some extensive code in Java or Python. Fortunately, Apache Iceberg’s Actions package includes several maintenance procedures (the Actions package is specifically for Apache Spark, but other engines can create their own maintenance operation implementation). This package is used from within Spark either by writing SparkSQL as shown through most of this chapter or by writing imperative code such as the following (keep in mind that these actions still maintain the same ACID guarantees as normal Iceberg transactions):
Table table = catalog.loadTable("myTable");

SparkActions
    .get()
    .rewriteDataFiles(table)
    .option("rewrite-job-order", "files-desc")
    .execute();
In this snippet, we initiated a new instance of our table and then triggered rewriteDataFiles, which is the Spark action for compaction. The builder pattern used by SparkActions allows us to chain methods together to fine-tune the compaction job to express not only that we want compaction to be done, but also how we want it to be done.

There are several methods you can chain between the call to rewriteDataFiles and the execute method that begins the job:
- binPack: Sets the compaction strategy to binpack (discussed later), which is the default and doesn’t need to be explicitly supplied.
- sort: Changes the compaction strategy to sort the data rewritten by one or more fields in a priority order, further discussed in “Compaction Strategies”.
- zOrder: Changes the compaction strategy to z-order–sort the data based on multiple fields with equal weighting, further discussed in “Sorting”.
- filter: Enables you to pass an expression used to limit which files are rewritten.
- option: Sets a single configuration option for the job.
- options: Takes a map of multiple configuration options for the job.
There are several possible options you can pass to configure the job; here are a few important ones:
- target-file-size-bytes: Sets the intended size of the output files. By default, this uses the write.target.file-size-bytes property of the table, which defaults to 512 MB.
- max-concurrent-file-group-rewrites: The ceiling for the number of file groups to write simultaneously.
- max-file-group-size-bytes: The maximum size of a file group, not of any single file. This setting should be used when dealing with partitions larger than the memory available to the worker writing a particular file group, so that the partition can be split into multiple file groups to be written concurrently.
- partial-progress-enabled: Allows commits to occur while file groups are compacted, so for long-running compaction jobs, concurrent queries can benefit from already compacted files.
- partial-progress-max-commits: If partial progress is enabled, sets the maximum number of commits allowed to complete the job.
- rewrite-job-order: The order in which to write file groups, which can matter when using partial progress to make sure the higher-priority file groups are committed sooner rather than later. Groups can be ordered by byte size or by number of files in a group (bytes-asc, bytes-desc, files-asc, files-desc, none).
Note
As the engine plans the new files to be written in the compaction job, it will begin grouping these files into file groups that will be written in parallel (meaning one file from each group can be written concurrently). In your compaction jobs, you can configure options on how big these file groups can be and how many should be written simultaneously to help prevent memory issues.
The following code snippet uses several of the possible table options in practice:
Table table = catalog.loadTable("myTable");

SparkActions
    .get()
    .rewriteDataFiles(table)
    .sort()
    .filter(
        Expressions.and(
            Expressions.greaterThanOrEqual("date", "2023-01-01"),
            Expressions.lessThanOrEqual("date", "2023-01-31")))
    .option("rewrite-job-order", "files-desc")
    .execute();
In the preceding example, we implemented a sort strategy that, by default, adheres to the sort order specified in the table’s properties. Additionally, we incorporated a filter to exclusively rewrite data from the month of January. It’s important to note that this filter requires creating an expression using Apache Iceberg’s internal expression-building interface. Furthermore, we configured the rewrite-job-order to prioritize rewriting the file groups containing the most files first. This means a file group that rewrites five files will be processed before one that consolidates just two files.
Note
The Expressions library is designed to facilitate creating expressions around Apache Iceberg’s metadata structures. The library provides APIs to build and manipulate these expressions, which can then be used to filter data in tables and read operations. Iceberg’s expressions can also be used in manifest files to summarize the data in each datafile, which allows Iceberg to skip files that do not contain rows that could match a filter. This mechanism is essential for Iceberg’s scalable metadata architecture.
While this is all well and good, it can be done more easily using the Spark SQL extensions, which include call procedures that can be called using the following syntax from Spark SQL:
-- using positional arguments
CALL catalog.system.procedure(arg1, arg2, arg3)

-- using named arguments
CALL catalog.system.procedure(argkey1 => argval1, argkey2 => argval2)
Using the rewriteDataFiles procedure in this syntax would look like Example 4-1.

Example 4-1. Using the rewrite_data_files procedure to run compaction jobs
-- Rewrite Data Files CALL Procedure in SparkSQL
CALL catalog.system.rewrite_data_files(
  table => 'musicians',
  strategy => 'binpack',
  where => 'genre = "rock"',
  options => map(
    'rewrite-job-order', 'bytes-asc',
    'target-file-size-bytes', '1073741824',    -- 1GB
    'max-file-group-size-bytes', '10737418240' -- 10GB
  )
)
In this scenario, we may have been streaming some data into our musicians table and noticed that a lot of small files were generated for rock bands, so instead of running compaction on the whole table, which can be time-consuming, we targeted just the data that was problematic. We also told Spark to order the file group rewrites by size in bytes, ascending, and to aim for files of around 1 GB each, with each file group around 10 GB. You can see what the result of these settings would be in Figure 4-3.
Tip
Notice in Example 4-1 the use of double quotation marks in our where filter. Because we had to wrap the whole filter in single quotes, we use double quotes around the string value inside it, even though SQL would normally use single quotes for "rock". The where option is essentially equivalent to the filter method mentioned earlier; without it, the whole table could be rewritten.
Other engines can implement their own custom compaction tools. For example, Dremio has its own Iceberg table management feature via its OPTIMIZE command, which is a unique implementation but follows many of the APIs from the RewriteDataFiles action:
OPTIMIZE TABLE catalog.MyTable
The preceding command would achieve your basic binpack compaction by compacting all the files into fewer, more optimal files. But like the rewriteDataFiles procedure in Spark, we can get more granular.
For example, here we are compacting only a particular partition:
OPTIMIZE TABLE catalog.MyTable FOR PARTITIONS sales_year IN (2022, 2023) AND sales_month IN ('JAN', 'FEB', 'MAR')
And here we are compacting with particular file size parameters:
OPTIMIZE TABLE catalog.MyTable REWRITE DATA (MIN_FILE_SIZE_MB=100, MAX_FILE_SIZE_MB=1000, TARGET_FILE_SIZE_MB=512)
In this code snippet, we are rewriting only the manifests:
OPTIMIZE TABLE catalog.MyTable REWRITE MANIFESTS
As you can see, you can use Spark or Dremio to achieve compaction of your Apache Iceberg tables.
Compaction Strategies
As mentioned earlier, there are several compaction strategies that you can use with the rewriteDataFiles procedure. Table 4-1 summarizes these strategies, including their pros and cons. In this section, we will discuss binpack compaction; standard sorting and z-order sorting are covered later in this chapter.
Strategy | What it does | Pros | Cons
---------|--------------|------|-----
Binpack | Combines files only; no global sorting (will do local sorting within tasks) | This offers the fastest compaction jobs. | Data is not clustered.
Sort | Sorts by one or more fields sequentially prior to allocating tasks (e.g., sort by field a, then within that, sort by field b) | Data clustered by often-queried fields can lead to much faster read times. | This results in longer compaction jobs compared to binpack.
z-order | Sorts by multiple fields that are equally weighted, prior to allocating tasks (X and Y values in this range are in one grouping; those in another range are in another grouping) | If queries often rely on filters on multiple fields, this can improve read times even further. | This results in longer-running compaction jobs compared to binpack.
The binpack strategy is essentially pure compaction with no other considerations for how the data is organized beyond the size of the files. Of the three strategies, binpack is the fastest as it can just write the contents of the smaller files to a larger file of your target size, whereas sort and z-order must sort the data before they can allocate file groups for writing. This is particularly useful when you have streaming data and need compaction to run at a speed that meets your service level agreements (SLAs).
Tip
If an Apache Iceberg table has a sort order set within its settings, even if you use binpack, this sort order will be used for sorting data within a single task (local sort). Using the sort and z-order strategies will sort the data before the query engine allocates the records into different tasks, optimizing the clustering of data across tasks.
If you were ingesting streaming data, you may need to run a quick compaction on data that is ingested after every hour. You could do something like this:
CALL catalog.system.rewrite_data_files(
  table => 'streamingtable',
  strategy => 'binpack',
  where => 'created_at between "2023-01-26 09:00:00" and "2023-01-26 09:59:59"',
  options => map(
    'rewrite-job-order', 'bytes-asc',
    'target-file-size-bytes', '1073741824',
    'max-file-group-size-bytes', '10737418240',
    'partial-progress-enabled', 'true'
  )
)
In this compaction job, the binpack strategy is employed for faster alignment with streaming SLA requirements. It specifically targets data ingestion within a one-hour time frame, which can dynamically adjust to the most recent hour. The use of partial progress commits ensures that as file groups are written, they are immediately committed, leading to immediate performance enhancements for readers. Importantly, this compaction process focuses solely on previously written data, isolating it from any concurrent writes coming from streaming operations that would introduce new datafiles.
Using a faster strategy on a limited scope of data can make your compaction jobs much faster. Of course, you could probably compact the data even more if you allowed compaction beyond one hour, but you have to balance out the need to run the compaction job quickly with the need for optimization. You may have an additional compaction job for a day’s worth of data overnight and a compaction job for a week’s worth of data over the weekend to keep optimizing in continuous intervals while interfering as little as possible with other operations. Keep in mind that compaction always honors the current partition spec, so if data from an old partition spec is rewritten, it will have the new partitioning rules applied.
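The overnight job covering a full day of data, for example, has more time to run, so it might trade some compaction speed for better clustering by switching to the sort strategy. The following is a minimal sketch, reusing the assumed streamingtable and created_at names from the previous example; the exact filter, strategy, and sort order should be tuned to your own SLAs and query patterns:

-- Hypothetical nightly compaction covering a full day of data
CALL catalog.system.rewrite_data_files(
  table => 'streamingtable',
  strategy => 'sort',                 -- more time available overnight, so cluster the data
  sort_order => 'created_at ASC NULLS LAST',
  where => 'created_at between "2023-01-26 00:00:00" and "2023-01-26 23:59:59"',
  options => map('partial-progress-enabled', 'true')
)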
Automating Compaction
It would be a little tricky to meet all your SLAs if you have to manually run these compaction jobs, so looking into how to automate these processes could be a real benefit. Here are a couple of approaches you can take to automate these jobs:
- You can use an orchestration tool such as Airflow, Dagster, Prefect, Argo, or Luigi to send the proper SQL to an engine such as Spark or Dremio after an ingestion job completes or at a certain time or periodic interval.
- You can use serverless functions to trigger the job after data lands in cloud object storage.
- You can set up cron jobs to run the appropriate jobs at specific times.
These approaches require you to script out and deploy these services manually. However, there is also a class of managed Apache Iceberg catalog services that features automated table maintenance and includes compaction. Examples of these kinds of services include Dremio Arctic and Tabular.
Sorting
Before we get into the details of the sort compaction strategy, let’s understand sorting as it relates to optimizing a table.
Sorting or “clustering” your data has a very particular benefit when it comes to your queries: it helps limit the number of files that need to be scanned to get the data needed for a query. Sorting the data allows data with similar values to be concentrated into fewer files, allowing for more efficient query planning.
For example, suppose you have a dataset representing every player on every NFL team across 100 Parquet files that aren’t sorted in any particular way. If you did a query just for players on the Detroit Lions, even if a file of 100 records has only one record of a Detroit Lions player, that file must be added to the query plan and be scanned. This means you may need to scan up to 53 files (the maximum number of players that can be on an NFL team). If you sorted the data alphabetically by team name, all the Detroit Lions players should be in about four files (100 files divided by 32 NFL teams equals 3.125), which would probably include a handful of players from the Green Bay Packers and the Denver Broncos. So, by having the data sorted, you’ve reduced the number of files you have to scan from possibly 53 to 4, which, as we discussed in “Compaction Strategies”, greatly improves the performance of the query. Figure 4-4 depicts the benefits of scanning sorted datasets.
Sorted data can be quite useful if how the data is sorted leans into typical query patterns such as in this example, where you may regularly query the NFL data based on a particular team. Sorting data in Apache Iceberg can happen at many different points, so you want to make sure you leverage all these points.
There are two main ways to create a table. One way is with a standard CREATE TABLE statement:
-- Spark Syntax
CREATE TABLE catalog.nfl_players (
    id bigint,
    player_name varchar,
    team varchar,
    num_of_touchdowns int,
    num_of_yards int,
    player_position varchar,
    player_number int
)

-- Dremio Syntax
CREATE TABLE catalog.nfl_players (
    id bigint,
    player_name varchar,
    team varchar,
    num_of_touchdowns int,
    num_of_yards int,
    player_position varchar,
    player_number int
)
The other way is with a CREATE TABLE…AS SELECT (CTAS) statement:
-- Spark SQL & Dremio Syntax
CREATE
TABLE
catalog
.
nfl_players
AS
(
SELECT
*
FROM
non_iceberg_teams_table
);
After creating the table, you set the sort order of the table, which any engine that supports the property will use to sort the data before writing and will also be the default sort field when using the sort compaction strategy:
ALTER TABLE catalog.nfl_teams WRITE ORDERED BY team;
If doing a CTAS, sort the data in your AS query:
CREATE TABLE catalog.nfl_teams
AS (SELECT * FROM non_iceberg_teams_table ORDER BY team);

ALTER TABLE catalog.nfl_teams WRITE ORDERED BY team;
The ALTER TABLE statement sets a global sort order that will be used for all future writes by engines that honor the sort order. You could also specify it with INSERT INTO, like so:
INSERT INTO catalog.nfl_teams
SELECT * FROM staging_table
ORDER BY team
This will ensure that the data is sorted as you write it, but it isn’t perfect. Going back to the previous example, if the NFL dataset was updated each year for changes in the teams’ rosters, you may end up having many files splitting Lions and Packers players from multiple writes. This is because you’d now need to write a new file with the new Lions players for the current year. This is where the sort compaction strategy comes into play.
The sort compaction strategy will sort the data across all the files targeted by the job. So, for example, if you wanted to rewrite the entire dataset with all players sorted by team globally, you could run the following statement:
CALL catalog.system.rewrite_data_files(
  table => 'nfl_teams',
  strategy => 'sort',
  sort_order => 'team ASC NULLS LAST'
)
Here is a breakdown of the string that was passed for the sort order:
- team: Sorts the data by the team field.
- ASC: Sorts the data in ascending order (DESC would sort in descending order).
- NULLS LAST: Puts all players with a null value at the end of the sort, after the Washington Commanders (NULLS FIRST would put all players before the Arizona Cardinals).
Figure 4-5 shows the result of the sort.
You can sort by additional fields as well. For example, you may want the data sorted by team, but then within each team you may want it sorted alphabetically by name. You can achieve this by running a job with these parameters:
CALL catalog.system.rewrite_data_files(
  table => 'nfl_teams',
  strategy => 'sort',
  sort_order => 'team ASC NULLS LAST, name ASC NULLS FIRST'
)
Sorting by team will have the highest weight, followed by sorting by name. You’ll probably see players in this order in the file where the Lions roster ends and the Packers roster begins, as shown in Figure 4-6.
If end users regularly asked questions such as “Who are all the Lions players whose name starts with A,” this dual sort would accelerate the query even further. However, if end users asked “Who are all the NFL players whose name starts with A,” this wouldn’t be as helpful, as all the “A” players are stretched across more files than if you had just sorted by name alone. This is where z-ordering can be useful.
The bottom line is that to get the best advantage of sorting, you need to understand the types of questions your end users are asking so that you can have the data sorted to lean into their questions effectively.
Z-order
There are times when multiple fields are a priority when querying a table, and this is where a z-order sort may be quite helpful. With a z-order sort you are sorting the data by multiple data points, which allows engines a greater ability to reduce the files scanned in the final query plan. Let’s imagine we’re trying to locate item Z in a 4 × 4 grid (Figure 4-7).
Referring to “A” in Figure 4-7, we have a value (z), which we can say equals 3.5, and we want to narrow the area we want to search within our data. We can narrow down our search by breaking the field into four quadrants based on ranges of X and Y values, as shown in “B” in the figure.
So if we know what data we are looking for based on fields we z-ordered by, we can possibly avoid searching large portions of the data since it’s sorted by both fields. We can then take that quadrant and break it down even further and apply another z-order sort to the data in the quadrant, as shown in “C” in the figure. Since our search is based on multiple factors (X and Y), we could eliminate 75% of the searchable area by taking this approach.
You can sort and cluster your data in the datafiles in a similar way. For example, let’s say you have a dataset of all people involved in a medical cohort study, and you are trying to organize outcomes in the cohort by age and height; z-ordering the data may be quite worthwhile. You can see this in action in Figure 4-8.
Data that falls into a particular quadrant will be in the same datafiles, which can really slim down files to scan as you try to run analytics on different age/height groups. If you are searching for people with a height of 6 feet and an age of 60, you could immediately eliminate the datafiles that have data that belongs in the other three quadrants.
This works because the datafiles will fall into four categories:
- A: File with records containing Age 1–50 and Height 1–5
- B: File with records containing Age 51–100 and Height 1–5
- C: File with records containing Age 1–50 and Height 5–10
- D: File with records containing Age 51–100 and Height 5–10
If the engine knows you are searching for someone who is 60 years of age and is 6 feet tall, as it uses the Apache Iceberg metadata to plan the query, all the datafiles in categories A, B, and C will be eliminated and will never be scanned. Keep in mind that even if you only searched by age, you’d see a benefit from clustering by being able to eliminate at least two of the four quadrants.
Achieving this would involve running a compaction job:
CALL catalog.system.rewrite_data_files(
  table => 'people',
  strategy => 'sort',
  sort_order => 'zorder(age,height)'
)
Using the sort and z-order compaction strategies not only allows you to reduce the number of files your data exists in, but also makes sure the order of the data in those files enables even more efficient query planning.
While sorting is effective, it comes with some challenges. First, as new data is ingested, it becomes unsorted, and until the next compaction job, the data remains somewhat scattered across multiple files. This occurs because new data is added to a new file and is potentially sorted within that file but not in the context of all previous records. Second, files may still contain data for multiple values of the sorted field, which can be inefficient for queries that only require data with a specific value. For instance, in the earlier example, files contained data for both Lions and Packers players, making it inefficient to scan Packers records when you were only interested in Lions players.
Partitioning
If you know a particular field is pivotal to how the data is accessed, you may want to go beyond sorting and into partitioning. When a table is partitioned, instead of just sorting the order based on a field, it will write records with distinct values of the target field into their own datafiles.
For example, in politics, you’ll likely often query voter data based on a voter’s party affiliation, making this a good partition field. This would mean all voters in the “Blue” party will be listed in distinct files from those in the “Red,” “Yellow,” and “Green” parties. If you were to query for voters in the “Yellow” party, none of the datafiles you scan would include anyone from any other parties. You can see this illustrated in Figure 4-9.
Traditionally, partitioning a table based on derived values of a particular field required creating an additional field that had to be maintained separately and required users to have knowledge of that separate field when querying. For example:
- Partitioning by day, month, or year on a timestamp column required you to create an additional column based on the timestamp expressing the year, month, or day in isolation.
- Partitioning by the first letter of a text value required you to create an additional column that only had that letter.
- Partitioning into buckets (a set number of divisions to evenly distribute records into based on a hash function) required you to create an additional column that stated which bucket the record belonged in.
You’d then set the partitioning at table creation to be based on the derived fields, and the files would be organized into subdirectories based on their partition:
--Spark SQL
CREATE TABLE MyHiveTable (...) PARTITIONED BY month;
You’d have to manually transform the value every time you inserted records:
INSERT INTO MyTable (SELECT MONTH(time) AS month, ... FROM data_source);
When querying the table, the engine would have no awareness of the relationship between the original field and the derived field. This would mean that the following query would benefit from partitioning:
SELECT *
FROM MYTABLE
WHERE time BETWEEN '2022-07-01 00:00:00' AND '2022-07-31 00:00:00'
AND month = 7;
However, users often aren’t aware of this workaround column (and they shouldn’t have to be). This means that most of the time, users would issue a query similar to the following, which would result in a full table scan, making the query take much longer to complete and consume far more resources:
SELECT *
FROM MYTABLE
WHERE time BETWEEN '2022-07-01 00:00:00' AND '2022-07-31 00:00:00';
The preceding query is more intuitive for a business user or data analyst using the data, as they may not be as aware of the internal engineering of the table, resulting in many accidental full table scans. This is where Iceberg’s hidden partitioning capability comes in.
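With hidden partitioning, the partition spec declares a transform on the source column itself, so the engine applies the transform automatically and prunes partitions from filters on the raw column, with no helper column to maintain. As a minimal sketch with a hypothetical table and only the columns needed for illustration:

-- Spark SQL: partition on a transform of the timestamp column itself
CREATE TABLE catalog.MyTable (
    id bigint,
    time timestamp)
USING iceberg
PARTITIONED BY (months(time));

-- A filter on the raw timestamp column is enough for partition pruning;
-- no derived month column is needed
SELECT *
FROM catalog.MyTable
WHERE time BETWEEN '2022-07-01 00:00:00' AND '2022-07-31 00:00:00';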
Partition Evolution
Another challenge with traditional partitioning is that since it relied on the physical structure of the files being laid out into subdirectories, changing how the table was partitioned required you to rewrite the entire table. This becomes an inevitable problem as data and query patterns evolve, necessitating that we rethink how we partition and sort the data.
Apache Iceberg solves this problem with its metadata-tracked partitioning as well, because the metadata tracks not only partition values but also historical partition schemes, allowing the partition schemes to evolve. So, if the data in two different files were written based on two different partition schemes, the Iceberg metadata would make the engine aware so that it could create a plan with partition scheme A separately from partition scheme B, creating an overall scan plan at the end.
For example, let’s say you have a table of membership records partitioned by the year in which members registered:
CREATE TABLE catalog.members (...)
PARTITIONED BY years(registration_ts) USING iceberg;
Then, several years later, the pace of membership growth made it worthwhile to start breaking the records down by month. You could alter the table to adjust the partitioning like so:
ALTER TABLE catalog.members ADD PARTITION FIELD months(registration_ts)
The neat thing about Apache Iceberg’s date-related partition transforms is that if you evolve to something granular, there is no need to remove the less granular partitioning rule. However, if you are using bucket or truncate and you decide you no longer want to partition the table by a particular field, you can update your partition scheme like so:
ALTER TABLE catalog.members DROP PARTITION FIELD bucket(24, id);
When a partitioning scheme is updated, it applies only to new data written to the table going forward, so there is no need to rewrite the existing data. Also, keep in mind that any data rewritten by the rewriteDataFiles procedure will be rewritten using the new partitioning scheme, so if you want to keep older data in the old scheme, make sure to use the proper filters in your compaction jobs to not rewrite it.
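For example, if the monthly partition field was added at the start of 2023 and new rows arrive with current registration timestamps, a where filter can scope compaction to just the newer data so that older files keep their original year-based layout. A minimal sketch under those assumptions:

-- Only compact rows written under the new (monthly) partition scheme;
-- rows registered before 2023 keep their original layout
CALL catalog.system.rewrite_data_files(
  table => 'members',
  strategy => 'binpack',
  where => 'registration_ts >= "2023-01-01 00:00:00"'
)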
Other Partitioning Considerations
Say you migrate a Hive table using the migrate procedure (discussed in Chapter 13). It may currently be partitioned on a derived column (e.g., a month column based on a timestamp column in the same table), but you want to express to Apache Iceberg that it should use an Iceberg transform instead. There is a REPLACE PARTITION FIELD command for just this purpose:
ALTER TABLE catalog.members REPLACE PARTITION FIELD registration_day WITH days(registration_ts) AS day_of_registration;
This will not alter any datafiles, but it will allow the metadata to track the partition values using Iceberg transforms.
You can optimize tables in many ways. For example, using partitioning to write data with unique values to unique files, sorting the data in those files, and then making sure to compact those files into fewer larger files will keep your table performance nice and crisp. Although it’s not always about general use optimization, there are particular use cases, such as row-level updates and deletes, that you can optimize for as well using copy-on-write and merge-on-read.
Copy-on-Write Versus Merge-on-Read
Another consideration when it comes to the speed of your workloads is how you handle row-level updates. When you are adding new data, it just gets added to a new datafile, but when you want to update preexisting rows to either update or delete them, there are some considerations you need to be aware of:
- In data lakes, and therefore in Apache Iceberg, datafiles are immutable, meaning they can’t be changed. This provides lots of benefits, such as the ability to achieve snapshot isolation (since files that old snapshots refer to will have consistent data).
- If you’re updating 10 rows, there is no guarantee they are in the same file, so you may have to rewrite 10 files and every row of data in them to update 10 rows for the new snapshot.
There are three approaches to dealing with row-level updates, covered in detail throughout this section and summarized in Table 4-2.
Update style | Read speed | Write speed | Best practice
-------------|------------|-------------|--------------
Copy-on-write | Fastest reads | Slowest updates/deletes |
Merge-on-read (position deletes) | Fast reads | Fast updates/deletes | Use regular compaction to minimize read costs.
Merge-on-read (equality deletes) | Slow reads | Fastest updates/deletes | Use more frequent compaction to minimize read costs.
Copy-on-Write
The default approach is referred to as copy-on-write (COW). In this approach, if even a single row in a datafile is updated or deleted, that datafile is rewritten, and the new file takes its place in the new snapshot. You can see this exemplified in Figure 4-10.
This is ideal if you’re optimizing for reads because read queries can just read the data without having to reconcile any deleted or updated files. However, if your workloads consist of very regular row-level updates, rewriting entire datafiles for those updates may slow down your updates beyond what your SLAs allow. The pros of this approach include faster reads, while the cons involve slower row-level updates and deletes.
Merge-on-Read
The alternative to copy-on-write is merge-on-read (MOR), where instead of rewriting an entire datafile, you capture in a delete file the records to be updated in the existing file, with the delete file tracking which records should be ignored.
If you are deleting a record:

- The record is listed in a delete file.
- When a reader reads the table, it will reconcile the datafile with the delete file.

If you are updating a record:

- The record to be updated is tracked in a delete file.
- A new datafile is created with only the updated record.
- When a reader reads the table, it will ignore the old version of the record because of the delete file and use the new version in the new datafile.
This is depicted in Figure 4-11.
This avoids the need to rewrite unchanged records to new files just because they exist in a datafile with a record to be updated, speeding up the write transaction. But it comes at the cost of slower reads, as queries will have to scan the delete files to know which records to ignore in the proper datafiles.
To minimize the cost of reads, you’ll want to run regular compaction jobs, and to keep those compaction jobs running efficiently, you’ll want to take advantage of some of the properties you learned before:
- Use a filter/where clause to only run compaction on the files ingested in the last time frame (hour, day).
- Use partial progress mode to make commits as file groups are rewritten so that readers can start seeing marginal improvements sooner rather than later.
Using these techniques, you can speed up the write side of heavy update workloads while minimizing the costs to read performance. The advantage of this approach includes faster row-level updates, but this comes with the drawback of slower reads due to the need to reconcile delete files.
When doing MOR writes, delete files enable you to track which records need to be ignored in existing datafiles for future reads. We’ll use an analogy to help you understand the high-level differences between the types of delete files. (Keep in mind that which type of delete file gets written is usually decided by the engine for a particular use case, not by table settings.)
When you have a ton of data and you want to kick out a specific row, you have a couple of options:
- You can look for the row data based on where it’s sitting in the dataset, kind of like finding your friend in a movie theater by their seat number.
- You can look for the row data based on what it’s made of, like picking out your friend in a crowd because they’re wearing a bright red hat.
If you use the first option, you’ll use what are called positional delete files. But if you use the second option, you’ll need equality delete files. Each method has its own strengths and weaknesses. This means that depending on the situation, you might want to pick one over the other. It’s all about what works best for you!
Let’s explore these two types of delete files. Position deletes track which rows in which files should be ignored. The following table is an example of how this data is laid out in a position delete file:
Row to delete (position deletes)

Filepath | Position
---------|---------
001.parquet | 0
001.parquet | 5
006.parquet | 5
When reading the specified files, the position delete file will skip the row at the specified position. This requires a much smaller cost at read time since it has a pretty specific point at which it must skip a row. However, this has write time costs, since the writer of the delete file will need to know the position of the deleted record, which requires it to read the file with the deleted records to identify those positions.
Equality deletes instead specify values that, if a record matches, should be ignored. The following table shows how the data in an equality delete file may be laid out:
Rows to delete (equality deletes)

Team | State
-----|------
Yellow | NY
Green | MA
This requires no write time costs since you don’t need to open and read files to track the targeted values, but it has much greater read time costs. The read time costs exist because there is no information about where records with matching values exist, so when reading the data, a comparison has to be made against every record that could possibly contain a matching value. Equality deletes are great if you need the highest write speed possible, but aggressive compaction should be planned to reconcile those equality deletes to reduce the impact on your reads.
Configuring COW and MOR
Whether a table is configured to handle row-level updates via COW or MOR depends on the following:
- The table properties
- Whether the engine you use to write to Apache Iceberg supports MOR writes
The following table properties determine whether a particular transaction is handled via COW or MOR:
- write.delete.mode: Approach to use for delete transactions
- write.update.mode: Approach to use for update transactions
- write.merge.mode: Approach to use for merge transactions
Keep in mind that for this and all Apache Iceberg table properties, while many are part of the specification, it is still on the specific compute engine to honor the specification. You may run into different behavior, so read up on which table properties are honored by engines you use for particular jobs that use those properties. Query engine developers will have every intention of honoring all Apache Iceberg table properties, but this does require implementations for the specific engine’s architecture. Over time, engines should have all these properties honored so that you get the same behavior across all engines.
Since Apache Spark support for Apache Iceberg is handled from within the Apache Iceberg project, all these properties are honored from within Spark, and they can be set at the creation of a table in Spark like so:
CREATE TABLE catalog.people (
    id int,
    first_name string,
    last_name string
)
TBLPROPERTIES (
    'write.delete.mode'='copy-on-write',
    'write.update.mode'='merge-on-read',
    'write.merge.mode'='merge-on-read'
)
USING iceberg;
These properties can also be set after the table is created using an ALTER TABLE statement:
ALTER TABLE catalog.people SET TBLPROPERTIES (
    'write.delete.mode'='merge-on-read',
    'write.update.mode'='copy-on-write',
    'write.merge.mode'='copy-on-write'
);
It’s as simple as that. But remember that when working with non–Apache Spark engines, support for these properties can vary, so confirm that each engine you use honors the COW or MOR modes you have configured.
Other Considerations
Beyond your datafiles and how they are organized, there are many levers for improving performance. We will discuss many of them in the following sections.
Metrics Collection
As discussed in Chapter 2, the manifest for each group of datafiles is tracking metrics for each field in the table to help with min/max filtering and other optimizations. The types of column-level metrics that are tracked include:
- Counts of values, null values, and distinct values
- Upper and lower bound values
If you have very wide tables (i.e., tables with lots of fields; e.g., 100+), the number of metrics being tracked can start to become a burden on reading your metadata. Fortunately, using Apache Iceberg’s table properties, you can fine-tune which columns have their metrics tracked and which columns don’t. This way, you can track metrics on columns that are often used in query filters and not capture metrics on ones that aren’t, so their metrics data doesn’t bloat your metadata.
You can tailor the level of metrics collection for the columns you want (you don’t need to specify all of them) using table properties, like so:
ALTER TABLE catalog.db.students SET TBLPROPERTIES (
    'write.metadata.metrics.column.col1'='none',
    'write.metadata.metrics.column.col2'='full',
    'write.metadata.metrics.column.col3'='counts',
    'write.metadata.metrics.column.col4'='truncate(16)'
);
As you can see, you can set how the metrics are collected for each individual column to several potential values:
- none: Don’t collect any metrics.
- counts: Only collect counts (values, distinct values, null values).
- truncate(XX): Count and truncate the value to a certain number of characters, and base the upper/lower bounds on that. So, for example, a string column may be truncated to 16 characters and have its metadata value ranges based on the abbreviated string values.
- full: Base the counts and upper/lower bounds on the full value.
You don’t need to set this explicitly for every column as, by default, Iceberg sets this to truncate(16).
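For very wide tables, it can be simpler to lower the default for all columns and then opt back in for the handful of columns your queries actually filter on. The following sketch assumes the write.metadata.metrics.default table property and a hypothetical student_id column; verify the property against the Iceberg version you are running:

-- Assumed property names: lighter default metrics for every column,
-- with full metrics only for the column used in query filters
ALTER TABLE catalog.db.students SET TBLPROPERTIES (
    'write.metadata.metrics.default'='counts',
    'write.metadata.metrics.column.student_id'='full'
);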
Rewriting Manifests
Sometimes the issue isn’t your datafiles, as they are well sized and contain well-sorted data; it’s that they’ve been written across several snapshots, so each individual manifest may list only a handful of datafiles. While manifests are more lightweight, more manifests still means more file operations. There is a separate rewriteManifests procedure to rewrite only the manifest files so that you have a smaller total number of manifest files, and those manifest files list a large number of datafiles:
CALL catalog.system.rewrite_manifests('MyTable')
If you run into any memory issues while running this operation, you can turn off Spark caching by passing a second argument of false. If you are rewriting lots of manifests and they are being cached by Spark, it could result in issues with individual executor nodes:
CALL catalog.system.rewrite_manifests('MyTable', false)
A good time to run this operation is when your datafile sizes are optimal but the number of manifest files isn’t. For example, if you have 5 GB of data in one partition split among 10 datafiles but those files are listed across five manifest files, you don’t need to rewrite the datafiles, but you can probably consolidate the listing of those 10 files into a single manifest.
Optimizing Storage
As you make updates to the table or run compaction jobs, new files are created, but old files aren’t being deleted since those files are associated with historical snapshots of the table. To prevent storing a bunch of unneeded data, you should periodically expire snapshots. Keep in mind that you cannot time-travel to an expired snapshot. During expiration, any datafiles not associated with still-valid snapshots will get deleted.
You can expire snapshots that were created on or before a particular timestamp:
CALL catalog.system.expire_snapshots('MyTable', TIMESTAMP '2023-02-01 00:00:00.000', 100)
The second argument is the timestamp cutoff and the third is the minimum number of snapshots to retain (by default, only snapshots older than five days are eligible for expiration), so the procedure will only expire snapshots that are on or before the timestamp, and even then, a snapshot that falls within the 100 most recent snapshots will not be expired.
You can also expire particular snapshot IDs:
CALL catalog.system.expire_snapshots(table => 'MyTable', snapshot_ids => ARRAY(53))
In this example, a snapshot with the ID of 53 is expired. We can look up the snapshot ID by opening the metadata.json file and examining its contents or by using the metadata tables detailed in Chapter 10. You may have a snapshot where you expose sensitive data by accident and want to expire that single snapshot to clean up the datafiles created in that transaction. This would give you that flexibility. Expirations are a transaction, so a new metadata.json file is created with an updated list of valid snapshots.
There are six arguments that can be passed to the expire_snapshots procedure:
- table: Table to run the operation on
- older_than: Expires all snapshots on or before this timestamp
- retain_last: Minimum number of snapshots to retain
- snapshot_ids: Specific snapshot IDs to expire
- max_concurrent_deletes: Number of threads to use for deleting files
- stream_results: When true, sends deleted files to the Spark driver by Resilient Distributed Dataset (RDD) partition, which is useful for avoiding OOM issues when deleting large files
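As a sketch of how the arguments above fit together (the table name, cutoff, and retention count here are hypothetical), a retention policy that expires old snapshots while always keeping the 50 most recent and streaming results to avoid memory pressure might look like this:

-- Hypothetical retention policy using named arguments
CALL catalog.system.expire_snapshots(
  table => 'MyTable',
  older_than => TIMESTAMP '2023-02-01 00:00:00.000',
  retain_last => 50,
  stream_results => true
)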
Another consideration when optimizing storage is orphan files. These are files and artifacts that accumulate in the table’s data directory but are not tracked in the metadata tree because they were written by failed jobs. These files will not be cleaned up by expiring snapshots, so a special procedure should sporadically be run to deal with this. This procedure will look at every file in your table’s default location and assess whether it relates to active snapshots. This can be an intensive process (which is why you should only do it sporadically). To delete orphan files, run a command such as the following:
CALL catalog.system.remove_orphan_files(table => 'MyTable')
You can pass the following arguments to the removeOrphanFiles procedure:
- table: Table to operate on
- older_than: Only deletes files created on or before this timestamp
- location: Where to look for orphan files; defaults to the table’s default location
- dry_run: Boolean; if true, won’t delete files, but will return a list of what would be deleted
- max_concurrent_deletes: The maximum number of threads to use for deleting files
While for most tables the data will be located in its default location, there are times you may add external files via the addFiles
procedure (covered in Chapter 13) and later may want to clean artifacts in these directories. This is where the location argument comes in.
Write Distribution Mode
Write distribution mode requires an understanding of how massively parallel processing (MPP) systems handle writing files. These systems distribute the work across several nodes, each doing a job or task. The write distribution is how the records to be written are distributed across these tasks. If no specific write distribution mode is set, data will be distributed arbitrarily. The first X number of records will go to the first task, the next X number to the next task, and so on.
Each task is processed separately, so that task will create at least one file for each partition it has at least one record for. Therefore, if you have 10 records that belong in partition A distributed across 10 tasks, you will end up with 10 files in that partition with one record each, which isn’t ideal.
It would be better if all the records for that partition were allocated to the same tasks so that they can be written to the same file. This is where the write distribution comes in, that is, how the data is distributed among tasks. There are three options:
- none: There is no special distribution. This is the fastest during write time and is ideal for presorted data.
- hash: The data is hash-distributed by partition key.
- range: The data is range-distributed by partition key or sort order.
In a hash distribution, the value of each record is put through a hash function and grouped together based on the result. Multiple values may end up in the same grouping based on the hash function. For example, if you have the values 1, 2, 3, 4, 5, and 6 in your data, you may get a hash distribution of data with 1 and 4 in task A, 2 and 5 in task B, and 3 and 6 in task C. You’ll still write the smallest number of files needed for all your partitions, but less sequential writing will be involved.
In a range distribution, the data is sorted and distributed, so you’d likely have values 1 and 2 in task A, 3 and 4 in task B, and 5 and 6 in task C. This sorting will be done by the partition value or by the SortOrder
if the table has one. In other words, if a SortOrder
is specified, data will be grouped into tasks not just by partition value but also by the value of the SortOrder
field. This is ideal for data that can benefit from clustering on certain fields. However, sorting the data for distribution sequentially has more overhead than throwing the data in a hash function and distributing it based on the output.
There is also a write distribution property to specify the behavior for deletes, updates, and merges:
ALTER TABLE catalog.MyTable SET TBLPROPERTIES (
    'write.distribution-mode'='hash',
    'write.delete.distribution-mode'='none',
    'write.update.distribution-mode'='range',
    'write.merge.distribution-mode'='hash'
);
In a situation where you are regularly updating many rows but rarely deleting rows, you may want to have different distribution modes, as a different distribution mode may be more advantageous depending on your query patterns.
Object Storage Considerations
Object storage is a unique take on storing data. Instead of keeping files in a neat folder structure such as a traditional filesystem, object storage tosses everything into what are called buckets. Each file becomes an object and gets a bunch of metadata tagged along with it. This metadata tells us all sorts of things about the file and enables improved concurrency and resiliency, since the underlying files can be replicated for regional access or to handle concurrent requests while all users simply interact with a single logical “object.”
When you want to grab a file from object storage, you’re not clicking through folders. Instead, you’re using APIs. Just like you’d use a GET or PUT request to interact with a website, you’re doing the same here to access your data. For example, you’d use a GET request to ask for a file, the system checks the metadata to find the file, and voilà, you’ve got your data.
This API-first approach helps the system juggle your data, like making copies in different places or dealing with loads of requests at the same time. Object storage, which most cloud vendors provide, is ideal for data lakes and data lakehouses, but it has one potential bottleneck.
Because of the architecture and how object stores handle parallelism, there are often limits on how many requests can go to files under the same “prefix.” Therefore, if you wanted to access /prefix1/fileA.txt and /prefix1/fileB.txt, even though they are different files, accessing both counts toward the limit on prefix1. This becomes a problem in partitions with lots of files, as queries can result in many requests to these partitions and can then run into throttling, which slows down the query.
Running compaction to limit the number of files in a partition can help, but Apache Iceberg is uniquely suited for this scenario since it doesn’t rely on how its files are physically laid out, meaning it can write files in the same partition across many prefixes.
You can enable this in your table properties like so:
ALTER TABLE catalog.MyTable SET TBLPROPERTIES (
    'write.object-storage.enabled'= true
);
This will distribute files in the same partition across many prefixes, including a hash to avoid potential throttling.
So, instead of this:
s3://bucket/database/table/field=value1/datafile1.parquet
s3://bucket/database/table/field=value1/datafile2.parquet
s3://bucket/database/table/field=value1/datafile3.parquet
you’ll get this:
s3://bucket/4809098/database/table/field=value1/datafile1.parquet
s3://bucket/5840329/database/table/field=value1/datafile2.parquet
s3://bucket/2342344/database/table/field=value1/datafile3.parquet
With the hash in the filepath, each file in the same partition is now treated as if it were under a different prefix, thereby avoiding throttling.
Datafile Bloom Filters
A bloom filter is a way of knowing whether a value possibly exists in a dataset. Imagine a lineup of bits (those 0s and 1s in binary code), all set to a length you decide. Now, when you add data to your dataset, you run each value through a process called a hash function. This function spits out a spot on your bit lineup, and you flip that bit from a 0 to a 1. This flipped bit is like a flag that says, “Hey, a value that hashes to this spot might be in the dataset.”
For example, let’s say we feed 1,000 records through a bloom filter that has 10 bits. When it’s done, our bloom filter might look like this:
[0,1,1,0,0,1,1,1,1,0]
Now let’s say we want to find a certain value; we’ll call it X. We put X through the same hash function, and it points us to spot number 3 on our bit lineup. According to our bloom filter, there’s a 1 in that third spot. This means there’s a chance our value X could be in the dataset because a value hashed to this spot before. So we go ahead and check the dataset to see if X is really there.
Now let’s look for a different value; we’ll call it Y. When we run Y through our hash function, it points us to the fourth spot on our bit lineup. But our bloom filter has a 0 there, which means no value hashed to this spot. So we can confidently say that Y is definitely not in our dataset, and we can save time by not digging through the data.
Bloom filters are handy because they can help us avoid unnecessary data scans. If we want to make them more precise, we can add more hash functions and bits. But remember, the more we add, the bigger our bloom filter gets, and the more space it will need. As with most things in life, it’s a balancing act. Everything is a trade-off.
You can enable the writing of bloom filters for a particular column in your Parquet files (this can also be done for ORC files) via your table properties:
ALTER TABLE catalog.MyTable SET TBLPROPERTIES (
    'write.parquet.bloom-filter-enabled.column.col1'= true,
    'write.parquet.bloom-filter-max-bytes'= 1048576
);
Then engines querying your data may take advantage of these bloom filters to help make reading the datafiles even faster by skipping datafiles where bloom filters clearly indicate that the data you need doesn’t exist.
Conclusion
This chapter explored various strategies for optimizing the performance of Iceberg tables. We looked at critical table performance optimization methods such as compaction, sorting, z-ordering, copy-on-write versus merge-on-read mechanisms, and hidden partitioning. Each of these components plays a pivotal role in enhancing query efficiency, reducing read and write times, and ensuring optimal utilization of resources. Understanding and implementing these strategies effectively can lead to significant improvements in the management and operation of Apache Iceberg tables.
In Chapter 5, we’ll explore the concept of an Iceberg catalog, helping us make sure our Iceberg tables are portable and discoverable between our tools.