Изучаем Apache Kafka с нуля. Урок 7. kafka-console-producer — отправка сообщений из командной строки

Изучаем Apache Kafka с нуля. Урок 7. kafka-console-producer — отправка сообщений из командной строки

 

В предыдущем уроке мы научились работать с топиками через kafka-topics.sh: создавали, описывали, меняли настройки и удаляли. Теперь у нас есть готовый топик, и логичный следующий вопрос — как туда что-нибудь положить. Именно этим занимается kafka-console-producer.sh.

Это утилита для ручной отправки сообщений в топик прямо из терминала. Незаменима для быстрых тестов: нужно проверить, что консьюмер читает данные, или убедиться, что партиционирование по ключу работает правильно. Сделали топик, отправили пару сообщений, проверили результат — без единой строки кода.

В этом уроке разберём синтаксис, ключи, свойства producer-а и то, как сообщение попадает в конкретную партицию. А заодно покажем, как сделать то же самое через kcat и Python.

 

Как работает kafka-console-producer.sh

Для начала наверное надо дать краткое определение, что такое продюсер в Kafka: Kafka Producer это программный код, написанный в соответствии с Kafka Producer API,  который исполняется на стороне исходного приложения и используется для отправки  сообщений ( данных приложения) в топик Apache Kafka. На практике продюсер выполняет функцию PUSH для брокера Kafka обеспечивая приницип loosely coupled ( слабая связанность), и добавляя дополнительные возможности по предварительному процессингу сообщений перед отправкой в Kafka (маршрутизация по ключу, подтверждение доставки, сжатие).

kafka-console-producer.sh читает строки из стандартного ввода (stdin) и отправляет каждую строку как отдельное сообщение в указанный топик. По умолчанию одна строка — одно сообщение. Ключ сообщения при этом равен null, а значит Kafka распределяет сообщения по партициям по round-robin.

Базовый синтаксис выглядит так:

kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders

После запуска появляется приглашение >. Вводим текст, нажимаем Enter — сообщение уходит в топик. Завершить сессию можно через Ctrl+C или Ctrl+D.

Обязательных флагов всего два: —bootstrap-server задаёт адрес брокера, —topic — имя топика. Всё остальное — опционально.

 

Отправка сообщений с ключами

Ключ сообщения в Kafka — это не просто метка. Именно по нему Kafka решает, в какую партицию пойдёт сообщение: хэш ключа определяет номер партиции. Если у двух сообщений одинаковый ключ — они гарантированно окажутся в одной партиции и сохранят порядок.

По умолчанию kafka-console-producer.sh не ждёт ключей. Чтобы включить режим key-value, нужно передать три свойства через —property:

kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --property "parse.key=true" \
  --property "key.separator=:"

Теперь каждая строка должна содержать разделитель (в данном случае :). Левая часть становится ключом, правая — значением:

user-123:{"action":"buy","item":"book"}
user-456:{"action":"view","item":"laptop"}
user-123:{"action":"checkout"}

Оба сообщения с ключом user-123 гарантированно окажутся в одной партиции и сохранят порядок. Это важно для сценариев, где нужна последовательность событий по одному объекту — например, серия действий конкретного пользователя или статусы одного заказа.

 

Что будет, если строка не содержит разделитель

Если включён режим parse.key=true, но строка не содержит разделитель — продюсер выбросит ошибку и сообщение не отправится. Это защита от случайных данных без ключа. Если нужно передать сообщение без ключа в этом режиме, проще перезапустить продюсер без свойства parse.key.

 

Важные флаги и свойства

У kafka-console-producer.sh есть несколько рабочих параметров, которые часто нужны на практике. Вот основные из них.

—timeout. Таймаут ожидания подтверждения от брокера в миллисекундах. По умолчанию 1000 мс. Если брокер не отвечает дольше — сообщение считается недоставленным.

—batch-size. Сколько сообщений накапливается перед отправкой пакетом. По умолчанию 200. Увеличение ускоряет пропускную способность, но немного задерживает отправку.

—compression-codec. Сжатие перед отправкой. Доступные значения: none, gzip, snappy, lz4, zstd. По умолчанию сжатия нет.

—sync. Синхронный режим отправки: каждое сообщение ждёт подтверждения брокера перед следующим. Медленнее, но надёжнее для отладки.

—max-block-ms. Максимальное время блокировки продюсера, если буфер заполнен. После этого продюсер выбросит ошибку.

—request-required-acks. Уровень подтверждений: 0 — fire and forget, 1 — подтверждение от лидера, -1 (или all) — от всех ISR-реплик.

Для большинства задач ручного тестирования хватит дефолтных значений. Параметры —compression-codec и —request-required-acks становятся важны, когда нужно воспроизвести поведение продюсера, близкое к продакшн-настройкам.

 

Apache Kafka: администрирование кластера

Код курса
KAFKA
Ближайшая дата курса
13 июля, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Отправка данных из файла

Вводить сообщения вручную удобно для пары строк. Если нужно загрузить в топик содержимое файла — перенаправляем stdin. Каждая строка файла становится отдельным сообщением.

kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  < events.txt

Команда прочитает весь файл, отправит каждую строку как сообщение и завершится. Это удобно для воспроизведения тестовых сценариев или быстрой загрузки заготовленного набора данных.

 

Файл с ключами

Тот же приём работает с режимом key-value. Главное — чтобы разделитель в каждой строке файла совпадал с тем, что указан в key.separator:

kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --property "parse.key=true" \
  --property "key.separator=|" \
  < keyed-events.txt

Содержимое файла keyed-events.txt при этом выглядит так:

order-001|{"status":"created","amount":150}
order-002|{"status":"created","amount":340}
order-001|{"status":"paid"}

Разделитель можно выбрать любой — главное, чтобы он не встречался внутри самих данных.

 

Как сообщение попадает в партицию

Понять распределение по партициям проще через диаграмму. Kafka использует разные стратегии в зависимости от того, есть ли у сообщения ключ.

Как происходит маршрутизация сообщений в топики Kafka продюсером

 

Если ключ не задан, продюсер чередует партиции — это даёт равномерную нагрузку, но порядок сообщений не гарантирован. С ключом одни и те же данные всегда идут в одну партицию, а значит сохраняют порядок относительно друг друга.

Отправка с заголовками сообщений

Начиная с Kafka 2.7, kafka-console-producer.sh поддерживает заголовки сообщений (message headers). Заголовки — это произвольные key-value пары, которые можно прикрепить к сообщению. Они не входят в тело, но доступны консьюмеру.

kafka-console-producer.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --property "headers=source:webapp,env:prod"

Все сообщения в этой сессии получат указанные заголовки. Удобно для пометки тестовых данных по источнику или окружению, чтобы консьюмер мог их фильтровать.

Apache Kafka для инженеров данных

Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

Альтернативы kafka-console-producer.sh

Та же задача — отправить сообщение в топик — решается несколькими инструментами. Выбор зависит от контекста.

 

kcat (kafkacat)

kcat — внешняя утилита, которая умеет всё то же самое, но с более лаконичным синтаксисом. Для отправки используется режим -P (producer):

# Проверено: kcat 1.7.0, Apache Kafka 4.2.0
echo "Hello from kcat" | kcat -P \
  -b localhost:9092 \
  -t orders

С ключом и разделителем:

echo "user-123:buy event" | kcat -P \
  -b localhost:9092 \
  -t orders \
  -K :

kcat устанавливается отдельно — он не входит в поставку Apache Kafka. Зато поддерживает SSL, SASL и работает быстрее на больших объёмах данных. Отдельный урок по kcat — в конце курса.

 

Python. kafka-python

Когда нужна логика отправки — циклы, условия, генерация данных — удобнее использовать Python. Библиотека kafka-python предоставляет класс KafkaProducer:

# Проверено: Python 3.12, kafka-python 2.0.2, Apache Kafka 4.2.0
from kafka import KafkaProducer
import json

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    key_serializer=lambda k: k.encode('utf-8')
)

producer.send(
    topic='orders',
    key='user-123',
    value={'action': 'buy', 'item': 'book'}
)

producer.flush()
producer.close()

В платном курсе DEVKI. Apache Kafka для инженеров данных этот же KafkaProducer разбирается детально: сериализаторы, partitioner, acks, idempotent producer и работа с Kafka Streams на реальных кейсах.

 

Сравнение инструментов

Инструмент Установка Ключи Сжатие SSL / SASL Когда использовать
kafka-console-producer.sh в поставке Kafka да да частично быстрый ручной тест
kcat отдельно да да да скриптовая автоматизация
kafka-python KafkaProducer pip install да да да генерация данных, тесты с логикой

 

Типичные ошибки и что с ними делать

При первых запусках часто встречается несколько одинаковых проблем. Лучше знать о них заранее.

  • Connection refused или timeout. Kafka не запущена или указан неверный адрес в —bootstrap-server. Проверить статус: kafka-server-start.sh должен быть запущен, порт 9092 открыт.
  • Topic not found. Топик не существует. Создать через kafka-topics.sh —create или включить auto.create.topics.enable=true в конфигурации брокера.
  • parse.key: no separator found. Включён режим key-value, но строка не содержит заданный разделитель. Либо добавить разделитель в строку, либо убрать parse.key=true.
  • Leader not available. Партиция временно не имеет лидера. Подождать несколько секунд — Kafka переизбирает лидера автоматически. Если ошибка не проходит — проверить состояние брокера.

Большинство этих ошибок диагностируется за пару минут, если знать куда смотреть.

Что дальше

Мы научились отправлять сообщения в Kafka из командной строки — с ключами и без, из файла и через stdin, с разными свойствами продюсера. Разобрались, как ключ влияет на попадание сообщения в партицию.

Следующий урок — kafka-console-consumer.sh. Будем читать то, что только что отправили: группы консьюмеров, чтение с начала, фильтрация по партиции и смещению. Урок 8: kafka-console-consumer.sh.

Если хотите понять продюсеров глубже — настройки acks, идемпотентность, транзакционная запись — всё это разбирается в курсе DEVKI. Apache Kafka для инженеров данных.

Отдельный cheatsheet по всем флагам kafka-console-producer.sh — в бонусном файле к этому уроку.

Источники

Все уроки курса

Тема Ссылка
1 Установка Kafka с Zookeeper https://bigdataschool.ru/blog/news/lesson1-kafka-zookeeper-install/
2 Установка Kafka в режиме KRaft https://bigdataschool.ru/blog/news/lesson2-kafka-kraft-install/
3 Docker KRaft. Однонодовый кластер https://bigdataschool.ru/blog/news/lesson3-kafka-docker-single/
4 Docker KRaft. 3-нодовый кластер https://bigdataschool.ru/blog/news/lesson4-kafka-docker-cluster/
5 Утилиты bin/. Переменные окружения и основы https://bigdataschool.ru/blog/news/lesson5-kafka-bin-intro/
6 kafka-topics.sh. Управление топиками https://bigdataschool.ru/blog/news/lesson6-kafka-topics/
7 kafka-console-producer.sh https://bigdataschool.ru/blog/news/lesson7-kafka-console-producer/
8 kafka-console-consumer.sh https://bigdataschool.ru/blog/news/lesson8-kafka-console-consumer/
9 kafka-server-start.sh / kafka-server-stop.sh https://bigdataschool.ru/blog/news/lesson9-kafka-server-start-stop/
10 kafka-storage.sh https://bigdataschool.ru/blog/news/lesson10-kafka-storage/
11 kafka-cluster.sh https://bigdataschool.ru/blog/news/lesson11-kafka-cluster/
12 kafka-metadata-quorum.sh https://bigdataschool.ru/blog/news/lesson12-kafka-metadata-quorum/
13 kafka-metadata-shell.sh https://bigdataschool.ru/blog/news/lesson13-kafka-metadata-shell/
14 kafka-features.sh https://bigdataschool.ru/blog/news/lesson14-kafka-features/
15 kafka-configs.sh https://bigdataschool.ru/blog/news/lesson15-kafka-configs/
16 kafka-log-dirs.sh https://bigdataschool.ru/blog/news/lesson16-kafka-log-dirs/
17 kafka-dump-log.sh https://bigdataschool.ru/blog/news/lesson17-kafka-dump-log/
18 kafka-delete-records.sh https://bigdataschool.ru/blog/news/lesson18-kafka-delete-records/
19 kafka-consumer-groups.sh https://bigdataschool.ru/blog/news/lesson19-kafka-consumer-groups/
20 kafka-streams-application-reset.sh https://bigdataschool.ru/blog/news/lesson20-kafka-streams-reset/
21 kafka-leader-election.sh https://bigdataschool.ru/blog/news/lesson21-kafka-leader-election/
22 kafka-reassign-partitions.sh https://bigdataschool.ru/blog/news/lesson22-kafka-reassign-partitions/
23 kafka-replica-verification.sh https://bigdataschool.ru/blog/news/lesson23-kafka-replica-verification/
24 kafka-acls.sh https://bigdataschool.ru/blog/news/lesson24-kafka-acls/
25 kafka-broker-api-versions.sh https://bigdataschool.ru/blog/news/lesson25-kafka-broker-api-versions/
26 kafka-get-offsets.sh https://bigdataschool.ru/blog/news/lesson26-kafka-get-offsets/
27 kafka-verifiable-producer/consumer.sh https://bigdataschool.ru/blog/news/lesson27-kafka-verifiable/
28 kafka-producer-perf-test.sh https://bigdataschool.ru/blog/news/lesson28-kafka-producer-perf/
29 kafka-consumer-perf-test.sh https://bigdataschool.ru/blog/news/lesson29-kafka-consumer-perf/
30 kafka-mirror-maker.sh https://bigdataschool.ru/blog/news/lesson30-kafka-mirror-maker/
31 connect-standalone.sh https://bigdataschool.ru/blog/news/lesson31-kafka-connect-standalone/
32 connect-distributed.sh https://bigdataschool.ru/blog/news/lesson32-kafka-connect-distributed/
33 kcat. Альтернативный CLI https://bigdataschool.ru/blog/news/lesson33-kcat/