Иногда в распределенных системах требуется строгий порядок событий, т.е. сообщений или записей с полезными данными и состоянием, который должен поддерживаться между продюсерами и потребителями в конвейере их обработки. Например, чтобы сохранить корректный порядок транзакций для правильного расчета остатков по счетам. Читайте далее, как это реализовать в Apache Kafka.
Настройка продюсера
В Apache Kafka, начиная с версии 2.1.0, если отправленное сообщение не подтверждено брокером, оно будет повторно отправлено продюсером. Отправка повторится не более 2147483647 раз (максимальное значение числа типа integer, MAX_INT) или до истечения срока, указанного в параметре конфигурации delivery.timeout.ms, по умолчанию равного 2 минуты. При этом разрешение повторных попыток без установки для параметра max.in.flight.requests.per.connection значения 1 потенциально изменит порядок записей, потому что если два пакета отправляются в один раздел, и первый терпит неудачу и повторяется, а второй завершается успешно, то записи во второй партии может появиться первая.
По умолчанию значение параметра max.in.flight.requests.per.connection равно 5. Поэтому в случае сбоя, т.е. когда запись не подтверждена брокером, продюсер может отправить записи, которые, скорее всего, будут храниться в неправильном порядок, поскольку на самом деле по умолчанию Kafka не гарантирует, что сообщения, отправленные производителем в определенный раздел топика, будут добавлены в том порядке, в котором они были отправлены.
Это легко исправить, просто изменив значение параметра max.in.flight.requests.per.connection для нужного продюсера на 1, что отразится на его пропускной способности. Также можно установить enable.idempotence=true без необходимости уменьшать max.in.flight.requests.per.connection, но тогда придется настроить другие параметры. Для включения идемпотентности нужно, чтобы max.in.flight.requests.per.connection было меньше или равно 5, количество повторных попыток было больше 0, а количество подтверждений должно быть «все» (acks = all). Если эти значения не установлены пользователем явно, будут автоматически выбраны наиболее подходящие. Если заданы несовместимые значения, возникнет исключение ConfigException.
Итак, продюсер реализует идемпотентность, отправляя в Kafka порядковый номер с каждым сообщением. Если этот порядковый номер ровно на 1 больше, чем последний, то сообщение будет сохранено в Kafka. Но конфигурация по умолчанию может привести к созданию сообщений в неправильном порядке, когда происходит сбой. Поэтому, если порядок сообщений важен для распределенного приложения, могут возникнуть проблемы.
Чтобы обеспечить строгий порядок всех событий, топик должен быть разделен на отдельные разделы, что плохо масштабируется. Более того, иногда не нужно сохранение последовательности для всех событий. Например, требуется сохранить порядок транзакций для каждой учетной записи, а не между учетными записями. В этом случае пригодится разделение по ключам при публикации событий.
Kafka выбирает, какое событие назначить какому разделу в топике. Однако мы можем выбрать ключ разделения. Каждое событие в Kafka состоит из ключа, значения и метки времени. Разделитель Kafka по умолчанию использует хэш ключа события для выбора раздела, т. е. записи с одинаковым ключом назначаются одному и тому же разделу.
Чтобы продемонстрировать, как это работает, рассмотрим пример реализации на Python. Сперва импортируем библиотеки.
import json from kafka import KafkaProducer
Покажем пример события для публикации:
e1={"key1":1, "key2":2, "value":100} print (e1, type(e1))
Инициируем продюсера:
producer = KafkaProducer(bootstrap_servers=['broker list'], key_serializer=lambda x:json.dumps(x).encode('ascii'), value_serializer=lambda v:json.dumps(v).encode('ascii'))
Определяем ключ и публикуем событие, выбрав key1 в качестве ключа разделения:
producer.send(‘topic’, key=e1[«key1»], value=e1)
В приведенном выше примере Kafka будет отправлять все события с key1 = 1 в один и тот же раздел. Но в действительности некоторые разделы могут быть более загруженными, чем другие, в зависимости от ключа, хотя потоковая обработка должна поддерживать порядок, в котором события были созданы. Но этот порядок может быть нарушен в приложении-продюсере, например, из-за многопоточных процессов. Избежать этого поможет определенная настройка потребителя, что мы разберем далее.
Обработка исключений потребителя Apache Kafka
Экземпляр-потребитель видит записи в том порядке, в котором они хранятся в журнале. Потребитель просто считывает сообщения по порядку, пока не столкнется с исключением. В этом случае можно приостановить потребление в момент сбоя и возобновить его после разрешения исключения, чтобы сохранить строгий порядок. Для этого пригодятся следующие методы RESTful-API Kafka:
- PUT/connectors/{name}/pause — приостановить коннектор и его задачи, что останавливает обработку сообщений до возобновления коннектора;
- PUT/connectors/{name}/resume — возобновить приостановленный коннектор или ничего не делать, если коннектор не приостановлен;
Чтобы проиллюстрировать, как это работает, рассмотрим пример реализации кода на Python. Сперва импортируем нужные библиотеки:
import json from kafka import KafkaConsumer, TopicPartition
Инициируем потребителя:
consumer = KafkaConsumer( 'topic', bootstrap_servers=['broker list'], value_deserializer=lambda m: json.loads(m.decode('ascii'), strict=False))
Получим события:
consumer.poll()
Обработаем события:
for event in consumer: data = event.value try: fnPostDB(data) #Writes to database except: fnPostException(data) #Notifies exception topics = [TopicPartition('topic', event.partition)] consumer.pause(topics[0]) resumeState = fnResume() #Returns True when exception is resolved if resumeState == 'True': fnPostDB(data) #Reexecutes failed task consumer.resume(topics[0])
В приведенном коде потребитель пытается опубликовать каждое событие в базе данных, используя функцию fnPostDB(data). Если конечная точка базы данных недоступна, она уведомляет об исключении с помощью fnPostException(data) и приостанавливает потребление из раздела топика.
Далее следует проверить, разрешено ли исключение (в этом случае доступна конечная точка). Функция fnResume() может проверять конечную точку с фиксированными интервалами и возвращать True, когда она доступна. Когда исключение разрешено, необходимо повторить неудачную задачу fnPostDB(data) и возобновить потребление из раздела.
На практике не все исключения могут быть связаны с интерфейсом. Хотя это менее вероятно в реальных приложениях, можно столкнуться с исключениями, связанными с полезной нагрузкой, например, сбой десериализации или отсутствие требуемого элемента данных. В таких случаях нет смысла повторять неудачную задачу. Вместо этого имеет смысл прочитать исправленное событие из топика воспроизведения перед возобновлением работы потребителя.
Экземпляр потребителя в группе потребителей прекратит обработку из раздела топика после выдачи паузы, что подходит для рассмотренного примера. Однако, если разные номера учетных записей используют разные конечные точки в качестве аргумента, то потребление для всех номеров учетных записей в разделе приостанавливается. Воздействие можно свести к минимуму, увеличив количество разделов топика и экземпляров потребителей. Но такое согласование кардинальности ключа разделения может оказаться трудным или даже невозможным, а описанное решение увеличит задержку в конвейере данных.
Освойте администрирование и использование Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве: