
Processing stream data with Microsoft Fabric Event Streams (part 1)

As you probably know, Microsoft Fabric addresses most data analytics scenarios, and one of the most important is Real-Time Analytics:

What does it mean? It means that Fabric gives us a set of services for processing streaming data. We can deal with such data in several ways:

  • streaming datasets/push datasets (aka semantic models) – a well-known Power BI feature where we push data directly to the model instead of pulling it. Data can be pushed in a streaming manner and visualized interactively on a report or dashboard (you can find more about it here),
  • streaming dataflows (retired) – this feature never reached GA and has now been retired, so I will not spend time on it (link),
  • Spark Structured Streaming – an integral part of Spark, which is available within Fabric. It is a huge topic that we could write a lot about, but for now I will just say that Structured Streaming lets us process streaming data in micro-batches with error handling, integrity checks, time-window aggregations and more, all on the distributed Spark engine. If you are interested, you can always check the official documentation,
  • Event Streams – a new component that gives a no-code experience for reading, processing and writing data in the Fabric ecosystem. This component will be our hero today.

In the screenshot below you can see what the tool looks like:

As you can see, this tool is a fully no-code experience. It allows you to gather data from the following data sources:

  • Azure Event Hub,
  • Azure IoT Hub,
  • Custom app – in this case an endpoint will be created to which we can send data.

We can redirect our stream data to the following destinations:

  • Lakehouse – the data will be registered in the lakehouse metastore and saved in Delta format in OneLake,
  • KQL Database – the data will be saved in the KQL database and optionally also in Delta format in OneLake,
  • Custom App – we can redirect stream data to a custom endpoint outside of this Event Stream.

Of course, we can do much more than take data from a source and save it in a destination. For that we can use the Event Processor, a special Event Streams component for data transformations:

As you can see, there is once again a no-code experience where you can execute the following transformations:

– Aggregation & grouping
– Filtration
– Union
– Expanding nested types
– Modifying, dropping, and creating new attributes (using a set of built-in functions designed for that purpose).
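The Event Processor itself is no-code, but it can help to see what these operations do to the data. Below is a minimal sketch in plain JavaScript arrays, not the Event Processor's implementation. The sample events, the threshold of 5 and the `cityUpper` field are made up for illustration; the field names match the messages sent later in this article.

```javascript
// Two hypothetical input streams using the same event shape as in this article.
const iotEvents = [
  { deviceId: 'device2', city: 'Lodz', entryType: 'registered', customerCounter: 4 }
];
const appEvents = [
  { deviceId: 'device1', city: 'Lodz', entryType: 'anonymous', customerCounter: 30 }
];

// Union: merge both streams into one.
const allEvents = [...iotEvents, ...appEvents];

// Filter: keep only events with at least 5 customers (arbitrary threshold).
const busyEvents = allEvents.filter((e) => e.customerCounter >= 5);

// Manage fields: drop deviceId and add a new derived attribute.
const shapedEvents = busyEvents.map(({ deviceId, ...rest }) => ({
  ...rest,
  cityUpper: rest.city.toUpperCase()
}));

console.log(shapedEvents);
```

In the real tool each of these steps is a separate node on the canvas, and the output of one node becomes the input of the next.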

I would like to emphasize the importance of grouping and aggregating, particularly in the context of streaming data. You have the ability to group by time window, and in the Event Processor, the available time windows include:

  • Hopping: A series of fixed-size, overlapping, and contiguous time intervals. For example, a 10-second window that hops every 5 seconds means that a new window starts 5 seconds after the previous one started, so each window overlaps the previous one by 5 seconds.
  • Session: Groups events that arrive at similar times, filtering out periods of time where there is no data.
  • Sliding: The system is asked to logically consider all possible windows of a given length, for example, 10 seconds. In this case, the system decides how many windows will be created.
  • Snapshot: Groups events that have the same timestamp.
  • Tumbling: A series of fixed-sized, non-overlapping, and contiguous time intervals. For example, a 10-second window during which we aggregate events. After 10 seconds, another window is created, and aggregation starts once more.
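To make the difference between tumbling and hopping windows more concrete, here is a small JavaScript sketch of how an event could be assigned to windows and then aggregated. It is only an illustration under my own assumptions: the function names are hypothetical, windows are treated as half-open intervals [start, end), and the real engine may handle window boundaries differently.

```javascript
// Tumbling: every timestamp belongs to exactly one window of length sizeMs.
function tumblingWindow(tsMs, sizeMs) {
  const start = Math.floor(tsMs / sizeMs) * sizeMs;
  return { start, end: start + sizeMs };
}

// Hopping: a new window of length sizeMs starts every hopMs,
// so one timestamp can belong to several overlapping windows.
function hoppingWindows(tsMs, sizeMs, hopMs) {
  const windows = [];
  const lastStart = Math.floor(tsMs / hopMs) * hopMs;
  for (let start = lastStart; start > tsMs - sizeMs; start -= hopMs) {
    windows.push({ start, end: start + sizeMs });
  }
  return windows.reverse(); // oldest window first
}

// Sum customerCounter per city and tumbling window, keyed by "city|windowStart".
function sumByCityAndWindow(events, sizeMs) {
  const totals = new Map();
  for (const e of events) {
    const { start } = tumblingWindow(Date.parse(e.processedTime), sizeMs);
    const key = `${e.city}|${new Date(start).toISOString()}`;
    totals.set(key, (totals.get(key) || 0) + e.customerCounter);
  }
  return totals;
}

// Sample events (made up) using the field names from this article.
const sampleEvents = [
  { city: 'Warszawa', customerCounter: 3, processedTime: '2024-01-01T00:00:02Z' },
  { city: 'Warszawa', customerCounter: 4, processedTime: '2024-01-01T00:00:07Z' },
  { city: 'Warszawa', customerCounter: 5, processedTime: '2024-01-01T00:00:12Z' }
];
console.log(sumByCityAndWindow(sampleEvents, 10000));
console.log(hoppingWindows(12000, 10000, 5000));
```

With a 10-second tumbling window, the first two sample events land in the same window and the third one opens a new window. With a 10-second window hopping every 5 seconds, an event at second 12 belongs to two windows at once.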

If you wish to understand how it works, I recommend reading the official Stream Analytics documentation [link]. In most cases, you will need such functions to conduct a proper analysis based on this data. I will provide an example later in this article to demonstrate its functionality.

Now that we have a better understanding of how these processes function, let’s proceed to test them!

First of all, I would like to present my scenario:

As you can see in the above picture, I will have two data sources. The first one will be an IoT device simulated as a Raspberry Pi using the Azure Simulator, which you will find here.

It will send information in JSON format to my Azure IoT Hub in my subscription. The code responsible for sending the data looks like this:

const wpi = require('wiring-pi');
const Client = require('azure-iot-device').Client;
const Message = require('azure-iot-device').Message;
const Protocol = require('azure-iot-device-mqtt').Mqtt;
const BME280 = require('bme280-sensor');

const BME280_OPTION = {
  i2cBusNo: 1,
  i2cAddress: BME280.BME280_DEFAULT_I2C_ADDRESS()
};

const connectionString = 'HostName=ioth-sample.azure-devices.net;DeviceId=device01;SharedAccessKey=XBHu4fCiNZTyrGwv5laNWojeY6AqV06CGAIoTMbYvQU=';
const LEDPin = 4;

var sendingMessage = false;
var messageId = 100; // Start messageId from 100
var client, sensor;
var cityDeviceIds = {}; // Store deviceIds for each city
var blinkLEDTimeout = null;

function getDeviceId(city) {
  if (!cityDeviceIds[city]) {
    cityDeviceIds[city] = Math.floor(Math.random() * 900) + 101; // Generate a number greater than 100
  }
  return cityDeviceIds[city];
}

function getMessage(cb) {
  messageId++;
  const cities = ['Warszawa', 'Poznan', 'Lodz', 'Lublin','Krakow', 'Wroclaw', 'Katowice', 'Rzeszow', 'Zakopane'];

  for (let i = cities.length - 1; i > 0; i--) {
    const j = Math.floor(Math.random() * (i + 1));
    [cities[i], cities[j]] = [cities[j], cities[i]];
  }

  const selectedCity = cities[messageId % cities.length];
  const deviceID2 = getDeviceId(selectedCity);
  const customersEntered2 = Math.floor(Math.random() * 10) + 1;

  // Get current UTC time
  const processedTime = new Date().toISOString();

  cb(JSON.stringify({
    messageId: messageId,
    deviceId: 'device2',
    city: selectedCity,
    entryType: 'registered',
    customerCounter: customersEntered2,
    processedTime: processedTime  // Add the current UTC time
  }));
}

function sendMessage() {
  if (!sendingMessage) { return; }

  getMessage(function (content) {
    var message = new Message(content);
    console.log('Sending message: ' + content);
    client.sendEvent(message, function (err) {
      if (err) {
        console.error('Failed to send message to Azure IoT Hub');
      } else {
        blinkLED();
        console.log('Message sent to Azure IoT Hub');
      }
    });
  });
}

function onStart(request, response) {
  console.log('Try to invoke method start(' + request.payload + ')');
  sendingMessage = true;

  response.send(200, 'Successfully start sending message to cloud', function (err) {
    if (err) {
      console.error('[IoT hub Client] Failed sending a method response:\n' + err.message);
    }
  });
}

function onStop(request, response) {
  console.log('Try to invoke method stop(' + request.payload + ')');
  sendingMessage = false;

  response.send(200, 'Successfully stop sending message to cloud', function (err) {
    if (err) {
      console.error('[IoT hub Client] Failed sending a method response:\n' + err.message);
    }
  });
}

function receiveMessageCallback(msg) {
  blinkLED();
  var message = msg.getData().toString('utf-8');
  client.complete(msg, function () {
    console.log('Receive message: ' + message);
  });
}

function blinkLED() {
  if (blinkLEDTimeout) {
    clearTimeout(blinkLEDTimeout);
  }
  wpi.digitalWrite(LEDPin, 1);
  blinkLEDTimeout = setTimeout(function () {
    wpi.digitalWrite(LEDPin, 0);
  }, 500);
}

wpi.setup('wpi');
wpi.pinMode(LEDPin, wpi.OUTPUT);
sensor = new BME280(BME280_OPTION);
sensor.init()
  .then(function () {
    sendingMessage = true;
  })
  .catch(function (err) {
    console.error(err.message || err);
  });

client = Client.fromConnectionString(connectionString, Protocol);

client.open(function (err) {
  if (err) {
    console.error('[IoT hub Client] Connect error: ' + err.message);
    return;
  }

  client.onDeviceMethod('start', onStart);
  client.onDeviceMethod('stop', onStop);
  client.on('message', receiveMessageCallback);
  setInterval(sendMessage, 500);
});

 

Secondly, we have a source called ‘Software,’ which should simulate a real application. In my case, it’s just a PowerShell script that will simultaneously send information to Azure Event Hub. The code looks like this:

$namespace = "ehn-fabric-streaming"
$eventHubName = "eventhub01"
$connectionString = "Endpoint=sb://$namespace.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=KkEW2KIc3IvAZMiM0GbIqCSYFZLTmwdzY+AEhEdCJw8="

# Load the System.Web assembly to enable UrlEncode
[Reflection.Assembly]::LoadWithPartialName("System.Web") | Out-Null

function Get-RandomEventData {
    $eventData = @{
        messageId = (Get-Random)
        deviceId = "device1"
        entryType = "anonymous"
        city = Get-Random -InputObject @('Warszawa', 'Poznan', 'Lodz', 'Lublin','Krakow', 'Wroclaw', 'Katowice', 'Rzeszow', 'Zakopane')
        customerCounter = Get-Random -Minimum 1 -Maximum 51
        processedTime = (Get-Date).ToUniversalTime().ToString("yyyy-MM-ddTHH:mm:ssZ")
    }

    # Create JSON string manually
    $jsonString = @"
    {
        "messageId": $($eventData['messageId']),
        "deviceId": "$($eventData['deviceId'])",
        "city": "$($eventData['city'])",
        "entryType": "$($eventData['entryType'])",
        "customerCounter": $($eventData['customerCounter']),
        "processedTime": "$($eventData['processedTime'])"
    }
"@

    return $jsonString
}

while ($true) {
    $URI = "{0}.servicebus.windows.net/{1}" -f @($namespace, $eventHubName)
    $encodedURI = [System.Web.HttpUtility]::UrlEncode($URI)

    # Calculate expiry value one hour ahead
    $expiry = [string](([DateTimeOffset]::Now.ToUnixTimeSeconds()) + 3600)

    # Create the signature
    $stringToSign = [System.Web.HttpUtility]::UrlEncode($URI) + "`n" + $expiry

    $hmacsha = New-Object System.Security.Cryptography.HMACSHA256
    $hmacsha.key = [Text.Encoding]::ASCII.GetBytes($connectionString.Split(';')[2].Substring("SharedAccessKey=".Length))

    $signature = $hmacsha.ComputeHash([Text.Encoding]::ASCII.GetBytes($stringToSign))
    $signature = [System.Web.HttpUtility]::UrlEncode([Convert]::ToBase64String($signature))

    # Create Request Body with sample data
    $body = Get-RandomEventData

    # API headers
    $headers = @{
        "Authorization" = "SharedAccessSignature sr=" + $encodedURI + "&sig=" + $signature + "&se=" + $expiry + "&skn=RootManageSharedAccessKey";
        "Content-Type" = "application/json"; # Change to application/json
        "Content-Length" = ("{0}" -f ($body.Length))
    }

    # Execute the Azure REST API
    $method = "POST"
    $dest = 'https://' + $URI + '/messages?timeout=60&api-version=2014-01'

    Write-Host $body
    # Invoke REST API
    Invoke-RestMethod -Uri $dest -Method $method -Headers $headers -Body $body -Verbose

    # Wait for 2 seconds before sending the next message
    Start-Sleep -Seconds 2
}

I will not explain in detail how the above scripts work, but I am including them here in case you want to test this functionality on your own.
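One part of the PowerShell script that may be worth a closer look is the Shared Access Signature it builds for the Authorization header. If you prefer Node.js, the same signing step could be sketched roughly as below. This is my assumption of an equivalent, not a drop-in replacement: `createSasToken` is a hypothetical helper, the key is a placeholder, and `encodeURIComponent` escapes characters with uppercase hex digits while `HttpUtility.UrlEncode` uses lowercase ones.

```javascript
const crypto = require('crypto');

// Build a Shared Access Signature token for an Event Hubs resource.
// The expiry is passed in (Unix seconds) so the function stays deterministic.
function createSasToken(resourceUri, keyName, key, expiryEpochSeconds) {
  const encodedUri = encodeURIComponent(resourceUri);
  // Same string to sign as in the PowerShell script: encoded URI, newline, expiry.
  const stringToSign = encodedUri + '\n' + expiryEpochSeconds;
  const signature = crypto
    .createHmac('sha256', key)
    .update(stringToSign, 'utf8')
    .digest('base64');
  return 'SharedAccessSignature sr=' + encodedUri +
    '&sig=' + encodeURIComponent(signature) +
    '&se=' + expiryEpochSeconds +
    '&skn=' + keyName;
}

// Placeholder values; read the real key from a secret store, not from code.
const expiry = Math.floor(Date.now() / 1000) + 3600; // one hour ahead
const token = createSasToken(
  'ehn-fabric-streaming.servicebus.windows.net/eventhub01',
  'RootManageSharedAccessKey',
  '<your-shared-access-key>',
  expiry
);
console.log(token);
```

The resulting string goes into the Authorization header of the POST request, just like in the PowerShell loop.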

So now, data will be sent to the Azure Event Hub and Azure IoT Hub, from where we will consume it using our hero of the day: Event Stream. This service will process the data and sink it into one of two destinations:

  1. Lakehouse for long-term retention, where data will be aggregated and prepared for further analysis.
  2. KQL Database for short-term retention, where data will not be aggregated, and it will be ready for near real-time visualization in the report.

I believe it is pretty straightforward, so let’s start by creating the proper objects in the Fabric workspace. First of all, we will create targets (Lakehouse and KQL). It is extremely simple, and in our scenario, it only involves giving the proper name to both objects:

After that, we can create the Event Stream by selecting the appropriate icon and giving it a proper name:

 

When it is created, we can open our Event Stream and add our sources. Let’s begin by creating a connection to Azure IoT Hub. We can do this by clicking the plus icon in the ‘New source’ section:

Creating a connection should not be a problem. We just have to provide the IoT Hub URL and the shared access key, which is the only authentication method supported right now for this kind of source. When we create the connection, we can also define the proper consumer group (in my case, it will be just the default one) and the format in which data will be sent, which is JSON.

Once it is created, it should be visible in the diagram, and after some time, it will get the status ‘Streaming.’ Without hesitation, let’s create another source for Azure Event Hub by selecting the proper option from the graphical interface:

As you can see in the screenshots below, creating a connection to Azure Event Hub is very similar. In this case, the only difference is that we have to provide the Event Hub Namespace and the specific Event Hub within this namespace that we want to use. The rest of it is the same:

After some time, both sources should have the proper status. When we click on either of them, we should see a preview of the data. Additionally, we can specify the data format so that Event Stream will display the data in a nicely formatted tabular form:

 

That is it for now. In the next part I will try to show you how data can be ingested into the target and visualized in a Power BI report.
