
Upsert operation in Azure Data Factory Copy Activity

Some time ago, without much fanfare, a very interesting update to the Copy Activity in Azure Data Factory appeared. The activity can now not only insert data but also compare incoming rows with the target table and either insert new rows or update existing ones, i.e., perform a classic UPSERT. The new functionality is available for destinations such as SQL Server, Azure SQL, and Synapse. It intrigued me so much that I decided to check how it is performed underneath.

For the tests, we will use the AdventureWorksLT database, which is available in Azure SQL Database as a sample. I created this database myself, and it will serve as both the source and the destination of our copy operation. To have easy control over what data ADF fetches, I created a view with a simple SELECT query:

CREATE VIEW [dbo].[SourceCustomer]
AS
SELECT
CustomerID
,Title
,FirstName
,LastNAme
FROM [SalesLT].[Customer]
WHERE CustomerID<10
GO

The destination will be a new table with the same four columns as the view above, which are taken from the customers table in AdventureWorks:

CREATE TABLE [dbo].[TargetCustomer](
[CustomerID] [bigint] NULL,
[Title] [nvarchar](50) NULL,
[FirstName] [nvarchar](50) NULL,
[LastNAme] [nvarchar](50) NULL
);
GO

These simple structures should suffice for the test. Moving on to ADF, I created a Linked Service to Azure SQL Database. The operation is so standard that I will not walk through it. The only thing I added were parameters for the schema name and table name, so that I could use the same Linked Service and Dataset for both the source and the destination.

The Copy Activity on the source side (Source) looks quite standard:

On the destination side (Sink), our new option appears:

After selecting Upsert, we can also specify whether the intermediate structures should be created in TempDB and which column (or columns) should be used as the key to decide whether a row coming from the source is new or already exists in the target. In this article, we will test the scenario both with and without TempDB.

That is basically all the configuration. What we still lack is a way to capture the activity that ADF performs on the database. There are several options, such as Extended Events or Query Store, but here I will use the traditional dynamic management views, in which such activity should be registered automatically. The query looks as follows:

-- Recently executed statements with basic runtime statistics
SELECT
dest.text AS QueryText
,deqs.last_execution_time AS ExecutionTime
,deqs.execution_count AS ExecutionCount
,deqs.total_elapsed_time AS TotalElapsedTime -- microseconds
,deqs.total_worker_time AS TotalWorkerTime -- CPU time, microseconds
FROM sys.dm_exec_query_stats AS deqs
CROSS APPLY sys.dm_exec_sql_text(deqs.sql_handle) AS dest
WHERE
deqs.last_execution_time >= DATEADD(MINUTE, -1, GETDATE()) -- last minute only
AND dest.text NOT LIKE '%PlaceHolderForExclude%' -- marker that filters out this query itself
ORDER BY deqs.last_execution_time ASC;

As you can see in the code above, we display a few basic statistics and the query text. I also added a filter so that only queries from the last minute are included in the results, and I excluded the query against the DMV itself: its own text contains the PlaceHolderForExclude marker, so the NOT LIKE condition removes it.

Having prepared the environment in this way, I launched ADF:

The execution itself did not take too long as we were literally transferring just a few rows. From the output (OUTPUT) of this task, we received the following information:

"profile": {
"queue": {
"status": "Completed",
"duration": 1
},
"transfer": {
"status": "Completed",
"duration": 1,
"details": {
"readingFromSource": {
"type": "AzureSqlDatabase",
"workingDuration": 0,
"timeToFirstByte": 0
},
"writingToInterim": {
"type": "AzureSqlDatabase",
"workingDuration": 0
},
"writingToSink": {
"type": "AzureSqlDatabase",
"workingDuration": 0
}
}
}
},
"detailedDurations": {
"queuingDuration": 1,
"timeToFirstByte": 0,
"transferDuration": 1
},
"interimDataWritten": 266,
"interimRowsCopied": 7
}

Please note the properties writingToInterim, interimDataWritten, and interimRowsCopied. They suggest that the data was first written to some intermediate/temporary structure. As for the activity on the database side, my query caught a few interesting things.

The first thing I noticed is that an empty global temporary table was created based on my target table:

SELECT *
INTO [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]
FROM [dbo].[TargetCustomer]
WHERE 1 = 2
UNION
SELECT *
FROM [dbo].[TargetCustomer]
WHERE 1 = 2;

The name contains InterimTable (from what I have noticed, this word appears every time) plus some kind of identifier which, despite my suspicions, is not the RunId from ADF but is generated in some other way.
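One plausible reason for the self-UNION in the SELECT INTO above (my assumption, not something ADF documents) is the IDENTITY property. SQL Server copies it into a table created by SELECT INTO unless the query contains, among other things, a UNION. A table can hold only one IDENTITY column, so a staging copy that inherited one could not receive another. The sketch below shows the documented behavior on made-up temp tables (#WithIdentity, #PlainCopy, #UnionCopy are illustrative names):

```sql
-- Illustrative temp tables; the names are made up for this sketch.
CREATE TABLE #WithIdentity (Id INT IDENTITY(1, 1), Name NVARCHAR(50) NULL);

-- Plain SELECT INTO: the copy inherits the IDENTITY property.
SELECT * INTO #PlainCopy FROM #WithIdentity WHERE 1 = 2;

-- SELECT INTO over a UNION: the copy gets an ordinary INT column.
SELECT * INTO #UnionCopy FROM #WithIdentity WHERE 1 = 2
UNION
SELECT * FROM #WithIdentity WHERE 1 = 2;

-- Compare the is_identity flag of the Id column in both copies.
SELECT 'PlainCopy' AS CopyName, c.is_identity
FROM tempdb.sys.columns AS c
WHERE c.object_id = OBJECT_ID('tempdb..#PlainCopy') AND c.name = 'Id'
UNION ALL
SELECT 'UnionCopy', c.is_identity
FROM tempdb.sys.columns AS c
WHERE c.object_id = OBJECT_ID('tempdb..#UnionCopy') AND c.name = 'Id';

DROP TABLE #WithIdentity, #PlainCopy, #UnionCopy;
```

The TargetCustomer table in this test has no IDENTITY column, so the trick makes no visible difference here. It would matter for targets that do have one.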

In the next step, an auto-incrementing column BatchIdentifier of type BIGINT was added to the table, probably in order to identify the individual batches in which data is transmitted:

ALTER TABLE [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]
ADD BatchIdentifier BIGINT IDENTITY(1, 1);

Further, it can be seen that the mechanism conducts a consistency test of the temporary table structure (FMTONLY set to ON allows only metadata from the query to be returned):

SET FMTONLY OFF;
SET FMTONLY ON;
SET FMTONLY ON;
SELECT TOP 0 *
FROM [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a];
SET FMTONLY OFF;
SET FMTONLY OFF;

Next:

SELECT @@trancount;
SET FMTONLY ON;
SELECT *
FROM [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a];
SET FMTONLY OFF;
EXEC tempdb..sp_tablecollations_100
N'.[##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]';

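Again we see a few checks plus a call to a system procedure that probably returns the collations of the temporary table's columns (probably, because the procedure appears to be undocumented). The whole sequence resembles the metadata preamble that bulk-copy clients such as .NET SqlBulkCopy send before loading rows, which would fit a staging step.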

Next, we have the operation we probably expected, which is the update of those rows that exist in the target table:

UPDATE [dbo].[TargetCustomer]
SET
[Title] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[Title],
[FirstName] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[FirstName],
[LastNAme] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[LastNAme]
FROM [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]
WHERE [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].BatchIdentifier >= @start
AND [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].BatchIdentifier < @end
AND [dbo].[TargetCustomer].[CustomerID] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[CustomerID];

Note the filter on BatchIdentifier: the update is applied to a range of batch identifiers, which suggests that huge updates will not necessarily be performed in a single statement. That should have a positive effect on the performance of such operations.

The last recorded operation is the insertion of new rows:

INSERT INTO [dbo].[TargetCustomer]
([CustomerID],
[Title],
[FirstName],
[LastNAme]
)
SELECT [CustomerID],
[Title],
[FirstName],
[LastNAme]
FROM [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]
WHERE [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].BatchIdentifier >= @start
AND [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].BatchIdentifier < @end
AND NOT EXISTS
(
SELECT *
FROM [dbo].[TargetCustomer]
WHERE [dbo].[TargetCustomer].[CustomerID] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[CustomerID]
);
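Put together, the statements captured above amount to a staged upsert. Below is a minimal sketch that replays the same pattern by hand in one session, assuming the dbo.TargetCustomer table from earlier. The local temp table #Interim, the sample rows, and the single @start/@end window are illustrative choices of mine, not what ADF generates:

```sql
-- Hand-written replay of the observed pattern; not ADF's own code.
-- 1. Empty staging copy of the target table.
SELECT * INTO #Interim FROM dbo.TargetCustomer WHERE 1 = 2
UNION
SELECT * FROM dbo.TargetCustomer WHERE 1 = 2;

-- 2. Sequence column used to slice the staged rows into ranges.
ALTER TABLE #Interim ADD BatchIdentifier BIGINT IDENTITY(1, 1);
GO

-- 3. Stage the incoming rows (made-up sample data).
INSERT INTO #Interim (CustomerID, Title, FirstName, LastNAme)
VALUES (1, N'Mr.', N'Sample', N'Existing'),
       (9999, N'Ms.', N'Sample', N'New');

DECLARE @start BIGINT = 1, @end BIGINT = 10000001;

-- 4. Update rows whose key already exists in the target.
UPDATE t
SET t.Title = i.Title, t.FirstName = i.FirstName, t.LastNAme = i.LastNAme
FROM dbo.TargetCustomer AS t
JOIN #Interim AS i ON t.CustomerID = i.CustomerID
WHERE i.BatchIdentifier >= @start AND i.BatchIdentifier < @end;

-- 5. Insert rows whose key is not in the target yet.
INSERT INTO dbo.TargetCustomer (CustomerID, Title, FirstName, LastNAme)
SELECT i.CustomerID, i.Title, i.FirstName, i.LastNAme
FROM #Interim AS i
WHERE i.BatchIdentifier >= @start AND i.BatchIdentifier < @end
  AND NOT EXISTS (SELECT 1 FROM dbo.TargetCustomer AS t
                  WHERE t.CustomerID = i.CustomerID);

-- 6. Clean up the staging table.
DROP TABLE #Interim;
```

Because the UPDATE runs before the INSERT, a row inserted in step 5 is never touched by step 4 within the same window.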

What I certainly did not capture are two operations that run in tempdb: the insertion of rows into the temporary table and the dropping of that table. It looks really interesting! I also checked how the mechanism behaves when the option to use tempdb is unchecked in ADF, which requires specifying the schema in which ADF can create a table for interim data. Note that with this configuration the account used by the SQL connection must have permission to perform DDL operations in that schema. I also selected the option to lock the table during insertion, which should result in TABLOCK being added:
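As a rough illustration of the DDL permissions mentioned above, the sketch below shows one plausible set of grants. The names are hypothetical: [adf_user] stands for the database user behind the Linked Service and [staging] for the interim schema entered in the sink settings. I have not verified that this is the exact minimum ADF needs:

```sql
-- Hypothetical names: [adf_user] and [staging] are placeholders.
CREATE SCHEMA [staging];
GO

-- Creating a table requires CREATE TABLE in the database...
GRANT CREATE TABLE TO [adf_user];

-- ...plus ALTER on the schema in which the table is created
-- (also covers altering and dropping objects in that schema).
GRANT ALTER ON SCHEMA::[staging] TO [adf_user];

-- Loading and reading the interim table.
GRANT SELECT, INSERT ON SCHEMA::[staging] TO [adf_user];
```

The account also needs the usual read and write permissions on the target table itself, which are independent of the interim schema.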

The result is that ADF created the interim table as a regular table in the schema I provided. I also wanted to see what happens in terms of DDL operations, i.e., whether the interim table is repeatedly dropped and recreated. To find out, I created a DDL trigger that logs these events (for now, let's treat the script as a black box, as I plan to describe a similar mechanism separately). The log showed that the table was created via SELECT INTO, then altered to add the batch identifier column, and finally dropped:
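For readers who want to reproduce this kind of logging, here is a minimal sketch of a database-level DDL trigger. It is not the script used in this article. The table dbo.DdlLog and the trigger trg_LogTableDdl are hypothetical names:

```sql
-- Hypothetical log table; a sketch, not the script used in the article.
CREATE TABLE dbo.DdlLog (
    LogId       INT IDENTITY(1, 1) PRIMARY KEY,
    EventTime   DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME(),
    EventType   NVARCHAR(128) NULL,
    SchemaName  NVARCHAR(128) NULL,
    ObjectName  NVARCHAR(128) NULL,
    CommandText NVARCHAR(MAX) NULL
);
GO

CREATE TRIGGER trg_LogTableDdl
ON DATABASE
FOR CREATE_TABLE, ALTER_TABLE, DROP_TABLE
AS
BEGIN
    SET NOCOUNT ON;

    -- EVENTDATA() returns an XML description of the DDL event.
    DECLARE @e XML = EVENTDATA();

    INSERT INTO dbo.DdlLog (EventType, SchemaName, ObjectName, CommandText)
    VALUES (
        @e.value('(/EVENT_INSTANCE/EventType)[1]', 'NVARCHAR(128)'),
        @e.value('(/EVENT_INSTANCE/SchemaName)[1]', 'NVARCHAR(128)'),
        @e.value('(/EVENT_INSTANCE/ObjectName)[1]', 'NVARCHAR(128)'),
        @e.value('(/EVENT_INSTANCE/TSQLCommand/CommandText)[1]', 'NVARCHAR(MAX)')
    );
END;
GO
```

Two caveats apply. The trigger runs in the context of the caller, so the ADF account needs INSERT permission on the log table, otherwise its DDL statements will fail. DDL triggers also do not fire for local or global temporary tables, so this approach only works when the interim table is created in a user schema rather than in tempdb.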

To confirm some of my observations, I decided to additionally configure Self-Hosted Integration Runtime and connect from ADF to my local SQL Server instance where I have full visibility into what’s happening underneath. There, after setting up an Extended Events session, I was able to observe more, and now I have insight into what the data filling in the temporary table looks like:

INSERT BULK [dbo].[InterimTable_446f7488-c0c3-4d30-b3a3-3f45897f2972]([CustomerID] BIGINT, [Title] NVARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [FirstName] NVARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastNAme] NVARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS) WITH(TABLOCK, CHECK_CONSTRAINTS);

The above query was executed numerous times because I set the batch size to 1 for testing purposes:
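Statements like these can be captured with an Extended Events session. Below is a minimal sketch, assuming a local SQL Server instance. The session name TraceAdfUpsert and the '%InterimTable%' filter are my own choices, not the session used in this article:

```sql
-- Hypothetical session name; server-scoped, for a local SQL Server instance.
CREATE EVENT SESSION [TraceAdfUpsert] ON SERVER
-- Ad hoc batches whose text mentions the interim table.
ADD EVENT sqlserver.sql_batch_completed (
    ACTION (sqlserver.client_app_name, sqlserver.sql_text)
    WHERE ([sqlserver].[like_i_sql_unicode_string]([batch_text], N'%InterimTable%'))
),
-- RPC calls such as sp_executesql that mention the interim table.
ADD EVENT sqlserver.rpc_completed (
    ACTION (sqlserver.client_app_name, sqlserver.sql_text)
    WHERE ([sqlserver].[like_i_sql_unicode_string]([statement], N'%InterimTable%'))
)
ADD TARGET package0.ring_buffer
WITH (STARTUP_STATE = OFF);
GO

ALTER EVENT SESSION [TraceAdfUpsert] ON SERVER STATE = START;
GO

-- After the pipeline run:
-- ALTER EVENT SESSION [TraceAdfUpsert] ON SERVER STATE = STOP;
-- DROP EVENT SESSION [TraceAdfUpsert] ON SERVER;
```

The captured events can be viewed in SSMS with Watch Live Data on the session or by reading the ring_buffer target.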

The modification of the target table looked slightly different than in the case of Azure SQL because sp_executesql was used:

EXEC sp_executesql
N'update [dbo].[TargetCustomer] set [Title] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[Title], [FirstName] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[FirstName], [LastNAme] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[LastNAme] from [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1] where [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].BatchIdentifier >= @start and [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].BatchIdentifier < @end and [dbo].[TargetCustomer].[CustomerId] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[CustomerId]',
N'@start bigint,@end bigint',
@start = 1,
@end = 10000001;

EXEC sp_executesql
N'update [dbo].[TargetCustomer] set [Title] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[Title], [FirstName] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[FirstName], [LastNAme] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[LastNAme] from [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1] where [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].BatchIdentifier >= @start and [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].BatchIdentifier < @end and [dbo].[TargetCustomer].[CustomerId] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[CustomerId]',
N'@start bigint,@end bigint',
@start = 10000001,
@end = 20000001;

Above we see the UPDATE commands; two analogous commands were executed for INSERT. As you can see, the batch size setting controls how data is written to the intermediate table, not to the target table, which in itself is quite interesting. So, contrary to my assumptions, rows are not inserted or updated in the target one write batch at a time but in ranges of up to 10 million BatchIdentifier values 🙂 Therefore, be careful when transferring large amounts of data, as a single UPDATE or INSERT over such a range may affect the performance of the process. With smaller datasets this should, of course, not be a problem.

That is all. I hope I have brought you a bit closer to understanding what happens underneath the new UPSERT mechanism available in the Copy Activity. Regards!

2 Comments

  1. Thanks for the article. Yes, by the way, it’s worth remembering that in Azure SQL, tempdb has simple recovery, hence the potential performance benefits. By the way, I don’t understand why there isn’t an Azure SQL SKU that, at the cost of SLA or RPO, offers simple recovery. The closest engine in Azure is either IaaS or Synapse Dedicated Pool. Nonsense..
