
Processing stream data with Microsoft Fabric Event Streams (part 2)

In my earlier article (link), I introduced Microsoft Fabric Event Streams, gave an overview of the tool, and covered the steps for connecting to data sources. In this follow-up article, I will walk you through processing that data and show how to store it efficiently in the target destinations. Let’s get into the details!

Our current situation looks like this:

With Azure Event Hub and IoT Hub successfully linked as sources in Event Streams, we can now move on to configuring our targets. To create a KQL Database, the simplest approach is to navigate to the Real-Time Analytics view. Locate the “KQL Database” button and proceed by providing the desired name:

Our subsequent target in the configuration is the Lakehouse. To set this up, navigate to the Data Engineering view and select the Lakehouse button. Proceed by providing an appropriate name for the Lakehouse:

Our target objects are now prepared, so let’s link the KQL database to the Event Stream. The process is straightforward, as illustrated in the screenshot below. It’s important to note the availability of two data ingestion modes:

  1. Direct Ingestion: This mode ensures that data is ingested into KQL as promptly as possible, without additional transformations.
  2. Event Processing before Ingestion: This utilizes the Event Processor to introduce modifications before ingestion.

For our KQL setup, where the priority is to save data as quickly as possible, the appropriate choice is Direct Ingestion. Simply select this option and click “Add and configure” to proceed.

Upon selecting “Add and configure,” a user-friendly wizard will open, allowing us to set up all the necessary details. Initially, we can add the target table in KQL and specify additional filters, such as the event retrieval start date:

Moving to the next screen, a preview of our data will be displayed. It is crucial to choose the appropriate format to ensure that Event Streams can accurately interpret incoming messages. Under the “Advanced” settings, there is an option to specify the number of nested levels in our JSON data, among other details. Additionally, we can edit columns if necessary, allowing us to rename or remove fields as needed.

Once everything is set up correctly, you should receive a confirmation, signaling that the configuration is successful. At this point, you can proceed to the next steps.

To verify that data is being ingested into the KQL database, you can execute a few queries. KQL (Kusto Query Language) is the same language used by Log Analytics and Azure Data Explorer, which are already present in the Azure ecosystem, so anyone accustomed to those technologies should find it straightforward to adapt. I highly recommend familiarizing yourself with Kusto Query Language, as its simplicity and flexibility offer numerous opportunities for data analysis and exploration. The screenshot below confirms that the data is loaded correctly:
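If you prefer to check ingestion from code rather than the browser query editor, a Fabric KQL database also exposes a query URI that works with the standard Kusto client libraries. Below is a minimal sketch using the azure-kusto-data Python package; the query URI, database name, and table name are placeholders you would replace with the values from your own KQL database:

# pip install azure-kusto-data
from azure.kusto.data import KustoClient, KustoConnectionStringBuilder

# Placeholders - copy the real query URI and database name from the KQL database details pane.
cluster_uri = "https://<your-kql-db>.kusto.fabric.microsoft.com"
kcsb = KustoConnectionStringBuilder.with_interactive_login(cluster_uri)
client = KustoClient(kcsb)

# "DeviceEvents" is a hypothetical table name - use the table you created in the wizard.
response = client.execute("<your-database-name>", "DeviceEvents | take 10")
for row in response.primary_results[0]:
    print(row)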

Moving on to our next destination, choose the Lakehouse option and point to the Lakehouse we set up earlier. Unlike the immediate ingestion we chose for KQL, here we want to gather and process data within a specific time window. To make this adjustment, simply click on “Open event processor” as shown below:

This action opens a new window where we can transform our data using a graphical interface. The available operations include:

  • Aggregate and Group By: Aggregate data by specific columns within a designated time window.
  • Expand: Expand nested objects for a more detailed view.
  • Manage Fields: Rename, drop, or add columns. Utilize built-in functions to create new columns.
  • Filter: Apply criteria to filter messages based on specific conditions.
  • Union: Combine multiple sources using the union operation.

In our specific scenario, I used the “Group By” option. As depicted in the screenshot below, I chose to group the data by deviceId, city, and entryType, and to aggregate it within a five-second tumbling window:
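If it helps to see the equivalent logic as code, a five-second tumbling-window aggregation expressed in PySpark looks roughly like the sketch below. This only illustrates the transformation that the Event Processor performs for us; the input column names (customerCounter and an event-time column I’ve called eventTimestamp) are assumptions based on the fields discussed in this post, and the sample rows are made up purely for the demo:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()  # already available in a Fabric notebook

# Tiny in-memory sample with the fields used in this walkthrough; names are assumptions.
events_df = spark.createDataFrame(
    [("dev-1", "Warsaw", "entry", 3, "2024-01-01 12:00:01"),
     ("dev-1", "Warsaw", "entry", 2, "2024-01-01 12:00:04")],
    ["deviceId", "city", "entryType", "customerCounter", "eventTimestamp"],
).withColumn("eventTimestamp", F.to_timestamp("eventTimestamp"))

aggregated = (
    events_df
    .groupBy(F.window("eventTimestamp", "5 seconds"),   # tumbling window
             "deviceId", "city", "entryType")
    .agg(F.sum("customerCounter").alias("SUM_customerCounter"))
    .withColumn("Window_End_Time", F.col("window.end"))
    .drop("window")
)
aggregated.show(truncate=False)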

Pay close attention to the Advanced section, where you’ll find two crucial properties: the minimum number of rows and the maximum time that Event Streams will wait before writing data out. These matter for performance, because Delta tables are highly sensitive to file size: many tiny files hurt read performance, but a handful of very large files isn’t ideal either. I recommend tuning these two options so that the resulting files typically land between 128 MB and 1 GB. Keep in mind that you can always fine-tune the file size later as needed.
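One quick way to sanity-check the effect of these settings is to look at how many files the table holds and how much data they contain. A small sketch, assuming the customers table that appears later in this walkthrough:

# DESCRIBE DETAIL reports, among other things, the file count and total size of a Delta table.
# Divide sizeInBytes by numFiles for a rough average file size to compare against the 128 MB - 1 GB target.
spark.sql("DESCRIBE DETAIL customers").select("numFiles", "sizeInBytes").show()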

 

When you click the Lakehouse icon, take note of the “Table optimization is available” feature:

Clicking it generates a new notebook with basic code that runs the OPTIMIZE command on your Delta table (as discussed briefly here). Keep in mind that this is the simplest form of OPTIMIZE available; treat it as a sample and a helpful starting point for developing your own code.

# Run the below script or schedule it to run regularly to optimize your Lakehouse table 'customers'

from delta.tables import *
deltaTable = DeltaTable.forName(spark, "customers")
deltaTable.optimize().executeCompaction()

# If you only want to optimize a subset of your data, you can specify an optional partition predicate. For example:
#
#     from datetime import datetime, timedelta
#     startDate = (datetime.now() - timedelta(days=3)).strftime('%Y-%m-%d')
#     deltaTable.optimize().where("date > '{}'".format(startDate)).executeCompaction()
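As one possible extension of the generated sample, recent Delta Lake versions also let you co-locate data on a column you frequently filter by while compacting. The sketch below assumes the executeZOrderBy API is available in your Fabric runtime and uses deviceId, one of the columns we grouped by, purely as an illustration:

from delta.tables import DeltaTable

# Compact the table and cluster rows by deviceId so filters on that column scan fewer files.
deltaTable = DeltaTable.forName(spark, "customers")
deltaTable.optimize().executeZOrderBy("deviceId")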

Our two targets should be set up and ready for use at this point:

To test querying data in the Lakehouse, you can write a simple PySpark query, as illustrated below. Take note of the small triangle next to the “customers” table; it indicates that the data is stored in OneLake as a Delta table. This icon was not visible in the KQL database because we did not choose the option to store that data in OneLake.
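In case the screenshot is hard to read, a query along those lines could look like the sketch below; it assumes the output column names match the group-by fields and aggregation shown earlier (adjust them if your output differs):

# Read the Delta table written by Event Streams and inspect the most recent aggregated rows.
df = spark.read.table("customers")
df.select("deviceId", "city", "entryType", "SUM_customerCounter", "Window_End_Time") \
  .orderBy("Window_End_Time", ascending=False) \
  .show(20, truncate=False)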

Take a closer look at the detailed result in the Lakehouse. You’ll notice “SUM_customerCounter,” which shows the aggregation we defined in the Event Processor, and “Window_End_Time,” indicating the closing timestamp of a window. The repetition of the timestamp every five seconds aligns with our specified tumbling window.

Now, let’s move on to the visualization phase. While we could directly connect to KQL or Lakehouse from Power BI Desktop, for our current purpose, let’s navigate to the KQL Database and select “Build Power BI report” directly from the browser.

Now, you have the flexibility to create any visualization that suits your needs, reflecting the data from the KQL Database.

If you want the report to refresh automatically, remember to enable the Page refresh option for the report page. This ensures that the report stays up to date without manual intervention.

 

It’s important to note that page refresh can be governed by administrators, who can set the minimum allowed refresh interval. It’s advisable to configure this setting based on actual business requirements, since each refresh consumes resources.

That concludes the article for now. I hope you found it informative and enjoyable. I encourage you to explore the described functionality on your own. Stay tuned for more articles about Fabric coming soon.
