Какие меры принять администратору кластера Apache Kafka, чтобы повысить надежность потоковой экосистемы, использующей эту распределенную платформу как средство интеграции различных приложений.
Сбои в потоковой экосистеме и способы их устранения
Хотя Apache Kafka считается высоконадежной системой благодаря множеству встроенных механизмов отказоустойчивости, таким как репликация и перевыборы лидера. Впрочем, это не исключает возможности сбоев. Сбой — это ситуация, когда приложения не могут публиковаться в Kafka. Снизить риски возникновения сбоев помогут следующие меры:
- при использовании собственной аппаратной инфраструктуры надо прежде всего обеспечить резервирование сетевого оборудования, систем хранения данных, стоек и источников питания;
- далее следует распределить брокеров Kafka так, чтобы они могли пережить сбой любого элемента инфраструктуры;
- настроить репликацию топиков и подтверждения продюсеров (конфигурация acks, о которой мы писали здесь и здесь);
- настроить оповещения о сбоях и критических значениях аппаратной и программной инфраструктуры;
- настроить регулярное обновление кластеров и клиентских библиотек;
- автоматизировать развертывания, включая управляемое версионирование конфигураций.
Сбой не обязательно приводит к отключению всего кластера Kafka, которое происходит в следующих случаях:
- несколько брокеров в кластере становятся недоступными примерно в одно и то же время;
- insync.replicas недостижим при отправке, где acks=all или ни один лидер не доступен для разделов, в которые приложение-продюсер хочет отправить данные;
Частичный сбой кластера, приводящий к удалению нескольких разделов, может полностью заблокировать отправку данных от приложения-продюсера, поскольку буферы памяти в его клиентской библиотеке заполняются. Подробнее о том, как происходит публикация сообщения в Kafka, мы рассматривали здесь.
Длительные сбои не определяются каким-либо конкретным временным интервалом, но имеют определенные характеристики:
- приложение не может отправлять сообщения в клиентскую библиотеку;
- сообщения превышают максимальное количество повторов или начинает истекать время ожидания;
Какой из этих симптомов зависит от следующих параметров конфигурации клиента:
- объем памяти, выделенный для буфера библиотеки Kafka;
- максимальное количество повторов;
- срок действия сообщения.
Когда клиентская библиотека определяет, что максимальное количество повторов сообщения превышено или время ожидания сообщения истекло, приложение получает исключения или ошибки в слушателях обратного вызова. Очень важно не игнорировать эти предупреждения, поскольку ошибки повлияют на сообщения, отправленные в разделы, которые имеют разных брокеров в качестве лидеров. Через некоторое время после сбоя потоки приложения-продюсера будут заблокированы для отправки и начнут возвращать ошибки. Как именно это будет выглядеть, зависит от клиентской библиотеки:
- при использовании библиотеки librdkafka, которая позволяет писать код для работы с Kafka на Python, Go, .NET, C# и других языках программирования вместо Java, предоставляя высокопроизводительную, легкую и многофункциональную реализацию протокола Kafka, вернется код ошибки. Приложению необходимо запросить ответы брокера, чтобы освободить место в клиентской библиотеке перед повторной попыткой отправки.
- При использовании Java-клиента попытки отправки приведут к блокировке потоков приложения и, в конечном итоге, к таймауту. В итоге исчерпается пул потоков приложений аналогично нескольким потокам, блокирующим один и тот же ресурс, например, таблицу базы данных, и система перестает отвечать на запросы извне.
Это можно обойти, используя очереди в памяти, когда один или несколько потоков-получателей принимают запросы, выполняют некоторую обработку и ставят их в очередь фонового потока для отправки в клиентскую библиотеку Kafka. Однако, очереди в памяти – не самое лучшее решение из-за их временного характера. По своей сути буфер – это область, в которой временно хранятся данные. Некоторые процессы записывают данные в эту область, а другие читают и удаляют их. Внутри приложения буфер — это структура данных, а процессы — это потоки выполнения. Очередь в обычном брокере сообщений также является буфером, а процессные блоки — это приложения, которые публикуют и потребляют данные.
Вместимость буфера ограничена, но можно регулировать количество поступающих в него сообщений, а также скорость их потребления. Буферы со временем имеют тенденцию к двум разным состояниям:
- если скорость производства меньше скорости потребления, то буфер будет иметь тенденцию к опустошению;
- если скорость публикации выше скорости потребления, то буфер будет стремиться к заполнению.
Если процесс отправки записывает в буфер, который используется медленнее, то буфер в конечном итоге заполнится. Если процесс отправки получает данные из вышестоящего приложения, он достигнет емкости и отправит обратно в вышестоящую систему. Во время длительных простоев Kafka приложение-продюсер отправляет сообщения в клиентскую библиотеку Kafka, а фоновый поток, выполняющий отправку брокеру, не может освободить какие-либо из этих данных. Даже если добавить в систему еще один буфер и разделять прием входящих запросов от отправки в клиентскую библиотеку Kafka, буфер все равно будет наполняться быстрее, чем освобождаться.
Только гарантируя, что скорость потребления превышает скорость производства, можно обеспечить постоянный поток данных в динамической системе в долгосрочной перспективе. Увеличение буферов не будет долгосрочным решением в перспективе. Кроме того, приложения, хранящие данные внутри буфера в памяти, могут аварийно завершить работу во время ожидания повторного открытия выходного потока. Поэтому приложения, отправляющие сообщения брокеру сообщений, должны иметь возможность различать два типа ошибок и реагировать соответствующим образом:
- независимо от количества попыток приложение-продюсер никогда не сможет отправить сообщение, например, из-за сбоя сериализации, когда неотправленные сообщения куда-то записываются, например, в лог-файл или таблицу исключений в базе данных.
- если приложение-продюсер повторно отправляет сообщение, эта повторная попытка может быть успешной. Для этого следует установить тайм-аут доставки.
Справиться с неудачными отправками можно несколькими способами:
- удалять неудачные сообщения;
- организовать обратное давление в приложении-продюсере и повторить отправку;
- отправить все сообщения в альтернативное локальное хранилище, откуда они будут асинхронно загружены в Kafka;
- отправить сообщения об истечении тайм-аута в локальное хранилище и отправлять их в Kafka фоном;
- реализовать автоматический выключатель для сброса сообщений в альтернативное хранилище, например, на диск или в локальный брокер сообщений и запустить процесс восстановления для последующей отправки сообщений в Kafka;
- выполнять двойную запись сообщений в несколько кластеров Kafka.
При этом стоит помнить о нарушении порядка отправки сообщений при попытке их повторной отправки. При записи сообщений в локальное хранилище, следует обеспечить его отказоустойчивость и соответствующую пропускную способность. В системе, где отправляющее приложение занимает значительный объем памяти, запись на диск проходит через небольшой страничный кэш и включать большое количество операций записи на диск. Чтобы гарантировать сохранность данных в случае отключения локального хранилища, приложения, записывающие данные на диск через файловую систему, должны периодически сбрасывать записи на диск, например, используя функцию fsync(). Задержка конвейера ввода-вывода ограничивает производительность этой операции, которую приложение вызывает синхронно.
Можно добиться более высокой пропускной способности с меньшей надежностью, используя API-интерфейсы асинхронного диска, такие как libaio или io_uring, с помощью liburing. Частоту, с которой должна происходить запись на локальные физические диски, необходимо учитывать наряду с объемом данных, находящихся в различных буферах приложений, которые еще не были записаны и будут потеряны в случае отключения питания в системе.
Также стоит решить вопрос, как будут сообщения записываться в файл: один для всех типов сообщений или по одному для каждого целевого топика или раздела. Также следует определить, как при загрузке данных из файла в Kafka отслеживать, какие сообщения уже были прочитаны. Это надо учитывать, если процесс восстановления завершается на полпути. Впрочем, любая система приема должна иметь возможность обрабатывать частичную запись, включая поврежденные файлы. Наконец, нужно определиться с максимальным размером файла данных, когда приложение переносится на новый файл и когда использованные файлы удаляются из системы.
6 стратегий устранения сбоев в Apache Kafka
Первым вариантом устранения сбоев является удаление неудавшихся сообщений. Эта стратегия более известна как сброс нагрузки. Когда обработчик обратного вызова получает временную ошибку, связанную с отправленным сообщением, он удаляет связанные данные из вышестоящих структур данных и не выполняет никаких дальнейших действий. Приложение может зарегистрировать событие. Эта ситуация должна быть видна операторам извне посредством мониторинга. Такую стратегию просто реализовать, но может возникнуть потеря данных.
Вторым вариантом является создание обратного давления в приложении-потребителе и повторная отправка данных продюсером. Продюсер повторно отправляет сообщения об истечении времени ожидания в клиентскую библиотеку. В случае сбоя Kafka и невозможности публикации сообщений приложение-потребитель прекращает ожидать входящего трафика до тех пор, пока сообщения не начнут поступать снова. Достоинством этого решения является отсутствие внешних зависимостей и потерь сообщений. Но недостатки тут следующие:
- реализация повторных попыток поверх клиентских библиотек Kafka не рекомендуется, поскольку клиентская библиотека уже содержат механизм повтора, который поддерживает порядок сообщений и обеспечивает идемпотентную выработку. Если время сообщения истекло, библиотека сделала все возможное, чтобы доставить его брокеру.
- порядок сообщений теряется;
- потребитель может получить одну и ту же полезную нагрузку дважды, если продюсер повторно отправляет сообщение, время ожидания которого истекло после отправки брокеру и первоначально не получил ответа;
- блокировка системы входящего трафика фактически закрывает сервис без предупреждения внешних сторон.
- изменение конструкции приложения для оказания обратного давления может потребовать явного изменения контракта API на границе системы.
Альтернативной стратегией является запись всех сообщений локально и их асинхронная запись в Kafka. В случае сбоя приложение-продюсер отправляет все сообщения в альтернативное локальное хранилище. Этот вариант менее сложен варианта с автоматическим выключателем: приложение может продолжать принимать входящий трафик гораздо дольше. Для возврата в нормальное рабочее состояние не требуется никакого ручного вмешательства. Нет потерь сообщений. Однако, локальное хранилище будет менее надежным, чем кластер, который оно защищает, и станет потенциальной единственной точкой отказа. Также возникает дополнительная сквозная задержка.
Вместо этого можно отправлять сообщения об истечении тайм-аута в локальное хранилище и их загрузка в Kafka с помощью дополнительного побочного процесса. Этот вариант использует локальное хранилище в качестве канала недоставленных сообщений. Пакетный процесс импортирует эти сообщения в Kafka, как только система вернется в нормальное состояние. Плюсами этого варианта является низкая сложность реализации, а также приложение может продолжать принимать входящий трафик намного дольше. Нет потерь сообщений, хотя их порядок теряется. Еще одни недостатком этой стратегии становится сложность записи в локальное хранилище и необходимость ручного вмешательства для каждого экземпляра приложения, чтобы восстановить все сообщения.
Вместо этого можно внедрить автоматический выключатель для временной отправки сообщений в локальное хранилище. Но это довольно сложно с точки зрения реализации: система отключает поток данных в Kafka в случае сбоя этой платформы и перенаправляет сообщения в локальное хранилище. Приложение воспроизводит эти сообщения из локального хранилища и повторно отправляет их после восстановления Kafka. Затем возобновляется регулярный поток. Сложность в том, что несколько сообщений будут находиться в очередях продюсера. Проблемы с синхронизацией могут возникнуть при размыкании и замыкании автоматического выключателя. Чтобы понять, когда следует размыкать автоматический выключатель, надо знать состояние кластера на основе клиента Kafka через JMX в Java или статистику библиотеки librdkafka. Вместо этого следует отслеживать частоту ошибок в данных, например, отчеты об доставке с ошибками.
Наконец, можно реализовать двойную запись в параллельные кластеры Kafka, когда приложение-продюсер записывает сообщения дважды. А приложения-потребители считывают данные сразу из двух кластеров, отбрасывая ранее просмотренные сообщения. Вместо дедупликации потребители используют метод, основанный на упорядочивании с помощью монотонно увеличивающегося счетчика, проставляемого в каждом сообщении процессом отправки. Потребитель извлекает сообщения из обоих кластеров и отслеживает наибольшее значение счетчика. Такая система продолжит работать, даже если весь кластер Kafka недоступен. Также преимуществами этого способа является отсутствие потери сообщений и сохранение их упорядоченности. Однако, усложняется логика продюсера, а конфигурация топика должна быть одинаковой в обоих кластерах Kafka. Кроме того, этот вариант не защищает от всех проблем: продюсер может не отправлять сообщения ни в один из кластеров в случае проблем с сетью. Поэтому процесс резервного копирования, который будет записывать ошибочные сообщения в локальное хранилище, по-прежнему необходим. Кроме того, этот вариант требует удвоения аппаратного обеспечения и удваивает затраты на обслуживание.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Arenadata Streaming Kafka
Источники