Wraz z nowym rokiem rozpoczynamy publikację nowych artykułów. Kolejnym tematem jaki chciałbym przedstawić są dwie bardzo ważne funkcjonalności związane z Azure Data Factory – chodzi mianowicie o aktywności Lookup oraz Foreach. Ci którzy nie znają ADFa polecam zapoznać się z poprzednimi artykułami na ten temat w celu wprowadzenia w tematyką:
- Azure Data Factory – Mapping Data Flow
- Azure Data Factory – Activities i Copy Data Activity
- Azure Data Factory – Copy Data Wizard
W opisywaniu funkcjonalności będziemy posiłkować się scenariuszem przepływu danych gdzie będziemy przerzucać dane z Azure Data Lake Storage do Azure SQL Database. Nie będziemy jednak tego robić standardowo, a na podstawie danych w tabeli – innymi słowy będziemy ładować te pliki których nazwa znajduje się w tabeli konfiguracyjnej. O tym jak stworzyć bazę oraz storage nie będę pisał ponieważ nie jest to przedmiotem niniejszego artykułu dlatego odsyłam do dokumentacji (link, link).
Przechodząc do meritum na storage mamy wrzucone trzy pliki csv, a mianowicie:
- Customer.csv
- InternetSales.csv
- Product.csv
Jeśli chcielbyście powtórzyć to ćwiczenie to są to po prostu wyeksportowane tabele o tych właśnie nazwach z AdventureWorksDW.
Baza danych zawiera następującą tabelę konfiguracyjną:
CREATE TABLE [dbo].[ETLConfiguration]( [ID] [int] IDENTITY(1,1) NOT NULL, [GroupName] [nvarchar](100) NULL, [SourceTableName] [nvarchar](100) NULL, [TargetSchemaName] [nvarchar](100) NULL, [TargetTableName] [nvarchar](100) NULL, PRIMARY KEY CLUSTERED ( [ID] ASC )); GO
Tabela zawiera tylko trzy tabele wiersze:
SET IDENTITY_INSERT [dbo].[ETLConfiguration] ON GO INSERT [dbo].[ETLConfiguration] ([ID], [GroupName], [SourceTableName], [TargetSchemaName], [TargetTableName]) VALUES (1, N'Facts', N'InternetSales', N'dwh', N'FactInternetSales') GO INSERT [dbo].[ETLConfiguration] ([ID], [GroupName], [SourceTableName], [TargetSchemaName], [TargetTableName]) VALUES (2, N'Dimensions', N'Customer', N'dwh', N'DimCustomer') GO INSERT [dbo].[ETLConfiguration] ([ID], [GroupName], [SourceTableName], [TargetSchemaName], [TargetTableName]) VALUES (3, N'Dimensions', N'Product', N'dwh', N'DimProduct') GO SET IDENTITY_INSERT [dbo].[ETLConfiguration] OFF GO
Jak już zapewne się domyśliliście tabela ta pełni rolę uproszczonego mappingu gdzie wskazujemy nazwę pliku w kolumnie SourceTableName, oraz miejsce docelowe w relacyjnej bazie danych w postacie schematu (TargetSchemaName) oraz tabeli (TargetTableName).
Wiemy już jak wygląda nasz scenariusz testowy przejdźmy zatem dalej i w ramach Data factory stwórzmy połączenie do Azure SQL Database. Przechodzimy zatem na zakładkę Manage:
Bedąc w sekcji Linked services klikamy w ikonę New dzięki czemu będziemy w stanie dodać nowy połączenie do wybranego przez nas serwisu:
W możliwych do połączenia serwisach wyszukujemy Azure SQL Database:
W szczegółach połączenia do bazy wybieramy adres serwera i nazwę bazy do której będziemy się łączyć oraz sposób uwierzytelnienia. Ja wybrałem SQL Authentication co z całą pewnością nie jest najlepszym wyborem ale do naszych celów testowych jest to jak najbardziej w porządku:
Drugim serwisem jaki dołączymy jest Azure Data Lake Storage:
Do ADLS połączyłem się używając Account Key:
Po stworzeniu połączeń do serwisów możemy przejść do stworzenia nowego pipeline’a. Pierwszą rzeczą jaką dodamy jest aktywność Lookup którą znajdziemy w sekcji General:
Transformacja ta posłuży nam do pobrania z bazy danych listy plików z tabeli konfiguracyjnej, które mają zostać załadowane. Pierwsze okno konfiguracyjne tej aktywności nie jest zbytnio skomplikowane i przedstawia się następująco:
Mamy zatem nazwę, opis, timeout które nie wymagają dodatkowego wytłumaczenia. Retry oznacza ilość prób jakie ma podjąć ADF w przypadku wystąpienia błędu podczas wykonania oraz ile ma upłynąć czasu pomiędzy poszczególnymi próbami (Retry interval). Dodatkowo możemy zaznaczyć, że dane wejściowe oraz wyjściowe do tej konkretnej aktywności zawierają dane które mają nie trafić do logów. Nieco ciekawsze opcje pojawiają się na zakładce Settings. W tym miejscu wskażemy zestaw danych (dataset) który będzie zawierał nasze pliki do załadowania – póki co takiego nie mamy więc klikamy New:
Nasz zestaw będzie opierał się o wcześniej zdefiniowane połączenie do Azure SQL Database więc taki też serwis wybieramy:
Po nadaniu nazwy oraz po wybraniu zdefiniowanego wcześniej połączenia mamy możliwość wybrania tabeli z której będziemy czytać dane. Na ten moment na sztywno wpiszemy dbo.ETLConfigruation czyli nazwę naszej tabeli konfiguracyjnej- w późniejszym czasie użyjemy do tego parametrów:
Aby przetestować to czy wszystko działa jak należy uruchomimy naszego pipeline’a w trybie Debug:
Jeśli wszystko przebiegło zgodnie z planem to w sekcji Output znajdziemy status. Obok nazwy naszej aktywności możemy zauważyć dwie ikony gdzie pierwsza pozwala podejrzeć co było na wejściu, a co na wyjściu:
Po kliknięciu drugiej opcji zobaczymy dane zwrócone przez nasz Lookup Activity w postaci JSONa:
Na powyższym zrzucie ekranowym możecie zauważyć, że zwrócona wartość wiersza o ID=1, GroupName=Facts, SourceTableName=InternetSales, TargetSchemaName=dwh, TargetTableName = FactInternetSales. Dlaczego został zwrócony tylko jeden wiersz? A no dlatego, że w oknie konfiguracyjnym naszej aktywności mieliśmy zaznaczoną opcję First Row Only. My chcemy pobrać więcej niż jedną wartość więc powinniśmy ją odznaczyć co też uczyniłem:
Po odznaczeniu wspomnianej opcji Lookup zwrócił zgodnie z oczekiwaniami trzy wiersze, które znajdują się w węźle value co jest dosyć istotne w przypadku odwoływania się do wartości zwracanych przez tą aktywność (o tym za chwilę):
{ "count": 3, "value": [ { "ID": 1, "GroupName": "Facts", "SourceTableName": "InternetSales", "TargetSchemaName": "dwh", "TargetTableName": "FactInternetSales" }, { "ID": 2, "GroupName": "Dimensions", "SourceTableName": "Customer", "TargetSchemaName": "dwh", "TargetTableName": "DimCustomer" }, { "ID": 3, "GroupName": "Dimensions", "SourceTableName": "Product", "TargetSchemaName": "dwh", "TargetTableName": "DimProduct" } ], "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (West Europe)", "billingReference": { "activityType": "PipelineActivity", "billableDuration": [ { "meterType": "AzureIR", "duration": 0.016666666666666666, "unit": "DIUHours" } ] }, "durationInQueue": { "integrationRuntimeQueue": 0 } }
Przejdźmy dalej czyli dodajmy do pipeline’a pętlę Foreach i połączymy obie aktywności tak jak na poniższym zrzucie (tzn. pętla ma się wykonać wtedy gdy Lookup zakończy się sukcesem):
Zanim przejdziemy do konfiguracji samej pętli wejdźmy do jej wnętrza (zrobimy to klikając ikonę ołówka znajdującą się wewnątrz aktywności widoczną na powyższym zrzucie ekranowym). Nasza pętla będzie miałą za zadanie załadować dane z Data Lake’a do SQL dlatego dodajemy Copy Activity i nadajemy nazwę FlatFileToSQL:
Jako Source wskażemy plik csv znajdujący się na ADSL – nie mamy na ten moment stworzonego datasetu wybieramy zatem New:
Następnie wybieramy Azure Data Lake Storage Gen2:
Format to oczywiście DelimitedText:
Nazwę zestawu danych zdefiniowałem jako CsvOnADLS, podpiąłem wcześniej zdefiniowany Linked Service. Na pierwszy rzut podpiąłem plik Customer.csv i wskazałem, że w pierwszym wierszu znajduje się nagłówek. Pozostałe ustawienia pozostawiłem z domyślnymi wartościami (ADF powinien sobie poradzić z rozpoznaniem struktury pliku, jeśli tak się nie stanie to można kliknąć Advanced i dostosować wybrane opcje):
Zdefiniowaliśmy źródło w ramach Copy Activity dlatego możemy przejść do miejsca docelowego czyli zakładki Sink. W tym miejscu nie będziemy tworzyć Datasetu ponieważ mamy już takowy – chodzi mianowicie o ten który jest używany w aktywności Lookup (na ten moment dataset wskazuje na tabelę konfiguracyjną ale jak już wspominałem będziemy mogli to parametryzować):
Mamy wstępną konfigurację Copy Activity wewnątrz pętli wróćmy zatem krok wyżej i przejdźmy do konfiguracji pętli Foreach. Aby przejść poziom wyżej można użyć menu nawigacyjnego dostępnego w górnej części okna:
We właściwości Foreach na zakładce Settings znajduje się właściwość Items gdzie będziemy mogli napisać wyrażenie wskazujące zestaw danych po której iterować ma pętla. W tym miejscu pierwszy raz możemy zetknąć się z językiem wyrażeń wbudowanym w ADF. Do dyspozycji mamy cały szereg różnego rodzaju funkcji i co najważniejsze możemy przy jego pomocy przechwycić wartości wyjściowe pochodzące z innych aktywności (nie tylko tych poprzedzających ale wszystkich). My chcemy iterować po zbiorze zwróconym przez aktywność Lookup dlatego moje wyrażenie przyjeło formę
@activity('Get List Of Files').output.value
Słówko output w powyższym kodzie oznacza wartość wyjściową ze wskazanej aktywności, a value to nic innego jak odpowiedni węzeł w wyjściowym JSONie (jeśli chcecie go zobaczyć przewinijcie ekran do góry gdzie wrzuciłem cały JSON wyjściowy z aktywności Lookup):
Na powyższym obrazku widzimy jeszcze dwie dodatkowe właściwości gdzie Sequential oznacza, iż chcemy iterować sekwencyjnie tzn jeden po drugim, Batch count oznacza, że możemy wykonywać wiele iteracji na raz i tutaj możemy wskazać ile równolegle iteracji chcemy wykonać równolegle.
Pamiętamy, że Dataset który jest oparty o Azure SQL jest wykorzystywany zarówno przez Lookup jak i Copy Activity wewnątrz pętli. Za każdą iteracją pętli będziemy odczytywać inny plik i ładować go do innej tabeli docelowej dlatego też zestaw danych musi zostać zparametryzowany. Aby to zrobić wystarczy otworzyć interesujący nas zestaw danych i tam w sekcji Parameters dodać parametry – w moim przypadku będzie to schemat oraz nazwa tabeli, oba parametry są oczywiście typu tekstowego, a wartość domyślna nie ma w tym wypadku większego znaczenia ponieważ zawsze będzie ona nadpisywana przez odpowiednie aktywności pipeline’a:
Parametry zostały zdefiniowane ale jeszcze nie są używane w naszym zestawie danych dlatego też przechodzimy na zakładkę Connection i tam przy wskazaniu tabeli zaznaczamy Edit, następnie wskazujemy odpowiednie parametry tak jak na poniższym obrazku:
Tak jak w przypadku zestawu danych do Azure SQL tak i zestaw danych do CSV musi zostać zparametryzowany w tym przypadku będzie to parametr odpowiadający nazwie pliku na Azure Data Lake:
Na zakładce Connection podobnie jak w poprzednim przypadku jako nazwy pliku używamy powyższego parametru:
Wracamy do aktywności Lookup i ze względu na fakt, iż dodaliśmy parametry do datasetu z którego korzysta ta aktywność to musimy im przypisać wartości. Wygląda to następująco:
W dalszej kolejności przechodzimy do Copy Activity i na zakładce Sink do parametrów wartości pochodzące z pętli. Do bieżącego elementu iteracji pętli możemy się odwołać wykorzystując obiekt item() tak przedstawiłem to poniżej. Obiekt ten posiada takie właściwości jakie mają obiekty przekazane do pętli. Dodatkowo we właściwości Pre-copy script wpisujemy wyrażenie, które wyczyści nam tabelę którą aktualnie ładujemy:
@{concat('TRUNCATE TABLE ',item().TargetSchemaName,'.',item().TargetTableName)}
Kiedy to wszystko jest gotowe możemy uruchomić nasz przepływ aby zobaczyć czy wszystko jest w porządku. Pniżej możecie zobaczyć, że na zakładce Output mamy log gdzie możemy prześledzić co i w jakim czasie się wykonało oraz jakie były wartości wejściowe/wyjściowe:
Powyżej możecie zauważyć, że wykonała się aktywność Lookup, później Foreach, a następnie trzykrotnie Copy activity (ze względu na fakt, że zbiór po którym iterowaliśmy zawierał trzy elementy). Dodatkowo aby mieć pewność, że wszystko jest w porządku to możemy sprawdzić w bazie docelowej czy konkretne tabele zawierają dane:
Jak możecie zauważyć Data Factory daje wiele możliwości dynamicznego ładowania danych. To co w np. Integration Services było nie do osiągnięcia przy pomocy standardowych komponentów tutaj dostajemy z pudełka i wiele scenariuszy może być realizowanych właśnie w taki dynamiczny sposób. Z całą pewnością ilość wzorców ładowania danych w ADF jest zdecydowanie większa i postaram się je przedstawić w ramach kolejnych artykułów. Na ten moment pozdrawiam i zachęcam do subskrypcji.
- Avoiding Issues: Monitoring Query Pushdowns in Databricks Federated Queries - October 27, 2024
- Microsoft Fabric: Using Workspace Identity for Authentication - September 25, 2024
- Executing SQL queries from Azure DevOps using Service Connection credentials - August 28, 2024
Last comments