Anyone who has ever designed an ETL process involving more than a few tables of data has likely encountered the need to build a metadata-driven framework. By ‘framework,’ I mean any solution that standardizes this process and allows for scaling through configuration changes.
Regardless of whether it involved BIML, SSIS packages generated from C#, dynamic SQL, or PySpark notebooks, the core elements of creating such a solution were similar:
- Standardizing the process as much as possible
- Breaking it down into steps, each with a single responsibility
- Preparing a generic solution that can be scaled through changes in configuration files
GENERIC MERGE
One pattern that often appears in such a solution is incremental loading, which uses UPSERT/MERGE functionality during data loading into the target table. A simplified diagram of this process is shown below.
While the first two steps should not pose a problem when creating generics, the MERGE operation between BRONZE and SILVER can sometimes be tricky, especially when incorporating automatic schema evolution.
Psst, in case you haven’t encountered this term yet: Schema evolution is a feature that allows users to easily change a table’s current schema to accommodate data that is changing over time. Most commonly, it’s used when performing an append or overwrite operation, to automatically adapt the schema to include one or more new columns.
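For example, with Delta Lake a schema-evolving append is enabled per write via the mergeSchema option. The DataFrame and table names below are just placeholders:

# Placeholder names - any Delta append works the same way.
(df_increment
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")   # let Delta add columns that are new in df_increment
    .saveAsTable("my_catalog.my_schema.my_table")
)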
Such a solution must then address two problems simultaneously:
- DDL: modify the table schema, i.e., ignore deleted columns and add new ones
- DML: apply a classic MERGE, i.e., add new rows, modify existing ones, and sometimes also mark or delete rows removed from the source
It doesn’t sound like rocket science. There are at least a few methods to handle it; here are two:
Dynamic SQL (sketched after the list):
- Retrieve the metadata of the source and target tables
- Identify schema changes and generate ALTER TABLE statements based on them
- Generate a MERGE statement that includes only the columns present in the new source schema
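A minimal sketch of what this could look like in PySpark, using the example tables that appear later in this post. It is illustrative only: a real framework would also need identifier quoting, type-change handling, and error handling.

# Rough sketch of the dynamic SQL approach - illustrative only
source_table = "terradbx.bronze.products"
target_table = "terradbx.silver.products"
key_column   = "ProdId"

src_cols = {f.name: f.dataType.simpleString() for f in spark.table(source_table).schema.fields}
tgt_cols = {f.name for f in spark.table(target_table).schema.fields}

# DDL: add columns that exist in the source but not yet in the target
for col, dtype in src_cols.items():
    if col not in tgt_cols:
        spark.sql(f"ALTER TABLE {target_table} ADD COLUMNS ({col} {dtype})")

# DML: generate a MERGE that references only the columns present in the source
update_set  = ", ".join(f"t.{c} = s.{c}" for c in src_cols if c != key_column)
insert_cols = ", ".join(src_cols)
insert_vals = ", ".join(f"s.{c}" for c in src_cols)

spark.sql(f"""
    MERGE INTO {target_table} AS t
    USING {source_table} AS s
    ON t.{key_column} = s.{key_column}
    WHEN MATCHED THEN UPDATE SET {update_set}
    WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})
""")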
DELETE/INSERT pattern (sketched after the list):
- Remove rows present in the increment from the target table
- Load the entire increment with schema evolution enabled
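A minimal sketch of this pattern, assuming the increment arrives as a DataFrame called df_increment keyed on ProdId (placeholder names; note that the two steps run as separate transactions):

# Step 1: delete target rows that are about to be re-delivered by the increment
df_increment.createOrReplaceTempView("increment")
spark.sql("""
    DELETE FROM terradbx.silver.products
    WHERE ProdId IN (SELECT ProdId FROM increment)
""")

# Step 2: append the whole increment with schema evolution enabled
(df_increment
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable("terradbx.silver.products")
)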
Both solutions have their drawbacks: the first appears overcomplicated and difficult to maintain (as is often the case with dynamic SQL) and also splits our transformation into separate DDL and DML operations. The second, on the other hand, divides the loading process into two distinct DML operations.
Is there a simpler way to do this?
MERGE WITH SCHEMA EVOLUTION
A huge simplification comes from the MERGE WITH SCHEMA EVOLUTION command, available in Databricks from runtime 15.2. It extends the classic MERGE statement with schema evolution, allowing both operations to be performed in a single statement with simple syntax, without the need to generate detailed SQL commands dynamically.
The following example illustrates how to use this command when merging data into a sample table in Databricks.
We will start by creating the target table and loading a few rows of data into it.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema_target = StructType([
    StructField("ProdId", IntegerType()),
    StructField("ProdCode", StringType()),
    StructField("ProdName", StringType()),
    StructField("ProdDesc", StringType()),
    StructField("Manufacturer", StringType()),
    StructField("Category", StringType())
])

data_target = [
    (1, "P001", "Product 1", "Description 1", "Manufacturer A", "Category X"),
    (2, "P002", "Product 2", "Description 2", "Manufacturer B", "Category X"),
    (3, "P003", "Product 3", "Description 3", "Manufacturer A", "Category X"),
    (4, "P004", "Product 4", "Description 4", "Manufacturer A", "Category X"),
    (5, "P005", "Product 5", "Description 5", "Manufacturer B", "Category X")
]

df_target = spark.createDataFrame(data_target, schema=schema_target)

(df_target
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("terradbx.silver.products")
)
Next, we will create the source table by making the following changes: removing the column ProdDesc and adding the column Subcategory.
schema_new = StructType([
    StructField("ProdId", IntegerType()),
    StructField("ProdCode", StringType()),
    StructField("ProdName", StringType()),
    StructField("Manufacturer", StringType()),
    StructField("Category", StringType()),
    StructField("Subcategory", StringType())
])

data_new = [
    (4, "P004", "Product 4", "Manufacturer A", "Category Z", "Subcategory Z1"),
    (5, "P005", "Product 5", "Manufacturer B", "Category Z", "Subcategory Z2"),
    (6, "P006", "Product 6", "Manufacturer B", "Category Z", "Subcategory Z2")
]

df_new = spark.createDataFrame(data_new, schema=schema_new)

(df_new
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable("terradbx.bronze.products")
)
Here is a quick look at the target table…
… and the source one.
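If you are following along in a notebook, both tables can be inspected with a quick show():

spark.table("terradbx.silver.products").orderBy("ProdId").show()
spark.table("terradbx.bronze.products").orderBy("ProdId").show()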
Let’s remind ourselves of what we want to achieve – our solution should perform the following tasks in a single operation:
- Ignore missing columns and fill new rows with NULL values
- Add new columns to the target table
- Execute an UPDATE operation on rows provided by the source
- INSERT new rows that do not yet exist in the target table
And this is exactly what MERGE WITH SCHEMA EVOLUTION provides, without the need to list columns, using the simple syntax shown below.
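For our example tables the statement can look like this (wrapped in spark.sql here; a plain SQL cell works just as well):

spark.sql("""
    MERGE WITH SCHEMA EVOLUTION INTO terradbx.silver.products AS t
    USING terradbx.bronze.products AS s
    ON t.ProdId = s.ProdId
    WHEN MATCHED THEN
        UPDATE SET *
    WHEN NOT MATCHED THEN
        INSERT *
""")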
A quick look at the result to confirm that it really worked as expected…
… and a second one on the operation history to ensure that all changes were made within a single operation.
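If you want to verify this yourself, the table history is one DESCRIBE HISTORY away; the latest entry should be a single MERGE version covering both the schema change and the row changes:

# Latest version first; a single MERGE entry confirms one atomic operation
(spark.sql("DESCRIBE HISTORY terradbx.silver.products")
    .select("version", "operation", "operationMetrics")
    .show(truncate=False)
)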
Test passed!
Now wrap this statement in a function, parameterize the table names and the join condition, add some lineage columns, and you have a ready-made generic solution for incremental data loading.
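As a starting point, such a wrapper might look like the sketch below. It is only a sketch: table names and the join condition are passed in as plain strings, and validation, quoting, and lineage columns are left out.

def merge_into(source_table: str, target_table: str, join_condition: str) -> None:
    """Generic incremental load: MERGE with automatic schema evolution (sketch only)."""
    spark.sql(f"""
        MERGE WITH SCHEMA EVOLUTION INTO {target_table} AS t
        USING {source_table} AS s
        ON {join_condition}
        WHEN MATCHED THEN UPDATE SET *
        WHEN NOT MATCHED THEN INSERT *
    """)

# Example call for the tables used in this post
merge_into(
    source_table="terradbx.bronze.products",
    target_table="terradbx.silver.products",
    join_condition="t.ProdId = s.ProdId"
)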