В этой статье для обучения дата-инженеров и администраторов кластера Apache Kafka разберем, какие ошибки создают медленные потребители и как решить их, просто изменив значений конфигураций по умолчанию. А также познакомимся с Lighthouse — еще одним полезным инструментом мониторинга системных метрик, который позволит обнаружить эти и другие проблемы.
Проблема медленных потребителей в Apache Kafka и способы ее решения
Apache Kafka имеет почти 750 различных конфигураций, распределенных по брокерам, продюсерам, потребителям, топикам и разделам. Разумеется, не все значения по умолчанию будут оптимальны для отдельных вариантов использования. Например, значения конфигураций потребителей max.poll.records и max.poll.interval.ms по умолчанию могут тормозить всю систему и привести к тому, что потребители отдельных топиков Kafka зависают и повторно обрабатывают одни и те же записи.
При этом в логах Kafka регистрирует эти ошибки на уровне INFO, сообщая, что приложение-потребитель отправляет запрос на выход из группы координатору из-за истечения времени ожидания опроса потребителя. Это означает, что время между последующими вызовами poll()больше, чем настроенное значение max.poll.interval.ms. Обычно это означает, что цикл опроса тратит слишком много времени на обработку сообщений. Решить эту проблему можно увеличив значение max.poll.interval.ms или уменьшив максимальный размер пакетов, возвращаемых в poll(), через настройку max.poll.records.
По умолчанию max.poll.records равно 500 – это означает максимальное количество записей, возвращаемых одним вызовом poll(). Изменение этого значения не влияет на основное поведение опроса: приложение-потребитель кэширует записи из каждого запроса и постепенно возвращает их оттуда.
А конфигурация max.poll.interval.ms, по умолчанию равная 5 минут, означает максимальную задержку между вызовами poll() при использовании управления группами потребителей. Она устанавливает верхнюю границу количества времени, в течение которого потребитель может бездействовать, прежде чем получить больше записей. Если метод poll() не вызывается до истечения этого тайм-аута, то потребитель считается отказавшим, и группа выполняет перебалансировку, чтобы переназначить разделы другому члену группы потребителей. Для потребителей, использующих ненулевой group.instance.id, которые достигли этого тайм-аута, разделы не будут переназначены немедленно. Вместо этого потребитель перестанет отправлять тактовые импульсы, а разделы будут переназначены по истечении session.timeout.ms. Это отражает поведение статического потребителя, который отключился.
Эти два свойства конфигурации определяют требования к приложению-потребителю: оно должно иметь возможность потреблять max.poll.records записей за max.poll.interval.ms миллисекунд. Используя значения по умолчанию, это будет 0,6 секунд на одно сообщение. Но эти 600 миллисекунд могут быть довольно жестким ограничением, если нужно выполнить сложные операции для каждой записи.
Таким образом, Apache Kafka по умолчанию не предназначена для медленных потребителей. Если же клиентские приложения не могут или не должны работать с высокой скоростью, можно написать собственный метод считывания сообщений из топиков, модифицировав метод poll(). Он будет извлекать записи из брокеров и возвращать 0 или более записей, в зависимости от того, действительно ли есть какие-либо несчитанные записи в топики. Следует вызывать этот метод по крайней мере один раз каждые max.poll.interval.ms, иначе клиентская библиотека покинет группу потребителей, чтобы позволить другим экземплярам потребителя продолжить работу. Для этого можно абстрагировать метод poll(), используя фреймворк Spring, чтобы создать обработчик обратного вызова.
Это позволит предотвратить отсутствие фиксации и дублировании обработки сообщений при их медленном потреблении. Напомним, если записи в Kafka остались незафиксированными, другой экземпляр медленного приложения-потребителя берет тот же пакет записей и потребители просто повторно обрабатывают одну и ту же партию данных.
Apache Kafka для инженеров данных
Код курса
DEVKI
Ближайшая дата курса
20 января, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Примечательно, что такая ситуация обнаруживается не сразу, а когда пакеты данных становятся достаточно большими. В частности, это может случиться, если потребитель был отключен на какое-то время или произошел внезапный всплеск производства записей. Но пока потребитель не отстает значительно, пакеты достаточно малы для завершения в течение max.poll.interval.ms милисекунд. Поэтому может пройти несколько лет, прежде чем проблема обнаружится.
Таким образом, конфигурация потребителя по умолчанию предполагает высокую скорость его работы. Если это не так, следует перенастроить max.poll.records и/или max.poll.interval.ms. Снижение max.poll.records не влияет на скорость реагирования на определенные сценарии сбоев, но снижает пропускную способность системы из-за роста общих накладных расходов. Увеличение max.poll.interval.ms не влияет на пропускную способность, но может привести к замедлению отклика при перебалансировке потребителей. Определить эти и другие ситуации помогут средства мониторинга, об одном из которых мы поговорим далее.
Lighthouse для мониторинга
Здесь и здесь мы уже рассказывали про средства мониторинга за системными метриками кластера Apache Kafka, а также приложений Kafka Streams. Еще одним полезным для дата-инженера и администратора инструментом является Lighthouse. Эта легковесная система с открытым исходным кодом предоставляет наглядный графический интерфейс с потоковой передачей данных в реальном времени, чтобы отслеживать потоки продюсеров и потребителей. Также Lighthouse обеспечивает оперативное представление основных показателей производительности для визуализации работоспособности кластера Kafka.
Lighthouse создан с использованием Next.js, GraphQL, Apollo и Prometheus, с планами рефакторинга в Typescript. Инструмент использует рендеринг на стороне сервера во время сборки, что позволяет почти мгновенно доставлять пользователю значения отслеживаемых метрик, чтобы проводить диагностику системы и делать прогнозы для любого варианта использования.
Lighthouse использует предопределенный набор параметров GraphQL для запросов к серверу Prometheus, позволяя напрямую отображать данные о кластере Kafka в GUI. После загрузки Lighthouse следует заменить конечную точку Prometheus своей собственной в файле .env, чтобы подключиться к своему кластеру и метрикам, доступным во время выполнения.
Больше подробностей про администрирование и эксплуатацию Apache Kafka в системах аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники