Изучаем Apache Kafka с нуля. Урок 19. kafka-consumer-groups.sh

Изучаем Apache Kafka с нуля. Урок 19. kafka-consumer-groups.sh

 

В уроке 18 мы разобрали kafka-delete-records.sh — смещали нижнюю границу лога, делая ненужные записи недоступными для консьюмеров. Там уже мелькало понятие consumer group offset: Kafka запоминает, до какого сообщения дочитала каждая группа, и именно это позволяет консьюмерам продолжать с нужного места.

Сегодня разбираем утилиту, которая управляет этим механизмом напрямую. kafka-consumer-groups.sh — главный инструмент для работы с группами консьюмеров: смотреть их состояние, измерять lag, сбрасывать и экспортировать офсеты, удалять группы. Без неё не обходится ни одна диагностика задержек в production-кластере.

Тема consumer groups разбирается на уровне внутренних механизмов в курсе «Администрирование кластера Kafka» — там же лабораторные по диагностике lag-а и управлению офсетами в условиях нагрузки.

 

Что такое consumer group и почему это важно

Consumer group — группа консьюмеров, которые вместе читают один топик. Каждая партиция в топике назначается ровно одному консьюмеру в группе. Если консьюмеров меньше, чем партиций, один консьюмер может читать сразу несколько. Если больше — лишние просто ждут.

Kafka хранит текущий офсет каждой группы в системном топике __consumer_offsets. Именно оттуда утилита и берёт данные при вызове. Главная метрика, за которой следят в продакшне, — consumer lag. Это разница между последним сообщением в партиции и тем, до куда дочитала группа. Если lag растёт — консьюмеры не успевают за продюсером.

Kafka consumer group lag и распределение партиций между консьюмерами


Флаги kafka-consumer-groups.sh

Утилита работает по принципу «одна операция — один запуск». Флаги разбиваются на три группы: подключение, действие и уточнение действия.

Флаг Что делает Обязательный
—bootstrap-server Адрес брокера (host:port) да
—list Вывести список всех consumer groups нет
—describe Детальная информация о группе: партиции, офсеты, lag нет
—group Имя конкретной группы для действия зависит от операции
—all-groups Применить операцию ко всем группам нет
—topic Ограничить операцию конкретным топиком (или топиком:партицией) нет
—all-topics Применить операцию ко всем топикам группы нет
—reset-offsets Сбросить офсеты группы (требует стратегии и —execute или —dry-run) нет
—to-earliest Стратегия сброса — к первому доступному сообщению нет
—to-latest Стратегия сброса — к последнему сообщению (пропустить всё накопленное) нет
—to-offset N Стратегия сброса — к конкретному офсету нет
—to-datetime Стратегия сброса — к офсету, соответствующему метке времени (ISO-8601) нет
—by-duration Стратегия сброса — назад на длительность в формате ISO 8601 Duration (PT1H, P1D) нет
—shift-by N Стратегия сброса — сдвинуть офсет на N позиций (может быть отрицательным) нет
—execute Применить сброс офсетов (без него — только предпросмотр) нет
—dry-run Показать, что изменится, без реального применения нет
—export Экспортировать текущие офсеты группы в CSV нет
—from-file Загрузить офсеты из CSV-файла (обратная операция к —export) нет
—delete Удалить группу (группа должна быть неактивна) нет
—delete-offsets Удалить сохранённые офсеты группы для конкретного топика нет
—members Вместе с —describe показать участников группы с назначенными партициями нет
—state Вместе с —describe показать состояние группы (Stable, Empty, PreparingRebalance…) нет
—verbose Добавить колонку с preferred leader в вывод —describe нет
—offsets Показать офсеты (используется с —describe, включён по умолчанию) нет
—command-config Properties-файл с настройками TLS/SASL для защищённого кластера нет
—timeout Таймаут ожидания ответа брокера в миллисекундах (по умолчанию 5000) нет

Флаги —reset-offsets и —delete работают только на неактивных группах — у которых нет запущенных консьюмеров в данный момент.

Просмотр всех consumer groups. Флаг —list

Самая простая операция — посмотреть, какие группы вообще есть в кластере.

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --list

Вывод будет простым — по одной группе на строку:

analytics-group
payment-processor
console-consumer-12345
dead-letter-handler

Группы с автоматическими именами вида console-consumer-XXXXX — это временные группы, которые создаёт kafka-console-consumer.sh без явного флага —group. Обычно их можно смело удалять, если соответствующих консьюмеров уже нет.

 

Диагностика группы. Флаг —describe

 

Это самая используемая операция в повседневной работе. Показывает состояние группы по каждой партиции каждого топика, который она читает.

kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group analytics-group

Типичный вывод:

GROUP            TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                     HOST            CLIENT-ID
analytics-group  orders  0          490             500             10   consumer-1-abc123-def456  /10.0.0.5  my-consumer
analytics-group  orders  1          480             480             0    consumer-1-abc123-def456  /10.0.0.5  my-consumer
analytics-group  orders  2          495             520             25   consumer-2-xyz789         /10.0.0.6  my-consumer

Разбор колонок:

  • CURRENT-OFFSET — офсет, до которого дочитала группа в этой партиции (последний закоммиченный).
  • LOG-END-OFFSET — офсет следующей записи, которую добавит продюсер. Иными словами — текущий конец лога.
  • LAG — разница между LOG-END-OFFSET и CURRENT-OFFSET. Ноль — группа в актуальном состоянии. Растущее число — консьюмер не успевает.
  • CONSUMER-ID — внутренний идентификатор экземпляра консьюмера.
  • HOST — адрес машины, где запущен консьюмер.

Если поле CONSUMER-ID пустое — группа существует, но ни один консьюмер сейчас к ней не подключён. Это нормально для завершивших работу процессов, офсеты при этом сохраняются.

Просмотр участников и состояния группы

Флаги —members и —state расширяют вывод —describe разными срезами данных.

# Показать участников группы и назначенные им партиции
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group analytics-group \
  --members --verbose
# Показать состояние группы (Stable, Empty, PreparingRebalance)
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group analytics-group \
  --state

Состояния группы, которые покажет —state: Stable — всё нормально, Empty — нет активных консьюмеров, PreparingRebalance — идёт перераспределение партиций, Dead — группа удалена или сломана координатором.

 

Сброс офсетов. Флаг —reset-offsets

 

Сброс офсетов нужен в нескольких типичных сценариях: перечитать топик с начала для повторной обработки, пропустить накопившийся lag, восстановить офсеты после инцидента. Прежде чем что-то применять, всегда используйте —dry-run — это покажет результат без реальных изменений.

Предпросмотр сброса перед применением

# Проверено: Apache Kafka 4.2.0, Ubuntu 22.04
# Посмотреть, что изменится, без применения
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --topic orders \
  --to-earliest \
  --dry-run

Вывод сухого прогона покажет таблицу с текущим и новым офсетом для каждой партиции. Применения не происходит.

Сброс к началу топика

# Перечитать всё с первого доступного сообщения
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --topic orders \
  --to-earliest \
  --execute

Группа должна быть неактивна — все консьюмеры остановлены. Иначе команда завершится с ошибкой.

Пропустить всё накопленное. Сброс к концу

# Пропустить весь накопленный lag, начать читать новые сообщения
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --all-topics \
  --to-latest \
  --execute

Сброс к конкретному офсету

# Установить офсет 350 для партиции 0 топика orders
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --topic orders:0 \
  --to-offset 350 \
  --execute

Синтаксис —topic orders:0 позволяет ограничить операцию конкретной партицией, не трогая остальные.

 

Сброс по времени

# Перемотать к конкретному моменту времени
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --topic orders \
  --to-datetime 2025-04-01T00:00:00.000 \
  --execute
# Откатиться назад на 2 часа
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --all-topics \
  --by-duration PT2H \
  --execute

Формат PT2H — это ISO 8601 Duration. PT1H — час, PT30M — 30 минут, P1D — сутки.

Сдвиг офсета на N позиций

# Откатиться на 100 сообщений назад
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --topic orders \
  --shift-by -100 \
  --execute

Отрицательное значение — откат назад. Положительное — пропустить вперёд. Полезно, когда нужно перечитать небольшой диапазон после сбоя в приложении.

 

Экспорт и импорт офсетов

Бывает нужно зафиксировать текущее состояние офсетов перед рискованной операцией или перенести их между кластерами. Для этого есть пара —export и —from-file.

Экспорт офсетов в CSV

# Сохранить текущие офсеты группы в файл
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --all-topics \
  --to-current \
  --export \
  --dry-run > /tmp/analytics-offsets.csv

CSV-файл будет содержать строки вида orders,0,490 — топик, партиция, офсет.

Восстановление офсетов из файла

# Восстановить офсеты из сохранённого файла
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --reset-offsets \
  --group analytics-group \
  --from-file /tmp/analytics-offsets.csv \
  --execute

Это работает как точка отката. Сначала экспорт — потом операция — потом при необходимости импорт обратно.

 

Apache Kafka для инженеров данных

Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Удаление группы и офсетов

Удалить группу можно только если она неактивна. При удалении стираются все сохранённые офсеты этой группы.

# Удалить группу целиком (вместе с её офсетами)
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --delete \
  --group console-consumer-12345

Если нужно удалить только офсеты группы для конкретного топика, не трогая остальные — используйте —delete-offsets:

# Удалить только офсеты для топика orders, остальные топики группы не трогать
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --delete-offsets \
  --group analytics-group \
  --topic orders

Ответ будет содержать список партиций и статус удаления для каждой из них.

 

Работа с несколькими группами сразу

Флаг —all-groups позволяет применить операцию ко всем группам кластера. Используйте осторожно — особенно с —reset-offsets и —delete.

# Описать все группы сразу - удобно для общего мониторинга
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --all-groups
# Показать состояние всех групп (Stable/Empty/Dead)
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --all-groups \
  --state

Вывод —all-groups —state даёт быстрый срез по кластеру: сразу видно, какие группы в Stable, какие Empty, какие застряли в PreparingRebalance.

 

Admin API как альтернатива

Всё, что делает kafka-consumer-groups.sh, доступно через Java AdminClient. Это прямая альтернатива утилите для задач автоматизации — в тех случаях, когда скрипты неудобны или нужна интеграция с приложением.

Операция в утилите Метод AdminClient
—list listConsumerGroups()
—describe describeConsumerGroups()
—describe (офсеты) listConsumerGroupOffsets()
—reset-offsets —execute alterConsumerGroupOffsets()
—delete-offsets deleteConsumerGroupOffsets()
—delete deleteConsumerGroups()
—describe —members describeConsumerGroups() (поле members в MemberDescription)

Для Python-разработчиков те же операции доступны через confluent-kafka (AdminClient) или kafka-python. В курсе «Apache Kafka для инженеров данных» работа с AdminClient разбирается в блоке операционных задач на уровне кода.

Типовые сценарии и диагностика

Растущий lag — что проверить

Если lag по группе постоянно растёт, алгоритм диагностики простой.

  • Проверить, активна ли группа. Запустить —describe —members: если CONSUMER-ID пустой у всех партиций — консьюмеры остановлены. Lag растёт просто потому, что никто не читает.
  • Посмотреть, равномерно ли распределён lag. Если в одной партиции lag нулевой, а в другой огромный — скорее всего медленный консьюмер или проблема в логике обработки одного конкретного ключа.
  • Проверить состояние группы через —state. PreparingRebalance на длительное время — признак нестабильного консьюмера, который постоянно перерегистрируется.

После диагностики — сначала решить проблему на стороне консьюмера, и только потом думать о сбросе офсетов.

Ошибки при сбросе офсетов

Самая частая ошибка: Assignments can only be reset if the group ‘X’ is inactive. Значит, в группе ещё работает хотя бы один активный консьюмер. Нужно остановить все экземпляры и повторить.

Вторая по частоте: смещение вышло за пределы available range — указанный офсет меньше log start offset (данные уже удалены retention-политикой). В этом случае —to-earliest поставит группу на минимально доступный офсет.

 

Защищённый кластер. Флаг —command-config

Если кластер использует TLS или SASL-аутентификацию, все команды нужно дополнить флагом —command-config с путём к properties-файлу.

# Пример client.properties для SASL/PLAIN
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" password="secret";
kafka-consumer-groups.sh \
  --bootstrap-server kafka-broker:9093 \
  --describe \
  --group analytics-group \
  --command-config /etc/kafka/client.properties

 

Apache Kafka: администрирование кластера

Код курса
KAFKA
Ближайшая дата курса
13 июля, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

Что дальше

Мы разобрали kafka-consumer-groups.sh — от просмотра lag-а до точечного управления офсетами. Следующая утилита, урок 20, посвящена kafka-streams-application-reset.sh. Это специализированный инструмент для Kafka Streams-приложений: сброс внутреннего состояния стримингового приложения, очистка internal-топиков и offset-коммитов одной командой.

Если хочется разобраться глубже с тем, как Kafka Streams управляет состоянием, — это тема курса «Apache Kafka для инженеров данных».

Источники

Все уроки курса

Тема URL
1 Установка Kafka с Zookeeper https://bigdataschool.ru/blog/news/lesson1-kafka-zookeeper-install/
2 Установка Kafka в режиме KRaft https://bigdataschool.ru/blog/news/lesson2-kafka-kraft-install/
3 Docker KRaft: однонодовый кластер https://bigdataschool.ru/blog/news/lesson3-kafka-docker-single/
4 Docker KRaft: 3-нодовый кластер https://bigdataschool.ru/blog/news/lesson4-kafka-docker-cluster/
5 Утилиты bin/: переменные окружения и основы https://bigdataschool.ru/blog/news/lesson5-kafka-bin-intro/
6 kafka-topics.sh: управление топиками https://bigdataschool.ru/blog/news/lesson6-kafka-topics/
7 kafka-console-producer.sh https://bigdataschool.ru/blog/news/lesson7-kafka-console-producer/
8 kafka-console-consumer.sh https://bigdataschool.ru/blog/news/lesson8-kafka-console-consumer/
9 kafka-server-start.sh / kafka-server-stop.sh https://bigdataschool.ru/blog/news/lesson9-kafka-server-start-stop/
10 kafka-storage.sh https://bigdataschool.ru/blog/news/lesson10-kafka-storage/
11 kafka-cluster.sh https://bigdataschool.ru/blog/news/lesson11-kafka-cluster/
12 kafka-metadata-quorum.sh https://bigdataschool.ru/blog/news/lesson12-kafka-metadata-quorum/
13 kafka-metadata-shell.sh https://bigdataschool.ru/blog/news/lesson13-kafka-metadata-shell/
14 kafka-features.sh https://bigdataschool.ru/blog/news/lesson14-kafka-features/
15 kafka-configs.sh https://bigdataschool.ru/blog/news/lesson15-kafka-configs/
16 kafka-log-dirs.sh https://bigdataschool.ru/blog/news/lesson16-kafka-log-dirs/
17 kafka-dump-log.sh https://bigdataschool.ru/blog/news/lesson17-kafka-dump-log/
18 kafka-delete-records.sh https://bigdataschool.ru/blog/news/lesson18-kafka-delete-records/
19 kafka-consumer-groups.sh https://bigdataschool.ru/blog/news/lesson19-kafka-consumer-groups/
20 kafka-streams-application-reset.sh https://bigdataschool.ru/blog/news/lesson20-kafka-streams-reset/
21 kafka-leader-election.sh https://bigdataschool.ru/blog/news/lesson21-kafka-leader-election/
22 kafka-reassign-partitions.sh https://bigdataschool.ru/blog/news/lesson22-kafka-reassign-partitions/
23 kafka-replica-verification.sh https://bigdataschool.ru/blog/news/lesson23-kafka-replica-verification/
24 kafka-acls.sh https://bigdataschool.ru/blog/news/lesson24-kafka-acls/
25 kafka-broker-api-versions.sh https://bigdataschool.ru/blog/news/lesson25-kafka-broker-api-versions/
26 kafka-get-offsets.sh https://bigdataschool.ru/blog/news/lesson26-kafka-get-offsets/
27 kafka-verifiable-producer/consumer.sh https://bigdataschool.ru/blog/news/lesson27-kafka-verifiable/
28 kafka-producer-perf-test.sh https://bigdataschool.ru/blog/news/lesson28-kafka-producer-perf/
29 kafka-consumer-perf-test.sh https://bigdataschool.ru/blog/news/lesson29-kafka-consumer-perf/
30 kafka-mirror-maker.sh https://bigdataschool.ru/blog/news/lesson30-kafka-mirror-maker/
31 connect-standalone.sh https://bigdataschool.ru/blog/news/lesson31-kafka-connect-standalone/
32 connect-distributed.sh https://bigdataschool.ru/blog/news/lesson32-kafka-connect-distributed/
33 kcat. Альтернативный CLI https://bigdataschool.ru/blog/news/lesson33-kcat/