Как считать данные из Apache Kafka с определенного момента: пишем Python-скрипт

Python-продюсер и потребитель Kafka, Kafka-Python, дата-инженер разработчик Apache Kafka примеры курсы обучение, обучение Apache Kafka, курсы Apache Kafka, Apache Kafka потребление сообщений курсы примеры обучение, потоковая и пакетная обработка данных примеры, обучение большим данным, Apache Kafka для дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Иногда возникает потребность в повторном чтении данных из Apache Kafka с определенного момента времени. Сегодня рассмотрим, как это сделать, написав простенький Python-скрипт потребления из раздела топика.

Публикация данных в Kafka

В качестве примера возьмем ранее рассмотренный в этой статье кейс приема потока обращений в интернет-магазин. Обращения могут представлять собой заявки от частных лиц и корпоративных клиентов на покупку товаров, а также вопросы по работе магазина, доставке или оплате. Все входящие обращения будут публиковаться в топик, который состоит из 3-х разделов. В раздел с номером 0 будут попадать все вопросы, в раздел с номером 1 – все корпоративные заявки, а в раздел 2- частные. Как обычно, я создала топик Kafka на инстансе, развернутом в облачной serverless-платформе Upstash. Топик называется InputsTopic и имеет 3 раздела.

Топики Kafka Upstash
Конфигурации созданного топика Kafka на платформе Upstash

Код продюсера, отправляющего клиентские обращения, написан на Python с использованием библиотеки kafka-python. В этом коде с использованием библиотеки Faker случайно формируются сообщения от частных или корпоративных клиентов и публикуются в топик каждые 3 секунды. Разделение сообщений по разделам реализуется с помощью явного указания раздела в методе продюсера send(). В этом случае ключ партиционирования игнорируется.

Сперва установим библиотеки и импортируем необходимые для работы модули:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python 
!pip install faker

#импорт модулей 
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer

# Импорт модуля faker
from faker import Faker

Затем напишем сам скрипт формирования и публикации заявок:

####################################ячейка в Google Colab №2 - публикация сообщений в Kafka###########################################
# объявление продюсера Kafka
producer = KafkaProducer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker()

#списки полей в заявке
products = ['bred', 'garlic', 'oil', 'apples', 'water', 'soup', 'dress', 'tea', 'cacao', 'coffee', 'bananas', 'butter', 'eggs', 'oatmeal']
questions = ['payment', 'delivery', 'discount', 'vip', 'staff']

#бесконечный цикл публикации данных
while True:
  #подготовка данных для публикации в JSON-формате
  now=datetime.now()
  id=now.strftime("%m/%d/%Y %H:%M:%S")

  content = ''
  theme = ''
  corp = 0
  part = 0

  #подготовка списка возможных ключей маршрутизации (routing keys)
  corp = random.choice([1,0])

  if corp==1 :
    #name = random.choice(names_corp)
    name=fake.company()
    routing_keys = ['app' + '.company.' + name, 'question']
  else:
    #name = random.choice(names_fiz)
    name=fake.name()
    routing_keys = ['app', 'question']

  #случайный выбор одного из ключей маршрутизации (из routing keys)
  subject=random.choice(routing_keys)

  #добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки
  if subject=='question':
    theme = random.choice(questions)
    content = 'Hello, I have a question about ' + theme
    part=0 #все вопросы записывать в раздел 0
  else :
    theme ='app'
    content = random.choice(products) + ' ' + str(random.randint(0,100))
    if corp==1 :
      part=1 #все корпоративные заявки записывать в раздел 1
    else:
      part=2 #заявки от частных лиц записывать в раздел 2  
    
  #задаем ключ сообщения для Kafka
  mes_key = str.encode(subject+name)
  
  #создаем полезную нагрузку в JSON 
  data = {'id': id, 'name': name, 'subject': subject, 'content': content}

  #публикуем данные в Kafka
  future = producer.send('InputsTopic', value=data, partition=part, key=mes_key)
  record_metadata = future.get(timeout=60)
  print(f' [x] Sent {record_metadata}')
  print(f' [x] Corp =  {corp}')
  print(f' [x] Payload {data}')

  # сериализуем данные в формат JSON и вычисляем размер сообщения
  message_size = len(json.dumps(data).encode('utf-8'))
  
  # выводим размер сообщения на консоль
  print(f"Message size: {message_size} bytes")

  #повтор через 3 секунды
  time.sleep(3)

Запустив этот скрипт в Google Colab, получаем сообщения, опубликованные в Kafka согласно ранее заданной логике распределения по разделам:

Kafka Python Colab
Публикация данных в Kafka

Проверить, что сообщения опубликованы, можно в GUI-интерфейсе платформы Upstash:

GUI Upstash Kafka
Опубликованные сообщения в Kafka

Предположим, в приложении-потребителе, которое обрабатывает заявки от частных лиц, случился сбой и ему необходимо повторно считать данные с момента 17:30. Как это сделать, рассмотрим далее.

Повторное потребление с определенного момента

Несмотря на наличие поля отметки времени (timestamp), которое проставляется автоматически при публикации сообщения в Kafka, хронологический порядок потребления сообщения привязан не к этому параметру, а к смещению (offset). Поэтому в коде приложения-потребителя следует указать нужную позицию начала чтения сообщений из раздела, используя метод потребителя seek(), в параметрах которого указывается раздел и смещение. Например, считать с позиции смещения 300:

offset=300 # задание нужного смещения
topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел
consumer.seek(topic_partition, offset)

Однако, на практике мы обычно знаем примерное время, но не точное смещение сообщений. Поэтому необходимо выполнить небольшое преобразование от момента, заданного в привычной людям форме, к смещению. Например, нужно считать данные по заявкам частных лиц, которые были опубликованы в Kafka после 17:30  МСК 11 июня 2023 года. За это отвечает следующий участок кода:

topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел

# определяем момент времени, с которого начнется чтение
timestamp = datetime(2023, 6, 11, 17, 30, 0).timestamp() * 1000 # переводим в миллисекунды

# определяем смещение для каждой партиции в топике
offsets = consumer.offsets_for_times({topic_partition: timestamp})

# устанавливаем позицию для каждой партиции в топике
if offsets[topic_partition] is not None:
   consumer.seek(topic_partition, offsets[topic_partition].offset)

Весь код приложения-потребителя, который считывает данные по заявкам частных лиц и заносит в определенный лист Google-таблицы, выглядит так:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек 
!pip install kafka-python 

#импорт модулей 
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()

import json
import random
from datetime import datetime

from kafka import KafkaConsumer
from json import loads
from kafka.structs import TopicPartition

####################################ячейка в Google Colab №2 - потребление из Kafka###########################################
#объявление потребителя Kafka
consumer = KafkaConsumer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  group_id='gr1',
  auto_offset_reset='earliest',
  enable_auto_commit=True
)
topic='InputsTopic'

#Google Sheets Autentificate
gc = gspread.authorize(creds)

# подписка потребителя на определенный раздел topic partition
#все вопросы лежат в разделе 0
#все корпоративные заявки лежат в разделе 1
#заявки от частных лиц лежат в разделе 2
part=2 #задание раздела
topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел

# определяем момент времени, с которого начнется чтение
timestamp = datetime(2023, 6, 11, 17, 30, 0).timestamp() * 1000 # переводим в миллисекунды

# определяем смещение для каждой партиции в топике
offsets = consumer.offsets_for_times({topic_partition: timestamp})

# устанавливаем позицию для каждой партиции в топике
if offsets[topic_partition] is not None:
   consumer.seek(topic_partition, offsets[topic_partition].offset)

#Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
wks = sh.worksheet("Partition_"+ str(part)) #в какой лист гугл-таблиц будем записывать

#начальный номер строки для записи данных
x=1

#считывание из топика Kafka
for message in consumer:
  payload=message.value.decode("utf-8")
  data=json.loads(payload)
  
  print(f"Offset: {message.offset}, Value: {message.value}")
  print(consumer.position(topic_partition)) # выводим текущее смещение

  print(f"Timestamp: {message.timestamp}, Value: {message.value}")# выводим текущую метку времени

  timestamp = message.timestamp / 1000.0 # переводим метку времени в секунды
  datetime_object = datetime.fromtimestamp(timestamp) # преобразуем метку времени в объект datetime
  formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f') # форматируем вывод метки времени
  print(f"Timestamp: {formatted_timestamp}, Value: {message.value}")

  #вывод исходных данных в консоль Goggle Colab
  print(data)
  
  #парсинг JSON-сообщения полезной нагрузки
  id=data['id']
  name=data['name']
  subject = data['subject']
  content = data['content']
  
  #вывод распарсенных данных в консоль Google Colab
  print('Заявка № ', id, ', клиент ', name, ', тема: ", subject, ', 'содержимое: ', content)

  #обновление данных в в Гугл-таблице
  global x
  #переход на следующую строку в гугл-таблицах
  x = x + 1

  #запись данных в ячейки гугл-таблицы
  wks.update_cell(x,1,id)
  wks.update_cell(x,2,name)
  wks.update_cell(x,3,content)

Запуск этого скрипта в Google Colab выводит результаты считывания на консоль:

Kafka consumer python colab
Потребленные сообщения и некоторые их метаданные

В гугл-таблице потребленные данные также отображаются:

Google Sheets API
Запись потребленных данных в Google-таблицу

Таким образом, используя всего несколько дополнительных методов библиотеки kafka-python, можно считать данные из Kafka повторно с нужного смещения или даже заданного момента времени.

Еще больше полезных приемов администрирования и эксплуатации Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту