Uruchamianie notebooka Databricks z poziomu Azure Data Factory

Databricks posiada swoje własne możliwości orkiestracji procesów poprzez wywoływanie jednych notebooków z poziomu innych co możemy osiągnąć przy pomocy komendy %run lub dbutils.notebook.run. Często jednak zdarza się tak, że notebook jest jedynie częścią większego procesu i wtedy orkiestrację musimy przekazać do innego narzędzia takiego jak np. Azure Data Factory. Na szczęście oba narzędzia integrują się w dosyć prosty i przejrzysty sposób co chciałbym pokazać w ramach dzisiejszego wpisu do lektury którego serdecznie zapraszam.

Dla ukazania integracji obu narzędzi stworzyłem notebook, który odczytuje przykładowe dane NYTaxi (domyślnie znajdziecie je w systemie plików instancji Databricks) następnie są one przefiltrowane przez określony miesiąc, zagregowane i zapisane do pliku parquet.

Rozbierzmy notebook na czynniki pierwsze w celu lepszego zrozumienia procesu. W pierwszej kolejności importujemy potrzebne funkcje:

from pyspark.sql.functions import col
from pyspark.sql.functions import year, month, dayofmonth

Funkcje daty raczej nie wymagają wyjaśnienia, funkcja col jest przydatna do odwoływania się do konkretnej kolumny w ramach data frame’a. W dalszej kolejności mamy następujący kod:

dbutils.widgets.dropdown("paramMonth", "1", [str(x) for x in range(1, 12)])
paramMonth = dbutils.widgets.get("paramMonth")

Jest to definicja parametru naszego notebook’a który przyjmuje wartości od 1 do 12 odpowiadające miesiącom. Z parametrami w Databricks pracujemy za pomocą wbudowanej funkcji dbutils.widget – jest kilka różnych rodzajów tego typu obiektów ale myślę, że opiszę je w osobnym artykule. W dalszej części pobieramy wartość z widgetu i przypisujemy do zmiennej.
W poniższej części kodu odczytujemy interesujące nas dane i przypisujemy do dataframe o nazwie df_result:

df_result =(
spark.read.format("csv")
.option("sep", ",")
.option("inferSchema", "true")
.option("header", "true")
.load("/databricks-datasets/nyctaxi/tripdata/green/green_tripdata_2013-*.csv.gz")
)

Dalej mamy filtrację danych na podstawie wybranego miesiąca i zapis po agregacji w określonej lokalizacji:

 df_result
.withColumn("month",month("lpep_pickup_datetime"))
.withColumn("day",dayofmonth("lpep_pickup_datetime"))
.filter(col("month")==paramMonth)
.groupBy(["month","day"])
.count()
.withColumnRenamed("count","numberOfRecords")
.write.mode('overwrite')
.parquet("/tmp/output/result.parquet")

Ostatnią linijką kodu jaką zawarłem w notebooku jest zwrócenie wartości która w tym wypadku jest stringiem o wartości “success”:

dbutils.notebook.exit("success")

W celu sprawdzenia czy wszystko działa uruchomiłem powyższy kod i odpytałem powstałe pliki w celu sprawdzenia zawartości. Jak możecie zobaczyć poniżej wszystko działa zgodnie z oczekiwaniami:

 

Upewniając się, że wszystko jest ok i mając działający notebook przejdźmy do Azure Data Factory gdzie spróbujemy go uruchomić. Po stworzeniu pipeline’a możemy zobaczyć, że do dyspozycji mamy trzy aktywności związane z Databricks, a mianowicie:

  • Notebook – możliwość uruchomienia Notebooka,
  • Jar – możliwość uruchomienia skryptu jar,
  • Python – możliwość uruchomienia skryptu Python.

W niniejszym artykule skupimy się na uruchomieniu notebooków jednakże warto pamiętać, że pozostałe dwie opcje działają analogicznie i ich użycie sprowadza się do wskazania skryptów w systemie plików. Przejdźmy dalej i stwórzmy Linked Service do instancji Azure Databricks – przeciągnijmy zatem zadanie Notebook i tam na zakładce Azure Databricks wybierzmy New aby dodać nowe połączenie do ADB:

W pierwszej części okna zobaczymy standardowe ustawienia związane ze wskazaniem konkretnej instancji Databricks:

Jeśli chodzi o moc obliczeniową możemy wybrać nowy Job Cluster czyli klaster powołany specjalnie na potrzeby wykonania tego Notebook’a. Ponadto możemy wybrać również istniejący klaster interaktywny lub pule instancji, które miałyby wykonać zadanie. W przypadku wybrania nowego klastra musimy wyspecyfikować jego wydajność, ilość node’ów, ustawienie związane z autoskalowaniem itp. W niniejszej demonstracji posłużymy się istniejącym interaktywnym klastrem który stworzyłem wcześniej. W dalszej części mamy możliwość wskazania sposobu uwierzytelnienia:

Aby się uwierzytelnić do usługi mamy możliwość użycia Managed Identity lub Access Tokenu. W tym miejscu chciałem zaznaczyć, że zawsze wtedy gdy mamy możliwość użycia Managed Identity to powinniśmy to robić bo po pierwsze nie musimy nigdzie zapisywać sekretów ani innych haseł (tzw. passwordless), ani nie musimy tworzyć dedykowanych procesów do rolowania sekretów co oczywiście powinno mieć miejsce gdy takowe używamy.

Ze względu na fakt, iż dodanie Managed Identity do Databricks nie różni się niczym od dodawania użytkowników lub grup to w tym przypadku postanowiłem pokazać jak uwierzytelnić się przez Access Token.  Pierwszym krokiem żeby to zrobić jest wejście w ustawienia w workspace Databricks, a następnie wygenerowanie nowego tokenu:

Definicja tokenu nie powinna przysporzyć nikomu problemów i sprowadza się do dodania krótkiego komentarza opisującego oraz czasu życia samego tokenu. Jeśli nie podamy czasu życia tokenu to pozostanie on aktywny cały czas więc raczej powinniśmy tego unikać:

Po kliknięciu w powyższym oknie przycisku Generate naszym oczom powinien ukazać się sam token:

Możemy podać go w ADF w definicji Linked Service. Gdy już to zrobimy powinniśmy móc wybrać jeden z dostępnych klastrów oraz przetestować połączenie:

Przechodząc na zakładkę  Settings możemy wybrać Notebook, który chcemy uruchomić oraz w sekcji Base parameters podać wartości parametrów:

Powyżej podałem parametr “na sztywno” jednakże jak większość rzeczy w Data Factory mogła to być wartość kalkulowana w dowolny sposób. Ostatnią opcją jaką możemy ustawić jest możliwość dołączenia biblioteki do klastra:

Do wyboru mamy kilka typów/źródeł m.in:

  • jar,
  • egg,
  • pypi,
  • wheel,
  • maven.

Możemy również podać repozytorium gdzie określony moduł może zostać znaleziony. Biblioteka wyspecyfikowana w ten sposób zostanie doinstalowana do klastra jeśli jej tam nie ma. Mimo, iż stworzony przeze mnie notebook nie potrzebuje dodatkowych bibliotek to dopisałem w Package frazę simplejson==3.8.0 aby przetestować dołączenie biblioteki do klastra.

Mając wszystko skonfigurowane nie pozostaje mi nic innego jak uruchomić ADF’a i sprawdzić działanie całego mechanizmu. Warto wspomnieć, że w przypadku gdy wybierzemy istniejący klaster interaktywny to jeśli jest on wyłączony to samo żądanie wykonania notebook’a niejako go wybudzi. Jeśli wszystko zrobiliśmy dobrze powinniśmy dostać pozytywny rezultat wykonania:

Pierwszą rzeczą na jaką warto zwrócić uwagę jest output jaki otrzymaliśmy z samej aktywności:

Właściwość runPageUrl przeniesie nas do logów dostępnych w Databricks związanych z konkretnym wykonaniem. Ważna jest również  zwrócić uwagę na wartość runOutput gdzie znajduje się zwrócona przez nas wartość wyspecyfikowana przy pomocy funkcji dbutils.notebook.exit. W tym konkretnym przypadku jest to tylko string “success” ale mogłoby to być cokolwiek innego.

Po przejściu na link dostępny w runPageUrl zobaczymy następujący obrazek:

Jest to log wykonania od strony Databricks i jak widać mamy tutaj wszystkie najważniejsze informacje + podgląd wykonania każdej komórki notebooka. W czasie konfiguracji dołączyliśmy dodatkową bibliotekę do klastra i jest ona również widoczna powyżej w sekcji Dependent Libraries. Aby dodatkowo potwierdzić fakt doinstalowania biblioteki możemy przejść do właściwości klastra gdzie powinniśmy ją znaleźć:

Spróbujmy teraz uruchomić równolegle ten sam notebook tylko z różnymi wartościami parametrów:

Jak możecie się domyśleć jedno wywołanie przekazuje jako parametr 7 oznaczający lipiec oraz 8 jako sierpień. W kodzie notebook’a zmieniłem jedną rzecz, a mianowicie do nazwy pliku wynikowego dodałem właśnie wartość parametru:

 (df_result
.withColumn("month",month("lpep_pickup_datetime"))
.withColumn("day",dayofmonth("lpep_pickup_datetime"))
.filter(col("month")==paramMonth)
.groupBy(["month","day"])
.count()
.withColumnRenamed("count","numberOfRecords")
.write.mode('overwrite')
.parquet(f"/tmp/output/result_{paramMonth}") 
)

Po uruchomieniu ADF w logach widzimy, że wszystko przebiegło bez problemu i wywołania ozstały wykonane równolegle:

Powstałe pliki zostały zapisane w systemie plików DBFS:

Wszystko przebiegło bez większego problemu jednak pamiętać należy o tym, że klaster ma określone możliwości przetwarzania równoległego więc należy mieć to zawsze na uwadze. Oczywiście możliwe jest uruchamianie notebooków wewnątrz innych notebooków + ich zrównoleglanie za pomocą wątków jednak jest to nieco trudniejszy proces niż gdybyśmy chcieli to zrobić za pomocą ADF.  Z tego też powodu polecam używać właśnie ADF ze względu na łatwość budowania nawet bardzo skomplikowanych procesów. Podejście to bywa przydatne szczególnie wtedy gdy przetwarzanie danych w Databricks jest jednym z wielu kroków całego procesu, który musimy zamodelować – wtedy zdecydowanie użycie ADF jest rekomendowanym podejściem.

Leave a Reply