Сегодня рассмотрим важную для обучения администраторов кластера Apache Kafka тему про удаление брокеров. Что происходит, когда администратор удаляет брокер Kafka из кластера, какие сложности при этом могут возникнуть и как с ними справляется решение на базе платформы Confluent.
Как вручную удалить брокер Kafka из кластера: краткий guide администратора
На первый взгляд кажется, что добавление узлов в кластер может стать нетривиальной задачей, а с удалением все просто. Однако, на самом деле, при удалении брокеров Kafka тоже есть ряд особенностей, которые следует знать администратору кластера. Удаление брокеров из кластера нужно для обеспечения масштабируемости системы потоковой обработки событий и часто требуется для более эффективной с экономической точки зрения утилизации вычислительных ресурсов. Например, когда объем обрабатываемых данных снизился, нет смысла загружать больше брокеров, чем это на самом деле нужно.
Чтобы избавиться от лишних брокеров, следует сперва уменьшить их нагрузку до тех пор, пока они не перестанут содержать какие-либо разделы. При этом нужно сперва переназначить лидеров разделов топика с помощью скрипта kafka-reassign-partitions.sh, а затем выключить и удалить сам брокер, т.е. компонент Kafka Broker в Apache Ambari, используемого для управления кластером. Сама процедура ручного удаления брокеров Kafka из кластера сводится к следующим шагам:
- в оболочке ZooKeeper получить список брокеров и их идентификаторы с помощью команды, задав имя хоста и номер порта, где запущен сервис синхронизации метаданных ZooKeeper
cd /usr/hdp/current/kafka-broker/bin ./zookeeper-shell.sh <zookeeper_server>:<port> ls /brokers/ids
- В оболочке ZooKeeper получить информацию о брокере Kafka, указав его идентификатор
get /brokers/ids/<brokerId>
- Определить список топиков и разделов, требующих переназначения лидера и реплик через запуск скрипта kafka-topics.sh из каталога /usr/hdp/current/kafka-broker/bin, указав имя хоста и номер порта, где запущен сервис ZooKeeper
./kafka-topics.sh --zookeeper <zookeeper_server>:2182 --describe
Разделы топика, требующие переназначения, идентифицируются со значениями Лидера и Реплики, равными идентификатору брокера узла, который должен быть выведен из эксплуатации.
- Изолировать топики определенного брокера, которые следует удалить, запустив скрипт kafka-topics.sh с параметрами, указывающими ID брокера, название сервера и номер порта ZooKeeper. Например, для вывода из эксплуатации брокера с идентификатором 1004 с ZooKeeper, запущенным на сервере hostname и порте 2182 команда изоляции будет выглядеть так:
./kafka-topics.sh --zookeeper `hostname`:2182 --describe | egrep "Leader: 1004|Replicas: 1004"
- переназначить изолированные разделы другому брокеру Kafka согласно рассчитанному плану переназначений, который позволит равномерно распределить нагрузку по кластеру.
- остановить сервис брокера Kafka и удалить компонент в Ambari. После завершения плана переназначения разделов и подтверждения того, что на брокере нет данных, их можно выключить. Этот шаг является необязательным и зависит от флага toShutdown, введенного в Confluent Platform0. Он по умолчанию имеет значение true. Аналогичное условие справедливо и для операции очистки исключений реплик на отключенных брокерах.
- перезапустить HDFS и YARN, которые могут считывать данные от выведенного из эксплуатации брокера Kafka.
Apache Kafka предоставляет API AlterPartitionReassignments в Controller Broker, позволяя передать раздел и его новый набор целевых реплик , т.е. набор идентификаторов брокера, который должен размещать этот раздел. Сервис Controller Kafka работает на каждом брокере в кластере, но только один из них может быть активен в любой момент времени. При вызове API переназначения разделов контроллер Kafka инициирует перемещение данных, удаляя старые реплики после того, как все новые присоединятся к набору синхронизированных реплик. По завершении этого процесса переназначение разделов считается выполненным.
Администрирование кластера Kafka
Код курса
KAFKA
Ближайшая дата курса
9 декабря, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Kafka предоставляет низкоуровневый инструмент под названием kafka-reassign-partitions, который помогает переназначить разделы с использованием API AlterPartitionReassignments. Но применение kafka-reassign-partitions также нетривиально и включает целую последовательность шагов. В частности, пользователям необходимо вручную сшить большой JSON-файл с указанием всех разделов и связанных с каждым из них реплик.
Также придется столкнуться с проблемой упаковки бинов, связанную с перемещением реплик по остающимся брокерам. Еще механизм самобалансировки (Self-Balancing) Kafka при удалении брокера требует, чтобы коэффициенты репликации были меньше числа брокеров в кластере, но оставались достаточными для выполнения репликации. Например, не может быть топика с коэффициентом репликации, равным 1, т.к. при его удалении, т.е. если этот топик находится на удаленном брокере, не будет реплики для повторной балансировки. В этом случае при попытке удалить брокера выйдет сообщение об ошибке с исключением org.apache.kafka.common.errors.InvalidBrokerRemovalException.
Таким образом, ручное удаление брокеров из кластера Kafka требует от администратора много специальных знаний и навыков, а также занимает довольно длительный период времени, включая тестирование проведенных операций и исправление возможных ошибок. Поэтому коммерциализированная версия Kafka на платформе Confluent предлагает самобалансирующиеся кластеры и инструменты автоматизированного удаления брокеров. Как это работает, рассмотрим далее.
Решение Confluent
Чтобы автоматизировать операции перебалансировки кластера, не зависящие от пользователя, можно организовать некоторый длительный процесс в брокере, который отслеживает ресурсы кластера, понимает их нагрузку и может придумать интеллектуальный способ переназначения разделов из удаляемых брокеров. Этот ребалансировщик кластера будет перераспределять разделы по всему кластеру, чтобы свести к минимуму влияние нагрузки на оставшихся брокеров.
Впервые такой перебалансировщик был представлен в версии Confluent Platform 6.0 с помощью команды kafka-remove-brokers.
Но функция удаления, выпущенная в версии 6.0, имела два заметных недостатка. Во-первых, удаление периодически приводило к недостаточной репликации разделов в кластере, поскольку брокер был сначала закрыт до того, как разделы были перемещены из него. Вторая проблема вытекала из первой – можно было удалить только одного брокера за раз. Для следующего удаления требовалось ждать, пока нагрузка не будет перебалансирована, чтобы не получить недостаточное количество синхронизированных реплик (ISR, In-Sync Replica) из-за настроенного значения Confluent Cloud, равного 2.
Разумеется, можно было безопасно удалить несколько брокеров по одному с помощью старого API. Но на практике, такой подход тоже сопровождался проблемами. Предположим, нужно удалить пять брокеров из кластера Kafka. При удалении четвертого брокера можно было обнаружить, что нет возможности удалить больше. Или это удаление могло привести к недостаточной емкости всего кластера, т.е. недостаточной репликации разделов и невозможности обслуживать данные. Предупредить такой негативный сценарий можно, предварительно рассчитав емкость, которую можно безопасно удалить.
Поэтому в недавнем выпуске Confluent Cloud и Platform 7.0 появилась возможность удалять брокеры Apache Kafka и уменьшать кластер с помощью всего одной команды. Confluent Server избавляет администратора от необходимости рассчитывать емкость кластера, выполнять перемещение разделов и автоматизирует другие рутинные операции.
Функция самобалансирующихся кластеров Confluent помогает перераспределять разделы по всему кластеру, чтобы свести к минимуму влияние нагрузки на оставшихся брокеров. Встроенный сервис балансировки (балансировщик) работает внутри контроллера Kafka и отвечает за упаковку контейнеров и переназначение разделов, а также выполняет фактические операции удаления брокера. Используя метрики для каждого раздела, он сам рассчитывает план переназначений разделов, чтобы равномерно распределить нагрузку удаляемых брокеров по оставшейся части кластера. Затем эти переназначения назначаются и разрешаются постепенно, чтобы не перегружать кластер множеством операций сразу.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
20 января, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
На практике время, необходимое для перебалансировки разделов между брокерами, очень зависит от объема данных на диске брокера. В случае кластера с многоуровневым хранилищем с сотней ГБ данных на диске это займет примерно час. Но в кластере без многоуровневого хранилища с терабайтами данных на диске, процесс может растянуться на пару дней.
Чтобы операция перебалансировки оставалась бесперебойной, она должна быть устойчива к сбоям и перезапускам узлов. Процедура удаления брокера сохраняет свое выполнение на диск и перезапускается с последнего шага при отработке отказа любого компонента балансировщика.
Балансировщик Confluent использует топик Kafka с коэффициентом репликации 3 для сохранения состояния операции. Записи в формате Protobuf внутри топика позволяют балансировщику полностью восстанавливать свое состояние в случае сбоя системы, сводя к минимуму время восстановления и влияние на любые текущие перебалансировки. Если балансировщик прервется из-за перезапуска брокера, он будет потреблять все из своего топика при запуске, определяя, что выполнялась операция удаления, и просто возобновляет ее с того места остановки.
Примечательно, что сервис Confluentрешает проблему расчета необходимой емкости кластера, чтобы после удаления N брокеров ее хватало для размещения ресурсов. Кроме того, пользователи могут создавать новые топики во время операции удаления брокера, не волнуясь о том, что их реплики окажутся на удаляемом брокере. Для этого разработчики создали специальную функцию исключения реплик в виде API уровня управления, который позволяет пользователю помечать брокера как исключенного для любого размещения новой реплики, запрещая размещение на нем любых новых реплик.
Любой брокер, помеченный как исключенный, не будет иметь никаких новых реплик, размещенных на нем во время создания нового топика или раздела в рамках автоматического назначения реплики. А если пользователь явно назначит исключенный брокер для назначения реплик, возникнет исключение, которое сохраняется и не удаляется до тех пор, пока API не будет вызван снова с явным намерением удалить указанное исключение. Операция автоматически очищается по завершении, удаляя исключение реплики только после подтверждения закрытия брокера. Чтобы упростить уровень удаления брокера и обеспечить качественный дизайн API , API-интерфейс исключения является атомарным и идемпотентным по своей структуре.
Запросы на исключение обрабатываются атомарно: применяется весь набор исключений или же не применяется ни одно, например, при попытке удалить исключение, которого не существует. Чтобы API удаления брокера не закрывал его до запуска переназначений в Confluent Platform 7.0, введен новый логический флаг toShutdown. Он обеспечивает обратную совместимость с предыдущей версией платформы Kafka на Kubernetes и обозначает, должна ли операция удаления продолжаться с выключением брокера. Согласно лучшим практикам работы с оператором Kubernetes, операцию удаления брокера является идемпотентной. Если все брокеры, входящие в состав запроса на удаление, уже находятся в процессе удаления или были удалены из кластера в рамках предыдущего запроса, запрос становится неактивным и немедленно возвращает успешное состояние.
Таким образом, новое решение Confluent автоматизирует и упрощает процесс удаления нескольких брокеров Kafka на этой платформе, позволяя безопасно и эластично масштабировать облачный кластер по потребностям пользователя.
Больше полезных примеров про администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве:
Источники
- https://www.ibm.com/docs/en/npi/1.3.1?topic=cluster-decommissioning-kafka-broker-component
- https://sleeplessbeastie.eu/2022/01/05/how-to-reassign-kafka-topic-partitions-and-replicas/
- https://www.confluent.io/blog/remove-kafka-brokers-from-any-cluster-the-easy-way/
- https://docs.confluent.io/platform/current/kafka/sbc/sbc-tutorial.html