Лучшие практики работы с DLQ-очередями в Apache Kafka

DLQ Kafka, очередь недоставленных сообщений в Kafka примеры курсы обучение, курсы по Apache Kafka, Kafka для разработчиков, обучение Kafka, Apache Kafka Dead Letter Queue, обучение большим данным, курсы Big Data, Школа Больших Данных Учебный Центр Коммерсант

Недавно мы писали про очереди недоставленных сообщений в Apache Kafka и RabbitMQ. Сегодня поговорим про стратегии обработки ошибок, связанные с DLQ-очередями в Kafka, а также рассмотрим, какие сообщения НЕ надо помещать в Dead Letter Queue.

4 стратегии работы с DLQ-топиками в Apache Kafka

Напомним, в Apache Kafka в очереди недоставленных сообщений (Dead Letter Queue, DLQ) скапливаются сообщения, которые не могут быть обработаны приложением-потребителем. Поскольку Apache Kafka, в отличие от RabbitMQ, не доставляет сообщения приложениям-потребителям, это обычно происходит из-за некорректного формата или схемы данных.

Таким образом, DLQ-очередь позволяет отделить обработку ошибок данных от обработки событий без остановки потокового конвейера. Существует несколько таких стратегий обработки сообщений, хранящихся в DLQ-очереди:

  • повторная обработка, когда некоторые сообщения в DLQ надо обработать снова. Но сперва необходимо решить проблему, т.е. понять, почему сообщения попали в DLQ-очередь. Это можно сделать автоматически с помощью скрипта или разобраться вручную, а затем вернуть ошибку приложению продюсеру с запросом на повторную отправку исправленного сообщения. Здесь же можно применить расширенную аналитику для получения информации о проблемах потока данных в режиме реального времени. Например, использовать ksqlDB для вычислений статистики потока событий в реальном времени, такой как среднее количество сообщений об ошибках в час и других сведений, которые помогут понять характер данных конвейере с Kafka.
  • отбрасывание некорректных сообщений после их предварительного анализа;
  • остановка рабочего процесса, если некорректные сообщения критичны для бизнеса. Это действие может быть автоматизировано или выполняться дата-инженером вручную;
  • игнорирование ошибок, когда DLQ-очередь просто наполняется без принятия каихх-либо мер. Такое бездействие допустимо в некоторых сценариях использования, например, при мониторинге общего поведения приложения Kafka. Но при этом стоит помнить, что топик Kafka не хранит данные вечно, а имеет политику очистки, о чем мы писали здесь и здесь, согласно которой сообщения удаляются из топика по истечении определенного времени. Поэтому стоит следить за DLQ-топиком, чтобы оперативно принимать меры в случае его нетипичного поведения, например, если он начинает наполняться слишком быстро.

Как уже было отмечено выше, помимо использования JMX для мониторинга DLQ-очереди, можно воспользоваться возможностями агрегирования ksqlDB и написать простое потоковое приложение для отслеживания скорости, с которой сообщения записываются в очередь.  Создадим потоковую таблицу, используя стандартный синтаксис SQL-команд:

-- Register stream for each dead letter queue topic.
CREATE STREAM dlq_file_sink_06__01 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__01', VALUE_FORMAT='DELIMITED');
CREATE STREAM dlq_file_sink_06__02 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__02', VALUE_FORMAT='DELIMITED');

-- Consume data from the beginning of the topic
SET 'auto.offset.reset' = 'earliest';

-- Create a monitor stream with additional columns
--  that can be used for subsequent aggregation queries
CREATE STREAM DLQ_MONITOR WITH (VALUE_FORMAT='AVRO') AS \
  SELECT 'dlq_file_sink_06__01' AS SINK_NAME, \
         'Records: ' AS GROUP_COL, \
         MSG \
    FROM dlq_file_sink_06__01;

-- Populate the same monitor stream with records from
--  the second dead letter queue
INSERT INTO DLQ_MONITOR \
  SELECT 'dlq_file_sink_06__02' AS SINK_NAME, \
         'Records: ' AS GROUP_COL, \
         MSG \
    FROM dlq_file_sink_06__02;

-- Create an aggregate view of the number of messages
--  in each dead letter queue per minute window
CREATE TABLE DLQ_MESSAGE_COUNT_PER_MIN AS \
  SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS START_TS, \
         SINK_NAME, \
         GROUP_COL, \
         COUNT(*) AS DLQ_MESSAGE_COUNT \
    FROM DLQ_MONITOR \
          WINDOW TUMBLING (SIZE 1 MINUTE) \
 GROUP BY SINK_NAME, \
          GROUP_COL;

Эта агрегирующая таблица, которая может быть запрошена в интерактивном режиме, фактически представляет собой топик Kafka, данные из которого  можно визуализировать, перенаправив на дэшборд. Также эту ksqlDB-таблицу можно использовать для управления оповещениями. Например, если в DLQ-очередь попадает более 5 сообщений в минуту, дата-инженер должен получать соответствующее уведомление. Для этого  создадим топик DLQ_BREACH, на который подписывается сервис оповещения, и при получении любого события посылает уведомление:

CREATE TABLE DLQ_BREACH AS \
    SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT \
      FROM DLQ_MESSAGE_COUNT_PER_MIN \
     WHERE DLQ_MESSAGE_COUNT>5;

Выборка данных из топика DLQ_BREACH c ksqlDB реализуется через простой SQL-запрос SELECT. Например, нужно получить время создания сообщения в DLQ-очереди, ее название и количество сообщений, попавших туда:

SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_BREACH;

Пример реализации DLQ-очередей в Kafka для AVRO-сообщений в приложении Spark Streaming c библиотекой ABRiS смотрите в нашей новой статье.

Лучшие практики для очереди недоставленных писем в Apache Kafka

В реальности часто бывает так, что работа с DLQ-очередями в конвейерах Apache Kafka уходит на последний план и дата-инженеры не обрабатывают сообщения, попавшие в Dead Letter Queue. Или оповещения об ошибках получает только дежурный дата-инженер. Однако, согласно DataOps-подходу, знать об этом должны владельцы данных, а не только команда инфраструктуры. Поэтому оповещение должно уведомлять команду приложения-продюсера о том, что в конвейер поступили некорректные данные, и их нужно повторно отправить, предварительно исправив.

Если же в этом нет необходимости, и получение некорректных данных фактически не влияет на ход и результат бизнес-процессов, встает вопрос о целесообразности существования DLQ-топика в Kafka. Вместо сбора таких сообщений их можно сразу проигнорировать в исходном приложении-потребителе, чтобы сократить нагрузку на сеть, инфраструктуру и другие ресурсы.

Таким образом, при работе с DLQ-очередями в Kafka можно выделить следующие лучше практики:

  • создать дэшборд с оповещениями, которые будут получать все заинтересованные лица (дата-инженер, владелец данных, команда приложения-продюсера) по используемым каналам (электронная почта, оповещения в Slack или другом мессенджере);
  • определить стратегию обработки ошибок для каждого DLQ-топика Kafka, чтобы своевременно остановить конвейер данных, удалить некорректные сообщения или обработать их повторно;
  • отправлять в DLQ только сообщения о фактических ошибках с данными, а не все ошибки. Например, за проблемы с подключением отвечает само приложение-потребитель, а продюсер и Kafka здесь не причем.
  • сохранять исходные сообщения и записывать их в DLQ с дополнительными заголовками, такими как уровень ошибки и время ее возникновения, имя приложения-продюсера и прочие метаданные, которые упростят процессы отладки и повторной обработки событий;
  • определить оптимальное количество DLQ-очередей, поскольку каждый топик Kafka занимает ресурсы. Хранить все ошибок в одном DLQ не удобно для дальнейшего анализа и повторной обработки, однако чрезмерное множество DLQ-топиков тоже сложно поддерживать. Более того, концепция DLQ фактически изменяет порядок обработки входящих данных и значительно усложняет автоматические конвейеры. Поэтому количество и варианты использования DLQ в Kafka надо подбирать под каждый конвейер с учетом его специфики и критичности для бизнеса. В частности, вместо DLQ-топиков в Apache Kafka можно использовать реестр схем для управления данными и предотвращения ошибок. Schema Registry от Confluent позволяет обеспечить очистку данных, чтобы предотвратить ошибки в полезной нагрузке от приложений-продюсеров, предоставляя корректную структуру сообщений и реализуя проверку схемы на стороне клиента. Например, Confluent Server обеспечивает дополнительную проверку схемы на стороне брокера, чтобы отклонить некорректные сообщения от приложения-продюсера, который не использует реестр схем. Подробнее о реестре схем Kafka мы писали здесь.

В заключение рассмотрим пару случаев, когда НЕ надо использовать DLQ-очередь в Kafka. К примеру, DLQ для регулирования пропускной способности конвейера данных, отправляя в очередь недоставленных сообщений часть данные в режиме пиковой нагрузки, нельзя назвать хорошей идеей. Поскольку Kafka не доставляет сообщения приложениям-потребителям, они сами потребляют их в своем темпе. Если объем входящих данных вырос, следует масштабировать потребителей и/или наращивать емкость хранилища Kafka, т.е. увеличивать объем дискового пространства, а не настраивать DLQ.

Аналогично, не следует использовать DLQ для хранения сведений о неудачных подключениях приложения-потребителя к Kafka. Вместо этого нужно решить проблему с подключением, а сообщения будут храниться в обычном топике согласно политике его очистки.

Освойте администрирование и эксплуатацию Apache Kafka и Flink для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/
  2. https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/
Поиск по сайту