Идемпотентность приложений Apache Spark Structured Streaming в Delta Lake

архитектура данных Delta Lake Spark, Spark Structured Streaming примеры курсы обучение Delta Lake, обучение архитекторов и инженеров Big Data, Школа Больших Данных Учебный Центр Коммерсант

Продолжая недавний разговор про Apache Spark Structured Streaming, сегодня рассмотрим, как этот движок потоковой обработки данных помогает дата-инженеру реализовать идемпотентную запись в таблицы Delta Lake, а также выполнить операции слияния и обновления/вставки в помощью метода foreachBatch().

Идемпотентность потоковых приложений Apache Spark

Идемпотентность – важное свойство распределенных систем, которое гарантирует, что повторные вызовы какой-либо функции дадут те же результаты, что и ее первичное выполнение. В частности, HTTP-метод GET, активно используемый в REST API для получения данных, является идемпотентным. А вот HTTP-метод POST, используемый для создания нового ресурса, каждый раз возвращает новые результаты, поэтому не является идемпотентным. Подробнее об этом мы писали здесь и здесь.

Наличие идемпотентного API является критически важным во многих ситуациях, а потому поддерживается в большинстве Big Data фреймворках. В частности, Apache Spark Structured Streaming по умолчанию гарантирует однократную отказоустойчивость с использованием контрольных точек и журналов с упреждающей записью, что мы рассматриваем в этой статье. Однако, это не касается метода foreachBatch(), который применяется пакетными API Spark SQL к потоковым данным или используется для записи выходных данных в несколько мест назначения. Также foreachBatch() полезен для вызова таких функций, как MERGE INTO, для обновления нижестоящих таблиц Delta Lake – уровня хранилища данных с открытым исходным кодом, который обеспечивает надежность озера данных, поддерживает ACID-транзакции и масштабируемую обработку метаданных, объединяя потоковые и пакетные операции. Delta Lake работает на базе существующего озера данных на HDFS, Amazon S3 или Azure Data Lake Storage, будучи полностью совместимо со всеми API Apache Spark. Подробнее о том, что такое Delta Lake, мы рассказывали в этой статье.

Возвращаясь к методу foreachBatch() в пакете Spark streamingDF.writeStream, который позволяет указать функцию, выполняемую для выходных данных каждого микропакета потокового запроса, отметим, что этот метод имеет два параметра: датафрейм с выходными данными микропакета, и его уникальный идентификатор. В случае сбоя или прерывания потока внутри foreachBatch() может возникнуть дублирование в целевых таблицах. Пример использования этого метода смотрите в нашей новой статье про потоковую публикацию данных во внешний REST API с Apache Spark Streaming.

Чтобы предотвратить эту проблемы, разработчики Databricks, под эгидой которого выходит Delta Lake, используют 2 параметра потокового запроса:

  • txnAppId – идентификатор приложения, записывающего данные в целевую дельта-таблицу. Это нужно, потому что дельта-таблица может иметь несколько потоков, записывающих в нее параллельно, и каждый из них может быть отдельным приложением.
  • txnVersion — серийный номер, указывающий версию записываемых данных приложения. В случае foreachBatch() это обычно будет идентификатор пакета.

Псевдокод этих двух параметров выглядит так:

txnVersion = options.get("txnVersion")
txnAppId = options.get("txnAppId")

Класс записи просто пропустит запись, если полученная версия ранее была видна в дельта-таблице для того же appId. Журнал транзакций дельта-таблицы не обязательно должен хранить все исторические версии, достаточно только последней версии для каждого идентификатора приложения. Псевдокод этой логики выглядит так:

def hasTransactionBeenExecutedBefore(table, txnVersion, txnAppId)    
latestSeenVersion = table.getLatestSeenVersionForApp(txnAppId)    
if (latestSeenVersion >= txnVersion)        
return true
    else
        return false

Благодаря тому, что при каждой следующей записи Delta Lake проверяет, была ли эта попытка раньше или нет, потоковое приложение может продолжить операцию или пропустить ее. Эта простая концепция может быть полезна и в сценариях без потоковой передачи: действие идентификатора транзакции всегда будет записано в дельта-лог JSON или в файлы Parquet. При этом Delta Lake нет необходимости просматривать полный список версий для реализации этой логики, достаточно только последней записи каждого идентификатора приложения.

Стоит помнить, что при удалении контрольной точки потоковой передачи и перезапуске запроса с новой контрольной точкой, следует указать другой appId. Иначе записи из перезапущенного запроса будут игнорироваться, поскольку он будет содержать тот же txnAppId, а идентификатор пакета будет начинаться с 0. Это обычно бывает при попытке начать заново, но вместо этого при воссоздании целевой дельта-таблицы просто удаляются существующие записи. Таблица будет пустой с точки зрения данных, но журнал транзакций по-прежнему будет иметь следы предыдущего запуска потоковой передачи и, следовательно, просто пропустит все идентификаторы пакетов, которые были просмотрены ранее.

Изменив уровень ведения журнала на INFO, можно увидеть сообщения, указывающие на пропуск микропакетов, поскольку транзакция записи версии 0 для приложения с идентификатором idempotent_app уже зафиксирована в Delta-таблице и запись будет пропущена. Это исключает перезапись последнего зафиксированного микропакета в целевую дельта-таблицу, в типовом случае возобновления отказавшего потокового приложения.

Таким образом, модернизация функции foreachBatch() полезна при записи в таблицы Delta Lake при использовании класса DataFrameWriter. Она позволяет выполнить идемпотентную запись в дельта-таблицы, чтобы сделать потоковые рабочие нагрузки устойчивыми к временным сбоям. Без этого могут возникнуть дубликаты, которые очень трудно обнаружить на практике. Однако, эта функция доступна только из API DataFrameWriter, ее пока нельзя использовать для запуска таких команд, как MERGE INTO. Впрочем, далее мы рассмотрим, как обойти это ограничение. Аналогично при работе с Parquet-файлами или JDBC, придется решать проблему идемпотентности самостоятельно.

Оптимизация таблиц Delta Lake через сортировку

Если добавлять файлы в дельта-таблицу и не сортировать их, то со временем придется читать много файлов во время слияния. Поэтому лучше оптимизировать дельта-таблицу после каждых N слияний, количество которых зависит от требований к задержке обработки данных. Следующий код запустит команду оптимизации и сортировки для заданной таблицы, которая загружается потоком. Команды оптимизации не могут выполняться изолированно, потому что это потребует приостановки, а затем возобновления потока. Поэтому эта операция является частью функции upsert, позволяя оптимизировать чтение до поступления следующего пакета потоковых данных.

from timeit import default_timer as timer

def optimize_and_zorder_table(table_name: str, zorder_by_col_name: str) -> None:
"""
Parameters:
table_name: str
name of the table to be optimized
zorder_by_col_name: str
comma separated list of columns to zorder by. example "col_a, col_b, col_c"
"""
start = timer()
print(f"Met condition to optimize table {table_name}")
sql_query_optimize = f"OPTIMIZE {table_name} ZORDER BY ({zorder_by_col_name})"
spark.sql(sql_query_optimize)
end = timer()
time_elapsed_seconds = end - start
print(
f"Successfully optimized table {table_name} . Total time elapsed: {time_elapsed_seconds} seconds"
)

В заключение используем предыдущую целевую таблицу в качестве нового источника потоковой передачи для следующего конвейера Spark Structured Streaming. Веб-канал данных об изменениях позволяет отслеживать изменения на уровне строк между версиями таблицы Delta Lake. Если эта функция включена для дельта-таблицы, среда выполнения записывает события изменения для всех данных, записанных в таблицу. Сюда входят данные строки вместе с метаданными, указывающими, была ли указанная строка вставлена, удалена или обновлена. С какими ошибками при этом можно столкнуться и как их решать, читайте в нашей новой статье.

(
generated_df
.writeStream.format('delta')
.trigger(processingTime='30 seconds')
.option("checkpointLocation", check_point_location)
.foreachBatch(make_changes_using_the_micro_batch)
.start()
)

Узнайте больше про использование Apache Spark в современных дата-архитектурах аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://towardsdatascience.com/idempotent-writes-to-delta-lake-tables-96f49addd4aa
  2. https://canadiandataguy.medium.com/using-spark-streaming-to-merge-upsert-data-into-a-delta-lake-6d34c827a892
  3. https://docs.databricks.com/structured-streaming/foreach.html
Поиск по сайту