Azure Data Factory jest kluczowym serwisem pozwalającym tworzyć skalowalne przepływy danych. Nawet najlepiej zaplanowany przepływ musi mieć zaimplementowaną logikę na wypadek wystąpienia błędu, w tym również możliwość przechwycenia konkretnego komunikatu związanego z błędem. Implementacja takiego mechanizmu może zostać wykonana na kilka różnych sposobów – jednym z nich jest odpytanie serwisu z poziomu żądania REST API, które to chciałbym przedstawić w ramach niniejszego wpisu.
Przechodząc do meritum przedstawmy scenariusz testowy. Składa się on z trzech pipeline:
- TasksPackage – pipeline wywołujący zadania.
- ErrorHandlingPackage – pipeline obsługujący błędy.
- MainPackage – główny pipeline wywołujący powyższe dwa.
W przypadku pipeline odpowiedzialnego za obsługę błędów mamy kilka rzeczy do wyjaśnienia. Błąd będziemy chcieli przechwycić i przechowywać w bazie danych dlatego też stworzyłem w Azure SQL Database prostą tabelę o następującej strukturze:
CREATE TABLE dbo.ADFStatus ( [PipelineRunID] NVARCHAR(100) ,[PipelineName] NVARCHAR(100) ,[Status] NVARCHAR(20) ,[ErrorMessage] NVARCHAR(250) ) GO
Myślę, że powyższa struktura jest na tyle prosta, że nie wymaga dodatkowego wyjaśnienia. Dane do tej tabeli będą trafiać poprzez procedurę składowaną, która również jest bardzo prosta w swojej strukturze:
CREATE PROC dbo.spInsertLog @PipelineRunID NVARCHAR(100) ,@PipelineName NVARCHAR(100) ,@Status NVARCHAR(20) ,@ErrorMessage NVARCHAR(250) AS BEGIN INSERT INTO dbo.ADFStatus ( [PipelineRunID] ,[PipelineName] ,[Status] ,[ErrorMessage] ) SELECT @PipelineRunID ,@PipelineName ,@Status ,@ErrorMessage END GO
Oczywiście nic nie stoi na przeszkodzie abyśmy cały mechanizm mocno rozbudowali jednak w celach testowych powyższe podejście w zupełności wystarczy. Mając już przygotowane podstawowe struktury możemy przejść do odpytania API Data Factory w celu uzyskania interesujących nas informacji. Zapytanie które będziemy wysyłać powinno mieć następującą strukturę:
https://management.azure.com/subscriptions/IDSubskrypcji/resourceGroups/nazwaResourceGroup/providers/Microsoft.DataFactory/factories/nazwaDataFactory/pipelineruns/IdUruchomieniaPipeline?api-version=2018-06-01
Jeśli chodzi o Id subskrypcji oraz nazwę Resource Group to tą pierwszą możemy dosyć łatwo wyciągnąć przez REST drugą nieco trudniej ale ogólnie możemy obie te wartości parametryzować i przechowywać w bazie w tabeli konfiguracyjnej – ja na ten moment przypiszę je na sztywno w URL. Przechodząc dalej jeśli chodzi o nazwę Data Factory oraz identyfikator uruchomienia konkretnego pipeline’a to te informacje znajdziemy w zmiennych systemowych samego ADFa.
Mając na uwadze powyższe informacje możemy przejść do parametryzowania naszej paczki odpowiadającej za obsługę błędów. Stworzyłem dwa parametry odpowiadające powyższym informacjom:
Następnie stworzyłem zmienna która będzie przechowywać zbudowany przez nas URL:
Przypasania wartości do zmiennej możemy dokonać poprzez zadanie Set variable – jego konfiguracja nie jest niczym szczególnym i sprowadza się do wywołania funkcji concat i poskładania całego adresu z poszczególnych składników:
@concat('https://management.azure.com/subscriptions/283416s6f-e4ed-4977-a474-9f8c1c4a78c8/resourceGroups/seequalityRG/providers/Microsoft.DataFactory/factories/seequalityADF/pipelineruns/', pipeline().parameters.PipelineRunID ,'?api-version=2018-06-01')
Jak już wspomniałem poszczególne elementy tego URL polecam parametryzować aby mieć pełną kontrolę nad tworzonym przepływem. W momencie gdy mamy już URL to możemy przejść do wysłania żądania – do tego celu wykorzystamy zadanie Web Activity.
Jego konfiguracja sprowadza się do podania URL (w naszym przypadku zmiennej, która ma przypisaną odpowiednią wartość), wybrania metody którą w tym wypadku jest GET oraz podania Resource do którego się odwołujemy czyli https://management.azure.com.
Na powyższym zrzucie ekranowym możecie zobaczyć, że jako sposób uwierzytelnienia do zasobu wybrałem Managed Identity czyli w skrócie rzecz ujmując ADF będzie odpytywał sam siebie używając swojej własnej tożsamości. Co ciekawe domyślnie rzecz biorąc ADF nie posiada praw do sprawdzania logów swoich własnych przepływów dlatego musimy mu je nadać. Zrobimy to w standardowy sposób na zakładce zasobu wchodzimy w Access control (IAM) i tam wprowadzamy managed identity czyli wyszukujemy tożsamość o takiej samej nazwie jak nasze Data Factory. W moim przypadku nadałem rolę Contributor, czyli z poziomu pipeline’a będziemy mogli zrobić niemal wszystko (oprócz zarządzania security):
Przechodząc dalej możemy dodać zadanie zapisujące przechwycone błędy w bazie danych wywołując przygotowaną procedurę przekazując wartości parametrów:
Na powyższym zrzucie ekranowym widać, że wartości parametrów pochodzą z różnych źródeł. Wartości ErrorMessage oraz Status są brane z rezultatu działania zadania Web Activity, a PipelineName oraz PipelineRunID pochodzą z parametrów. Aby poznać strukturę zwracaną przez Web Activity możemy uruchomić paczkę obsługującą błędy w trybie Debug i podanie nazwy pipeline’a oraz identyfikatora uruchomienia, które wykonaliśmy już wcześniej i zakończyło się błędem:
W taki właśnie sposób poznamy przykładowąstrukturę tego co zwróci Web Activity czyli interesujące nas kolumny status oraz message:
Mamy zatem całość wyrażeń potrzebnych do uruchomienia naszej procedury logującej:
- ErrorMessage: @activity(‘Retrieve Status’).output.message
- PipelineName: @pipeline().parameters.PipelineName
- PipelineRunID: @pipeline().parameters.PipelineRunID
- Status: @activity(‘Retrieve Status’).output.status
Konfigurację pipeline’a przechwytującego błędy mamy za sobą. Kolejny na liście jest TasksPackage, który posiada tylko jedno zadanianie, które (celowo) ma wywołać błąd. Możemy to osiągnąć bardzo prosto próbując przypisać do zmiennej wynik dzielenia przez zero – co w każdym wypadku będzie skutkowało błędem i pozwoli nam przetestować cały mechanizm:
Nasze składowe są już poprawnie przygotowane więc możemy przejść do głównego przepływu czyli MainPackage. Składa się on oczywiście z wywołań wcześniej przygotowanych pipeline’ów gdzie obsługa błędów ma zostać wywołana tylko w przypadku wystąpienia błędu przy pierwszym przepływie czyli łączymy oba zadania czerwoną strzałką tak jak na poniższym zrzucie:
Pamiętamy, że pipeline zapisujący błędy wymaga podania dwóch parametrów czyli identyfikatora ładowania oraz nazwy. Pamiętamy, że każdy pipeline dostaje swój własny identyfikator dlatego my przekazujemy identyfikator z poprzedzajacego zadania Execute Pipeline @activity(‘Execute Execution Package’).output.pipelineRunId jako nazwę przekażemy nazwę bieżącego pakietu w którym się znajdujemy czyli wykorzystamy zmienną @pipeline().Pipeline:
Tak przygotowany pakiet możemy opublikować i uruchomić. Efekty naszych działań powinny pojawić się w miarę szybko na zakładce Monitor:
Po sprawdzeniu w bazie widzimy, że całość została zalogowana tak jakbyśmy tego oczekiwali:
W celu upewnienia się, że wszystko działa jak należy możemy spróbować wywołać inny błąd – zamiast dzielenia przez zero spróbujmy konwersji wartości tekstowej ‘abc’ na liczbę całkowitą:
Również tym razem wszystko zakończyło się zgodnie z oczekiwaniami.
Jak możecie zauważyć cały mechanizm nie jest trudny jednak z całą pewnością nie jest trywialny. Przygotowany w taki sposób pipeline przechwytujący błędy może być podpięty w wielu różnych miejscach dzięki czemu będziemy w stanie obsłużyć postawione przed nami zadania. Mam nadzieję, że przedstawiony tutorial okazał się dla was przydatny – pozdrawiam!
- Executing SQL queries from Azure DevOps using Service Connection credentials - August 28, 2024
- Setup Git credentials for Service Principal in Azure Databricks - August 21, 2024
- Microsoft Fabric 101 Episode 3: Pausing and Scaling using portal and Powershell - August 8, 2024
Last comments