Delta Lake is becoming more and more popular. It is right now the default storage format used by Spark engine so when you will not specify it differently it will be used by Synapse Analytics Spark Pools or Azure Databricks. Why it become so popular? Before we will start let’s say what was the problem with other data formats that we can use. For our example we will take csv and parquet. CSV over the years became very popular and people use on daily basis to export and transfer data. What is CSV? It is prety simple structure where typically on the first line we have header separated by comma and in the next lines we have corresponding values. It works pretty well in many scenarios but the biggest file will become the harder and longer it will take to read it. Why? Imagine that you have 10 files with customers 1GB each, to find information about specific customer like John Doe you have to read all the files from the beginning to the end. Of course it will take time and resources. To minimize time to read it from disk it can be compressed using one of the popular algorithms but with this technique you will overcome only one problem.
It is not the only problem with CSV. It also has limited data types. In this case, there is no information about which data type is used for a specific column because there is no metadata connected to the file. Because it lacks metadata, we have problems with data integrity inside the files. As you can see, there are multiple problems with it. CSV can be good for ad hoc data transfers and exports, but it is not suitable for analytical systems where performance, data integrity, and schema are critical aspects.
We partially solve this problem with formats like Apache Parquet. Of course it is not the only format currently in use for example we can say about ORC, Apache Hudi or Avro. They have some similarities to Parquet and also some differences but what they have in common is that they want to improve process of storing data. Parquet itself has columnar storage, so it improves the process of retrieving data by reading only the needed columns, and additionally, columnar storage provides better compression.
Apache Parquet also contains a schema, so data types are not lost. So we can notice some improvement when we compare to CSV but parquet cannot solve few problems that are important for analytical workloads like:
- updates – to update single value you have to rewrite entire file
- transactions – you have to ensure that data is in a consistent state because format itself cannot help with it,
- schema enforcement – every parquet file is independent so if you have set of files you have to take care that all of them has the same schema.
- versioning – to see how data looks like over time you have to prepare process of taking snapshot of data.
So next step in evolution is delta lake itself. Physically this format is based on parquet (for data storage) and json (for transaction log). Delta Lake uses a versioned Parquet file format that allows for the efficient processing of large datasets while providing mentioned above features that make it well-suited for complex data pipelines. What is also important to notice is the fact that delta itself is an open-source format so many different tools can adopt it in their solutions.
Microsoft Azure supports Delta Lake integration in Synapse Analytics Serverless (read-only), Synapse Spark Pools, Stream Analytics, and Databricks. Additionally, Data Factory with its Data flows can work with Delta Lake.
Please be aware that different tools can use different version of Spark. To see which functionalities work in which version please look at the official realease notes https://github.com/delta-io/delta/releases/. Example: support for TIMESTAMP AS OF in Spark SQL was released in Delta Lake 2.1.0 supported by Apache Spark 3.3.(Thanks Pawel for pointing it out!).
from pyspark.sql.types import StructType, StructField, IntegerType, StringType names = ["Alice", "Bob", "Charlie", "David", "Eve",'Adrian','Adam','Patrick','Mick','Christina'] schema = StructType([ StructField("id", IntegerType(), False), StructField("name", StringType(), True) ]) df = spark.createDataFrame([(i+1, name) for i, name in enumerate(names)], schema) df.show()
To save it in Delta format, we can specify the format as “delta” – please note that it is currently the default Spark format.
( df .where(df.id<6) .write .format("delta") .save("/delta/target/") )
After execution, in the destination folder, we can notice a few Parquet files, by default compressed using the snappy method. Why can we see multiple files instead of one? Please remember that Spark is based on parallel processing, and that is the reason for having more than one file. We can also notice the _delta_log directory that contains metadata and transaction logs for the data lake in the form of JSON. Each JSON file represents a transaction and contains information about the changes made:
To read Delta format, we can use the standard spark.read method, as you can see below:
( spark .read .format("delta") .load("/delta/target") .show() )
Of course, if we want, we can do the same from SQL – just remember to include the “delta” prefix inside the query.
%%sql SELECT * FROM delta.`/delta/target`
Result returned as expected:
Now, let’s try to insert more rows into our table. In this example, we will add a row with an ID equal to 6:
( df .where(df.id==6) .write.format("delta") .mode("append") .save("/delta/target/") )
As you can see, the row with ID 6 has been inserted into the target Delta. The row has been appended because that was the mode we chose in the above code.
Of course, append is not the only option – we can also overwrite:
( df .where(df.id==6) .write .format("delta") .mode("overwrite") .save("/delta/target/") )
As you probably noticed, the entire target dataset was replaced with only one row with an ID equal to 6.
What can be surprising for a lot of people is that I can perform an update on the Delta:
UPDATE delta.`/delta/target` SET id = 11
And you know what? It works perfectly well!
SELECT * FROM delta.`/delta/target`
What happened behind the scenes? The current version of the file was read, the value was replaced, and a new Parquet file appeared – all of these things happened automatically!
In the introduction, I mentioned that Delta Lake supports time-traveling and saves history by default. You can see evidence of this by looking at the folder structure of our Delta. The current version of the Delta table contains only one row, and as you can see below, there are many more Parquet files and three JSON files in the transaction log. Each one represents a different operation that I executed.
Of course, we can still read those historical values. To do that, we can use the history() method in PySpark or the DESCRIBE HISTORY statement in SQL.
from delta.tables import * ( DeltaTable .forPath(spark, "/delta/target/") .history() .show() )
%%sql DESCRIBE HISTORY delta.`/delta/target/`
As a result, we will get a lot of information like the number of versions (newest on top), when it was executed, what kind of operation it was, and much more. It can be interesting to look at the operationParameters, which contains all the information about the type of mode we used. Additionally, it is pretty useful to see the operationMetrics, where we can notice how many files our operation touched.
If we want to see a snapshot of data for a specific version, we can use the option of spark.read named versionAsOf. As a result, we will get data from that specific version.
( spark .read .format("delta") .option("versionAsOf", "0") .load("/delta/target/") .show() )
The same can be done with SQL:
SELECT * FROM delta.`/delta/target` VERSION AS OF 0
Every version has its own timestamp. We can also use the possibility to query it via a timestamp:
spark.read.format('delta').load('/delta/target').option('timestampAsOf', '2023-02-22').show()
SELECT * FROM delta.`/delta/target` TIMESTAMP AS OF '2019-01-29 00:37:58'
Additionally, Delta can enforce schema so that we will avoid a situation where data with a different schema is inserted into the target directory. Below you can see an example where I created a data frame with an additional column named ‘age’.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType names = ["Alice", "Bob", "Charlie", "David", "Eve",'Adrian','Adam','Patrick','Mick','Christina'] schema = StructType([ StructField("id", IntegerType(), False), StructField("name", StringType(), True), StructField("age", IntegerType(), True) ]) df = spark.createDataFrame([(i+1, name, random.randint(18, 65)) for i, name in enumerate(names)], schema) df.show()
When we try to insert it into the Delta with a different format, we will get an error:
Of course, sometimes changes in schema are allowed and Delta can adjust to it – this mechanism is called schema evolution, and I will write about it in the future.
What else can we do with Delta? It’s always good to see what metadata we can get from the DESCRIBE command:
%%sql DESCRIBE delta.`/delta/target/`
Above as you can see we can get quick information about the schema and partitioning but if we want more detailed information of course we can use DESCRIBE EXTENDED:
%%sql DESCRIBE EXTENDED delta.`/delta/target/`
The last activity that I want to show is an option for converting parquet into delta. I have generated a few parquet files, which you can see below:
Converting them is pretty straight forward because we have CONVERT TO DELTA sql command or convertToDelta method:
%%sql CONVERT TO DELTA parquet.`/delta/target_parquet/`
from delta.tables import * DeltaTable.convertToDelta(spark, "parquet.`/delta/target_parquet/`")
After the conversion process, the delta log will be initiated, and we can treat it as a standard delta. The existing parquet files will become version 0 of our delta.
As you can see Delta format is powerful because it allows for efficient data storage and querying. It supports ACID transactions, schema enforcement, and schema evolution. It also offers time travel functionality and metadata management. These features make it easier to manage and analyze large-scale data sets in a reliable and efficient manner.
You can expect more about this topic from my side because I plan to write entire series of posts about Delta Lake and its functionalities. Stay tuned!
- Executing SQL queries from Azure DevOps using Service Connection credentials - August 28, 2024
- Setup Git credentials for Service Principal in Azure Databricks - August 21, 2024
- Microsoft Fabric 101 Episode 3: Pausing and Scaling using portal and Powershell - August 8, 2024
Last comments