Содержание
- Состояние Kafka Streams и зачем его сбрасывать
- Что именно сбрасывает kafka-streams-application-reset.sh
- Флаги kafka-streams-application-reset.sh
- Приложение должно быть остановлено
- Шаг 1. Dry run - смотрим что будет удалено
- Шаг 2. Полный сброс с флагом --execute
- Шаг 3. Удаление локального state store
- Стратегии сброса офсетов
- Частичный сброс. Только офсеты, без удаления топиков
- Защита через TLS и SASL
- Типичные ошибки при работе с утилитой
- Что дальше
- Источники
- Все уроки курса
В уроке 19 мы разобрали kafka-consumer-groups.sh — утилиту для работы с группами консьюмеров: просмотр lag-а, сброс офсетов, удаление групп. Там же обсуждали, как Kafka хранит офсеты в топике __consumer_offsets.
Сегодня идём дальше. kafka-streams-application-reset.sh — это специализированный инструмент для приложений на Kafka Streams. У стримингового приложения состояние устроено значительно сложнее, чем просто офсет группы консьюмеров: есть внутренние топики, changelog-топики и локальные хранилища на диске. Обычный reset через kafka-consumer-groups.sh здесь не поможет — нужен отдельный инструмент.
Kafka Streams подробно разбирается в курсе «Apache Kafka для инженеров данных» — там же практика по state stores, оконным функциям и Kafka Streams Processor API.
Состояние Kafka Streams и зачем его сбрасывать
Kafka Streams — это библиотека для потоковой обработки данных прямо внутри JVM-приложения, без отдельного кластера обработки. Каждое приложение идентифицируется параметром application.id, который одновременно служит идентификатором consumer group.
В процессе работы Kafka Streams создаёт три вида состояния, которые нужно сбрасывать при перезапуске с нуля.
- Consumer group offsets. Стандартные офсеты во входных топиках — до какого сообщения дочитало приложение. Хранятся в __consumer_offsets, как у обычного консьюмера.
- Internal repartition topics. Топики вида
application-id-KSTREAM-AGGREGATE-REPARTITION, которые Streams создаёт для промежуточных шагов — перегруппировки и агрегации данных. Имена генерируются автоматически. - Changelog topics. Топики вида
application-id-STORE_NAME-changelog, в которые записываются изменения локальных state store. Они нужны для восстановления состояния после падения инстанса.
Кроме топиков есть ещё локальные state stores — папки на диске каждого инстанса приложения, обычно на базе RocksDB. Их утилита не трогает — придётся удалять вручную.
Типичные сценарии для полного сброса — разработка и тестирование, когда хочется обработать данные заново с начала. Или recovery после того, как данные в state store оказались некорректными и проще перестроить с нуля, чем разбираться.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Что именно сбрасывает kafka-streams-application-reset.sh
Утилита работает в два этапа: сначала сбрасывает офсеты consumer group во входных топиках до earliest, потом удаляет внутренние топики. После этого приложение при следующем старте создаст их заново и обработает данные с самого начала.

Флаги kafka-streams-application-reset.sh
Первое, что бросается в глаза — флаг подключения к брокеру. Здесь он —bootstrap-servers (во множественном числе), тогда как в большинстве других утилит Kafka используется —bootstrap-server (в единственном). Это частая причина ошибок — просто запомните разницу.
| Флаг | Что делает | Обязательный |
|---|---|---|
| —bootstrap-servers | Адрес брокера (host:port). Во множественном числе — не опечатка | да |
| —application-id | Значение application.id вашего Streams-приложения | да |
| —input-topics | Входные топики через запятую: офсеты сбрасываются до earliest | нет |
| —intermediate-topics | Промежуточные топики, офсеты которых тоже нужно сбросить | нет |
| —execute | Реально выполнить сброс. Без этого флага — только dry run | нет |
| —force | Пропустить интерактивное подтверждение | нет |
| —config-file | Файл с дополнительными параметрами клиента (TLS, SASL и т.д.) | нет |
| —to-earliest | Сброс офсетов входных топиков до самого раннего сообщения (поведение по умолчанию) | нет |
| —to-latest | Сброс офсетов до последнего сообщения (пропустить всё накопленное) | нет |
| —to-offset | Сброс до конкретного офсета. Формат: topic:partition:offset | нет |
| —to-datetime | Сброс до ближайшего офсета по timestamp. Формат: YYYY-MM-DDTHH:mm:SS.sss | нет |
| —by-duration | Откат назад на заданный период. Формат ISO 8601: PT1H, P1D и т.д. | нет |
| —from-file | Восстановить офсеты из CSV-файла, сохранённого ранее | нет |
Приложение должно быть остановлено
Это не просто рекомендация — это жёсткое требование. Если запустить сброс на работающем приложении, результат непредсказуем: утилита удалит internal топики, пока приложение продолжает в них писать, а consumer group окажется в несогласованном состоянии.
Проверить, что consumer group не активна, можно через утилиту из урока 19.
# Убедиться, что группа не активна (STATE должно быть Empty или Dead) kafka-consumer-groups.sh \ --bootstrap-server localhost:9092 \ --describe \ --group my-streams-app
Если STATE показывает Stable или PreparingRebalance — приложение ещё живое. Остановите его и подождите несколько секунд, пока группа перейдёт в Empty.
Шаг 1. Dry run — смотрим что будет удалено
По умолчанию kafka-streams-application-reset.sh работает в режиме dry run: показывает, что произойдёт, но ничего не делает. Этот режим удобен для проверки перед реальным запуском.
# Dry run - только показать план сброса без реального выполнения kafka-streams-application-reset.sh \ --bootstrap-servers localhost:9092 \ --application-id my-streams-app \ --input-topics orders,payments
Вывод покажет список internal топиков, которые будут удалены, и офсеты, которые будут сброшены. Проверьте, что в списке нет лишних топиков — иногда application.id перекрывается с другим приложением при неаккуратном именовании.
Шаг 2. Полный сброс с флагом —execute
# Полный сброс: удаление internal топиков + сброс офсетов до earliest kafka-streams-application-reset.sh \ --bootstrap-servers localhost:9092 \ --application-id my-streams-app \ --input-topics orders,payments \ --execute
Утилита выдаст интерактивный запрос подтверждения. Если хотите запустить без вопросов (например, в скрипте автоматизации) — добавьте флаг —force.
# Сброс без интерактивного подтверждения (для скриптов) kafka-streams-application-reset.sh \ --bootstrap-servers localhost:9092 \ --application-id my-streams-app \ --input-topics orders,payments \ --execute \ --force
Шаг 3. Удаление локального state store
Это самый важный шаг, который легко забыть. После сброса internal топиков в Kafka локальные копии state store на диске остаются нетронутыми. Если не удалить их — при следующем запуске приложение попытается восстановить состояние из старых данных на диске, что приведёт к рассогласованию с очищенными changelog-топиками.
По умолчанию Kafka Streams хранит state store в директории, заданной параметром state.dir. Если он не переопределён в конфиге приложения, используется /tmp/kafka-streams/.
# Удалить локальный state store для приложения my-streams-app # Выполнить на КАЖДОМ сервере, где запущен инстанс приложения rm -rf /tmp/kafka-streams/my-streams-app/ # Если state.dir переопределён в конфиге приложения: rm -rf /var/kafka-streams/my-streams-app/
После этих трёх шагов можно запускать приложение. Оно создаст internal топики заново и начнёт обработку входных топиков с самого начала.
Стратегии сброса офсетов
По умолчанию утилита сбрасывает офсеты входных топиков до earliest — то есть приложение перечитает все доступные данные. Но это поведение можно изменить через дополнительные флаги.
| Стратегия | Флаг | Когда использовать |
|---|---|---|
| До самого раннего | —to-earliest | Полная переобработка всех данных. Поведение по умолчанию |
| До последнего | —to-latest | Пропустить накопленные данные, обрабатывать только новые |
| До конкретного момента | —to-datetime | Переобработать данные начиная с заданного timestamp |
| Откат на период | —by-duration | Перечитать последние N часов/дней без ручного расчёта timestamp |
| До конкретного офсета | —to-offset | Точечный контроль, когда знаете нужный офсет |
| Из файла | —from-file | Восстановить сохранённый ранее набор офсетов |
Пример сброса офсетов до конкретного момента времени — удобно, когда нужно переобработать данные после инцидента:
# Сброс до ближайшего офсета по timestamp (переобработать данные с 1 мая 2025) kafka-streams-application-reset.sh \ --bootstrap-servers localhost:9092 \ --application-id my-streams-app \ --input-topics orders \ --to-datetime 2025-05-01T00:00:00.000 \ --execute
Откат на несколько часов назад через ISO 8601 duration:
# Откат офсетов на 6 часов назад (PT6H = Period Time 6 Hours) kafka-streams-application-reset.sh \ --bootstrap-servers localhost:9092 \ --application-id my-streams-app \ --input-topics orders \ --by-duration PT6H \ --execute
Частичный сброс. Только офсеты, без удаления топиков
Иногда нужно только сдвинуть офсеты во входных топиках, не трогая state store и internal топики. Например, когда хотите перечитать последние несколько часов данных, но агрегированное состояние нас устраивает.
Для этого передайте —input-topics без указания промежуточных топиков. Утилита сбросит только офсеты consumer group, internal топики останутся нетронутыми.
# Частичный сброс: только офсеты входного топика, без удаления internal топиков kafka-streams-application-reset.sh \ --bootstrap-servers localhost:9092 \ --application-id my-streams-app \ --input-topics orders \ --execute
Обратите внимание — при частичном сбросе удалять local state store не обязательно: состояние на диске останется согласованным с changelog-топиками в Kafka.
Защита через TLS и SASL
На production-кластере с включённой аутентификацией нужно передать параметры безопасности через файл конфигурации.
# Пример файла client.properties для SASL/PLAIN: # security.protocol=SASL_SSL # sasl.mechanism=PLAIN # sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="admin" password="secret"; # Запуск с конфигом безопасности kafka-streams-application-reset.sh \ --bootstrap-servers kafka.prod.example.com:9093 \ --application-id my-streams-app \ --input-topics orders \ --config-file client.properties \ --execute
Типичные ошибки при работе с утилитой
Разберём три ситуации, с которыми чаще всего сталкиваются при первом использовании.
- Опечатка в флаге —bootstrap-servers. Если написать —bootstrap-server (без s), утилита выдаст ошибку «Unknown option». Это единственная стандартная утилита Kafka, которая требует множественное число. Просто запомните.
- Запуск на работающем приложении. Утилита начнёт удалять internal топики прямо в процессе работы Streams. Приложение словит ошибки типа «Topic not found» и скорее всего упадёт с исключением. Обязательно остановите приложение до запуска reset.
- Забыть удалить local state store. При следующем запуске приложение попытается восстановить старые данные с диска. Changelog-топиков уже нет — начнутся ошибки восстановления или некорректные агрегаты. Удаляйте state store после каждого полного сброса.
Если после сброса приложение всё равно стартует с ошибками восстановления — проверьте state.dir в конфиге. Иногда разработчики задают нестандартный путь, и папку в /tmp/kafka-streams/ удалять вообще бессмысленно.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Что дальше
В уроке 21 переходим к административным утилитам — разберём kafka-leader-election.sh. Она отвечает за управление leader-репликами партиций: принудительный перевыбор лидера после сбоя брокера и балансировка preferred leaders по кластеру. Инструмент критически важен для production, где нужно восстановить нормальное распределение нагрузки после инцидента.
Если хотите разобраться глубже с тем, как Kafka Streams управляет state stores, восстановлением после сбоев и горизонтальным масштабированием — эта тема подробно разбирается в курсе «Apache Kafka для инженеров данных».
Источники
- Apache Kafka Streams. Application Reset Tool — официальная документация (2025)
- Apache Kafka 4.0 Streams Documentation (2025)
- Confluent. Kafka Streams Application Reset Tool (2025)
- Apache Kafka Streams. Configuration Reference — state.dir и application.id (2025)
- Apache Kafka 4.x Documentation. Consumer Group Operations (2025)
