Kontynuujemy nasz przygodę z Azure Data Factory. W ramach dzisiejszego artykułu powiemy sobie kilka słów o tym czym są tzw. Activities dostępne w ramach narzędzia i nieco bliżej przyjrzymy się jednemu z najważniejszych jakim bez wątpienia jest Copy Data Activity. Dla zainteresowanych tematyką ADF, którzy nie czytali poprzedniego artykułu (link) polecam się z nim zapoznać gdyż to właśnie tam pozwoliłem sobie wytłumaczyć kilka podstawowych aspektów związanych z omawianym narzędziem. Zachęcam Was również do subskrypcji aby pozostać na bieżąco z publikowanymi treściami.
W poprzednim artykule dowiedzieliśmy się, że pojedynczy przepływ w narzędziu odbywa się poprzez obiekt nazwany pipeline. Wspomniałem tam również, że pojedynczy pipeline jest podobnym (nieidentycznym) konceptem co pakiet (package) w Integration Services. W ramach pipeline możemy wykonywać wiele różnego rodzaju zadań i to właśnie te zadania nazywamy aktywnościami (activity). Stwórzmy sobie nowy pipeline będąc w przeglądarkowym edytorze ADF klikamy na ikonę plusa i z menu kontekstowego wybieramy Pipeline:
Spowoduje to dodanie nowego przepływu do naszego Data Factory. Jedyną rzeczą potrzebną do zdefiniowania przy tworzeniu tego obiektu jest podanie nazwy, ewentualnego opisu i ustawienie równoległości (Concurrency) czyli ile razy ten pipeline może zostać odpalony w tym samym czasie:
Na ten moment zostawiamy te ustawienia wraz z domyślnymi wartościami – w razie potrzeby istnieje możliwość ich zmiany w późniejszym czasie. Poszczególne przepływy są widoczne w sekcji Pipelines i mogą tam być grupowane w foldery tak jak możecie zobaczyć na poniższej animacji:
W ramach pojedynczego przepływu mamy cały szereg różnych aktywności czy też zadań które możemy wykonać. Są one widoczne w momencie gdy otworzymy pipeline w lewej części edytora:
Jak możecie zobaczyć na powyższym zrzucie wszystkie aktywności są pogrupowane w odpowiednie kategorie. Oprócz typowych zadań związanych z przerzucaniem danych (Move & Transform) mamy tam również ogólne zadania związane m.in z używaniem zmiennych (General) czy też różnego rodzaj pętle czy instrukcje warunkowe (Iteration & conditionals). Oprócz tego mamy szereg zadań związanych z innymi usługami wchodzącymi w skład chmury Azure jak możliwość uruchomienia Notebooka w Databricks czy też połączenie z HDInsight lub serwisem uczenia maszynowego.
Warty wspomnienia jest fakt, że w grupie General znajdziemy zadanie Execute Pipeline:
Tak więc z poziomu pojedynczego przepływu jesteśmy w stanie wywoływać inne dzięki czemu możemy zaimplementować swego rodzaju hierarchiczność przepływów lub chociażby sterować kolejnością ich wywoływania. W ramach niniejszego artykułu nie będziemy oczywiście omawiać każdej istniejącej aktywności tylko skupimy się na Copy data activity, który znajduje w sekcji Move & transform:
Aby użyć aktywności wystarczy przeciągnąć ją na pole designera:
Poszczególne zadania mogą być od siebie zależne tzn. kolejne zadanie może zostać wykonane tylko wtedy gdy inne zakończą się powodzeniem lub błędem. Do dyspozycji mamy strzałki, którymi możemy połączyć poszczególne zadania:
Zależności pomiędzy aktywnościami mogą nam nieco przypominać Precedence Constraint z SSIS jednakże tak naprawdę nimi nie są. Są to po prostu zależności i nic więcej.
Domyślny zielony kolor oznacza sukces ale mamy również:
- Failure – zadanie zostanie wykonane gdy zadanie poprzedzające zakończy się błędem:
- Completion – zadanie zostanie wykonane gdy zadanie poprzedzające zakończy się błędem lub sukcesem:
- Skipped – ten status najłatwiej będzie wytłumaczyć na przykładzie: Activity B zostanie wykonane tylko jeśli Activity A zakończy się sukcesem. W przypadku gdy Activity A zakończy się błędem to Activity B dostanie status Skipped, a co za tym idzie wykonane zostanie Activity C:
Warto również wymienić różnice gdy porównamy omawiane ADF Depenencies od SSIS:
- brak możliwości zamodelowania za pomocą strzałek wykonania poszczególnych zadań w zależności od wykonania wyrażenia (expression),
- brak możliwości wykonania logicznego OR np spójrzmy na poniższy obrazek:
Wyobraźmy sobie scenariusz, że Activity C ma być wykonane jeśli Activity A LUB Activity B zakończy się błędem. Powyższy połączenie zadań uniemożliwia zamodelowanie takiego przepływu ponieważ poszczególne strzałki zawsze działają na zasadzie logicznego AND czyli patrząc powyżej Activity C zostanie wykonane tylko jeśli oba poprzedzające zadania zakończą się błędem.
Oczywiście po kliknięciu prawym przyciskiem myszy na daną strzałkę możemy zmienić jej typ:
Rozwiązaniem tego problemu może być np. stworzenie kopii Activity C dla każdego z zadań:
lub dla przykładu wywołanie Activity A oraz Activity B w ramach jednego pipeline:
po to by wywołać je z poziomu innego pipeline i wywołać Activity C, które zostanie wywołane jeśli którykolwiek z komponentów wywołanego pipeline’a zakończy się błędem:
Dzięki dynamizmowi jaki oferuje ADF wiele przypadków użycia które mogą się wydawać niemożliwe do implementacji w rzeczywistości da się wykonać, o czym z całą pewnością zdążycie się przekonać korzystając z tego narzędzia.
Przejdźmy do wykorzystania konkretnej aktywności którą jest Copy data. W poprzednim artykule omawiającym Copy data Wizard tak naprawdę tworzyliśmy Copy data activity. Jeśli nie chcemy używać kreatora możemy wszystko skonfigurować od zera z poziomu narzędzia. Jak już wspomniałem Copy data znajduje się w sekcji Move & transform i po jego przeciągnięciu oraz zaznaczeniu naszym oczom ukażą się właściwości:
W sekcji General mamy kilka właściwości:
- Name – nazwa aktywności,
- Description – opis,
- Timeout – po jakim czasie wykonywania działanie ma zostać przerwane,
- Retry – po błędzie lub timeoutcie ile razy ADF ma próbować jeszcze raz wykonać zadanie,
- Retry interval – co ile sekund ADF ma ponawiać uruchomienie,
- Secure output/Secure input – jeśli zaznaczymy jedną lub drugą opcję to informacje o inpucie lub outpucie nie będą logowane co w niektórych szczególnie wrażliwych scenariuszach może być przydatne.
Druga sekcja tj. Source będzie wyglądało różnie w zależności od wybranego źródła danych – dla pliku płaskiego wygląda to następująco:
Dla pliku CSV wygląda to następująco:
- Source dataset – Tutaj wskazujemy źródłowy zestaw danych (lub tworzymy nowy).
- File path type – wskazujemy czy ścieżka do pliku już znajduje się w źródłowym zestawie danych, czy ładujemy listę plików.
- Filter by last modified – w tym miejscu możemy założyć filtr pozwalający załadować tylko ten plik/pliki które zostały zmodyfikowane w danym interwale czasowym.
- Recursively – czy mają być przeszukiwane podfoldery czy też nie. Jeśli dataset wskazuje na pojedynczy plik to ustawienie to nie ma znaczenia.
- Enable partition discovery – jeśli łączymy się np. do Data Lake Storage gdzie poszczególne kontenery (foldery) tworzą pewną strukturę partycji to możemy tą opcję zaznaczyć dzięki czemu dostaniemy dodatkowe kolumny wskazujące z którego kontenera został dany plik załadowany.
- Max concurrent connections – maksymalna liczba połączeń do źródła.
- Skip line count – ile linii ma zostać pominiętych.
Przechodząc dalej mamy Sink czyli miejsce docelowe. Tam również możemy wybrać istniejący Linked Service lub nawiązać połączenie od nowa. Podobnie jak w poprzednim artykule moim miejscem docelowym będzie Azure Synapse SQL Pool:
Szereg opcji dostępnych dla połączenia tego typu również nie powinien nikomu sprawić problemów:
- Copy method – jakim sposobem będziemy ładować dane do wybory oczywiście Polybase, Copy Into oraz Bulk Insert. W produkcyjnych rozwiązaniach zapewne użył bym Copy Into ale dla celów pokazowych wybrałem Bulk Insert.
- Table option – w niektórych przypadkach możliwe jest automatyczne stworzenie tabeli w miejscu docelowym – zanzaczyłem None ponieważ moja tabela docelowa istnieje.
- Pre-copu script – jak sama nazwa wskazuje jest to nic innego jak kod TSQL jaki chciałbym wykonać zanim wykona się operacja kopiowania – może to być np. TRUNCATE TABLE.
- Write batch timeout– timeout dla zapisu.
- Write batch size – rozmiar batcha jakim będą ładowane dane – co jest szczególnie istotne jak ładujemy dane np. do indeksu kolumnowego,
- Max concurrent connections – ile równoległych połączeń może zostać nawiązanych w ramach tego copy actiity,
- Disable performance metrics analytics – podczas wykonania tego zadania zbierane są pewne dane o tym wykonaniu w celu zbudowania pewnych rekomendacji – jeśli nie chcemy aby dane o wykonaniu były analizowane to możemy zaznaczyć ten checkbox.
Jeśli chodzi o Mapping to w ramach Copy Activity mamy ich kilka rodzajów:
- implicit mapping – ADF pomapuje kolumny automatycznie po nazwie,
- explicit mapping – użytkownik sam pomapuje poszczególne kolumny.
Może tutaj pojawić się pytanie po co w ogóle używać jawnego mapowania (explicit mapping). Sytuacji jest kilka m.in gdy nasze miejsce docelowe (Sink) posiada inne nazwy niż dane źródłowe lub gdy np. chcemy do pliku płaskiego zrzucić tylko podzbiór kolumn.
Poniższy zrzut pokazuje jak domyślnie wygląda okno mapowania – jeśli niczego nie zmienimy to nastąpi niejawne mapowanie (implicit mapping):
Po kliknięciu New mapping jesteśmy w stanie ręcznie pomapować źródło do miejsca docelowego kolumna po kolumnie:
Możemy również użyć przycisku Import schemas aby ADF wygenerował dla nas pełne mapowanie kolumn, a na tej podstawie sami zmienimy to co akurat nas interesuje:
Dobrym pomysłem jest jednak modyfikacja mapowania tylko wtedy jeśli chcemy zrobić coś niestandardowego – jeśli nie mamy niczego takie w zamiarze to lepiej polegać na “implicit mapping”. Kolejna zakładka czyli Settings jest naprawdę istotna:
- Data integration unit – DIU to zasoby jakie mogą być używane przez nasz Copy Activity. Nie mamy możliwości sterowania bezpośrednio dostępną pamięcią czy też CPU ale to właśnie DIU są ich odzwierciedleniem. Jak możecie zauważyć w komunikacie widocznym an zrzucie ekranowym w zależności od ilości użytych DIU i czasu trwania zadania będzie zależał nasz rachunek. Oprócz jawnej wartości możemy również wybrać Auto aby ADF wybrał odpowiednią wartość za nas, na czas developmentu myślę, że najmniejsza wartość czyli 2 będzie odpowiednia.
- Degree of copy parallelism – ustawienia równoległości.
- Data consistency verification – jeśli zanzaczymy tą opcję to ADF wykona dodatkowe sprawdzenie konsystencji.
- Fault tolerance – ustawienie mówiące co ma się stać np. z błędnymi wierszami:
- Enable staging – po zaznaczeniu możemy wskazać konto BLOB Storage na które ADF będzie tymczasowo przechowywał sobie dane podczas wykonywania operacji kopiowania:
To by było na tyle jeśli chodzi o ustawienia. W dalszej kolejności możemy zwalidować naszego pipeline’y:
Jeśli nie otrzymaliśmy żadnego komunikatu błędu to możemy uruchomić pipeline’a w trybie Debug::
Po chwili oczekiwania zobaczymy rezultat naszych działań – szczegóły wykonania możemy poznać klikając w małą ikonę okularów dostępną na zakładce Output:
Na tym ekranie możemy zobaczyć ile danych zostało odczytanych i zapisanych czy chociażby ile wierszy trafiło do miejsca docelowego plus standardowe statystyki jak czas trwania itp:
Jeśli pojawiłby się błąd wykonania to informacji na jego temat również powinniśmy szukać w właśnie w sekcji Output. Jeśli wszystko przebiegło zgodnie z oczekiwaniami możemy zapisać czy też opublikować nasz pipeline wybierając z belki przycisk Publish all:
Po opublikowaniu będziemy mogli dodać trigger do naszego pipeline’a czyli sposób uruchomienia ale o tym powiemy sobie w ramach osobnego artykułu. Na ten moment to wszystko – do usłyszenia!
- Avoiding Issues: Monitoring Query Pushdowns in Databricks Federated Queries - October 27, 2024
- Microsoft Fabric: Using Workspace Identity for Authentication - September 25, 2024
- Executing SQL queries from Azure DevOps using Service Connection credentials - August 28, 2024
Last comments