Содержание
- Что такое consumer group и почему это важно
- Флаги kafka-consumer-groups.sh
- Просмотр всех consumer groups. Флаг --list
- Диагностика группы. Флаг --describe
- Просмотр участников и состояния группы
- Сброс офсетов. Флаг --reset-offsets
- Предпросмотр сброса перед применением
- Сброс к началу топика
- Пропустить всё накопленное. Сброс к концу
- Сброс к конкретному офсету
- Сброс по времени
- Сдвиг офсета на N позиций
- Экспорт и импорт офсетов
- Экспорт офсетов в CSV
- Восстановление офсетов из файла
- Удаление группы и офсетов
- Работа с несколькими группами сразу
- Admin API как альтернатива
- Типовые сценарии и диагностика
- Растущий lag - что проверить
- Ошибки при сбросе офсетов
- Защищённый кластер. Флаг --command-config
- Что дальше
- Источники
- Все уроки курса
В уроке 18 мы разобрали kafka-delete-records.sh — смещали нижнюю границу лога, делая ненужные записи недоступными для консьюмеров. Там уже мелькало понятие consumer group offset: Kafka запоминает, до какого сообщения дочитала каждая группа, и именно это позволяет консьюмерам продолжать с нужного места.
Сегодня разбираем утилиту, которая управляет этим механизмом напрямую. kafka-consumer-groups.sh — главный инструмент для работы с группами консьюмеров: смотреть их состояние, измерять lag, сбрасывать и экспортировать офсеты, удалять группы. Без неё не обходится ни одна диагностика задержек в production-кластере.
Тема consumer groups разбирается на уровне внутренних механизмов в курсе «Администрирование кластера Kafka» — там же лабораторные по диагностике lag-а и управлению офсетами в условиях нагрузки.
Что такое consumer group и почему это важно
Consumer group — группа консьюмеров, которые вместе читают один топик. Каждая партиция в топике назначается ровно одному консьюмеру в группе. Если консьюмеров меньше, чем партиций, один консьюмер может читать сразу несколько. Если больше — лишние просто ждут.
Kafka хранит текущий офсет каждой группы в системном топике __consumer_offsets. Именно оттуда утилита и берёт данные при вызове. Главная метрика, за которой следят в продакшне, — consumer lag. Это разница между последним сообщением в партиции и тем, до куда дочитала группа. Если lag растёт — консьюмеры не успевают за продюсером.
Флаги kafka-consumer-groups.sh
Утилита работает по принципу «одна операция — один запуск». Флаги разбиваются на три группы: подключение, действие и уточнение действия.
| Флаг | Что делает | Обязательный |
|---|---|---|
| —bootstrap-server | Адрес брокера (host:port) | да |
| —list | Вывести список всех consumer groups | нет |
| —describe | Детальная информация о группе: партиции, офсеты, lag | нет |
| —group | Имя конкретной группы для действия | зависит от операции |
| —all-groups | Применить операцию ко всем группам | нет |
| —topic | Ограничить операцию конкретным топиком (или топиком:партицией) | нет |
| —all-topics | Применить операцию ко всем топикам группы | нет |
| —reset-offsets | Сбросить офсеты группы (требует стратегии и —execute или —dry-run) | нет |
| —to-earliest | Стратегия сброса — к первому доступному сообщению | нет |
| —to-latest | Стратегия сброса — к последнему сообщению (пропустить всё накопленное) | нет |
| —to-offset N | Стратегия сброса — к конкретному офсету | нет |
| —to-datetime | Стратегия сброса — к офсету, соответствующему метке времени (ISO-8601) | нет |
| —by-duration | Стратегия сброса — назад на длительность в формате ISO 8601 Duration (PT1H, P1D) | нет |
| —shift-by N | Стратегия сброса — сдвинуть офсет на N позиций (может быть отрицательным) | нет |
| —execute | Применить сброс офсетов (без него — только предпросмотр) | нет |
| —dry-run | Показать, что изменится, без реального применения | нет |
| —export | Экспортировать текущие офсеты группы в CSV | нет |
| —from-file | Загрузить офсеты из CSV-файла (обратная операция к —export) | нет |
| —delete | Удалить группу (группа должна быть неактивна) | нет |
| —delete-offsets | Удалить сохранённые офсеты группы для конкретного топика | нет |
| —members | Вместе с —describe показать участников группы с назначенными партициями | нет |
| —state | Вместе с —describe показать состояние группы (Stable, Empty, PreparingRebalance…) | нет |
| —verbose | Добавить колонку с preferred leader в вывод —describe | нет |
| —offsets | Показать офсеты (используется с —describe, включён по умолчанию) | нет |
| —command-config | Properties-файл с настройками TLS/SASL для защищённого кластера | нет |
| —timeout | Таймаут ожидания ответа брокера в миллисекундах (по умолчанию 5000) | нет |
Флаги —reset-offsets и —delete работают только на неактивных группах — у которых нет запущенных консьюмеров в данный момент.
Просмотр всех consumer groups. Флаг —list
Самая простая операция — посмотреть, какие группы вообще есть в кластере.
kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --list
Вывод будет простым — по одной группе на строку:
analytics-group payment-processor console-consumer-12345 dead-letter-handler
Группы с автоматическими именами вида console-consumer-XXXXX — это временные группы, которые создаёт kafka-console-consumer.sh без явного флага —group. Обычно их можно смело удалять, если соответствующих консьюмеров уже нет.
Диагностика группы. Флаг —describe
Это самая используемая операция в повседневной работе. Показывает состояние группы по каждой партиции каждого топика, который она читает.
kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --describe \ --group analytics-group
Типичный вывод:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID analytics-group orders 0 490 500 10 consumer-1-abc123-def456 /10.0.0.5 my-consumer analytics-group orders 1 480 480 0 consumer-1-abc123-def456 /10.0.0.5 my-consumer analytics-group orders 2 495 520 25 consumer-2-xyz789 /10.0.0.6 my-consumer
Разбор колонок:
- CURRENT-OFFSET — офсет, до которого дочитала группа в этой партиции (последний закоммиченный).
- LOG-END-OFFSET — офсет следующей записи, которую добавит продюсер. Иными словами — текущий конец лога.
- LAG — разница между LOG-END-OFFSET и CURRENT-OFFSET. Ноль — группа в актуальном состоянии. Растущее число — консьюмер не успевает.
- CONSUMER-ID — внутренний идентификатор экземпляра консьюмера.
- HOST — адрес машины, где запущен консьюмер.
Если поле CONSUMER-ID пустое — группа существует, но ни один консьюмер сейчас к ней не подключён. Это нормально для завершивших работу процессов, офсеты при этом сохраняются.
Просмотр участников и состояния группы
Флаги —members и —state расширяют вывод —describe разными срезами данных.
# Показать участников группы и назначенные им партиции kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --describe \ --group analytics-group \ --members --verbose
# Показать состояние группы (Stable, Empty, PreparingRebalance) kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --describe \ --group analytics-group \ --state
Состояния группы, которые покажет —state: Stable — всё нормально, Empty — нет активных консьюмеров, PreparingRebalance — идёт перераспределение партиций, Dead — группа удалена или сломана координатором.
Сброс офсетов. Флаг —reset-offsets
Сброс офсетов нужен в нескольких типичных сценариях: перечитать топик с начала для повторной обработки, пропустить накопившийся lag, восстановить офсеты после инцидента. Прежде чем что-то применять, всегда используйте —dry-run — это покажет результат без реальных изменений.
Предпросмотр сброса перед применением
# Проверено: Apache Kafka 4.2.0, Ubuntu 22.04 # Посмотреть, что изменится, без применения kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --topic orders \ --to-earliest \ --dry-run
Вывод сухого прогона покажет таблицу с текущим и новым офсетом для каждой партиции. Применения не происходит.
Сброс к началу топика
# Перечитать всё с первого доступного сообщения kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --topic orders \ --to-earliest \ --execute
Группа должна быть неактивна — все консьюмеры остановлены. Иначе команда завершится с ошибкой.
Пропустить всё накопленное. Сброс к концу
# Пропустить весь накопленный lag, начать читать новые сообщения kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --all-topics \ --to-latest \ --execute
Сброс к конкретному офсету
# Установить офсет 350 для партиции 0 топика orders kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --topic orders:0 \ --to-offset 350 \ --execute
Синтаксис —topic orders:0 позволяет ограничить операцию конкретной партицией, не трогая остальные.
Сброс по времени
# Перемотать к конкретному моменту времени kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --topic orders \ --to-datetime 2025-04-01T00:00:00.000 \ --execute
# Откатиться назад на 2 часа kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --all-topics \ --by-duration PT2H \ --execute
Формат PT2H — это ISO 8601 Duration. PT1H — час, PT30M — 30 минут, P1D — сутки.
Сдвиг офсета на N позиций
# Откатиться на 100 сообщений назад kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --topic orders \ --shift-by -100 \ --execute
Отрицательное значение — откат назад. Положительное — пропустить вперёд. Полезно, когда нужно перечитать небольшой диапазон после сбоя в приложении.
Экспорт и импорт офсетов
Бывает нужно зафиксировать текущее состояние офсетов перед рискованной операцией или перенести их между кластерами. Для этого есть пара —export и —from-file.
Экспорт офсетов в CSV
# Сохранить текущие офсеты группы в файл kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --all-topics \ --to-current \ --export \ --dry-run > /tmp/analytics-offsets.csv
CSV-файл будет содержать строки вида orders,0,490 — топик, партиция, офсет.
Восстановление офсетов из файла
# Восстановить офсеты из сохранённого файла kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --reset-offsets \ --group analytics-group \ --from-file /tmp/analytics-offsets.csv \ --execute
Это работает как точка отката. Сначала экспорт — потом операция — потом при необходимости импорт обратно.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Удаление группы и офсетов
Удалить группу можно только если она неактивна. При удалении стираются все сохранённые офсеты этой группы.
# Удалить группу целиком (вместе с её офсетами) kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --delete \ --group console-consumer-12345
Если нужно удалить только офсеты группы для конкретного топика, не трогая остальные — используйте —delete-offsets:
# Удалить только офсеты для топика orders, остальные топики группы не трогать kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --delete-offsets \ --group analytics-group \ --topic orders
Ответ будет содержать список партиций и статус удаления для каждой из них.
Работа с несколькими группами сразу
Флаг —all-groups позволяет применить операцию ко всем группам кластера. Используйте осторожно — особенно с —reset-offsets и —delete.
# Описать все группы сразу - удобно для общего мониторинга kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --describe \ --all-groups
# Показать состояние всех групп (Stable/Empty/Dead) kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --describe \ --all-groups \ --state
Вывод —all-groups —state даёт быстрый срез по кластеру: сразу видно, какие группы в Stable, какие Empty, какие застряли в PreparingRebalance.
Admin API как альтернатива
Всё, что делает kafka-consumer-groups.sh, доступно через Java AdminClient. Это прямая альтернатива утилите для задач автоматизации — в тех случаях, когда скрипты неудобны или нужна интеграция с приложением.
| Операция в утилите | Метод AdminClient |
|---|---|
| —list | listConsumerGroups() |
| —describe | describeConsumerGroups() |
| —describe (офсеты) | listConsumerGroupOffsets() |
| —reset-offsets —execute | alterConsumerGroupOffsets() |
| —delete-offsets | deleteConsumerGroupOffsets() |
| —delete | deleteConsumerGroups() |
| —describe —members | describeConsumerGroups() (поле members в MemberDescription) |
Для Python-разработчиков те же операции доступны через confluent-kafka (AdminClient) или kafka-python. В курсе «Apache Kafka для инженеров данных» работа с AdminClient разбирается в блоке операционных задач на уровне кода.
Типовые сценарии и диагностика
Растущий lag — что проверить
Если lag по группе постоянно растёт, алгоритм диагностики простой.
- Проверить, активна ли группа. Запустить —describe —members: если CONSUMER-ID пустой у всех партиций — консьюмеры остановлены. Lag растёт просто потому, что никто не читает.
- Посмотреть, равномерно ли распределён lag. Если в одной партиции lag нулевой, а в другой огромный — скорее всего медленный консьюмер или проблема в логике обработки одного конкретного ключа.
- Проверить состояние группы через —state. PreparingRebalance на длительное время — признак нестабильного консьюмера, который постоянно перерегистрируется.
После диагностики — сначала решить проблему на стороне консьюмера, и только потом думать о сбросе офсетов.
Ошибки при сбросе офсетов
Самая частая ошибка: Assignments can only be reset if the group ‘X’ is inactive. Значит, в группе ещё работает хотя бы один активный консьюмер. Нужно остановить все экземпляры и повторить.
Вторая по частоте: смещение вышло за пределы available range — указанный офсет меньше log start offset (данные уже удалены retention-политикой). В этом случае —to-earliest поставит группу на минимально доступный офсет.
Защищённый кластер. Флаг —command-config
Если кластер использует TLS или SASL-аутентификацию, все команды нужно дополнить флагом —command-config с путём к properties-файлу.
# Пример client.properties для SASL/PLAIN security.protocol=SASL_SSL sasl.mechanism=PLAIN sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \ username="admin" password="secret";
kafka-consumer-groups.sh \ --bootstrap-server kafka-broker:9093 \ --describe \ --group analytics-group \ --command-config /etc/kafka/client.properties
Apache Kafka: администрирование кластера
Код курса
KAFKA
Ближайшая дата курса
13 июля, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Что дальше
Мы разобрали kafka-consumer-groups.sh — от просмотра lag-а до точечного управления офсетами. Следующая утилита, урок 20, посвящена kafka-streams-application-reset.sh. Это специализированный инструмент для Kafka Streams-приложений: сброс внутреннего состояния стримингового приложения, очистка internal-топиков и offset-коммитов одной командой.
Если хочется разобраться глубже с тем, как Kafka Streams управляет состоянием, — это тема курса «Apache Kafka для инженеров данных».
Источники
- Apache Kafka 4.x Documentation. Consumer Group Operations
- Apache Kafka 4.x Documentation. Consumer Configs
- Confluent. Kafka Consumer Group Management (2025)
- Confluent. Reset Consumer Group Offsets (2025)
- Apache Kafka 4.0 JavaDoc. AdminClient

