
Restore lakehouse to previous state

One of the most common questions that comes from data warehouse developers when they switch to a lakehouse approach in Microsoft Fabric or Azure Databricks is “how do we restore our solution?”. It is a pretty important question because in the old days of relational databases it was very easy to restore, and sometimes this technique was used to bring the entire solution back after a failed ETL load to prevent inconsistencies and errors. In a lakehouse, we have Delta tables with support for the RESTORE command, but as you probably know, RESTORE switches a single table to a different version. Some time ago, I wrote about this mechanism here, so if you are interested in how it works, I recommend you go there and read about it. Restoring a single table while keeping all the other tables untouched can lead to inconsistencies. For example, try to imagine restoring the product dimension without restoring the sales fact table that references it – it is an easy way to get wrong results and a recipe for disaster. But we can use exactly the same mechanism to switch all the needed tables at once! How to do it? Just create a simple wrapper function and keep some metadata. It should work for both Microsoft Fabric and Azure Databricks. Please keep in mind that it is not a disaster recovery solution! Let’s see it in action!
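
To recap, restoring a single table looks more or less like this (a minimal sketch; some_table is just a placeholder name, not part of the demo):

# Inspect the versions recorded in the Delta transaction log for one table
spark.sql("DESCRIBE HISTORY some_table").select("version", "timestamp", "operation").show()

# Switch that single table back to a chosen version
spark.sql("RESTORE TABLE some_table TO VERSION AS OF 0")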

For my demo, I created a lakehouse in Microsoft Fabric named lake01. Inside the lakehouse, I created a table etl_latest_tables_history to save information about Delta table versions. A structure like this is important because it lets me easily check what each table’s version was before my ETL run – in case of failure, that is the version I want to restore to.

%%sql

CREATE TABLE lake01.etl_latest_tables_history
(
    version BIGINT,
    timestamp TIMESTAMP,
    TableName STRING
)

Now let’s create three tables: products, categories, and subcategories, and fill them with some sample data.

products_data = [
    (1, "iPhone 15", "Latest Apple smartphone with A15 Bionic chip", 999.0, 1, 1),
    (2, "Samsung Galaxy S21", "Flagship Samsung smartphone with Exynos 2100", 799.0, 1, 1),
    (3, "Dell XPS 13", "High-performance laptop with Intel i7 processor", 1200.0, 1, 2),
    (4, "LG Refrigerator", "Double door refrigerator with smart inverter", 600.0, 2, 3),
    (5, "Bosch Washing Machine", "Front load washing machine with 7kg capacity", 400.0, 2, 4),
    (6, "The Great Gatsby", "Classic novel by F. Scott Fitzgerald", 10.0, 3, 5),
    (7, "Sapiens", "A brief history of humankind by Yuval Noah Harari", 15.0, 3, 6)
]

categories_data = [
    (1, "Electronics", "Devices and gadgets"),
    (2, "Home Appliances", "Appliances for home use"),
    (3, "Books", "Various kinds of books")
]

subcategories_data = [
    (1, "Mobile Phones", "Smartphones and accessories", 1),
    (2, "Laptops", "Personal and gaming laptops", 1),
    (3, "Refrigerators", "Home and commercial refrigerators", 2),
    (4, "Washing Machines", "Automatic and semi-automatic washing machines", 2),
    (5, "Fiction", "Fictional books and novels", 3),
    (6, "Non-Fiction", "Biographies, self-help, and more", 3)
]

# Create DataFrames
products_df = spark.createDataFrame(products_data, ["ProductID", "ProductName", "ProductDescription", "Price", "CategoryID", "SubcategoryID"])
categories_df = spark.createDataFrame(categories_data, ["CategoryID", "CategoryName", "CategoryDescription"])
subcategories_df = spark.createDataFrame(subcategories_data, ["SubcategoryID", "SubcategoryName", "SubcategoryDescription", "CategoryID"])

products_df.write.saveAsTable("lake01.products")
categories_df.write.saveAsTable("lake01.categories")
subcategories_df.write.saveAsTable("lake01.subcategories")

spark.sql("SHOW TABLES").show()

As you can see in the screenshot below, all the tables were created successfully.

To remind you, all the Delta tables are versioned, and you can investigate them using the DESCRIBE HISTORY command. In the example below, you can see that the products table has only one version because we created it and inserted some data in one operation.

%%sql

DESCRIBE HISTORY lake01.products

Based on the above example, you might think that restoring is pretty simple because the only thing we have to do is restore every table to the previous version (current version minus one), but that is not true. Every table has its own transaction log that is not synchronized with the other tables, and we can perform different operations on every table. For example, data inserted into the products table will create only one new version, but at the same time, categories might be inserted and then updated in separate operations, creating two versions. In many cases, it can be even more complicated, which is why we have to build a custom solution.
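
To see how those histories drift apart, here is a quick sketch that prints the latest version of every table in the lakehouse (just a loop over SHOW TABLES and DESCRIBE HISTORY):

# Print the latest Delta version of every table to show that the histories differ
for row in spark.sql("SHOW TABLES").collect():
    latest = spark.sql(f"DESCRIBE HISTORY {row.tableName}") \
        .orderBy("version", ascending=False) \
        .first()
    print(row.tableName, "-> version", latest["version"])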

We have a table to keep information about versions, so let’s create a function that will iterate over tables and save their metadata. In the function, I skipped tables that start with ‘etl’ because I don’t want to restore technical tables used by the process:

from pyspark.sql.functions import lit

def save_latest_table_versions():
    # List all tables in the lakehouse
    tables = [row.tableName for row in spark.sql("SHOW TABLES").collect()]
    
    all_latest_history_df = None
    
    for table in tables:
        # Skip technical tables used by the ETL process itself
        if not table.startswith("etl"):
            # Take the newest entry from the Delta history of each table
            history_df = spark.sql(f"DESCRIBE HISTORY {table}").select("version", "timestamp")
            latest_history_df = history_df.orderBy("version", ascending=False).limit(1)
            latest_history_df = latest_history_df.withColumn("TableName", lit(table))
            
            if all_latest_history_df is None:
                all_latest_history_df = latest_history_df
            else:
                all_latest_history_df = all_latest_history_df.union(latest_history_df)
    
    # Overwrite the metadata table with the collected versions
    all_latest_history_df.write.mode("overwrite").saveAsTable("lake01.etl_latest_tables_history")

Let’s execute the above function and see the result:

save_latest_table_versions()
display(spark.read.table("lake01.etl_latest_tables_history"))

As you can see above, we collected metadata about versions, and for all the tables, the current version is 0. Let’s complicate it a little bit and perform some inserts and updates:

spark.sql("""
    INSERT INTO products VALUES
    (8, 'Google Pixel 6', 'Latest Google smartphone with Tensor chip', 899.0, 1, 1)
""")

spark.sql("""
    UPDATE products SET Price = Price + 50 WHERE ProductID = 1
""")

spark.sql("""
    INSERT INTO categories VALUES
    (4, 'Furniture', 'Home and office furniture')
""")

spark.sql("""
    INSERT INTO subcategories VALUES
    (7, 'Office Chairs', 'Comfortable office chairs', 4)
""")

spark.sql("""
    UPDATE subcategories SET SubcategoryDescription = 'Updated description for Mobile Phones' WHERE SubcategoryID = 1
""")

When we look at the products table and its history, we see that two new versions were created: one for the “WRITE” operation, which reflects the INSERT INTO statement from the above script, and another for the UPDATE operation:

%%sql

DESCRIBE HISTORY products

Let’s say that something bad happened during the ETL process and it failed with an error. Now we want to restore to the state from before this ETL run. We want to restore all the tables, not just one, so I created a simple function that goes through the metadata table and restores them one by one:

def restore_tables_to_latest_versions():
    # Read the versions captured before the ETL run
    latest_versions_df = spark.table("lake01.etl_latest_tables_history")
    
    for row in latest_versions_df.collect():
        table_name = row["TableName"]
        version = row["version"]
        
        # Switch each table back to the version recorded in the metadata table
        spark.sql(f"RESTORE TABLE {table_name} TO VERSION AS OF {version}")

Before execution, we see that the products table contains 8 rows (the original 7 plus the newly inserted Google Pixel 6), and the price for ProductID 1 is 1049.

%%sql
SELECT * FROM products

We perform the restore using our function:

restore_tables_to_latest_versions()

After checking the number of rows and the price again, we see that the products table is back to its previous state, and the same is true for all the other tables in my lakehouse.
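
For completeness, a minimal sketch of the checks I ran to verify the state after the restore:

# Verify that products is back to its pre-ETL state
spark.sql("SELECT COUNT(*) AS row_count FROM products").show()
spark.sql("SELECT Price FROM products WHERE ProductID = 1").show()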

This mechanism is very simple and works similarly to restore points in Microsoft Fabric Warehouse. Of course, the above example is simplified just to showcase the idea, but I hope you get the point. The function that collects metadata can be executed at the beginning and end of the ETL process, and the restore can be executed in case of failure. As simple as that!
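
To show how this could be wired into an ETL job, here is a minimal sketch, assuming run_etl() is a hypothetical placeholder for your actual load logic:

def run_etl():
    # Hypothetical placeholder for the actual ETL load logic
    pass

# Capture table versions before the load...
save_latest_table_versions()

try:
    run_etl()
    # ...and capture them again after a successful load
    save_latest_table_versions()
except Exception as e:
    # On failure, bring every table back to its pre-ETL version
    print(f"ETL failed ({e}), restoring tables to the previous state")
    restore_tables_to_latest_versions()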

Adrian Chodkowski