Содержание
- Как работает kafka-console-consumer.sh
- Чтение с начала топика. Флаг --from-beginning
- Ограничение количества сообщений
- Consumer-группы и флаг --group
- Как Kafka распределяет партиции между консьюмерами
- Вывод ключей, временных меток и заголовков
- Чтение из конкретной партиции и управление offset
- Подписка на несколько топиков
- Важные флаги kafka-console-consumer.sh
- Альтернативы. kcat и Python
- kcat (kafkacat)
- Python. KafkaConsumer
- Сравнение инструментов
- Типичные ошибки при работе с консьюмером
- Что дальше
- Источники
- Все уроки курса
В прошлом уроке мы научились отправлять сообщения через kafka-console-producer.sh: вводить строки вручную, работать с ключами и загружать данные из файла. Теперь нужно научиться их читать. За это отвечает kafka-console-consumer.sh — стандартная утилита для чтения сообщений из топика прямо в терминале.
Это инструмент из той же категории «быстро проверить»: подключился, прочитал несколько сообщений, убедился, что всё дошло и лежит в правильном порядке. Без кода, без внешних клиентов. Также она помогает разобраться с тем, как работают consumer-группы и смещения (offsets) — ключевые концепции Kafka, которые иначе сложно потрогать руками.
В этом уроке разберём синтаксис, режимы чтения, группы консьюмеров, управление смещениями и альтернативные инструменты для тех же задач.
Как работает kafka-console-consumer.sh
kafka-console-consumer.sh подключается к брокеру Kafka, подписывается на один или несколько топиков и выводит сообщения в stdout. Каждое сообщение печатается отдельной строкой. По умолчанию утилита не читает историю — она стартует с конца топика и ждёт новых сообщений.
Минимальная команда для запуска выглядит так:
kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic orders
После запуска консьюмер висит и ждёт новых сообщений. Как только продюсер что-то отправит — сообщение сразу появится в терминале. Завершить работу можно через Ctrl+C.
Обязательных флагов, как и у продюсера, два: —bootstrap-server задаёт адрес брокера, —topic — имя топика для чтения.
Чтение с начала топика. Флаг —from-beginning
Самый частый вопрос у новичков: «а куда делись мои сообщения?». Никуда. Просто консьюмер по умолчанию начинает читать с конца. Если топик уже содержит данные, запущенный без дополнительных флагов консьюмер их не покажет — он ждёт только то, что придёт после его старта.
Чтобы прочитать всё, что уже есть в топике, используется флаг —from-beginning:
kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --from-beginning
Консьюмер прочитает все доступные сообщения от offset 0 и продолжит ждать новые. Важно понимать: «от начала» означает от минимального доступного смещения, а не от момента создания топика. Если для топика настроено удаление старых сегментов (retention.ms), часть ранних сообщений может уже не существовать.
Ограничение количества сообщений
Иногда нужно прочитать ровно N сообщений и завершиться. Флаг —max-messages задаёт это ограничение:
kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --from-beginning \ --max-messages 10
После получения десяти сообщений консьюмер завершится автоматически. Это удобно для скриптов, где нужно забрать порцию данных и двигаться дальше.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Consumer-группы и флаг —group
Одна из ключевых особенностей Kafka — механизм consumer-групп. Несколько консьюмеров, объединённых в одну группу, делят топик между собой: каждая партиция назначается ровно одному консьюмеру в группе. Это основа горизонтального масштабирования чтения.
По умолчанию kafka-console-consumer.sh создаёт группу с автоматически сгенерированным именем (что-то вроде console-consumer-12345). Чтобы указать конкретное имя группы, используется флаг —group:
kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --group my-consumer-group
Зачем именовать группу?
- Во-первых, Kafka запоминает, до какого offset дочитала группа. Если консьюмер завершился и запустился снова с тем же именем группы — он продолжит с того места, где остановился.
- Во-вторых, состояние группы можно проверить через kafka-consumer-groups.sh (урок 19) — посмотреть lag, offset и статус каждой партиции.
Как Kafka распределяет партиции между консьюмерами
Представим топик с тремя партициями и группу из двух консьюмеров. Kafka автоматически назначит партиции между ними: например, первому консьюмеру уйдут партиции 0 и 1, второму — партиция 2. Если один из консьюмеров упадёт, Kafka перераспределит партиции на оставшихся в течение нескольких секунд (rebalance).
Если запустить третий консьюмер с тем же именем группы, произойдёт rebalance и каждому достанется по одной партиции. Четвёртый консьюмер останется без партиций — партиций меньше, чем участников группы.
Вывод ключей, временных меток и заголовков
По умолчанию консьюмер выводит только значение сообщения. Но сообщение в Kafka содержит больше: ключ, временную метку, заголовки (headers) и номер offset. Всё это можно включить через флаги —property.
Для вывода ключа вместе со значением:
kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --from-beginning \ --property print.key=true \ --property key.separator=" => "
Вывод будет выглядеть так: user-123 => {«action»:»buy»,»item»:»book»}. Разделитель между ключом и значением задаётся через key.separator. По умолчанию это символ табуляции, что не всегда удобно для чтения.
Дополнительные свойства для расширенного вывода:
- print.timestamp=true — добавляет временную метку сообщения в формате CreateTime:1234567890.
- print.offset=true — показывает номер смещения (offset) каждого сообщения в его партиции.
- print.partition=true — показывает, из какой партиции пришло сообщение.
- print.headers=true — выводит заголовки сообщения (если они есть).
- print.value=false — отключает вывод самого значения. Полезно, если нужны только метаданные.
Пример с полным набором метаданных — offset, партиция, ключ и значение:
kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --from-beginning \ --property print.offset=true \ --property print.partition=true \ --property print.key=true \ --property key.separator=" | "
Такой формат вывода удобен при отладке: сразу видно, из какой партиции пришло сообщение, какой у него offset и что лежит в ключе.
Чтение из конкретной партиции и управление offset
Иногда нужно прочитать не весь топик, а одну конкретную партицию — и начать не с начала, а с заданного смещения. Для этого есть флаги —partition и —offset.
kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --partition 0 \ --offset 5
Эта команда начнёт читать партицию 0 с offset 5 включительно. Важный момент: при использовании —partition и —offset флаг —group не работает — консьюмер читает в режиме без группы и не сохраняет offset.
Специальные значения для —offset:
- earliest — читать с самого начала доступных данных (эквивалент —from-beginning для одной партиции).
- latest — начать с конца, ждать только новые сообщения.
- Конкретное число — начать ровно с указанного offset.
Этот режим полезен для диагностики: когда нужно убедиться, что в конкретной партиции лежат ожидаемые данные, или выяснить, что именно находится по заданному смещению.
Подписка на несколько топиков
Консьюмер может читать из нескольких топиков одновременно. Для точного перечисления топиков используется флаг —whitelist с регулярным выражением:
kafka-console-consumer.sh \ --bootstrap-server localhost:9092 \ --whitelist "orders|payments|notifications" \ --group monitoring-group \ --from-beginning
Флаг —whitelist принимает регулярное выражение в стиле Java. Паттерн orders.* подпишет консьюмер на все топики, начинающиеся с «orders»: orders-new, orders-archived и так далее. Это удобно для мониторинга группы связанных топиков.
Важные флаги kafka-console-consumer.sh
Помимо уже рассмотренных, у утилиты есть несколько флагов, которые пригодятся в повседневной работе.
—timeout-ms. Время в миллисекундах, которое консьюмер ждёт новых сообщений. Если за это время ничего не пришло — завершается. По умолчанию ждёт бесконечно.
—skip-message-on-error. Пропускать сообщения, которые не удалось десериализовать, вместо того чтобы падать с ошибкой. Полезно при работе с разнородными данными.
—isolation-level. Уровень изоляции транзакций. Значение read_committed показывает только подтверждённые транзакции, read_uncommitted (по умолчанию) — все.
—formatter. Задаёт класс форматтера для десериализации сообщений. По умолчанию DefaultMessageFormatter. Для Avro и других форматов нужен кастомный форматтер.
—consumer.config. Путь к файлу с дополнительными настройками консьюмера, например параметрами SSL или SASL для аутентификации.
Большинство из этих флагов нужны для специфических сценариев. На практике первые сессии работы с консьюмером обходятся тремя-четырьмя параметрами.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Альтернативы. kcat и Python
kafka-console-consumer.sh хорош для быстрых проверок, но у него есть ограничения. Нет встроенной поддержки Avro и Schema Registry. Нельзя фильтровать сообщения по содержимому. Вывод всегда текстовый. Для более сложных задач есть два популярных варианта.
kcat (kafkacat)
kcat читает из топика значительно гибче: поддерживает JSON-форматирование вывода, фильтрацию по времени, ограничение по количеству сообщений и полный контроль над offset. Пример чтения последних 5 сообщений из топика:
# Проверено: kcat 1.7.0, Apache Kafka 4.2.0 kcat -b localhost:9092 -t orders -o -5 -e
Флаг -o -5 означает «начать с пятого сообщения с конца», -e — завершиться после прочтения существующих сообщений. Режим consumer в kcat включается флагом -C (или по умолчанию, если не указано действие). Отдельный урок по kcat — урок 33 этого курса.
Python. KafkaConsumer
Для автоматизированного чтения и обработки сообщений в коде используют библиотеку kafka-python. Класс KafkaConsumer подписывается на топик и возвращает сообщения в виде объектов с полным набором метаданных:
# Проверено: Python 3.12, kafka-python 2.0.2, Apache Kafka 4.2.0
from kafka import KafkaConsumer
import json
consumer = KafkaConsumer(
'orders',
bootstrap_servers='localhost:9092',
group_id='my-consumer-group',
auto_offset_reset='earliest',
value_deserializer=lambda v: json.loads(v.decode('utf-8')),
key_deserializer=lambda k: k.decode('utf-8') if k else None
)
for message in consumer:
print(f"Partition: {message.partition} | Offset: {message.offset}")
print(f"Key: {message.key} | Value: {message.value}")
Параметр auto_offset_reset=’earliest’ эквивалентен флагу —from-beginning в консольной утилите. Параметр group_id задаёт имя группы — Kafka запомнит offset и при следующем запуске продолжит с того места, где остановились.
В платном курсе DEVKI. Apache Kafka для инженеров данных тема консьюмеров раскрывается на уровне продакшн-кейсов: управление смещениями вручную, обработка rebalance, exactly-once семантика и работа с Kafka Streams.
Сравнение инструментов
| Инструмент | Установка | Группы | Фильтрация | SSL / SASL | Когда использовать |
|---|---|---|---|---|---|
| kafka-console-consumer.sh | в поставке Kafka | да | нет | частично | быстрый ручной тест |
| kcat | отдельно | да | по времени / offset | да | скрипты, удобный вывод |
| kafka-python KafkaConsumer | pip install | да | в коде | да | обработка сообщений, автоматизация |
Типичные ошибки при работе с консьюмером
Несколько проблем, с которыми сталкиваются чаще всего при первом знакомстве с консьюмером.
- Консьюмер молчит после запуска. Скорее всего, он читает с конца (поведение по умолчанию) и просто ждёт новые сообщения. Добавить —from-beginning, чтобы прочитать то, что уже есть в топике.
- Сообщения не воспроизводятся повторно. Если использовалось именованное имя группы (—group), Kafka уже сохранила offset этой группы. При повторном запуске с тем же именем группы чтение продолжится с конца. Сменить имя группы или добавить —from-beginning вместе со сбросом offset через kafka-consumer-groups.sh —reset-offsets.
- Connection refused. Брокер недоступен или неверный адрес в —bootstrap-server. Проверить, что Kafka запущена и порт 9092 открыт.
- Topic not found. Топик не существует. Создать через kafka-topics.sh —create (урок 6).
- —partition и —group конфликтуют. При явном указании партиции через —partition нельзя одновременно использовать —group для управления offset. Kafka выбросит ошибку. Использовать либо то, либо другое.
Большинство проблем решается добавлением —from-beginning или сменой имени группы. Это первое, что стоит проверить, если консьюмер ведёт себя не так, как ожидалось.
Apache Kafka: администрирование кластера
Код курса
KAFKA
Ближайшая дата курса
13 июля, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Что дальше
Мы умеем создавать топики, отправлять в них сообщения и читать их обратно. Это базовый цикл работы с Kafka. В следующем уроке займёмся управлением самим брокером: kafka-server-start.sh и kafka-server-stop.sh — как запускать, останавливать и перезапускать Kafka в разных окружениях, что происходит при завершении работы и как правильно завершать брокер, не теряя данных.
Если хочется разобраться глубже с тем, как Kafka управляет группами консьюмеров, lag-ом и балансировкой партиций — это тема платного курса KAFKA. Администрирование кластера Kafka. Там разбирают consumer groups на уровне внутренних механизмов, мониторинг и диагностику в продакшн-среде.
Источники
- Apache Kafka 4.x Documentation. Consumer Configs
- Apache Kafka Documentation. Operations. Console Consumer
- Confluent. Kafka Consumer Overview (2025)
- kcat (kafkacat). GitHub. README (2025)
- kafka-python. KafkaConsumer API Reference (2025)

