If you've read our previous post about our new database, you may remember that we selected Delta Lake as its table format. While we did start with the Delta Lake table format, we've since made changes to the system to better suit our needs. Internally we call this Great Lakes, as it combines multiple modified Delta Lakes into a single database. This post will cover:
- The basics of how Delta Lake works
- The modifications we've made to Delta Lake to make it work better for us
Delta Lake is an open table format originally created by Databricks and donated to the Linux Foundation in 2019. It provides ACID transactions through multiversion concurrency control (MVCC). It does all of this on top of object storage primitives, making it an attractive option for cloud-native databases that perform stream processing.
How Delta Lake Works
Delta Lake maintains the table state with an append-only log called the delta log. The delta log isn't a single file; it's made up of monotonically increasing, immutable version files. These log files contain all of the file actions that happen on the table (writes, updates, deletes, etc.). Because the versions increase monotonically, Delta Lake can guarantee atomicity on object storage: object stores can reject the creation of an object that already exists, so only one write wins. If two writers attempt to write the same version to the table, only one of them will succeed.
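To make that concrete, here is a minimal sketch of the commit path, assuming a hypothetical ObjectStore interface with a put-if-absent primitive (real object stores expose this through conditional writes); none of these names are the actual Delta Lake client API:

```go
package deltalog

import (
	"context"
	"errors"
	"fmt"
)

// ErrAlreadyExists signals that another writer already created this object.
var ErrAlreadyExists = errors.New("object already exists")

// ObjectStore is a hypothetical minimal interface; real object stores offer
// the same behavior via conditional writes (create only if the key is absent).
type ObjectStore interface {
	PutIfAbsent(ctx context.Context, key string, data []byte) error
}

// CommitVersion attempts to atomically publish a new log version. Exactly one
// writer wins any given version; the losers must re-read the log and retry
// their commit at the next version number.
func CommitVersion(ctx context.Context, store ObjectStore, version int64, actions []byte) error {
	// Zero-padded version numbers keep log files lexicographically ordered.
	key := fmt.Sprintf("%011d.json", version)
	if err := store.PutIfAbsent(ctx, key, actions); err != nil {
		if errors.Is(err, ErrAlreadyExists) {
			return fmt.Errorf("commit %d lost the race: %w", version, err)
		}
		return err
	}
	return nil
}
```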
Readers of the table simply read all of the log files and apply them in increasing version order to reconstruct the full table state. Of course, the number of log files can grow quite large, and replaying all of them would make reads incredibly slow. Delta Lake solves this problem with checkpoints. Checkpoints are written periodically, after a certain number of commits to the log, and act as a rollup of all the commits that came before them. They're named after the version they checkpoint up to (00000000123.checkpoint.parquet) and are stored in Parquet format, which keeps them quite small thanks to compression and data layout techniques. After writing a checkpoint, the writer updates the only mutable file in the table, the _last_checkpoint file, so readers can quickly discover it. This file contains the version of the last checkpoint. A full table layout may look something like this:
00000000123.json
00000000124.json
00000000125.json
00000000125.checkpoint.parquet
00000000126.json
_last_checkpoint
So reading the delta log looks like this:
- Read _last_checkpoint
- Perform a list operation starting from the checkpoint offset
- Load the files returned by the list operation in version order
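Here is a rough sketch of that read path, assuming a hypothetical LogStore interface with Get and List calls (the names are illustrative, not the actual Delta Lake client API):

```go
package deltalog

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
)

// LogStore is a hypothetical read-side interface; List returns keys that sort
// after startAfter in lexicographic order.
type LogStore interface {
	Get(ctx context.Context, key string) ([]byte, error)
	List(ctx context.Context, startAfter string) ([]string, error)
}

// lastCheckpoint mirrors the contents of the _last_checkpoint file.
type lastCheckpoint struct {
	Version int64 `json:"version"`
}

// LogFilesToApply returns, in order, the files a reader must apply to build
// the full table state: the newest checkpoint (if one exists) followed by
// every commit written after it.
func LogFilesToApply(ctx context.Context, store LogStore) ([]string, error) {
	var files []string
	startAfter := ""

	// Step 1: the _last_checkpoint file (the only mutable file in the table)
	// tells us the version of the newest checkpoint.
	if raw, err := store.Get(ctx, "_last_checkpoint"); err == nil {
		var cp lastCheckpoint
		if err := json.Unmarshal(raw, &cp); err != nil {
			return nil, err
		}
		version := fmt.Sprintf("%011d", cp.Version)
		files = append(files, version+".checkpoint.parquet") // rollup of every commit <= cp.Version
		startAfter = version + ".json"                        // only commits after the checkpoint are needed
	}

	// Step 2: list the commit files written after the checkpoint. Because the
	// version numbers are zero padded, lexicographic order is version order.
	keys, err := store.List(ctx, startAfter)
	if err != nil {
		return nil, err
	}
	sort.Strings(keys)

	// Step 3: apply checkpoint and commits in increasing version order (a real
	// reader would parse each file's actions here).
	for _, key := range keys {
		if strings.HasSuffix(key, ".json") {
			files = append(files, key)
		}
	}
	return files, nil
}
```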
Now that the full table state is known, readers can perform file pruning. File pruning is the act of removing files from the set of candidates for a given query. This prevents queries from having to read every file in the table; instead they only read the files that may contain data matching the query's constraints. Each file add action in Delta Lake carries some metadata, including the column bounds of the data in the file. For example, consider the add action below. If a query were executing with the predicate colA > 30, it could drop this file from consideration without ever reading it, because we know the file doesn't contain data that can satisfy the predicate.
{
  "add": {
    "path": "file_001.parquet",
    ...
    "stats": "{\"numRecords\":1,\"minValues\":{\"colA\": 10...
              \"maxValues\":{\"colA\": 20...}"
  }
}
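As a small illustration of how a query engine can use those bounds, here is a minimal pruning sketch (the types and int64-only columns are simplifications, not the real Delta Lake stats schema):

```go
package pruning

// FileStats holds the per-column bounds carried by an add action's metadata.
type FileStats struct {
	Path      string
	MinValues map[string]int64
	MaxValues map[string]int64
}

// PruneGreaterThan keeps only the files that could contain rows satisfying
// column > value. A file whose maximum for that column is <= value can be
// dropped without ever being opened.
func PruneGreaterThan(files []FileStats, column string, value int64) []FileStats {
	var candidates []FileStats
	for _, f := range files {
		if maxVal, ok := f.MaxValues[column]; ok && maxVal <= value {
			continue // e.g. colA in [10, 20] can never satisfy colA > 30
		}
		candidates = append(candidates, f)
	}
	return candidates
}
```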
There are many more features and settings that can live in these metadata files, but this is the basic operation of a Delta Lake table: immutable files written atomically to form a transaction log, MVCC to let readers replay the table at any previous version, and file statistics to make queries fast.
Delta Lake makes a great system for batch and stream processing: it is highly performant when it comes to writes, and it gives queries a quick way of performing file pruning. We're using Delta Lake for a real-time continuous profiling solution, so our queries need to be as fast as possible. And since we're building a profiling solution, we were able to turn the magnifying glass on ourselves and use our own profiling data to optimize Delta Lake.
Optimized Delta Lake
The first thing we optimized was the data file format. Delta Lake uses Parquet as its standard for storing data. Parquet is great at taking a lot of data and packing it into very small files; however, it is not so good at reading that data back into Arrow records. This is where newer file formats come into play. We replaced Parquet with Vortex (you can read more about this decision here), but the short story is that we saw a 70% reduction in query times after switching to Vortex, as well as a significant reduction in the memory footprint of queries, simply by using a file format that is zero-copy into Arrow.
Another thing we noticed in the profiling data was that queries spent an awfully long time just parsing the JSON of the commit logs. We looked at replacing the JSON with a serialization-free format such as FlatBuffers. However, because the whole system processes these commits as Arrow records, it ended up saving more CPU time and memory to store these files in Vortex format as well. This lets us read the logs directly into Arrow records and write them directly as Arrow records into the Vortex checkpoint files, saving us about 20% in table load times and up to 40% of the memory required to checkpoint.
This improvement uncovered another source of latency during queries. Once the log was read, a query still needed to parse the stats string contained in every add action, which could mean hundreds or thousands of JSON strings. We replaced these stats strings with proper Vortex struct columns, so that when a log is read into memory all of the data is immediately available for pruning; no more parsing JSON on the query path.
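Conceptually, the change to the add action's stats looks something like the hypothetical before/after below (field names are illustrative, not our actual schema):

```go
package logschema

// Before: stats arrive as an opaque JSON string inside every add action, so
// each query has to parse it before it can prune anything.
type addActionJSONStats struct {
	Path  string
	Stats string // e.g. "{\"numRecords\":1,\"minValues\":{\"colA\":10},...}"
}

// After: the same information lives in typed struct columns of the Vortex log
// file, so it reads straight into Arrow and is immediately usable for pruning,
// with no JSON parsing on the query path.
type addActionStructStats struct {
	Path       string
	NumRecords int64
	MinValues  map[string]int64
	MaxValues  map[string]int64
}
```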
The rest of the changes we've made are less Delta Lake specific and more specific to our database's usage of Delta Lake. While Delta Lake is great at write performance, it is inherently limited by the storage layer and by how fast commit files can be written. Because we're running in a cloud environment with object storage latencies, those latencies coupled with heavy write contention on the table caused very long commit latencies as writers fought to win a commit version. Since we operate an append-only database, there are no logical commit conflicts on tables: a writer cannot update a row while another writer is trying to update that same row, because updates are never allowed. Leveraging this guarantee meant we could create a centralized commit coordinator that bundles all pending commits into a single commit action. With this single writer we no longer have contention on commits to the table, while still supporting many concurrent writers.
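The sketch below shows the shape of that idea, assuming the append-only guarantee described above (the channel-based coordinator and its names are illustrative, not our production code):

```go
package coordinator

import "context"

// commitRequest is one writer's batch of file actions plus a channel on which
// the coordinator reports the version the batch landed in.
type commitRequest struct {
	actions []byte
	done    chan int64
}

// Coordinator serializes all table commits through a single goroutine so
// concurrent writers never race each other for a log version.
type Coordinator struct {
	requests chan commitRequest
}

// NewCoordinator starts the commit loop; write publishes one bundled log
// entry per version (for example via the put-if-absent commit shown earlier).
func NewCoordinator(write func(version int64, bundled [][]byte) error) *Coordinator {
	c := &Coordinator{requests: make(chan commitRequest)}
	go c.run(write)
	return c
}

// Commit hands the writer's actions to the coordinator and blocks until they
// have been published, returning the version they were committed in.
func (c *Coordinator) Commit(ctx context.Context, actions []byte) (int64, error) {
	req := commitRequest{actions: actions, done: make(chan int64, 1)}
	select {
	case c.requests <- req:
	case <-ctx.Done():
		return 0, ctx.Err()
	}
	select {
	case version := <-req.done:
		return version, nil
	case <-ctx.Done():
		return 0, ctx.Err()
	}
}

// run drains whatever is waiting, bundles it into one log entry, and writes a
// single version per bundle. Because the table is append only, bundling can
// never create a logical conflict between writers.
func (c *Coordinator) run(write func(version int64, bundled [][]byte) error) {
	var version int64
	for req := range c.requests {
		batch := []commitRequest{req}
	drain:
		for {
			select {
			case next := <-c.requests:
				batch = append(batch, next)
			default:
				break drain
			}
		}
		version++
		bundled := make([][]byte, 0, len(batch))
		for _, b := range batch {
			bundled = append(bundled, b.actions)
		}
		if err := write(version, bundled); err != nil {
			continue // error handling elided: a real coordinator would retry or fail the batch
		}
		for _, b := range batch {
			b.done <- version
		}
	}
}
```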
Another change we made to Delta Lake for our use case is embedding files in the log. If you read the previous blog post about our database, you may recall that we operate multiple Delta Lake tables. There is a primary table where writes are bundled together to reduce our object storage write request costs. Once enough data has accumulated, it is moved from this primary table into its final, database-specific table. Because of this table move and the multiple disjoint commit logs, we needed a way for these compaction operations to be atomic. This is why we added compaction configurations as a virtual file in the log. A virtual file doesn't exist anywhere in object storage; it is instead fully contained in the commit entry that adds it.
{
  "add": {
    "path": "compaction_uuid.json",
    ...
    "stats": "{\"config\": {\"from\": \"file_001.vortex\"...
              \"to\": \"compacted.vortex\"}"
  }
}
The contents necessary to perform a compaction are fully serialized inside the stats field. These compaction instructions are important for idempotency: if a compaction fails after partially completing, the operation can be retried without duplicating any data, because it is deterministic.
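Here is a sketch of that retry-safe replay, assuming hypothetical field names for the embedded compaction spec and a minimal store interface:

```go
package compaction

import (
	"context"
	"encoding/json"
)

// Spec is the payload embedded in the virtual file's stats field: which source
// files to fold into which output file (field names here are illustrative).
type Spec struct {
	From []string `json:"from"`
	To   string   `json:"to"`
}

// Store is the minimal surface this sketch needs from the object store and
// the compaction machinery.
type Store interface {
	Exists(ctx context.Context, key string) (bool, error)
	Compact(ctx context.Context, from []string, to string) error
}

// Run executes the compaction described by a virtual file. Because the spec
// fully determines both the inputs and the output path, retrying after a
// partial failure redoes exactly the same work and lands in exactly the same
// object, so no data is ever duplicated.
func Run(ctx context.Context, store Store, rawSpec []byte) error {
	var spec Spec
	if err := json.Unmarshal(rawSpec, &spec); err != nil {
		return err
	}
	done, err := store.Exists(ctx, spec.To)
	if err != nil {
		return err
	}
	if done {
		return nil // a previous attempt already finished; nothing left to do
	}
	return store.Compact(ctx, spec.From, spec.To)
}
```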
TL;DR
Delta Lake is an amazing open table format designed for write speed. It was exactly what we needed as a starting point when building our new database for Polar Signals. But like all technologies, it was built with specific goals in mind that don't always perfectly align with what you may be building.
We:
- Replaced the data and metadata file formats with Vortex files, making both reads and writes faster and more efficient.
- Moved stats to top-level columns in the metadata files, reducing query latency by avoiding parsing stats fields.
- Centralized all commits through a single writer to the table, unlocking higher write throughput.
- Added virtual files to the log so data can be safely moved from one table to another without risk of duplication.
These are the changes we've made to date in our Great Lakes fork of Delta Lake. While they've already delivered great performance improvements, we're excited to keep improving our unique use of Delta Lake. Today our fork is not open source, but we hope to contribute many of the changes back upstream in the future.
If you want to profile your own Delta Lake instances to see where your bottlenecks may be, sign up for a free (no credit card required) 14-day trial of Polar Signals.