Содержание
Специально для обучения разработчиков распределенных приложений и дата-инженеров, рассмотрим практический пример использования возможностей фреймворка Spring для управления повторными попытками отправки сообщений потребителям из топика Apache Kafka.
Повторные попытки отправки сообщений и Spring для Apache Kafka
Довольно часто Kafka-приложения требуют высокой надежности обработки сообщений. Например, в финтех- или медтех-проектах, а также системах интернета вещей и платформах информационной безопасности. Например, данные о сетевом трафике клиентов, угрозах, приложениях и устройствах пользователей в режиме реального времени интегрируются в топики Apache Kafka, откуда потом извлекаются, обогащаются и отправляются на настроенные конечные точки систем-приемников. Доступность сторонних систем-приемников зависит от внешней инфраструктуры, а сеть передачи данных может быть ненадежной. При недоступности приемника или невозможности внешней системы обработать запрос потока данных, сообщения просто теряются.
В Apache Kafka потерянные сообщения попадают в соответствующую очередь или топик недоставленных сообщений (Dead letter queue/topic, DLT). Поэтому для повторной отправки сообщений получателю следует работать именно с этим топиком. Рассмотрим, как это сделать, используя популярный open-source фреймворк разработки Java-приложений Spring. Он позволяет реализовать наиболее популярные элементы типовых комплексных приложений: от управления транзакциями до работы с сообщениями. Проект Spring для Apache Kafka применяет основные концепции фреймворка Spring к разработке решений для обмена сообщениями, предоставляя соответствующие высокоуровневые абстракции и шаблоны.
Spring Kafka предоставляет функцию повторной попытки с помощью шаблона RetryTemplate с RetryingMessageListenerAdapter или, в последних версиях, путем настройки обработчика ошибок. Например, чтобы настроить топик повторных попыток и DLT для аннотированного метода @KafkaListener, разработчику нужно просто добавить к нему аннотацию @RetryableTopic, и Spring для Apache Kafka загрузит все необходимые топики и потребителей с конфигурациями по умолчанию:
@RetryableTopic(kafkaTemplate = "myRetryableTopicKafkaTemplate")
@KafkaListener(topics = "my-annotated-topic", groupId = "myGroupId")
public void processMessage(MyPojo message) {
// ... message processing
}
Для обработки сообщений DLT можно указать метод в том же классе, добавив к нему аннотацию @DltHandler. Если метод DltHandler не указан, создается потребитель по умолчанию, который регистрирует только потребление.
@DltHandler
public void processMessage(MyPojo message) {
// ... message processing, persistence, etc
}
Важно помнить, что без указания имени шаблона Kafka (kafkaTemplate), будет найден bean-компонент с именем retryTopicDefaultKafkaTemplate. Если bean-компонент не найден, генерируется исключение. В Spring бины (bean) − это классы, созданием экземпляров которых и установкой в них зависимостей управляет контейнер фреймворка. Бины предназначены для реализации бизнес-логики приложения. Bean воплощает паттерн проектирования «одиночка» (singleton), т.е. в некотором блоке приложения существует только один экземпляр данного класса. Поэтому, если бин содержит изменяемые данные в полях, т.е. имеет состояние, то обращение к таким данным необходимо синхронизировать. Возвращаясь к повторным попыткам отправки сообщений в Kafka с помощью Spring, отметим, что можно настроить поддержку неблокирующих повторных попыток, создав bean-компоненты RetryTopicConfiguration в аннотированном классе @Configuration.
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
return RetryTopicConfigurationBuilder
.newInstance()
.create(template);
}
Это создаст топики повторных попыток и DLT, а также соответствующих потребителей для всех топиков в методах, аннотированных с помощью @KafkaListener, с использованием конфигураций по умолчанию. Экземпляр KafkaTemplate необходим для пересылки сообщений. Для более детального контроля над обработкой неблокирующих повторных попыток для каждого топика, можно предоставить более одного bean-компонента RetryTopicConfiguration.
Потребители топика повторных попыток и DLT будут назначены группе потребителей с идентификатором группы, который представляет собой комбинацию того, что указано в параметре groupId аннотации @KafkaListener с суффиксом топика. Если ничего не задано, все они будут принадлежать к одной и той же группе, а перебалансировка в топике повторной попытки вызовет ненужную перебалансировку в основном топике.
Если потребитель настроен с помощью ErrorHandlingDeserializer, для обработки исключений десериализации важно настроить KafkaTemplate и его продюсера с сериализатором, который может обрабатывать обычные объекты, а также необработанные значения byte[], возникающие в результате исключений десериализации. Общий тип значения шаблона должен быть Object. Один из методов заключается в использовании DelegatingByTypeSerializer:
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfiguration(), new StringSerializer(),
new DelegatingByTypeSerializer(Map.of(byte[].class, new ByteArraySerializer(),
MyNormalObject.class, new JsonSerializer<Object>())));
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
Для одного и того же топика можно использовать несколько аннотаций @KafkaListener с ручным назначением разделов или без него вместе с неблокирующими повторными попытками. Но конфигурация топика при этом не меняется. Поэтому лучше использовать один bean-компонент RetryTopicConfiguration для настройки таких топиков. Если для одного и того же топика используется несколько аннотаций @RetryableTopic, все они должны иметь одинаковые значения, иначе одна из них будет применена ко всем слушателям этого топика, а значения других аннотаций будут проигнорированы.
Хотя такой подход к повторным попыткам сохраняет гарантии порядка доставки сообщений, повторная попытка и ожидание задержки повтора выполняется в приложении-потребителе Kafka. Каждый сбой блокирует обработку назначенных разделов топика, вызывая отставание потребителя, что неприемлемо для топиков с высокой пропускной способностью. Кроме того, это не решает проблему, когда сообщение Kafka не может быть обработано в течение длительного периода времени. Обойти это ограничение поможет возможность, появившаяся в Spring Kafka 2.7, о чем мы поговорим далее.
Неблокирующие повторные попытки
В Spring Kafka 2.7 появилась новая функция неблокирующих повторных попыток, которая использует дополнительные топики Kafka с повторными попытками, куда отправляются необработанные сообщения для последующей повторной доставки. Количество топиков и повторных попыток является произвольным, и для каждого можно настроить различную задержку. Однако, поскольку повторные попытки не блокируются, гарантии порядка сообщений Kafka больше невозможны.
Когда топик настроен на повторную попытку, а слушатель, обрабатывающий сообщение, выдает исключение, настроенный механизм восстановления после ошибок выполняет следующие шаги:
- вычисляет время, когда сообщение должно быть обработано снова;
- создает повторное сообщение как копию исходного с дополнительными заголовками (время выполнения сообщения, сведения об исключении и пр.);
- вычисляет следующий топик повтора и публикует в нем это сообщение;
- если количество попыток исчерпано, сообщение удаляется или отправляется в топик DLT, чтобы обработать его позже при необходимости.
Spring Kafka автоматически создает контейнеры слушателя сообщений для топиков повторных попыток и DLT. Эти контейнеры по умолчанию используют тот же слушатель сообщений и конфигурацию, что и основной топик. Слушатели обёрнуты в специальные адаптеры, реализующие отложенную обработку сообщений следующим образом:
- проверяется время выполнения входящего сообщения в KafkaBackoffAwareMessageListenerAdapter — если оно уже прошло, сообщение передается слушателю;
- если положенное время еще не наступило, адаптер заставляет диспетчер отсрочки PartitionPausingBackoffManager выдавать исключение KafkaBackoffException с информацией о том, насколько должна быть отложена обработка;
- смещение раздела сбрасывается, и потребитель Kafka приостанавливается на требуемый период времени;
- как только потребитель активизируется, сообщение доставляется снова.
Таким образом, при сбое отправки сообщения в конкретный приемник отправитель создает сообщение для восстановления и публикует его в разделе очереди восстановления. Сообщение содержит полную полезную нагрузку для отправки и идентификации системы-приемника. DLT-слушатель Kafka получает сообщение о сбое, и немедленно пытается выполнить повторную доставку. Если это не удается, сообщение передается в следующую очередь восстановления с возрастающей задержкой до тех пор, пока оно не будет успешно отправлено или не исчерпается количество попыток.
Самый простой способ настроить топик с повторами — использовать аннотацию @RetryableTopic в методе слушателя сообщений вместе с аннотацией @KafkaListener. В аннотации можно настроить большинство свойств повторных попыток: количество, автоматическое создание топика и стратегию отсрочки для расчета задержек доставки.
Альтернативой аннотации является создание bean-компонентов RetryTopicConfiguration, которые обеспечивают более точную настройку:
@Bean
public RetryTopicConfiguration recoveryQueueConfiguration(
@Qualifier(RecoveryQueueConfig.TEMPLATE) KafkaTemplate<String, RecoveryMessage> template,
@Qualifier(RecoveryQueueConfig.CONTAINER_FACTORY)
ConcurrentKafkaListenerContainerFactory<String, RecoveryMessage> containerFactory,
MessageRecoveryConfig messageRecoveryConfig) { return RetryTopicConfigurationBuilder.newInstance()
.includeTopic(RecoveryQueueConfig.TOPIC)
.suffixTopicsWithIndexValues()
.doNotAutoCreateRetryTopics()
.maxAttempts(messageRecoveryConfig.getAttempts())
.customBackoff(new IntervalBackOffPolicy(messageRecoveryConfig.getIntervals()))
.notRetryOn(MethodArgumentResolutionException.class)
.notRetryOn(MethodArgumentNotValidException.class)
.notRetryOn(DeserializationException.class)
.dltHandlerMethod(RecoveryQueueConfig.MESSAGE_LISTENER, "deadLetter")
.doNotRetryOnDltFailure()
.listenerFactory(containerFactory)
.create(template);
}
При этом разработчику по-прежнему приходилось создавать обязательные связанные с Kafka bean-компоненты, такие как продюсеры и потребители, а также шаблон Kafka для отправки сообщений в DLT и слушатель, который заботится об их фактической повторной доставке.
Практическое использование этого подхода может привести к следующим неожиданным проблемам:
- сообщения в топиках повторных попыток становятся намного больше исходных из-за добавления к ним заголовков, связанных с исключениями (трассировка стека исключения, полное доменное имя и сообщение). Поскольку это не настраивается, обойти это можно на уровне клиента Kafka, удалив заголовки исключений в перехватчике пользовательского продюсера прямо перед отправкой сообщения.
- повторяются сообщения, которые не могут быть десериализованы или проверены, поэтому их приходится явно исключать из конфигурации топика повторных попыток. Впрочем, это частично решено в Spring Kafka 2.8.
- если сообщение попало в DLT из-за ошибок десериализации, оно несет исключение десериализации со всеми данными, которые не могут быть десериализованы. Когда такое DLT-сообщение доставляется обработчику недоставленных сообщений и его не удается десериализовать, создается новое DLT-сообщение еще большего размера. Это создает цикл, когда DLT заполняется постоянно растущими сообщениями, пока не будет превышен максимальный размер записи Kafka. Избежать этого поможет отключение отправки ошибочных недоставленных сообщений в DLT, что по умолчанию настроено в Spring Kafka 2.8.
- Без явной установки префикса идентификатора клиента в аннотации слушателя метрики потребителя Kafka могут работать некорректно. Исправить ситуацию поможет явная установка параметров конфигурации:
@KafkaListener(
id = RecoveryQueueConfig.ID,
idIsGroup = false,
topics = RecoveryQueueConfig.TOPIC,
clientIdPrefix = "${kafka.consumers.recovery-queue.clientId}",
containerFactory = RecoveryQueueConfig.CONTAINER_FACTORY,
groupId = RecoveryQueueConfig.GROUP_ID)
public void listen(@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(name = KafkaHeaders.RECEIVED_MESSAGE_KEY, required = false) String key,
@Nullable @Header(RetryTopicHeaders.DEFAULT_HEADER_ATTEMPTS) byte[] attempts,
@Valid @Payload RecoveryMessage recoveryMessage) {
...
}
Больше практических кейсов по администрированию и эксплуатации Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
[elementor-template id=»13619″]
Источники
- https://medium.com/jamf-engineering/retryable-topics-with-spring-kafka-946360f2d644
- https://docs.spring.io/spring-kafka/reference/html/


