Что такое проблема недетерминированного поведения, почему она так важна в потоковой обработке данных и как Apache Flink борется с ней: недетерминированные и динамические функции, а также changelog stateful-операторов.
Недетерминированные функции в Apache Flink
В потоковой обработке данных, на которую ориентирован Apache Flink, все завязано на отметку времени события (timestamp). Однако, ее вычисление – не самая тривиальная задача, поскольку основано на функциях работы со временем, которые являются недетерминированными, т.е. могут возвращать разные результаты при одних и тех же входных значениях. Впрочем, подобная ситуация может возникать и при пакетных сценариях.
В классическом пакетном сценарии повторное выполнение одного и того же запроса для заданного набора данных дает идентичные результаты. Однако, на практике не все запросы являются детерминированными. Как правило, отсутствие детерминизма в пакетной обработке вызвано недетерминированными функциями. В Apache Flink есть несколько таких функций, которые могут возвращать разные результаты при каждом вызове, даже если входные данные остаются неизменными. Flink наследует определение функций от Apache Calcite, фреймворка с открытым исходным кодом для создания баз данных и систем управления данными. В Calcite есть недетерминированные функции и динамические функции. Недетерминированные функции выполняются во время выполнения в кластерах, оцениваются по записи, а динамические функции определяют соответствующие значения только при генерации плана запроса. Динамические функции не выполняются во время выполнения, поэтому в разное время они вычисляют разные значения, но дают одни и те же значения при одном и том же выполнении.
Детерминированные функции в Apache Calcite возвращают true при вызове метода isDeterministic():
public boolean isDeterministic() { return true; }
Динамические функции в Apache Calcite возвращают true при вызове метода isDynamicFunction():
public boolean isDynamicFunction() { return false; }
Метод isDynamicFunction() применим только для системных функций, т.е. не пользовательских. Динамические функции оцениваются только в начале запроса, только во время планирования для пакетного режима. Для потокового режима это эквивалентно недетерминированной функции, поскольку запрос непрерывно выполняется логически, что соответствует абстракции непрерывного запроса по динамическим таблицам. Поэтому динамические функции также повторно оцениваются для каждого выполнения запроса, что эквивалентно для каждой записи в пакетном режиме. Если метод isDeterministic() возвращает false, это указывает на недетерминированность функции, которая будет оцениваться для каждой записи во время выполнения.
Следующие системные функции в Apache Flink всегда являются недетерминированными, т.е. оцениваются для каждой записи во время выполнения как в пакетном, так и в потоковом режимах:
- UUID() — генерация уникальных идентификаторов строк в виде 128-битных значений;
- RAND() – генерация случайного вещественного числа с плавающей запятой в диапазоне от 0.0 (включительно) до 1.0 (не включительно);
- RAND_INTEGER() – генерация случайного целого числа в заданном диапазоне;
- CURRENT_DATABASE() – возвращает имя текущей базы данных, в контексте которой выполняется запрос, что полезно для динамического определения контекста выполнения запросов;
- UNIX_TIMESTAMP() – возвращает текущее время в виде количества секунд, прошедших с начала эпохи Unix (1 января 1970 года, 00:00:00 UTC), что часто используется для меток времени и временных вычислений.
- CURRENT_ROW_TIMESTAMP() – возвращает отметку времени текущей строки во временных таблицах, что полезно для работы с временными данными, где важно знать точное время поступления или обработки каждой строки.
Для работы с временными данными в потоковой обработке используются соответствующие функции, которые позволяют получить текущие временные метки в нужном формате. Эти системные функции работы со временем в Apache Flink являются динамическими. Они предварительно оцениваются во время планирования (запуск запроса) для пакетного режима и для каждой записи для потокового режима. К таким функциям относятся следующие:
- CURRENT_DATE() — возвращает текущую дату в формате YYYY-MM-DD;
- CURRENT_TIME() — возвращает текущее время в формате HH:MM:SS;
- CURRENT_TIMESTAMP() — возвращает текущую дату и время в формате YYYY-MM-DD HH:MM:SS;
- NOW() — синоним функции CURRENT_TIMESTAMP(), возвращает текущую дату и время в формате YYYY-MM-DD HH:MM:SS;
- LOCALTIME() — возвращает текущее время в формате HH:MM:SS, но с учетом локального часового пояса;
- LOCALTIMESTAMP() — возвращает текущую дату и время в формате YYYY-MM-DD HH:MM:SS, но с учетом локального часового пояса.
Основное различие между потоковой и пакетной обработкой заключается в неограниченности данных. Flink SQL абстрагирует потоковую обработку как непрерывный запрос к динамическим таблицам. Поэтому динамическая функция в примере пакетного запроса эквивалентна недетерминированной функции в потоковой обработке, где логически каждое изменение в базовой таблице запускает выполнение запроса. Если таблица заполняется динамически данными из топика Kafka, куда непрерывно публикуются новые сообщения, тот же запрос с функциями времени вернет новые данные.
Особенности детерминизма и его отсутствия
Помимо недетерминированности самой функции, есть и другие факторы, которые могут генерировать недетерминизм:
- недетерминированное обратное считывание коннектора-источника;
- запрос на основе времени обработки;
- очистка внутренних данных о состоянии на основе TTL
Для Flink SQL предоставляемый детерминизм ограничивается только вычислениями, поскольку он не хранит сами пользовательские данные. Поэтому реализация коннектора-источника, которая не может обеспечить детерминированное обратное чтение, приведет к недетерминированности входных данных и недетерминированным результатам. Обычными примерами являются несогласованные данные для нескольких чтений с одного и того же смещения в топике Kafka или запросы на данные, которые больше не существуют из-за времени хранения, например, когда в топике Kafka сработала политика очистки, а запрошенные данные находились за пределами заданного значения конфигурации retention.time.
В отличие от времени события, время обработки основано на локальном времени машины, и эта обработка не обеспечивает детерминизма в связи с распределенной природой Flink-приложений в кластере на несколько узлов. Это затрагивает следующие операции: Window Aggregation, Interval Join, Temporal Join и Lookup Join, где недетерминизм возникает, когда доступная внешняя таблица изменяется с течением времени.
Из-за неограниченной природы потоковой обработки внутренние данные состояния, поддерживаемые длительными потоковыми запросами в таких операциях, как Regular Join и Group Aggregation (неоконная агрегация), могут постоянно увеличиваться. Избежать такого некотролируемого роста позволяет установка времени жизни (TTL, Time To Life) состояния для очистки внутренних данных. Но этот компромисс также может привести к недетерминированным вычислениям, что чревато некорректными результатами или ошибками времени выполнения.
Flink SQL реализует полный механизм инкрементального обновления на основе абстракции непрерывного запроса к динамическим таблицам. Все операции, которые должны генерировать инкрементальные сообщения, поддерживают полные данные внутреннего состояния, а работа всего конвейера запросов, включая полный DAG от источника до приемника, опирается на гарантию корректной доставки сообщений обновления между операторами. Эта гарантия может быть нарушена недетерминизмом, приводящим к ошибкам. Особенно часто к такому приводит недетерминированное обновление.
Сообщения об обновлении (changelog) могут содержать следующие типы изменений:
- Insert (I) – вставка новых данных;
- Delete (D) – удаление существующих данных;
- Update_Before (UB) – состояние данных перед их изменением. Оно содержит значения полей, которые имели место до выполнения операции обновления. Это нужно, чтобы восстановить прежнее состояние данных, а также для аудита и отслеживания изменений.
- Update_After (UA) – состояние данных после их изменения. Оно содержит значения полей, которые данные приобрели после выполнения операции обновления, позволяя понять, какие именно изменения были внесены.
Если выполняется только вставка новых данных, проблем с недетерминированным обновлением не возникает. Когда в журнале изменений есть сообщение об обновлении, содержащее по крайней мере одно сообщение D, UB, UA в дополнение к I, ключ обновления сообщения, который можно рассматривать как первичный ключ changelog, выводится из запроса на основании следующих соображений:
- когда ключ обновления может быть выведен, операторы в конвейере поддерживают внутреннее состояние с помощью ключа обновления;
- когда ключ обновления не может быть выведен, если первичный ключ не определен в исходной CDC-таблице или таблице приемника, или некоторые операции не могут быть выведены из семантики запроса, тогда все stateful-операторы обрабатывают сообщения обновления (D/UB/UA) только через полные строки. При этом операции удаления выполняются по полным строкам.
Таким образом, в режиме обновления по строкам все сообщения об обновлении, полученные stateful-операторами, не могут быть затронуты недетерминированными значениями столбцов, т.к. это вызовет проблемы недетерминированного обновления, приводящие к ошибкам вычислений. Дополнительной сложностью проблемы недетерминированного обновления в потоковых запросах становится ее неочевидность и риск возникновения из-за небольшого изменения в сложном запросе. Поэтому начиная с версии 1.16, Flink SQL вводит экспериментальный механизм обработки недетерминированных обновлений table.optimizer.non-deterministic-update.strategy. Когда для этого параметра установлено значение TRY_RESOLVE, он проверяет, есть ли проблема недетерминированных обновлений в потоковом запросе, и пытается ее устранить проблему, добавив внутреннюю материализацию. Если это невозможно устранить таким образом, Flink SQL выдаст подробные сообщения об ошибках, чтобы разработчик мог скорректировать запрос и избежать недетерминизма.
Поэтому рекомендуется установить конфигурации table.optimizer.non-deterministic-update.strategy, по умолчанию равной INGNORE, значение TRY_RESOLVE перед запуском потокового запроса. Также полезно посмотреть план выполнения запроса с недетерминированной функцией, добавив в него оператор EXPLAIN. Например, если в запросе используется недетерминированная функция now(), вместо нее нужно применить другую, явно задав в таблице-источнике столбец со временем события и привязав вычисления к нему. При использовании Lookup Join можно объявить первичный ключ, если он существует. Также нужно убедиться, что таблица источника поиска является чисто статической и не обновляется.
Подводя итог описания проблем недетерминированного поведения потоковых запросов, хочется подчеркнуть, что в большинстве случаев разработчик сам является ее причиной, используя недетерминированные функции в своем коде. Поэтому при проектировании и реализации Flink-приложения надо очень тщательно подходить к выбору операторов, особенно тех, которые предполагают сохранение состояния. Подробнее о них мы писали здесь.
Узнайте больше про возможности Apache Flink для пакетной и потоковой аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники