Изучаем Apache Kafka с нуля. Урок 20. kafka-streams-application-reset.sh

Изучаем Apache Kafka с нуля. Урок 20. kafka-streams-application-reset.sh

 

В уроке 19 мы разобрали kafka-consumer-groups.sh — утилиту для работы с группами консьюмеров: просмотр lag-а, сброс офсетов, удаление групп. Там же обсуждали, как Kafka хранит офсеты в топике __consumer_offsets.

Сегодня идём дальше. kafka-streams-application-reset.sh — это специализированный инструмент для приложений на Kafka Streams. У стримингового приложения состояние устроено значительно сложнее, чем просто офсет группы консьюмеров: есть внутренние топики, changelog-топики и локальные хранилища на диске. Обычный reset через kafka-consumer-groups.sh здесь не поможет — нужен отдельный инструмент.

Kafka Streams подробно разбирается в курсе «Apache Kafka для инженеров данных» — там же практика по state stores, оконным функциям и Kafka Streams Processor API.

Состояние Kafka Streams и зачем его сбрасывать

Kafka Streams — это библиотека для потоковой обработки данных прямо внутри JVM-приложения, без отдельного кластера обработки. Каждое приложение идентифицируется параметром application.id, который одновременно служит идентификатором consumer group.

В процессе работы Kafka Streams создаёт три вида состояния, которые нужно сбрасывать при перезапуске с нуля.

  • Consumer group offsets. Стандартные офсеты во входных топиках — до какого сообщения дочитало приложение. Хранятся в __consumer_offsets, как у обычного консьюмера.
  • Internal repartition topics. Топики вида application-id-KSTREAM-AGGREGATE-REPARTITION, которые Streams создаёт для промежуточных шагов — перегруппировки и агрегации данных. Имена генерируются автоматически.
  • Changelog topics. Топики вида application-id-STORE_NAME-changelog, в которые записываются изменения локальных state store. Они нужны для восстановления состояния после падения инстанса.

Кроме топиков есть ещё локальные state stores — папки на диске каждого инстанса приложения, обычно на базе RocksDB. Их утилита не трогает — придётся удалять вручную.

Типичные сценарии для полного сброса — разработка и тестирование, когда хочется обработать данные заново с начала. Или recovery после того, как данные в state store оказались некорректными и проще перестроить с нуля, чем разбираться.

 

Apache Kafka для инженеров данных

Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Что именно сбрасывает kafka-streams-application-reset.sh


Утилита работает в два этапа: сначала сбрасывает офсеты consumer group во входных топиках до earliest, потом удаляет внутренние топики. После этого приложение при следующем старте создаст их заново и обработает данные с самого начала.

Схема сброса Kafka Streams приложения: остановка, reset офсетов и удаление internal топиков, ручное удаление локального state store, перезапуск
Kafka Streams application reset — что сбрасывает утилита и какой порядок действий

 

Флаги kafka-streams-application-reset.sh

Первое, что бросается в глаза — флаг подключения к брокеру. Здесь он —bootstrap-servers (во множественном числе), тогда как в большинстве других утилит Kafka используется —bootstrap-server (в единственном). Это частая причина ошибок — просто запомните разницу.

Флаг Что делает Обязательный
—bootstrap-servers Адрес брокера (host:port). Во множественном числе — не опечатка да
—application-id Значение application.id вашего Streams-приложения да
—input-topics Входные топики через запятую: офсеты сбрасываются до earliest нет
—intermediate-topics Промежуточные топики, офсеты которых тоже нужно сбросить нет
—execute Реально выполнить сброс. Без этого флага — только dry run нет
—force Пропустить интерактивное подтверждение нет
—config-file Файл с дополнительными параметрами клиента (TLS, SASL и т.д.) нет
—to-earliest Сброс офсетов входных топиков до самого раннего сообщения (поведение по умолчанию) нет
—to-latest Сброс офсетов до последнего сообщения (пропустить всё накопленное) нет
—to-offset Сброс до конкретного офсета. Формат: topic:partition:offset нет
—to-datetime Сброс до ближайшего офсета по timestamp. Формат: YYYY-MM-DDTHH:mm:SS.sss нет
—by-duration Откат назад на заданный период. Формат ISO 8601: PT1H, P1D и т.д. нет
—from-file Восстановить офсеты из CSV-файла, сохранённого ранее нет

Приложение должно быть остановлено

Это не просто рекомендация — это жёсткое требование. Если запустить сброс на работающем приложении, результат непредсказуем: утилита удалит internal топики, пока приложение продолжает в них писать, а consumer group окажется в несогласованном состоянии.

Проверить, что consumer group не активна, можно через утилиту из урока 19.

# Убедиться, что группа не активна (STATE должно быть Empty или Dead)
kafka-consumer-groups.sh \
  --bootstrap-server localhost:9092 \
  --describe \
  --group my-streams-app

Если STATE показывает Stable или PreparingRebalance — приложение ещё живое. Остановите его и подождите несколько секунд, пока группа перейдёт в Empty.

 

Шаг 1. Dry run — смотрим что будет удалено

По умолчанию kafka-streams-application-reset.sh работает в режиме dry run: показывает, что произойдёт, но ничего не делает. Этот режим удобен для проверки перед реальным запуском.

# Dry run - только показать план сброса без реального выполнения
kafka-streams-application-reset.sh \
  --bootstrap-servers localhost:9092 \
  --application-id my-streams-app \
  --input-topics orders,payments

Вывод покажет список internal топиков, которые будут удалены, и офсеты, которые будут сброшены. Проверьте, что в списке нет лишних топиков — иногда application.id перекрывается с другим приложением при неаккуратном именовании.

 

Шаг 2. Полный сброс с флагом —execute

# Полный сброс: удаление internal топиков + сброс офсетов до earliest
kafka-streams-application-reset.sh \
  --bootstrap-servers localhost:9092 \
  --application-id my-streams-app \
  --input-topics orders,payments \
  --execute

Утилита выдаст интерактивный запрос подтверждения. Если хотите запустить без вопросов (например, в скрипте автоматизации) — добавьте флаг —force.

# Сброс без интерактивного подтверждения (для скриптов)
kafka-streams-application-reset.sh \
  --bootstrap-servers localhost:9092 \
  --application-id my-streams-app \
  --input-topics orders,payments \
  --execute \
  --force

Шаг 3. Удаление локального state store

Это самый важный шаг, который легко забыть. После сброса internal топиков в Kafka локальные копии state store на диске остаются нетронутыми. Если не удалить их — при следующем запуске приложение попытается восстановить состояние из старых данных на диске, что приведёт к рассогласованию с очищенными changelog-топиками.

По умолчанию Kafka Streams хранит state store в директории, заданной параметром state.dir. Если он не переопределён в конфиге приложения, используется /tmp/kafka-streams/.

# Удалить локальный state store для приложения my-streams-app
# Выполнить на КАЖДОМ сервере, где запущен инстанс приложения
rm -rf /tmp/kafka-streams/my-streams-app/

# Если state.dir переопределён в конфиге приложения:
rm -rf /var/kafka-streams/my-streams-app/

После этих трёх шагов можно запускать приложение. Оно создаст internal топики заново и начнёт обработку входных топиков с самого начала.

 

Стратегии сброса офсетов

По умолчанию утилита сбрасывает офсеты входных топиков до earliest — то есть приложение перечитает все доступные данные. Но это поведение можно изменить через дополнительные флаги.

Стратегия Флаг Когда использовать
До самого раннего —to-earliest Полная переобработка всех данных. Поведение по умолчанию
До последнего —to-latest Пропустить накопленные данные, обрабатывать только новые
До конкретного момента —to-datetime Переобработать данные начиная с заданного timestamp
Откат на период —by-duration Перечитать последние N часов/дней без ручного расчёта timestamp
До конкретного офсета —to-offset Точечный контроль, когда знаете нужный офсет
Из файла —from-file Восстановить сохранённый ранее набор офсетов

Пример сброса офсетов до конкретного момента времени — удобно, когда нужно переобработать данные после инцидента:

# Сброс до ближайшего офсета по timestamp (переобработать данные с 1 мая 2025)
kafka-streams-application-reset.sh \
  --bootstrap-servers localhost:9092 \
  --application-id my-streams-app \
  --input-topics orders \
  --to-datetime 2025-05-01T00:00:00.000 \
  --execute

Откат на несколько часов назад через ISO 8601 duration:

# Откат офсетов на 6 часов назад (PT6H = Period Time 6 Hours)
kafka-streams-application-reset.sh \
  --bootstrap-servers localhost:9092 \
  --application-id my-streams-app \
  --input-topics orders \
  --by-duration PT6H \
  --execute

 

Частичный сброс. Только офсеты, без удаления топиков

Иногда нужно только сдвинуть офсеты во входных топиках, не трогая state store и internal топики. Например, когда хотите перечитать последние несколько часов данных, но агрегированное состояние нас устраивает.

Для этого передайте —input-topics без указания промежуточных топиков. Утилита сбросит только офсеты consumer group, internal топики останутся нетронутыми.

# Частичный сброс: только офсеты входного топика, без удаления internal топиков
kafka-streams-application-reset.sh \
  --bootstrap-servers localhost:9092 \
  --application-id my-streams-app \
  --input-topics orders \
  --execute

Обратите внимание — при частичном сбросе удалять local state store не обязательно: состояние на диске останется согласованным с changelog-топиками в Kafka.

 

Защита через TLS и SASL

На production-кластере с включённой аутентификацией нужно передать параметры безопасности через файл конфигурации.

# Пример файла client.properties для SASL/PLAIN:
# security.protocol=SASL_SSL
# sasl.mechanism=PLAIN
# sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret";

# Запуск с конфигом безопасности
kafka-streams-application-reset.sh \
  --bootstrap-servers kafka.prod.example.com:9093 \
  --application-id my-streams-app \
  --input-topics orders \
  --config-file client.properties \
  --execute

Типичные ошибки при работе с утилитой

Разберём три ситуации, с которыми чаще всего сталкиваются при первом использовании.

  • Опечатка в флаге —bootstrap-servers. Если написать —bootstrap-server (без s), утилита выдаст ошибку «Unknown option». Это единственная стандартная утилита Kafka, которая требует множественное число. Просто запомните.
  • Запуск на работающем приложении. Утилита начнёт удалять internal топики прямо в процессе работы Streams. Приложение словит ошибки типа «Topic not found» и скорее всего упадёт с исключением. Обязательно остановите приложение до запуска reset.
  • Забыть удалить local state store. При следующем запуске приложение попытается восстановить старые данные с диска. Changelog-топиков уже нет — начнутся ошибки восстановления или некорректные агрегаты. Удаляйте state store после каждого полного сброса.

Если после сброса приложение всё равно стартует с ошибками восстановления — проверьте state.dir в конфиге. Иногда разработчики задают нестандартный путь, и папку в /tmp/kafka-streams/ удалять вообще бессмысленно.

 

Apache Kafka для инженеров данных

Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Что дальше

В уроке 21 переходим к административным утилитам — разберём kafka-leader-election.sh. Она отвечает за управление leader-репликами партиций: принудительный перевыбор лидера после сбоя брокера и балансировка preferred leaders по кластеру. Инструмент критически важен для production, где нужно восстановить нормальное распределение нагрузки после инцидента.

Если хотите разобраться глубже с тем, как Kafka Streams управляет state stores, восстановлением после сбоев и горизонтальным масштабированием — эта тема подробно разбирается в курсе «Apache Kafka для инженеров данных».

Источники

Все уроки курса

Тема URL
1 Установка Kafka с Zookeeper https://bigdataschool.ru/blog/news/lesson1-kafka-zookeeper-install/
2 Установка Kafka в режиме KRaft https://bigdataschool.ru/blog/news/lesson2-kafka-kraft-install/
3 Docker KRaft. Однонодовый кластер https://bigdataschool.ru/blog/news/lesson3-kafka-docker-single/
4 Docker KRaft. 3-нодовый кластер https://bigdataschool.ru/blog/news/lesson4-kafka-docker-cluster/
5 Утилиты bin/. Переменные окружения и основы https://bigdataschool.ru/blog/news/lesson5-kafka-bin-intro/
6 kafka-topics.sh. Управление топиками https://bigdataschool.ru/blog/news/lesson6-kafka-topics/
7 kafka-console-producer.sh https://bigdataschool.ru/blog/news/lesson7-kafka-console-producer/
8 kafka-console-consumer.sh https://bigdataschool.ru/blog/news/lesson8-kafka-console-consumer/
9 kafka-server-start.sh / kafka-server-stop.sh https://bigdataschool.ru/blog/news/lesson9-kafka-server-start-stop/
10 kafka-storage.sh https://bigdataschool.ru/blog/news/lesson10-kafka-storage/
11 Cheatsheet по кластеру Kafka https://bigdataschool.ru/blog/news/lesson11-kafka-cluster-cheatsheet/
12 kafka-metadata-shell.sh https://bigdataschool.ru/blog/news/lesson13-kafka-metadata-shell/
13 kafka-metadata-shell.sh https://bigdataschool.ru/blog/news/lesson13-kafka-metadata-shell/
14 kafka-metadata-shell.sh https://bigdataschool.ru/blog/news/lesson13-kafka-metadata-shell/
15 kafka-configs.sh https://bigdataschool.ru/blog/news/lesson15-kafka-configs/
16 kafka-log-dirs.sh https://bigdataschool.ru/blog/news/lesson16-kafka-log-dirs/
17 kafka-dump-log.sh https://bigdataschool.ru/blog/news/lesson17-kafka-dump-log/
18 kafka-delete-records.sh https://bigdataschool.ru/blog/news/lesson18-kafka-delete-records/
19 kafka-consumer-groups.sh https://bigdataschool.ru/blog/news/lesson19-kafka-consumer-groups/
20 kafka-streams-application-reset.sh https://bigdataschool.ru/blog/news/lesson20-kafka-streams-reset/
21 kafka-leader-election.sh https://bigdataschool.ru/blog/news/lesson21-kafka-leader-election/
22 kafka-reassign-partitions.sh https://bigdataschool.ru/blog/news/lesson22-kafka-reassign-partitions/
23 kafka-replica-verification.sh https://bigdataschool.ru/blog/news/lesson23-kafka-replica-verification/
24 kafka-acls.sh https://bigdataschool.ru/blog/news/lesson24-kafka-acls/
25 kafka-broker-api-versions.sh https://bigdataschool.ru/blog/news/lesson25-kafka-broker-api-versions/
26 kafka-get-offsets.sh https://bigdataschool.ru/blog/news/lesson26-kafka-get-offsets/
27 kafka-verifiable-producer/consumer.sh https://bigdataschool.ru/blog/news/lesson27-kafka-verifiable/
28 kafka-producer-perf-test.sh https://bigdataschool.ru/blog/news/lesson28-kafka-producer-perf/
29 kafka-consumer-perf-test.sh https://bigdataschool.ru/blog/news/lesson29-kafka-consumer-perf/
30 kafka-mirror-maker.sh https://bigdataschool.ru/blog/news/lesson30-kafka-mirror-maker/
31 connect-standalone.sh https://bigdataschool.ru/blog/news/lesson31-kafka-connect-standalone/
32 connect-distributed.sh https://bigdataschool.ru/blog/news/lesson32-kafka-connect-distributed/
33 kcat. Альтернативный CLI https://bigdataschool.ru/blog/news/lesson33-kcat/