Содержание
- Как Kafka на самом деле хранит и удаляет данные
- Синтаксис и флаги kafka-delete-records.sh
- Формат JSON-файла с офсетами
- Первый запуск - базовый пример
- Шаг 1. Проверяем текущие офсеты
- Шаг 2. Готовим JSON-файл
- Шаг 3. Запускаем удаление
- Что значит low_watermark в ответе
- Удаление всех сообщений в топике
- Когда это лучше, чем пересоздание топика
- Практика GDPR - точечное удаление по офсету
- Ограничения и важные нюансы
- Работа на защищённом кластере
- Альтернативные подходы к удалению данных
- Автоматизация - скрипт с генерацией JSON
- Проверка результата после удаления
- Что дальше
- Референсные ссылки
- Все уроки курса
В уроке 17 мы разобрали kafka-dump-log.sh — читали бинарные лог-сегменты прямо с диска, смотрели содержимое батчей, проверяли офсеты и метаданные продюсеров. Это диагностический инструмент: он ничего не меняет, только показывает.
Сегодня переходим к другой задаче. Иногда данные нужно не просмотреть, а удалить — конкретные сообщения в конкретных партициях. Причины бывают разные: GDPR и «право на забвение», тестовый мусор в production-топике, или просто нужно срочно освободить место именно в одной партиции, не трогая остальные.
Для этого в Apache Kafka есть kafka-delete-records.sh. Утилита продвигает нижнюю границу лога — log start offset — вперёд, делая все записи ниже этой границы недоступными. Это не физическое удаление в моменте, но для консьюмеров разница несущественна: сообщения ниже новой границы они прочитать уже не смогут. В курсе «Администрирование кластера Kafka» управление данными в топиках разбирают в блоке операционных задач — рядом с retention-политиками и ребалансировкой.
Как Kafka на самом деле хранит и удаляет данные
Прежде чем запускать команды, стоит понять механику. Kafka не удаляет отдельные сообщения по одному — это не база данных с DELETE-запросом. Данные хранятся в виде последовательных лог-сегментов, и удаление работает через две концепции.
Log start offset (low watermark) — это минимальный офсет, доступный для чтения в партиции. Всё, что ниже этой границы, консьюмер прочитать не может. Именно эту границу и сдвигает kafka-delete-records.sh.
Log end offset (high watermark) — это офсет следующей записи, которая будет добавлена. То есть сейчас последнее доступное сообщение имеет офсет high watermark — 1.
Физически сегменты на диске при вызове команды никуда не деваются сразу. Брокер помечает удалённый диапазон и при следующей очистке (log cleaner) убирает сегменты, которые полностью попали под новую нижнюю границу. Момент физического удаления зависит от настроек log.retention.check.interval.ms и log.cleaner.backoff.ms.
Синтаксис и флаги kafka-delete-records.sh
У утилиты минималистичный интерфейс — всего два обязательных параметра. Вся логика вынесена во внешний JSON-файл с описанием партиций и целевых офсетов.
| Флаг | Что делает | Обязательный |
|---|---|---|
| —bootstrap-server | Адрес брокера (host:port) | да |
| —offset-json-file | Путь к JSON-файлу с описанием партиций и офсетов | да |
| —command-config | Properties-файл с настройками безопасности (TLS, SASL) | нет |
Отдельный —command-config потребуется только если кластер защищён аутентификацией. В остальных случаях достаточно двух параметров.
Формат JSON-файла с офсетами
Файл описывает, до какого офсета нужно удалить записи в каждой партиции. Формат фиксированный — массив объектов с тремя полями на каждую партицию.
{
"partitions": [
{"topic": "orders", "partition": 0, "offset": 150},
{"topic": "orders", "partition": 1, "offset": 200},
{"topic": "orders", "partition": 2, "offset": 100}
],
"version": 1
}
Поле offset задаёт новый log start offset. Все записи до этого офсета станут недоступными. Запись с самим указанным офсетом остаётся доступной — удаляется всё, что строго меньше.
Важный частный случай: если указать offset: -1, Kafka подставит сюда текущий log end offset. Это равносильно удалению всех существующих сообщений в партиции. Новые записи после этого продолжат добавляться штатно.
Первый запуск — базовый пример
Предположим, топик orders имеет три партиции. В партиции 0 накопились тестовые данные с офсетов 0 до 149. Нам нужно их скрыть — сдвинуть log start offset до 150.
Шаг 1. Проверяем текущие офсеты
Сначала узнаём реальное положение дел. Для этого используем kafka-get-offsets.sh — он покажет earliest и latest офсеты по каждой партиции:
# Проверено: Apache Kafka 4.2.0, Ubuntu 22.04 kafka-get-offsets.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --time earliest kafka-get-offsets.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --time latest
Вывод покажет строки вида orders:0:0 (топик:партиция:офсет). Сравниваем earliest и latest — это и есть текущий «видимый» диапазон каждой партиции.
Шаг 2. Готовим JSON-файл
cat > /tmp/delete-records.json << 'EOF'
{
"partitions": [
{"topic": "orders", "partition": 0, "offset": 150},
{"topic": "orders", "partition": 1, "offset": 150},
{"topic": "orders", "partition": 2, "offset": 150}
],
"version": 1
}
EOF
Apache Kafka: администрирование кластера
Код курса
KAFKA
Ближайшая дата курса
13 июля, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Шаг 3. Запускаем удаление
kafka-delete-records.sh \ --bootstrap-server localhost:9092 \ --offset-json-file /tmp/delete-records.json
Утилита выведет результат по каждой партиции. Успешный ответ выглядит так:
Executing records delete operation Records delete operation completed: partition: orders-0 low_watermark: 150 partition: orders-1 low_watermark: 150 partition: orders-2 low_watermark: 150
Поле low_watermark в ответе — это новый log start offset. Если он совпадает с тем, что вы указали в JSON, всё прошло как надо.
Что значит low_watermark в ответе
Брокер возвращает фактически установленный low_watermark, который может отличаться от запрошенного. Вот три варианта расхождения.
- Запрошенный офсет больше log end offset. Если вы просите удалить до офсета 1000, а в партиции всего 500 сообщений, Kafka установит low_watermark = 500 (текущий end offset). Данных больше нет, граница упирается в конец.
- Запрошенный офсет меньше текущего low_watermark. Если граница уже стоит на 200, а вы просите удалить до 100 — ничего не изменится. Kafka вернёт 200. Двигать границу назад нельзя.
- Офсет -1 (удалить всё). Kafka подставит текущий log end offset. Партиция останется пустой, но не исчезнет.
Всегда проверяйте фактический low_watermark в ответе — не полагайтесь на то, что брокер принял ровно то, что вы попросили.
Удаление всех сообщений в топике
Если нужно очистить топик полностью, используем offset -1 для каждой партиции. Он автоматически преобразуется в текущий log end offset.
cat > /tmp/delete-all.json << 'EOF'
{
"partitions": [
{"topic": "orders", "partition": 0, "offset": -1},
{"topic": "orders", "partition": 1, "offset": -1},
{"topic": "orders", "partition": 2, "offset": -1}
],
"version": 1
}
EOF
kafka-delete-records.sh \
--bootstrap-server localhost:9092 \
--offset-json-file /tmp/delete-all.json
Это быстрее, чем удалять и пересоздавать топик — конфигурация, ACL и метаданные сохраняются. Единственное, что меняется — содержимое партиций.
Когда это лучше, чем пересоздание топика
Пересоздание (delete + create) — грубый инструмент. Он сбрасывает все настройки, обнуляет метаданные и требует паузы в работе. kafka-delete-records.sh с offset -1 делает то же самое с точки зрения данных, но гораздо аккуратнее: топик продолжает существовать, продюсеры и консьюмеры не теряют соединение, конфигурация не трогается.
Практика GDPR — точечное удаление по офсету
Типичный сценарий для kafka-delete-records.sh в enterprise-среде — выполнение требований GDPR. Пользователь запрашивает удаление своих данных. Вы знаете, в какой партиции и до какого офсета находятся его записи.
Проблема в том, что Kafka не поддерживает удаление конкретного сообщения по ключу или ID. Удалить можно только диапазон: всё до указанного офсета. Поэтому стандартный подход выглядит так.
- Определяем офсет. Находим последнее сообщение, которое относится к нужному пользователю. Для этого пригодятся kafka-consumer-groups.sh (урок 19) или внешний поиск через consumer с фильтрацией по ключу.
- Публикуем tombstone-запись. Для компактируемых топиков — отправляем сообщение с тем же ключом и null-значением. Компактор удалит все предыдущие записи с этим ключом при следующей очистке.
- Сдвигаем log start offset. Если данные пользователя сосредоточены в начале лога и после них идут данные других — сдвигаем границу до первого «чужого» офсета. Всё до него становится недоступным.
Это не идеальное решение для точечного удаления, но это то, что даёт Kafka из коробки. Если задача встречается часто, смотрите в сторону log compaction с tombstone-записями — этот подход нативен для Kafka и работает автоматически.
Ограничения и важные нюансы
Несколько вещей, которые нужно знать перед использованием в production.
- Compacted-топики. На топиках с политикой cleanup.policy=compact команда выполнится, но результат может быть неожиданным. Компактор периодически пересматривает сегменты и может переписать их так, что граница сдвинется дополнительно. Лучше не смешивать эти инструменты.
- Физическое удаление не мгновенное. После смены low_watermark данные на диске присутствуют до следующего прохода log cleaner. Если нужно немедленное физическое освобождение места — дополнительно установите для топика retention.ms=1000 через kafka-configs.sh (урок 15) и подождите несколько минут, потом верните обратно.
- Консьюмеры с отставшими офсетами. Если consumer-группа читала с офсета 50, а вы сдвинули границу до 150 — при следующем запуске консьюмер получит ошибку OffsetOutOfRange. Нужно сбросить offset группы через kafka-consumer-groups.sh —reset-offsets (урок 19).
- Нельзя откатить. После сдвига low_watermark вернуть старые данные стандартными инструментами невозможно. Если есть сомнения — сначала создайте зеркало топика или убедитесь, что есть резервная копия.
Работа на защищённом кластере
Если кластер использует SASL/SCRAM или TLS, передайте конфигурацию через —command-config:
cat > /tmp/client.properties << 'EOF' security.protocol=SASL_SSL sasl.mechanism=SCRAM-SHA-256 sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \ username="admin" \ password="secret"; ssl.truststore.location=/etc/kafka/ssl/truststore.jks ssl.truststore.password=changeit EOF kafka-delete-records.sh \ --bootstrap-server broker1:9093 \ --offset-json-file /tmp/delete-records.json \ --command-config /tmp/client.properties
Для удаления записей нужно право DELETE на топик. Без него брокер вернёт ошибку авторизации. Управление правами — в уроке 24 про kafka-acls.sh.
Альтернативные подходы к удалению данных
Иногда kafka-delete-records.sh — не лучший выбор. Вот что ещё можно использовать в зависимости от задачи.
| Задача | Инструмент | Когда подходит |
|---|---|---|
| Удалить все данные старше N дней | kafka-configs.sh — изменить retention.ms/retention.bytes | Данные удаляются по времени или объёму автоматически |
| Удалить сообщение с конкретным ключом | Tombstone-запись (null value) + log compaction | Топик с политикой compact, GDPR-сценарии |
| Полностью очистить топик | kafka-delete-records.sh с offset -1 или kafka-topics.sh —delete + create | Зависит от того, нужно ли сохранять конфигурацию |
| Удалить конкретный диапазон офсетов в партиции | kafka-delete-records.sh | Единственный вариант для точного диапазона |
Для задач автоматизации удаления по времени лучше настроить политику retention на уровне топика — это декларативный подход, который работает без ручных операций. kafka-delete-records.sh имеет смысл использовать именно для разовых или нестандартных случаев.
Автоматизация — скрипт с генерацией JSON
Писать JSON-файл руками для каждой операции неудобно. Вот простой bash-скрипт, который генерирует его автоматически по имени топика и числу партиций.
#!/bin/bash
# Проверено: Apache Kafka 4.2.0, Ubuntu 22.04
# Использование: ./delete_records.sh
TOPIC=$1
TARGET_OFFSET=$2
BOOTSTRAP="localhost:9092"
TMP_FILE="/tmp/delete-records-${TOPIC}.json"
# Получаем число партиций
PARTITION_COUNT=$(kafka-topics.sh \
--bootstrap-server $BOOTSTRAP \
--describe \
--topic $TOPIC \
| grep "PartitionCount" \
| awk -F 'PartitionCount: ' '{print $2}' \
| awk '{print $1}')
# Генерируем JSON
echo '{"partitions":[' > $TMP_FILE
for ((i=0; i<$PARTITION_COUNT; i++)); do if [ $i -lt $((PARTITION_COUNT-1)) ]; then echo " {\"topic\": \"$TOPIC\", \"partition\": $i, \"offset\": $TARGET_OFFSET}," >> $TMP_FILE
else
echo " {\"topic\": \"$TOPIC\", \"partition\": $i, \"offset\": $TARGET_OFFSET}" >> $TMP_FILE
fi
done
echo '],"version":1}' >> $TMP_FILE
echo "Сгенерирован файл $TMP_FILE для топика '$TOPIC' ($PARTITION_COUNT партиций), offset=$TARGET_OFFSET"
# Запускаем удаление
kafka-delete-records.sh \
--bootstrap-server $BOOTSTRAP \
--offset-json-file $TMP_FILE
Запустить скрипт просто: ./delete_records.sh orders 150. Он сам подсчитает число партиций и применит нужный офсет ко всем из них.
Проверка результата после удаления
После выполнения команды стоит убедиться, что граница сдвинулась как ожидалось. Для этого снова запускаем kafka-get-offsets.sh с флагом —time earliest:
kafka-get-offsets.sh \ --bootstrap-server localhost:9092 \ --topic orders \ --time earliest
Если в выводе для каждой партиции earliest офсет совпадает с тем, что вы указали — операция прошла успешно. Если нет — смотрите, не упёрся ли брокер в log end offset или в уже существующую границу.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Что дальше
В следующем уроке разбираем kafka-consumer-groups.sh — одну из самых многофункциональных утилит в арсенале. Она покажет lag всех consumer-групп, состояние офсетов по каждой партиции и позволит сбросить или переставить офсеты — в том числе для тех групп, которые после удаления записей оказались с устаревшим положением.
Референсные ссылки
- Apache Kafka Documentation — DeleteRecords API: kafka.apache.org/documentation/#basic_ops_delete_records
- KIP-107: Add deleteRecords() API to the AdminClient (Apache Kafka Wiki): cwiki.apache.org — KIP-107
- Confluent Developer — Managing Kafka Topic Data Retention (2025): developer.confluent.io — manage-data-in-kafka
- Apache Kafka 4.2 Release Notes (2025): kafka.apache.org/blog

