Изучаем Apache Kafka с нуля. Урок 6. kafka-topics.sh — управление топиками

Изучаем Apache Kafka с нуля. Урок 6. kafka-topics.sh — управление топиками

Изучаем Apache Kafka с нуля. Урок 6. kafka-topics.sh — Управление топиками

ПО: Apache Kafka 4.2.0, KRaft-режим  |  Окружение: Ubuntu 22.04 LTS / macOS 14+  |  Уровень: начинающий+

В прошлом уроке мы разобрались с переменными окружения, алиасами и базовой структурой каталога $KAFKA_HOME/bin/. Теперь умеем запускать утилиты из любой директории и понимаем разницу между —bootstrap-server и —controller-quorum-voters. Это важно, потому что в этом уроке флаг —bootstrap-server будет везде.

Сегодня разбираем kafka-topics.sh — главную утилиту для работы с топиками. Именно через неё создают, смотрят, меняют и удаляют топики на любом кластере. Разберём каждое действие на конкретных командах, поговорим о том, что нельзя откатить, и покажем альтернативные способы сделать то же самое.

 

Что такое топик и почему настройки важны с первого раза

Топик в Kafka — это именованный поток записей. Физически топик делится на партиции, а каждая партиция хранится на одном или нескольких брокерах в виде реплик. Один экземпляр партиции является лидером и принимает запись, остальные — репликами в ISR-списке.

Два параметра задаются при создании и потом уже не уменьшаются.

  • —partitions. Количество партиций определяет параллелизм: сколько консьюмеров в группе могут читать топик одновременно. Больше партиций — больше пропускная способность. Но слишком много партиций на маленьком кластере дают накладные расходы без выигрыша.
  • —replication-factor. Сколько копий каждой партиции хранится на разных брокерах. Значение не может превышать число брокеров в кластере. В продакшне минимум 3.

Партиции можно только добавлять, но не убирать. Поэтому хорошо подумать перед созданием.

Базовый синтаксис команды

Все операции с топиками идут через один скрипт. Структура выглядит так:

kafka-topics.sh --bootstrap-server BROKER:PORT --ДЕЙСТВИЕ [параметры]

Обязательный флаг здесь один — —bootstrap-server. Адрес брокера нужен всегда, потому что именно через него утилита получает метаданные кластера. Если вы настроили алиас из урока 5, то вместо полного пути достаточно написать просто kafka-topics.sh.

Создание топика

Флаг —create запускает создание. Минимальный набор параметров — название, количество партиций и фактор репликации.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic orders \
  --partitions 3 \
  --replication-factor 1

Если топик уже существует, команда выдаст ошибку. Чтобы не получать её в скриптах, добавляем флаг —if-not-exists.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic orders \
  --partitions 3 \
  --replication-factor 1 \
  --if-not-exists

С этим флагом команда завершается без ошибки, если топик уже есть. Удобно для идемпотентных скриптов развёртывания.

Создание топика с конфигурацией

Дополнительные параметры хранения задаются через —config. Флаг можно указывать несколько раз.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --create \
  --topic events-raw \
  --partitions 6 \
  --replication-factor 1 \
  --config retention.ms=604800000 \
  --config cleanup.policy=delete \
  --config min.insync.replicas=1

Здесь retention.ms=604800000 задаёт хранение сообщений 7 дней (604800000 миллисекунд). Параметр cleanup.policy=delete означает, что старые данные просто удаляются по истечении времени или при превышении размера. Альтернатива — compact, тогда Kafka хранит только последнее значение для каждого ключа.

Список топиков

Флаг —list выводит все топики кластера.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --list

В выводе вы увидите и системные топики, названия которых начинаются с двойного подчёркивания: __consumer_offsets, __cluster_metadata. Чтобы их скрыть, добавляем —exclude-internal.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --list \
  --exclude-internal

Теперь в выводе только пользовательские топики — удобно для инвентаризации.

Детальная информация о топике

Флаг —describe показывает всё: партиции, лидеров, реплики, ISR-список и текущие конфиги.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --topic orders

Вывод выглядит примерно так:

Topic: orders   TopicId: AbCdEfGhIjKl  PartitionCount: 3  ReplicationFactor: 1  Configs: ...
  Topic: orders  Partition: 0  Leader: 1  Replicas: 1  Isr: 1
  Topic: orders  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
  Topic: orders  Partition: 2  Leader: 1  Replicas: 1  Isr: 1

Три строки ниже заголовка — по одной на партицию. Разберём поля.

  • Leader. ID брокера, который является лидером этой партиции. Именно к нему идут запись и чтение (если только не настроен follower fetching).
  • Replicas. Список всех брокеров, которые должны хранить реплику партиции согласно плану размещения. Порядок имеет значение — первый считается предпочтительным лидером.
  • Isr. In-Sync Replicas — брокеры, реплики которых актуальны прямо сейчас. Если ISR меньше Replicas, значит какой-то брокер отстал или недоступен.

На здоровом кластере Replicas и Isr должны совпадать. Расхождение — первый сигнал о проблеме.

Описание всех топиков сразу

Если не указать —topic, команда опишет все топики кластера. Полезно для аудита.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --describe

Изменение топика

Флаг —alter позволяет изменить количество партиций или конфигурацию топика.

Увеличение количества партиций

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --alter \
  --topic orders \
  --partitions 6

Важный момент: партиции можно только добавлять. Уменьшить их количество нельзя — это ограничение самого Kafka, не утилиты. Причина в том, что данные в существующих партициях не перераспределяются автоматически. При добавлении партиций новые сообщения начнут попадать в новые партиции, но старые данные останутся на месте.

Изменение конфигурации существующего топика

Конфиги топика меняются через отдельную утилиту kafka-configs.sh, а не через kafka-topics.sh с флагом —alter. В старых версиях Kafka это работало через —alter —config, но сейчас такой подход устарел. Подробно разберём это в уроке 15.

Удаление топика

Флаг —delete удаляет топик вместе со всеми данными.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --delete \
  --topic orders

Операция необратима. Kafka помечает топик как удалённый и асинхронно удаляет данные с диска. Если топик не существует, команда выдаст ошибку. Чтобы этого избежать, есть флаг —if-exists.

kafka-topics.sh \
  --bootstrap-server localhost:9092 \
  --delete \
  --topic orders \
  --if-exists

Системные топики удалять не нужно — Kafka пересоздаёт их при необходимости. Если удалить __consumer_offsets, кластер может повести себя непредсказуемо.

Системные топики. Что это и зачем знать?

Kafka автоматически создаёт несколько служебных топиков. Они не появляются в —list без флага —exclude-internal, но занимают место и влияют на производительность.

  • __consumer_offsets. Хранит смещения консьюмерных групп. По умолчанию создаётся с 50 партициями. Именно сюда консьюмеры фиксируют, до какого offset дочитали.
  • __cluster_metadata. Metadata-лог KRaft. Хранит все изменения метаданных кластера — создание и удаление топиков, назначение лидеров, брокерные события. Работает вместо ZooKeeper.
  • __transaction_state. Создаётся при использовании транзакций. Хранит состояния транзакций продьюсеров.

На практике системные топики нужно знать, чтобы не путаться при просмотре списка и не пытаться их случайно удалить.

Как выглядит типичный workflow с kafka-topics.sh

В реальных проектах порядок работы с топиком обычно такой: создание при деплое, проверка через describe, изменение при масштабировании, удаление при демонтаже. Диаграмма ниже показывает этот цикл.

Диаграмма управления топиком Kafka: создание, просмотр, изменение партиций и удаление через kafka-topics.sh в KRaft-режиме. Бесплатные курсы по Kafka


 

Альтернативные способы управления топиками

kafka-topics.sh — не единственный способ работать с топиками. Вот что ещё есть в арсенале.

kcat — быстрый просмотр метаданных

Утилита kcat (бывший kafkacat) умеет показывать список топиков и партиций через флаг -L.

kcat -b localhost:9092 -L

Вывод подробный: брокеры, топики, партиции, лидеры. Удобно для быстрой диагностики, но создать или удалить топик через kcat нельзя. Подробно разберём kcat в уроке 33.

AdminClient API в Python

Для автоматизации создания топиков в коде используют AdminClient из библиотеки confluent-kafka.

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({"bootstrap.servers": "localhost:9092"})

new_topic = NewTopic(
    topic="orders",
    num_partitions=3,
    replication_factor=1
)

result = admin.create_topics([new_topic])

for topic, future in result.items():
    try:
        future.result()
        print(f"Топик {topic} создан")
    except Exception as e:
        print(f"Ошибка: {e}")

Этот подход используют, когда топики нужно создавать динамически — например, при деплое микросервиса или в CI/CD пайплайне. Если вас интересует разработка таких решений, курс DEVKI. Apache Kafka для инженеров данных разбирает AdminClient API подробно на реальных кейсах.

Kafka Admin REST API

Если в вашей инфраструктуре поднят Kafka REST Proxy или Confluent REST API, топики можно создавать через HTTP-запросы. Но это уже история для отдельного урока — в базовой поставке Kafka такого нет.

Что дальше?

Мы научились создавать, просматривать, изменять и удалять топики через kafka-topics.sh. Разобрались в выводе —describe и поняли, за чем следить в полях Leader, Replicas и Isr.

В следующем уроке переходим к kafka-console-producer.sh — разберём, как отправлять сообщения в топик из командной строки, как работают ключи сообщений и как это влияет на попадание в партиции. Посмотрим на урок 7: kafka-console-producer.sh.

Отдельный cheatsheet по всем флагам kafka-topics.sh — в бонусном файле к этому уроку.

Если хотите разобраться с управлением топиками в продакшн-кластере — планированием партиций под нагрузку, настройкой retention и политик очистки — всё это есть в курсе KAFKA. Администрирование кластера Kafka.

Источники

Все уроки курса

Тема Ссылка
1 Установка Kafka с Zookeeper https://bigdataschool.ru/blog/news/lesson1-kafka-zookeeper-install/
2 Установка Kafka в режиме KRaft https://bigdataschool.ru/blog/news/lesson2-kafka-kraft-install/
3 Docker KRaft. Однонодовый кластер https://bigdataschool.ru/blog/news/lesson3-kafka-docker-single/
4 Docker KRaft. Трёхнодовый кластер https://bigdataschool.ru/blog/news/lesson4-kafka-docker-cluster/
5 Утилиты bin/. Переменные окружения и основы https://bigdataschool.ru/blog/news/lesson5-kafka-bin-intro/
6 kafka-topics.sh. Управление топиками https://bigdataschool.ru/blog/news/lesson6-kafka-topics/
7 kafka-console-producer.sh https://bigdataschool.ru/blog/news/lesson7-kafka-console-producer/
8 kafka-console-consumer.sh https://bigdataschool.ru/blog/news/lesson8-kafka-console-consumer/
9 kafka-server-start.sh / kafka-server-stop.sh https://bigdataschool.ru/blog/news/lesson9-kafka-server-start-stop/
10 kafka-storage.sh https://bigdataschool.ru/blog/news/lesson10-kafka-storage/
11 kafka-cluster.sh https://bigdataschool.ru/blog/news/lesson11-kafka-cluster/
12 kafka-metadata-quorum.sh https://bigdataschool.ru/blog/news/lesson12-kafka-metadata-quorum/
13 kafka-metadata-shell.sh https://bigdataschool.ru/blog/news/lesson13-kafka-metadata-shell/
14 kafka-features.sh https://bigdataschool.ru/blog/news/lesson14-kafka-features/
15 kafka-configs.sh https://bigdataschool.ru/blog/news/lesson15-kafka-configs/
16 kafka-log-dirs.sh https://bigdataschool.ru/blog/news/lesson16-kafka-log-dirs/
17 kafka-dump-log.sh https://bigdataschool.ru/blog/news/lesson17-kafka-dump-log/
18 kafka-delete-records.sh https://bigdataschool.ru/blog/news/lesson18-kafka-delete-records/
19 kafka-consumer-groups.sh https://bigdataschool.ru/blog/news/lesson19-kafka-consumer-groups/
20 kafka-streams-application-reset.sh https://bigdataschool.ru/blog/news/lesson20-kafka-streams-reset/
21 kafka-leader-election.sh https://bigdataschool.ru/blog/news/lesson21-kafka-leader-election/
22 kafka-reassign-partitions.sh https://bigdataschool.ru/blog/news/lesson22-kafka-reassign-partitions/
23 kafka-replica-verification.sh https://bigdataschool.ru/blog/news/lesson23-kafka-replica-verification/
24 kafka-acls.sh https://bigdataschool.ru/blog/news/lesson24-kafka-acls/
25 kafka-broker-api-versions.sh https://bigdataschool.ru/blog/news/lesson25-kafka-broker-api-versions/
26 kafka-get-offsets.sh https://bigdataschool.ru/blog/news/lesson26-kafka-get-offsets/
27 kafka-verifiable-producer/consumer.sh https://bigdataschool.ru/blog/news/lesson27-kafka-verifiable/
28 kafka-producer-perf-test.sh https://bigdataschool.ru/blog/news/lesson28-kafka-producer-perf/
29 kafka-consumer-perf-test.sh https://bigdataschool.ru/blog/news/lesson29-kafka-consumer-perf/
30 kafka-mirror-maker.sh https://bigdataschool.ru/blog/news/lesson30-kafka-mirror-maker/
31 connect-standalone.sh https://bigdataschool.ru/blog/news/lesson31-kafka-connect-standalone/
32 connect-distributed.sh https://bigdataschool.ru/blog/news/lesson32-kafka-connect-distributed/
33 kcat. Альтернативный CLI https://bigdataschool.ru/blog/news/lesson33-kcat/