Очереди и группы общего доступа для потребителей Apache Kafka: разбираемся с KIP-932

KIP-932, группы потребителей и группы общего доступа Kafka, потребитель Kafka, дата-инженер разработчик Apache Kafka примеры курсы обучение, обучение Apache Kafka, курсы Apache Kafka, Apache Kafka потребление сообщений курсы примеры обучение, потоковая и пакетная обработка данных примеры, обучение большим данным, Apache Kafka для дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Что такое группы общего доступа для потребителей, чем это отличается от существующей концепции группы потребителей, почему в Apache Kafka появляются очереди и чем это улучшит потоковую обработку событий.

Что такое KIP-932: группы общего доступа потребления данных из Apache Kafka

Напомним, группы потребителей в Kafka предназначены для повышения надежности упорядоченной доставки потоков событий распределенному приложению. Каждому потребителю в группе предоставляется эксклюзивный доступ к потреблению из назначенных ему разделов. Поэтому должно быть как минимум столько же разделов, сколько и потребителей. Эта идея позволяет повысить пропускную способность приложения-потребителя, существующего в нескольких экземплярах, каждый из которых считывает данные из назначенного ему раздела в хронологическом порядке происхождения этих событий. Но на практике упорядочивание не всегда нужно. Зависимость между количеством потребителей в группе потребителей и количеством разделов приходится соблюдать просто для того, чтобы обеспечить достаточное параллельное потребление в пиковые нагрузки.

Однако, в реальности приложениям-потребителям бывает необходимо просто совместно потреблять из потока событий без необходимости предоставления монопольного доступа к определенным разделам топика. Это, вместе с подтверждением каждого сообщения и подсчетом доставки, позволяет использовать класс вариантов использования, традиционно построенных вокруг концепции очереди. Например, очередь идеально подходит для ситуации, когда сообщения представляют собой независимые рабочие элементы, которые могут обрабатываться одновременно пулом приложений, а затем индивидуально повторяться или подтверждаться по завершении обработки. Этого гораздо проще добиться, используя очередь, а не топик с множеством разделов, каждый из которых назначен на одного потребителя в группе.

Поэтому в июне 2023 года было внесено очередное предложение по улучшению проекта Kafka (KIP, Kafka Improvement Proposal) под номером 932. KIP-932 представляет новый тип групп, называемых общими группами (share group) или группами общего доступа, которые не заменяют группы потребителей, а дополняют их. Можно выбрать, какую группу использовать, исходя из желаемого сценария потребления.

Kafka consumer group and share group, Группы потребителей и группы общего доступа в Apache Kafka, KIP-932
Группы потребителей и группы общего доступа в Apache Kafka

Этот KIP представляет концепцию группы общего доступа как способ обеспечения совместного потребления из топика Kafka. Можно думать о группе общего доступа примерно как о долговременной общей подписке в существующих системах. По сути, такие общие группы реализуют очереди для Kafka, без максимальной глубины очереди и возможностью сброса на определенное время для восстановления на момент времени.

Фундаментальные различия между группой общего доступа и обычной группой потребителей заключаются в следующем:

  • потребители в общей группе совместно потребляют записи без эксклюзивного назначения им разделов;
  • количество потребителей в группе общего доступа может превышать количество разделов;
  • записи подтверждаются на индивидуальной основе, хотя система оптимизирована для работы в пакетном режиме для повышения эффективности;
  • попытки доставки потребителям в группе общего доступа подсчитываются, чтобы обеспечить автоматическую обработку необработанных записей.

Таким образом, общие группы — это новый вид групп, наряду с существующими группами потребителей. Потребитель указывает, что он хочет использовать группу общего доступа, используя параметры конфигурации потребителя group.type=»share» и файлы group.id. Все потребители в одной группе общего доступа, подписанные на один и тот же топик, совместно потребляют записи из него. Если к топику обращаются потребители из более чем одной группы общего доступа, каждая группа общего доступа совместно использует эту тему независимо от других групп общего доступа. Каждый потребитель может динамически устанавливать список топиков, на которые он хочет подписаться. На практике все потребители в группе общего доступа обычно подписываются на одни и те же топики.

Когда потребитель в общей группе извлекает записи, он получает доступные записи из любого раздела топика, которые соответствуют его подпискам. Записи потребляются приложением-потребителем с блокировкой получения с ограничением по времени. Пока запись получена, она недоступна для другого потребителя. По умолчанию продолжительность блокировки составляет 30 секунд, но ею также можно управлять с помощью конфигурации потребителя. Идея состоит в том, что блокировка автоматически снимается по истечении срока действия блокировки, после чего запись становится доступной для передачи другому потребителю. Потребитель, который удерживает блокировку, может справиться с ней следующими способами:

  • потребитель может подтвердить успешную обработку записи, отправив подтверждение (ack, acknowledge);
  • потребитель может разблокировать запись, что делает запись доступной для другой попытки доставки;
  • потребитель может отклонить запись, что указывает на невозможность ее обработки, например, из-за изменения схемы или формата полезной нагрузки. В этом случае повторные попытки доставки этой же самой записи не предпринимаются. Подробнее про очереди недоставленных сообщений в JMS-брокере RabbitMQ И Apache Kafka читайте в этой статье.
  • если приложение-потребитель не делает ничего, блокировка автоматически снимается по истечении времени блокировки (по умолчанию 30 секунд).

Кластер ограничивает количество записей, получаемых потребителями для каждого раздела топика в общей группе. Как только лимит будет достигнут, выборка записей временно перестанет давать новые записи, пока количество полученных записей не уменьшится, что обычно происходит по истечении времени блокировки. Это ограничение контролируется параметром конфигурации брокера share.record.lock.partition.limit. Благодаря ограничению продолжительности блокировки и ее автоматическому снятию, брокер обеспечивает доставку даже при наличии сбоев у потребителя. Как это может быть реализовано технически, мы рассмотрим в следующей статье, а пока разберемся с основными принципами работы групп общего доступа.

Как работают группы общего доступа

Потребители в группе общего доступа взаимодействуют друг с другом способом, знакомым пользователям традиционных очередей сообщений. Можно писать приложения-потребители из Kafka, которые используют группы общего доступа для использования записей из топиков, просто увеличивая количество потребителей, не беспокоясь о разделах. Это похоже на использование традиционной очереди сообщений с несколькими приложениями-потребителями.

В группе общего доступа каждому потребителю назначаются все разделы для совместного использования со всеми другими потребителями, а затем им предоставляются отдельные наборы записей для использования. Это очень похоже на долгосрочную общую подписку в JMS-брокерах.

Потребители могут подтвердить доставку записи по отдельности, но обычно они делают это партиями за раз.  Вот пример фрагмента кода Java, взятого из KIP-932 для приложения, которое использует группу общего доступа и подтверждает записи как пакет:

Properties props = new Properties(); 
props.setProperty("bootstrap.servers", "localhost:9092"); props.setProperty("enable.auto.commit", "false"); 
props.setProperty("group.type", "share"); 
props.setProperty("group.id", "myshare"); 
KafkaConsumer<String, String> consumer = 
new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); consumer.subscribe(Arrays.asList("foo")); 
while (true) { 
// Fetch a batch of records acquired for this consumer
 ConsumerRecords<String, String> records = 
consumer.poll(Duration.ofMillis(100)); 
for (ConsumerRecord<String, String> record : records) { 
doProcessing(record); } // Commit the acknowledgement of all the records in the batch consumer.commitSync(); 
}

Свойство group.type со значением share указывает на группу общего доступа. Каждый вызов poll() возвращает пакет записей, которые затем обрабатываются, а вызов commitSync() подтверждает их доставку. Приложение-потребитель из Kafka использует расширение клиентского API, поэтому можно запускать его в нескольких экземплярах, не беспокоясь о разделах. Это похоже на классическую очередь сообщений, где сообщение в очереди становится доступным для потребления и потребляется с помощью JMS API. После этого сообщение логически, но еще не физически удаляется из очереди, и становится невидимо для других приложений. Как только приложение-потребитель подтверждает доставку, сообщение фактически удаляется из очереди. 

Очереди в Kafka с группами общего потребления реализуют аналогичную идею состояний для записей, находящихся в процессе доставки. Набор текущих записей для общего раздела имеет начальное смещение и конечное смещение. По мере того, как регистрируется полная доставка, начальное и конечное смещения перемещаются вперед по разделу. Расстояние между этими смещениями ограничено максимальным количеством текущих записей только в целях управления ресурсами. Сегмент раздела между начальным смещением и конечным смещением представляет собой скользящее окно, которое перемещается по мере использования записей. В случае идеальной потоковой обработки скользящее окно текущих записей большую часть времени находится близко к концу раздела, поэтому потребители не отстают от поступления новых записей.

Когда запись извлекается для потребителя в группе общего доступа, она изменяет свое состояние и остается в нем в течение ограниченного периода времени (по умолчанию 30 секунд), которого должно быть достаточно для обработки записи и ее подтверждения в большинстве случаев. Когда потребитель подтверждает запись, это состояние меняется. Но если запись не подтверждена достаточно быстро из-за сбоя приложения-потребителя или невозможности обработать запись, она становится доступной для другой попытки доставки.

Попытки доставки подсчитываются, чтобы автоматически обрабатывать необработанные записи. По умолчанию для каждой записи разрешено 5 попыток доставки, после чего она логически архивируется и больше не будет доставлена. На практике это означает, что случайные некорректные сообщения ничего не ломают, а лишь вызывают временное неудобство.

При использовании группы потребителей в Kafka записи не имеют состояний. Группа потребителей имеет фиксированное смещение для каждого раздела топика, а записи имеют смещения либо до, либо после этого зафиксированного смещения. В группах потребителей вообще нет управления состоянием для каждой записи.

Таким образом, группы общего доступа позволяют использовать записи из топиков также, как из очередей. Можно взять существующее приложение, которое использует очередь, и преобразовать его для использования топика и группы общего доступа, что намного проще, чем с группой потребителей. Но, чтобы не вводить очереди как отдельный ресурс и исключить путаницу с топиками, KIP-932 представляет группы общего доступа как новый способ использования записей из топиков. Для приложений-производителей ничего не меняется. 

На один и тот же топик можно подписать несколько групп общего доступа, которые будут независимо управлять записями в процессе выполнения, обеспечивая потребление данных несколькими приложениями-потребителями. При совмещении групп потребителей и групп общего доступа сценарий использования будет похож на pub/sub в терминологии JMS-брокеров. Однако, группы общего доступа не обеспечивают упорядочивание событий, т.к. нет привязки к разделам. Группы общего доступа не поддерживают семантику строго однократной доставки.

В заключение подчеркнем, что KIP-932, даже если он будет реализован, пока не сможет заменить традиционные JMS-брокеры с их очередями сообщений. Kafka не обеспечит двухфазную фиксацию и совместимость API с legacy-системами. Чем Kafka отличается от JMS-брокеров, мы подробно писали в этой статье. Кроме того, очереди в Kafka не похожи на очереди в JMS-брокерах по следующим отличиям:

  • топик Kafka не имеет максимальной глубины очереди – хотя необходимо обеспечить корреляцию темпа потребления с темпом публикации сообщений, временное отставание большого количества записей во время ежедневного пика не особо влияет на производительность системы;
  • поскольку общий раздел является просто скользящим окном в разделе топика Kafka, можно сбросить его положение в любой момент времени, чтобы воспроизвести сообщения, поскольку Kafka хранит все записи до тех пор, пока не сработает политика очистки, о чем мы писали здесь.

Таким образом, группа общего доступа — это не радикально новая концепция, а всего лишь способ потребления записей из топиков Kafka, который расширяет возможности этой распределенной платформы потоковой передачи событий, предоставляя альтернативную модель использования приложений. Подобно тому, как некоторые традиционные JMS-брокеры поддерживают публикацию/подписку, даже если они основаны на очередях, очереди Kafka обеспечивают потребление данных приложениями в группе общего доступа, несмотря на разделы топика. Как это будет реализовано, мы рассмотрим в следующий раз.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@andrew_schofield/queues-for-kafka-29afa8aeed86
  2. https://cwiki.apache.org/confluence/display/KAFKA/KIP-932%3A+Queues+for+Kafka
Поиск по сайту