In my earlier article (link), I started a discussion of Microsoft Fabric Event Streams: I gave an overview of the tool and covered the steps for connecting to data sources. In this follow-up, I will walk you through processing the data and show how to store it efficiently in the target destination. Let’s go into the details!
Our current situation looks like this:
Our subsequent target in the configuration is the Lakehouse. To set this up, navigate to the Data Engineering view and select the Lakehouse button. Proceed by providing an appropriate name for the Lakehouse:
Moving to the next screen, a preview of our data will be displayed. It is crucial to choose the appropriate format to ensure that Event Streams can accurately interpret incoming messages. Under the “Advanced” settings, there is an option to specify the number of nested levels in our JSON data, among other details. Additionally, we can edit columns if necessary, allowing us to rename or remove fields as needed.
Once everything is set up correctly, you should receive a confirmation, signaling that the configuration is successful. At this point, you can proceed to the next steps.
Moving on to our next destination, choose the Lakehouse option and point to the Lakehouse we set up earlier. Unlike the immediate data ingestion we chose for KQL, here we want to gather and process data within a specific time window. To make this adjustment, click on “Open event processor”, which offers the following operations:
- Aggregate and Group By: Data aggregation within specific columns, including a designated time window.
- Expand: Expand nested objects for a more detailed view.
- Manage Fields: Rename, drop, or add columns. Utilize built-in functions to create new columns.
- Filter: Apply criteria to filter messages based on specific conditions.
- Union: Combine multiple sources using the union operation.
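To make a few of these operations more concrete, here is a minimal plain-Python sketch of what Union, Filter, and Manage Fields do to a stream of events. It only illustrates the semantics and is not how Event Streams implements them. The helper names and every field except `customerCounter` are hypothetical.

```python
from itertools import chain


def union(*streams):
    """Combine several event streams into one list (Union)."""
    return list(chain.from_iterable(streams))


def filter_events(events, predicate):
    """Keep only the events that satisfy the condition (Filter)."""
    return [event for event in events if predicate(event)]


def manage_fields(event, rename=None, drop=(), add=None):
    """Rename, drop, and add columns on a single event (Manage Fields)."""
    rename = rename or {}
    shaped = {rename.get(key, key): value
              for key, value in event.items() if key not in drop}
    # New columns are computed from the original event.
    for name, compute in (add or {}).items():
        shaped[name] = compute(event)
    return shaped


# Hypothetical sample events from two sources
stream_a = [{"customerId": 1, "customerCounter": 3, "debug": "x"}]
stream_b = [{"customerId": 2, "customerCounter": 0, "debug": "y"}]

combined = union(stream_a, stream_b)
active = filter_events(combined, lambda e: e["customerCounter"] > 0)
shaped = [
    manage_fields(
        e,
        rename={"customerId": "id"},
        drop={"debug"},
        add={"isActive": lambda ev: ev["customerCounter"] > 0},
    )
    for e in active
]
print(shaped)
```

In the event processor you would chain the same steps visually, without writing any code.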
Pay close attention to the Advanced section, where you’ll find two crucial properties: the minimum number of rows and the maximum time that Event Streams will wait before saving data. These matter for performance because Delta tables are sensitive to file size. Performance suffers if there are numerous small files, but having only a few very large files isn’t ideal either. I recommend adjusting these two options so that the resulting files typically fall between 128MB and 1GB. Keep in mind that you can always fine-tune the file size later as needed.
    # Run the below script or schedule it to run regularly to optimize your Lakehouse table 'customers'
    from delta.tables import *

    deltaTable = DeltaTable.forName(spark, "customers")
    deltaTable.optimize().executeCompaction()

    # If you only want to optimize a subset of your data, you can specify an optional partition predicate. For example:
    #
    # from datetime import datetime, timedelta
    # startDate = (datetime.now() - timedelta(days=3)).strftime('%Y-%m-%d')
    # deltaTable.optimize().where("date > '{}'".format(startDate)).executeCompaction()
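Coming back to the minimum rows and maximum time settings, a rough back-of-the-envelope calculation can help you pick starting values. The sketch below is only an estimate under stated assumptions: the average row size and the event rate are made-up figures you would replace with your own measurements, and compression in the Delta (Parquet) files is ignored.

```python
import math


def rows_for_target_size(target_bytes: int, avg_row_bytes: int) -> int:
    """Estimate how many rows are needed to reach the target file size."""
    return math.ceil(target_bytes / avg_row_bytes)


def seconds_to_fill(rows: int, events_per_second: float) -> float:
    """Estimate how long it takes to collect that many rows."""
    return rows / events_per_second


# Assumed figures, for illustration only
target_bytes = 128 * 1024**2   # lower end of the recommended range (128MB)
avg_row_bytes = 200            # hypothetical average size of one event
events_per_second = 500        # hypothetical ingestion rate

min_rows = rows_for_target_size(target_bytes, avg_row_bytes)
wait_seconds = seconds_to_fill(min_rows, events_per_second)
print(f"minimum rows: {min_rows}, time to fill: {wait_seconds:.0f} s")
```

If the time to fill comes out much longer than the delay you can tolerate, a lower row threshold combined with regular compaction is a reasonable compromise.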
To test querying data in the Lakehouse, you can write a short PySpark snippet. Take note of the small triangle next to the “customers” table; it indicates that the data is stored on OneLake as a delta table. This icon was not visible in the KQL database because we did not choose the option to store that data in OneLake.
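A test query could look something like the sketch below. It assumes the notebook is attached to the Lakehouse, that the table is called `customers`, and that it contains the `Window_End_Time` and `SUM_customerCounter` columns produced by the event processor. The helper function name is hypothetical.

```python
def latest_windows_query(table: str = "customers", limit: int = 10) -> str:
    """Build a Spark SQL query returning the most recent aggregation windows."""
    return (
        "SELECT Window_End_Time, SUM_customerCounter "
        f"FROM {table} "
        "ORDER BY Window_End_Time DESC "
        f"LIMIT {limit}"
    )


query = latest_windows_query()
print(query)

# In a Fabric notebook, the `spark` session is already available, so the
# query can be executed like this (left commented out so that the sketch
# also runs outside a notebook):
# display(spark.sql(query))
```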
Take a closer look at the detailed result in the Lakehouse. You’ll notice “SUM_customerCounter,” which shows the aggregation we defined in the Event Processor, and “Window_End_Time,” indicating the closing timestamp of a window. The repetition of the timestamp every five seconds aligns with our specified tumbling window.
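If tumbling windows are new to you, the following plain-Python sketch mimics the aggregation on a handful of made-up events. It is a simplified illustration and not the Event Streams implementation. In particular, it assigns an event that falls exactly on a window boundary to the next window, which may differ from how the service treats boundaries.

```python
from collections import defaultdict
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_end(ts: datetime, size_seconds: int = 5) -> datetime:
    """Return the closing timestamp of the tumbling window containing ts."""
    elapsed = int((ts - EPOCH).total_seconds())
    return EPOCH + timedelta(seconds=(elapsed // size_seconds + 1) * size_seconds)


def tumbling_sum(events, size_seconds: int = 5):
    """Sum customerCounter per non-overlapping, fixed-size window."""
    totals = defaultdict(int)
    for ts, customer_counter in events:
        totals[window_end(ts, size_seconds)] += customer_counter
    return [
        {"Window_End_Time": end, "SUM_customerCounter": total}
        for end, total in sorted(totals.items())
    ]


# Made-up events: (timestamp, customerCounter)
base = datetime(2024, 1, 1, tzinfo=timezone.utc)
events = [
    (base + timedelta(seconds=1), 2),
    (base + timedelta(seconds=4), 3),
    (base + timedelta(seconds=6), 5),
]

for row in tumbling_sum(events):
    print(row["Window_End_Time"].isoformat(), row["SUM_customerCounter"])
```

Each event lands in exactly one window, which is why the window end times in the result are spaced five seconds apart.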
Now, you have the flexibility to create any visualization that suits your needs, reflecting the data from the KQL Database.
If you want the report to refresh automatically, remember to use the Page refresh option for the report page. This keeps the report up to date without manual intervention.