Чтобы сделать наши курсы по Apache Kafka для администраторов кластеров и разработчиков распределенных приложений еще более полезными, сегодня рассмотрим несколько полезных и значимых конфигурационных параметров этой платформы потоковой передачи событий. Что настроить на брокере, топике, продюсере и потребителе, как распараллелить потоки и обрабатывать транзакции.
Настройка брокеров и потоков в Apache Kafka
О некоторых важных параметрах конфигурации Apache Kafka, важных с точки зрения работы потокового приложения мы рассказывали здесь и здесь. Чтобы не повторяться, сегодня разберем другие полезные свойства, которые следует знать администратору этой платформы.
Например, на брокере есть настройка group.initial.rebalance.delay.ms — время, в течение которого координатор группы будет ждать, пока больше потребителей присоединятся к новой группе, перед выполнением первой перебалансировки. По умолчанию значение этого параметра равно 3 секунды. Более длительная задержка означает потенциально меньшее количество перебалансировок, но увеличивает время до начала обработки. На практике если потребителей менее 100 в 1-ой группе, значения по умолчанию вполне хватает. Но если их намного больше, 3 секунды будет недостаточно: группа будет слишком долго находиться в режиме перебалансировки в течение (до часа) из-за длительных циклов ошибки у потребителей. Поэтому чем больше потребителей в одной группе, тем больше должно быть значением параметра group.initial.rebalance.delay.ms.
У каждого топика есть конфигурация retention.ms — максимальное время, в течение которого хранится журнал, прежде чем его старые сегменты будут удалены, чтобы освободить место. По сути, этот параметр представляет собой SLA (Service Level Agreement) о том, как скоро потребители должны прочитать свои данные. Если установлено значение -1, ограничение по времени не применяется. Подробнее об особенностях хранения и очистки сообщений в топиках Apache Kafka мы писали здесь. Для распараллеливания потоковой обработки нужно увеличить количество потоков в параметре num.stream.threads, по умолчанию 1.
Назначение разделов на потребителях
Из множества конфигураций потребителя стоит специально выделить partition.assignment.strategy — список имен классов или их типов, упорядоченных по предпочтению, поддерживаемых стратегий назначения разделов, которые клиент будет использовать для распределения владения разделами среди экземпляров-потребителей при управлении группами. Этот параметр пригодится при неравномерном распределении данных, т.е. когда один из ключей намного «тяжелее» остальных, т.е. имеет больше данных. Возможные опции этой конфигурации следующие:
- apache.kafka.clients.consumer.RangeAssignor — назначает разделы для каждого топика;
- apache.kafka.clients.consumer.RoundRobinAssignor – циклическое назначение разделов потребителям;
- apache.kafka.clients.consumer.StickyAssignor — гарантирует максимально сбалансированное назначение, сохраняя при этом как можно больше существующих назначений разделов;
- apache.kafka.clients.consumer.CooperativeStickyAssignor – похож на предыдущий, но допускает совместную ребалансировку.
По умолчанию разделы назначаются по стратегии [RangeAssignor, CooperativeStickyAssignor] – используется RangeAssignor, но позволяет выполнить обновление до CooperativeStickyAssignor с помощью всего лишь одного непрерывного возврата, удаляющего RangeAssignor из списка.
Реализация интерфейса org.apache.kafka.clients.consumer.ConsumerPartitionAssignor позволяет подключать настраиваемую стратегию назначения разделов.
Транзакции на продюсере
Из конфигурации продюсера хочется особенно отметить transactional.id – параметр конфигурации для транзакционной доставки. Он обеспечивает семантику надежности, которая охватывает несколько сеансов производителей, позволяя клиенту гарантировать, что транзакции с одним и тем же идентификатором завершены до начала новых. Если transactional.id не указан, то производитель ограничен идемпотентной доставкой. При настроенном transactional.id параметр enable.idempotence предполагается установленным в значение true, гарантируя строго однократную запись копии сообщения в поток. Для включения идемпотентности требуется, чтобы значение параметра max.in.flight.requests.per.connection было меньше или равно 5 с сохранением порядка сообщений для любого допустимого значения. При этом количество повторных попыток должно быть больше 0, а подтверждения (acks) следует установить в all. Если администратор кластера Kafka не установит все эти значения явно, они будут выбраны автоматически. Но при установке несовместимых значений возникнет исключение ConfigException. Подробнее про исключения в Kafka-приложениях читайте в нашей новой статье.
По умолчанию transactional.id не настроен, что означает невозможность использования транзакций, для которых требуется кластер Kafka как минимум из трех брокеров. Это рекомендуется для production-среды, а для разработки можно изменить это, настроив параметр брокера transaction.state.log.replication.factor — коэффициент репликации для топика транзакции. Более высокое значение этого параметра обеспечит повышенную доступность. Однако, внутренний топик создать не получится, пока размер кластера Kafka не будет соответствовать этому требованию фактора репликации.
Здесь же упомянем конфигурацию transaction.timeout.ms – максимальное время в мс, в течение которого координатор транзакции будет ожидать обновления статуса транзакции от продюсера, прежде чем упреждающе прервать текущую транзакцию. Если это значение больше, чем параметр transaction.max.timeout.ms в брокере, запрос аварийно завершится с ошибкой InvalidTxnTimeoutException. Параметр transaction.max.timeout.ms, значение которого по умолчанию равно 15 минут, означает максимально допустимый тайм-аут для транзакций. Если запрошенное клиентом время транзакции превышает это значение, брокер вернет ошибку в InitProducerIdRequest. Это предотвращает слишком большой тайм-аут клиента, который может помешать потребителям читать топики, включенные в транзакцию.
А ограничить размер пакета для чтения из сегментов журнала транзакций при загрузке идентификаторов продюсеров и транзакций в кэш поможет параметр transaction.state.log.load.buffer.size. Это мягкое ограничение, которое автоматически отменяется, если записи слишком велики.
Узнайте больше про администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве: