2 августа 2024 года вышел свежий релиз Apache Flink. Знакомимся с главными новинками выпуска 1.20 для упрощения потоковой обработки данных в мощных управляемых конвейерах: новые материализованные таблицы, единый механизм слияния файлов для контрольных точек, улучшения DataStream API и пакетных операций.
Улучшения Flink SQL
Начнем с новинок Flink SQL, одной из которых стало введение новой материализованной таблицы для упрощения конвейеров данных и синтаксиса, связанного с каталогами. Новая материализованная таблица предназначена для упрощения разработки конвейеров обработки данных, а также упрощения пакетных и потоковых операций. Она позволяет использовать единообразные SQL-функции и автоматически управлять актуальностью данных. С помощью этой динамической таблицы и единообразных SQL-операторов разработчики могут типовым образом определять пакетные и потоковые преобразования данных, ускорять разработку своих ETL-конвейеров и автоматически управлять планированием задач. Также сделан большой шаг в сторону демократизации фреймворка: теперь дата-инженеру не нужно задумываться о тонкостях различий между потоковой и пакетной обработкой, чтобы напрямую поддерживать потоковые или пакетные задания Flink. Все операции выполняются на материализованных таблицах, что снижает порог входа в технологию, т.к. SQL знаком каждому аналитику.
Кроме того, это улучшение позволяет найти оптимальный компромисс между производительностью и стоимостью, т.к. потоковая нагрузка, которая обеспечивает наименьшую задержку и работает непрерывно подразумевает дорогостоящую инфраструктуру, а вставка новых данных с помощью пакетного задания Flink может привести к их устареванию. Новая материализованная таблица, которая автоматически обновляется в фоновом режиме, позволяет пользователям напрямую указывать требования актуальности данных в самом Flink, не требуя внешней системы. После выполнения DDL-запроса на создание такой таблицы Flink запускает непрерывное потоковое или планирует пакетное задание для периодического выполнения. Решение принимается на основе новой конфигурации materialized-table.refresh-mode.freshness-threshold. Если указанное значение параметра FRESHNESS ниже порогового значения, таблица непрерывно обновляется потоковым заданием.
Например, следующий DDL-запрос создает материализованную таблицу dwd_orders для хранения данных, которые обновляются по мере поступления изменений.
CREATE MATERIALIZED TABLE dwd_orders ( PRIMARY KEY(ds, id) NOT ENFORCED ) PARTITIONED BY (ds) FRESHNESS = INTERVAL '3' MINUTE AS SELECT o.ds o.id, o.order_number, o.user_id, ... FROM orders as o LEFT JOIN products FOR SYSTEM_TIME AS OF proctime() AS prod ON o.product_id = prod.id LEFT JOIN order_pay AS pay ON o.id = pay.order_id and o.ds = pay.ds
Параметр FRESHNESS, равный 3-минутному интервалу указывает, что данные в этой материализованной таблице должны обновляться каждые 3 минуты. При запуске этого запроса значение параметра FRESHNESS будет сравниваться со свойством новой конфигурации materialized-table.refresh-mode.freshness-threshold, и результат сравнения будет определять вид задания Flink: пакетное или потоковое.
Потоковая обработка данных с помощью Apache Flink
Код курса
FLINK
Ближайшая дата курса
28 октября, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
А синтаксис управления метаданными в каталоге делает разработку и сопровождение конвейеров еще проще, позволяя использовать DQL-запросы для получения подробных метаданных из существующих каталогов. Изменить эти метаданные, например, свойства или комментарии в указанном каталоге, можно также с помощью SQL-запросов с синтаксисом DDL.
Для сегментирования (бакетирования) данных можно использовать выражение DISTRIBUTED BY. В отличие от ранее существовавшего разделения, с помощью сегментирования можно контролировать количество созданных сегментов и определять стратегию распределения данных, например, хэш. Ранее сегментирование в Apache Flink SQL можно было настроить только с помощью предложения WITH, причем каждый коннектор использовал свой собственный синтаксис для определения сегментов. Это было не очень удобно. С версии 1.20 можно задавать универсальные стратегии сегментирования потоковых и пакетных таблиц.
Например, до версии 1.20 чтобы обрабатывать поток данных из Kafka, в Apache Flink пришлось бы создавать следующую таблицу, задавая количество разделов в свойствах коннектора к источнику данных:
CREATE TABLE KafkaTable ( `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING ) WITH ( 'connector' = 'kafka', 'key.format' = 'json', 'key.fields' = 'user_id;item_id', 'value.format' = 'json', 'properties.num.partitions' = '6', )
С версии Flink 1.20 это можно сделать с помощью выражения DISTRIBUTED BY:
CREATE TABLE KafkaTable ( `ts` TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', `user_id` BIGINT, `item_id` BIGINT, `behavior` STRING ) WITH ( 'connector' = 'kafka', 'key.format' = 'json', 'value.format' = 'json', ) DISTRIBUTED BY (user_id, item_id) INTO 6 BUCKETS
Сегменты обеспечивают балансировку нагрузки во внешней системе хранения данных путем разбиения данных на непересекающиеся подмножества. Но это поведение сильно зависит от семантики базового коннектора. Разработчик может влиять на поведение сегментации, указывая количество сегментов, алгоритм распределения и столбцы, которые используются для расчета целевого сегмента. Разумеется, все компоненты сегментации (количество сегментов, алгоритм распределения и ключевые столбцы) являются необязательными в синтаксисе SQL-запроса на создание таблицы-источника Flink SQL.
Новинки DataStream API и пакетной обработки
Для улучшения stateful-нагрузок и обеспечения их надежности, в Apache Flink 1.20 введен новый унифицированный механизм слияния файлов для контрольных точек и сжатие небольших SST-файлов. Единый механизм слияния файлов для создания контрольных точек позволяет объединять несколько небольших файлов контрольных точек в меньшее количество более крупных файлов, сокращая накладные расходы на метаданные файловой системы.
В рамках истории восстановления Flink Flink записывает периодические метаданные (контрольные точки) в постоянное хранилище, например, BLOB, чтобы восстановить работу stateful-приложения в случае сбоя. Контрольная точка состоит из множества файлов, которые имеют разные типы. Для больших заданий с высоким параллелизмом и множеством операторов количество файлов, созданных во время контрольной точки, может привести к значительной нагрузке на базовое хранилище больших двоичных объектов. При использовании облачных BLOB-хранилищ, таких как S3, ADLS и пр. нагрузка напрямую транслируется в большее количество запросов, что обходится дорого при тарификации за каждый вызов API. При использовании локальных BLOB-хранилищ типа HDFS или Minio такой проблемы не возникает, но возрастает нагрузка на сеть.
Чтобы устранить эти проблемы, Apache Flink 1.20 предлагает несколько конфигураций для управления поведением загрузки состояния, сокращая количество файлов с помощью конфигураций state.checkpoints.file-merging и state.checkpoints.file-merging.max-file-size. Настройка state.checkpoints.file-merging нужна для включения функции, а state.checkpoints.file-merging.max-file-size для управления максимальным размером файла контрольной точки. Примечательно, что эта новая конфигурация применяется ко всем типам файлов состояния и применима к состоянию Flink в целом.
А сжатие небольших SST-файлов решает проблемы с избыточной генерацией небольших файлов. Теперь Flink может объединять эти файлы в фоновом режиме с помощью API встроенного key-value хранилища RocksDB, не полагаясь исключительно на автоматическое сжатие данных в этой БД. Flink теперь периодически сканирует локальные файлы и запускает ручное сжатие, когда небольших файлов становится слишком много.
Еще Flink 1.20 поддерживает полную обработку разделов на API DataStream, что добавляет встроенную поддержку агрегаций в потоках без ключей (агрегации в области подзадач) с помощью API FullPartitionWindow.
Для пакетной обработки введен новый механизм восстановления заданий. Он позволяет возобновлять выполнение пакетных заданий после сбоев JobMaster, избегая необходимости повторного запуска завершенных задач. Хотя Flink давно поддерживает восстановление потоковых заданий после сбоев JobMaster путем перезапуска с контрольных точек, до версии 1.20 пакетное восстановление полагалось на перезапуск заданий с самого начала. Это означало, что весь предыдущий прогресс терялся, что неприемлемо для stateful-нагрузок. Проблема решена в выпуске 1.20: теперь можно восстанавливать состояние всех задач, которые были завершены до сбоя JobMaster. Это также повышает эффективность контрольных точек и надежность управления состоянием в Apache Flink.
Также в версию 1.20 добавлен вывод динамического параллелизма источника в пакетных заданиях для исходного коннектора Hive. Это позволяет коннектору динамически определять параллелизм на основе фактических разделов их с динамическим отсечением. Также введена новая опция конфигурации — table.exec.hive.infer-source-parallelism.mode, чтобы разработчик мог выбирать между статическим и динамическим режимами вывода для исходного параллелизма. По умолчанию режим установлен на значение dynamic. Его можно настроить на статический вывод, установив на static или отключить автоматический вывод параллелизма вообще, задав равным none.
Узнайте больше про возможности Apache Flink для пакетной и потоковой аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники