Пользовательское распределение данных для входного потока Lookup Join в Apache Flink 2.0

Flink обучение курсы примеры, Apache Flink SQL, курсы дата-инженеров Flink, Flink 2.0 примеры курсы обучение, Школа Больших Данных

Как Flink SQL позволяет обогащать потоковые данные информацией из внешних систем и статических таблиц, зачем в релизе 2.0  улучшили Lookup Join и каким образом работает эта оптимизация.

Как работает потоковое обогащение в Apache Flink

Для взаимодействия с внешними системами (источниками и приемниками данных) Apache Flink использует коннекторы. Source-коннекторы обеспечивают чтение данных из источников, а sink-коннекторы позволяют записывать результаты во внешние базы и другие хранилища данных. Одним из частных случаев обработки потоковых данных, потребляемых Flink-приложением из потоковых платформ передачи событий, таких как Apache Kafka, является их обогащения информацией из внешних систем. Как это выполнялось раньше, мы разбирали здесь.

Однако, вместо разработки кода на Java, Apache Flink позволяет выполнить обогащение потока данных с помощью SQL-запросов. Для такого обогащения потоковых данных информацией из внешних таблиц или источников используется операция Lookup Join. Это специальный тип соединения, позволяющий обогащать потоковые данные дополнительной информацией, хранящейся во внешних источниках, а также статических или медленно меняющихся таблицах. Он используется, когда поток новых событий поступает очень быстро, а данные справочных таблиц обновляются реже.

Например, есть поток событий, содержащих информацию о заказах и отдельная таблица с информацией о клиентах. Используя Lookup Join, Flink может обогатить каждый потоковый элемент заказа информацией о клиенте, чтобы в дальнейшем проводить аналитику его покупок и рекомендовать наиболее релевантные товары.

Для каждого события, поступающего в поток, Flink выполняет запрос во внешнюю справочную таблицу или внешнюю систему хранения данных. При этом выполняется так называемый lookup-запрос, т.е. поиск по ключу, чтобы получить соответствующие строки из внешней таблицы и присоединить их к потоку в реальном времени с учетом временных данных. Это означает, что Flink выполняет обогащение потока на определенный момент времени, чтобы учитывать изменения данных во времени и выполнять исторический поиск по данным, актуальным на момент события.

Lookup Join позволяет обогатить потоковые данные, не загружая всю внешнюю/справочную таблицу в память Flink.

Однако, производительность такого потокового обогащения снижается, если внешняя таблица медленно отвечает на запросы. Для сокращения задержек возникает потребность в эффективном кэшировании и оптимизации запросов для минимизации задержек. Поэтому Flink поддерживает встроенные механизмы кэширования Lookup Cache, чтобы уменьшить количество повторных запросов во внешнюю таблицу и повысить производительность. Как обычно при работе с кэшем, нужно найти баланс между актуальностью данных и производительностью этого механизма оптимизации, определив подходящий размер и время жизни (TTL, Time To Live).

Для этого можно настроить следующие параметры:

  • cache.max-rows – максимальное количество строк данных, которые могут быть кэшированы. Если количество строк данных в кэше превышает значение этого параметра, самая ранняя строка данных устаревает и заменяется новой строкой данных.
  • cache.ttl – максимальное время жизни (TTL) каждой строки данных в кэше. Если период времени, в течение которого строка данных кэшируется, превышает значение этого параметра, строка данных устаревает.
  • cache.caching-missing-key – следует ли кэшировать пустые результаты запроса;
  • max-повторов – максимальное количество повторных попыток при неудачном запросе к внешнему источнику.

По умолчанию кэширование таблиц измерений отключено. Изменить это можно, настроив параметры lookup.cache.max-rows и lookup.cache.ttl. Тогда будет использоваться политика кэширования LRU (Least Recently Used) – стратегия удаления элементов из кэша, когда он достигает своего максимального размера. При этом вытесняются элементы, к которым в последнее время не было обращений, т.к. ожидается, что вероятность их использования в ближайшем будущем минимальна.

Таким образом, кэширование в Lookup Join позволяет ускорить процесс потокового обогащения данными из внешних источников. Если обращаться к источнику данных по каждой входящей записи, возникают значительные сетевые издержки ввода-вывода и RPC-вызовы. Поэтому большинство коннекторов вводят кэширование на уровне записи. Например, так работает HBase и JDBC. Оптимизация производительности реализуется за счет правильного распределения данных. Одним из важных механизмов управления этим поведением является пользовательское распределение данных (custom partitioning) для входного потока Lookup Join.

Пользовательское распределение данных означает, что разработчик явно задает, каким образом записи будут распределяться по параллельным задачам или экземплярам оператора Flink. Такое распределение позволяет контролировать, что все записи с одинаковыми ключами всегда поступают на одну и ту же параллельную задачу. Так можно повысить производительность и эффективность выполнения Lookup Join за счет улучшения локальности данных и снижения издержек при обращении к внешним источникам.

Раньше, до выпуска Apache Flink 2.0 распределение данных входного потока для Lookup Join было произвольным. Поэтому частота попаданий в кэш иногда была неудовлетворительна. Частично эта проблема решена в FLIP-204 благодаря введению Hash Lookup Join, который распределяет данные в соответствии с хэшем ключей соединения. Это повышает частоту попаданий в кэш. Однако, в некоторых внешних системах сами данные имеют определенную схему распределения. Например, Apache Paimon организует данные в сегменты, где разбиение по ключу соединения не способствует локальности данных.

Таким образом, внешние системы могут иметь различные требования к распределению данных на входной стороне, о которых Flink не знает. Поэтому улучшение FLIP-462, реализованное в Apache Flink 2.0, вводит механизм, с помощью которого коннектор будет сообщать планировщику Flink желаемое распределение данных входного потока или стратегию разбиения. Это должно сократить объем кэшированных данных и повысить производительность Lookup Join.

Однако, просто разбить входные данные на ожидаемые разделы недостаточно. Например, если данные перемешиваются по сегментам, различным экземплярам LookupFunction необходимо знать некоторую информацию о задаче, например, ее фактический параллелизм и максимальный параллелизм, чтобы определить, какой сегмент был назначен для дальнейшей оптимизации и как загружать данные только внутри этого сегмента для кэширования на этапе инициализации. Как это реализовано, рассмотрим далее.

Изменения Lookup Join в релизе 2.0

Чтобы сообщить планировщику Flink желаемое распределение данных входного потока или стратегию разбиения, в релизе 2.0 был добавлен новый интерфейс для LookupTableSource-коннектора и метод, позволяющий LookupContext-коннектору определить, соответствует ли распределение данных ожидаемому.

Для включения этой оптимизации введена новая опция shuffle для подсказки SQL-запроса с использованием Lookup Join. Если shuffle = true, то планировщик таблиц попытается наилучшим образом применить пользовательское перемешивание к входному потоку соединения поиска. Если указать shuffle = true в SELECT-запросе с соединением, когда целевая таблица измерений реализует SupportsLookupCustomShuffle, планировщик постарается наилучшим образом применить пользовательское разбиение для входного потока. Иначе будет применяться хэш-разбиение.

Применительно к ранее рассмотренному примеру обогащения потока заказов историческими данными о клиенте на момент совершения заказа, использование этой подсказки выглядит так:

SELECT /*+ LOOKUP('table'='Customers', 'shuffle'='true') */ o.order_id, o.total, c.country, c.zip
FROM Orders AS o
JOIN Customers FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;

Этот SQL-запрос получает идентификатор заказа order_id и общую сумму заказа total из таблицы заказов Orders, а также страну country и почтовый индекс zip клиента из таблицы клиентов Customers. Таблица клиентов соединяется с таблицей заказов по временному признаку: для каждого заказа берётся информация о клиенте, актуальная именно на момент обработки заказа proc_time. Оптимизационная подсказка дает оптимизатору рекомендации по выбору наиболее оптимального алгоритма соединения, но не является обязательной для выполнения.

Начиная с версии 1.16, Flink SQL включает механизм обработки NDU (недетерминированное обновление). Это позволяет распределенному приложению работать в ситуациях, когда порядок поступления событий или их объединение не гарантирует полностью детерминированных результатов, и когда обновления могут происходить без строгой предсказуемости. Благодаря этому Flink SQL может обрабатывать данные из источников, которые не гарантируют строгой упорядоченности или строгой детерминированности событий. Это особенно важно при интеграции с внешними источниками данных, включая CDC-сценарии экспорта обновлений, где порядок событий не всегда строго определен. Также NDU позволяет реализовывать сложные сценарии преобразования и объединения потоков данных, которые ранее были труднодостижимы из-за строгих ограничений на детерминированность. Это упрощает интеграцию Flink SQL в средах, где данные часто бывают неконсистентными или поступают асинхронно.

Однако, результаты запросов могут варьироваться в зависимости от порядка поступления обновлений. Это осложняет тестирование, отладку и воспроизводимость. А для систем, где нужна строгая согласованность и гарантированность результата, NDU может привести к неточным данным.

Поэтому при включенном режиме TRY_RESOLVE механизм NDU проверяется в потоковом запросе, чтобы устранить проблему, сгенерированную Lookup Join. Для этого требуются входные данные, распределенные по ключам соединения, и часто конфликтующие с пользовательским разделителем. Поэтому в таком случае планировщик не использует пользовательский разделитель, предоставляемый коннектором. Если коннектор предоставляет недетерминированный разделитель, например, случайный разделитель для избегания перекоса, он нарушит следующие предположения потоковых SQL-операторов:

  • события ADD/UPDATE_AFTER всегда происходят до связанных с ними событий UPDATE_BEFORE/DELETE;
  • события ADD/UPDATE_AFTER всегда обрабатываются одной и той же задачей, даже при перемешивании данных. Поэтому планировщик не будет применять пользовательский shuffle-разделитель, если  метод isDeterministic этого разделителя возвращает значение false.

Наконец, необходимость включения пользовательского разделителя при выполнении Lookup Join для планировщика определяется не только типом коннектора, но и другими факторами. Например, для Apache Paimon, планировщик Flink SQL ожидает, что входные данные будут распределены в соответствии с идентификатором контейнера. Поля в таблице Paimon, используемые для вычисления идентификатора контейнера, называются ключом контейнера. Если ключи соединения не полностью покрывают все ключи контейнера, то планировщик не может применить пользовательское перемешивание, поскольку стратегия разбиения не определена должным образом.

Научиться создавать высокопроизводительные приложения пакетной и потоковой аналитики больших данных и машинного обучения с помощью Apache Flink вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

                                                                                Источники

  1. https://cwiki.apache.org/confluence/display/FLINK/FLIP-462+Support+Custom+Data+Distribution+for+Input+Stream+of+Lookup+Join
  2. https://cwiki.apache.org/confluence/display/FLINK/FLIP-204%3A+Introduce+Hash+Lookup+Join
  3. https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/overview/
  4. https://www.alibabacloud.com/help/en/flink/developer-reference/jdbc-connector
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.