Содержание
- Зачем нужны эти две утилиты kafka-verifiable-producer.sh и kafka-verifiable-consumer.sh
- Базовый запуск утилиты kafka-verifiable-producer.sh
- Управление производительностью и параметрами отправки
- Базовый запуск консьюмера kafka-verifiable-consumer.sh
- Сценарий проверки после замены брокера
- Проверка топика с несколькими партициями
- Дополнительные флаги и тонкости
- Что дальше
- Референсные ссылки
- Все уроки курса
В уроке 26 мы работали с kafka-get-offsets.sh и разбирались, как читать офсеты топиков: где сейчас начало лога, где конец, сколько сообщений накопилось на каждой партиции. Офсеты дают общую картину, но не отвечают на вопрос, работает ли кластер так, как ожидается, когда по нему идёт реальный поток данных.
Именно для такой проверки в Kafka есть пара специальных утилит: kafka-verifiable-producer.sh и kafka-verifiable-consumer.sh. Они работают в паре и умеют генерировать пронумерованный поток сообщений, а потом сверить, все ли из них дошли и в каком порядке. Это не production-инструмент и не замена перф-тестам — это именно инструмент проверки корректности, который особенно полезен после изменений в кластере.
Настройка надёжности доставки в production-кластерах, работа с acks, min.insync.replicas и гарантиями exactly-once — всё это разбирается в курсе «Администрирование кластера Kafka» и в курсе «Apache Kafka для инженеров данных».
Зачем нужны эти две утилиты kafka-verifiable-producer.sh и kafka-verifiable-consumer.sh
У Kafka есть kafka-console-producer.sh и kafka-console-consumer.sh — ими мы пользовались в уроках 7 и 8. Они хороши для ручной работы: отправить тестовое сообщение, прочитать что-то из топика. Но у них нет механизма сверки: было отправлено 1000 сообщений, сколько дошло? Потерялись ли какие-то? Нарушился ли порядок?
Verifiable-утилиты закрывают именно этот пробел. Продюсер отправляет сообщения с монотонно растущим счётчиком и пишет в stdout каждое событие в формате JSON: подтверждение отправки, ошибки, финальную статистику. Консьюмер читает те же сообщения и проверяет последовательность по каждой партиции. Если в партиции номер прыгнул (скажем, пришло 5, потом 7 без 6) — это дубликат или потеря, и консьюмер это зафиксирует.
Типичные сценарии использования: проверить кластер после замены брокера, убедиться, что настройки acks и replication.factor работают как задумано, воспроизвести условия сбоя контролируемым способом.
Базовый запуск утилиты kafka-verifiable-producer.sh
Минимальная команда для kafka-verifiable-producer.sh требует трёх флагов: брокер, топик и количество сообщений.
# Отправляем 100 пронумерованных сообщений в топик test-verify kafka-verifiable-producer.sh \ --bootstrap-server localhost:9092 \ --topic test-verify \ --max-messages 100
Каждое подтверждение от брокера выводится отдельной строкой JSON. После завершения утилита печатает итоговую статистику:
# Пример вывода (фрагмент)
{"timestamp":1716800001234,"name":"producer_send_success","key":null,"value":"0","offset":0,"partition":0,"topic":"test-verify"}
{"timestamp":1716800001235,"name":"producer_send_success","key":null,"value":"1","offset":1,"partition":0,"topic":"test-verify"}
...
{"timestamp":1716800002100,"name":"shutdown_complete"}
{"timestamp":1716800002101,"name":"tool_data","sent":100,"acked":100,"target_throughput":-1,"avg_throughput":89.3}
Поле value — это порядковый номер сообщения, начиная с 0. Поле name показывает тип события: producer_send_success означает, что брокер подтвердил запись. Если что-то пошло не так, появится producer_send_error с деталями.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Управление производительностью и параметрами отправки
По умолчанию продюсер отправляет столько сообщений, сколько успеет за итерацию. Можно задать скорость через —throughput и изменить настройки подтверждений:
# Отправляем 500 сообщений, не быстрее 50 в секунду # acks=all гарантирует подтверждение от всех ISR-реплик kafka-verifiable-producer.sh \ --bootstrap-server localhost:9092 \ --topic test-verify \ --max-messages 500 \ --throughput 50 \ --acks -1
Значение —acks -1 соответствует acks=all. Именно с этой настройкой интересно тестировать поведение кластера при выводе брокера из строя: часть сообщений не подтвердится и попадёт в producer_send_error.
Флаг —value-prefix позволяет добавить строковый префикс к значению каждого сообщения. Это полезно, когда несколько продюсеров пишут в один топик и нужно отличить потоки:
# Два продюсера с разными префиксами в один топик kafka-verifiable-producer.sh \ --bootstrap-server localhost:9092 \ --topic test-verify \ --max-messages 200 \ --value-prefix "producer-A" & kafka-verifiable-producer.sh \ --bootstrap-server localhost:9092 \ --topic test-verify \ --max-messages 200 \ --value-prefix "producer-B" &
Базовый запуск консьюмера kafka-verifiable-consumer.sh
Консьюмер запускается параллельно с продюсером или после него. Главная задача — прочитать сообщения и проверить, нет ли пропусков или дубликатов в счётчике по каждой партиции.
# Читаем из топика test-verify, ждём 100 сообщений и выходим kafka-verifiable-consumer.sh \ --bootstrap-server localhost:9092 \ --topic test-verify \ --group-id verify-group \ --max-messages 100
Консьюмер тоже пишет события в JSON. Вот что выводится в штатном случае:
# Пример вывода (фрагмент)
{"timestamp":1716800010001,"name":"startup_complete"}
{"timestamp":1716800010100,"name":"partitions_assigned","partitions":[{"topic":"test-verify","partition":0}]}
{"timestamp":1716800010200,"name":"records_consumed","count":50,"partitions":[{"topic":"test-verify","partition":0,"minOffset":0,"maxOffset":49}]}
{"timestamp":1716800010300,"name":"records_consumed","count":50,"partitions":[{"topic":"test-verify","partition":0,"minOffset":50,"maxOffset":99}]}
{"timestamp":1716800010310,"name":"offsets_committed","offsets":[{"topic":"test-verify","partition":0,"offset":100}],"success":true}
{"timestamp":1716800010320,"name":"shutdown_complete"}
{"timestamp":1716800010321,"name":"tool_data","consumed":100}
Событие records_consumed показывает диапазон офсетов в пакете. Если в партиции 0 офсеты пошли не подряд — консьюмер напечатает out_of_order с указанием, какое значение ожидалось и какое пришло. Событие offsets_committed подтверждает, что группа зафиксировала позицию.

Сценарий проверки после замены брокера
Один из самых практичных сценариев — проверить, что кластер не теряет сообщения при выводе брокера из строя. Запускаем продюсер в фоне, затем убиваем один брокер, затем читаем результат консьюмером.
# Шаг 1. Создаём топик с replication.factor=3 kafka-topics.sh \ --bootstrap-server localhost:9092 \ --create \ --topic failover-test \ --partitions 3 \ --replication-factor 3 # Шаг 2. Запускаем продюсер в фоне (1000 сообщений, 20 в секунду) kafka-verifiable-producer.sh \ --bootstrap-server localhost:9092 \ --topic failover-test \ --max-messages 1000 \ --throughput 20 \ --acks -1 > /tmp/producer-output.json & PRODUCER_PID=$! # Шаг 3. Через 10 секунд убиваем один брокер (например, контейнер broker-2) sleep 10 docker stop broker-2 # Шаг 4. Ждём завершения продюсера wait $PRODUCER_PID # Шаг 5. Смотрим, сколько было ошибок grep "producer_send_error" /tmp/producer-output.json | wc -l grep "tool_data" /tmp/producer-output.json
С acks=-1 и min.insync.replicas=2 ожидаем, что небольшое количество сообщений не подтвердится в момент выбора нового лидера. Это нормальное поведение — вопрос в том, сколько именно и восстановился ли кластер. Теперь читаем консьюмером:
# Читаем все сообщения из топика с самого начала kafka-verifiable-consumer.sh \ --bootstrap-server localhost:9092 \ --topic failover-test \ --group-id failover-check \ --reset-policy earliest \ --max-messages 1000 > /tmp/consumer-output.json # Проверяем наличие нарушений порядка grep "out_of_order" /tmp/consumer-output.json # Итоговая статистика grep "tool_data" /tmp/consumer-output.json
Если строк out_of_order нет — порядок в партициях сохранился. Если consumed меньше acked из вывода продюсера — часть сообщений до брокеров не добралась. Это уже повод разбираться с настройками.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Проверка топика с несколькими партициями
Когда в топике несколько партиций и несколько консьюмеров в группе, verifiable-consumer особенно удобен: он фиксирует, какая партиция назначена какому инстансу, и отслеживает последовательность независимо по каждой.
# Запускаем двух консьюмеров в одной группе в разных терминалах # Терминал 1 kafka-verifiable-consumer.sh \ --bootstrap-server localhost:9092 \ --topic failover-test \ --group-id verify-multipart \ --max-messages 500 # Терминал 2 kafka-verifiable-consumer.sh \ --bootstrap-server localhost:9092 \ --topic failover-test \ --group-id verify-multipart \ --max-messages 500
Каждый инстанс получит часть партиций через consumer group rebalance. Событие partitions_assigned в выводе покажет, какие партиции достались именно этому инстансу.
Дополнительные флаги и тонкости
Оба инструмента поддерживают несколько полезных настроек, о которых стоит знать.
Для продюсера:
—producer.config path/to/config — передать файл с дополнительными настройками, например SSL или SASL-параметры. Тот же подход, что и в kafka-console-producer.sh.
—message-create-time — задать timestamp для всех сообщений в миллисекундах. Полезно для тестирования retention-политик по времени.
—num-records — синоним для —max-messages, принимается в некоторых версиях утилиты.
Для консьюмера:
—consumer.config path/to/config — аналогично продюсеру, для SSL/SASL.
—reset-policy — стратегия для новых групп: earliest (читать с начала) или latest (только новые). По умолчанию earliest.
—session-timeout — таймаут сессии в миллисекундах. Если консьюмер не шлёт heartbeat дольше этого времени — группа считает его мёртвым и запускает rebalance.
—enable-autocommit — включить автоматический коммит офсетов. По умолчанию консьюмер коммитит офсеты вручную при каждом batch, что даёт более точный контроль в тестах.
Что дальше
В уроке 28 переходим к нагрузочному тестированию: kafka-producer-perf-test.sh умеет гнать заданное количество байт в секунду и измерять реальную пропускную способность кластера. Это уже не проверка корректности, а бенчмарк — другой класс задач.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Референсные ссылки
- Apache Kafka Documentation. Operations: Consumer Tools (2025)
- Apache Kafka 4.0 Official Documentation (2025)
- Confluent Developer. KRaft: Apache Kafka without ZooKeeper (2025)
- KIP-98. Exactly Once Delivery and Transactional Messaging (обновлено 2025)
- Confluent Blog. Configuring Kafka for Low Latency (2025)
Все уроки курса
