
Loading stream data into Synapse with Event Hub and Stream Analytics

Azure offers multiple services that can handle streaming data. To mention just a few of them:

  • Stream Analytics
  • Data Explorer
  • Synapse Spark
  • Databricks

The above list is far from complete. If you want a solution that minimizes maintenance and simplifies the entire process, you can use PaaS services like Event Hubs and Stream Analytics. Azure Event Hubs serves as an endpoint for stream publishers. It is especially appropriate when you deal with a Kafka stream, because Event Hubs exposes a Kafka-compatible endpoint. Now imagine a situation where you want to receive stream data and then materialize it in your data warehouse. Is it possible? Of course it is! In this article I would like to share an example of how you can load Synapse Analytics using Azure Event Hubs and Stream Analytics.

First of all, we will configure our Event Hubs namespace. This service is fairly simple to prepare – on the first screen you have to specify not only standard properties like location, name and resource group, but also some more specific ones:

  • Pricing Tier – Event Hubs offers four pricing tiers: Basic, Standard, Premium and Dedicated. Choose the one that is appropriate for your needs. To mention just a few differences: the Capture mechanism, which automatically saves event messages to Azure Storage, is not available in the Basic tier, and message retention also differs between tiers – you can find more information in the documentation.
  • Throughput Units – the performance unit of Event Hubs; one Throughput Unit provides 1 MB/s of ingress and 2 MB/s of egress, and you can of course increase the number if you need to.
  • Enable Auto-Inflate – when turned on, the namespace automatically scales up by increasing the number of Throughput Units to meet the current load, up to a maximum that you define (a CLI sketch of this setup follows the list).
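
By the way, if you prefer scripting to clicking through the portal, the same namespace can be created from PowerShell with the Azure CLI. This is only a minimal sketch – the resource group name and location below are placeholders of mine, and flag names can differ slightly between CLI versions:

# Placeholder values – adjust to your environment
$resourceGroup = "seequality-rg"
$namespaceName = "seequality-ehns"
$location      = "westeurope"

# Standard tier, 1 Throughput Unit, Auto-Inflate enabled up to 5 TUs
az eventhubs namespace create `
    --resource-group $resourceGroup `
    --name $namespaceName `
    --location $location `
    --sku Standard `
    --capacity 1 `
    --enable-auto-inflate true `
    --maximum-throughput-units 5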

On the Advanced tab we can choose which TLS version we want to use to secure connections to our Event Hubs namespace:

The Networking tab gives us the option to specify whether we want to use a public or a private endpoint:

As you can see, the configuration is pretty simple – the choices I made for our demo purposes are visible on the screen below:

When the Event Hubs namespace is in place, we can go to the Event Hubs section to create a new event hub:

On the first screen we can set a name, the number of partitions that we want to use, and how long our messages should be retained:

On the next page, we have the option to define the Capture mechanism that can automatically save our messages to Azure Storage in Avro format. It is pretty useful, but for our scenario we will leave it turned off:
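
As a side note, the event hub itself can also be created from a script. A minimal sketch with example values for the partition count and retention (the retention flag has been renamed in newer CLI versions):

# Same placeholder names as in the namespace sketch above
$resourceGroup = "seequality-rg"
$namespaceName = "seequality-ehns"

# 2 partitions, 1 day of message retention
az eventhubs eventhub create `
    --resource-group $resourceGroup `
    --namespace-name $namespaceName `
    --name "seequality-eh" `
    --partition-count 2 `
    --message-retention 1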

Our event hub is up and running, so we can move on to the next service that we want to use – Stream Analytics. In this case we will use a Stream Analytics job instead of a cluster, because our stream is pretty small and there is no need to provision an entire cluster. In Stream Analytics, performance is measured in Streaming Units (SUs), which represent the CPU, memory, etc. assigned to the job – for our purpose 1 SU is enough:


The job is created, so we can configure an input:

Many different sources can be used as input for our job, but we chose Event Hub. The only setting here that is not obvious is the consumer group – in a nutshell, you can have multiple applications consuming the same stream, and each of them should keep an independent view of its position in the stream, so each of them should use a different consumer group. The next important thing is authentication – I chose the system-assigned managed identity, so my Stream Analytics job has to be granted proper rights on the Event Hub to read the data. Then we have some additional settings like message format, compression, etc.:
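
Both of the above can be scripted as well – a dedicated consumer group for the job and the role assignment that gives the job's system-assigned managed identity the right to read from the namespace. A sketch with placeholder values; the consumer group name is my own choice and the principal ID can be copied from the Identity blade of the Stream Analytics job:

# Same placeholder names as before
$resourceGroup = "seequality-rg"
$namespaceName = "seequality-ehns"

# Dedicated consumer group for the Stream Analytics job (placeholder name)
az eventhubs eventhub consumer-group create `
    --resource-group $resourceGroup `
    --namespace-name $namespaceName `
    --eventhub-name "seequality-eh" `
    --name "streamanalytics-cg"

# Grant the job's managed identity read access on the namespace
$principalId = "<object-id-of-the-stream-analytics-managed-identity>"
$namespaceId = az eventhubs namespace show `
    --resource-group $resourceGroup `
    --name $namespaceName `
    --query id -o tsv

az role assignment create `
    --assignee $principalId `
    --role "Azure Event Hubs Data Receiver" `
    --scope $namespaceId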

When we have an input, we can define a query that will take the data from the input and route it to the proper output. My query depends on the TableName column: if the value is Customer, the event is redirected to the Customer output; if it is Product, it goes to the Product output:

SELECT
    *
INTO
    [Customer]
FROM
    [seequality-eh]
WHERE
    [TableName] = 'Customer';

SELECT
    *
INTO
    [Product]
FROM
    [seequality-eh]
WHERE
    [TableName] = 'Product';
Before we create the output in Stream Analytics, we have to create the target tables in the Dedicated SQL Pool – both tables are typical staging tables with no indexes and the ROUND_ROBIN distribution. Please also notice the columns EventProcessedUtcTime, PartitionId and EventEnqueuedUtcTime – these are metadata columns that were added to the stream in the previous stages:
CREATE TABLE [dbo].[staging_Customer]
(
    [Name] [nvarchar](50)  NULL,
    [Surname] [nvarchar](50)  NULL,
    [Country] [nvarchar](50)  NULL,
    [BirthDate] [date]  NULL,
    [Gender] [char](1)  NULL,
    [Status] [nvarchar](50)  NULL,
    [TableName] [nvarchar](500)  NULL,
    [EventProcessedUtcTime] [datetime]  NULL,
    [PartitionId] [bigint]  NULL,
    [EventEnqueuedUtcTime] [datetime]  NULL
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
)
GO

CREATE TABLE [dbo].[staging_Product]
(
    [Name] [nvarchar](50)  NULL,
    [Group] [nvarchar](50)  NULL,
    [Price] [decimal](16,2)  NULL,
    [FirstPurchaseDate] [date]  NULL,
    [CountryOfOrigin] [nvarchar](50)  NULL,
    [TableName] [nvarchar](500)  NULL,
    [EventProcessedUtcTime] [datetime]  NULL,
    [PartitionId] [bigint]  NULL,
    [EventEnqueuedUtcTime] [datetime]  NULL
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
)
GO
In this case, Stream Analytics also needs proper rights to write data to those tables. To simplify things, I added its managed identity to the db_owner role:
CREATE USER [sqstreamanalytics] FROM EXTERNAL PROVIDER;
GO
EXEC sp_addrolemember 'db_owner', 'sqstreamanalytics';

When everything is ready on the database side, we can prepare the output (please notice that Managed Identity-based authentication is still in preview – if you don’t want to use it, there are other options, for example basic authentication):

When we choose Synapse Dedicated SQL Pool as a target, we will additionally be asked to add a connection to a Storage Account – why? You will see later:

Everything looks good, so now we need our stream. For that purpose, I have a demo script that sends messages directly to the Event Hub (for both tables). The only things that you have to set are the names of the objects inside the Event Hubs namespace and the shared access policy that the script will use (you can find more about authentication to Event Hubs here) – for demo purposes I used RootManageSharedAccessKey, but in production deployments it is good to define permissions at a lower level.
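
Just to illustrate what “a lower level” could mean: a send-only policy scoped to the single event hub can be created like below (a sketch, with a policy name of my own choosing):

# Same placeholder names as before
$resourceGroup = "seequality-rg"
$namespaceName = "seequality-ehns"

# Create a policy that can only send to this one event hub
az eventhubs eventhub authorization-rule create `
    --resource-group $resourceGroup `
    --namespace-name $namespaceName `
    --eventhub-name "seequality-eh" `
    --name "SendOnlyPolicy" `
    --rights Send

# Read its primary key to plug into the script below
az eventhubs eventhub authorization-rule keys list `
    --resource-group $resourceGroup `
    --namespace-name $namespaceName `
    --eventhub-name "seequality-eh" `
    --name "SendOnlyPolicy" `
    --query primaryKey -o tsv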

$eventHubName = "seequality-eh"
$eventHubNameSpace = "seequality-ehns"
$policyName = "RootManageSharedAccessKey"
$policyKey = "9kSCwblMmGb9Wm0sUeCYl7EX07nLJzk44PaSa/uMG0g="

# Load the System.Web assembly to enable UrlEncode

[Reflection.Assembly]::LoadWithPartialName("System.Web") | Out-Null

$URI = "{0}.servicebus.windows.net/{1}" -f @($eventHubNameSpace,$eventHubName)
$encodedURI = [System.Web.HttpUtility]::UrlEncode($URI)
$expiry = [string](([DateTimeOffset]::Now.ToUnixTimeSeconds())+3600)

# Create the signature

$stringToSign = [System.Web.HttpUtility]::UrlEncode($URI) + "`n" + $expiry
$hmacsha = New-Object System.Security.Cryptography.HMACSHA256
$hmacsha.key = [Text.Encoding]::ASCII.GetBytes($policyKey)

$signature = $hmacsha.ComputeHash([Text.Encoding]::ASCII.GetBytes($stringToSign))
$signature = [System.Web.HttpUtility]::UrlEncode([Convert]::ToBase64String($signature))

# create Request Body
$body = "{
'Name':'John',
'Surname':'Doe',
'Country':'Denmark',
'BirthDate':'1998-01-01',
'Gender':'M',
'Status':'Single',
'TableName':'Customer'
}"

$headers = @{

"Authorization"="SharedAccessSignature sr=" + $encodedURI + "&sig=" + $signature + "&se=" + $expiry + "&skn=" + $policyName;

"Content-Type"="application/atom+xml;type=entry;charset=utf-8"; # must be this

"Content-Length" = ("{0}" -f ($body.Length))

}

# execute the Azure REST API

$method = "POST"
$uriToInvoke = 'https://' +$URI +'/messages?timeout=60&api-version=2014-01'
Invoke-RestMethod -Uri $uriToInvoke -Method $method -Headers $headers -Body $body -Verbose

As you can see below, everything works as expected and the data landed in the proper table:
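
If you want to check the result without opening the portal, a quick query from PowerShell will do. A sketch assuming the SqlServer module is installed and SQL authentication is enabled on the pool – the server, database and credentials are placeholders:

# Placeholder connection details
$server   = "<your-workspace>.sql.azuresynapse.net"
$database = "<your-dedicated-sql-pool>"

Invoke-Sqlcmd -ServerInstance $server -Database $database `
    -Username "<sql-user>" -Password "<sql-password>" `
    -Query "SELECT TOP 10 * FROM dbo.staging_Customer ORDER BY EventEnqueuedUtcTime DESC"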

Let’s run the script once more with the same “body” part to pass another row to the Customer table:

$body = "{
'Name':'John',
'Surname':'Doe',
'Country':'Denmark',
'BirthDate':'1998-01-01',
'Gender':'M',
'Status':'Single',
'TableName':'Customer'
}"

Now let’s change the body part – this time we will pass data to the Product table:

$body = "{
'Name':'Mountain Bike',
'Group':'Bike',
'Price':'1000.50',
'FirstPurchaseDate':'2000-02-01',
'CountryOfOrigin':'USA',
'TableName':'Product'
}"

Below you can see the result of my transformation:

You may ask how this data was loaded into the table – the answer is visible on the screenshot below, which comes from the logs:

COPY INTO is used, and to do that the data must be loaded from Azure Storage – that is why we had to choose a storage account during the setup of the Stream Analytics output. The data is staged on that storage account for a short period of time just so it can be loaded properly into the Dedicated SQL Pool.
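
If you decide to authenticate to that staging storage account with the managed identity as well, the job's identity needs a data role on it. A sketch with a placeholder storage account name:

# Same placeholders as before; the storage account name is a placeholder too
$resourceGroup = "seequality-rg"
$principalId   = "<object-id-of-the-stream-analytics-managed-identity>"

$storageId = az storage account show `
    --resource-group $resourceGroup `
    --name "<staging-storage-account>" `
    --query id -o tsv

az role assignment create `
    --assignee $principalId `
    --role "Storage Blob Data Contributor" `
    --scope $storageId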

In this article I showed only the basic configuration of streaming directly into Synapse, but I hope you now have a general overview of this setup. Please keep in mind that Stream Analytics allows much more complex transformations – you can also aggregate events or collect them over a period of time to load the data in bigger chunks. I hope you will find the article useful. Thanks!
