Когда и зачем фиксировать смещение потребителей Kafka вручную, с какими проблемами можно при этом столкнуться и как улучшение KIP-1094 обеспечивает целостность потоков данных в распределенных средах.
Когда и зачем фиксировать смещения потребителей в Kafka вручную
Недавно мы разбирали, как выполняется автоматическая фиксация смещений потребителей в Apache Kafka. Она выполняется периодически. Это подходит для большинства случаев, но иногда возникает необходимость в ручной фиксации смещений, например, в следующих случай:
- необходимость строгого обеспечения exactly-once семантики доставки сообщений. Когда нужно избежать потерь или повторной доставки сообщений, то автоматическая фиксация смещений не подойдет, так как она происходит периодически и независимо от успешности их обработки. Выполняя фиксацию смещения вручную после успешной обработки сообщения, например, после того как данные успешно записаны в постоянное хранилище, можно точно гарантировать, что сообщение не будет потеряно или продублировано при повторном запуске потребителя. Например, необходимо фиксировать смещения, соответствующие успешно обработанным транзакциям, чтобы гарантировать, что каждая запись обрабатывается ровно один раз. После обработки смещение следующей записи для каждого раздела топика отправляется в транзакцию, и продюсер фиксирует это. Если во время обработки возникает ошибка, транзакция прерывается, чтобы гарантировать отсутствие частичной обработки. При прерывании транзакции смещения не фиксируются.
- транзакционная обработка, когда публикация сообщений в Kafka является частью более крупной транзакции, например, включающей запись в базу данных или вызов внешнего API, то необходимо контролировать точный момент фиксации смещения. Ручная фиксация позволяет согласовать фиксацию смещения с успешным завершением всей транзакции.
- обработка сообщений в пакетном режиме, когда сообщения потребляются пакетами и фиксировать их после того, как все сообщения из пакета успешно обработаны. Ручная фиксация позволяет в случае ошибки при обработке пакета повторно обработать весь пакет, а не только часть сообщений.
- управление производительностью и сокращение накладных расходов. Автоматическая фиксация смещений в Kafka обычно производится через определенные интервалы времени, например, раз в несколько секунд. Иногда это приводит к избыточной нагрузке. В таких случаях можно вручную выполнять фиксацию смещения реже или чаще в зависимости от требований.
- сложные сценарии обработки, где возможны ошибки, и нужно обеспечить точный контроль над повторным чтением или пропуском сообщений. В таких случаях ручная фиксация смещений позволяет явно управлять процессом обработки и восстановления после ошибок.
Таким образом, ручная фиксация смещения в Kafka позволяет более точно контролировать, когда фиксируются смещения, в отличие от автоматической фиксации смещения, которая происходит периодически. Для ручной фиксации смещений используются методы commitSync или commitAsync, которые предоставляются API потребителя Kafka. Синхронный метод commitSync блокируется до тех пор, пока запрос на фиксацию не будет подтвержден брокером, гарантируя, что фиксация была успешной перед продолжением. Это надежно, но может снизить общую пропускную способность из-за простоя потребителя, ожидающего завершения фиксации. С другой стороны, асинхронный метод commitAsync позволяет потребителю продолжать обработку записей, пока запрос на фиксацию обрабатывается в фоновом режиме. Это повышает пропускную способность, но создает риск сбоев фиксации, которые необходимо обрабатывать соответствующим образом. Если фиксация не удалась, можно использовать обратный вызов, чтобы повторить фиксацию или обработать сбой другим способом. Например, одной из допустимых стратегий может быть игнорирование одной неудачной фиксации и ожидание успешного выполнения следующей фиксации. Если между ними произойдет сбой, потребуется повторно обработать только несколько дополнительных записей, что остается приемлемым в рамках семантики at-least-once, гарантируя, что каждая запись будет обработана хотя бы один раз. Однако полностью игнорировать все ошибки фиксации не рекомендуется. Лучше установить пороговое значение для количества последовательных ошибок фиксации, которые приложение может безопасно игнорировать, прежде чем это станет проблемой.
Оба метода commitSync и commitAsync фиксируют смещение на основе последнего вызова poll-опроса. Смещения, зафиксированные с помощью этих методов, будут использоваться при первой выборке после каждой перебалансировки, а также при запуске. Также commitSync и commitAsync позволяют делать ручную фиксацию смещений с интервалами, отличными от периодичности poll-опроса. Но на практике фиксация каждого отдельного смещения нецелесообразна, т.к. это снижает пропускную способность потребителя, ухудшает производительность и задержку всей системы.
Разобравшись с особенностями ручной фиксации смещений, далее рассмотрим, как это сделать и какие при этом могут возникнуть проблемы.
Как узнать последнее смещение потребителя
Чтобы узнать, какое смещение зафиксировать, можно провести опрос потребителя. Однако, смещение последней записи, возвращенное poll-опросом + 1 не всегда является следующим реальным смещением, т.к. оно увеличивается не только из-за фактического потребления. Контрольные записи (control records) также увеличивают смещение. Это системные сообщения специального назначения, которые создаются брокером и используются для внутреннего управления состоянием группы потребителей, в частности, для фиксации смещений. Они также хранятся в специальном внутреннем топике Kafka под названием __consumer_offsets. Когда потребитель Kafka фиксирует свои текущие смещения, брокер записывает их именно в этот внутренний топик в виде контрольных записей. Эти записи предназначены не для передачи реальных пользовательских данных, а исключительно для внутреннего учета и восстановления состояния потребителей.
Таким образом, контрольные записи хранят информацию о текущих позициях потребителей, чтобы в случае перезапуска приложения, сбоя или ребалансировки группы можно было продолжить потребление сообщений именно с того места, где остановилась обработка. Также они позволяют брокеру Kafka координировать работу группы потребителей, отслеживать активных участников группы и инициировать ребалансировки при изменении состава группы. Наконец, если потребитель выходит из строя, Kafka может восстановить его текущее состояние, прочитав последнее зафиксированное смещение из контрольных записей внутреннего топика.

Проблема возникает, когда приложение-продюсер перестаёт публиковать новые данные или публикует их слишком медленно из-за, например, очень низкой пропускной способности или проблем с сетью. В этом случае потребитель при чтении потока данных достигает контрольных записей и переходит через них. Однако после этих контрольных записей новых сообщений уже нет, поскольку продюсер остановился или замедлился. Таким образом, потребитель останавливается на позиции сразу после контрольных записей. Проблема в том, что Kafka продолжает считать эти контрольные записи частью потока данных, и потребитель, не получая новых данных, остаётся с разницей в смещении, равной количеству таких контрольных записей. Это создаёт иллюзию отставания, временной лаг (lag), хотя по факту пользователь уже обработал все реальные сообщения. Такая специфика работы Kafka и её механизма отслеживания смещения может сбивать с толку.

Альтернативой poll-опросу для определения фиксируемого смещения является явное получение позиции потребителя для конкретного раздела топика. Однако, при этом возвращается только следующее смещение без дополнительных метаданных, таких как эпоха лидера. Однако, это необходимо для согласованности метаданных, что мы недавно разбирали здесь.
По умолчанию потребитель Kafka поддерживает позицию в памяти и возвращает ее по запросу. Иногда позиция может отсутствовать или быть некорректной, например, когда лидер раздела меняется, или брокер Kafka выполняет обслуживание или восстановление данных. В таких случаях потребитель вынужден повторно запрашивать позицию у брокера и проверять ее на допустимость. Допустимая позиция всегда должна быть меньше последнего смещения раздела и должна указывать на сообщение, которое еще существует в логе Kafka, а не на сообщения, которые уже попали под политику очистки.
Если позиция некорректна или отсутствует, потребитель должен получить позицию от брокера и проверить ее. Когда потребитель не может проверить существующую позицию с новым лидером или получить новую, возникает исключение TimeoutException, которое нужно обработать соответствующим образом. Кроме того, может возникнуть исключение OFFSET_OUT_OF_RANGE, если выбор лидера происходит параллельно с перебалансировкой и есть так называемый лидер-зомби, который перестал быть лидером из-за потери доступности в сети, но сам еще не знает об этом.
Поэтому в Kafka 4.0 реализовано улучшение KIP-1094 для фиксации корректного смещения следующей записи данных вместе с метаданными эпохи лидера. Как это реализовано, рассмотрим далее.
KIP-1094 для фиксации корректного смещения в Apache Kafka 4.0
Надежная фиксации корректного смещения следующей записи данных вместе с метаданными эпохи лидера в KIP-1094 реализована с помощью нового метода конструктора nextOffsets в класс ConsumerRecords. Этот конструктор инициализирует следующие смещения, а также эпоху лидера, обернутую в объект OffsetAndMetadata. Такое улучшение гарантирует, что корректное следующее смещение вместе с правильной эпохой лидера предоставляется записями, непосредственно возвращаемыми из poll-опроса. Новый метод конструктора nextOffsets также делает устаревшим текущий конструктор ConsumerRecords, который не включает следующие смещения. Это изменение направлено на предотвращение потенциальных ошибок в будущем и улучшение Consumer API.
Такое решение было принято вместо кэширования диапазонов смещения эпохи лидера внутри потребителя, чтобы обеспечить независимость от продюсера. Второй отклоненной альтернативой был новый метод Consumer.positionWithMetadata, усложняющий интерфейс потребителя. Кроме того, этот метод страдал бы от состояния гонки, при котором позиция могла бы измениться к тому времени, когда приложение будет фиксировать смещение.
Таким образом, KIP-1094 позволяет получить правильное смещение следующей записи данных вместе с эпохой лидера из ConsumerRecords, возвращаемого poll-опросом, и вручную фиксировать его через метод commitSync или commitAsync. Новый конструктор инициализирует следующие смещения, а также эпоху лидера, обернутую в объект ConsumerRecordsOffsetAndMetadata.
Это полезно, например, когда приложение Kafka Streams фиксирует смещения после обработки всех записей во внутренних буферах. Вместо простой фиксации смещения последней записи в объекте ConsumerRecords + 1, чреватой временным лагом из-за контрольных записей, Kafka Streams фиксирует смещение, возвращаемое Consumer.position. Результат является следующим смещением, которое должно быть извлечено после последнего извлеченного пакета, включая любые потенциальные контрольные записи фиксации. Однако метод Consumer.position не включает эпоху лидера. Кроме того, иногда из-за состояния гонки между потоками application/StreamThread и consumer heartbeat метод Consumer.position() возвращает некорректное следующее смещение. Улучшение KIP-1094 устраняет эти проблемы, предлагая более точный и надежный способ управления смещениями, что повышает надежность API потребителя Kafka и поддерживает целостности потоков данных в распределенных средах.
Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники