Разделы и потребители Apache Kafka: практический пример

Python-продюсер и потребитель Kafka, Kafka-Python, дата-инженер администратор кластера разработчик Apache Kafka примеры курсы обучение, обучение Apache Kafka, курсы Apache Kafka, Apache Kafka потребление сообщений курсы примеры обучение, потоковая и пакетная обработка данных примеры, обучение большим данным, Apache Kafka для дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Чтобы разобраться, как на самом деле работают разделы и потребители Apache Kafka, сегодня рассмотрим небольшой демонстрационный пример, иллюстрирующий потребление сообщений. Пишем Python-скрипты публикации и потребления сообщений из разных разделов топика Kafka с занесением данных в несколько вкладок Google-таблицы.

Как сообщения распределяются по разделам топика Kafka

Напомним, в Apache Kafka раздел топика является единицей параллелизма. На 1 раздел может быть лишь 1 активный потребитель в группе. Например, если в топике 3 раздела и 5 приложений-потребителей в одной группе, 3 потребителя будут распределены по имеющимся разделам, а 2 оставшихся будут простаивать. Хронологический порядок сообщений в Kafka гарантируется только в рамках раздела. Поэтому в топике с одним разделом проще обеспечить упорядочивание, чем с несколькими. Но один раздел не дает параллелизма и балансировки нагрузки. Чтобы найти компромисс между этими противоречивыми требованиями, в Apache Kafka есть несколько стратегий (принципов) партиционирования, т.е. распределения входящих сообщений по разделам топика. Наиболее наглядно различия этих принципов распределения сообщений по разделам показывает простая таблица сравнения их по достоинствам, недостаткам и вариантам использования.

Принцип распределения сообщений по разделам

Плюсы

Минусы

Варианты использования

Круговой перебор (Robin Round)

параллелизм и балансировка нагрузки, упорядоченность сообщений внутри раздела

не сохраняется общий хронологический порядок сообщений (вне раздела)

Используется по умолчанию. Сообщения отправляются всем разделам по кругу, обеспечивая сбалансированную нагрузку на узлы кластера

Задание ключа сообщению для отправки в раздел по хэш-функции от ключа

можно поддерживать порядок сообщений в пределах ключа

неравномерное распределение данных по разделам (несбалансированная нагрузка)

Сообщения с одним и тем же значением ключа попадут к одному и тому же потребителю

Пользовательский разделитель

упорядоченость сообщений согласно бизнес-логике, балансировка нагрузки и параллелизм

Усложнение программного кода приложения-продюсера, где задается раздел

Используется в случаях, когда нужно обеспечить упорядочивание сообщений, параллелизм и балансировку нагрузки

Пользовательский разделитель, а также задание ключа сообщению позволяют реализовать бизнес-логику маршрутизации потока данных подобно обменникам в RabbitMQ. Рассмотрим это на примере приема потока обращений в интернет-магазин. Обращения могут представлять собой заявки от частных лиц и корпоративных клиентов на покупку товаров, а также вопросы по работе магазина, доставке или оплате. Все входящие обращения будут публиковаться в топик Kafka, который состоит из 3-х разделов. В раздел с номером 0 будут попадать все вопросы, в раздел с номером 1 – все корпоративные заявки, а в раздел 2- частные. Как реализовать такое разделение в коде, рассмотрим далее.

Пример продюсера и потребителей на Python

Чтобы реализовать вышеописанную логику распределения сообщений по разделам, я создала топик Kafka на инстансе, развернутом в облачной serverless-платформе Upstash. Топик называется InputsTopic и имеет 3 раздела.

Топики Kafka Upstash
Конфигурации созданного топика Kafka на платформе Upstash

Код продюсера, отправляющего клиентские обращения, написан на Python с использованием библиотеки kafka-python:

#############ячейка в Google Colab №1 - установка и импорт библиотек################
#установка библиотек
!pip install kafka-python 

#импорт модулей 
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer

#########ячейка в Google Colab №2 - публикация сообщений в Kafka#####################
# объявление продюсера Kafka
producer = KafkaProducer(
  bootstrap_servers=['........здесь взять название своего сервера.upstash.io:9092'],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username='.........здесь имя пользователя, взятое из upstash',
  sasl_plain_password='..........здесь пароль для этого пользователя',
  value_serializer=lambda v: json.dumps(v).encode('utf-8')
)

#бесконечный цикл публикации данных
#списки полей в заявке
names_fiz = ['Anna', 'Lisa', 'Ivan', 'Petr', 'Kira', 'Dima', 'Oleg', 'Zina', 'Vlad', 'Nick']
names_corp = ['ZAO SVAP', 'OOO Rim', 'ZAO Kisa', 'IP Lisana', 'OOO Zavod', 'OAO Tips', 'IP VAA']
products = ['bred', 'garlic', 'oil', 'apples', 'water', 'soup', 'dress', 'tea', 'cacao', 'coffee', 'bananas', 'butter', 'eggs', 'oatmeal']
questions = ['payment', 'delivery', 'discount', 'vip', 'staff']

while True:
  #подготовка данных для публикации в JSON-формате
  now=datetime.now()
  id=now.strftime("%m/%d/%Y %H:%M:%S")

  content = ''
  theme = ''
  corp = 0
  part = 0

  #подготовка списка возможных ключей маршрутизации (routing keys)
  corp = random.choice([1,0])

  if corp==1 :
    name = random.choice(names_fiz)
    routing_keys = ['app' + '.company.' + name, 'question']
  else:
    name = random.choice(names_corp)
    routing_keys = ['app', 'question']

  #случайный выбор одного из ключей маршрутизации (из routing keys)
  subject=random.choice(routing_keys)

  #добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки
  if subject=='question':
    theme = random.choice(questions)
    content = 'Hello, I have a question about ' + theme
    part=0 #все вопросы записывать в раздел 0
  else :
    theme ='app'
    if corp==1 :
      part=1 #все корпоративные заявки записывать в раздел 1
    else:
      part=2 #заявки от частных лиц записывать в раздел 2 
    content = random.choice(products) + ' ' + str(random.randint(0,100))

  #задаем ключ сообщения для Kafka
  mes_key = str.encode(subject+name)
  
  #создаем полезную нагрузку в JSON 
  data = {'id': id, 'name': name, 'subject': subject, 'content': content}

  #публикуем данные в Kafka
  future = producer.send('InputsTopic', value=data, partition=part, key=mes_key) #здесь вместо InputsTopic должно быть название вашего топика
  record_metadata = future.get(timeout=60)
  
  #вывод отладочной информации
  print(f' [x] Sent {record_metadata}') 
  print(f' [x] Corp =  {corp}')
  print(f' [x] Payload {data}')
  
  #повтор через 3 секунды
  time.sleep(3)  
#####################ячейка в Google Colab №3 - закрытие соединения###################
#Закрываем соединения
producer.close()

В этом коде сообщения формируются случайно и публикуются в топик каждые 3 секунды. В методе продюсера send() явно заданы и ключ партиционирования для сообщения, и раздел, в который следует его опубликовать. При явном указании раздела именно он является указателем для распределения сообщений в топике, а ключ сообщения при распределении не учитывается.

Как обычно, запускаю код в интерактивной среде Google Colab:

Kafka Python Google Colab примеры курсы обучение
Работа Python-продюсера для Kafka в Google Colab

Для наглядной демонстрации распределения потока сообщений по разделам я написала 3 почти одинаковых приложения-потребителя, каждый из которых считывает сообщения из своего раздела, и публикует их в соответствующей вкладке Google-таблицы. Код выглядит следующим образом:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек 
!pip install kafka-python 

#импорт модулей 
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()

import json
import random

from kafka import KafkaConsumer
from json import loads
from kafka.structs import TopicPartition

####################################ячейка в Google Colab №2 - потребление из Kafka###########################################
#объявление потребителя Kafka
consumer = KafkaConsumer(
  bootstrap_servers=['........здесь взять название своего сервера.upstash.io:9092'],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username='.........здесь имя пользователя, взятое из upstash',
  sasl_plain_password='..........здесь пароль для этого пользователя',
  group_id='1',
  auto_offset_reset='earliest',
  enable_auto_commit=True
)

#consumer.unsubscribe()

#consumer.unsubscribe()

#Google Sheets Autentificate
gc = gspread.authorize(creds)
# подписка потребителя на определенный раздел topic partition
#все вопросы лежат в разделе 0
#все корпоративные заявки лежат в разделе 1
#заявки от частных лиц лежат в разделе 2
part=1 #задание раздела
topic_partition = TopicPartition('InputsTopic', part)   # указываем имя топика и номер раздела
consumer.assign([topic_partition])

#Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
wks = sh.worksheet("Partition_1") #в какой лист гугл-таблиц будем записывать

#начальный номер строки для записи данных
x=1

#считывание из топика Kafka
for message in consumer:
  payload=message.value.decode("utf-8")
  data=json.loads(payload)
  
  #вывод исходных данных в консоль Goggle Colab
  print(data)
  
  #парсинг JSON-сообщения полезной нагрузки
  id=data['id']
  name=data['name']
  subject = data['subject']
  content = data['content']
  
  #вывод распарсенных данных в консоль Google Colab
  print('Заявка № ', id, ', клиент ', name, ', тема: ", subject, ', 'содержимое: ', content)

  #обновление данных в в Гугл-таблице
  global x
  #переход на следующую строку в гугл-таблицах
  x = x + 1

  #запись данных в ячейки гугл-таблицы
  wks.update_cell(x,1,id)
  wks.update_cell(x,2,name)
  wks.update_cell(x,3,content)
###################################ячейка в Google Colab №3 - закрытие соединения###########################################
#отписываем потребителя и закрываем соединение 
consumer.unsubscribe()
consumer.close()

Посмотрим, что получилось в Google-таблице. Как и ожидалось, все сообщения распределены по разделам топика согласно заданной бизнес-логике. Из этого топика их считывали 3 приложения-потребителя в одной группе, т.е. с одинаковым значением group.id, заданным при объявлении потребителя.

Kafka Python Google Sheets
Визуализация потребления сообщений из разных разделов одного топика Kafka с помощью вкладок Google-таблиц

В заключение рассмотрим график публикации и потребления сообщений в GUI платформы Upstash.

Kafka Upstash примеры курсы обучение
Визуализация публикации и потребления сообщений топика Kafka в GUI Upstash

Как считать данные из Kafka повторно и/или с определенного момента времени, читайте в нашей новой статье.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html
  2. https://github.com/AnnaVichugova/Kafka-RQM/blob/main/Kafka%20Producer
  3. https://github.com/AnnaVichugova/Kafka-RQM/blob/main/Kafka_Consumer_2_Google_sheets_with_Redis
Поиск по сайту