Как ускорить Apache Spark Structured Streaming: 3 главных параметра потоковой обработки

Spark Structured Streaming для дата-инженеров и разработчиков, Spark Structured Streaming настройка, потоковая обработка Spark Structured Streaming, Spark Structured Streaming примеры курсы обучение, Школа Больших Данных Учебный центр Коммерсант

Как размер пакета, режим вывода и интервал срабатывания триггера потоковой обработки влияют на скорость вычислений в приложении Apache Spark Structured Streaming и как настроить эти параметры.

Размер пакета при потоковой обработке данных в Spark Streaming

Хотя скорость обработки данных средствами Apache Spark Streaming зависит от многих факторов, включая саму структуру и формат обрабатываемых данных, в большинстве случаев ее можно повысить, задав оптимальные настройки следующих параметров:

  • размер пакета;
  • режим вывода;
  • интервал срабатывания триггера потоковой обработки.

Будучи основанном на Spark SQL, Structured Streaming используется для масштабируемой и отказоустойчивой потоковой обработки, позволяя выполнять вычисления с потоками данных аналогично пакетным операциям на статических записях. Структурированная потоковая передача Spark не материализует всю таблицу потоковых данных, а считывает последние доступные данные из источника и обрабатывает их пошагово для обновления результата, а затем отбрасывает исходные данные. Сохраняются только минимальные промежуточные данные состояния, необходимые для обновления результата, например, счетчики. Такая модель потоковых вычислений разгружает дата-инженера, освобождая его от поддержки запущенных агрегаций, самостоятельного обеспечения отказоустойчивости и согласованности данных.

По умолчанию запросы Spark Structured Streaming выполняются с помощью микропакетного процессора, который обрабатывает потоки данных как серию небольших пакетных заданий. Это происходит почти в реальном времени: сквозная задержка обработки данных составляет всего около 100 миллисекунд с гарантией отказоустойчивости exactly-once. А с версии Spark 2.3 добавлен режим постоянной обработки  Continuous Processing, при котором сквозные задержки обработки не превышают 1 миллисекунды с гарантией at least-once.

Хотя считается, что в потоковой парадигме данные поступают непрерывно и обрабатываются сразу же, в почти реальном времени, на практике это не совсем так: фактическую операцию с новыми  запускает не само событие поступления данных, а связанный с этим триггер. Настройки триггера запроса определяют время обработки потоковых данных, независимо от того, будет ли запрос выполняться как микропакетный запрос с фиксированным пакетным интервалом или как запрос непрерывной обработки. Подробнее об этом мы писали здесь.

Управлять скоростью обработки потоковых данных можно, ограничив размер входного пакета от источника данных. Какие источники входных данных поддерживает Apache Spark, мы разбирали здесь. Из них в промышленном развертывании используются только файловые источники и Kafka, поскольку сокет, а также генераторы rate source и Rate Per Micro-Batch source, обычно используются для тестирования. Поэтому ограничить скорость поступления данных от файловых источников и Apache Kafka можно с помощью следующих конфигураций:

  • maxFilesPerTrigger или maxFilesPerTrigger для облачного автозагрузчика Databricks (Auto Loader) указывает верхнюю границу для количества файлов, обрабатываемых в каждом микропакете. Для Delta Lake и Auto Loader это значение по умолчанию равно 1000. Эта опция также доступна для других файловых источников без максимального значения по умолчанию.
  • maxBytesPerTrigger или maxBytesPerTrigger для Auto Loader устанавливает приблизительный максимум для объема данных, обрабатываемых в каждом микропакете. Это означает, что пакет обрабатывает приблизительно этот объем данных и может обрабатывать больше лимита, чтобы потоковый запрос продвигался вперед в случаях, когда наименьшая входная единица больше этого лимита. Для этого параметра нет значения по умолчанию. Например, если задано ограничение 10 ГБ, но есть файлы размером 3 ГБ каждый, будет обработано 12 ГБ в одном микропакете.
  • maxOffsetsPerTrigger – максимальное количество обработанных смещений за интервал срабатывания при потреблении потоковых данных из Apache Kafka. Указанное общее количество смещений будет пропорционально разделено между разделами топика разного объема.

При одновременном использовании maxBytesPerTrigger и maxFilesPerTrigger, микропакет обрабатывает данные до достижения нижнего предела любой из обеих конфигураций.

Режим вывода

Режим вывода определяет, как данные обрабатываются и помещаются в приемник. Как мы уже упоминали здесь, в Spark Structured Streaming есть 3 режима вывода, которые сообщают оператору, какие записи следует выдавать при определенном срабатывании триггера:

  • Добавление (Append) – режим по умолчанию, когда операторы выдают только те строки, которые не изменяются в будущих триггерах. Чтобы определить, когда это происходит, stateful-операторы используют водяной знак.
  • Обновление (Upsert) – режим, в котором операторы выдают все строки, которые изменились во время текущего триггера, даже если выданная запись может измениться в последующем триггере;
  • Полный режим (Complete), который работает только с потоковыми агрегациями, когда все результирующие строки, когда-либо созданные оператором, выдаются далее по конвейеру обработки.

Обычно приходится выбирать между режимами добавления и обновления. Режим вывода влияет на то, сколько времени должно пройти до записи данных. Частота и объем записываемых данных могут повлиять на время выполнения потоковой обработки. В режиме добавления stateful-операторы выдают финальные результаты, что соответствует задержке водяного знака. Например, задержка watermark, равная 1 час, означает, что записи имеют задержку не менее 1 часа перед выдачей далее по конвейеру. Режим обновления приводит к одной записи на триггер на совокупное значение. Поэтому при использовании облачных платформ с оплатой за запись, многократное обновление данных до того, как пройдет задержка водяного знака, становится экономически неэффективно, т.е. дорого.

Режим добавления используется по умолчанию, записывая данные каждый раз, когда изменяется информация о состоянии. Это приводит к многочисленным обновлениям и подходит в большинстве случаев, когда последующим сервисам в потоковом конвейере надо выполнить одно действие для каждой нисходящей записи. Например, сервис уведомлений, отправляет уведомления для каждой новой записи, записанной в приемник. Режим добавления гарантирует, что каждая запись будет записана только один раз, исключая дубли. Если нижестоящие сервисы нуждаются в свежих результатах, следует использовать режим обновления, который гарантирует полную актуальность данных в приемнике. Например, ML-модель, которая считывает характеристики в режиме реального времени, или аналитическая панель, отслеживающая агрегаты на лету.

Стоит помнить, что движок Structured Streaming поддерживает не все операции Apache Spark, причем некоторые потоковые операции не поддерживаются во всех режимах вывода. Помимо выбора режима вывода для ускорения потоковой обработки надо использовать оптимальный оператор: foreach() или foreachBatch(). Эти операторы позволяют применять произвольные операции и логику записи к выходным данным потокового запроса. C foreach() можно настроить логику записи для каждой строки, а с foreachBatch() – применить произвольные операции и настраиваемую логику к выходным данным каждого микропакета. Начиная с Spark 2.4, это поддерживается в Scala, Java и Python. Оператор foreachBatch() принимает два параметра: датафрейм или датасет, который содержит выходные данные микропакета, и его уникальный идентификатор. Поэтому можно повторно использовать существующие источники пакетных данных, т.е. средства записи пакетных данных на выходе каждого микропакета. Такое переиспользование ускоряет обработку. Также foreachBatch() позволяет кэшировать выходной датафрейм или датасет, чтобы записывать его в несколько мест. Это исключает повторное чтение входных и пересчет выходных данных, ускоряя время обработки.

Оператор foreachBatch() не работает с режимом непрерывной обработки, поскольку он в основном полагается на микропакетное выполнение потокового запроса. Для Continuous Processing надо использовать foreach().

Интервал срабатывания триггера потоковой обработки

Apache Spark Structured Streaming обрабатывает данные поэтапно, поэтому управление интервалом запуска для пакетной обработки позволяет использовать движок для разных видов рабочих нагрузок: от вычислений в почти реальном времени до периодических пакетных операций. Структурированная потоковая передача относится к временным интервалам запуска как к микропакетам с фиксированным интервалом. Параметр processingTime позволяет указать длительность времени срабатывания триггера в виде строки, например .trigger(processingTime=’10 seconds’). Если указать слишком малый интервал (менее десятков секунд), система будет выполнять ненужные проверки, чтобы узнать, поступают ли новые данные. Необходимо настроить время обработки так, чтобы сбалансировать требования к задержке и скорость поступления данных в источник.

Опция триггера Trigger.AvailableNow использует все доступные записи как инкрементный пакет с возможностью настройки его размера, что было рассмотрено ранее. По умолчанию Structured Streaming использует фиксированный интервал микропакетов в 500 мс. Рекомендуется всегда задавать этот параметр явно, адаптируя его под каждый конкретный случай, чтобы сократить затраты, связанные с проверкой поступления новых данных и обработкой пакетов недостаточного размера.

Можно изменить интервал срабатывания между запусками, используя одну и ту же контрольную точку. Если работа Structured Streaming останавливается во время обработки микропакета, этот микропакет должен завершиться до применения нового интервала запуска. Таким образом, обработка микропакета с ранее указанными настройками продолжится после изменения интервала запуска. При переходе от временного интервала к Trigger.AvailableNow это может привести к обработке микропакетов до обработки всех доступных записей как инкрементного пакета. При переходе от Trigger.AvailableNow к временному интервалу это может привести к продолжению обработки всех записей, которые были доступны при запуске последнего задания с Trigger.AvailableNow.

Подробнее о выполнении запросов Structured Streaming на уровне Java-кода читайте в нашей новой статье.

Освойте Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://docs.databricks.com/en/structured-streaming/production.html
  2. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту