AzureDatabricks_ChangeDataFeed_00

Przechwytywanie zmian z Change Data Feed w Azure Databricks

Częstym problemem na jaki napotykamy przy pracach związanych z budową hurtowni danych jest fakt przechwytywania zmian zarówno z systemów źródłowych jak i z poszczególnych warstw wchodzących w skład naszej architektury systemu. Tego typu operacja jest w większości przypadków obligatoryjną częścią procesu ładowania ponieważ ładowanie za każdym razem całego zbioru danych jest albo nieefektywne albo wręcz niemożliwe do implementacji. Podobne problemy i dylematy pojawiają się w kwestii lakehouse i podobnych rozwiązań big data. Na przestrzeni lat powstało kilka wzorców projektowych które adresują ten problem i chciałbym powiedzieć o jednym z nich dostępnym w usłudze Azure Databricks, gdzie mamy do dyspozycji narzędzie Change Data Feed umożliwiające w łatwy sposób identyfikacje danych które się zmieniły. Serdecznie zapraszam do lektury.

Na wstępie chciałbym powiedzieć, że ogólnie rzecz biorąc od jakiegoś czasu popularność w rozwiązaniach opartych o Databricks zdobyło podejście nazywane Medallion Architecture. Architektura ta mówi nam o tym, że mamy trzy główne warstwy w naszym lakehouse, które jak sama nazwa wskazuje odpowiadają kolorom medali czyli Bronze, Silver oraz Gold.

Na powyższym rysunku możecie zauważyć charakterystykę każdej z warstw:

  • Bronze jest warstwą gdzie lądują surowe dane z systemów źródłowych,
  • Silver pobiera dane z Bronze z tym, że dane są dostosowane już do potrzeb analitycznych więc to właśnie w tym miejscu występują wszelkiego rodzaju filtracje, czyszczenia i wyliczenia,
  • Gold zawiera już dane dostosowane pod konkretne zastosowania biznesowe i to właśnie z tego miejsca są one odczytywane przez np. narzędzia raportowe.

Fizycznie każda z warstw ma formę Delta Lake co jest oczywiście od jakiegoś czasu domyślnym schematem składowania danych w Databricks. Wspominam o tym dlatego, że Delta Lake jest fundamentem pracy z opisywanym dziś mechanizmem Change Data Feed.  Pomiędzy poszczególnymi warstwami istotne jest to, żeby w wydajny sposób odczytywać tylko te wiersze, które się zmieniły. Właśnie w tym miejscu pojawia się mechanizm Change Data Feed pozwalający nam to osiągnąć. Dobrym wstępem dla kogoś wywodzącego się ze świata relacyhbego jest nawiązanie do mechanizmu Change Data Capture dostępnego w SQL Server. Z logicznego punktu widzenia mechanizmy te działają w zbliżony sposób jednakże ich implementacja jest całkowicie inna co jest dosyć oczywiste ze względu na fakt, że CDC był mechanizmem dostępnym w relacyjnej bazie danych a CDF opiera się o Delta Lake czyli zestaw plików Parquet z logiem transakcyjnym.

Jeśli chodzi o zastosowania to mamy kilka scenariuszy gdzie możemy pomyśleć o zastosowaniu CDF np.:

  • pomiędzy warstawami zachodzą aktualizacje oraz operacje usunięcia danych,
  • dane z systemów źródłowych również mają formę CDC,
  • jedynie niewielka część danych jest aktualizowana lub usuwana.
  • z różnych powodów potrzebujemy bardzo szczegółowego logować każde zdarzenie w określonej tabeli.

Są również sytuacje kiedy trzeba się zastanowić czy wybraliśmy odpowiedni mechanizm:

  • większość operacji wiąże się z aktualizacją lub usunięciem danych,
  • większość danych jest przeładowywanych w kolejnych cyklach ładowań.

Tak więc podchodźmy do decyzji o ładowaniu w sposób rozważny.

Przechodząc dalej pokażmy mechanizm na podstawie praktycznego przykładu. Załóżmy, że naszym celem jest przerzucenie danych z warstwy Bronze do warstwy Silver. Wszystkie moje przykłady będę wykonywał na klastrze Databricks na platformie Azure – część rzeczy będę wykonywał w Pythonie, inne w SQLu.

Tak więc na samym początku stwórzmy sobie zestaw testowy, którym będziemy się posługiwać:

salaries = [(1,"John","Doe","England", 20000), (2,"Mark","Dully","USA", 25000), (3,"Mia","Konev","Romania", 18000), (4,"Jane","Fercis","Canada", 32000),(5,"Tobias","Verwick","Norway", 22000) ]
schema_definition = ["Id","Name","Surname","Country","Salary"]

W skrócie:

  • salaries zawiera listę fikcyjnych osób wraz z kilkoma opisującymi atrybutami,
  • schema_definition to lista nagłówków dla zdefiniowanych w salaries wierszy.
spark.createDataFrame(data=salaries, schema = schema_definition).write.format("delta").mode("overwrite").saveAsTable("staging_salaries")

Powyższa komenda tworzy data frame na podstawie zdefiniowanych wcześniej obiektów i zapisuje na storage w formacie delta lake. Ponadto powyższa komenda zarejestruje ten obiekt dzięki czemu będziemy mogli się do tej tabeli odwoływać w prosty sposób po nazwie, a nie po lokalizacji.

W dalszym kroku stworzymy sobie data frame, który odczyta dane z powstałego powyżej delta lake i dorzuci dwie nowe kolumny tj. kontynent będący wynikiem prostej instrukcji warunkowej oraz SalaryInThousand czyli po prostu pensje wyrażoną w tysiącach. Dodatkowo z sql functions zaimportowałem samą instrukcję warunkową oraz funkcję col umożliwiającą odwoływanie się do kolumny dataframe’a:

from pyspark.sql.functions import col,when

df_staging_salaries = (
spark.read
 .format("delta")
 .table("staging_salaries")
 .withColumn("SalaryInThousands",col("Salary")/1000)
 .withColumn("Continent", when(col("Country") == "Romania","Europe")
 .when(col("Country") == "England","Europe")
 .when(col("Country") == "Canada","North America")
 .when(col("Country") == "Norway","Europe")
 .when(col("Country") == "USA","North America")
 .when(col("Country") == "Brazil","South America")
 .when(col("Country") == "Egypt","Africa")
 .otherwise("Undefined")
)

)
display(df_staging_salaries)

Efekt powyższego wygląda następująco:

Mając taki zestaw danych możemy powiedzieć, że jest to forma docelowa – zapiszmy to zatem jako delta lake o nazwie salaries:

df_staging_salaries.write.format("delta").mode("overwrite").saveAsTable("salaries")

To co do tej pory wykonaliśmy to oczywiście standardowe funkcjonalności związane z Delta Lake – powyższe działania możemy utożsamiać z ładowaniem inicjalnym. Do implementacji ładowania inkrementalnego wykorzystamy bohatera niniejszego artykułu czyli mechanizm Change Data Feed. Żeby móc go użyć musimy najpierw go włączyć, zrobimy to komendą SQL ALTER TABLE:

 

%sql
ALTER TABLE staging_salaries SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

Jeśli chcielibyśmy aby ten mechanizm był włączony dla wszystkich nowych tabel to możemy zmienić ustawienie globalnie:

spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;

Mamy zatem CDF włączony na tabeli staging_salaries wykonajmy zatem kilka operacji np. aktualizacja:

%sql

UPDATE staging_salaries
SET Salary = 34000
WHERE Id =4

usunięcie:

%sql

DELETE FROM staging_salaries
WHERE Id = 1

oraz Insert:

%sql
INSERT INTO staging_salaries
VALUES (10,"Jose","Gaucho","Brazil", 160000),(11,"Ahim","Levy","Egypt", 150000)

Wszystkie te operacje wykonały się poprawnie i zostały odpowiednio zalogowane w CDF. Możemy je podejrzeć z poziomu SQL używając wbudowanej funkcji o nazwie table_changes gdzie jako pierwszy parametr przekazujemy nazwę interesującej nas tabeli, a parametr drugi oraz trzeci mówiące od której do której wersji chcemy odczytać dane (o których powiem za chwilę):

%sql
SELECT * FROM table_changes('staging_salaries', 2, 5) order by _commit_timestamp

Jak widzicie na powyższym zrzucie ekranowym każda operacja została zarejestrowana ze swoją wersją commita (_commit_version) oraz czasem wykonania (_commit_timestamp). Dodatkowo operacja update widoczna jest w dwóch wierszach gdzie mamy dostęp do wartości przed aktualizacją (update_preimage) oraz po aktualizacji (postimage).

Mając do dyspozycji łatwą metodę dostępu do tego typu loga możemy je zaczytać do dataframe, a następnie dodać interesujące nas kolumny wyliczane oraz za pomocą metody createOrReplaceTempView zarejestrować tymczasowy widok aby móc się do tego zbioru odwołać z poziomu SQL. Zwróćcie proszę uwagę na fakt, że z tabeli logującej odfiltrowałem te wiersze, które zawierają informacje o tym jakie wartości dany wiersz posiadał przed akutalizacją (update_preimage) – zrobiłem to po to aby zbiór wejściowy do operacji MERGE był unikalny:

df_updates = (
spark.read.format("delta")
 .option("readChangeData", True)
 .option("startingVersion", 2)
 .table('staging_salaries')
 .filter(col("_change_type")!="update_preimage")
 .withColumn("SalaryInThousands",col("Salary")/1000)
 .withColumn("Continent", when(col("Country") == "Romania","Europe")
 .when(col("Country") == "England","Europe")
 .when(col("Country") == "Canada","North America")
 .when(col("Country") == "Norway","Europe")
 .when(col("Country") == "USA","North America")
 .when(col("Country") == "Brazil","South America")
 .when(col("Country") == "Egypt","Africa")
 .otherwise("Undefined")
)
)
display(df_updates)

df_updates.createOrReplaceTempView("updates")

 

Teraz już nic nie stoi na przeszkodzie aby skonstruować instrukcje MERGE zgodnie z tym jak zaplanowaliśmy sobie odzwierciedlenie operacji w kolejnej warstwie:

%sql
MERGE INTO salaries t
USING updates s ON t.Id = s.Id
WHEN MATCHED AND s._change_type='update_postimage'
THEN UPDATE SET
Name = s.Name,
Surname = s.Surname,
Country = s.Country,
Salary = s.Salary,
Continent = s.Continent
WHEN MATCHED AND s._change_type='delete'
THEN DELETE
WHEN NOT MATCHED
THEN INSERT
(
Id,
Name,
Surname,
Country,
Salary,
SalaryInThousands,
Continent
)
VALUES
(
s.Id,
s.Name,
s.Surname,
s.Country,
s.Salary,
s.SalaryInThousands,
s.Continent
)

 

W efekcie widzimy, że wszystko odbyło się zgodnie z planem i cztery wiersze zostały wstawione, zaktualizowane lub usunięte:

Spójrzmy jeszcze jak przedstawia się efekt w postaci tabeli docelowej:

 

%sql

SELECT * FROM salaries

Gdybyśmy chcieli cyklicznie odświeżać dane to przy kolejnym ładowaniu wystarczy odpowiednio odpytać tabele logującą zmiany. Aby uzyskać efekt pobierania jedynie wierszy, które się zmieniły od ostatniego ładowania możemy całą operację oprzeć albo nr. _commit_version albo o _commit_timestamp. Standardowym podejściem jest to żeby zapisać informacje o tym co było ładowane ostatnio w bazie danych lub w pliku/plikach konfiguracyjnych i odpowiednio parametryzować właśnie odpytanie tabeli logów czyli na poniższym rysunku zamiast na sztywno przypisać wartość “2” nadać wartość pochodzącą z dziennika ładowania/parametru:

Powyższe komendy to oczywiście nie cała paleta możliwości. Poniżej kilka dodatkowych przykładów:

Zmiany w przedziale określonych wersji np. od wersji 1 do wersji 8:

 

updates = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 1) \
.option("endingVersion", 8) \
.table("staging_salaries")

Zmiany w określonym przedziale czasowym:

 

updates = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingTimestamp", '2021-08-29T14:55:29.000') \
.option("endingTimestamp", '2021-08-29T14:55:40.000') \
.table("staging_salaries")

Przy czym w powyżsyzm podejściu nie możemy “przekroczyć” wartości granicznych bo np. jeśli ostatnia wersja została zarejestrowana o 14:00, a my podamy 14:01 to otrzymamy błąd:

Na szczęście jak już wspomniałem większość operacji możemy w całości parametryzować dzięki czemu w dobrej implementacji nie dostaniemy powyższych błędów. Wartym uzupełnienia jest fakt, że historia zmian jest przechowywana na tej samej zasadzie co historia zmian w formacie DELTA i jest usuwana operacją VACUUM dlatego warto odpowiednio to ustawić według własnych potrzeb.

Na sam koniec chciałbym wspomnieć o tym gdzie tak naprawdę dane związane z CDF są składowane i w jaki sposób. Ogólnie rzecz biorąc wiemy, że delta lake jest przechowywany jako zestaw plików parquet oraz jako log transakcyjny w postaci jsona. W momencie włączenia mechanizmu CDF dane o pojedynczych operacjach będą przechowywane w folderze _change_data, który zapewne zauważycie w głównej lokalizacji swojej delta table obok plików parquet i delta_log:

To by było na tyle jeśli chodzi o mechanizm Change Data Feed w ramach Azure Databricks. Polecam potestować ponieważ mechanizm znajduje zastosowanie w wielu różnych scenariuszach. Pozdrawiam!

 

Leave a Reply