Ładowanie Azure Synapse przy pomocy COPY INTO

Jednym z najgorętszych tematów w świecie platformy danych Microsoft jest bez wątpienia Azure Synapse Analytics. Jest to usługa która wyewoluowała z Azure Datawarehouse i obecnie opiera się na kilku filarach gdzie z jednej strony mamy do czynienia z Dedicated SQL Pool czyli relacyjnym silnikiem SQL działającym w technologii MPP (Massive Parallel Processing), SQL Serverless gdzie w ogóle nie interesują nas zasoby, a jedynie wolumen danych jakie chcemy przetworzyć oraz  Spark Pool czyli osobny serwis oparty o Apache Spark. Całość wzbogacona o integrację z kontrolą wersji, pipeline’ami, data lake’iem oraz Power BI i kilkoma innymi usługami w ramach chmury Azure. Naprawdę jest tego dużo i ogólnie rzecz biorąc usługa daje ogromne możliwości analizowania, składowania i przetwarzania przeróżnych danych na kilka sposobów. Ze względu na fakt, iż Azure już dawno jest tematem numer jeden jeśli chodzi o moje techniczne zainteresowania to dziś chciałbym się z wami podzielić tym jak załadować dane do wspomnianego Dedicated SQL Pool. Oczywiście możliwości wykonania tego typu operacji jak zawsze jest kilka jednak ja dziś chciałbym się pochylić nad stosunkowo prostą, ale niezwykle efektywną opcją jaką bez wątpienia jest COPY INTO.

Do niedawna preferowaną metodą ładowania danych do Synapse był znany z SQL Server mechanizm o nazwie Polybase. Jest to technologia będąca niczym innym jak wirutalizacją danych gdzie w przypadku chmury możemy ładować dane z Data Lake Storage czy też BLOB Storage tworząc External Data Source itd.  Obecnie nieco prostszą metodą jest użycie bohatera niniejszego artykułu czyli komendy COPY INTO, która pozwala na bezpośrednie załadowanie danych do Synapse bez potrzeby tworzenia całego szeregu obiektów wymaganych przy Polybase.

Na wstępie warto zaznaczyć, że użytkownik(serwis) chcący wywoływać COPY INTO musi posiadać następujące uprawnienia w ramach bazy danych:

  • ADMINISTER DATABASE BULK OPERATIONS,
  • INSERT.

COPY INTO może załadować dane z następujących usług storage:

  • Azure BLOB Storage,
  • Azure Data Lake Storage Gen2.

W tym miejscu odrazu nasuwa się dobra praktyka jaką jest potrzeba ładowania danych najpierw na storage, a dopiero później do SQL Pool – jest to w większości przypadków preferowany model ładowania danych pozwalający osiągnąć najlepsze efekty. W przypadku usługi storage powszechnie znany fakt jest taki, że ADLS jest prefereowany dla workloadu analitycznego ze względu na wiele czynników jak chociażby na możliwość implementacji hierarchicznej struktury kontenerów, która wspiera szybkie wyszukiwanie interesujących nas danych. W zdecydowanej większości przypadków będziemy zatem ładować dane z lake’a ale warto wspomnieć, że ze względu na kompatybilność oraz fakt, że ADSL jest roszerzeniem BLOB’a to sposób podłączania się do obu usług jest niemal identyczny. W moim przykładzie będziemy odwoływać się do lake’a do którego możemy się dostać używając następujących metod uwierzytelnienia:

  • Shared Access Signature,
  • MSI – Managed Service Identity,
  • Service Principal,
  • Service Key,
  • Azure Active Directory.

Możliwości do wyboru jest całkiem dużo, a dodatkowo dane możemy trzymać w różnych typach plików:

  • CSV (skompresowany lub nie),
  • Parquet,
  • ORC.

Plik CSV raczej nie wymaga wyjaśnienia gdyż w świecie danych format ten stał się standardem i znajduje zastosowanie od dziesiątek lat. Ciekawe jest to, że dane zawarte w tego typu pliku mogą być dodatkowo skompresowane co skutkuje mniejszą objętością pliku i zwiększoną wydajnością odczytu. Plik parquet jest również opensource’owym standardem wchodzącym w skład ekosystemu Apache. Jest to format pliku charakteryzujący się składowaniem kolumnowym z naprawde wydajnym mechanizmami kompresji. Dzięki zastosowaniu tego typu formatu odczytywane są tylko te kolumny, które są dla nas istotne co pozwala zaoszczędzić czas potrzebny na odczyt danych ze storage’u. Cechy te czynią ten format szczególnie przydatnym w przypadku usług bezpośrednio korzystających z danych znajdujących się w plikach jak chociażby wspomniany we wstępie SQL Serverless. Zbliżonym formatem z nieco innymi charakterystykami jest Apache ORC, który również opiera się na kolumnowym składowaniu danych i znajduje szerokie zastosowania w różnych implementacjach big data (więcej na temat formatów plików itp. w linkach na końcu artykułu).

W porządku, po krótkim wstępie teoretycznym przejdźmy do demonstracji. Jak już wspomniałem powyżej polecenie COPY INTO nie wymaga tworzenia dodatkowych obiektów bazodanowych
i sam też takowych nie tworzy więc zanim będziemy dane ładować musimy przygotować tabelę docelową. Stwórzmy sobie zatem tabelę Date do której będziemy ładować dane kalendarzowe.
W ramach artykułu będziemy ładować dane pochodzące z zestawu danych NY Taxi (Microsoft udostępnił Azure Data Lake Storage gdzie znajdują się dane, których będziemy używać – link):

CREATE TABLE [dbo].[stage_Date]
(
    [DateID] int NOT NULL,
    [Date] datetime NULL,
    [DateBKey] char(10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [DayOfMonth] varchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [DaySuffix] varchar(4) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [DayName] varchar(9) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [DayOfWeek] char(1) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [DayOfWeekInMonth] varchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [DayOfWeekInYear] varchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [DayOfQuarter] varchar(3) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [DayOfYear] varchar(3) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [WeekOfMonth] varchar(1) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [WeekOfQuarter] varchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [WeekOfYear] varchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [Month] varchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [MonthName] varchar(9) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [MonthOfQuarter] varchar(2) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [Quarter] char(1) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [QuarterName] varchar(9) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [Year] char(4) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [YearName] char(7) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [MonthYear] char(10) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [MMYYYY] char(6) COLLATE SQL_Latin1_General_CP1_CI_AS NULL,
    [FirstDayOfMonth] date NULL,
    [LastDayOfMonth] date NULL,
    [FirstDayOfQuarter] date NULL,
    [LastDayOfQuarter] date NULL,
    [FirstDayOfYear] date NULL,
    [LastDayOfYear] date NULL,
    [IsHolidayUSA] bit NULL,
    [IsWeekday] bit NULL,
    [HolidayUSA] varchar(50) COLLATE SQL_Latin1_General_CP1_CI_AS NULL
)
WITH
(
    DISTRIBUTION = ROUND_ROBIN,
    HEAP
);

Warto zwrócić uwagę na to, że tabela jest zdefiniowana jako sterta i wskazujemy to w sekcji WITH (jeśli jawnie tego nie wskażemy to domyślnie tworzony jest COLUMNSTORE) oraz dystrybucja danych jest tutaj ustawiona jako ROUND_ROBIN czyli proporcjonalne wypełnienie danymi – indeksacja jak i dystrybucja to temat na zupełnie inny artykuł jednakże na ten moment mogę jedynie powiedzieć, że właśnie taki zestaw ustawień jest dobra praktyką dla tabel przejściwoych/stagingowych w przypadku Dedicated SQL Pool.

Mamy zatem naszą tabelę docelową i możemy przejść do testowania samego zapytania:

COPY INTO [dbo].[stage_Date]
FROM 'https://seequalitystorageaccount.blob.core.windows.net/nytaxi/data/Date/*.txt'
WITH
(
    CREDENTIAL =(IDENTITY = 'Managed Identity'),
    FILE_TYPE = 'CSV',
    FIELDTERMINATOR = ',',
    ROWTERMINATOR='0X0A',
    ENCODING = 'UTF8'
)
OPTION (LABEL = 'COPY : dbo.Date');

Powyższa składnia jest dosyć jasna i oczywista ale kilka jej elementów wymaga dodatkowego wyjaśnienia. W sekcji FROM zapytania wskazałem ścieżkę do kontenera, który chcę załadować – jeśli bym chciał to mogę po przecinku wskazać kilka kontenerów. We wskazanej przeze mnie lokalizacji znajduje się oczywiście wiele plików – zgodnie z hierarchicznością lake’a Synapse przeszuka wskazaną ścieżkę wraz z podfolderami wewnątrz. Bardzo ciekawy jest zapis OPTION (LABEL) gdzie możemy przypisać do naszego zapytania etykietę, którą później możemy wykorzystać m.in. do przeszukiwania obiektów systemowych związanych z naszym zapytaniem – naprawdę bardzo fajne udogodnienie!

W sekcji WITH zapytania wskazałem kilka dodatkowych właściwości istotnych z punktu widzenia ładowania jak chociażby typ pliku (FILE_TYPE), separator kolumn (FIELDTERMINATOR), separator wierszy czyli znak nowej linii (ROWTERMINATOR), oraz kodowanie pliku (UTF8). Oczywistym jest fakt, że to nie wszystkie możliwe opcje możliwe do wykorzystania ale ze względu na to, że plik z datami jest stosunkowo prosty to nie trzeba ich jawnie specyfikować. W zapytaniu widoczna jest również opcja Credential gdzie musimy wskazać w jaki sposób Synapse ma się dostać do wskazanej lokalizacji. W tym wypadku wykorzystałem Managed Identity czyli tożsamość Synapse’a (o tym jak nadać takie uprawnienie pisałem w poprzednich artykułach o Data Factory link).

Gdy struktura pliku się zgadza i Synapse jest w stanie dostać się do wskazanej lokalizacji wszystko powinno odbyć się bez większego problemu co też się stało w moim przypadku:

To oczywiście nie wszystkie przełączniki jakie oferuje COPY INTO – jest ich znacznie więcej:

COPY INTO [schema.]table_name
[(Column_list)] 
FROM '<external_location>' [,...n]
WITH  
 ( 
 [FILE_TYPE = {'CSV' | 'PARQUET' | 'ORC'} ]
 [,FILE_FORMAT = EXTERNAL FILE FORMAT OBJECT ]	
 [,CREDENTIAL = (AZURE CREDENTIAL) ]
 [,ERRORFILE = '[http(s)://storageaccount/container]/errorfile_directory[/]]' 
 [,ERRORFILE_CREDENTIAL = (AZURE CREDENTIAL) ]
 [,MAXERRORS = max_errors ] 
 [,COMPRESSION = { 'Gzip' | 'DefaultCodec'| 'Snappy'}] 
 [,FIELDQUOTE = 'string_delimiter'] 
 [,FIELDTERMINATOR =  'field_terminator']  
 [,ROWTERMINATOR = 'row_terminator']
 [,FIRSTROW = first_row]
 [,DATEFORMAT = 'date_format'] 
 [,ENCODING = {'UTF8'|'UTF16'}] 
 [,IDENTITY_INSERT = {'ON' | 'OFF'}]
)

W powyższym kodzie jasno widać, że możemy wskazać pierwszy wiersz w każdym pliku, metodę kompresji oraz obsługę błędów. Tą ostatnią opcję chciałbym również tutaj przedstawić gdyż jest ona dosyć ciekawa. W tym celu stworzyłem plik płaski który strukturą jest zgodny z innymi plikami zawierającymi daty:

19000101,1900-01-00 00:00:00.000,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,

Większość kolumn pozostawiłem puste jednakże nie to jest najwazniejsze, a celowy błąd znajdujący się w kolumnie numer dwa gdzie jako dzień wpisałem 00 co oczywiście powinno skutkować błędem i rzeczywiście tak też się stało gdy uruchomiłem COPY INTO:

Msg 13812, Level 16, State 1, Line 145
Bulk load data conversion error (type mismatch or invalid character for the specified codepage) for row 1, column 2 (Date) in data file /data/Date/MyTestFile.txt.

Jeśli jakaś liczba błędów jest dla nas akceptowalna to możemy użyć przełącznika MAXERRORS – np. poniżej dopuszczam wystąpienie jednego błędu:

TRUNCATE TABLE [dbo].[stage_Date]

COPY INTO [dbo].[stage_Date]
FROM 'https://seequalitystorageaccount.blob.core.windows.net/nytaxi/data/Date/*.txt'
WITH
(
    CREDENTIAL =(IDENTITY = 'Managed Identity'),
    FILE_TYPE = 'CSV',
	FIELDTERMINATOR = ',',
	ROWTERMINATOR='0X0A',
    ENCODING = 'UTF8',
	MAXERRORS = 1
)

Co prawda poprawne dane zostały załadowane ale dodatkowo dostałem informację, że jeden wiersz został odrzucony:

Query completed. Rows were rejected while reading from external source(s). 
1 row rejected from table [stage_Date] in plan step 4 of query execution:
	Bulk load data conversion error (type mismatch or invalid character for the specified codepage) for row 1, column 2 (Date) in data file /data/Date/MyTestFile.txt.

Komunikat jest dosyć klarowny i wskazuje nie tylko plik, z którego pochodzi błąd ale również kolumnę oraz wiersz. Tego typu komunikat może nam umknąć więc mamy jeszcze możliwość przekierować błędne wiersze do określonej lokalizacji służy do tego przełącznik ERRORFILE (oraz ERRORFILE_CREDENTIAL, który będzie użyty gdy wskażemy pełną ścieżkę do lokalizacji z błędami. Jeśli zrzut błędów ma znajdować się na tym samym storage co dane to należy podać ścieżkę względną poczynając od głównego kontenera podanego w sekcji FROM. Wtedy też do uwierzytelnienia użyty zostanie sposób wskazany we właściwości CREDENTIAL):

COPY INTO [dbo].[stage_Date]
FROM 'https://seequalitystorageaccount.blob.core.windows.net/nytaxi/data/Date/'
WITH
(
    CREDENTIAL =(IDENTITY = 'Managed Identity'),
    FILE_TYPE = 'CSV',
	FIELDTERMINATOR = ',',
	ROWTERMINATOR='0X0A',
    ENCODING = 'UTF8',
	ERRORFILE = '/errors'
)

Oczywiście całe zapytanie zwróciło błąd jednak tym razem problematyczny wiersz został przekierowany do wskazanej przez nas lokalizacji:

Jak możecie zauważyć na powyższym zrzucie ekranowym we wskazanej lokalizacji powstał kontener _rejectedrows i wewnątrz niego kolejny kontener wskazujący datę i czas uruchomienia zapytania. W tej lokalizacji do dyspozycji mamy dwa pliki gdzie plik Row zawiera rzeczywisty wiersz, który został odrzucony, a error tekst błędu.

Ciekawie zachowa się zapytanie w momencie gdy jednocześnie podamy MAXERROR większy niż zero oraz ERRORFILE. Wtedy też błąd razem z komunikatem zostanie przekierowany ale zapytanie nie zakończy się błędem. Ostatnią cechą związaną z obsługą błędów o jakiej chciałbym wspomnieć jest to, że nawet gdy nie wystąpi błąd to w lokalizacji zrzutu błędów powstanie kontener co jest dosyć specyficznym zachowaniem.

Przechodząc dalej powiedzmy sobie o ładowanie podzbioru kolumn oraz o tym jak zastępować wartości puste. Poniższe zapytanie pobiera z mojego testowego pliku jedynie dwie pierwsze kolumny (identyfikujemy je za pomocą nazw tabeli docelowej), dodatkowo dla kolumny DateBKey w przypadku występienia wartości pustej wstawiamy ‘#’:

COPY INTO [dbo].[stage_Date] (DateID,Date,DateBKey default '#')
FROM 'https://seequalitystorageaccount.blob.core.windows.net/nytaxi/data/Date/MyTestFile.txt'
WITH
(
    CREDENTIAL =(IDENTITY = 'Managed Identity'),
    FILE_TYPE = 'CSV',
	FIELDTERMINATOR = ',',
	ROWTERMINATOR='0X0A',
    ENCODING = 'UTF8'
)

Jeśli zatem przewidujemy już na tym etapie zastępowanie pustych wartości to warto rozważyć wykorzystanie tej funkcjonalności.

COPY INTO możemy wywoływać nie tylko z poziomu czystego TSQL ale również w ramach Azure Data Factory. Gdy zdecydujemy się na ADF mamy kilka spraw na które musimy zwrócić
uwagę – po pierwsze cały proces jest tak naprawdę obsługiwany przez Synapse, a ADF jest jedynie operatorem dlatego też w ramach Copy Activity jednostki DIU powinno być ustawione na 2 – podnoszenie tej wartości w żadnym wypadku nie zwiększa wydajności. Po drugie w zależności od rodzaju plików jakie ładujemy musimy odpowiednio skonfigurować Copy Activity.

W przypadku CSV są to następujące kwestie:

  • separator wierszy musi być ustawiony na pojedynczy znak lub \r\n,
  • wartość null musi być pozostawiona bez obsługi lub wstawiony powinien być pusty string,
  • kodowanie plików tak jak w standardowej komendzie COPY INTO musi być ustawione na utf-8 lub utf-16,
  • znak ucieczki (escapeChar) oraz znak cytowania stringów (quoteChar) musi być niepusty i dokładnie taki sam dla obu ustawień,
  • ilość linii do zignorowania musi być ustawiona na 0 lub pozostawiona na ustawieniach domyślnych,
  • kompresja może być wskazana na Gzip lub jej brak.

Jeśli chodzi o Parquet lub ORC musimy jedynie zadbać o wskazanie rodzaju kompresji zastosowanej w tych plikach. Ładowanie plików płaskich jest nieco bardziej skomplikowane dlatego też pokażmy sobie jak to zrobić.

Pierwszym krokiem jest oczywiście  stworzenie połączeń (Linked Service) do Data Lake Storage Gen2 oraz Synapse – operacja ta przebiega w standardowy sposób dlatego pozwoliłem sobie to pominąć. W dalszej kolejności na podstawie LS do ADSLG2 stworzyłem Data Set, którego konfiguracja wygląda następująco:

  • wskazałem kontener i podległe foldery, ale nie wskazałem żadnego pliku bo chcemy ładować wszystkie obiekty znajdujące się w tej lokalizacji,
  • jako Escape character oraz Quote Character ustawiłem backslash ponieważ muszę coś ustawić, a moje pliki nie zawierają ani jednego ani drugiego więc ustawiam znak będący neutralnym dla mojego zbioru,
  • Null value pozostawiam pusty więc wartości puste nie będą w żaden sposób obsługiwane.

Dataset wskazujący na tabelę w Synapse jest bardzo prosty i  sprowadza się do podania tabeli docelowej:

W Copy Activity wskazanie źródła(Source) wygląda następująco:

  • jako wskazanie pliku ustawiamy sparametryzowaną ścieżkę czyli Wildcard file path gdzie jedyne co musimy zrobić to wstawić gwiazdkę we wskazaniu pliku (tylko gwiazdka jest wspierana w tym scenariuszu).
  • Opcja Recursively musi być zaznaczona jeśli chcemy ładować pliki z podfolderów wskazanej lokalizacji.

Miejsce docelowe nie powinno sprawić problemów:

To właśnie w tym miejscu możecie zauważyć, że mamy możliwość wybrania metody ładowania – ja oczywiście zaznaczyłem Copy command. Następnie w Default values możemy wskazać wartości domyślne na tej samej zasadzie jak to robiłem w przypadku TSQL. Additional options służy do tego aby wskazać dodatkowe opcje, które mogliśmy użyć w sekcji WITH komendy COPY INTO. Reszta opcji jak zapewne zauważyliście pozostała bez zmian jedynie dodałem czyszczenie tabeli przed załadowaniem.

To by było na tyle jeśli chodzi o konfiguracje. Po uruchomieniu stworzonego pipeline’a dane powinny załadować się w prawidłowy sposób:

Jak już wspomniałem cały przepływ jest zarządzany przez Synapse i tak naprawdę użycie ADFa powoduje wygenerowanie odpowiedniego zapytania COPY INTO. Zapytanie to możemy podejrzeć wykorzystując wbudowany widok dynamiczny sys.dm_pdw_exec_requests:

SELECT *
FROM sys.dm_pdw_exec_requests
WHERE status in ('Completed','Failed','Cancelled')

lub poprzez portal przechodząc do SQL Pool i znajdując opcję Query activity:

Wykorzystując jedną z powyższych metod jesteśmy w stanie znaleźć interesujące nas zapytanie:

COPY INTO [dbo].[stage_Date] 
(
	 [DateID] 1
	,[Date] 2
	,[DateBKey] 3
	,[DayOfMonth] 4
	,[DaySuffix] 5
	,[DayName] 6
	,[DayOfWeek] 7
	,[DayOfWeekInMonth] 8
	,[DayOfWeekInYear] 9
	,[DayOfQuarter] 10
	,[DayOfYear] 11
	,[WeekOfMonth] 12
	,[WeekOfQuarter] 13
	,[WeekOfYear] 14
	,[Month] 15
	,[MonthName] 16
	,[MonthOfQuarter] 17
	,[Quarter] 18
	,[QuarterName] 19
	,[Year] 20
	,[YearName] 21
	,[MonthYear] 22
	,[MMYYYY] 23
	,[FirstDayOfMonth] 24
	,[LastDayOfMonth] 25
	,[FirstDayOfQuarter] 26
	,[LastDayOfQuarter] 27
	,[FirstDayOfYear] 28
	,[LastDayOfYear] 29
	,[IsHolidayUSA] 30
	,[IsWeekday] 31
	,[HolidayUSA] 32
	) 
	FROM 
	'https://seequalitystorageaccount.dfs.core.windows.net:443/nytaxi/data/Date/*' 
	WITH 
	(
		 IDENTITY_INSERT='OFF'
		,CREDENTIAL=(IDENTITY='Storage Account Key',SECRET='***')
		,FILE_TYPE='CSV'
		,COMPRESSION='NONE'
		,FIELDQUOTE='\'
		,FIELDTERMINATOR=','
		,ROWTERMINATOR='0x0A'
		,FIRSTROW=1
	) OPTION (LABEL='ADF Activity ID: 4062133e-ac79-4378-8393-9139687fd129')

Na powyższym zapytaniu możemy zobaczyć całą naszą konfigurację. Zwróćcie proszę uwagę na bardzo ważny aspekt – do uwierzytelnienia do Data Lake’a wykorzystany został Storage Account Key (ponieważ właśnie tą metodę uwierzytelnienia do źródła wykorzystałem w ADF w Linked Service), a sam klucz został ukryty i nie jest dostępny dla nikogo kto będzi eprzeglądał logi itd. Dodatkowo została przypisana etykieta z identyfikatorem aktywności ADF co też będzie można łatwo połączyć z konkretnym wywołaniem ADFa.

To by było na tyle jeśli chodzi o dzisiejszy artykuł. Jak widzicie ładowanie Synapse’a jest dosyć mocno uproszczone dzięki komendzie COPY INTO. Polecam potestować wszystko samemu i zapoznać się z poniższymy artykułami, które nieco poszerzą waszą wiedzę w tym zakresie. Pozdrawiam!

Linki:

 

 

Leave a Reply