Let’s continue our series on Delta Lake. In the first article, I covered the basics of the Delta format itself. Today, I would like to share with you some essential information about the Transaction Log.
As you know from our first article, a Delta table consists of a set of Parquet files that hold the data itself and JSON files (plus additional Parquet checkpoint files) that serve as the building blocks of the transaction log, also known as the DeltaLog. Why are these JSON files so important? Because they are crucial to understanding how everything works. The DeltaLog contains information about every transaction that modifies our data. It gives Spark a single source of truth, which means that every time a query is executed against a Delta table, the engine checks the log to see which data files should be read.
Additionally, please remember that lakehouse and data lake architectures are based on files, not on relational tables, so they need a different design to provide capabilities similar to those of traditional relational databases. For example, atomicity is implemented entirely through the DeltaLog, and the basic unit of operation is a file: a change becomes visible only when its commit file appears in the log.
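To make the idea of log-based atomicity more concrete, here is a minimal sketch in plain Python (no Spark involved). It assumes that a commit is published by creating a new, numbered file in the log directory and that creation fails if the file already exists. The function name `try_commit` and the local temporary directory are hypothetical, and real Delta Lake writers rely on the atomic primitives of the underlying storage rather than on this exact code.

```python
import json
import os
import tempfile


def try_commit(log_dir: str, version: int, actions: list) -> bool:
    """Try to publish `actions` as commit `version`.

    Returns False when another writer has already claimed that version.
    """
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # Mode "x" creates the file only if it does not exist yet, so two
        # writers can never both claim the same version number.
        with open(path, "x", encoding="utf-8") as f:
            for action in actions:
                f.write(json.dumps(action) + "\n")
        return True
    except FileExistsError:
        return False


log_dir = tempfile.mkdtemp()
first = try_commit(log_dir, 0, [{"commitInfo": {"operation": "CREATE TABLE"}}])
second = try_commit(log_dir, 0, [{"commitInfo": {"operation": "WRITE"}}])
print(first, second)
```

The second writer loses the race for version 0 and would have to re-read the log and retry with the next version number.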
Let’s see it in practice.
First of all, let’s open our notebook and create a database that we will use in our examples. (I will use Synapse Analytics Spark pool for all the examples below.)
%%sql
CREATE DATABASE mydb
Then, we can create a table. Please pay attention to the keywords ‘USING DELTA’. Depending on the platform and runtime version, Delta may already be the default table format, but we can always specify it explicitly in our CREATE TABLE statement.
%%sql
CREATE TABLE mydb.employees (
    id int,
    name string,
    birthdate date
) USING DELTA
Once this operation has been performed, we can go to the default storage attached to our cluster and see what happened:
As you can see, there are no Parquet files (which is normal because we haven’t added any data yet), but we have a _delta_log folder. This directory always has the same name, and it stores the transaction log and the metadata associated with a Delta table. Let’s see what’s inside:
We have one JSON file that contains the first record of our transaction log (which we can call a commit) and a _temporary directory. The _temporary directory holds data and metadata that are written temporarily during certain operations: files are first created there and are moved to their final location only once the operation is complete. It is also used by operations that need temporary files or directories, such as merges or the compaction of small files in a table.
When we open the JSON file, it looks like this:
{ "commitInfo": { "timestamp": 1679759207135, "operation": "CREATE TABLE", "operationParameters": { "isManaged": "true", "description": null, "partitionBy": "[]", "properties": "{}" }, "isolationLevel": "Serializable", "isBlindAppend": true, "operationMetrics": {}, "engineInfo": "Apache-Spark/3.3.1.5.2-84175989 Delta-Lake/2.2.0.1", "txnId": "4d8f952b-e3dd-4390-affc-ee2d8075a843" } }
A lot of useful information is contained in the commit JSON file. Let’s go through it step by step:
- timestamp: A Unix timestamp representing the time the commit was made, measured in milliseconds since the epoch (1 January 1970, UTC).
- operation: The type of operation that was performed. In this case, it is CREATE TABLE, indicating that a new Delta Lake table was created.
- operationParameters: An object containing the parameters that were passed to the operation:
  - isManaged: Set to true for a managed table, meaning that the catalog manages the table’s data as well as its metadata (as opposed to an external table).
  - partitionBy: Specifies how the table is partitioned.
  - description: As the name suggests, it is a description of the table.
  - properties: Custom table properties.
- isolationLevel: The isolation level of the operation. In this case, it is set to the default (Serializable).
- isBlindAppend: A Boolean value indicating whether the commit is a blind append. Blind INSERT operations, which involve adding data to a Delta Lake table without first reading any existing data, do not cause conflicts with any other operations, even if they affect the same partition(s) or the table as a whole. However, in the case where the isolation level has been set to Serializable, blind appends may potentially result in conflicts.
- operationMetrics: An object containing metrics about the operation. This is empty in this example.
- engineInfo: A string identifying the versions of Apache Spark and Delta Lake that were used to perform the operation.
- txnId: A UUID identifying the transaction associated with the commit.
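As a small illustration of the fields above, the following sketch parses a trimmed copy of the commitInfo record with Python’s standard library and converts the millisecond timestamp into a readable UTC date. Only a subset of the fields is kept for brevity, and the variable names are my own.

```python
import json
from datetime import datetime, timedelta, timezone

# A trimmed copy of the commitInfo record shown above.
commit_line = (
    '{"commitInfo": {"timestamp": 1679759207135, '
    '"operation": "CREATE TABLE", "isBlindAppend": true}}'
)

info = json.loads(commit_line)["commitInfo"]

# The timestamp is in milliseconds, so split it into seconds and milliseconds.
seconds, millis = divmod(info["timestamp"], 1000)
committed_at = datetime.fromtimestamp(seconds, tz=timezone.utc) + timedelta(
    milliseconds=millis
)

print(info["operation"], committed_at.isoformat())
```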
The second part of the file contains the following information:
{ "protocol": { "minReaderVersion": 1, "minWriterVersion": 2 } }
The transaction log of a Delta table includes protocol versioning information that allows Delta Lake to evolve. Delta Lake guarantees backward compatibility, which means that a reader supporting a higher protocol version can always read data written with a lower protocol version. Forward compatibility, however, is sometimes broken. In such cases, clients that only support a lower protocol version may not be able to read or write a table that requires a higher one, and they fail with an error message prompting users to upgrade. The minReaderVersion and minWriterVersion fields state the minimum protocol versions a client must support to read from and write to the table. You can set them at the table level (through the delta.minReaderVersion and delta.minWriterVersion table properties) or through standard session configuration.
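The compatibility rule can be sketched as a simple comparison. The function below is a simplified illustration, not Delta Lake’s actual implementation: the client versions are hypothetical, and I assume that a client must satisfy the reader requirement as well as the writer requirement before it may write.

```python
import json

protocol_line = '{"protocol": {"minReaderVersion": 1, "minWriterVersion": 2}}'


def check_protocol(protocol: dict, reader_version: int, writer_version: int):
    """Return (can_read, can_write) for a client supporting the given versions."""
    can_read = reader_version >= protocol["minReaderVersion"]
    # Assumption: writing also requires being able to read the table.
    can_write = can_read and writer_version >= protocol["minWriterVersion"]
    return can_read, can_write


protocol = json.loads(protocol_line)["protocol"]

# A hypothetical client that supports reader version 1 and writer version 2.
print(check_protocol(protocol, reader_version=1, writer_version=2))

# The same client against a table that requires newer versions.
newer_table = {"minReaderVersion": 2, "minWriterVersion": 5}
print(check_protocol(newer_table, reader_version=1, writer_version=2))
```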
The last part of our file contains the following information:
{ "metaData": { "id": "f913b755-8565-4508-8690-c0fb2aca13d6", "format": { "provider": "parquet", "options": {} }, "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthdate\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}", "partitionColumns": [], "configuration": {}, "createdTime": 1679759207032 } }
- id – unique identifier of the table.
- format – format of the data files; in this case it is Parquet with empty options.
- schemaString – the schema of the table, stored as a JSON document serialized into a string. It specifies the data type, nullable flag, and metadata of each field.
- partitionColumns – the columns used for partitioning the data (empty here, because the table is not partitioned).
- configuration – any additional configuration options for the table.
- createdTime – Unix timestamp, in milliseconds since the epoch, indicating when the table was created.
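Because schemaString is JSON embedded in a JSON string, it has to be decoded twice. A minimal sketch using a trimmed copy of the metaData record above (the variable names are my own):

```python
import json

# Trimmed copy of the metaData record; the raw string keeps the \" escapes intact.
metadata_line = r'''{"metaData":{"id":"f913b755-8565-4508-8690-c0fb2aca13d6","schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthdate\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[]}}'''

# First decode: the log record itself.
meta = json.loads(metadata_line)["metaData"]

# Second decode: the schema stored as a string inside the record.
schema = json.loads(meta["schemaString"])

columns = [(f["name"], f["type"], f["nullable"]) for f in schema["fields"]]
for name, data_type, nullable in columns:
    print(f"{name}: {data_type} (nullable={nullable})")
```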
Alright, now that we have a common understanding of how transaction log records are built, let’s perform some operations. For example, let’s insert three rows into our Delta table:
INSERT INTO mydb.employees VALUES
    (1, 'John Doe', '1980-01-01'),
    (2, 'Jane Smith', '1990-05-15'),
    (3, 'Bob Johnson', '1985-12-31');
A new commit file appeared in the _delta_log location:
Inside it, we can again see a commitInfo record:
{ "commitInfo": { "timestamp": 1679759649292, "operation": "WRITE", "operationParameters": { "mode": "Append", "partitionBy": "[]" }, "readVersion": 0, "isolationLevel": "Serializable", "isBlindAppend": true, "operationMetrics": { "numFiles": "3", "numOutputRows": "3", "numOutputBytes": "2792" }, "engineInfo": "Apache-Spark/3.3.1.5.2-84175989 Delta-Lake/2.2.0.1", "txnId": "3805549d-ea9e-4326-907e-f96929b39752" } }
I will not explain each line one by one this time, but please note four important fields:
- operation: It is set to WRITE, indicating that we are inserting rows into the Delta table.
- numFiles: It shows how many new data files were created by the operation. In this case, it is 3, one file per inserted row. From a code perspective it looks like a single operation, but in reality Spark split it into multiple writes, most likely one per write task.
- numOutputRows: It indicates how many rows were inserted.
- numOutputBytes: It shows the total size, in bytes, of the data files that were written (here 928 + 928 + 936 = 2792).
Next to that, we have detailed information about every new data file, each described in its own “add” section (it means “add Parquet file”):
{ "add": { "path": "part-00000-4e30eecf-6fb6-460d-8897-3a5fdff9417a-c000.snappy.parquet", "partitionValues": {}, "size": 928, "modificationTime": 1679759648749, "dataChange": true, "stats": "{\"numRecords\":1,\"minValues\":{\"id\":1,\"name\":\"John\",\"birthdate\":\"1980-01-01\"},\"maxValues\":{\"id\":1,\"name\":\"John\",\"birthdate\":\"1980-01-01\"},\"nullCount\":{\"id\":0,\"name\":0,\"birthdate\":0}}", "tags": {} } }
{ "add": { "path": "part-00001-91bb7a63-6899-4fe9-9ee7-d229c947f318-c000.snappy.parquet", "partitionValues": {}, "size": 928, "modificationTime": 1679759639807, "dataChange": true, "stats": "{\"numRecords\":1,\"minValues\":{\"id\":2,\"name\":\"Mary\",\"birthdate\":\"1985-05-10\"},\"maxValues\":{\"id\":2,\"name\":\"Mary\",\"birthdate\":\"1985-05-10\"},\"nullCount\":{\"id\":0,\"name\":0,\"birthdate\":0}}", "tags": {} } }
{ "add": { "path": "part-00002-23519556-2463-4abf-b430-6efd6fda0aa2-c000.snappy.parquet", "partitionValues": {}, "size": 936, "modificationTime": 1679759648748, "dataChange": true, "stats": "{\"numRecords\":1,\"minValues\":{\"id\":3,\"name\":\"David\",\"birthdate\":\"1990-10-20\"},\"maxValues\":{\"id\":3,\"name\":\"David\",\"birthdate\":\"1990-10-20\"},\"nullCount\":{\"id\":0,\"name\":0,\"birthdate\":0}}", "tags": {} } }
Please notice the stats section, where you can find the minimum and maximum values of every column in the file. This is very important because it lets the engine decide, without opening the file, whether a searched value can be inside it, and skip the file if it cannot.
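The file-skipping idea can be sketched in a few lines of Python. This is an illustration of the principle only, not the engine’s real code: the stats below are trimmed to the id column, and the function name `files_to_scan` is hypothetical.

```python
import json

# The three "add" entries from above, with stats trimmed to the id column.
add_actions = [
    {"path": "part-00000-4e30eecf-6fb6-460d-8897-3a5fdff9417a-c000.snappy.parquet",
     "stats": '{"numRecords":1,"minValues":{"id":1},"maxValues":{"id":1}}'},
    {"path": "part-00001-91bb7a63-6899-4fe9-9ee7-d229c947f318-c000.snappy.parquet",
     "stats": '{"numRecords":1,"minValues":{"id":2},"maxValues":{"id":2}}'},
    {"path": "part-00002-23519556-2463-4abf-b430-6efd6fda0aa2-c000.snappy.parquet",
     "stats": '{"numRecords":1,"minValues":{"id":3},"maxValues":{"id":3}}'},
]


def files_to_scan(adds: list, column: str, value) -> list:
    """Return the paths of files whose [min, max] range may contain `value`."""
    selected = []
    for add in adds:
        stats = json.loads(add["stats"])
        low = stats["minValues"].get(column)
        high = stats["maxValues"].get(column)
        # Without statistics we cannot rule the file out, so we keep it.
        if low is None or high is None or low <= value <= high:
            selected.append(add["path"])
    return selected


# Which files could satisfy WHERE id = 2?
print(files_to_scan(add_actions, "id", 2))
```

For a predicate such as id = 2, only one of the three files has to be opened; a value outside every range, such as id = 7, lets the engine skip all of them.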
Let’s update one row:
UPDATE mydb.employees SET id = 4 where name = 'Jane Smith'
A new commit file appeared:
{ "remove": { "path": "part-00001-bc768c00-1366-4414-b8f6-b9f7ab04fa26-c000.snappy.parquet", "deletionTimestamp": 1681676475849, "dataChange": true, "extendedFileMetadata": true, "partitionValues": {}, "size": 971, "tags": {} } }
{ "add": { "path": "part-00000-90860ef2-4f10-46ad-ba4b-917536ac0e1d-c000.snappy.parquet", "partitionValues": {}, "size": 971, "modificationTime": 1681676476679, "dataChange": true, "stats": "{\"numRecords\":1,\"minValues\":{\"id\":4,\"name\":\"Jane Smith\",\"birthdate\":\"1990-05-15\"},\"maxValues\":{\"id\":4,\"name\":\"Jane Smith\",\"birthdate\":\"1990-05-15\"},\"nullCount\":{\"id\":0,\"name\":0,\"birthdate\":0}}", "tags": {} } }
{ "commitInfo": { "timestamp": 1681676476795, "operation": "UPDATE", "operationParameters": { "predicate": "(name#499 = Jane Smith)" }, "readVersion": 1, "isolationLevel": "Serializable", "isBlindAppend": false, "operationMetrics": { "numRemovedFiles": "1", "numCopiedRows": "0", "executionTimeMs": "5145", "scanTimeMs": "4214", "numAddedFiles": "1", "numUpdatedRows": "1", "rewriteTimeMs": "931" }, "engineInfo": "Apache-Spark/3.2.2.5.1-86364249 Delta-Lake/1.2.1.10", "txnId": "1d5619d8-ffac-45f1-90d1-8a218c8378fe" } }
There is a ton of information, but note that the predicate from our WHERE clause appears in operationParameters. This time, we have two file operations: “add,” which indicates that a new file containing the updated value was added, and “remove,” which indicates that the file with the old value was marked as removed. Parquet files are never modified in place, and a removed file is only removed logically: it stays in storage until it is cleaned up by vacuuming. These operations tell new queries that ask for the newest version of a table which files to read and which to skip. What will happen when we remove some rows?
DELETE FROM mydb.employees where id <3
{ "remove": { "path": "part-00000-fb2fa16c-f20c-465a-b74b-9f182136b425-c000.snappy.parquet", "deletionTimestamp": 1681676944170, "dataChange": true, "extendedFileMetadata": true, "partitionValues": {}, "size": 956, "tags": {} } }
{ "add": { "path": "part-00000-77ad326e-9958-48c6-be5c-1d23f58ed68d-c000.snappy.parquet", "partitionValues": {}, "size": 472, "modificationTime": 1681676944086, "dataChange": true, "stats": "{\"numRecords\":0,\"minValues\":{},\"maxValues\":{},\"nullCount\":{}}", "tags": {} } }
{ "commitInfo": { "timestamp": 1681676944210, "operation": "DELETE", "operationParameters": { "predicate": "[\"(spark_catalog.mydb.employees.id < 3)\"]" }, "readVersion": 2, "isolationLevel": "Serializable", "isBlindAppend": false, "operationMetrics": { "numRemovedFiles": "1", "numCopiedRows": "0", "executionTimeMs": "2840", "numDeletedRows": "1", "scanTimeMs": "2096", "numAddedFiles": "1", "rewriteTimeMs": "743" }, "engineInfo": "Apache-Spark/3.2.2.5.1-86364249 Delta-Lake/1.2.1.10", "txnId": "c9ddb2e0-12cb-41ab-946c-fc6643a6c086" } }
This time, one add and one remove file operation were registered. The following kinds of operations can appear in _delta_log:
- Add file – adds a data file to the table.
- Remove file – marks a data file as removed from the table.
- Update metadata – updates the metadata of a table (e.g. its partitioning).
- Set transaction – records that a micro-batch with a specific ID has been successfully committed by a Structured Streaming job.
- Change protocol – switches the Delta Lake transaction log to a newer protocol version.
- Commit info – contains information about the commit: which operation was performed, from where, and at what time.
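In the commit files, each of these operations is a JSON object on its own line, keyed by the operation name (add, remove, metaData, txn, protocol, commitInfo). A small sketch that counts the operation types in one commit, using a trimmed copy of the UPDATE commit from above (the helper name `count_actions` is my own):

```python
import json
from collections import Counter

# Trimmed copy of the UPDATE commit: one JSON object per line.
commit_file = "\n".join([
    '{"remove": {"path": "part-00001-bc768c00-1366-4414-b8f6-b9f7ab04fa26-c000.snappy.parquet", "dataChange": true}}',
    '{"add": {"path": "part-00000-90860ef2-4f10-46ad-ba4b-917536ac0e1d-c000.snappy.parquet", "dataChange": true}}',
    '{"commitInfo": {"operation": "UPDATE"}}',
])


def count_actions(text: str) -> Counter:
    """Count how many operations of each type a commit file contains."""
    counts = Counter()
    for line in text.splitlines():
        if line.strip():
            # Each line is an object with a single key: the operation type.
            counts.update(json.loads(line).keys())
    return counts


action_counts = count_actions(commit_file)
print(dict(action_counts))
```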
When I run a SELECT statement:
SELECT * FROM mydb.employees
I receive the newest version of the table:
How does Spark know what to read? It reads the JSON files in the log and, based on their add and remove operations, works out which data files to read and which ones to skip. Of course, instead of going through the JSON files manually like we did, we can also run the DESCRIBE HISTORY command:
DESCRIBE HISTORY mydb.employees
We can also read a specific version, and Spark will determine which files to read for it. I already described how to read a specific version in my previous article (link), but here is a sneak peek:
SELECT * FROM mydb.employees VERSION AS OF 1
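Both the plain SELECT and VERSION AS OF rest on the same idea: replay the add and remove operations in commit order, stopping at the requested version. The sketch below illustrates that idea in plain Python. The file names (a.parquet and so on) and the function name `live_files` are hypothetical, and the real engine does considerably more work (checkpoints, statistics, partition pruning).

```python
# Hypothetical log: version number -> list of operations in that commit.
commits = {
    0: [{"commitInfo": {"operation": "CREATE TABLE"}}],
    1: [{"add": {"path": "a.parquet"}},
        {"add": {"path": "b.parquet"}},
        {"add": {"path": "c.parquet"}}],
    2: [{"remove": {"path": "b.parquet"}}, {"add": {"path": "d.parquet"}}],
    3: [{"remove": {"path": "a.parquet"}}, {"add": {"path": "e.parquet"}}],
}


def live_files(log: dict, up_to_version=None) -> set:
    """Replay the log and return the data files that make up a table version."""
    files = set()
    for version in sorted(log):
        if up_to_version is not None and version > up_to_version:
            break
        for action in log[version]:
            if "add" in action:
                files.add(action["add"]["path"])
            elif "remove" in action:
                files.discard(action["remove"]["path"])
    return files


print(sorted(live_files(commits)))                   # latest version
print(sorted(live_files(commits, up_to_version=1)))  # like VERSION AS OF 1
```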
We can also go back to a specific version:
RESTORE mydb.employees TO VERSION AS OF 1;
Then the specified version becomes the “newest” one:
SELECT * FROM mydb.employees
You can treat this option as a “ROLLBACK” operation, but it is much more flexible than the standard rollback that we know from relational databases. This is because RESTORE is recorded as just another operation in the Delta log, so you can also roll back the rollback itself. :)
To show you more of the Delta log, I added a few more rows, which brought the table to a total of 11 transactions and a lot of individual Parquet files. Each of those earlier states can still be read using the time travel feature, because every “version” corresponds to a commit on the table. By default, when we run a simple SELECT statement like the one above, Spark reads the latest version of the table and determines which files to read based on that:
Here is what our transaction log looks like:
As you can see in the picture above, a checkpoint Parquet file appeared after the ninth commit. Once we have made several commits to the transaction log, Delta Lake saves a checkpoint file in Parquet format in the same _delta_log subdirectory. Delta Lake generates checkpoints automatically as needed to maintain good read performance (by default, every 10 commits). If the entire log consisted only of JSON files, after some time there would be an enormous number of files to read. A checkpoint file contains the information from all the commits that happened before it, so it is much easier to read one Parquet file than thousands of small JSON files. To catch up quickly with the latest changes, Spark can list the files in the transaction log, skip to the latest checkpoint file, and process only the JSON commits made since that checkpoint was saved.
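The “skip to the latest checkpoint” step can be sketched on file names alone. The sketch assumes single-file checkpoints named with a 20-digit, zero-padded version followed by .checkpoint.parquet, next to commit files named with the same version and a .json extension. The directory listing and the function name `files_to_read` are hypothetical.

```python
import re

# 20-digit version, then either a JSON commit or a single-file checkpoint.
LOG_FILE = re.compile(r"^(\d{20})\.(json|checkpoint\.parquet)$")


def files_to_read(names: list) -> list:
    """Return the latest checkpoint (if any) plus the commits made after it."""
    commits, checkpoints = {}, {}
    for name in names:
        match = LOG_FILE.match(name)
        if not match:
            continue  # ignore anything that is not a commit or checkpoint
        version = int(match.group(1))
        if match.group(2) == "json":
            commits[version] = name
        else:
            checkpoints[version] = name

    start = max(checkpoints) if checkpoints else -1
    plan = [checkpoints[start]] if checkpoints else []
    plan += [commits[v] for v in sorted(commits) if v > start]
    return plan


# Hypothetical listing: commits 0..12 and a checkpoint written at version 10.
listing = [f"{v:020d}.json" for v in range(13)]
listing += [f"{10:020d}.checkpoint.parquet", "_last_checkpoint"]

plan = files_to_read(listing)
print(plan)
```

Instead of thirteen JSON files, the reader in this example needs one Parquet file and two JSON files.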
Of course, there are many other topics that are important from the Delta Lake point of view, such as concurrency control, vacuuming, and optimizing, but that is all I wanted to show today. Thank you for reading.