4a316f02cf304d3a833a9b4c0d8062e9

Azure Databricks – orkiestracja notebooków z Multi-task jobs

Jednym z głównych zadań przy budowie wszelkiego rodzaju rozwiązań ETL czy innych procesów zasilających dane jest odpowiednia orkiestracja wykonań poszczególnych elementów procesu. Pod pojęciem orkiestracji kryje się w skrócie sekwencja zadań do wykonania oraz szereg zależności pomiędzy nimi. W przypadku chmury Azure i platformy Databricks mamy kilka możliwości implementacji tego typu procesu. Jednym z nich jest użycie głównego orkiestratora w Azure jakim jest Azure Data Factory, który sprawuje się świetnie w tego typu scenariuszach (pisałem o tym jakiś czas temu – link). Podejście z Data Factory jest tym które zazwyczaj preferuję jednakże zdarza się, iż nie ma możliwości go wykorzystać i wtedy można stworzyć jeden główny notebook który będzie uruchamiał inne notebooki poprzez wbudowane funkcjonalności takie jak %run lub dbuilts. Problemem w tym wypadku jest równoległość wykonania, która jest możliwa do implementacji ale wymaga troche gimnastyki. Z tego też powodu chciałbym dziś napisać kilka słów na temat nowego podejścia jakim jest Multi-task job. Mechanizm jobów był już wcześniej dostępny w Databricks jednakże obecna jego odsłona jest zdecydowanie bardziej rozbudowana i daje większe możliwości. Jakie? Zapraszam do zapoznania się z niniejszym wpisem.

Zanim przejdziemy do Databricks wytłumaczmy czym jest DAG czyli Directed Acyclic Graph. Jest to struktura(graf) składający się z węzłów połączonych ze sobą krawędzami. Słowo “directed” w nazwie oznacza “skierowany” i chodzi o to, że połączenie pomiędzy węzłami ma swój kierunek np. z węzła A do węzła B (a nie na odwrót). Acyclic oznacza, że nie mamy zapętlenia tzn. podążając z węzła do węzła nigdy nie trafimy na określony węzeł więcej niż jeden raz.  Być może brzmi to skomplikowania ale w rzeczywistości nie jest i dla kogoś kto używa takich narzędzi jak Airflow czy chociażby GIT nie powinno to sprawić problemu. Implementacją DAG jest np. struktura drzewiasta – przykład której można również zobaczyć na poniższym rysunku:

Opisywany w ramach dzisiejszego artykułu mechanizm Multi-task job również opiera się o DAG gdzie poszczególne węzły są po prostu pojedynczymi notebookami (najczęściej). W przypadku Multi-task job mamy do czynienia również z podziałem na DAG linearne oraz nielinearne. Linearność w tym wypadku oznacza sekwencję węzłów gdzie żaden z nich nie jest wykonywany równolegle tylko jeden po drugim. Nielinearność oznacza, że poszczególne węzły mogą być wykonywane równolegle (powyższe zestawienie przedstawia właśnie DAG nielinearny).

Zobaczmy zatem jak w praktyce możemy wykorzystać opisywaną funkcjonalność. Przejdźmy do workspace Databricks i tam przejdźmy do ustawień i Admin console:

Na zakładce Workspace Settings należy włączyć opcję Task orchestration in Jobs (UWAGA – opcja raz włączona nie jest możliwa do wyłączenia):

Po włączeniu będziemy mieli możliwość tworzenia jobów z wieloma zadaniami (taskami) wewnątrz. Ja będę chciał zademonstrować funkcjonalność na bardzo prostym przykładzie dlatego też przygotowałem 5 notebooków:

Każdy z nich zawiera bardzo prosty kod, składający się z pojedynczej instrukcji print oraz sleep:

import time
time.sleep(5)
print("Notebook 1 execution finished")

Zależności pomiędzy wykonaniem poszczególnych notebooków odzwierciedlają to co widzimy na przykładowym diagramie DAG z początku artykułu. Spróbujmy zatem coś takiego zaprojektować – w panelu po prawej stronie odnajdźmy zakładkę Jobs:

Jest ona dostępna zarówno w workspace Data Science & Engineering jak również w workspace Data Science. Będąc w odpowiednim miejscu możemy dodać nowy job:

Definicja pojedynczego zadania sprowadza się do:

  • nadania nazwy,
  • wskazania notebooka który ma być wykonywany,
  • wskazania klastra, który ma wykonać notebook przy czym możemy wybrać istniejący klaster lub klaster, który będzie powołany na potrzeby realizacji zadania,
  • przekazanie wartości parametrów jeśli takowe istnieją:

Dodatkowo każdy task w ramach joba ma kilka opcji zaawansowanych jak np.

  • możliwość dodania bibliotek potrzebnych do wykonania zadania,
  • ustawienia ponawiania w przypadku wystąpienia błędu,
  • timeout czyli maksymalny czas na wykonania zadania po którym zadanie zostanie przerwane:

Po skonfigurowaniu pojedynczego zadania możemy zacząć dodawać kolejne zrobimy to poprzez kliknięciu przycisku plusa dostępnego pod zadaniem (jeśli go niewidać oznacza to, że opcja którą ustawialiśmy wyżej nie została poprawnie ustawiona). Dodawanie kolejnego zadania nie różni się właściwie od poprzedniego kroku oprócz tego, że jesteśmy w stanie dodać zależność tzn wskazać zadanie które ma się wykonać przed danym zadaniem:

Po dodaniu wszystkich zadań otrzymałem następującą strukturę:

W prawej części ekranu mamy dostępne ustawienia całego joba:

Pierwszą z nich jest możliwość ustawienia harmonogramu uruchomień, który możemy przedstawić zarówno graficznie jak i używając składni CRON(link):

Dalej mamy ustawienia i informacje na temat klastra. Pamiętajmy, że jest możliwość wykorzystania różnych klastrów do różnych zadań dlatego też po najechaniu na wybrany klaster podświetlą nam się wszystkie zadania podłączone do tego klastra:

W dalszej kolejności mamy do dyspozycji Alerty czyli możliwość zdefiniowania adresu/adresów email do notyfikacji na temat wykonania joba. Ustawienie tego typu alertu jest bardzo proste i wygląda następująco:

Jak widać na powyższym zrzucie ekranowym możemy dostać powiadomienia zarówno o wystartowaniu, zakończeniu sukcesem jak i porażką. Mail który otrzymamy na wskazany adres email pochodzi z adresu azure-noreply@microsoft.com i wygląda następująco:

Ostatnim ustawieniem joba na które należy zwrócić uwagę jest Maximum concurrent runs czyli to ile jest dopuszczalnych równoległych wykonań tego samego joba. Nie należy tego mylić z równoległością zadań wewnątrz joba – tym sterujemy poprzez zależności między taskami – jeśli coś może być uruchomione równolegle to zostanie tak uruchomione.

Przejdźmy dalej i wystartujmy nasz job:

Po uruchomieniu wyskoczy nam pop-up widoczny na powyższym zrzucie gdzie mamy link do interkatywnego “podglądania” wykonania joba:

Zasymulujmy sobie błąd żeby zobaczyć jak poszczególne zadania zostaną wykonane. W Notebook 2 dopisałem następującą linijkę kodu:

assert False,"Some error happen"

Po uruchomieniu joba rezlutat końcowy wygląda następująco:

Jak widać to co mogło się wykonać to się wykonało, a taski zależne od Notebook2 nie zostały wykonane. Tego typu zależności to jedyne na ten moment interakcje jakie możemy ustawić – coś takiego jak uruchomienie taska gdy inny task się zakończył błędem nie jest możliwe do zdefiniowania. Podobnie nie jesteśmy w stanie przekazać rezultatu zwróconego z jednego taska do kolejnego. Dlatego też jeśli potrzebujemy bardziej złożonej interakcji pomiędzy poszczególnymi notebookami należy pomyśleć o wywoływaniu poszczególnych notebooków z poziomu innych notebooków poprzez wspomniane wcześniej %run lub dbutils.

Przetestujmy również przekazywanie parametrów. Aby to zrobić do notebooka 2 dodałem widget odpowiadający za parametry oraz umieściłem go w:

dbutils.widgets.removeAll()
dbutils.widgets.dropdown("paramTest", "1", [str(x) for x in range(1, 10)])
paramTest = dbutils.widgets.get("paramTest")

import time
time.sleep(5)
print(f"Notebook 2 execution finished")

dbutils.notebook.exit(paramTest)

Parametr przekazujemy ręcznie wprowadzając określone wartości:

Następnie po uruchomieniu i wybraniu określonego zadania możemy zobaczyć, że wartość została przekazana w poprawny sposób:

Jak już wspomniałem wyżej nie jesteśmy w stanie przekazać wartości z jednego notebooka do kolejnego – chyba, że rezultat działania odłożymy gdzieś na boku i po prostu odczytamy go w kolejnym zadaniu, ale to nie jest już funkcjonalność Jobów, a standardowe działanie w Sparku. Warto wspomnieć również o zestawie zmiennych systemowych, które możemy przekazać jako parametr:

  • {{job_id}} – identyfikator joba,
  • {{run_id}} – identyfikator uruchomienia,
  • {{start_date}} – data uruchomienia UTC
  • {{start_time}} – timestamp uruchomienia,
  • {{task_retry_count}} – numer ponowenia uruchomienia,
  • {{parent_run_id}} – identyfikator uruchomienia joba w którym znajduje się task,
  • {{task_key}} – nazwa taska będącego częścią multi-task job.

Przechodząc dalej ciekawe rzeczy możemy znaleźć po wejściu do joba. Znajdują się tam informacje związane z tym ile trwały wykonania, jakim rezultatem się zakończyły itp:

To co jest również istotne to fakt, iż cały job, a raczej jego definicja jest zapisana jako JSON:

Używając linii komend jesteśmy w stanie stworzyć taki job przesyłając odpowiedniego jsona – poniżej prosta komenda wykonująca to zadanie:

databricks jobs create --json-file create-job.json

To co możemy wyciągną c z interfejsu graficznego możemy również dostać z kodu dlatego też polecam zaznajomić się z Jobs CLI (link) oraz Runs CLI (link).

Jak widać mamy możliwość tworzenia harmonogramów wykonań wewnątrz platformy Databricks. Dzięki możliwości uruchamiania więcej niż jednego zadania możemy budować już nieco bardziej zaawansowane scenariusze. Sam osobiście nadal polecam użycie Azure Data Factory ale jeśli nie jest ono dla nas dostępne to Multi-Task Job wraz z %run oraz dbutils jak najbardziej spełnia swoją rolę. Pozdrawiam serdecznie.

Leave a Reply