Dziś kolejny wpis o Data Factory w którym powiemy sobie o tym jak wykorzystać komponent Mapping Data Flow, który pozwala za pomocą graficznego interfejsu użytkownika budować procesy ETL. Jeśli dla kogoś ADF to całkiem nowe narzędzie to przed przeczytaniem tego artykułu odsyłam do poprzednich dwóch, które miały charakter wprowadzający do narzędzia (link1, link2). Zachęcam również do subskrypcji i polubienia SEEQUALITY na facebooku aby być na bieżąco ze wszystkimi publikowanymi treściami.
Mapping Data Flow to jeden z dwóch dostępnych komponentów w ramach ADF, które pozwalają w graficzny sposób wykonać proces transformujący dane. W rzeczywistości jest to graficzna nakładka na Spark’a, ale o tym zdążymy sobie jeszcze powiedzieć. Myślę, że ten konkretny komponent najłatwiej będzie przedstawić na przykładzie dlatego też stworzyłem nowy Pipeline’a i z dostępnych aktywności (Activities) wybrałem Data flow:
Po jego przeciągnięciu na okno designera naszym oczom powinno ukazać się okno Adding data flow gdzie możemy wskazać istniejący data flow, który ma być wykonany w ramach danego pipeline’a lub stworzyć nowy. Oznacza to, że te same data flow (w dalszej części artykułu będę używał skrótu DF) mogą być uruchamiane w różnych pipeline’ach co w połączeniu z możliwościami parametryzacji daje naprawdę fajne możliwości. Ze względu na to, że jeszcze nie mam stworzonego żadnego DF’a w poniższym oknie wybrałem Create new data flow:
Jak możecie zauważyć mamy do wyboru dwa rodzaje przepływów danych tj. Data flow oraz Wranging Data Flow. Bohaterem niniejszego artykułu jest pierwszy z nich, o Wrangling Data Flow postaram się jeszcze coś napisać w przyszłości natomiast na ten moment powiem tylko, że jest to mniej więcej ta sama technologia co Power Query dostępna m.in w Power BI i jak z pewnością zauważyliście na moment pisania niniejszego artykułu pozostaje ona w fazie preview. Po dodaniu Data Flow zostaniemy przeniesieni do okna tej aktywności gdzie będziemy mogli budować naszego ETLa, jednak zanim się tam przeniesiemy wrócimy do okna pipeline’a w którym osadziliśmy nasz data flow aby przejrzeć dostępne ustawienia. W sekcji General nie znajdziemy nic szczególnego i większość rzeczy omówiliśmy w poprzednim wpisie dlatego też zainteresowanych odsyłam do poprzedniego artykułu. Dla celów demonstracyjnych nadałem jedynie nazwę tj. Products. Ciekawsze rzeczy pojawiają się na zakładce Settings:
Te ustawienia omówimy sobie nieco bardziej szczegółowo:
- Data flow – to wskazanie który data flow będzie wywoływany,
- Run on (Azure IR) – wskazanie Integration Runtime czyli w skrócie ustawienie jakie zasoby będą wykorzystane do uruchomienia naszego DF. Domyślnie są to oczywiście zasoby chmury Azure ale możemy również mieć tzw. Self-Hosted Integration runtime czyli środowisko uruchomieniowe w naszej infrastrukturze – na ten moment zostawiłem domyśle ustawienia.
- Compute type – jak już wspomniałem nasz Data Flow pod spodem uruchamia klaster Apache Spark i to w tym miejscu wybieramy jaki typ klastra chcemy. Do wyboru mamy Compute Optimize, Memory Optimize lub General Purpose. Bez wgłębiania się w szczegóły wybieramy ostatnią opcję.
- Core count – kolejne ustawienie klastra gdzie możemy wskazać ilość core’ów – dla celów demonstracyjnych i deweloperskich najniższa liczba czyli 4 (+4 Driver cores) wystarczy w zupełności. Pamiętajmy, że płacimy za ten klaster i im potężniejszych zasobów będziemy używać tym większy będzie nasz rachunek.
- Logging level – poziom logowania gdzie None to brak logowania, Basic to logowanie podstawowe oraz Verbose to logowanie pełne, ja ustawiłem Basic.
- Polybase – jeśli naszym miejscem docelowym byłby Synapse to Data Flow będzie używał Polybase do ładowania danych i będziemy ją musieli skonfigurować (a raczej stagingowy Data Lake Storage do tymczasowego przechowywania plików). My nie będziemy w tej demonstracji używać Synapse więc zostawiamy to bez żadnych modyfikacji.
W ramach ogólnych ustawień mamy również możliwość dodania parametrów oraz dodatkowe opisowe właściwości użytkownika które póki co zostawiamy. O parametrach z całą pewnością pojawi się osobny, dedykowany artykuł gdyż jest to temat zbyt ważny aby potraktować go ogólnikowo.
Po dodaniu DF powinien on być widoczny w oknie Factory Resources w sekcji Data flows tak jak zostało to przedstawione na poniższym zrzucie ekranowym:
Gdy otworzymy sobie nasz DF zobaczymy graficzny designer w którym będziemy tworzyć sobie nasze przepływy:
Bardzo ważny jest przełącznik dostępny w górnej części ekranu o nazwie Data flow debugging:
Włączenie tego przełącznika spowoduje, że uruchomiony zostanie wspomniany już wcześniej klaster Apache Spark na którym będą wykonywane nasze transformacje zdefiniowane w DF. Uruchomienie trybu debugowania daje nam wiele możliwości np. podejrzenie danych na każdym etapie naszego przepływu itp. Jeszcze raz wspomnę, że musimy za czas działania naszego klastra płacić bez względu na to czy działa on w trybie standardowym czy też w trybie debugowania. Po włączeniu klaster będzie powstawał kilka minut i jak już wstanie jest do naszej dyspozycji – informuje nas o tym zielona ikona obok przełącznika. Jeśli ikona jest pomarańczowa tak jak na moim zrzucie oznacza to, że klaster nie był używany przez jakiś czas i niedługo się wyłączy. Kolor czerwony oznacza, że klaster został wyłączony gdyż był zbyt długo nieużywany.
Warto wspomnieć o tym, że oczywiście mamy wpływ na to jak będzie działał tryb debugowania poprzez jego ustawienia dostępne w górnej części ekranu jako Debug Settings:
Zazwyczaj nie chcemy zaciągać wszystkich danych podczas developmentu ponieważ albo będzie to nieporęczne albo nieefektywne, dlatego też w tym miejscu możemy wskazać ile wierszy ze źródła chcemy pobrać. Mamy również możliwość wskazania pliku zawierającego dane przykładowe które chcielibyśmy użyć do celów deweloperskich. Dodatkowo w tym oknie możemy ustawić wartości parametrów jeśli takowe zostały zdefiniowane:
Na potrzeby niniejszej demonstracji wrzuciłem na Data Lake Storage trzy pliki płaskie zawierające produkty, podkategorie i kategorie produktów:
Spróbujmy zatem pobrać je w designerze – aby to zrobić klikamy pole oznaczone jako Add Source dzięki czemu pojawi się nowy komponent potrzebny do zdefiniowania źródła danych. Po jego zaznaczeniu w dolnej części ekranu pojawi się okno Source settings gdzie możemy wybrać nazwę dla naszego zestawu danych. Jako Source Type wybieramy Dataset ponieważ zdefiniujemy sobie zestaw danych na podstawie pliku znajdującego się na Azure Data Lake Storage:
Na powyższym zrzucie możecie również zauważyć kilka dodatkowych opcji jak np.:
- Allow scehma drift – wskazanie, że schemat naszego źródła danych może się kiedyś zmienić – tak jest to możliwe w ADF, że struktura strumienia danych się zmieni – ustawienie to determinuje to czy dla nas jest to akceptowalne czy też ma zostać zwrócony błąd.
- Infer drifted column types – autodetekcja typów danych,
- Validate schema – zaznaczenie spowoduje, że jeśli dane pobrane ze źródła mają inny typ niż zdefiniowany to cały flow ma zwrócić błąd.
- Sampling – jeśli nie chcemy zaciągać całego zestawu danych to możemy wskazać w tym miejscu ile wierszy chcemy pobrać.
Ustawienia te zostawiłem tak jak na zrzucie i kliknąłem New aby dodać nowy Dataset. Dodawanie datasetu pokazywałem już w poprzednich wpisach ale dla przypomnienia poniżej trzy szybkie zrzuty ekranu pokazujące jak dodałem wskazanie do pliku płaskiego znajdującego się na Data Lake’u:
Następnie w analogiczny sposób dodałem Datasety i osadziłem je w moim Mapping Data Flow dla pozostałych dwóch plików:
Mamy zatem zdefiniowane źródła dla naszego przepływu. Zobaczmy jakie mamy dostępne opcje dla naszych źródeł:
Na zakładce Source options mamy następujące opcje:
- Wildcart paths – możemy podstawić gwiazdkę (*) aby załadować określony zestaw plików, czyli nadpisujemy ścieżkę zdefiniowaną w ramach Datasetu.
- Partition root path – o partycjonowaniu powiemy sobie w osobnym wpisie dlatego pozwolę sobie zostawić tą opcję jako czarną skrzynkę. W naszym testowym przepływie nie będziemy partycjonować więc zostawiamy to puste.
- Allow no files found – domyślnie jeśli ADF nie znajdzie żadnego pliku to zwróci błąd – jeśli nie chcemy aby tak się stało to możemy zaznaczyć tą opcję.
- List of files – bardzo ciekawa opcja po zaznaczeniu której ADF oczekuje, że wskazujemy plik zawierający ścieżki do plików które chcemy załadować (ścieżki relatywne wobec głównego kontenera na Data Lake Storage)
- Multiline rows – w przypadku gdy mamy długie stringi których wartości mogą rozciągać się na kilka linii to ta opcja musi być zaznaczona, a stringi muszą być w cudzysłowie.
- Column to store file name – jeśli chcemy dodać nową kolumnę w przepływie z nazwą pliku z której dany wiersz został odczytany do tutaj musimy podać jej nazwę.
- After completion – wskazanie co ma się stać ze źródłowym plikiem po zakończeniu przepływu – do wyboru oczywiście to, że nic ma się nie wydarzyć, pliki mogą być usunięte lub przeniesione do innej lokalizacji.
- Filter by last modified – możemy wskazać, że chcemy załadować tylko pliki które zostały zmodyfikowane w danym zakresie czasowym. Myślę, że ustawianie tego na sztywno ma raczej mniejsze zastosowanie niż parametryzacja tych ustawień więc póki co zostawiamy to puste.
O pozostałych zakładkach bardzo krótko:
- Projection – tutaj znajdziemy schemat naszych danych gdzie możemy przypisać określony typ do danej kolumny lub np. ustawić domyślny format daty.
- Optimize – w tym miejscu definiujemy partycjonowanie,
- Inspect – pozwala przejrzeć zdefiniowane typy danych kolumny itp.
- Data preview – jeśli mamy włączony tryb Debug to na tej zakładce będziemy mogli podejrzeć jak wyglądają nasze dane.
W dalszej kolejności będziemy chcieli połączyć ze sobą produkty z podkategoriami i kategoriami. Aby dodać kolejną transformacje klikamy małą ikonę plusa dostępną przy każdym źródle:
Ja rozpocznę od Products gdzie po kliknięciu ikony plusa pojawiła się cała gama dostępnych transformacji:
Jak widzimy opcji jest całkiem dużo i są one podzielone na sekcje (niewidoczne jest tutaj jedynie źródło, które definiujemy na samym początku):
- Multiple input/outputs – szereg transformacji pozwalający łączyć ze sobą źródła lub dystrybuować dane na kilka wyjść.
- Schema modifier – transformacje modyfikujące schemat czyli np. agregaty zmieniające poziom szczegółowości, wszelkie transformacje dodające lub usuwające kolumny, rankingi itp
- Row modifier – transformacje modyfikujące zestaw danych jak np. filtracja czy też sortowanie.
- Destination – tutaj definiujemy miejsce docelowe naszego przepływu.
Transformacji jest całkiem sporo i nie będę ich wszystkich omawiał, a zainteresowanych odsyłam do dokumentacji. W naszej demonstracji chcemy złączyć ze sobą trzy różne pliki dlatego wybierzemy sobie transformację Join:
Na powyższym zrzucie możecie dostrzec jak wygląda konfiguracja złączenia. Pierwsza opcja pozwala nazwać wyjściowy strumień danych. W dalszej kolejności wskazujemy który strumień danych będzie traktowany jako lewy zestaw do złączenia, a który jako prawy. Następnie ustawiamy typ złączenia gdzie do dyspozycji mamy złączenie pełne, wewnętrzne, jednostronne lub krzyżowe tak więc większość scenariuszy mamy tutaj dostępnych. W ostatniej sekcji podajemy kolumny po jakich ma nastąpić złączenie. Oczywiście jeśli istnieje taka potrzeba możemy dodać tutaj więcej niż jedną kolumnę jako klucz złączenia. Warty odnotowania jest również fakt, że możemy wykonać złączenie w oparciu o inny znak niż równość co nie jest standardem w narzędziach tego typu warto pamiętać o tej funkcjonalności.
W Data preview możemy podejrzeć (jeśli włączyliśmy Debug) jak wygląda nasz zestaw wyjściowy:
W polu designera możemy zobaczyć, że pojawiła się graficzna interpretacja naszych działań:
Pod spodem jest generowany kod w JSONie i co ważne sama interpretacja graficzna nie zapisuje w kodzie np. koordynatów w którym miejscu designera znajduje się która transformacja co było (i jest) bardzo irytującą cechą Integration Services gdzie każde najmniejsze przesunięcie zadania było odczytywane jako zmiana. Analogicznie do powyższego do zestawu danych dołączyłem kategorie. Poniżej możecie dostrzec w jaki sposób zostało to zaprezentowane graficznie:
Dalej w naszej demonstracji dodamy transformację Filter gdzie odfiltrujemy sobie produkty, które nie mają kategorii (oczywiście mógłbym to zrobić wybierając odpowiedni typ złączenia ale dla celów demonstracyjnych nieco dostosowałem ten scenariusz):
Konfiguracja zadania filtrującego również nie powinna nikomu sprawić problemu. Główne okno wygląda następująco:
Po wskazaniu źródłowego strumienia danych dla tego zadania i nadaniu nazwy możemy kliknąć pole Filter On które otworzy nam edytor wyrażeń dostępny w ramach Mapping Data Flow:
Edytor ten jest naprawdę dobrze zrobiony i jest wyposażony w bardzo fajnie działający Intellisense, który będzie podpowiadał nam składnie. Jeśli chodzi o język wyrażeń, który mamy tutaj dostępny to warto zapoznać się z dokumentacją (link). W przypadku naszej filtracji całe wyrażenie jest bardzo proste i wygląda następująco:
isNull(EnglishProductSubcategoryName)==false()
Przechodząc dalej dodamy sobie transformację Derived Column po to aby dodać nową kolumnę zawierającą aktualną datę i czas oraz nazwę docelowego pliku. Po dodaniu nowego komponentu nasz przepływ wygląda następująco:
Konfiguracja tego zadania również nie powinna sprawić problemów i sprowadza się do podania nazwy kolumny oraz odpowiedniego wyrażenia zwracającego pożądany rezultat. Do stworzenia kolumny z datą i czasem użyłem funkcji currentUTC(), a pliki docelowe będą zawierały nazwę kategorii produktu i rozszerzenie “txt”:
Nic nie stoi na przeszkodzie aby w ramach pojedynczego zadania Derived Column było dużo więcej kolumn kalkulowanych. Ostatnim elementem naszego przepływu będzie miejsce docelowe czyli Sink.
W kwestiach konfiguracyjnych Sink jest bardzo podobny do źródła i sprowadza się do zdefiniowania wejściowego strumienia danych oraz Datasetu do którego chcemy ładować dane:
Moim datasetem docelowym będzie plik płaski na Data Lake Storage. W przypadku ładowania do pliku płaskiego należy również skonfigurować zakładkę Settings:
To właśnie w tym miejscu możemy zdefiniować w jaki sposób ma powstać nasz plik docelowy. Jeśli wybierzemy Output to single file to wtedy powstanie jeden plik docelowy, ale to oczywiście nie jedyne wyjście. Możemy skonfigurować to w taki sposób aby powstało wiele plików docelowych zdefiniowanych według:
- wzorca,
- mechanizmu partycjonowania,
- wartości w określonej kolumnie.
Ja wybrałem tą ostatnią opcję i wskazałem kolumnę zawierającą kategorię + rozszerzenie “txt”, którą stworzyłem w zadaniu Derived Column nieco wyżej. Docelowo powinienem otrzymać tyle plików ile było kategorii w moim zestawie danych. Na zakładce Mapping mogę dokonać ręcznego mapowania jeśli zachodzi taka potrzeba jednak w większości przypadków możemy polegać na wbudowanym, automatycznym mapowaniu.
Teraz kiedy mój przepływ jest gotowy mogę wrócić do pipeline’a i spróbować go uruchomić. Mogę to zrobić oczywiście wybierając przycisk Debug dostępny w górnej części ekranu:
Po pewnym czasie powinniśmy otrzymać komunikat z informacją o tym czy wykonanie naszego DF się powiodło czy też nie – wszystkie te informacje znajdziemy w sekcji Output:
Nie pozostaje nic innego jak sprawdzenie czy dane zostały prawidłowo podzielone między pliki. Ze względu na fakt, iż ładowałem dane na Azure Data Lake Storage to użyłem narzędzia Storage Explorer aby zajrzeć do wskazanej przeze mnie lokalizacji. W tym miejscu warto zaznaczyć, że Storage explorer ma swoją wersję desktopową którą możecie zainstalować na swojej maszynie lub skorzystać z wersji webowej dostępnej na portalu Azure. Na poniższym zrzucie możecie zobaczyć, że rzeczywiście powstały 4 pliki – każdy z nich różni się nazwą i rozmiarem:
Dane również zostały w poprawny sposób rozdystrybuowane. To co jest potężną zaletą ADF to fakt, że prawie wszystko jest tutaj dynamiczne i automatycznie propagowane. Wyobraźmy sobie sytuacje, że mieliśmy gotowy przepływ ale po jego stworzeniu chcemy dodać pewien poziom agregacji. Poniżej możecie zobaczyć taką sytuację gdzie dodałem komponent Aggregate odpowiadający jak sama nazwa wskazuje za agregację danych:
Jego konfiguracja w dużej mierze sprowadza się do wskazania kolumn grupujących oraz funkcji agregacji:
Według powyższych ustawień będziemy mieli tylko trzy kolumny zawierające dane biznesowe: nazwę kategorii, podkategorii oraz liczbę produktów. To co czekałoby nas np. w Integration Services to przeklikanie wszystkich transformacji następujących po agregacji, a w ADF? Tutaj wszystko dzieje się automatycznie i po dodaniu tej transformacji możemy przejść do Sink (miejsca docelowego) i tam na zakładkę Inspect przejrzeć strukturę danych:
Jak widać mamy tylko te kolumny wynikowe naszej agregacji + 2 dodatkowe pochodzące z transformacji Derived Column. Takie zachowanie dostrzeżemy praktycznie zawsze chyba, że mamy zdefiniowane jakieś manualne działania, które muszą być dostosowane ręcznie. Co ciekawe po uruchomieniu przepływu gdy podejrzymy pliki to dane które trafiły są jak najbardziej poprawne:
Możliwości jest bardzo dużo i nie sposób je wszystkie pokazać w jednym artykule ale mam nadzieję, że pokazałem kilka podstawowych aspektów związanych z Mapping Data Flow w ramach Azure Data Factory. Z całą pewnością nie wyczerapliśmy tematu więc spodziewajcie sie kolejnych wpisów traktujących o ADF i Data Flow już wkrótce. Pozdrawiam!
- Executing SQL queries from Azure DevOps using Service Connection credentials - August 28, 2024
- Setup Git credentials for Service Principal in Azure Databricks - August 21, 2024
- Microsoft Fabric 101 Episode 3: Pausing and Scaling using portal and Powershell - August 8, 2024
Last comments