Как применить триггеры Apache Spark Structured Streaming для пакетных заданий

озеро данных, архитектура данных Delta Lake Spark, Spark Structured Streaming примеры курсы обучение Delta Lake, обучение архитекторов и инженеров Big Data, Школа Больших Данных Учебный Центр Коммерсант

Можно ли применять Apache Spark Structured Streaming для пакетных заданий и в каких случаях это целесообразно. Разбираемся, как устроена потоковая передача событий в Spark Structured Streaming, с какой частотой разные режимы триггеров микропакетной обработки данных запускают потоковые вычисления и что выбрать дата-инженеру.

Потоковая передача событий и пакетные задания: versus или вместе

Иногда дата-инженеры сталкиваются с ситуацией, когда не нужно каждый раз перечитывать или переписывать новый набор данных при обработке обновлений или вставок в хранилище. Например, когда данных слишком много, или архитектурные особенности хранилища ограничивают возможности применения upsert-операций, что характерно для AWS S3, Delta Lake и NoSQL-СУБД, о чем мы писали здесь и здесь. В частности, с большими файлами колоночного формата Parquet в AWS S3 можно отлично выполнять операции чтения и записи, но нельзя легко обновить одну точку данных без удаления и повторного создания всего набора. Дата-инженеру приходится использовать эту методологию перезаписи данных вместо отслеживания обновлений или применения типовых SQL-операторов.

В таких случаях можно создать потоковые задания, которые будут непрерывно читать или записывать данные из внешних источников. Главным преимуществом этого метода является возможность создания контрольных точек данных для облегчения повторной обработки при внезапных сбоях. Однако, такие задания в конечном итоге простаивают достаточно много времени или просто являются излишне сложными для простого процесса.

Поэтому дата-инженеру нужно искать компромисс, чтобы определить, насколько велик созданный набор данных для круглосуточной потоковой передачи и насколько велики риски/затраты на повторные операции его чтения или записи без возможности создания контрольной точки. При этом, даже если нет фактической необходимости обрабатывать поступившие данные в режиме реального времени, но есть потребность избежать повторной обработки всего датасета в случае сбоя, потоковые задания отлично решат эту проблему.

В потоковых системах данные поступают непрерывно и обрабатываются сразу, в режиме почти реального времени. Тем не менее, фактическую обработку запускает не само событие поступления данных, а связанный с этим триггер. Как это реализовано в Apache Spark Structured Streaming, рассмотрим далее.

Триггеры в Apache Spark Structured Streaming

Structured Streaming — это масштабируемый и отказоустойчивый механизм обработки потоков на основе механизма Spark SQL, который позволяет выполнять потоковые вычисления аналогично пакетным операциям со статическими данными. Оптимизированный механизм Spark SQL заботится о постепенном и непрерывном запуске и обновлении конечного результата по мере поступления потоковых данных. Apache Spark Structured Streaming позволяет использовать Dataset/DataFrame API в Scala, Java, Python или R для выражения потоковой агрегации, временных окон для событий, а также соединения потоков и пакетов. Также движок гарантирует сквозную отказоустойчивость со строго однократной доставкой данных (exactly once) с помощью контрольных точек и журналов с опережающей записью.

В Spark Structured Streaming запросы по умолчанию обрабатываются с помощью механизма микропакетной обработки, который представляет потоки данных как серию небольших пакетных заданий. Благодаря этому сквозные задержки составляют не более 100 миллисекунд, т.е. в режиме почти реального времени. Начиная с версии 2.3 в Spark есть режим обработки с малой задержкой, называемый непрерывной обработкой, который может обеспечить сквозную задержку до 1 миллисекунды с гарантией хотя бы один раз (at least once). Выбрать нужный режим в зависимости от потребностей распределенного приложения можно, не изменяя операции Dataset/DataFrame в запросах.

Фактически Spark Streaming состоит из трех основных компонентов: источников и приемников данных, а также механизма их обработки, т.е. вычислительного движка. Источники данных считывают их из топиков Kafka, Flume, реляционных баз, HDFS или других файловых систем, а также объектных хранилищ типа AWS S3. Движок обрабатывает эти входящие данные, а приемники данных получают обработанные данные и отправляют их в постоянные места назначения: внешние реляционные базы, файловые системы, топики Kafka или объектные хранилища.

Как уже было отмечено, в потоковой обработке вообще и в Spark Structured Streaming в частности, фактическую обработку запускает не само событие поступления данных, а связанный с этим триггер. Параметры триггера потокового запроса определяют время обработки потоковых данных, независимо от того, будет ли запрос выполняться как микропакетный запрос с фиксированным пакетным интервалом или как запрос непрерывной обработки.

В Apache Spark Structured Streaming есть следующие типы триггеров потокового запроса:

  • Не специализированный (unspecified) – это режим по умолчанию, если явно не указана настройка триггера, то по умолчанию запрос будет выполняться в режиме микропакетов. Каждый новый микропакет генерируется, как только завершится обработка предыдущего микропакета.
  • Микропакеты с фиксированным интервалом (Fixed interval micro-batches) – запрос будет выполняться в режиме микропакетов, при котором микропакеты будут запускаться с заданными пользователем интервалами. Если предыдущий микропакет завершается в течение заданного интервала, движок будет ждать окончания интервала, прежде чем запускать следующий микропакет. Если для завершения предыдущего микропакета требуется больше времени, чем заданный интервал, то следующий микропакет начнется, как только завершится предыдущий, не ожидая границы следующего интервала. А если новых данных нет, новый микропакет не будет запущен. К примеру, если время обработки предыдущего пакета превышает указанный интервал, следующий пакет будет выполнен немедленно. Если время обработки задано на 1 минуту, а текущий микропакет занимает 35 секунд, запуск следующего начнется только через 25 секунд. Если микропакет занимает 70 секунд, то следующий будет выполнен немедленно.
  • одноразовый микропакет (One-time micro-batch) – запрос выполнит только один микропакет для обработки всех доступных данных, а затем остановится. Это полезно в сценариях, когда нужно периодически запускать кластер, обрабатывать все, что было доступно с момента последнего периода, а затем выключать кластер. Иногда это помогает сэкономить средства на использование облачных ресурсов. С этим режимом метод триггера trigger(Trigger.Once()) запустит один микропакет в запросе: он обработает все доступные данные, а затем остановит приложение.
  • Все доступные микропакеты (Available-now micro-batch) – этот режим похож на однократный запуск микропакета, но запрос обрабатывает все доступные данные, а затем останавливается. При этом данные могут содержаться в нескольких микропакетах на основе значений параметров источника данных, заданных, например, в конфигурации maxFilesPerTrigger для источника файла. Этот режим дает лучшую масштабируемость запроса, позволяя разделить большой пакет, который триггер может сделать один раз.
  • Непрерывный с фиксированным интервалом между контрольными точками (Continuous with fixed checkpoint interval) – этот режим еще находится в статусе экспериментального. Он предполагает, что запрос будет выполняться в новом режиме непрерывной обработки с малой задержкой.

Рассмотрим, как реализуются эти режимы в Spark Scala:

import org.apache.spark.sql.streaming.Trigger

// Default trigger (runs micro-batch as soon as it can)
df.writeStream
  .format("console")
  .start()

// ProcessingTime trigger with two-seconds micro-batch interval
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("2 seconds"))
  .start()

// One-time trigger
df.writeStream
  .format("console")
  .trigger(Trigger.Once())
  .start()

// Available-now trigger
df.writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

// Continuous trigger with one-second checkpointing interval
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()

По сути, режим триггера указывает, как часто поток должен создавать результаты, т.е. как с какой периодичностью будет выполняться вызов метода Trigger.Once() для набора данных. Если явно не задан никакой режим триггера, запрос Spark Structured Streaming будет выполняться в микропакетном режиме: триггер выполняет следующий пакет, как только завершается предыдущий. Пример реализации этого кода на Spark Scala будет выглядеть так:

events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.trigger(Trigger.Once)
.start("/tmp/delta/events")

В этом участке кода testDataFrame — это данные, которые нужно записать в какой-то источник данных. Также указана опция контрольной точки, что предохраняет от повторного выполнения операций в случае сбоя. Метод Trigger.Once() значительно снижает затраты на поддержание постоянного потока и требует меньше настроек. Также можно использовать другие методы, например, processAllAvaiable(), который пригодится, если требуется большая повторная обработка вместо ручного отключения и перезапуска потока. Он блокирует обработку данных до тех пор, пока все доступные данные в источнике не будут обработаны и переданы в приемник. Не рекомендуется использовать processAllAvailable с заданиями в производственной среде, поскольку неожиданная загрузка данных может вызвать серьезные проблемы. Но этот метод отлично подходит для тестирования или повторной обработки всех данных. Как перейти от пакетной обработки к потоковой, используя микросервисный подход, читайте в нашей новой статье.

Освойте использование Apache Spark в аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://d-hanshew.medium.com/using-spark-streaming-for-batch-jobs-f62413c33f10
  2. https://www.projectpro.io/recipes/explain-spark-streaming-triggers
  3. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
Поиск по сайту