Управляемые повторы отправки сообщений из Apache Kafka c фреймворком Sping

Управляемые повторы отправки сообщений из Apache Kafka c фреймворком Sping

    Специально для обучения разработчиков распределенных приложений и дата-инженеров, рассмотрим практический пример использования возможностей фреймворка Spring для управления повторными попытками отправки сообщений потребителям из топика Apache Kafka.

    Повторные попытки отправки сообщений и Spring для Apache Kafka

    Довольно часто Kafka-приложения требуют высокой надежности обработки сообщений. Например, в финтех- или медтех-проектах, а также системах интернета вещей и платформах информационной безопасности. Например, данные о сетевом трафике клиентов, угрозах, приложениях и устройствах пользователей в режиме реального времени интегрируются в топики Apache Kafka, откуда потом извлекаются, обогащаются и отправляются на настроенные конечные точки систем-приемников. Доступность сторонних систем-приемников зависит от внешней инфраструктуры, а сеть передачи данных может быть ненадежной. При недоступности приемника или невозможности внешней системы обработать запрос потока данных, сообщения просто теряются.

    В Apache Kafka потерянные сообщения попадают в соответствующую очередь или топик недоставленных сообщений (Dead letter queue/topic, DLT). Поэтому для повторной отправки сообщений получателю следует работать именно с этим топиком. Рассмотрим, как это сделать, используя популярный open-source фреймворк разработки Java-приложений Spring. Он позволяет реализовать наиболее популярные элементы типовых комплексных приложений: от управления транзакциями до работы с сообщениями. Проект Spring для Apache Kafka применяет основные концепции фреймворка Spring к разработке решений для обмена сообщениями, предоставляя соответствующие высокоуровневые абстракции и шаблоны.

    Spring Kafka предоставляет функцию повторной попытки с помощью шаблона RetryTemplate с RetryingMessageListenerAdapter или, в последних версиях, путем настройки обработчика ошибок. Например, чтобы настроить топик повторных попыток и DLT для аннотированного метода @KafkaListener, разработчику нужно просто добавить к нему аннотацию @RetryableTopic, и Spring для Apache Kafka загрузит все необходимые топики и потребителей с конфигурациями по умолчанию:

    @RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
    @KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
    public void processMessage(MyPojo message) {
            // ... message processing
    }

    Для обработки сообщений DLT можно указать метод в том же классе, добавив к нему аннотацию @DltHandler. Если метод DltHandler не указан, создается потребитель по умолчанию, который регистрирует только потребление.

    @DltHandler
    public void processMessage(MyPojo message) {
    // ... message processing, persistence, etc
    }

    Важно помнить, что без указания имени шаблона Kafka (kafkaTemplate), будет найден bean-компонент с именем retryTopicDefaultKafkaTemplate. Если bean-компонент не найден, генерируется исключение. В Spring бины (bean) − это классы, созданием экземпляров которых и установкой в них зависимостей управляет контейнер фреймворка. Бины предназначены для реализации бизнес-логики приложения. Bean воплощает паттерн проектирования «одиночка» (singleton), т.е. в некотором блоке приложения существует только один экземпляр данного класса. Поэтому, если бин содержит изменяемые данные в полях, т.е. имеет состояние, то обращение к таким данным необходимо синхронизировать. Возвращаясь к повторным попыткам отправки сообщений в Kafka с помощью Spring, отметим, что можно настроить поддержку неблокирующих повторных попыток, создав bean-компоненты RetryTopicConfiguration в аннотированном классе @Configuration.

    @Bean
    public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
        return RetryTopicConfigurationBuilder
                .newInstance()
                .create(template);
    }

    Это создаст топики повторных попыток и DLT, а также соответствующих потребителей для всех топиков в методах, аннотированных с помощью @KafkaListener, с использованием конфигураций по умолчанию. Экземпляр KafkaTemplate необходим для пересылки сообщений. Для более детального контроля над обработкой неблокирующих повторных попыток для каждого топика, можно предоставить более одного bean-компонента RetryTopicConfiguration.

    Потребители топика повторных попыток и DLT будут назначены группе потребителей с идентификатором группы, который представляет собой комбинацию того, что указано в параметре groupId аннотации @KafkaListener с суффиксом топика. Если ничего не задано, все они будут принадлежать к одной и той же группе, а перебалансировка в топике повторной попытки вызовет ненужную перебалансировку в основном топике.

    Если потребитель настроен с помощью ErrorHandlingDeserializer, для обработки исключений десериализации важно настроить KafkaTemplate и его продюсера с сериализатором, который может обрабатывать обычные объекты, а также необработанные значения byte[], возникающие в результате исключений десериализации. Общий тип значения шаблона должен быть Object. Один из методов заключается в использовании DelegatingByTypeSerializer:

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
      return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
        new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
              MyNormalObject.class, new JsonSerializer<Object>())));
    }
    
    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
      return new KafkaTemplate<>(producerFactory());
    }

    Для одного и того же топика можно использовать несколько аннотаций @KafkaListener с ручным назначением разделов или без него вместе с неблокирующими повторными попытками. Но конфигурация топика при этом не меняется. Поэтому лучше использовать один bean-компонент RetryTopicConfiguration для настройки таких топиков. Если для одного и того же топика используется несколько аннотаций @RetryableTopic, все они должны иметь одинаковые значения, иначе одна из них будет применена ко всем слушателям этого топика, а значения других аннотаций будут проигнорированы.

    Хотя такой подход к повторным попыткам сохраняет гарантии порядка доставки сообщений, повторная попытка и ожидание задержки повтора выполняется в приложении-потребителе Kafka. Каждый сбой блокирует обработку назначенных разделов топика, вызывая отставание потребителя, что неприемлемо для топиков с высокой пропускной способностью. Кроме того, это не решает проблему, когда сообщение Kafka не может быть обработано в течение длительного периода времени. Обойти это ограничение поможет возможность, появившаяся в Spring Kafka 2.7, о чем мы поговорим далее.

    Неблокирующие повторные попытки

    В Spring Kafka 2.7 появилась новая функция неблокирующих повторных попыток, которая использует дополнительные топики Kafka с повторными попытками, куда отправляются необработанные сообщения для последующей повторной доставки. Количество топиков и повторных попыток является произвольным, и для каждого можно настроить различную задержку. Однако, поскольку повторные попытки не блокируются, гарантии порядка сообщений Kafka больше невозможны.

    Когда топик настроен на повторную попытку, а слушатель, обрабатывающий сообщение, выдает исключение, настроенный механизм восстановления после ошибок выполняет следующие шаги:

    • вычисляет время, когда сообщение должно быть обработано снова;
    • создает повторное сообщение как копию исходного с дополнительными заголовками (время выполнения сообщения, сведения об исключении и пр.);
    • вычисляет следующий топик повтора и публикует в нем это сообщение;
    • если количество попыток исчерпано, сообщение удаляется или отправляется в топик DLT, чтобы обработать его позже при необходимости.

    Spring Kafka автоматически создает контейнеры слушателя сообщений для топиков повторных попыток и DLT. Эти контейнеры по умолчанию используют тот же слушатель сообщений и конфигурацию, что и основной топик. Слушатели обёрнуты в специальные адаптеры, реализующие отложенную обработку сообщений следующим образом:

    • проверяется время выполнения входящего сообщения в KafkaBackoffAwareMessageListenerAdapter — если оно уже прошло, сообщение передается слушателю;
    • если положенное время еще не наступило, адаптер заставляет диспетчер отсрочки PartitionPausingBackoffManager выдавать исключение KafkaBackoffException с информацией о том, насколько должна быть отложена обработка;
    • смещение раздела сбрасывается, и потребитель Kafka приостанавливается на требуемый период времени;
    • как только потребитель активизируется, сообщение доставляется снова.

    Таким образом, при сбое отправки сообщения в конкретный приемник отправитель создает сообщение для восстановления и публикует его в разделе очереди восстановления. Сообщение содержит полную полезную нагрузку для отправки и идентификации системы-приемника. DLT-слушатель Kafka получает сообщение о сбое, и немедленно пытается выполнить повторную доставку. Если это не удается, сообщение передается в следующую очередь восстановления с возрастающей задержкой до тех пор, пока оно не будет успешно отправлено или не исчерпается количество попыток.

    Самый простой способ настроить топик с повторами — использовать аннотацию @RetryableTopic в методе слушателя сообщений вместе с аннотацией @KafkaListener. В аннотации можно настроить большинство свойств повторных попыток: количество, автоматическое создание топика и стратегию отсрочки для расчета задержек доставки.

    Альтернативой аннотации является создание bean-компонентов RetryTopicConfiguration, которые обеспечивают более точную настройку:

    @Bean
    public RetryTopicConfiguration recoveryQueueConfiguration(
                          @Qualifier(RecoveryQueueConfig.TEMPLATE) KafkaTemplate<String, RecoveryMessage> template,
                          @Qualifier(RecoveryQueueConfig.CONTAINER_FACTORY)
                           
    ConcurrentKafkaListenerContainerFactory<String, RecoveryMessage> containerFactory,
                          MessageRecoveryConfig messageRecoveryConfig) {        return RetryTopicConfigurationBuilder.newInstance()
                                     .includeTopic(RecoveryQueueConfig.TOPIC)
                                     .suffixTopicsWithIndexValues()
                                     .doNotAutoCreateRetryTopics()
                                     .maxAttempts(messageRecoveryConfig.getAttempts())
                                     .customBackoff(new IntervalBackOffPolicy(messageRecoveryConfig.getIntervals()))
                                     .notRetryOn(MethodArgumentResolutionException.class)
                                     .notRetryOn(MethodArgumentNotValidException.class)
                                     .notRetryOn(DeserializationException.class)
                                     .dltHandlerMethod(RecoveryQueueConfig.MESSAGE_LISTENER, "deadLetter")
                                     .doNotRetryOnDltFailure()
                                     .listenerFactory(containerFactory)
                                     .create(template);
    }

    При этом разработчику по-прежнему приходилось создавать обязательные связанные с Kafka bean-компоненты, такие как продюсеры и потребители, а также шаблон Kafka для отправки сообщений в DLT и слушатель, который заботится об их фактической повторной доставке.

    Практическое использование этого подхода может привести к следующим неожиданным проблемам:

    • сообщения в топиках повторных попыток становятся намного больше исходных из-за добавления к ним заголовков, связанных с исключениями (трассировка стека исключения, полное доменное имя и сообщение). Поскольку это не настраивается, обойти это можно на уровне клиента Kafka, удалив заголовки исключений в перехватчике пользовательского продюсера прямо перед отправкой сообщения.
    • повторяются сообщения, которые не могут быть десериализованы или проверены, поэтому их приходится явно исключать из конфигурации топика повторных попыток. Впрочем, это частично решено в Spring Kafka 2.8.
    • если сообщение попало в DLT из-за ошибок десериализации, оно несет исключение десериализации со всеми данными, которые не могут быть десериализованы. Когда такое DLT-сообщение доставляется обработчику недоставленных сообщений и его не удается десериализовать, создается новое DLT-сообщение еще большего размера. Это создает цикл, когда DLT заполняется постоянно растущими сообщениями, пока не будет превышен максимальный размер записи Kafka. Избежать этого поможет отключение отправки ошибочных недоставленных сообщений в DLT, что по умолчанию настроено в Spring Kafka 2.8.
    • Без явной установки префикса идентификатора клиента в аннотации слушателя метрики потребителя Kafka могут работать некорректно. Исправить ситуацию поможет явная установка параметров конфигурации:
    @KafkaListener(
    id = RecoveryQueueConfig.ID,
    idIsGroup = false,
    topics = RecoveryQueueConfig.TOPIC,
    clientIdPrefix = "${kafka.consumers.recovery-queue.clientId}",
    containerFactory = RecoveryQueueConfig.CONTAINER_FACTORY,
    groupId = RecoveryQueueConfig.GROUP_ID)
    public void listen(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
    @Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,
    @Nullable @Header(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS) byte[] attempts,
    @Valid @Payload RecoveryMessage recoveryMessage) {
    ...
    }

    Больше практических кейсов по администрированию и эксплуатации Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    [elementor-template id=»13619″]

    Источники

    1. https://medium.com/jamf-engineering/retryable-topics-with-spring-kafka-946360f2d644
    2. https://docs.spring.io/spring-kafka/reference/html/