Today, I would like to write a few words about schema enforcement and schema evolution in Delta Lake. Both concepts are crucial if we want to implement lakehouse solutions in Microsoft Fabric, Databricks, or any other tool that works with Delta Lake. Before we delve into the details, let’s briefly explain today’s topic. This article is part of the ‘Delta Lake 101’ series, and you can find all the previously written articles here.
Schema Enforcement in Delta Lake is a mechanism for ensuring the integrity of the schema when saving data. It guarantees that the data we attempt to insert into a table must adhere to the defined schema. In the event of a schema violation, the write operation is rejected, aiding in data integrity and preventing the introduction of erroneous or incorrect data. The good news is that you don’t need to take any specific actions because every Delta table enforces schema by default.
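To make the rule more tangible, here is a minimal pure-Python sketch of the kind of check that enforcement performs on write. It is a simplified model and not Delta Lake's implementation: the `check_write` function and the dictionary-based schema representation are my own assumptions for illustration, and the real check also deals with nullability, nested fields, and implicit casts.

```python
# Simplified model of schema enforcement; not Delta Lake's actual code.
# A schema is modelled as a {column_name: type_name} dictionary (an assumption).


def check_write(table_schema: dict, incoming_schema: dict) -> None:
    """Raise ValueError if the incoming data does not fit the table schema."""
    # Columns the table does not know about are a violation.
    extra = [c for c in incoming_schema if c not in table_schema]
    # In this simplified model, shared columns must have identical types.
    mismatched = [
        c for c in incoming_schema
        if c in table_schema and incoming_schema[c] != table_schema[c]
    ]
    if extra or mismatched:
        raise ValueError(
            f"schema mismatch: extra columns {extra}, type mismatches {mismatched}"
        )


table = {"product_id": "smallint", "product_name": "string", "price": "float"}

# A write that uses only known columns with matching types is accepted.
check_write(table, {"product_id": "smallint", "price": "float"})

# A write that brings an unknown column is rejected.
try:
    check_write(table, {"product_id": "smallint", "ordinal": "int"})
except ValueError as err:
    print(err)
```

The point of the sketch is only that the decision is made before any data lands in the table, which is why a rejected write leaves the table untouched.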
Schema Evolution in Delta Lake is a flexible capability for modifying the schema of a table to accommodate changing requirements. It enables adding new columns, changing column types, renaming columns, or dropping columns, in many cases without the need to rewrite existing data (renaming and dropping columns this way relies on Delta Lake's column mapping feature). This is particularly important in environments where the data schema may evolve over time. With Schema Evolution, Delta Lake allows the schema to be adapted to current requirements without complex data transformation operations.
It’s important to note that, once schema evolution is enabled for a write (for example with the mergeSchema option used later in this article), Delta Lake applies certain schema changes during appends and overwrites without any further work. These changes are:
- Adding new columns: Delta Lake can accommodate new columns added to the schema.
- Changing the data type from NullType to any other type: a column that was NullType can take any other data type.
- Upcasting from ByteType to ShortType to IntegerType: Delta Lake can widen these types along that chain.
For schema changes that go beyond these scenarios, you need to set the overwriteSchema option to true together with overwrite mode. In that case the table's schema and its data are replaced by what you write, so the Parquet files behind the new table version all match the updated schema.
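The rules above can be summarised as a small lookup. The sketch below is a pure-Python model I wrote for illustration, not a Delta Lake API: the function names and the use of Spark type class names as plain strings are assumptions. Because the examples later in this article need the mergeSchema option to add a column, the sketch reports eligible changes as requiring mergeSchema and everything else as requiring overwriteSchema.

```python
# Illustrative model of the eligibility rules; not part of any Delta Lake API.
# Types are represented by their Spark class names as plain strings (an assumption).

# Widening chain that schema evolution can apply.
UPCAST_ORDER = ["ByteType", "ShortType", "IntegerType"]


def is_evolvable_type_change(old_type: str, new_type: str) -> bool:
    """Return True if changing old_type to new_type is eligible for schema evolution."""
    if old_type == "NullType":
        return True  # NullType may become any other type
    if old_type in UPCAST_ORDER and new_type in UPCAST_ORDER:
        # Only widening along the chain is eligible, never narrowing.
        return UPCAST_ORDER.index(old_type) < UPCAST_ORDER.index(new_type)
    return False


def required_option(old_schema: dict, new_schema: dict) -> str:
    """Return 'none', 'mergeSchema' or 'overwriteSchema' for a planned write."""
    needs_merge = False
    for name, new_type in new_schema.items():
        if name not in old_schema:
            needs_merge = True  # a brand new column
        elif new_type != old_schema[name]:
            if not is_evolvable_type_change(old_schema[name], new_type):
                return "overwriteSchema"
            needs_merge = True  # an eligible type change
    return "mergeSchema" if needs_merge else "none"


current = {"product_id": "ShortType", "price": "FloatType"}
print(required_option(current, {"product_id": "ShortType", "ordinal": "IntegerType"}))
print(required_option(current, {"product_id": "IntegerType"}))
print(required_option(current, {"price": "StringType"}))
```

A helper like this can be useful as a pre-flight check in a pipeline, so that the decision to overwrite a schema is always explicit rather than accidental.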
Schema Enforcement and Schema Evolution together strike a balance between data integrity requirements and schema management flexibility. They enable effective data management during evolution and ensure data integrity in Delta Lake.
As a first example, let’s prepare a data frame with a specific schema. The schema can be specified using StructType and StructField, and the data frame can be created with the spark.createDataFrame method. This data frame will be saved as a table. To confirm that we did everything correctly, let’s display the schema using the printSchema method:
    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ShortType

    data = [
        Row(product_id=1, product_name="Product A", price=100.0, quantity=10, description="Description A"),
        Row(product_id=2, product_name="Product B", price=150.0, quantity=8, description="Description B"),
        Row(product_id=3, product_name="Product C", price=75.0, quantity=15, description="Description C"),
    ]

    schema = StructType([
        StructField("product_id", ShortType(), False),
        StructField("product_name", StringType(), True),
        StructField("price", FloatType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("description", StringType(), True),
    ])

    df = spark.createDataFrame(data, schema=schema)

    # Save as the "sales" table that the following examples append to
    df.write.format("delta").mode("overwrite").saveAsTable("SeequalityLakehouse.sales")
    spark.table("SeequalityLakehouse.sales").printSchema()
As you can see, everything works as expected, and our Delta table has the proper types assigned. To confirm that schema enforcement is working correctly, we will attempt to append a DataFrame with a different schema: one that contains a new column called ‘ordinal’.
    new_data = Row(product_id=4, product_name="Product D", price=120.0, quantity=12, description="Description D", ordinal=1)

    table_schema = StructType([
        StructField("product_id", IntegerType(), False),
        StructField("product_name", StringType(), True),
        StructField("price", FloatType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("description", StringType(), True),
        StructField("ordinal", IntegerType(), True),
    ])

    new_row_df = spark.createDataFrame([new_data], schema=table_schema)

    # Plain append: the extra "ordinal" column is not in the target schema
    new_row_df.write.format("delta").mode("append").saveAsTable("sales")
As a result, we received an error that specifically points out the differences between the two schemas.
In such a situation, we will enable schema evolution by setting the mergeSchema property to true. Additionally, we will print the schema of the target table after the operation:
    new_row_df.write.format("delta").mode("append").option("mergeSchema", True).saveAsTable("sales")
    spark.table("SeequalityLakehouse.sales").printSchema()
Everything finished successfully, and the printed schema of the target table now contains the new ‘ordinal’ column.
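If you want to reason about the result without a Spark session, the effect on the schema can be modelled in a few lines of plain Python. This is only a sketch under my own assumptions (the `merge_schema` and `read_row` helpers are hypothetical and are not Delta Lake functions): columns that exist only in the incoming data are appended to the end of the table schema, and rows written before the change read the new column as null.

```python
# Illustrative model of mergeSchema on append; not Delta Lake's implementation.
# A schema is an ordered list of (column_name, type_name) pairs (an assumption).


def merge_schema(target, source):
    """Return the target schema with source-only columns appended at the end."""
    known = {name for name, _ in target}
    new_columns = [(name, typ) for name, typ in source if name not in known]
    return list(target) + new_columns


def read_row(stored_row: dict, schema):
    """Read a stored row under a schema; columns the row never had come back as None."""
    return {name: stored_row.get(name) for name, _ in schema}


target = [
    ("product_id", "smallint"),
    ("product_name", "string"),
    ("price", "float"),
    ("quantity", "int"),
    ("description", "string"),
]
source = target + [("ordinal", "int")]

merged = merge_schema(target, source)
print([name for name, _ in merged])

# A row written before the schema change has no value for the new column.
old_row = {"product_id": 1, "product_name": "Product A", "price": 100.0,
           "quantity": 10, "description": "Description A"}
print(read_row(old_row, merged))
```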
It’s an amazing feature, but you must be careful when dealing with it. Every new column that appears in the source will automatically be added to the target Delta table. This can get out of control, especially when your data source includes manually prepared files whose columns may change without notice. Let’s check it with an example: in the code below, I added a prefix to every column in the source DataFrame, and all of them were added to the target without any problem:
    new_data = Row(product_id=4, product_name="Product D", price=120.0, quantity=12, description="Description D", ordinal=1)

    # Same values as before, but every column name now carries an "a_" prefix
    table_schema = StructType([
        StructField("a_product_id", IntegerType(), False),
        StructField("a_product_name", StringType(), True),
        StructField("a_price", FloatType(), True),
        StructField("a_quantity", IntegerType(), True),
        StructField("a_description", StringType(), True),
        StructField("a_ordinal", IntegerType(), True),
    ])

    new_row_df = spark.createDataFrame([new_data], schema=table_schema)
    new_row_df.write.format("delta").mode("append").option("mergeSchema", True).saveAsTable("sales")
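One way to keep this under control is to compare column names before turning mergeSchema on. The helper below is a minimal sketch of that idea in plain Python; `unexpected_columns`, `assert_safe_to_merge`, and the `allowed_new` parameter are hypothetical names I introduced for illustration, not part of Spark or Delta Lake.

```python
# Hypothetical pre-write guard; the names here are illustrative assumptions.


def unexpected_columns(target_columns, source_columns, allowed_new=()):
    """Return source columns that are neither in the target nor explicitly allowed."""
    known = set(target_columns) | set(allowed_new)
    return [c for c in source_columns if c not in known]


def assert_safe_to_merge(target_columns, source_columns, allowed_new=()):
    """Raise ValueError if the source would add columns nobody approved."""
    extra = unexpected_columns(target_columns, source_columns, allowed_new)
    if extra:
        raise ValueError(f"refusing to evolve schema, unexpected columns: {extra}")


target_cols = ["product_id", "product_name", "price", "quantity", "description"]

# An approved new column passes the guard.
assert_safe_to_merge(target_cols, target_cols + ["ordinal"], allowed_new=["ordinal"])

# The prefixed columns from the example above would all be flagged.
prefixed = ["a_" + c for c in target_cols + ["ordinal"]]
print(unexpected_columns(target_cols, prefixed))
```

In a notebook, the two lists could come from `spark.table("sales").columns` and `new_row_df.columns`, with the write using mergeSchema only after the guard passes.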
If you want to completely overwrite the target schema, you can specify overwriteSchema together with overwrite mode:

    (
        new_row_df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable("sales")
    )

As I said at the beginning of the article, this option should also be used with caution, because the write replaces the existing data as well as the schema.
Finally, I would also like to mention how schema evolution works with the MERGE statement. To demonstrate it, let’s create two data frames, one for the source and one for the target. The source data frame will have one additional column:
    from delta.tables import DeltaTable
    from pyspark.sql import Row
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType, ShortType

    # Source rows carry one extra column, "ordinal" (the values are illustrative)
    source_data = [
        Row(product_id=1, product_name="Product A", price=100.0, quantity=10, description="Description A", ordinal=1),
        Row(product_id=2, product_name="Product B", price=150.0, quantity=8, description="Description B", ordinal=2),
        Row(product_id=3, product_name="Product C", price=75.0, quantity=15, description="Description C", ordinal=3),
    ]

    source_schema = StructType([
        StructField("product_id", ShortType(), False),
        StructField("product_name", StringType(), True),
        StructField("price", FloatType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("description", StringType(), True),
        StructField("ordinal", IntegerType(), True),
    ])

    df_source = spark.createDataFrame(source_data, schema=source_schema)

    # The target starts empty and does not have the "ordinal" column
    target_schema = StructType([
        StructField("product_id", ShortType(), False),
        StructField("product_name", StringType(), True),
        StructField("price", FloatType(), True),
        StructField("quantity", IntegerType(), True),
        StructField("description", StringType(), True),
    ])

    df_target = spark.createDataFrame([], schema=target_schema)
    df_target.write.format("delta").mode("overwrite").saveAsTable("SeequalityLakehouse.target_table")

    # Look up the table location and open it as a DeltaTable
    target_table_path = (
        spark.sql("DESCRIBE DETAIL SeequalityLakehouse.target_table")
        .select("location")
        .head()[0]
    )
    target_table = DeltaTable.forPath(spark, target_table_path)
Now we will enable schema evolution for the session:

    spark.sql("SET spark.databricks.delta.schema.autoMerge.enabled = true")

After that, let’s perform the MERGE operation with the whenNotMatchedInsertAll() option:
    (
        target_table.alias("target")
        .merge(df_source.alias("source"), "source.product_id = target.product_id")
        .whenNotMatchedInsertAll()
        .execute()
    )
Everything was performed successfully, and the target schema was updated correctly.
MERGE is a powerful operation, so it’s worth summarizing what will happen in specific scenarios when you want to use it (I will not cover all the details, so it is worth consulting the official Delta Lake documentation):
- When the source has new columns that the target does not have, but also contains all the columns that the target has:
  - With schema evolution: Matched columns will be updated, and new columns will be added to the target schema.
  - Without schema evolution: Columns that match will be inserted/updated, and new columns will be ignored.
- When the source has new columns that the target does not have, but some columns from the target are missing:
  - With schema evolution: New columns are added to the target schema, matched ones are updated, and those that exist only in the target are skipped (null values will be inserted/updated).
  - Without schema evolution: The operation will throw an error because some columns that exist in the target are missing in the source.
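The scenarios above can be written down as a small decision function. The sketch below is a plain-Python model of the list, not Delta Lake code: `merge_outcome` and the keys of the dictionary it returns are names I made up for illustration, and applying the same rules to combinations outside the two listed scenarios is my assumption.

```python
# Illustrative model of the scenarios listed above; not Delta Lake's implementation.


def merge_outcome(target_cols, source_cols, schema_evolution: bool) -> dict:
    """Describe the effect of an insert-all/update-all MERGE on the target schema."""
    new_cols = [c for c in source_cols if c not in target_cols]
    target_only = [c for c in target_cols if c not in source_cols]

    if schema_evolution:
        # New source columns are added; target-only columns are skipped.
        return {
            "error": False,
            "result_schema": list(target_cols) + new_cols,
            "skipped_target_columns": target_only,
        }
    if target_only:
        # Without evolution, missing target columns make the operation fail.
        return {"error": True, "result_schema": list(target_cols),
                "skipped_target_columns": []}
    # Without evolution, extra source columns are ignored.
    return {"error": False, "result_schema": list(target_cols),
            "skipped_target_columns": []}


target_columns = ["product_id", "product_name", "price"]

# Source has everything the target has, plus "ordinal".
print(merge_outcome(target_columns, target_columns + ["ordinal"], True))
print(merge_outcome(target_columns, target_columns + ["ordinal"], False))

# Source has "ordinal" but lacks "price".
partial_source = ["product_id", "product_name", "ordinal"]
print(merge_outcome(target_columns, partial_source, True))
print(merge_outcome(target_columns, partial_source, False))
```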
This article introduces schema enforcement and schema evolution in Delta Lake, essential for lakehouse solutions in tools like Microsoft Fabric and Databricks. Schema enforcement guarantees data integrity by rejecting write operations with schema violations. Schema evolution offers flexibility to modify table schemas without rewriting data, accommodating evolving data needs.
It emphasizes caution when using schema evolution, as new source columns are automatically added to the target table. The article concludes by showcasing schema evolution in a MERGE operation, where the target schema is updated based on the columns present in the source. I recommend testing these options on your own. Enjoy!