Лучшие практики работы с DLQ-очередями в Apache Kafka

Лучшие практики работы с DLQ-очередями в Apache Kafka

Недавно мы писали про очереди недоставленных сообщений в Apache Kafka и RabbitMQ. Сегодня поговорим про стратегии обработки ошибок, связанные с DLQ-очередями в Kafka, а также рассмотрим, какие сообщения НЕ надо помещать в Dead Letter Queue.

4 стратегии работы с DLQ-топиками в Apache Kafka

Напомним, в Apache Kafka в очереди недоставленных сообщений (Dead Letter Queue, DLQ) скапливаются сообщения, которые не могут быть обработаны приложением-потребителем. Поскольку Apache Kafka, в отличие от RabbitMQ, не доставляет сообщения приложениям-потребителям, это обычно происходит из-за некорректного формата или схемы данных.

Таким образом, DLQ-очередь позволяет отделить обработку ошибок данных от обработки событий без остановки потокового конвейера. Существует несколько таких стратегий обработки сообщений, хранящихся в DLQ-очереди:

  • повторная обработка, когда некоторые сообщения в DLQ надо обработать снова. Но сперва необходимо решить проблему, т.е. понять, почему сообщения попали в DLQ-очередь. Это можно сделать автоматически с помощью скрипта или разобраться вручную, а затем вернуть ошибку приложению продюсеру с запросом на повторную отправку исправленного сообщения. Здесь же можно применить расширенную аналитику для получения информации о проблемах потока данных в режиме реального времени. Например, использовать ksqlDB для вычислений статистики потока событий в реальном времени, такой как среднее количество сообщений об ошибках в час и других сведений, которые помогут понять характер данных конвейере с Kafka.
  • отбрасывание некорректных сообщений после их предварительного анализа;
  • остановка рабочего процесса, если некорректные сообщения критичны для бизнеса. Это действие может быть автоматизировано или выполняться дата-инженером вручную;
  • игнорирование ошибок, когда DLQ-очередь просто наполняется без принятия каихх-либо мер. Такое бездействие допустимо в некоторых сценариях использования, например, при мониторинге общего поведения приложения Kafka. Но при этом стоит помнить, что топик Kafka не хранит данные вечно, а имеет политику очистки, о чем мы писали здесь и здесь, согласно которой сообщения удаляются из топика по истечении определенного времени. Поэтому стоит следить за DLQ-топиком, чтобы оперативно принимать меры в случае его нетипичного поведения, например, если он начинает наполняться слишком быстро.

Как уже было отмечено выше, помимо использования JMX для мониторинга DLQ-очереди, можно воспользоваться возможностями агрегирования ksqlDB и написать простое потоковое приложение для отслеживания скорости, с которой сообщения записываются в очередь.  Создадим потоковую таблицу, используя стандартный синтаксис SQL-команд:

-- Register stream for each dead letter queue topic.
CREATE STREAM dlq_file_sink_06__01 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__01', VALUE_FORMAT='DELIMITED');
CREATE STREAM dlq_file_sink_06__02 (MSG VARCHAR) WITH (KAFKA_TOPIC='dlq_file_sink_06__02', VALUE_FORMAT='DELIMITED');

-- Consume data from the beginning of the topic
SET 'auto.offset.reset' = 'earliest';

-- Create a monitor stream with additional columns
--  that can be used for subsequent aggregation queries
CREATE STREAM DLQ_MONITOR WITH (VALUE_FORMAT='AVRO') AS \
  SELECT 'dlq_file_sink_06__01' AS SINK_NAME, \
         'Records: ' AS GROUP_COL, \
         MSG \
    FROM dlq_file_sink_06__01;

-- Populate the same monitor stream with records from
--  the second dead letter queue
INSERT INTO DLQ_MONITOR \
  SELECT 'dlq_file_sink_06__02' AS SINK_NAME, \
         'Records: ' AS GROUP_COL, \
         MSG \
    FROM dlq_file_sink_06__02;

-- Create an aggregate view of the number of messages
--  in each dead letter queue per minute window
CREATE TABLE DLQ_MESSAGE_COUNT_PER_MIN AS \
  SELECT TIMESTAMPTOSTRING(WINDOWSTART(),'yyyy-MM-dd HH:mm:ss') AS START_TS, \
         SINK_NAME, \
         GROUP_COL, \
         COUNT(*) AS DLQ_MESSAGE_COUNT \
    FROM DLQ_MONITOR \
          WINDOW TUMBLING (SIZE 1 MINUTE) \
 GROUP BY SINK_NAME, \
          GROUP_COL;

Эта агрегирующая таблица, которая может быть запрошена в интерактивном режиме, фактически представляет собой топик Kafka, данные из которого  можно визуализировать, перенаправив на дэшборд. Также эту ksqlDB-таблицу можно использовать для управления оповещениями. Например, если в DLQ-очередь попадает более 5 сообщений в минуту, дата-инженер должен получать соответствующее уведомление. Для этого  создадим топик DLQ_BREACH, на который подписывается сервис оповещения, и при получении любого события посылает уведомление:

CREATE TABLE DLQ_BREACH AS \
    SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT \
      FROM DLQ_MESSAGE_COUNT_PER_MIN \
     WHERE DLQ_MESSAGE_COUNT>5;

Выборка данных из топика DLQ_BREACH c ksqlDB реализуется через простой SQL-запрос SELECT. Например, нужно получить время создания сообщения в DLQ-очереди, ее название и количество сообщений, попавших туда:

SELECT START_TS, SINK_NAME, DLQ_MESSAGE_COUNT FROM DLQ_BREACH;

Пример реализации DLQ-очередей в Kafka для AVRO-сообщений в приложении Spark Streaming c библиотекой ABRiS смотрите в нашей новой статье.

Лучшие практики для очереди недоставленных писем в Apache Kafka

В реальности часто бывает так, что работа с DLQ-очередями в конвейерах Apache Kafka уходит на последний план и дата-инженеры не обрабатывают сообщения, попавшие в Dead Letter Queue. Или оповещения об ошибках получает только дежурный дата-инженер. Однако, согласно DataOps-подходу, знать об этом должны владельцы данных, а не только команда инфраструктуры. Поэтому оповещение должно уведомлять команду приложения-продюсера о том, что в конвейер поступили некорректные данные, и их нужно повторно отправить, предварительно исправив.

Если же в этом нет необходимости, и получение некорректных данных фактически не влияет на ход и результат бизнес-процессов, встает вопрос о целесообразности существования DLQ-топика в Kafka. Вместо сбора таких сообщений их можно сразу проигнорировать в исходном приложении-потребителе, чтобы сократить нагрузку на сеть, инфраструктуру и другие ресурсы.

Таким образом, при работе с DLQ-очередями в Kafka можно выделить следующие лучше практики:

  • создать дэшборд с оповещениями, которые будут получать все заинтересованные лица (дата-инженер, владелец данных, команда приложения-продюсера) по используемым каналам (электронная почта, оповещения в Slack или другом мессенджере);
  • определить стратегию обработки ошибок для каждого DLQ-топика Kafka, чтобы своевременно остановить конвейер данных, удалить некорректные сообщения или обработать их повторно;
  • отправлять в DLQ только сообщения о фактических ошибках с данными, а не все ошибки. Например, за проблемы с подключением отвечает само приложение-потребитель, а продюсер и Kafka здесь не причем.
  • сохранять исходные сообщения и записывать их в DLQ с дополнительными заголовками, такими как уровень ошибки и время ее возникновения, имя приложения-продюсера и прочие метаданные, которые упростят процессы отладки и повторной обработки событий;
  • определить оптимальное количество DLQ-очередей, поскольку каждый топик Kafka занимает ресурсы. Хранить все ошибок в одном DLQ не удобно для дальнейшего анализа и повторной обработки, однако чрезмерное множество DLQ-топиков тоже сложно поддерживать. Более того, концепция DLQ фактически изменяет порядок обработки входящих данных и значительно усложняет автоматические конвейеры. Поэтому количество и варианты использования DLQ в Kafka надо подбирать под каждый конвейер с учетом его специфики и критичности для бизнеса. В частности, вместо DLQ-топиков в Apache Kafka можно использовать реестр схем для управления данными и предотвращения ошибок. Schema Registry от Confluent позволяет обеспечить очистку данных, чтобы предотвратить ошибки в полезной нагрузке от приложений-продюсеров, предоставляя корректную структуру сообщений и реализуя проверку схемы на стороне клиента. Например, Confluent Server обеспечивает дополнительную проверку схемы на стороне брокера, чтобы отклонить некорректные сообщения от приложения-продюсера, который не использует реестр схем. Подробнее о реестре схем Kafka мы писали здесь.

В заключение рассмотрим пару случаев, когда НЕ надо использовать DLQ-очередь в Kafka. К примеру, DLQ для регулирования пропускной способности конвейера данных, отправляя в очередь недоставленных сообщений часть данные в режиме пиковой нагрузки, нельзя назвать хорошей идеей. Поскольку Kafka не доставляет сообщения приложениям-потребителям, они сами потребляют их в своем темпе. Если объем входящих данных вырос, следует масштабировать потребителей и/или наращивать емкость хранилища Kafka, т.е. увеличивать объем дискового пространства, а не настраивать DLQ.

Аналогично, не следует использовать DLQ для хранения сведений о неудачных подключениях приложения-потребителя к Kafka. Вместо этого нужно решить проблему с подключением, а сообщения будут храниться в обычном топике согласно политике его очистки.

Освойте администрирование и эксплуатацию Apache Kafka и Flink для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

[elementor-template id=»13619″]

Источники

  1. https://www.kai-waehner.de/blog/2022/05/30/error-handling-via-dead-letter-queue-in-apache-kafka/
  2. https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/