Изучаем Apache Kafka с нуля. Урок 18. kafka-delete-records.sh

Изучаем Apache Kafka с нуля. Урок 18. kafka-delete-records.sh

 

В уроке 17 мы разобрали kafka-dump-log.sh — читали бинарные лог-сегменты прямо с диска, смотрели содержимое батчей, проверяли офсеты и метаданные продюсеров. Это диагностический инструмент: он ничего не меняет, только показывает.

Сегодня переходим к другой задаче. Иногда данные нужно не просмотреть, а удалить — конкретные сообщения в конкретных партициях. Причины бывают разные: GDPR и «право на забвение», тестовый мусор в production-топике, или просто нужно срочно освободить место именно в одной партиции, не трогая остальные.

Для этого в Apache Kafka есть kafka-delete-records.sh. Утилита продвигает нижнюю границу лога — log start offset — вперёд, делая все записи ниже этой границы недоступными. Это не физическое удаление в моменте, но для консьюмеров разница несущественна: сообщения ниже новой границы они прочитать уже не смогут. В курсе «Администрирование кластера Kafka» управление данными в топиках разбирают в блоке операционных задач — рядом с retention-политиками и ребалансировкой.

 

Как Kafka на самом деле хранит и удаляет данные

Прежде чем запускать команды, стоит понять механику. Kafka не удаляет отдельные сообщения по одному — это не база данных с DELETE-запросом. Данные хранятся в виде последовательных лог-сегментов, и удаление работает через две концепции.

Log start offset (low watermark) — это минимальный офсет, доступный для чтения в партиции. Всё, что ниже этой границы, консьюмер прочитать не может. Именно эту границу и сдвигает kafka-delete-records.sh.

Log end offset (high watermark) — это офсет следующей записи, которая будет добавлена. То есть сейчас последнее доступное сообщение имеет офсет high watermark — 1.

Физически сегменты на диске при вызове команды никуда не деваются сразу. Брокер помечает удалённый диапазон и при следующей очистке (log cleaner) убирает сегменты, которые полностью попали под новую нижнюю границу. Момент физического удаления зависит от настроек log.retention.check.interval.ms и log.cleaner.backoff.ms.

Как Kafka управляет оффсетами при удалении записей из логов и что такое Low watermark


Синтаксис и флаги kafka-delete-records.sh

У утилиты минималистичный интерфейс — всего два обязательных параметра. Вся логика вынесена во внешний JSON-файл с описанием партиций и целевых офсетов.

Флаг Что делает Обязательный
—bootstrap-server Адрес брокера (host:port) да
—offset-json-file Путь к JSON-файлу с описанием партиций и офсетов да
—command-config Properties-файл с настройками безопасности (TLS, SASL) нет

Отдельный —command-config потребуется только если кластер защищён аутентификацией. В остальных случаях достаточно двух параметров.

Формат JSON-файла с офсетами

Файл описывает, до какого офсета нужно удалить записи в каждой партиции. Формат фиксированный — массив объектов с тремя полями на каждую партицию.

{
  "partitions": [
    {"topic": "orders", "partition": 0, "offset": 150},
    {"topic": "orders", "partition": 1, "offset": 200},
    {"topic": "orders", "partition": 2, "offset": 100}
  ],
  "version": 1
}

Поле offset задаёт новый log start offset. Все записи до этого офсета станут недоступными. Запись с самим указанным офсетом остаётся доступной — удаляется всё, что строго меньше.

Важный частный случай: если указать offset: -1, Kafka подставит сюда текущий log end offset. Это равносильно удалению всех существующих сообщений в партиции. Новые записи после этого продолжат добавляться штатно.

 

Первый запуск — базовый пример

 

Предположим, топик orders имеет три партиции. В партиции 0 накопились тестовые данные с офсетов 0 до 149. Нам нужно их скрыть — сдвинуть log start offset до 150.

 

Шаг 1. Проверяем текущие офсеты

Сначала узнаём реальное положение дел. Для этого используем kafka-get-offsets.sh — он покажет earliest и latest офсеты по каждой партиции:

# Проверено: Apache Kafka 4.2.0, Ubuntu 22.04
kafka-get-offsets.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --time earliest

kafka-get-offsets.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --time latest

Вывод покажет строки вида orders:0:0 (топик:партиция:офсет). Сравниваем earliest и latest — это и есть текущий «видимый» диапазон каждой партиции.

 

Шаг 2. Готовим JSON-файл

cat > /tmp/delete-records.json << 'EOF'
{
  "partitions": [
    {"topic": "orders", "partition": 0, "offset": 150},
    {"topic": "orders", "partition": 1, "offset": 150},
    {"topic": "orders", "partition": 2, "offset": 150}
  ],
  "version": 1
}
EOF

 

Apache Kafka: администрирование кластера

Код курса
KAFKA
Ближайшая дата курса
13 июля, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Шаг 3. Запускаем удаление

kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file /tmp/delete-records.json

Утилита выведет результат по каждой партиции. Успешный ответ выглядит так:

Executing records delete operation
Records delete operation completed:
partition: orders-0	low_watermark: 150
partition: orders-1	low_watermark: 150
partition: orders-2	low_watermark: 150

Поле low_watermark в ответе — это новый log start offset. Если он совпадает с тем, что вы указали в JSON, всё прошло как надо.

 

Что значит low_watermark в ответе

Брокер возвращает фактически установленный low_watermark, который может отличаться от запрошенного. Вот три варианта расхождения.

  • Запрошенный офсет больше log end offset. Если вы просите удалить до офсета 1000, а в партиции всего 500 сообщений, Kafka установит low_watermark = 500 (текущий end offset). Данных больше нет, граница упирается в конец.
  • Запрошенный офсет меньше текущего low_watermark. Если граница уже стоит на 200, а вы просите удалить до 100 — ничего не изменится. Kafka вернёт 200. Двигать границу назад нельзя.
  • Офсет -1 (удалить всё). Kafka подставит текущий log end offset. Партиция останется пустой, но не исчезнет.

Всегда проверяйте фактический low_watermark в ответе — не полагайтесь на то, что брокер принял ровно то, что вы попросили.

Удаление всех сообщений в топике

Если нужно очистить топик полностью, используем offset -1 для каждой партиции. Он автоматически преобразуется в текущий log end offset.

cat > /tmp/delete-all.json << 'EOF'
{
  "partitions": [
    {"topic": "orders", "partition": 0, "offset": -1},
    {"topic": "orders", "partition": 1, "offset": -1},
    {"topic": "orders", "partition": 2, "offset": -1}
  ],
  "version": 1
}
EOF

kafka-delete-records.sh \
  --bootstrap-server localhost:9092 \
  --offset-json-file /tmp/delete-all.json

Это быстрее, чем удалять и пересоздавать топик — конфигурация, ACL и метаданные сохраняются. Единственное, что меняется — содержимое партиций.

Когда это лучше, чем пересоздание топика

Пересоздание (delete + create) — грубый инструмент. Он сбрасывает все настройки, обнуляет метаданные и требует паузы в работе. kafka-delete-records.sh с offset -1 делает то же самое с точки зрения данных, но гораздо аккуратнее: топик продолжает существовать, продюсеры и консьюмеры не теряют соединение, конфигурация не трогается.

 

Практика GDPR — точечное удаление по офсету

Типичный сценарий для kafka-delete-records.sh в enterprise-среде — выполнение требований GDPR. Пользователь запрашивает удаление своих данных. Вы знаете, в какой партиции и до какого офсета находятся его записи.

Проблема в том, что Kafka не поддерживает удаление конкретного сообщения по ключу или ID. Удалить можно только диапазон: всё до указанного офсета. Поэтому стандартный подход выглядит так.

  • Определяем офсет. Находим последнее сообщение, которое относится к нужному пользователю. Для этого пригодятся kafka-consumer-groups.sh (урок 19) или внешний поиск через consumer с фильтрацией по ключу.
  • Публикуем tombstone-запись. Для компактируемых топиков — отправляем сообщение с тем же ключом и null-значением. Компактор удалит все предыдущие записи с этим ключом при следующей очистке.
  • Сдвигаем log start offset. Если данные пользователя сосредоточены в начале лога и после них идут данные других — сдвигаем границу до первого «чужого» офсета. Всё до него становится недоступным.

Это не идеальное решение для точечного удаления, но это то, что даёт Kafka из коробки. Если задача встречается часто, смотрите в сторону log compaction с tombstone-записями — этот подход нативен для Kafka и работает автоматически.

Ограничения и важные нюансы

Несколько вещей, которые нужно знать перед использованием в production.

  • Compacted-топики. На топиках с политикой cleanup.policy=compact команда выполнится, но результат может быть неожиданным. Компактор периодически пересматривает сегменты и может переписать их так, что граница сдвинется дополнительно. Лучше не смешивать эти инструменты.
  • Физическое удаление не мгновенное. После смены low_watermark данные на диске присутствуют до следующего прохода log cleaner. Если нужно немедленное физическое освобождение места — дополнительно установите для топика retention.ms=1000 через kafka-configs.sh (урок 15) и подождите несколько минут, потом верните обратно.
  • Консьюмеры с отставшими офсетами. Если consumer-группа читала с офсета 50, а вы сдвинули границу до 150 — при следующем запуске консьюмер получит ошибку OffsetOutOfRange. Нужно сбросить offset группы через kafka-consumer-groups.sh —reset-offsets (урок 19).
  • Нельзя откатить. После сдвига low_watermark вернуть старые данные стандартными инструментами невозможно. Если есть сомнения — сначала создайте зеркало топика или убедитесь, что есть резервная копия.

Работа на защищённом кластере

Если кластер использует SASL/SCRAM или TLS, передайте конфигурацию через —command-config:

cat > /tmp/client.properties << 'EOF'
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="admin" \
  password="secret";
ssl.truststore.location=/etc/kafka/ssl/truststore.jks
ssl.truststore.password=changeit
EOF

kafka-delete-records.sh \
  --bootstrap-server broker1:9093 \
  --offset-json-file /tmp/delete-records.json \
  --command-config /tmp/client.properties

Для удаления записей нужно право DELETE на топик. Без него брокер вернёт ошибку авторизации. Управление правами — в уроке 24 про kafka-acls.sh.

 

Альтернативные подходы к удалению данных

Иногда kafka-delete-records.sh — не лучший выбор. Вот что ещё можно использовать в зависимости от задачи.

Задача Инструмент Когда подходит
Удалить все данные старше N дней kafka-configs.sh — изменить retention.ms/retention.bytes Данные удаляются по времени или объёму автоматически
Удалить сообщение с конкретным ключом Tombstone-запись (null value) + log compaction Топик с политикой compact, GDPR-сценарии
Полностью очистить топик kafka-delete-records.sh с offset -1 или kafka-topics.sh —delete + create Зависит от того, нужно ли сохранять конфигурацию
Удалить конкретный диапазон офсетов в партиции kafka-delete-records.sh Единственный вариант для точного диапазона

Для задач автоматизации удаления по времени лучше настроить политику retention на уровне топика — это декларативный подход, который работает без ручных операций. kafka-delete-records.sh имеет смысл использовать именно для разовых или нестандартных случаев.

 

Автоматизация — скрипт с генерацией JSON

Писать JSON-файл руками для каждой операции неудобно. Вот простой bash-скрипт, который генерирует его автоматически по имени топика и числу партиций.

#!/bin/bash
# Проверено: Apache Kafka 4.2.0, Ubuntu 22.04
# Использование: ./delete_records.sh  

TOPIC=$1
TARGET_OFFSET=$2
BOOTSTRAP="localhost:9092"
TMP_FILE="/tmp/delete-records-${TOPIC}.json"

# Получаем число партиций
PARTITION_COUNT=$(kafka-topics.sh \
  --bootstrap-server $BOOTSTRAP \
  --describe \
  --topic $TOPIC \
  | grep "PartitionCount" \
  | awk -F 'PartitionCount: ' '{print $2}' \
  | awk '{print $1}')

# Генерируем JSON
echo '{"partitions":[' > $TMP_FILE
for ((i=0; i<$PARTITION_COUNT; i++)); do if [ $i -lt $((PARTITION_COUNT-1)) ]; then echo " {\"topic\": \"$TOPIC\", \"partition\": $i, \"offset\": $TARGET_OFFSET}," >> $TMP_FILE
  else
    echo "  {\"topic\": \"$TOPIC\", \"partition\": $i, \"offset\": $TARGET_OFFSET}" >> $TMP_FILE
  fi
done
echo '],"version":1}' >> $TMP_FILE

echo "Сгенерирован файл $TMP_FILE для топика '$TOPIC' ($PARTITION_COUNT партиций), offset=$TARGET_OFFSET"

# Запускаем удаление
kafka-delete-records.sh \
  --bootstrap-server $BOOTSTRAP \
  --offset-json-file $TMP_FILE

Запустить скрипт просто: ./delete_records.sh orders 150. Он сам подсчитает число партиций и применит нужный офсет ко всем из них.

 

Проверка результата после удаления

После выполнения команды стоит убедиться, что граница сдвинулась как ожидалось. Для этого снова запускаем kafka-get-offsets.sh с флагом —time earliest:

kafka-get-offsets.sh \
  --bootstrap-server localhost:9092 \
  --topic orders \
  --time earliest

Если в выводе для каждой партиции earliest офсет совпадает с тем, что вы указали — операция прошла успешно. Если нет — смотрите, не упёрся ли брокер в log end offset или в уже существующую границу.

 

Apache Kafka для инженеров данных

Код курса
DEVKI
Ближайшая дата курса
24 августа, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Что дальше

В следующем уроке разбираем kafka-consumer-groups.sh — одну из самых многофункциональных утилит в арсенале. Она покажет lag всех consumer-групп, состояние офсетов по каждой партиции и позволит сбросить или переставить офсеты — в том числе для тех групп, которые после удаления записей оказались с устаревшим положением.

Референсные ссылки

Все уроки курса

Тема URL
1 Установка Kafka с Zookeeper https://bigdataschool.ru/blog/news/lesson1-kafka-zookeeper-install/
2 Установка Kafka в режиме KRaft https://bigdataschool.ru/blog/news/lesson2-kafka-kraft-install/
3 Docker KRaft: однонодовый кластер https://bigdataschool.ru/blog/news/lesson3-kafka-docker-single/
4 Docker KRaft: 3-нодовый кластер https://bigdataschool.ru/blog/news/lesson4-kafka-docker-cluster/
5 Утилиты bin/: переменные окружения и основы https://bigdataschool.ru/blog/news/lesson5-kafka-bin-intro/
6 kafka-topics.sh: управление топиками https://bigdataschool.ru/blog/news/lesson6-kafka-topics/
7 kafka-console-producer.sh https://bigdataschool.ru/blog/news/lesson7-kafka-console-producer/
8 kafka-console-consumer.sh https://bigdataschool.ru/blog/news/lesson8-kafka-console-consumer/
9 kafka-server-start.sh / kafka-server-stop.sh https://bigdataschool.ru/blog/news/lesson9-kafka-server-start-stop/
10 kafka-storage.sh https://bigdataschool.ru/blog/news/lesson10-kafka-storage/
11 kafka-cluster.sh https://bigdataschool.ru/blog/news/lesson11-kafka-cluster/
12 kafka-metadata-quorum.sh https://bigdataschool.ru/blog/news/lesson12-kafka-metadata-quorum/
13 kafka-metadata-shell.sh https://bigdataschool.ru/blog/news/lesson13-kafka-metadata-shell/
14 kafka-features.sh https://bigdataschool.ru/blog/news/lesson14-kafka-features/
15 kafka-configs.sh https://bigdataschool.ru/blog/news/lesson15-kafka-configs/
16 kafka-log-dirs.sh https://bigdataschool.ru/blog/news/lesson16-kafka-log-dirs/
17 kafka-dump-log.sh https://bigdataschool.ru/blog/news/lesson17-kafka-dump-log/
18 kafka-delete-records.sh https://bigdataschool.ru/blog/news/lesson18-kafka-delete-records/
19 kafka-consumer-groups.sh https://bigdataschool.ru/blog/news/lesson19-kafka-consumer-groups/
20 kafka-streams-application-reset.sh https://bigdataschool.ru/blog/news/lesson20-kafka-streams-reset/
21 kafka-leader-election.sh https://bigdataschool.ru/blog/news/lesson21-kafka-leader-election/
22 kafka-reassign-partitions.sh https://bigdataschool.ru/blog/news/lesson22-kafka-reassign-partitions/
23 kafka-replica-verification.sh https://bigdataschool.ru/blog/news/lesson23-kafka-replica-verification/
24 kafka-acls.sh https://bigdataschool.ru/blog/news/lesson24-kafka-acls/
25 kafka-broker-api-versions.sh https://bigdataschool.ru/blog/news/lesson25-kafka-broker-api-versions/
26 kafka-get-offsets.sh https://bigdataschool.ru/blog/news/lesson26-kafka-get-offsets/
27 kafka-verifiable-producer/consumer.sh https://bigdataschool.ru/blog/news/lesson27-kafka-verifiable/
28 kafka-producer-perf-test.sh https://bigdataschool.ru/blog/news/lesson28-kafka-producer-perf/
29 kafka-consumer-perf-test.sh https://bigdataschool.ru/blog/news/lesson29-kafka-consumer-perf/
30 kafka-mirror-maker.sh https://bigdataschool.ru/blog/news/lesson30-kafka-mirror-maker/
31 connect-standalone.sh https://bigdataschool.ru/blog/news/lesson31-kafka-connect-standalone/
32 connect-distributed.sh https://bigdataschool.ru/blog/news/lesson32-kafka-connect-distributed/
33 kcat. Альтернативный CLI https://bigdataschool.ru/blog/news/lesson33-kcat/