Политики хранения, сжатия и очистки данных в топиках Apache Kafka: какие конфигурации нужно настроить, чтобы работать с файлами распределенных логов наиболее эффективно. Ликбез для администратора кластера Kafka и дата-инженера.
Хранение данных в Apache Kafka
Мы уже писали, что топик в Apache Kafka представляет собой не физическое, а логическое хранение данных. Топики делятся на разделы, которые, в свою очередь, состоят из сегментов. Сообщение, отправленное в кластер Kafka, добавляется в конец одного из журналов и остается там в течение настраиваемого периода времени или до тех пор, пока не будет достигнут предельный размер лога. Таким образом, можно сказать, что хранение сообщений в топиках Kafka может быть организовано одним из следующих способов:
- на основе времени, когда сегменты лога помечаются для удаления или сжатия по истечении настроенного времени хранения для сегмента журнала, в зависимости от настроенной политики очистки. По умолчанию срок хранения сегментов составляет 7 дней. При этом настраиваются такие конфигурации брокера, как retention.ms (количество миллисекунд хранения файла журнала перед его удалением), log.retention.minutes (количество минут хранения файла журнала перед его удалением, вторичное по отношению к свойству log.retention.ms) или log.retention.hours (количество часов хранения файла журнала перед его удалением, наименее приоритетная настройка).
- на основе максимально допустимого размера журнала для раздела топика. Когда размер лог достигает этого предела, начинается удаление сегментов с конца. При этом настраивается конфигурация retention.bytes.
Сжатие лога гарантирует, что Kafka всегда будет сохранять по крайней мере последнее известное значение для каждого ключа сообщения в журнале для одного раздела топика, чтобы гарантировать восстановление состояния после сбоя или повторную загрузку кэшей после перезапуска приложения во время эксплуатационного обслуживания. Простой подход к хранению данных предполагает, что старые данные журнала удаляются по истечении фиксированного периода времени или достижении лог-файлом предельного размера. Это отлично подходит для временных данных о событиях, таких как регистрация, где каждая запись стоит отдельно. Но в случае более сложных сценариев, например, захват измененных данных (CDC), где выполняется журналирование изменений с ключами, следует использовать другой способ.
К примеру, при подписке на изменения системы-источника, когда каждое изменение должно быть отражено в нескольких приемниках. Чтобы перезагрузить кэш или восстановить неисправный поисковый узел, нужен понадобиться полный набор данных, а не только последний лог. Аналогично при поиске событий, когда приложение использует журнал изменений в качестве основного хранилища данных для запросов. Также это пригодится при журналировании данных в системы высокой доступности для обеспечения отказоустойчивости. Процесс, выполняющий локальные вычисления, можно сделать отказоустойчивым, записав изменения, которые он вносит в свое локальное состояние, чтобы другой процесс мог использовать их и продолжить работу в случае сбоя. Это применимо в обработке счетчиков, агрегаций и другой подобной группировке данных в системе потоковых запросов.
В каждом из таких случаев сперва необходимо обрабатывать поток изменений в реальном времени. Но при сбое данные необходимо повторно загрузить или повторно обработать, выполнив полную загрузку. Возможно, при этом придется очистить топик, удалив из него все сообщения. Как это сделать, мы написали в отдельной статье. Избежать этого радикального способа поможет сжатие логов, которое позволяет отправить некорректные данные в один и тот же вспомогательный топик. Какие конфигурации при этом настраиваются, рассмотрим далее.
Сжатие и удаление данных
За работу с данными, которые вышли за пределы допустимого хранения в топиках, о чем мы говорили выше, отвечает конфигурация log.cleanup.policy — политика очистки, согласно которой старые сообщения будут удалены (delete) или сжаты (compact). Сжатием журналов занимается очиститель, в котором работает пул фоновых потоков, повторно копирующих файлы сегментов журнала, удаляя записи с ключом, указанном в заголовке. Каждый поток при этом работает следующим образом:
- выбирает наиболее длинный журнал;
- создает краткую сводку последнего смещения для каждого ключа в заголовке журнала. Сводка заголовка журнала — это компактная хеш-таблица, которая использует ровно 24 байта на запись, во много раз меньше изначального размера.
- повторно копирует журнал от начала до конца, удаляя ключи, которые позже появляются в журнале. Новые сегменты немедленно заменяются, поэтому требуется дополнительное дисковое пространство только для одного дополнительного сегмента журнала, а не для всей полной копии.
При этом учитывается значение конфигурации log.cleaner.enable, по умолчанию равной true, что включает процесс очистки журнала для запуска на сервере. Эта конфигурация должна быть включена для топиков с политикой очистки cleanup.policy=compact, включая топик внутренних смещений. Иначе эти топики не будут сжиматься и продолжат постоянно увеличиваться в размерах. Также администратор кластера Kafka может настроить конфигурации log.cleaner.dedupe.buffer.size (общая память для дедупликации журнала во всех потоках очистки) и log.cleaner.delete.retention.ms (время, в течение которого должны сохраняться маркеры удаления для сжатых журналов топика). Параметр log.cleaner.delete.retention.ms также задает ограничение времени, в течение которого потребитель должен завершить чтение, если он начинает со смещения 0, чтобы убедиться, что он получил действительный снимок последнего этапа.
За продолжительность хранения удаленных данных отвечают следующие конфигурации Kafka:
- log.cleaner.max.compaction.lag.ms — максимальное время, в течение которого сообщение не может быть сжато в журнале;
- log.cleaner.min.compaction.lag.ms — минимальное время, в течение которого сообщение будет оставаться несжатым в журнале;
- log.cleaner.min.cleanable.ratio — минимальное соотношение старого журнала к общему, при котором он подлежит очистке. Если также заданы конфигурации log.cleaner.max.compaction.lag.ms или log.cleaner.min.compaction.lag.ms, то средство сжатия журналов считает журнал пригодным для сжатия, как только достигнуто пороговое значение log.cleaner.min.cleanable.ratio, и в журнале есть несжатые записи в течение как минимум времени log.cleaner.min.compaction.lag.ms, или если в журнале присутствуют несжатые записи не более периода log.cleaner.max.compaction.lag.ms.
Таким образом, в Apache Kafka сообщения не удаляются сразу после их использования, т.е. считывания приложением-потребителем, а задается конфигурацией топика, политикой очистки. По умолчанию используется политика удаления, когда старые сегменты удаляются, если превышено их время хранения или предельный размер. Политика сжатия активирует сжатие журналов, выборочно удаляя записи в каждом разделе, где есть более свежее обновление с тем же первичным ключом. Это гарантирует, что в журнале будет хотя бы последнее состояние для каждого ключа. Можно комбинировать оба эти способа в политике очистки, одновременно указав значения удаления и сжатия. Тогда журнал будет сжат, но процесс очистки также будет выполняться согласно настройкам времени хранения или ограничения размера данных. Подробнее про сжатие данных в топиках Kafka мы рассказывали здесь.
Освойте администрирование и эксплуатацию Apache Kafka и Flink для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Администрирование кластера Kafka
- Apache Kafka для инженеров данных
- Администрирование Arenadata Streaming Kafka
Источники