
Don’t count rows in ETL, use Delta Log metrics!

Collecting statistics during your ETL process can be highly beneficial in many scenarios. For example, statistics allow you to track the growth of your platform and predict when and how you might need to adjust it. In a standard setup based on relational databases, this often involves manually counting rows or tracking operations such as deletes, updates, and inserts.

When working with Microsoft Fabric Lakehouses or Databricks, however, the underlying Delta Lake format automatically maintains such information in its transaction log. How does it work? Let’s look at an example. All the examples in this post were prepared and executed in Microsoft Fabric, but similar code should also work in other Spark environments.

Our test dataset contains simple descriptions of cars. It includes three attributes: id, make, and year. All this data is stored in a Fabric Lakehouse called seequalitylake01 in a table named cars.

The first operation we performed was inserting three new cars into the table. Following that, we inserted an additional two cars into the table.

data = [("1", "Toyota", 2010), ("2", "Honda", 2012), ("3", "Ford", 2015)]
columns = ["id", "make", "year"]
df = spark.createDataFrame(data, columns)
df.createOrReplaceTempView("tmp_cars")

spark.sql("CREATE TABLE IF NOT EXISTS seequalitylake01.cars (id STRING, make STRING, year INT)")
df.write.insertInto("cars")

new_data = [("4", "Chevrolet", 2018), ("5", "BMW", 2020)]
new_df = spark.createDataFrame(new_data, columns)
new_df.write.insertInto("seequalitylake01.cars")

The history of all operations is saved in the Delta log. However, we can also access this history using the DESCRIBE HISTORY SQL command or the history() method in PySpark. In the output, we are particularly interested in the operation, operationParameters (for reference), and operationMetrics columns. For simplicity, we can display only these columns:

from delta.tables import DeltaTable

# Read the table history and keep only the columns we care about
history_df = (
    DeltaTable.forName(spark, "seequalitylake01.cars")
    .history()
    .select("operation", "operationParameters", "operationMetrics")
)
display(history_df)
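
The same history can also be retrieved with plain SQL; the DESCRIBE HISTORY command, wrapped here in spark.sql() so it runs from the same notebook, returns all columns, including operationMetrics:

# SQL alternative to the PySpark snippet above
display(spark.sql("DESCRIBE HISTORY seequalitylake01.cars"))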

As shown in the history output above, the operationMetrics column provides values indicating the number of Parquet files created, the number of output rows, and their total size.

{"numFiles":"1",
"numOutputRows":"3",
"numOutputBytes":"1625"}

These basic statistics are sufficient for operations marked as “Append,” which is essentially the simple insert operation we performed earlier. However, this alone is not enough, so let’s perform additional operations.

We will take another three rows and merge them into the target table to simulate both update and insert operations. You are likely familiar with the MERGE statement, which we’ll use in this example in its PySpark version:

# Three rows to merge: two existing ids (updates) and one new id (insert)
merge_data = [("1", "Toyota", 2011), ("3", "Ford", 2016), ("6", "Tesla", 2021)]
merge_columns = ["id", "make", "year"]
merge_df = spark.createDataFrame(merge_data, merge_columns)
merge_df.createOrReplaceTempView("merge_cars")

# Get a DeltaTable reference to the target table and run the merge
delta_table = DeltaTable.forName(spark, "seequalitylake01.cars")
delta_table.alias("tgt").merge(
    merge_df.alias("src"),
    "tgt.id = src.id"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
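
For comparison, the equivalent SQL MERGE, using the merge_cars temporary view registered above, would look like the snippet below. It is shown only as an alternative and should not be executed in addition to the PySpark merge:

# Equivalent SQL version of the same merge (do not run both)
spark.sql("""
    MERGE INTO seequalitylake01.cars AS tgt
    USING merge_cars AS src
    ON tgt.id = src.id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")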

After checking the history again, we can see the Merge operation along with additional details in the operationMetrics column.

This time, we have much more information, including the number of rows inserted, updated, and deleted, along with their sizes.

{
  "numTargetRowsCopied": "1",
  "numTargetRowsDeleted": "0",
  "numTargetFilesAdded": "2",
  "numTargetBytesAdded": "3205",
  "numTargetBytesRemoved": "1625",
  "numTargetDeletionVectorsAdded": "0",
  "numTargetRowsMatchedUpdated": "2",
  "executionTimeMs": "3147",
  "numTargetRowsInserted": "1",
  "numTargetRowsMatchedDeleted": "0",
  "numTargetDeletionVectorsUpdated": "0",
  "scanTimeMs": "2094",
  "numTargetRowsUpdated": "2",
  "numOutputRows": "4",
  "numTargetDeletionVectorsRemoved": "0",
  "numTargetRowsNotMatchedBySourceUpdated": "0",
  "numTargetChangeFilesAdded": "0",
  "numSourceRows": "3",
  "numTargetFilesRemoved": "1",
  "numTargetRowsNotMatchedBySourceDeleted": "0",
  "rewriteTimeMs": "1163"
}
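
If you only need a few of these numbers, they can be read directly from the latest history record. This is essentially a minimal sketch of what the logging function shown at the end of this post does:

# operationMetrics of the latest operation behaves like a dict of strings
merge_metrics = delta_table.history(1).collect()[0]["operationMetrics"]
print("inserted:", merge_metrics.get("numTargetRowsInserted", "0"))
print("updated: ", merge_metrics.get("numTargetRowsUpdated", "0"))
print("deleted: ", merge_metrics.get("numTargetRowsDeleted", "0"))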

The last operation we haven’t checked yet is the delete operation. Let’s perform a delete directly on the Delta table:

delta_table.delete("make = 'Ford'")
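
The SQL equivalent, again wrapped in spark.sql(), would be:

# Equivalent SQL delete (alternative to the PySpark call above)
spark.sql("DELETE FROM seequalitylake01.cars WHERE make = 'Ford'")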

The history describes the operation as expected, and all the metrics are recorded accurately.

{
  "numRemovedFiles": "1",
  "numRemovedBytes": "1625",
  "numCopiedRows": "2",
  "numDeletionVectorsAdded": "0",
  "numDeletionVectorsRemoved": "0",
  "numAddedChangeFiles": "0",
  "executionTimeMs": "2508",
  "numDeletionVectorsUpdated": "0",
  "numDeletedRows": "1",
  "scanTimeMs": "2145",
  "numAddedFiles": "1",
  "numAddedBytes": "1606",
  "rewriteTimeMs": "362"
}

How can we retrieve all of this information programmatically? Below is a simple example of a function that extracts the relevant values from the operationMetrics column of the latest history entry:

from datetime import datetime
from delta.tables import DeltaTable

def log_operation_metrics(catalog_name, table_name, job_id, job_run_id):
    """
    Retrieves the latest operation metrics from the Delta table's history,
    builds a DataFrame with these metrics and displays it (in a real pipeline
    you would append it to a metrics_logging table instead).

    Parameters:
    - catalog_name: The name of the catalog (here: the Lakehouse name).
    - table_name: The name of the Delta table for which metrics are being logged.
    - job_id: Identifier of the ETL job that performed the operation.
    - job_run_id: Identifier of the specific execution of that job.
    """

    # Construct the fully qualified table name
    full_table_name = f"{catalog_name}.{table_name}"

    # Retrieve the DeltaTable
    delta_table = DeltaTable.forName(spark, full_table_name)

    # Get the latest history record for the Delta table
    latest_history_record = delta_table.history(1).collect()[0]

    # Extract the operation type, metrics, and version
    operation_type = latest_history_record["operation"]
    operation_metrics = latest_history_record["operationMetrics"]
    table_version = latest_history_record["version"]

    # Initialize metric counts
    num_source_rows = num_inserted_rows = num_updated_rows = num_deleted_rows = 0

    # Map the operation-specific metric names onto common counters
    if operation_type == "MERGE":
        num_source_rows = int(operation_metrics.get("numSourceRows", 0))
        num_inserted_rows = int(operation_metrics.get("numTargetRowsInserted", 0))
        num_updated_rows = int(operation_metrics.get("numTargetRowsUpdated", 0))
        num_deleted_rows = int(operation_metrics.get("numTargetRowsDeleted", 0))
    elif operation_type in ("WRITE", "INSERT", "CREATE OR REPLACE TABLE AS SELECT"):
        # DataFrame appends are typically recorded as WRITE in the history
        num_inserted_rows = int(operation_metrics.get("numOutputRows", 0))
    elif operation_type == "DELETE":
        num_deleted_rows = int(operation_metrics.get("numDeletedRows", 0))

    # Prepare the data for logging
    data = [(
        catalog_name, table_name, datetime.now(), operation_type,
        num_source_rows, num_inserted_rows, num_updated_rows, num_deleted_rows,
        table_version, job_id, job_run_id
    )]
    schema = (
        "`catalog_name` STRING, `table_name` STRING, `operation_timestamp` TIMESTAMP, "
        "`operation_type` STRING, `num_source_rows` INT, `num_inserted_rows` INT, "
        "`num_updated_rows` INT, `num_deleted_rows` INT, `table_version` LONG, "
        "`job_id` STRING, `job_run_id` STRING"
    )
    metrics_df = spark.createDataFrame(data, schema)

    display(metrics_df)
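
A hypothetical call right after an ETL step could look like the snippet below (the job_id and job_run_id values are made up purely for illustration). To persist the metrics instead of only displaying them, the last line of the function can be replaced with an append to a logging table:

# Hypothetical usage after the merge above; identifiers are illustrative only
log_operation_metrics(
    catalog_name="seequalitylake01",
    table_name="cars",
    job_id="cars_daily_load",
    job_run_id="run_2024_001",
)

# Instead of display(metrics_df), the function could persist the record, e.g.:
# metrics_df.write.mode("append").saveAsTable("seequalitylake01.metrics_logging")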

 

Of course, the script above is just an example, and I am only displaying the last operation from the history, but nothing stops you from saving this information together with additional ETL metadata. In my case I pass a job id that identifies the job performing the ETL and a job run id that points to a specific execution, similar to how it is built into Databricks. You can always adjust this script or write your own version based on the same mechanism. The data is there, waiting to be captured and saved. It’s as simple as that!

 

Adrian Chodkowski