Зачем вам CDF или как упростить CDC-конвейер для Delta Lake

озеро данных Delta Lake CDC CDF, архитектура данных Delta Lake Spark, Spark Structured Streaming примеры курсы обучение Delta Lake, обучение архитекторов и инженеров Big Data, Школа Больших Данных Учебный Центр Коммерсант

Как реализовать CDC для Delta Lake: разбираемся с функцией Change Data Feed от Databricks, которая позволяет быстро узнать обо всех изменениях строк в дельта-таблицах озера данных. Польза и принципы работы CDF для дата-инженера и архитектора данных.

CDC для Delta Lake

Идея сбора и обработки не всего объема данных, а только изменений, произошедших в системах-источниках, сегодня активно используется дата-инженерами. Это называется CDC (Change Data Capture) и применяется в управлении ETL-конвейерами для корпоративных хранилищ и озер данных.  Чтобы поддерживать ACID-транзакции и масштабируемую обработку метаданных, объединяя потоковые и пакетные операции с большими данными, поверх озера данных на HDFS, S3 или Azure Data Lake Storage часто развертывается Delta Lake — коммерческая платформа Databricks, полностью совместимая со всеми API Apache Spark. Подробнее о преимуществах и принципах работы Delta Lake мы писали здесь и здесь.

Применение CDC-подхода к ETL-конвейерам Delta Lake позволяет упростить обработку измененных или добавленных данных. Но получение точных и актуальных данных для каждой таблицы в конвейерах остается довольно сложной задачей, решить которую можно с помощью функции Change Data Feed (CDF) в Delta Lake. Использование CDF упрощает реализацию конвейера, а также поддерживает операцию MERGE и управление версиями журналов Delta Lake.

Польза от включения CDF очевидна в следующих сценариях:

  • создание актуальных агрегированных материализованных представлений через обновление только случившихся изменений без повторной обработки полных базовых таблиц;
  • передача метаданных об изменениях в нижестоящие системы конвейера обработки ;
  • журналирование потока данных об изменениях в виде дельта-таблицы для мониторинга и аудита системных событий, включая удаления и обновления данных;
  • повышение производительности Delta Lake за счет ускорения и упрощения ETL/ELT-процессов благодаря обработке только изменения на уровне строк после начальных операций слияния, обновления или удаления.

Как работает эта функция, рассмотрим далее.

Принципы работы CDF-функции

CDF позволяет отслеживать изменения на уровне строк между версиями дельта-таблицы. Когда эта функция включена для дельта-таблицы, среда выполнения записывает события изменения для всех данных этой таблицы, включая данные строки и метаданные о выполненных операциях (вставка, удаление или обновление). Чтобы использовать CDF-функцию была доступна для таблицы Delta Lake, ее следует сперва включить, например, при создании таблицы:

CREATE TABLE table_name(first_col int, second_col STRING)
USING delta
TBLPROPERTIES (delta.enableChangeDataFeed = true)

Запросить данные об изменениях поможет функция table_changes, которая возвращает начальное и конечное представление обновленной строки, чтобы оценить различия в изменениях.

CDF Delta Lake, архитектура данных примеры курсы обучениеи
Принцип работы CDF-функции в Delta Lake

Помимо столбцов данных для CDF, данные об изменениях содержат столбцы метаданных, определяющие тип события изменения. Согласно документации Databricsk на Delta Lake, функция table_changes(table_str, start [,end]) возвращает журнал изменений в таблице Delta Lake с включенным каналом данных об изменениях. Эта функция имеет следующие параметры:

  • table_str – строка (STRING) с полным именем таблицы, которое может быть уточнено указанием текущей схемы данных (current_schema). Если имя таблицы содержит пробелы или точки, надо экранировать эти символы, используя обратные кавычки внутри строки.
  • start – первая версия (BIGINT) или отметка времени (TIMESTAMP) возвращаемого изменения;
  • end — необязательный параметр, последняя версия (BIGINT) или отметка времени (TIMESTAMP) изменения. Если этот параметр не задан, возвращаются все изменения от запуска до текущего изменения.

Чтобы вызвать функцию table_changes(table_str, start [,end]), необходимо иметь право на использование оператора SELECT для таблицы или же быть ее владельцем или иметь административные привилегии. В результате будет возвращены все столбцы таблицы table_str, а также следующие:

  • _change_type – строка (STRING НЕ NULL), которая указывает произошедшее изменение (вставка, удаление, update_preimage или update_postimage);
  • _commit_version – версия фиксации таблицы (BIGINT NOT NULL), связанная с изменением;
  • _commit_timestamp – отметка времени (TIMESTAMP NOT NULL), связанная с изменением.

Начальная и конечная версии дельта-таблиц, а также временные отметки начала и окончания для получения изменений включаются в запросы. Чтобы прочитать изменения из конкретной начальной версии в последнюю версию таблицы, надо указать только начальную версию или отметку времени. Например, следующий участок кода показывает, что версия фиксации изменений дельта-таблицы задана в виде целого числа, а отметка времени в виде строки в формате гггг-ММ-дд[ЧЧ:мм:сс[.SSS]].

-- version as ints or longs e.g. changes from version 0 to 10
SELECT * FROM table_changes('tableName', 0, 10)

-- timestamp as string formatted timestamps
SELECT * FROM table_changes('tableName', '2021-04-21 05:45:46', '2023-05-21 12:00:00')

-- providing only the startingVersion/timestamp
SELECT * FROM table_changes('tableName', 0)

-- database/schema names inside the string for table name, with backticks for escaping dots and special characters
SELECT * FROM table_changes('dbName.`dotted.tableName`', '2023-04-21 06:45:46' , '2021-05-21 12:00:00')

-- path based tables
SELECT * FROM table_changes_by_path('\path', '2023-04-21 05:45:46')

Таким образом, с помощью CDF-функции можно фиксировать пакетный уровень измененных данных в Delta Lakes. А подробные сведения об этих изменениях с метаданными событий для каждой измененной строки позволяют дата-инженеру точно понять, что и когда произошло.

Узнайте больше подробностей по проектированию и поддержке современных дата-архитектур в проектах аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/globant/simplifying-change-data-capture-cdc-with-delta-lakes-change-data-feed-cdf-3899c1731ef2
  2. https://docs.databricks.com/sql/language-manual/functions/table_changes.html
Поиск по сайту