
Load Synapse Analytics SQL Pool with Azure Databricks

In many projects, different tools must integrate with each other, and this is a common scenario when we talk about Synapse Analytics and Azure Databricks. Integrating those platforms can be beneficial in several ways. By combining the capabilities of Azure Synapse’s SQL pool with Azure Databricks’ Apache Spark cluster, you can analyze and transform data using a range of powerful features. You can use Azure Synapse to store and manage data and Azure Databricks to perform advanced analytics on that data. The integration lets you easily access data in Azure Synapse from Azure Databricks and vice versa, making it simple to share data between the two platforms. You can also use Azure Synapse to schedule and orchestrate data pipelines with Azure Data Factory while Azure Databricks performs the data transformations, and you can take advantage of Azure Synapse’s security and compliance features in your Azure Databricks notebooks and jobs. Overall, the integration between Azure Synapse and Azure Databricks enables you to build powerful data pipelines and perform advanced analytics on your data.

Microsoft and Databricks work closely together, so we often see architectures where both tools are used side by side. For example, you can see the reference architecture from Microsoft called Modern analytics architecture with Azure Databricks (link):

In this case, you can notice that Databricks acts as an ETL tool and Azure Synapse Analytics is a data consumer and final target for already transformed data. It works well and I use this approach in many projects because it has many benefits and all the tools integrate nicely. In this article, I will try to show you how easily you can connect from Databricks to Synapse.

First of all, I want to mention that I will use a dedicated service principal to provide communication between services. If you don’t know what it is, I can say that a service principal is an identity that is used to authenticate an application or service in Azure Active Directory (AAD). It is similar to a user account, but is intended to be used for automated tasks and processes, rather than by a human user.

You have several methods to register a service principal, but I will do it via the Azure portal. In the search box, type ‘app registration‘:

To add a new service principal, click New registration:

For our demo purposes, we will add the name ‘databricks_to_synapse‘. The other options can be left at their default values.

 

When the object is created, we can open it and go to Certificates & secrets to create a new secret.

As you can see below, we can set how long our secret is valid. Choose a proper value according to your organization’s policies:

After that, our secret will be visible, so you can save it securely, for example, in Azure Key Vault. Be careful because the secret value is visible only once, right after it is created.

 

When our service principal is ready, we can give it some permissions. On the dedicated SQL pool, we can add a user and assign it to the proper role (I use ‘db_owner‘ for demo purposes, but of course, as always, assign the minimal needed privileges).

 

CREATE USER [databricks_to_synapse] FROM EXTERNAL PROVIDER;

EXEC sp_addrolemember 'db_owner', 'databricks_to_synapse';

We will use the same service principal to connect to our data lake, so I added it to the ‘Storage Blob Data Contributor‘ role (I also added the Synapse Analytics Managed Identity to the same role).

Ok, we are almost ready to perform our test. Let’s prepare some sample data on the Databricks side – I created a table called ’employee’ (you probably noticed the three-level naming – that is because I have Unity Catalog configured on my Databricks instance).

 

CREATE OR REPLACE TABLE main.demoschema.employee
(
  empcode     INT,
  empname     STRING,
  empsurname  STRING,
  country     STRING
);

Then let’s insert some sample rows:

INSERT INTO main.demoschema.employee VALUES
  (10, 'John', 'Doe', 'USA'),
  (20, 'Jane', 'Doe', 'UK');

Everything works as expected:

Ok, we have data to send, so we can start configuring the connection from the Databricks side. Below you can see some session configurations where we provide:

Storage connection configuration:

  • fs.azure.account.auth.type: type of authentication, in our case it will be OAuth because we want to use a service principal.
  • fs.azure.account.oauth.provider.type: provider type.
  • fs.azure.account.oauth2.client.id: Application (client) ID that can be found in the properties of our service principal.
  • fs.azure.account.oauth2.client.secret: secret connected to the service principal. Please don’t hardcode it as I did – it is much better to use the Databricks secrets store or Key Vault.
  • fs.azure.account.oauth2.client.endpoint: endpoint URL with tenant ID that can also be found in the service principal’s properties.

Synapse Analytics (can be a different service principal; if not specified, then storage credentials will be used):

  • spark.databricks.sqldw.jdbc.service.principal.client.id: Application (client) ID that can be found in the properties of our service principal.
  • spark.databricks.sqldw.jdbc.service.principal.client.secret: secret connected to the service principal. Please don’t hardcode it as I did – it is much better to use the Databricks secrets store or Key Vault.
%python

spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type",  "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "bda516b4-1930-47a1-829c-ac22c947c818")
spark.conf.set("fs.azure.account.oauth2.client.secret", "rzI8Q~r3gr2CJ7J6PAeAYQGtJraS9VKcym3kDdpE")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/deca7fa5-bee9-4134-9d26-e211fb2fe976/oauth2/token")

spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", "bda516b4-1930-47a1-829c-ac22c947c818")
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", "rzI8Q~r3gr2CJ7J6PAeAYQGtJraS8VKcym3kDdpE")

The above code sets those settings for the current session, but you can also specify them in the Init script of your cluster (see documentation).
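As mentioned in the option descriptions above, the client secret is hardcoded here only for brevity. Below is a minimal sketch of the same session configuration that reads the secret from a Databricks secret scope instead – the scope name ‘kv-secrets‘ and key name ‘databricks-to-synapse-secret‘ are hypothetical, so adjust them to your environment (for example a Key Vault-backed scope):

%python

# Hypothetical secret scope and key names - replace them with your own.
client_id = "bda516b4-1930-47a1-829c-ac22c947c818"
client_secret = dbutils.secrets.get(scope="kv-secrets", key="databricks-to-synapse-secret")
token_endpoint = "https://login.microsoftonline.com/deca7fa5-bee9-4134-9d26-e211fb2fe976/oauth2/token"

# Storage (ABFS) authentication with the service principal
spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", client_id)
spark.conf.set("fs.azure.account.oauth2.client.secret", client_secret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint", token_endpoint)

# Synapse (JDBC) authentication with the same service principal
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.id", client_id)
spark.conf.set("spark.databricks.sqldw.jdbc.service.principal.client.secret", client_secret)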

We have the basic connection configuration, so let’s prepare a data frame with the data that we want to push – we will use the simple spark.sql method:

%python
dfData = spark.sql("SELECT * FROM main.demoschema.employee")

To write this data to Synapse, we use the DataFrame’s dedicated .write method:

%python
(
dfData.write
    .format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://seequality-synapse-demo.sql.azuresynapse.net:1433;database=dsp;encrypt=true;trustServerCertificate=true")
    .option("enableServicePrincipalAuth", "true")
    .option("dbTable", "dbo.Employees")
    .option("tempDir", "abfss://data@adlssynapsestorage2022.dfs.core.windows.net/tmp/employees")
    .mode("overwrite")
    .save()
)

As you probably noticed, there are many options that we can specify. Above I used:

  • format – for a dedicated SQL pool it will be com.databricks.spark.sqldw,
  • option(“url”) – the JDBC connection string to the dedicated SQL pool,
  • option(“enableServicePrincipalAuth”) – setting that tells the connector to use the service principal for both the storage and Synapse connections,
  • option(“dbTable”) – the target table in Synapse,
  • option(“tempDir”) – the location on the storage where data will be saved as Parquet files before it is inserted into Synapse. This is very important because, as you probably know, the best options to load data into Synapse are COPY and POLYBASE; both require the data to be loaded from storage and both support Parquet. It is also important to note that this temporary storage is not cleaned up after the load, so you have to do it manually (see the sketch after this list) or use a mechanism like Lifecycle Management.
  • mode(“overwrite”) – the mode that specifies how data will be inserted. We have a few options here:
    • overwrite – the target table will be recreated every time,
    • append – when saving a DataFrame to a data source, if the data/table already exists, the contents of the DataFrame are appended to the existing data,
    • errorifexists – when saving a DataFrame to a data source, if data already exists, an exception is thrown,
    • ignore – when saving a DataFrame to a data source, if data already exists, the save operation does not save the contents of the DataFrame and does not change the existing data. This is similar to CREATE TABLE IF NOT EXISTS in SQL.
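Since the connector does not clean up tempDir on its own, one simple option is to remove the temporary files yourself once the load has finished. A minimal sketch using dbutils.fs.rm, assuming the same tempDir path that is used throughout this article:

%python

# Remove the temporary Parquet files left behind by the connector.
# Run this only after the write to Synapse has completed successfully.
temp_dir = "abfss://data@adlssynapsestorage2022.dfs.core.windows.net/tmp/employees"
dbutils.fs.rm(temp_dir, True)  # True = delete recursively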

The temporary location contains data partitioned in the following form:

<tempDir>/<yyyy-MM-dd>/<HH-mm-ss-SSS>/<randomUUID>/

Data is stored as a compressed parquet file:

When you execute the above code, you should see a very important piece of information in the output – it is evidence that data will be loaded into Synapse using COPY from the temporary location (COPY is used on newer Databricks Runtimes; for older ones POLYBASE is used):

After the load, we should see the data in place:

What could be interesting for you is the activity that takes place behind the scenes. You can see what is happening using DMVs in Synapse or by going to Monitoring -> SQL requests, as visible on the screenshot below:

 

A lot of queries were sent by Databricks – I chose the most important ones to describe.

SELECT CONVERT(varchar(200), DATABASEPROPERTYEX('dsp',
'ServiceObjective')) AS ServiceObjective

The above query checks which service objective (performance tier) our database has.

IF OBJECT_ID('"dbo"."Employees"') IS NOT NULL BEGIN DROP TABLE "dbo"."Employees" END


SELECT CAST(CASE WHEN (OBJECT_ID('"dbo"."Employees"') IS NOT NULL) THEN 1 ELSE 0 END AS BIT)

The next queries check whether the target table exists. If it does, it is dropped – this behaviour comes directly from the mode I chose in Databricks: overwrite means drop and recreate.

SELECT co.name
FROM sys.columns AS co
LEFT JOIN sys.identity_columns AS ic
    ON co.object_id = ic.object_id
    AND co.column_id = ic.column_id
WHERE co.object_id = OBJECT_ID('"dbo"."Employees"')
    AND ic.column_id IS NULL
ORDER BY co.column_id

Databricks lists the columns available in the target table (skipping identity columns).

CREATE TABLE "dbo"."Employees"
  (
  "empcode" INT, "empname" NVARCHAR(256), "empsurname" NVARCHAR(256), "country" NVARCHAR(256)
)
         
  WITH (CLUSTERED COLUMNSTORE INDEX, DISTRIBUTION = ROUND_ROBIN);

The table was dropped, so in the next query it is recreated. Data types and other table definition options can be controlled from the Databricks side, as I will show you later.

COPY INTO "dbo"."Employees" ("empcode","empname","empsurname","country")
FROM 'https://adlssynapsestorage2022.dfs.core.windows.net/data/tmp/employees/2023-01-06/15-28-02-119/f9f8db10-db23-4e3e-b0b4-2a087ef0bd82/'
WITH
(
  FILE_TYPE = 'PARQUET'
  ,CREDENTIAL = (IDENTITY = 'bda516b4-1930-47a1-829c-ac22c947c818@https://login.microsoftonline.com/deca7fa5-bee9-4134-9d26-e211fb2fe976/oauth2/token', SECRET = '***')
  ,COMPRESSION = 'Snappy',
  IDENTITY_INSERT = 'OFF',
  MAXERRORS = 0
)
    
  OPTION (LABEL = 'Databricks Batch Load; Container Build a959850: "dbo"."Employees"');

The above query is the most important one because it performs the actual data movement using COPY INTO. Please notice that the secret used to authenticate is masked.

SELECT * FROM "dbo"."Employees" WHERE 1 = 2
OPTION (LABEL = 'Databricks Resolve Columns; Container Build a959850')

During the process, Databricks checks the table structure several times with queries like the one above.

As I said previously, we can change the default behaviour and influence how the table is created. For example, below you can see an additional option called tableOptions where you can specify the table index and distribution:

%python
(
dfData.write
    .format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://seequality-synapse-demo.sql.azuresynapse.net:1433;database=dsp;encrypt=true;trustServerCertificate=true")
    .option("enableServicePrincipalAuth", "true")
    .option("dbTable", "dbo.Employees")
    .option("tableOptions", "HEAP, DISTRIBUTION = ROUND_ROBIN")
    .option("tempDir", "abfss://data@adlssynapsestorage2022.dfs.core.windows.net/tmp/employees")
    .mode("overwrite")
    .save()
)

After execution, you can see that the table was created with the specified options:

CREATE TABLE "dbo"."Employees"
  (
  "empcode" INT, "empname" NVARCHAR(256), "empsurname" NVARCHAR(256), "country" NVARCHAR(256)
)
         
  WITH (HEAP, DISTRIBUTION = ROUND_ROBIN);

It is also important to know that the length of string columns can be controlled. Many people asked me whether in this scenario we will always end up with NVARCHAR(256), and the answer is no. You can specify the maximum length of string columns in the maxStrLength property:

%python
(
dfData.write
    .format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://seequality-synapse-demo.sql.azuresynapse.net:1433;database=dsp;encrypt=true;trustServerCertificate=true")
    .option("enableServicePrincipalAuth", "true")
    .option("dbTable", "dbo.Employees")
    .option("tableOptions", "HEAP, DISTRIBUTION = ROUND_ROBIN")
    .option("maxStrLength", "50")
    .option("tempDir", "abfss://data@adlssynapsestorage2022.dfs.core.windows.net/tmp/employees")
    .mode("overwrite")
    .save()
)

This is often exactly what you need, but if you want to define column lengths (or other properties) individually, the table can be created in advance and Databricks can load the data in “append” mode:

CREATE TABLE [dbo].[Employees]
( 
    [empcode] [int]  NULL,
    [empname] [nvarchar](50)  NULL,
    [empsurname] [nvarchar](50)  NULL,
    [country] [nvarchar](50)  NULL
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
)
GO
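With the table created up front, the write itself only differs in the save mode – a minimal sketch, reusing the connection options from the previous examples:

%python
(
dfData.write
    .format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://seequality-synapse-demo.sql.azuresynapse.net:1433;database=dsp;encrypt=true;trustServerCertificate=true")
    .option("enableServicePrincipalAuth", "true")
    .option("dbTable", "dbo.Employees")
    .option("tempDir", "abfss://data@adlssynapsestorage2022.dfs.core.windows.net/tmp/employees")
    .mode("append")  # keep the existing table definition and only insert the rows
    .save()
)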

You can also communicate with Synapse not only by specifying a table that you want to read, but also by sending a custom query. To do that, instead of the dbTable option you use the “query” option, as visible in the picture below.
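For reference, here is a minimal sketch of such a read using the “query” option – the query text itself is just an example:

%python

# Read the result of a custom query instead of a whole table.
dfFiltered = (
    spark.read.format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://seequality-synapse-demo.sql.azuresynapse.net:1433;database=dsp;encrypt=true;trustServerCertificate=true")
    .option("enableServicePrincipalAuth", "true")
    .option("query", "SELECT empcode, empname FROM dbo.Employees WHERE country = 'USA'")
    .option("tempDir", "abfss://data@adlssynapsestorage2022.dfs.core.windows.net/tmp/employees")
    .load()
)
display(dfFiltered)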

As you probably notice, this operation also uses temporary storage and, what is a little surprising, POLYBASE is used for it! When you read data from Synapse, no matter whether you use “dbTable” or “query”, be careful not to read huge objects. The driver supports predicate pushdown – the following operators can be pushed down into Azure Synapse:

  • Filter
  • Project
  • Limit

The Project and Filter operators support the following expressions:

  • Most boolean logic operators
  • Comparisons
  • Basic arithmetic operations
  • Numeric and string casts

So, for example, the following code will push the filter down:

%python
from pyspark.sql.functions import col

df = (
    spark.read.format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://seequality-synapse-demo.sql.azuresynapse.net:1433;database=dsp;encrypt=true;trustServerCertificate=true")
    .option("enableServicePrincipalAuth", "true")
    .option("dbTable", "dbo.Employees")
    .option("tempDir", "abfss://data@adlssynapsestorage2022.dfs.core.windows.net/tmp/employees")
    .load()
    .filter(col("empcode") == 10)
)

On the Synapse side, we can see that the table was really filtered at the source – the data was exported to temporary storage using CREATE EXTERNAL TABLE AS SELECT (CETAS):

CREATE EXTERNAL TABLE tmp_databricks_2023_01_07_12_25_54_180_ff70a1f541384107b0250220f6b094f7_external_table
WITH
(
    LOCATION = '/tmp/employees/2023-01-07/12-25-54-179/01b2b7ce-c278-4bad-bb84-9558675823da/',
    DATA_SOURCE = tmp_databricks_2023_01_07_12_25_54_180_ff70a1f541384107b0250220f6b094f7_data_source,
    FILE_FORMAT = tmp_databricks_2023_01_07_12_25_54_180_ff70a1f541384107b0250220f6b094f7_file_format,
    REJECT_TYPE = VALUE,
    REJECT_VALUE = 0
)
AS SELECT "empcode", "empname", "empsurname", "country" FROM (SELECT TOP 1001 * FROM (SELECT  * FROM (SELECT  * FROM "dbo"."Employees" ) AS "subquery_0"  WHERE (("subquery_0"."empcode" IS NOT NULL) AND ("subquery_0"."empcode" = 10))) AS "subquery_1" ) q 
OPTION (LABEL = 'Databricks Unload; Container Build a959850');

Please be aware that if you write more complex filtering or logic that cannot be pushed down, Synapse will export the unfiltered set to the temporary storage and Databricks will do the filtering afterwards, which is not the behavior we want. In such a scenario, consider writing your SQL query directly using the “query” option.

We already said a few words about the temporary storage used to exchange data, but those files are not the only temporary objects. As we know, COPY and especially POLYBASE need some additional objects, and those are created by Databricks behind the scenes. We can notice the following objects in the system:

  • DATABASE SCOPED CREDENTIAL,
  • EXTERNAL DATA SOURCE,
  • EXTERNAL FILE FORMAT,
  • EXTERNAL TABLE.

Of course, those objects are dropped automatically, but sometimes they are left behind, so we should remove them periodically. We can easily identify them because all of them are named according to the pattern tmp_databricks_<yyyy_MM_dd_HH_mm_ss_SSS>_<randomUUID>_<internalObject>, so they are easy to find and drop.
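If you want to check for leftovers, a minimal sketch is to query the relevant catalog views through the same connector – note that this read itself goes through the temporary storage and briefly creates another set of helper objects:

%python

# List helper objects whose names match the tmp_databricks* pattern.
leftovers = (
    spark.read.format("com.databricks.spark.sqldw")
    .option("url", "jdbc:sqlserver://seequality-synapse-demo.sql.azuresynapse.net:1433;database=dsp;encrypt=true;trustServerCertificate=true")
    .option("enableServicePrincipalAuth", "true")
    .option("query", """
        SELECT name, 'EXTERNAL TABLE'             AS object_type FROM sys.external_tables             WHERE name LIKE 'tmp_databricks%'
        UNION ALL
        SELECT name, 'EXTERNAL DATA SOURCE'       AS object_type FROM sys.external_data_sources       WHERE name LIKE 'tmp_databricks%'
        UNION ALL
        SELECT name, 'EXTERNAL FILE FORMAT'       AS object_type FROM sys.external_file_formats       WHERE name LIKE 'tmp_databricks%'
        UNION ALL
        SELECT name, 'DATABASE SCOPED CREDENTIAL' AS object_type FROM sys.database_scoped_credentials WHERE name LIKE 'tmp_databricks%'
    """)
    .option("tempDir", "abfss://data@adlssynapsestorage2022.dfs.core.windows.net/tmp/employees")
    .load()
)
display(leftovers)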

As you can see, it is not very complicated to work with Synapse and Databricks together, and in many scenarios we can benefit from both. It is especially important to know how to integrate them because, in 2021, Microsoft announced a strategic partnership with Databricks to integrate the company’s products with its Azure cloud platform, including Synapse Analytics. This collaboration allows customers to use Synapse Analytics to manage data pipelines and analyze data stored in Databricks, making it easier to work with large amounts of data in the cloud. The partnership also includes the integration of Databricks’ machine learning and artificial intelligence capabilities with Synapse Analytics, enabling users to build and deploy AI models more easily. Overall, the collaboration between Databricks and Synapse Analytics provides customers with a powerful and seamless data platform for all their analytics needs. It is really great news because, for me personally, both platforms are my favorites. For sure that is not all that I want to write about, so stay tuned!

 
