StructuredStreamingFabric_00

Loading Files with Spark Structured Streaming in Microsoft Fabric

The concept of a lakehouse typically relies on the loading of raw data in native formats such as JSON, Parquet, Avro, etc., into the Delta format. A similar procedure is observed in Microsoft Fabric, where our initial task involves extracting files from source systems (utilizing tools such as Data Factory) and placing them in the staging area. Subsequently, we load these files into the first layer of a lakehouse, commonly referred to as the ‘bronze’ layer in accordance with the Medallion architecture. While there are various methods to execute this loading process, this article aims to demonstrate how to accomplish it using Structured Streaming. Let’s delve into the details!

First and foremost, we require sample data to replicate our staging dataset. Within my Azure subscription, I maintain a storage repository containing sample data obtained from Azure Open Datasets (accessible via this link). For our present purposes, we will utilize the dataset titled “NYC Taxi & Limousine Commission – yellow taxi trip records,” although you are free to employ any dataset of your choosing during the testing phase. As illustrated, my storage repository is currently empty:

Before we delve directly into the topic at hand, I have established a Fabric Workspace:

To facilitate smoother navigation, let’s transition to the Data Engineering view, where we will gain access to the artifacts we intend to create:

 

 

The initial object that we will generate is the Lakehouse. Upon its creation, a dedicated folder will be established in OneLake, serving as the destination for our data in Delta format:

Regarding the Lakehouse, I have previously prepared an article on this topic, which you can access here. For the purpose of this demonstration, I have designated it as “SeequalityLakehouse”:

In order to make my demo storage accessible within Fabric, I must create a new object called a “shortcut.” Shortcuts are objects within OneLake that function as pointers or references to other storage locations. These storage locations can be either internal, signifying they are located within OneLake, or external, indicating they are situated in an external storage object.

 

For external storages, OneLake provides compatibility with Azure Data Lake Storage Gen2 (Blob storage with hierarchical namespace enabled) or Amazon S3. You might wonder why we need shortcuts within OneLake. The answer is straightforward: in organizations with multiple departments, it’s common to have distinct sets of workspaces within the data architecture. By creating shortcuts, we gain the flexibility to implement an appropriate data strategy within a versatile data mesh approach. This enables efficient management of data across different parts of the organization while maintaining a coherent and scalable data infrastructure.

In our specific case, we will be creating a shortcut to Data Lake Storage. The configuration process is straightforward, and we simply need to provide the following information:

  • URL to the storage account.
  • Connection name.
  • Authentication kind, which can be either Account Key, SAS token, or OAuth authentication. For the sake of simplicity, I have used the Account Key in this demonstration. However, I strongly recommend utilizing Azure Active Directory (AAD) credentials for enhanced security and access control.

Of course, we have the option to assign a name to our shortcut and specify a subpath as well:

Our initial setup is configured for reading, allowing us to proceed with the creation of a new notebook where we will house our code:

Within our code, we will make references to the shortcut, facilitating the ability to right-click on it and copy the ABFS path as needed (we can also use relative paths if we want):

In the first cell of our notebook, we can import the necessary libraries and set up some variables:

  • source_path: This variable will store the ABFS path of our shortcut, which will act as our data source.
  • target_path: This variable will represent the location within OneLake’s Tables folder. Fabric will automatically detect the creation of a delta table there and register it automatically.
  • checkpoint_path: We will utilize Spark Structured Streaming for loading files from the directory, and to ensure the consistency of our data flow, it will use checkpoints. In this example, we will store checkpoint files next to the target table in a folder named “_checkpoint.” Please note that folders starting with an underscore are treated as technical folders.
import os
from pyspark.sql.types import *

source_path = "abfss://0757a31a-a300-4cdb-99b3-75456d1486e2@onelake.dfs.fabric.microsoft.com/6f7a5ea8-be90-4936-8cb7-1dd1e46194ab/Files/taxidata"
target_path = "abfss://0757a31a-a300-4cdb-99b3-75456d1486e2@onelake.dfs.fabric.microsoft.com/6f7a5ea8-be90-4936-8cb7-1dd1e46194ab/Tables/taxidata/"
checkpoint_path = target_path+"_checkpoint"

In the next cell, we have the main code for this simple scenario:

from pyspark.sql.functions import input_file_name

spark.conf.set("spark.sql.streaming.schemaInference",True)

parquet_stream = spark.readStream \
                       .parquet(source_path) \
                       .withColumn("FileName", input_file_name())
(
parquet_stream
    .writeStream.format("delta")
    .outputMode("append")
    .option("checkpointLocation",checkpoint_path)
    .option("format","delta")
    .option("path",target_path)
    .trigger(availableNow=True)
    .start()
    .awaitTermination()
)

We will begin by importing the input_file_name function, which will be essential for tracking the names of files that have already been loaded. Additionally, we’ll set the session variable spark.sql.streaming.schemaInference to True. This setting implies that Spark will automatically infer the schema from Parquet files. If you prefer not to use schema inference, you can manually define the schema in a dedicated struct or extract it from a single file during the process:

schema = StructType([
    StructField('VendorID', LongType(), True),
    StructField('tpep_pickup_datetime', TimestampType(), True),
    StructField('tpep_dropoff_datetime', TimestampType(), True),
    StructField('passenger_count', DoubleType(), True),
    StructField('trip_distance', DoubleType(), True),
    StructField('RatecodeID', DoubleType(), True),
    StructField('store_and_fwd_flag', StringType(), True),
    StructField('PULocationID', LongType(), True),
    StructField('DOLocationID', LongType(), True),
    StructField('payment_type', LongType(), True),
    StructField('fare_amount', DoubleType(), True),
    StructField('extra', DoubleType(), True),
    StructField('mta_tax', DoubleType(), True),
    StructField('tip_amount', DoubleType(), True),
    StructField('tolls_amount', DoubleType(), True),
    StructField('improvement_surcharge', DoubleType(), True),
    StructField('total_amount', DoubleType(), True),
    StructField('congestion_surcharge', DoubleType(), True),
    StructField('airport_fee', DoubleType(), True)
])

The above code can be passed as a parameter to the .schema method of a readStream.

  • spark.readStream: This method is used to read data from a streaming source. We need to specify where we are reading the stream from (in our case, it will be a directory) and the expected format. This method is versatile and can be used to read data from various sources, with more information available in the Spark documentation. Additionally, an extra column is added using the withColumn construction to store information about the source file.
  • spark.writeStream: This method is used to save streaming data. Key properties include:
    • format: Typically set to “delta” for Delta Lake.
    • outputMode: For loading files from a directory, “append” is often used. This means that data from new files is appended to the target table. Other possible values like “Complete” and “Update” are more suitable for aggregate queries.
    • checkpointLocation: Specifies the path on storage where checkpoints will be stored.
    • path: Specifies the target path where data will be saved. The toTable() method can also be used to save data directly to a registered table, but in this case, the path is specified, especially because Fabric will automatically discover it and make it visible from the Lakehouse.
    • trigger: When AvailableNow=True is set, it means the streaming job will execute once, load all the files, and end. This is useful for loading files in a batch-like manner. Spark keeps track of which files have already been loaded, ensuring idempotent reads and writes – the same file will be read and loaded exactly once.
    • start: Initiates the streaming process.
    • awaitTermination: An optional property that makes the streaming process wait until all files have been loaded.

With a basic understanding of how this works, let’s proceed to test it. I’ve uploaded one Parquet file to your source storage:

Following the execution of the above code, you can verify whether the target table contains data and determine how many rows were loaded from a specific file. You can accomplish this by switching to SQL language using the %%sql command:

%%sql

SELECT DISTINCT 
     FileName
    ,count(*) AS NoOfRows 
FROM taxidata
GROUP BY FileName

As evident from the results, one file has been loaded as expected:

I uploaded three more rows to observe if they will load correctly:

 

Upon the execution of the notebook, I can confirm that all the data is in place. It’s essential to note that the first file retains the exact number of rows it had before, demonstrating that the read and load operations were idempotent, ensuring that the data was not duplicated during the process.

What can we do with our code? We have a few options. One of them is to prepare a Spark Job Definition. Another option is to run the notebook from Data Factory. I’ll demonstrate the second option. Let’s navigate to the workspace and select “Data pipeline” to create it:

Please provide a suitable name for the pipeline:

Our pipeline will consist of a single activity called “Notebook,” which you can easily locate:

We can select our notebook from the list, and it will be available automatically because it is in the same workspace, requiring no additional configuration:

Our pipeline is now ready, and we have the option to run it on demand or set up a schedule for automatic execution:

I executed it on demand, and it was successful. Please take note that there is an “Output” button that not only displays basic information but also provides a URL for accessing a more detailed view:

This link will redirect us to a highly detailed view of how the code was executed by Spark:

In this article, we’re taking a journey through running Spark code using Azure Data Factory all within Fabric. It all starts with setting up a Fabric Workspace and creating some cool objects like Lakehouses and shortcuts. As you can see writing generic code that will fit all your needs is not very hard. Stay tuned for my upcoming articles where I’ll dive into more topics in this exciting domain!

Leave a Reply