Недавно мы писали про группы общего доступа в Apache Kafka, которые планируется реализовать в KIP-932. Сегодня рассмотрим, как именно это предполагается сделать.
Принципы работы группы общего доступа
Предложение по улучшению Kafka (KIP, Kafka Improvement Proposal) предполагает внесение значительных изменений. Все начинается с публикации предложения, которое рассматривается сообществом, комментируется и пересматривается до тех пор, пока все не будут им довольны тем. Затем члены сообщества голосуют за KIP. Если предложение получает 3 или более положительных голосов от лидеров проекта, оно принимается, разрабатывается код по его реализации и объединяется с Kafka.
Создание значимого нового KIP и его реализация — дорогостоящее и не всегда быстрое мероприятие. Поэтому пока не ясно, когда будет реализован предложенный в июне 2023 года KIP-932, который представляет новый тип групп, называемых общими группами или группами общего доступа (share group) для обеспечения совместного потребления из топика. Эти общие группы реализуют очереди для Kafka, без максимальной глубины очереди и возможностью сброса на определенное время для восстановления на момент времени.
Для реализации этой концепции в Kafka предполагается следующие компоненты:
- Координатор общей группы — брокер, который поддерживает список членов общей группы и управляет назначениями разделов топика для членов общей группы. Для этого каждому потребителю можно предоставить список всех разделов топика, которые соответствуют его подпискам, а затем использовать специальный протокол для извлечения записей из всех разделов. В более сложной реализации можно использовать метрики загрузки разделов топика и задержек для распределения разделов между потребителями в качестве автономного самобалансирующегося назначения разделов , чтобы направить больше потребителей на более загруженные разделы. В этом случае выборку можно делать на основе push-уведомлений.
- Share-partition— топик-раздел с подпиской в общей группе, если на раздел топика, подписано более одной группы общего доступа.
- Лидер общего раздела — компонент брокера, который управляет представлением группы общего доступа к разделу топика. Он совмещен с лидером раздела, и лидерство общего раздела следует за лидером раздела топика. Лидер общего раздела извлекает записи из диспетчера реплик из локальной реплики, управляет и сохраняет состояния записей в полете.
Для каждого общего раздела группа общего доступа добавляет некоторое управление состоянием для потребляемых записей. Начальное смещение записей, подходящих для использования, называется начальным смещением общего раздела (SPSO, share-partition start offset), а последнее смещение записей, подходящих для использования, называется конечным смещением общего раздела (SPEO, share-partition end offset). Таким образом, общий раздел, по сути, управляет потреблением текущих записей.
SPEO не обязательно всегда находится в конце раздела топика, он свободно перемещается по мере того, как записи извлекаются за пределы этой точки. Сегмент раздела топика между SPSO и SPEO представляет собой скользящее окно, которое перемещается по мере использования записей. Лидер разделения акций ограничивает расстояние между SPSO и SPEO. Верхняя граница определяется конфигурацией брокера share.record.lock.partition.limit. В отличие от существующих систем очередей, здесь нет максимальной глубины очереди, но есть ограничение на количество находящихся в пути записей в любой момент времени.
Записи в общем разделе находятся в одном из четырех состояний:
- доступное для потребителя (Available);
- потребленное — запись получена для конкретного потребителя с блокировкой получения с ограничением по времени (Acquired);
- подтвержденное — запись обработана и подтверждена потребителем (Acknowledged);
- архивное — запись недоступна для потребителя (Archived).
Все записи перед SPSO находятся в архивном состоянии. Все записи после SPEO находятся в доступном состоянии, но еще не доставлены потребителям. Записи также имеют счетчик доставки, чтобы предотвратить бесконечную доставку необработанных записей потребителям. Если запись неоднократно вызывает исключения во время ее обработки, то она имеет неправильный формат или схему данных. Каждый раз, когда запись потребляется потребителем в группе общего доступа, ее счетчик доставки увеличивается на 1. При первом получении записи ее счетчик доставки равен 1.
Лидер общего раздела сохраняет состояния и количество доставок. Эти обновления не выполняются с семантикой ровно один раз, поэтому нельзя полагаться на точность счетчика доставки во всех ситуациях. Он предназначен для защиты от подозрительных сообщений, а не для точного подсчета количества раз, когда запись доставляется потребителю.
Когда записи извлекаются для потребителя, лидер общего раздела начинает с SPSO и находит доступные записи. Для каждой найденной записи он переводит ее в потребленное состояние, увеличивает счетчик доставки и добавляет ее в пакет полученных записей для возврата потребителю. Затем потребитель обрабатывает записи и подтверждает их использование. Попытка доставки завершается успешно, и записи переходят в подтвержденное состояние.
Если потребитель не может обработать запись или истекает срок его блокировки получения, попытка доставки завершается неудачно, и следующее состояние записи зависит от счетчика доставки. Если число доставок достигло предела количества попыток доставки общего ресурса кластера (5 по умолчанию), запись переходит в архивное состояние и не имеет права на дополнительные попытки доставки. Если количество доставок не достигло предела, запись возвращается в доступное состояние и может быть доставлена снова. Таким образом, поведение доставки реализует семантику по крайней мере один раз (at-least-once), которая допускает возможность дублей: продюсер может отправить сообщение несколько раз, и потребитель будет обрабатывать его каждый раз. Это происходит, если продюсер не получил подтверждения по истечении определенного времени или получил ошибку. При этом сообщение может быть дублировано, если брокер дал сбой непосредственно перед отправкой подтверждения, но после успешной записи сообщения в топик Kafka.
Группы общего доступа ориентированы в первую очередь на совместное использование, чтобы позволить потребителям масштабироваться независимо от разделов. Записи в общем разделе могут быть доставлены потребителю не по порядку, когда происходит повторная доставка. Например, если два потребителя в группе общего доступа подписаны на топик с одним разделом. Первый потребитель получает записи от 100 до 109 включительно, а затем дает сбой. В то же время второй потребитель извлекает, обрабатывает и подтверждает записи со 110 по 119. Когда второй потребитель снова извлекает, он получает записи со 100 по 109 со значением счетчика доставки, равным 2, поскольку они доставляются повторно. При этом смещения не обязательно монотонно увеличиваются, как это происходит для классической группы потребителей. Записи, возвращаемые в пакете для определенного общего раздела, гарантированно располагаются в порядке возрастания смещения. Но нет никаких гарантий относительно порядка смещений между различными партиями.
Концепции поиска и положения группы потребителей не применимы к общего доступа. SPSO для каждого общего раздела может быть инициализирован для пустой группы общего доступа, и SPEO естественным образом перемещается вперед по мере использования записей. Когда подписка на раздел впервые добавляется в группу общего доступа, SPSO инициализируется для каждого раздела общего доступа. По умолчанию SPSO для каждого общего раздела инициализируется последним смещением для соответствующих разделов темы.
Вместо этого можно использовать инструмент AdminClient.alterShareGroupOffsets или kafka—share-groups.sh для сброса SPSO для пустой группы общего доступа без активных участников . Так можно сбросить смещение группы общего доступа к началу топика, определенной метке времени или концу топика, а также инициализировать группы общего доступа к началу топика. При сбросе SPSO отбрасываются все текущие состояния записи и счетчики доставки.
Если количество разделов увеличивается для топика в группе общего доступа, SPSO для вновь созданных разделов общего доступа инициализируется равным 0. Если SPSO сбрасывается на смещение, которое было привязано к удаленному многоуровневому хранилищу Kafka, это повлияет на производительность, как и в случае с существующими потребителями, извлекающими записи из удаленного хранилища.
Пакетная обработка в Kafka используется для максимизации производительности системы, например, когда потребитель извлекает записи, лидер общего раздела возвращает полные пакеты записей. В обычном и оптимальном случае все записи в этом пакете будут в доступном состоянии, и могут быть переведены в потребленное состояние с одинаковым временем ожидания блокировки получения. Когда потребитель обработает извлеченные записи, он может подтвердить доставку всех записей в виде одного пакета, переводя их все в подтвержденное состояние. Таким образом, когда группа потребителей совместно использует топик в группе общего доступа, естественной единицей совместного использования является пакет записей.
Изменение API потребителя Kafka
Для поддержки групп общего доступа будет изменен API Consumer. Чтобы присоединиться к общей группе, клиентское приложение создает экземпляр, KafkaConsumer, используя параметры конфигурации group.type=»share» и group.id, предоставляя идентификатор общей группы. Затем используется метод subscribe() для предоставления списка топиков, из которых надо потреблять данные. Потребитель не может назначать разделы сам себе.
Каждый вызов KafkaConsumer.poll(Duration) извлекает данные из любого из разделов топика для топика, на которые он подписан. Он возвращает набор текущих записей, полученных для этого потребителя в течение времени ожидания блокировки получения. Для эффективности потребитель предпочтительно возвращает полные наборы записей без пробелов. Затем приложение обрабатывает записи и подтверждает их доставку, используя явное или неявное подтверждение.
Если приложение вызывает новый метод KafkaConsumer.acknowledge(ConsumerRecord, AcknowledgeType) для любой записи в пакете, оно использует явное подтверждение. Такие вызовы должны выполняться в том порядке, в котором записи появляются в объекте ConsumerRecords, то есть в порядке увеличения смещения для каждого общего раздела. В этом случае:
- выполняются вызовы приложения потребителя Kafka, которые передают подтверждения. Если какие-либо записи в пакете не были подтверждены, они остаются полученными и будут представлены приложению в ответ на будущий опрос, т.е. poll()-вызов;
- потребитель вызывает метод poll(Duration) без предварительной фиксации, что асинхронно передает подтверждения Kafka. Исключение не возникает из-за невозможности отправить подтверждение. Если какие-либо записи в пакете не были подтверждены, они остаются полученными и будут представлены приложению в ответ на будущий опрос.
- вызывается метод KafkaConsumer.close(), который пытается зафиксировать любые ожидающие подтверждения и освобождает все оставшиеся полученные записи;
Если приложение-потребитель не запрашивает подтверждение записи в пакете, оно использует неявное подтверждение, то все доставленные записи успешно обработаны, и передает подтверждения Kafka. Вызов метода KafkaConsumer.poll(Duration) без фиксации, что также неявно подтверждает все доставленные записи и асинхронно передает подтверждения в Kafka. В этом случае не возникает никаких исключений из-за сбоя подтверждения. Вызов KafkaConsumer.close() выпускает любые полученные записи без подтверждения. Записи, возвращаемые в объекте ConsumerRecords для определенного общего раздела, расположены в порядке возрастания смещения. Для каждого общего раздела его лидер гарантирует, что подтверждения для записей в пакете выполняются атомарно. Подробнее о том, как происходит потребление из топиков Kafka, мы писали здесь.
Управление доступом к группе общего доступа выполняется для группы общего доступа как и для групп потребителей, с аналогичными правилами для проверяемых действий. Операциям, которые считывают информацию о группе общего доступа, требуется разрешение на выполнение действия DESCRIBE над ресурсом именованной группы. Операции, которые изменяют информацию о группе общего доступа, например, потребление записи, требуют разрешения для выполнения действия READ над ресурсом именованной группы.
Лидер общего раздела отвечает за запись устойчивого состояния общих разделов, которыми он руководит. Для каждого общего раздела нужно иметь возможность восстановить начальное смещение общего раздела (SPSO), состояние записей и количество записей, доставка которых не удалась. Счетчики доставки поддерживаются только приблизительно, а состояние Acquired не сохраняется. Это сводит к минимуму количество состояний общих разделов, которые необходимо регистрировать. Ожидается, что большинство записей будут извлекаться и подтверждаться пакетами только с одной попыткой доставки.
В заключение отметим, что KIP-932 вводит счетчики доставки и максимальное количество попыток доставки. Основное внимание в этом KIP уделяется обмену, а не упорядочению. Концепцию можно расширить, чтобы обеспечить упорядочение на основе ключей, чтобы одновременно можно было достичь частичного упорядочения и мелкозернистого совместного использования.
Возможно, в будущем в Kafka добавится возможность копирования записей, которые не удалось доставить, в очередь недоставленных сообщений. Это позволит обрабатывать сообщения с некорректной структурой и/или форматом без чрезмерного усложнения кода приложения-потребителя. Также может быть реализован потребитель, который не изменяет состояние группы общего доступа и не берет блокировки получения. Реализовать это не так просто, поскольку такой потребитель должен иметь позицию, независимую от SPSO, чтобы перемещаться по очереди.
Наконец, KIP-932 не включает поддержку подтверждения доставки с использованием транзакций для строго однократной семантики доставки (exactly once). Хотя концептуально это не сложно, потребуются изменения в API. Пока же предлагаемые в KIP-932 изменения расширяют возможности Apache Kafka, кардинально не изменяя существующее поведение.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Администрирование кластера Kafka
- Apache Kafka для инженеров данных
- Администрирование Arenadata Streaming Kafka
Источники