18 марта 2024 года вышел очередной релиз Apache Flink. Знакомимся с его главными новинками и разбираемся, чем они полезны для потоковой обработки больших данных: ключевые изменения выпуска 1.19 для разработчика stateful-приложений.
Динамическая настройка параллелизма
Выпуск Apache Flink 1.19 можно назвать значимой вехой, поскольку он не только включает новые функции, улучшения и исправления ошибок, но и закладывает основу для мажорного релиза 2.0. Начнем с главного: теперь разработчик может сам установить динамический параллелизм, т.е. количество задач для выполнения задания, для настройки производительности пакетных заданий с помощью опции scan.parallelism. Напомним, для Flink — параллелизм один из основных факторов, влияющих на производительность приложения. Его увеличение позволяет приложению использовать больше слотов задач, повышая общую пропускную способность и производительность. Параллелизм Flink-приложения можно настроить на разных уровнях: уровень оператора, уровень среды выполнении, уровень клиента и системный уровень. Подробнее об этом мы писали здесь.
Динамическая настройка параллелизма позволяет Flink автоматически регулировать количество запускаемых задач, связанных с источниками данных, в зависимости от рабочей нагрузки, улучшая использование ресурсов и снижая необходимость ручного вмешательства. Впрочем, при необходимости можно по-прежнему контролировать максимальный параллелизм источника с помощью ограничения Execution.batch.adaptive.auto-parallelism.default-source-parallelism. Эта конфигурация используется в качестве верхней границы вывода о параллелизме источника, и теперь ее значение не равно 1 по умолчанию. Если эта конфигурация не указана, фреймворк будет искать ограничение, установленное в параметре Execution.batch.adaptive.auto-parallelism.max-parallelism. Если ни один из этих параметров не задан, по умолчанию используются общие настройки параллелизма по умолчанию, установленные с помощью параметра parallelism.default или метода StreamExecutionEnvironment#setParallelism(). Такая динамическая настройка особенно полезна в облачных средах, где важны масштабируемость и эффективное использование ресурсов. Оптимизируя распределение ресурсов в зависимости от объемов данных в реальном времени, Flink 1.19 обеспечивает более плавное масштабирование приложений, поддерживая высокую производительность даже при нестабильных нагрузках. Source-коннекторы уже могут реализовывать новые интерфейсы, которые обеспечивают эту возможность. Задавать параллелизм для коннектора datagen можно даже сейчас при выполнении DDL-запросов Flink SQL, например, для таблицы Orders задан параллелизм, равный 4:
CREATE TABLE Orders ( order_number BIGINT, price DECIMAL(32,2), buyer ROW<first_name STRING, last_name STRING>, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'datagen', 'scan.parallelism' = '4' );
Также можно задать эту конфигурацию для ранее определенной таблицы, используя параметр Dynamic Table:
SELECT * FROM UserActivityLogs /*+ OPTIONS('scan.parallelism'='4') */;
Потоковая обработка данных с помощью Apache Flink
Код курса
FLINK
Ближайшая дата курса
2 декабря, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
Новинки Flink SQL
Еще одним важным улучшением для разработчика стала возможность гибко задавать собственные значения времени жизни (TTL, Time To Live) для состояний, включая регулярные соединения и групповые агрегации непосредственно в SQL-запросах с помощью подсказки STATE_TTL. Установки времени жизни состояния полезна для управления памятью, особенно в потоковой обработке, где система должна поддерживать состояние для окон, соединений и агрегаций. Время жизни состояния позволяет автоматически очищать старые данные, которые больше не нужны для вычислений.
К примеру, в рамках регулярного потокового соединения, чтобы выдавать семантически правильные результаты, Flink необходимо постоянно сохранять обе стороны ввода соединения в состоянии. Так состояние для вычисления результата запроса может бесконечно расти в зависимости от количества различных входных строк всех входных таблиц и результатов промежуточного соединения. TTL позволяет ограничить время жизни состояния для отдельных таблиц в запросе Flink SQL. Разумеется, при этом есть риск получения неверных или неполных результатов.
Хотя функция установки времени жизни состояния индивидуально для каждого stateful-оператора через план, скомпилированный SQL, появилась уже в прошлом релизе Flink, в версии 1.19 это можно сделать без изменения скомпилированного плана выполнения запроса, например, так:
-- set state ttl for join SELECT /*+ STATE_TTL('Orders'= '1d', 'Customers' = '20d') */ * FROM Orders LEFT OUTER JOIN Customers ON Orders.o_custkey = Customers.c_custkey; -- set state ttl for aggregation SELECT /*+ STATE_TTL('o' = '1d') */ o_orderkey, SUM(o_totalprice) AS revenue FROM Orders AS o GROUP BY o_orderkey;
В этом участке кода представлено два запроса Flink SQL, в которых используются директивы для установки времени жизни состояния. Первый запрос — это запрос с левым внешним соединением (LEFT OUTER JOIN) между таблицами Orders и Customers. Директива STATE_TTL указывает, что записи состояния для Orders должны истекать через один день (‘1d’), а записи состояния для Customers должны истекать через двадцать дней (’20d’). Это означает, что соответствующие записи состояния для каждой таблицы будут удаляться из состояния после указанного интервала времени.
Второй запрос — это агрегирующий запрос, который суммирует значения общей суммы o_totalprice для каждого заказа o_orderkey в таблице Orders. Здесь директива STATE_TTL применяется к заказу по псевдониму o, который ссылается на таблицу Orders, указывая, что состояние для агрегации должно истекать через один день (‘1d’). Это означает, что данные, связанные с агрегацией для данного ключа заказа, будут удаляться из состояния после одного дня.
Таким образом, подсказка STATE_TTL в Flink SQL позволяет контролировать использование ресурсов и предотвращать бесконечное увеличение размера состояния, что особенно важно в долго работающих stateful-приложениях потоковой обработки данных.
Еще одним полезным для разработчика нововведением стала возможность использовать именованные параметры при вызове функции или хранимой процедуры. Теперь не нужно строго указывать положение параметра, достаточно указать лишь имя параметра и соответствующее ему значение. А если несущественные параметры не указаны, по умолчанию они заполняются нулевым значением.
С версии 1.19 Flink поддерживает использование оконных табличных функций SESSION (Table Value Function, TVF) в потоковом режиме, обеспечивая более гибкие оконные операции, которые группируют события в сеансы на основе промежутков в активности. Табличные функции в SQL создают таблицу, которую можно использовать в FROM в рамках запроса SELECT. Это позволяет проводить сложный анализ закономерностей данных с течением времени с возможностью разделения потоков данных по определенным атрибутам. Следующий пример показывает, как группировать и агрегировать потоковые данные в динамические сеансы на основе активности, обеспечивая более глубокое понимание временных закономерностей в данных:
-- Defining a session window with partition keys SELECT * FROM TABLE( SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)); -- Performing aggregation on the session-windowed table with partition keys SELECT window_start, window_end, item, SUM(price) AS total_price FROM TABLE( SESSION(TABLE Bid PARTITION BY item, DESCRIPTOR(bidtime), INTERVAL '5' MINUTES)) GROUP BY item, window_start, window_end;
Поддержка входных данных журнала изменений для агрегации оконных TVF особенно полезна для потоков CDC-сценариев, когда изменения происходят из источников данных.
Также во Flink SQL добавлена оптимизация микропакетов для регулярных каскадных соединений. В версии 1.19 новую мини-пакетную оптимизацию можно использовать для обычного соединения, чтобы уменьшить промежуточный результат в таких сценариях каскадного соединения.
Наконец, в Apache Flink 1.19 представлена новая опция env.java.opts.sql-gateway для указания параметров Java, которая позволяет точно настроить параметры памяти, поведение сборки мусора и другие соответствующие параметры Java для SQL Gateway.
Асинхронные скалярные функции и изменение API приемника
Введение интерфейса AsyncScalarFunction в Apache Flink 1.19 позволяет асинхронно выполнять скалярные функции. Традиционно пользовательские функции Flink, такие как ScalarFunction, выполняли операции синхронно. Этот подход хорошо работает для вычислений, связанных с использованием ЦП, но не справляется с операциями, требующими ожидания внешних систем, такими как сетевые вызовы или запросы к базе данных. В таких случаях модель синхронного выполнения может значительно ограничить пропускную способность и снизить общую производительность системы из-за последовательной обработки вызовов, что приводит к простою в ожидании завершения операций ввода-вывода.
Эту проблему решает интерфейс AsyncScalarFunction, который позволяет асинхронно выполнять скалярные функции. Теперь операции, которые обычно блокировали основной поток обработки в ожидании ответа, могут выполняться параллельно, позволяя основному потоку продолжить выполнение других задач. Этот параллелизм особенно выгоден для приложений, которые часто взаимодействуют с медленными внешними системами или службами.
Также в Apache Flink 1.19 внесены значительные улучшения в API-интерфейсы приемника посредством контекста инициализации для создания коммиттера в TwoPhaseCommittingSink и синхронизации с API источника. Это позволит разработчикам коннекторов улучшить интеграцию между разными системами.
Еще исправлена проблема невозможности отправлять метрики от коммиттера: теперь можно включать метрики, связанные с фиксацией, такие как ее продолжительность, размер и количество зафиксированных файлов данных. Такое предоставление более богатого контекста инициализации для коммиттеров позволяет разработчикам лучше понимать особенности процесса фиксации данных, чтобы улучшить мониторинг и оптимизацию производительности их потоковых приложений. Для этого в методе изменена параметризация: добавлен новый параметр CommitterInitContext. Исходный метод останется доступным в версии 1.19, но в последующих выпусках он будет удален. Также устранены ограничения интерфейса WithPreCommitTopology, который изначально ограничивал преобразование и управление фиксируемыми сообщениями, особенно в сценариях, требующих агрегирования результатов записи в единый фиксируемый объект.
Немного изменена работа с контрольными точками: добавлен метод SplitEnumeratorContext#setIsProcessingBacklog, который позволяет указать приоритет в обработке записи в сторону низкой задержки или высокой пропускной способности. Разработчики коннектора могут обновить реализацию источника, чтобы использовать этот метод для сообщения о том, являются ли записи невыполненными. Параметр execution.checkpointing.interval-during-backlog можно настроить на использование большего интервала между контрольными точками, чтобы повысить пропускную способность во время обработки задания, если источник знает о невыполненном задании.
Прочие изменения
Для повышения удобства администрирования Flink-приложений YAML-файл конфигурации по умолчанию изменен на config.yaml вместо flink-conf.yaml, который будет удален в выпуске 2.0. Также улучшена работа профилировщика: в версии 1.19 встроен мощный кроссплатформенный профилировщик Java, доступный непосредственно из веб-интерфейса Flink. Он позволяет диагностировать и анализировать узкие места производительности на уровнях JobManager и TaskManager. Предоставляя подробную информацию о поведении потоковых приложений во время выполнения, эта функция позволяет разработчикам и администраторам более эффективно выявлять и устранять проблемы с производительностью, обеспечивая более плавную и надежную работу приложений потоковой обработки данных.
Также добавлен интерфейс TraceReporter, чтобы устранить ранее ограниченную видимость механизмов проверки и восстановления Flink. Отчеты о трассировках или интервалах критических процессов улучшают мониторинг работы приложения. Это использовано при добавлении OpenTelemetryTraceReporter и OpenTelemetryMetricReporter для плотной интеграции OpenTelemetry в Flink, чтобы предоставлять разработчикам подробную информацию о поведении потоковых приложений.
Примечательно, что теперь прикреплять пользовательские метрики к интервалам восстановления, чтобы проводить более детальный анализ операций восстановления, позволяя разработчикам и администраторам отслеживать определенные атрибуты, такие как время загрузки и скорость приема данных.
Кроме того, Apache Flink 1.19 представляет бета-поддержку Java 21, позволяя компилировать и запускать фреймворк, используя последнюю версию Java LTS. Это обновление облегчает доступ к новейшим функциям и улучшениям Java, направленным на повышение производительности и безопасности приложений при сохранении совместимости с текущими стандартами Java.
Наконец, в рамках подготовки к релизу 2.0, который планируется выпустить в этом году, в версии 1.19 официально объявлены устаревшими целый ряд API и добавлены рекомендации по их изменению. В частности, не рекомендуется жесткая регистрация сериализаторов на уровня экземпляра, вместо нее используются сериализаторы уровня класса, предпочтительно использовать методы get() и set() с ключом вместо getXxx() и setXxx(). Также org.apache.flink.configuration.AkkaOptions устарел и заменен на RpcOptions.
О новинках выпуска 1.20 читайте в нашей новой статье.
Узнайте больше про использование Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники