Изучаем Apache Kafka с нуля. Урок 25. kafka-broker-api-versions.sh

Изучаем Apache Kafka с нуля. Урок 25. kafka-broker-api-versions.sh

 

В уроке 24 мы разбирали kafka-acls.sh и работу со списками доступа: кто из клиентов что может делать с топиками, группами, кластером. Там же всплыл важный момент — аутентификация через SASL и флаг —command-config. Прежде чем клиент получит право что-то делать, он должен вообще договориться с брокером на уровне протокола.

Именно здесь входит kafka-broker-api-versions.sh. Эта утилита отвечает на простой вопрос: какие версии Kafka API поддерживает брокер прямо сейчас? Звучит как что-то абстрактное, но на практике это первое, что нужно проверить при несовпадении версий клиента и сервера, перед rolling upgrade или когда клиент внезапно начинает падать с ошибкой UNSUPPORTED_VERSION.

Тема совместимости версий Kafka, управления rolling upgrades в многоброкерных кластерах и диагностики protocol-level ошибок детально разбирается в курсе «Администрирование кластера Kafka». Там же практика на реальных кластерах с разными версиями брокеров.

 

Что такое Kafka API и зачем знать версии

Kafka — это сетевой протокол поверх TCP. Каждая операция (создать топик, отправить сообщение, прочитать, получить метаданные) — это отдельный тип запроса, у которого есть числовой идентификатор, называемый API Key. Например, Produce — это API Key 0, Fetch — 1, Metadata — 3.

Протокол Kafka версионируется. Каждый API Key имеет минимальную и максимальную версию, которую брокер умеет обрабатывать. Когда клиент подключается, он спрашивает: «какие версии ты поддерживаешь?» Это и есть запрос ApiVersions (API Key 18). Брокер отвечает полным списком, клиент выбирает версию, которую умеют оба — и дальше общаются на ней.

Проблемы начинаются, когда клиент свежее брокера (или наоборот) и запрашивает версию, которой у брокера нет. Или когда в кластере брокеры разных версий, и один умеет больше другого. kafka-broker-api-versions.sh позволяет это проверить не гадая, а с конкретными данными.

 

Базовый запуск

Команда одна — флагов минимум:

kafka-broker-api-versions.sh --bootstrap-server localhost:9092

Утилита подключится к брокеру и выведет полный список поддерживаемых API. Вывод длинный — порядка 60-70 строк для Kafka 4.x. Каждая строка — один API с его числовым ключом, минимальной и максимальной версией, а также «usable version» — той версией, которую выберет этот конкретный клиент.

Пример фрагмента вывода для Kafka 4.2.0:

# Фрагмент вывода kafka-broker-api-versions.sh
# Формат: ApiName(ApiKey) -> MinVersion...MaxVersion [usable: UsableVersion]

Produce(0) -> 0...11 [usable: 11]
Fetch(1) -> 0...17 [usable: 17]
ListOffsets(2) -> 0...9 [usable: 9]
Metadata(3) -> 0...12 [usable: 12]
OffsetCommit(8) -> 0...9 [usable: 9]
OffsetFetch(9) -> 0...9 [usable: 9]
FindCoordinator(10) -> 0...6 [usable: 6]
JoinGroup(11) -> 0...9 [usable: 9]
Heartbeat(12) -> 0...4 [usable: 4]
LeaveGroup(13) -> 0...5 [usable: 5]
SyncGroup(14) -> 0...5 [usable: 5]
DescribeGroups(15) -> 0...5 [usable: 5]
ListGroups(16) -> 0...5 [usable: 5]
ApiVersions(18) -> 0...3 [usable: 3]
CreateTopics(19) -> 0...7 [usable: 7]
DeleteTopics(20) -> 0...6 [usable: 6]
DescribeAcls(29) -> 0...3 [usable: 3]
CreateAcls(30) -> 0...3 [usable: 3]
...

Обратите внимание на поле usable — это именно та версия API, которую выберет клиент при подключении. Если клиент старше брокера, usable будет ограничен возможностями клиента, а не брокера.

 

Как читать вывод

Строка выглядит так: ApiName(ApiKey) -> MinVersion…MaxVersion [usable: X]. Разберём каждую часть.

  • ApiName. Читаемое имя запроса — Produce, Fetch, Metadata и т.д. Дублирует числовой ApiKey для удобства.
  • ApiKey. Числовой идентификатор в скобках. Именно его передаёт клиент в заголовке каждого запроса. По этому числу брокер понимает, что ему прислали.
  • MinVersion…MaxVersion. Диапазон версий, которые брокер может принять для этого API. Если клиент шлёт версию ниже MinVersion или выше MaxVersion — брокер вернёт ошибку.
  • usable. Версия, которую данный клиент (сама утилита) реально выберет для работы. Это min(MaxVersion брокера, MaxVersion клиента).

Если в строке появляется UNSUPPORTED вместо версии — значит этот API брокер вообще не поддерживает. Такое бывает для API, добавленных в поздних версиях Kafka.

Флаги утилиты

Флагов немного, но каждый важен в своём сценарии.

  • —bootstrap-server. Обязательный флаг. Адрес одного или нескольких брокеров через запятую. Утилита сама разрешит остальные брокеры кластера.
  • —command-config. Путь к файлу с настройками клиента — нужен при включённой SASL-аутентификации или TLS. Без него на защищённом кластере получите ошибку аутентификации.
  • —version. Выводит версию самой утилиты, не брокера.
  • —help. Краткая справка.

Этим список и исчерпывается. Утилита простая и делает ровно одно — запрашивает ApiVersions у брокера и выводит ответ. Никаких флагов для фильтрации или форматирования нет, поэтому фильтрацию берёт на себя grep.

 

Практика.

Проверка совместимости перед обновлением

Самый частый сценарий — rolling upgrade кластера. Допустим, у вас три брокера, вы обновляете их по одному. В какой-то момент в кластере одновременно работают брокеры старой и новой версии. Клиенты подключаются к любому из них. Если новая версия добавила API или изменила MaxVersion — нужно убедиться, что клиенты это выдержат.


# Опрашиваем первый брокер (старая версия в кластере)
kafka-broker-api-versions.sh \
  --bootstrap-server broker1:9092 \
  > /tmp/api-broker1.txt

# Опрашиваем второй брокер (уже обновлённый)
kafka-broker-api-versions.sh \
  --bootstrap-server broker2:9092 \
  > /tmp/api-broker2.txt

# Сравниваем - ищем различия в поддерживаемых версиях
diff /tmp/api-broker1.txt /tmp/api-broker2.txt

Расхождения в diff покажут, какие API изменились. Если клиент использует только те API, которые есть в обоих файлах — rolling upgrade пройдёт без проблем для клиентов.

 

Практика. Диагностика ошибки UNSUPPORTED_VERSION

Клиент падает с UNSUPPORTED_VERSION — это значит, что он запросил версию API, которой у брокера нет. Чаще всего это происходит когда клиент новее брокера и по умолчанию пытается использовать самую свежую версию протокола.


# Смотрим что поддерживает брокер для Produce API (Key 0)
kafka-broker-api-versions.sh \
  --bootstrap-server localhost:9092 \
  | grep "Produce"

Пример вывода:

Produce(0) -> 0...11 [usable: 11]

Если клиент пытается отправить запрос Produce версии 12 — получит UNSUPPORTED_VERSION. Решение — обновить брокер или откатить клиент до версии, которая использует Produce v11 и ниже. Теперь это не гадание, а конкретное число из вывода утилиты.

 

Практика. Проверка конкретного API

Не всегда нужен весь список. Grep по имени API даёт нужное быстро.


# Проверяем поддержку транзакционных API
kafka-broker-api-versions.sh \
  --bootstrap-server localhost:9092 \
  | grep -E "InitProducerId|AddPartitionsToTxn|TxnOffsetCommit|EndTxn"

# Проверяем административные API (AdminClient использует их)
kafka-broker-api-versions.sh \
  --bootstrap-server localhost:9092 \
  | grep -E "CreateTopics|DeleteTopics|DescribeConfigs|AlterConfigs"

# Проверяем поддержку KIP-848 (новая модель групп потребителей)
kafka-broker-api-versions.sh \
  --bootstrap-server localhost:9092 \
  | grep -E "ConsumerGroupHeartbeat|ConsumerGroupDescribe"

Если строка для какого-то API не появилась в выводе — брокер его не поддерживает. Версия Kafka тут же покажет, почему: API мог быть добавлен позже той версии, что у вас в кластере.

 

Работа с защищённым кластером

Если кластер настроен с SASL или TLS — нужен файл с параметрами клиента. Это тот же подход, что в уроке 24 с флагом —command-config.

# Файл /tmp/client.properties - настройки для SASL_PLAINTEXT

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="admin" \
  password="admin-secret";

# Запрашиваем версии API с аутентификацией
kafka-broker-api-versions.sh \
  --bootstrap-server localhost:9093 \
  --command-config /tmp/client.properties

Для TLS (SSL-протокол) в client.properties указываем security.protocol=SSL и добавляем пути к truststore и keystore. Сам вывод утилиты не меняется — только способ подключения.

 

Что покажет разница версий Kafka

Посмотрим, как меняется вывод при переходе с Kafka 3.7 на 4.2. Это наглядно показывает, что реально добавилось в протоколе. проверка версий Kafka API, диагностика UNSUPPORTED_VERSION, совместимость при rolling upgrade


Именно поэтому rolling upgrade нужно делать аккуратно: пока в кластере есть брокеры старой версии, новые клиенты могут на них «налететь» если loadbalancer или bootstrap даст им старый брокер.

 

Альтернативные способы получить информацию о версиях API

Несколько инструментов умеют делать то же самое, что kafka-broker-api-versions.sh.

  • kcat (kafkacat). Команда kcat -b localhost:9092 -L выводит метаданные кластера, но не список API версий. Для API версий в kcat отдельного режима нет — kafka-broker-api-versions.sh здесь незаменима.
  • Python: confluent-kafka. Метод AdminClient.list_topics() не даёт API версии, но через низкоуровневый AdminClient._client.poll() можно получить сырой ответ ApiVersions. Это уже не для повседневного использования.
  • Wireshark / tcpdump. Перехват трафика на порту 9092 покажет реальные запросы ApiVersions в сыром виде. Полезно для глубокой диагностики, когда результат утилиты кажется подозрительным.
  • Kafka REST Proxy. Если у вас развёрнут Confluent REST Proxy — он предоставляет HTTP API, через который можно косвенно проверить поддерживаемые операции. Но напрямую список API версий там не отдаётся.

На практике kafka-broker-api-versions.sh — самый быстрый и однозначный способ. Остальные варианты нужны в особых ситуациях.

Типичные ошибки при работе с утилитой

Несколько проблем встречаются регулярно.

  • Connection refused. Брокер не слушает указанный порт. Проверьте, запущен ли брокер, и правильный ли порт — в KRaft-режиме по умолчанию 9092 для клиентов, 9093 для контроллеров.
  • Authentication failed. Вы забыли передать —command-config на защищённом кластере. Или в файле опечатка в пароле.
  • Пустой вывод или зависание. Брокер доступен по сети, но ещё не готов — например, в процессе старта. Подождите несколько секунд и повторите.
  • Версии совпадают, но клиент всё равно падает. Проверьте не только максимальную, но и минимальную версию API. Если брокер поднял MinVersion выше нуля (бывает при deprecation старых версий) — очень старый клиент может не найти совместимой версии.

В большинстве случаев ошибка видна сразу в stderr — утилита неплохо описывает, что пошло не так.

Что дальше

В уроке 26 разберём kafka-get-offsets.sh — утилиту для получения смещений в топиках. Это нужно когда хочется понять, сколько сообщений накопилось в партиции, какое смещение актуально на данный момент, и как сверить позицию консьюмера с реальным хвостом топика.

Референсы

Все уроки курса

Тема Ссылка
1 Установка Kafka с Zookeeper https://bigdataschool.ru/blog/news/lesson1-kafka-zookeeper-install/
2 Установка Kafka в режиме KRaft https://bigdataschool.ru/blog/news/lesson2-kafka-kraft-install/
3 Docker KRaft: однонодовый кластер https://bigdataschool.ru/blog/news/lesson3-kafka-docker-single/
4 Docker KRaft: 3-нодовый кластер https://bigdataschool.ru/blog/news/lesson4-kafka-docker-cluster/
5 Утилиты bin/: переменные окружения и основы https://bigdataschool.ru/blog/news/lesson5-kafka-bin-intro/
6 kafka-topics.sh: управление топиками https://bigdataschool.ru/blog/news/lesson6-kafka-topics/
7 kafka-console-producer.sh https://bigdataschool.ru/blog/news/lesson7-kafka-console-producer/
8 kafka-console-consumer.sh https://bigdataschool.ru/blog/news/lesson8-kafka-console-consumer/
9 kafka-server-start.sh / kafka-server-stop.sh https://bigdataschool.ru/blog/news/lesson9-kafka-server-start-stop/
10 kafka-storage.sh https://bigdataschool.ru/blog/news/lesson10-kafka-storage/
11 kafka-cluster.sh https://bigdataschool.ru/blog/news/lesson11-kafka-cluster/
12 kafka-metadata-quorum.sh https://bigdataschool.ru/blog/news/lesson12-kafka-metadata-quorum/
13 kafka-metadata-shell.sh https://bigdataschool.ru/blog/news/lesson13-kafka-metadata-shell/
14 kafka-features.sh https://bigdataschool.ru/blog/news/lesson14-kafka-features/
15 kafka-configs.sh https://bigdataschool.ru/blog/news/lesson15-kafka-configs/
16 kafka-log-dirs.sh https://bigdataschool.ru/blog/news/lesson16-kafka-log-dirs/
17 kafka-dump-log.sh https://bigdataschool.ru/blog/news/lesson17-kafka-dump-log/
18 kafka-delete-records.sh https://bigdataschool.ru/blog/news/lesson18-kafka-delete-records/
19 kafka-consumer-groups.sh https://bigdataschool.ru/blog/news/lesson19-kafka-consumer-groups/
20 kafka-streams-application-reset.sh https://bigdataschool.ru/blog/news/lesson20-kafka-streams-reset/
21 kafka-leader-election.sh https://bigdataschool.ru/blog/news/lesson21-kafka-leader-election/
22 kafka-reassign-partitions.sh https://bigdataschool.ru/blog/news/lesson22-kafka-reassign-partitions/
23 kafka-replica-verification.sh https://bigdataschool.ru/blog/news/lesson23-kafka-replica-verification/
24 kafka-acls.sh https://bigdataschool.ru/blog/news/lesson24-kafka-acls/
25 kafka-broker-api-versions.sh https://bigdataschool.ru/blog/news/lesson25-kafka-broker-api-versions/
26 kafka-get-offsets.sh https://bigdataschool.ru/blog/news/lesson26-kafka-get-offsets/
27 kafka-verifiable-producer/consumer.sh https://bigdataschool.ru/blog/news/lesson27-kafka-verifiable/
28 kafka-producer-perf-test.sh https://bigdataschool.ru/blog/news/lesson28-kafka-producer-perf/
29 kafka-consumer-perf-test.sh https://bigdataschool.ru/blog/news/lesson29-kafka-consumer-perf/
30 kafka-mirror-maker.sh https://bigdataschool.ru/blog/news/lesson30-kafka-mirror-maker/
31 connect-standalone.sh https://bigdataschool.ru/blog/news/lesson31-kafka-connect-standalone/
32 connect-distributed.sh https://bigdataschool.ru/blog/news/lesson32-kafka-connect-distributed/
33 kcat. Альтернативный CLI https://bigdataschool.ru/blog/news/lesson33-kcat/