UpsertCopyActivityDataFactory_00

Azure Data Factory – Upsert w Copy Activity

Kilka dni temu bez większego szumu pojawiła się bardzo ciekawa aktualizacja działania Copy Activity wewnątrz Azure Data Factory. Chodzi mianowicie o fakt, iż aktywność ta potrafi już nie tylko w prosty sposób wstawić dane, ale również porównać je z tabelą docelową i w razie konieczności wstawić nowe wiersze lub uaktualnić istniejące czyli wykonać klasyczny UPSERT. Nowa funkcjonalność jest dostępna dla miejsc docelowych takich jak SQL Server, Azure SQL oraz Synapse. Operacja ta na tyle mnie zaciekawiła, że postanowiłem sprawdzić jak jest wykonywana pod spodem.

Do testów wykorzystamy bazę AdventureWorksLT, która jest dostępna w Azure SQL Database jako baza testowa. Baza ta została przeze mnie stworzona i będzie służyć zarówno jako źródło jak i miejsce docelowe naszej operacji kopiowania. Żeby mieć łatwą kontrolę nad tym jakie dane są pobierane przez ADF stworzyłem widok z prostym zapytaniem SELECT:

CREATE VIEW [dbo].[SourceCustomer]
AS
SELECT
CustomerID
,Title
,FirstName
,LastNAme
FROM [SalesLT].[Customer]
WHERE CustomerID<10
GO

Miejscem docelowym będzie nowa tabela, która ma identyczną strukturę kolumn jak tabela klientów w AdventureWorks:

CREATE TABLE [dbo].[TargetCustomer](
[CustomerID] [bigint] NULL,
[Title] [nvarchar](50) NULL,
[FirstName] [nvarchar](50) NULL,
[LastNAme] [nvarchar](50) NULL
);
GO

Te proste struktury powinny wystarczyć do przeprowadzenia testu. Przechodząc do ADF stworzyłem Linked Service do Azure SQL Database – operacja jest na tyle standardowa, że postanowiłem ją pominąć (zainteresowanych odsyłam tutaj). Jedyną rzecz jaką zrobiłem to dodałem parametry dla nazwy schematu oraz nazwy tabeli tak abym mógł użyć tego samego Linked Service oraz Dataset zarówno dla źródła jak i dla miejsca docelowego.

Samo Copy Activity po stronie źródła (Source) wygląda dosyć standardowo:

Po stronie miejsca docelowego (Sink) pojawia się nasza nowa opcja:

Po wybraniu Upsert mamy jeszcze możliwość wskazać czy chcemy aby struktury pośrednie były tworzone w TempDB oraz po jakiej kolumnie (kolumnach) ma być wykonane porównanie aby zdiagnozować czy dany wiersz pochodzący ze źródła jest nowy czy też nie. W ramach niniejszego artykułu przetestujemy sobie scenariusz zarówno z użyciem TempDB jak i bez niego.

Jeśli chodzi o konfigurację to właściwie to wszystko, brakuje nam jedynie sposobu przechwycenia aktywności jaką ADF wykonuje na bazie.  W tym wypadku mamy kilka możliwości jak chociażby Extended Events czy Query Store jednak w tym przypadku posłużę się tradycyjnymi widokami dynamicznymi w których owa aktywność powinna zostać zarejestrowana automatycznie. Zapytanie wygląda następująco:

SELECT
dest.text AS QueryText
,last_execution_time AS ExecutionTime
,deqs.execution_count AS ExecutionCount
,deqs.total_elapsed_time AS TotalElapsedTime
,deqs.total_worker_time AS TotalWorkerTime
FROM sys.dm_exec_query_stats as deqs
CROSS APPLY sys.dm_exec_sql_text(deqs.sql_handle) as dest
WHERE
deqs. last_execution_time >=DATEADD(MINUTE,-1,GETDATE())
AND dest.text NOT LIKE '%PlaceHolderForExclude%'
ORDER BY last_execution_time asc

Jak widzicie na powyższym kodzie wyświetlamy kilka podstawowych statystyk oraz tekst zapytania. Dodałem również filtrację aby do  wyniku trafiły tylko zapytania z ostatniej minuty oraz wykluczyłem samo zapytanie do DMV.

Mając już tak przygotowane środowisko uruchomiłem ADF:

Samo wykonanie nie trwało zbyt długo bo też przerzucaliśmy dosłownie kilka wierszy. Na wyjściu (OUTPUT) z tego zadania dostaliśmy następujące informacje:

"profile": {
"queue": {
"status": "Completed",
"duration": 1
},
"transfer": {
"status": "Completed",
"duration": 1,
"details": {
"readingFromSource": {
"type": "AzureSqlDatabase",
"workingDuration": 0,
"timeToFirstByte": 0
},
"writingToInterim": {
"type": "AzureSqlDatabase",
"workingDuration": 0
},
"writingToSink": {
"type": "AzureSqlDatabase",
"workingDuration": 0
}
}
}
},
"detailedDurations": {
"queuingDuration": 1,
"timeToFirstByte": 0,
"transferDuration": 1
},
"interimDataWritten": 266,
"interimRowsCopied": 7
}

Zwróćcie proszę uwagę na właściwości writingToInterim oraz interimDataWritten oraz intermRowsCopied – wygląda na to, że są to własności mówiące o tym, że dane zostały wstawione do jakiejś struktury pośredniej/tymczasowej. Jeśli chodzi o aktywność po stronie bazy to moje zapytanie wychwyciło już kilka ciekawych rzeczy.

Pierwsza rzecz na którą zwróciłem uwagę to fakt, że stworzona została pusta globalna tabela tymczasowa na podstawie mojej tabeli docelowej:

SELECT *
INTO [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]
FROM [dbo].[TargetCustomer]
WHERE 1 = 2
UNION
SELECT *
FROM [dbo].[TargetCustomer]
WHERE 1 = 2;

W nazwie widzimy InterimTable (z tego co zauważyłem te słowa pojawiają się za każdym razem) plus pewnego rodzaju identyfikator, który mimo moich podejrzeń nie jest identyfikatorem RunId z ADFa tylko generowany jest w jakiś inny sposób.

W kolejnym kroku do tabeli dodana została autoinkrementalna kolumna BatchIdentifier typu BIGINT zapewne po to żeby móc zidentyfikować poszczególne batche w jakich przesyłane są dane:

ALTER TABLE [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]
ADD BatchIdentifier BIGINT IDENTITY(1, 1);

Dalej widać, że mechanizm przeprowadza test spójności struktury tabeli tymczasowej (FMTONLY ustawione na ON pozwala zwrócić jedynie metadane z zapytania):

SET FMTONLY OFF;
SET FMTONLY ON;
SET FMTONLY ON;
SELECT TOP 0 *
FROM [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a];
SET FMTONLY OFF;
SET FMTONLY OFF;

Dalej mamy :

SELECT @@trancount;
SET FMTONLY ON;
SELECT *
FROM [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a];
SET FMTONLY OFF;
EXEC tempdb..sp_tablecollations_100
N'.[##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]';

Znów kilka sprawdzeń + odwołanie do procedury systemowej która prawdopowodbnie sprawdza collation tabeli tymczasowej (prawdopodobnie ponieważ wygląda, iż nie jest ona udokumentowana).

W dalszej kolejności mamy operację, której zapewne się spodziewaliśmy czyli wykonanie akutalizacji na tych wierszach, które istnieją w tabeli docelowej:

UPDATE [dbo].[TargetCustomer]
SET
[Title] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[Title],
[FirstName] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[FirstName],
[LastNAme] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[LastNAme]
FROM [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]
WHERE [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].BatchIdentifier >= @start
AND [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].BatchIdentifier < @end
AND [dbo].[TargetCustomer].[CustomerID] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[CustomerID];

Zwróćcie proszę uwagę, że następuje tutaj filtracja po batchach co daje nam informację o tym iż ogromne aktualizacje nie będą wykonywane w jednorazowej operacji co powinno wpłynąć pozytywnie na wydajność takich działań.

Ostatnią zarejestrowaną operacją jest wstawienie nowych wierszy:

INSERT INTO [dbo].[TargetCustomer]
([CustomerID],
[Title],
[FirstName],
[LastNAme]
)
SELECT [CustomerID],
[Title],
[FirstName],
[LastNAme]
FROM [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a]
WHERE [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].BatchIdentifier >= @start
AND [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].BatchIdentifier < @end
AND NOT EXISTS
(
SELECT *
FROM [dbo].[TargetCustomer]
WHERE [dbo].[TargetCustomer].[CustomerID] = [##InterimTable_c313c88b-ada9-4fca-bf1b-2dba6021f24a].[CustomerID]
);

To czego nie zarejestrowałem to z całą pewnością dwie operacje, które już działają w tempdb czyli wstawienie wierszy do tabeli tymczasowej oraz jej usunięcie. Wygląda to naprawdę ciekawie! Sprawdziłem również jak zachowa się mechanizm w przypadku gdy odznaczymy w ADF możliwość użycia tempdb co spowodowało konieczność wskazania w jakim schemacie ADF może utworzyć tabelę na potrzeby tymczasowego przechowywania danych. Tutaj należy odrazu zaznaczyć, że przy takiej konfigruacji ADF, a raczej konto na którym działa połączenie do SQL musi mieć uprawnienie do operacji DDL we wskazanym schemacie. Ponadto zaznaczyłem opcję związaną z blokowaniem tabeli przy wstawieniu co powinno skutkować dodaniem TABLOCKa:

Rezultat tego ćwiczenia jest taki, że ADF stworzył tabelę tymczasową w podanym przeze mnie schemacie. Chciałem też zobaczyć jak wygląda sytuacja z operacjami DDL czyli czy tabela tymczasowa jest usuwana wielokrotnie i tworzona na nowo. Aby dostać się do tych informacji stworzyłem DDL triggera, który loguje mi te informacje (na ten moment potraktujmy skrypt jako czarną skrzynkę gdyż mam w planach opisanie podobnego mechanizmu). W rezultacie dostałem informację, iż tabela została stworzona poprzez SELECT INTO  następnie ALTER dodający kolumnę z identyfikatorem batcha i na końcu usunięcie:

Aby potwierdzić niektóre ze swoich spostrzeżeń postanowiłem dodatkowo skonfigurować Self-Hosted Integration Runtime i połączyć się z ADF do mojej lokalnej instancji SQL Server gdzie mam pełny podgląd na to co się dzieje pod spodem. Tam po założeniu sesji Extended Events mogłem podejrzeć nieco więcej i tak mam już podgląd pod to jak wygląda wypełnienie danych w tabeli tymczasowej:

INSERT BULK [dbo].[InterimTable_446f7488-c0c3-4d30-b3a3-3f45897f2972]([CustomerID] BIGINT, [Title] NVARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [FirstName] NVARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS, [LastNAme] NVARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS) WITH(TABLOCK, CHECK_CONSTRAINTS);

Powyższe zapytanie zostało wykonane mnóstwo razy gdyż ustawiłem testowo rozmiar batcha na 1:

Samo wstawienie danych wyglądało nieco inaczej niż w przypadku Azure SQL ponieważ wykorzystany został sp_executesql:

EXEC sp_executesql
N'update [dbo].[TargetCustomer] set [Title] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[Title], [FirstName] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[FirstName], [LastNAme] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[LastNAme] from [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1] where [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].BatchIdentifier >= @start and [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].BatchIdentifier < @end and [dbo].[TargetCustomer].[CustomerId] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[CustomerId]',
N'@start bigint,@end bigint',
@start = 1,
@end = 10000001;

EXEC sp_executesql
N'update [dbo].[TargetCustomer] set [Title] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[Title], [FirstName] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[FirstName], [LastNAme] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[LastNAme] from [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1] where [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].BatchIdentifier >= @start and [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].BatchIdentifier < @end and [dbo].[TargetCustomer].[CustomerId] = [dbo].[InterimTable_3922348a-6cf5-49ec-847e-d96a901191b1].[CustomerId]',
N'@start bigint,@end bigint',
@start = 10000001,
@end = 20000001;

Powyżej widzimy polecenia UPDATE ale również analgiczne dwa zostały wykonane dla INSERT. Jak widać samo zapisywanie w batchach odnosi się do tego jak wstawiane są dane do tabeli pośredniej, a nie do tabeli docelowej co samo w sobie jest dosyć ciekawe.  Czyli tak naprawdę wbrew moim przypuszczeniom dane nie są wstawian/aktualizowane batch po batchu,a w ogromnych porcjach po 10 milionów 🙂 Należy zatem uważać w przypadku przerzucania dużej ilości danych gdyż całość może wpłynąć na wydajność naszego procesu. Przy mniejszych zbiorach nie powinno to oczywiście stanowić problemu.

To by było na tyle, mam nadzieję, że nieco przybliżyłem Wam co się dzieje pod spodem nowego mechanizmu UPSERT dostępnego w ramach Copy Activity. Pozdrawiam!

2 Comments

    • Oczywiście da się to zrobić trzeba np. wywołać zewnętrzny serwis jak Logic Apps czy SendGrid, który dokonuje fizycznej wysyłki.

Leave a Reply