Под капотом продюсера Kafka: UML-диаграмма публикации сообщений

Python-продюсер Kafka, Kafka-Python, дата-инженер администратор кластера разработчик Apache Kafka конфигурации примеры курсы обучение, обучение Apache Kafka, курсы Apache Kafka, Apache Kafka Streams курсы примеры обучение, потоковая и пакетная обработка данных примеры, обучение большим данным, Apache Kafka для дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как на самом деле работает приложение-продюсер Apache Kafka: разбираемся с конфигурациями и составляем UML-диаграмму последовательности системных вызовов при публикации сообщений в топик.

Как работает продюсер Kafka

Когда разработчик пишет приложение-продюсер, которое публикует сообщение в топик Apache Kafka, он использует методы специальных библиотек, таких как kafka-python и пр. Достаточно только создать экземпляры нужных классов. Пример такого скрипта смотрите в нашей новой статье. Например, в следующем участке кода, который можно запустить в нескольких ячейках Google Colab, устанавливаются необходимые библиотеки и импортируются модули:

####################ячейка в Google Colab №1 - установка и импорт библиотек############################
#установка библиотек
!pip install kafka-python 

#импорт модулей 
from kafka import KafkaProducer

Далее создадим экземпляр класса продюсера, задав параметры подключения к инстансу Kafka, развернутого в облачной платформе Upstash. О том, как создать свой инстанс в Upstash, а также написать свой Python-продюсер и потребитель, я подробно рассказывала в статье блога нашей Школы прикладного бизнес-анализа.

##################ячейка в Google Colab №2 - публикация сообщений в Kafka##################
# объявление продюсера Kafka
producer = KafkaProducer(
  bootstrap_servers=['........здесь взять название своего сервера.upstash.io:9092'],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username='.........здесь имя пользователя, взятое из upstash',
  sasl_plain_password='..........здесь пароль для этого пользователя',
  value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

После задания ключа партиционирования (key) и определения полезной нагрузки (data) для отправки сообщения в топик под названием InputsTopic с 3-мя разделами вызывается метод send(), который есть у класса продюсер:

  #публикуем данные в Kafka 
future = producer.send('InputsTopic', value=data, key=mes_key)
record_metadata = future.get(timeout=60)
print(f' [x] Sent {record_metadata}')

Продюсер является потокобезопасным, и совместное использование одного экземпляра продюсера между потоками быстрее, чем наличие нескольких экземпляров. Продюсер состоит из пула буферного пространства, содержащего записи, которые еще не были переданы на сервер, а также фонового потока ввода-вывода (Producer Network Thread), который отвечает за преобразование этих записей в запросы и их передачу в кластер Kafka. Как выполняется этот процесс, смотрите здесь и здесь.

Метод send() является асинхронным: при вызове он добавляет запись в буфер ожидающих отправки записей и сразу же возвращается, позволяя продюсеру группировать отдельные записи для повышения эффективности. О том, как Kafka обеспечивает транзакционную атомарнонсть публикации сообщений, читайте в нашей новой статье.

Под капотом этих методов библиотеки kafka-python происходит следующее:

  1. Экземпляр класса Producer получает вызов метода send(), в параметрах которого указан топик, данные сообщения и другие параметры, например, ключ партиционирования (key). В вышеприведенном примере ключ партиционирования задан параметром key метода send(). Сообщения с одним и тем же ключом помещаются в один и тот же раздел. Это удобно для группировки сообщений. Например, чтобы события поведения одного и того же пользователя. Если ключ партиционирования не задан, сообщения будут записываться в существующие разделы топика по принципу циклического перебора.
  2. Как только раздел определен, продюсер передает данные в сетевой поток Producer Network Thread, который отвечает за преобразование этих записей в запросы и их передачу в кластер Kafka.
  3. Сетевой поток Producer Network Thread ищет лидера раздела, чтобы отправить ему сообщение, т.к. именно лидер раздела позволяет записывать в Kafka новые данные, а подписчики реплицируют их. Для этого Producer Network Thread получает метаданные от списка брокеров Kafka, заданных в конфигурации bootstrap_servers, где задан набор сокетов узлов кластера, т.е. их IP-адреса с номерами портов, обычно равных 9092. При развертывании облачного инстанса в Upstash список серверов Kafka задается в виде виртуального адреса хоста, например, merry-perry-kafka.upstash.io:9092. По сути, bootstrap.servers — это первые серверы Kafka, с которыми продюсер должен связаться для получения конфигурации кластера. Подробнее о том, как клиенты Kafka общаются с серверами, мы писали здесь.
  4. Получив информацию о лидере раздела, продюсер добавляет сообщение в локальную очередь.
  5. Когда размер этой очереди превысит значение конфигурации batch.size или истечет тайм-аута задержки отправки, заданный в конфигурации ms, пакет сообщений из локальной очереди отправляется лидеру раздела, расположенного на брокере — одном из узлов кластера. В рассмотренном примере параметр linger_ms в методе producer.send() не задан.
  6. Если продюсер ожидает подтверждения о репликации сообщения с лидера раздела по всем подписчикам, информация о подтверждении принимается от синхронизированных реплик (ISR), и ответ возвращается в сетевой поток продюсера. В нашем рассмотренном примере конфигурация acks при объявлении продюсера не задана, а потому равна значению по умолчанию 1. Это означает, что продюсер ожидает, только когда лидер раздела запишет запись только в свой локальный журнал, не дожидаясь подтверждения от всех подписчиков. В этом случае, если лидер выйдет из строя сразу после подтверждения записи, но до того, как подписчики реплицируют ее, данные будут потеряны. Поэтому для повышения надежности можно задать конфигурации acks значение all, чтобы продюсер ждал, пока все подписчики реплицируют отправленные лидеру данные и подтвердят это. Это самая надежная гарантия доступности: запись не будет потеряна, пока хотя бы одна синхронизированная реплика остается активной на любом брокере. Однако, такая настройка снижает общее быстродействие системы из-за необходимости ожидания подтверждений.
  7. Сетевой поток продюсера выполняет обратный вызов (callback), выводя информацию о метаданных, что задано в строке print(f’ [x] Sent {record_metadata}’).

Визуализируем это в виде UMД-диаграммы последовательности:

UML sequence Kafka Producer
UML-диаграмма последовательности публикации сообщений в топик Kafka

Эта UML-диаграмма последовательности была создана в PlantUML с помощью следующего скрипта:

@startuml
skinparam packageStyle rectangle
participant Producer
participant Producer_Network_Thread
autonumber
box Kafka #LightYellow
participant Bootstrap_servers
participant Broker_Leader
participant Broker_Follower_1
participant Broker_Follower_2
participant Broker_Follower_3
-> Producer : send(topic, message)
activate Producer
Producer -> Producer : determine partition
Producer -> Producer: add message to local queue
Producer -> Producer: check batch.size / linger.ms
Producer -> Producer_Network_Thread : transform message to query
activate Producer_Network_Thread
Producer_Network_Thread -> Bootstrap_servers: get cluster metadata 
activate Bootstrap_servers 
Bootstrap_servers -> Bootstrap_servers : find leader
Bootstrap_servers --> Producer_Network_Thread : leader
Producer_Network_Thread  --> Producer : cluster metadata 
Producer -> Broker_Leader : publish message
activate Broker_Leader
alt acks = all
Broker_Leader -> Broker_Follower_1 : fetch(message)
activate Broker_Follower_1
Broker_Follower_1 --> Broker_Leader : ack
Broker_Leader -> Broker_Follower_2 : fetch(message)
activate Broker_Follower_2
Broker_Follower_2 --> Broker_Leader : ack
Broker_Leader -> Broker_Follower_3 : fetch(message)
activate Broker_Follower_3
Broker_Follower_3 --> Broker_Leader : ack
 Broker_Leader --> Producer : acks
end
 <- Producer_Network_Thread : callback()
end box
@enduml

В следующий раз мы заглянем под капот потребителя, разобрав системные вызовы и конфигурации класса Consumer в клиентской библиотеке kafka-python, а также составим подобную UML-диаграмму последовательности. А как Kafka справляется со сбоями приложений-продюсера, которые приводят к проблеме разделенного мозга в распределенной системе, читайте здесь.

Больше деталей про администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://kafka.apache.org/34/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
  2. https://kafka-python.readthedocs.io/en/master/apidoc/KafkaProducer.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту