Как считать данные из Apache Kafka с определенного момента: пишем Python-скрипт

Как считать данные из Apache Kafka с определенного момента: пишем Python-скрипт

    Иногда возникает потребность в повторном чтении данных из Apache Kafka с определенного момента времени. Сегодня рассмотрим, как это сделать, написав простенький Python-скрипт потребления из раздела топика.

    Публикация данных в Kafka

    В качестве примера возьмем ранее рассмотренный в этой статье кейс приема потока обращений в интернет-магазин. Обращения могут представлять собой заявки от частных лиц и корпоративных клиентов на покупку товаров, а также вопросы по работе магазина, доставке или оплате. Все входящие обращения будут публиковаться в топик, который состоит из 3-х разделов. В раздел с номером 0 будут попадать все вопросы, в раздел с номером 1 – все корпоративные заявки, а в раздел 2- частные. Как обычно, я создала топик Kafka на инстансе, развернутом в облачной serverless-платформе Upstash. Топик называется InputsTopic и имеет 3 раздела.

    Топики Kafka Upstash
    Конфигурации созданного топика Kafka на платформе Upstash

    Код продюсера, отправляющего клиентские обращения, написан на Python с использованием библиотеки kafka-python. В этом коде с использованием библиотеки Faker случайно формируются сообщения от частных или корпоративных клиентов и публикуются в топик каждые 3 секунды. Разделение сообщений по разделам реализуется с помощью явного указания раздела в методе продюсера send(). В этом случае ключ партиционирования игнорируется.

    Сперва установим библиотеки и импортируем необходимые для работы модули:

    ####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
    #установка библиотек
    !pip install kafka-python 
    !pip install faker
    
    #импорт модулей 
    import json
    import random
    from datetime import datetime
    import time
    from time import sleep
    from kafka import KafkaProducer
    
    # Импорт модуля faker
    from faker import Faker

    Затем напишем сам скрипт формирования и публикации заявок:

    ####################################ячейка в Google Colab №2 - публикация сообщений в Kafka###########################################
    # объявление продюсера Kafka
    producer = KafkaProducer(
      bootstrap_servers=[kafka_url],
      sasl_mechanism='SCRAM-SHA-256',
      security_protocol='SASL_SSL',
      sasl_plain_username=username,
      sasl_plain_password=password,
      value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    
    # Создание объекта Faker с использованием провайдера адресов для России
    fake = Faker()
    
    #списки полей в заявке
    products = ['bred', 'garlic', 'oil', 'apples', 'water', 'soup', 'dress', 'tea', 'cacao', 'coffee', 'bananas', 'butter', 'eggs', 'oatmeal']
    questions = ['payment', 'delivery', 'discount', 'vip', 'staff']
    
    #бесконечный цикл публикации данных
    while True:
      #подготовка данных для публикации в JSON-формате
      now=datetime.now()
      id=now.strftime("%m/%d/%Y %H:%M:%S")
    
      content = ''
      theme = ''
      corp = 0
      part = 0
    
      #подготовка списка возможных ключей маршрутизации (routing keys)
      corp = random.choice([1,0])
    
      if corp==1 :
        #name = random.choice(names_corp)
        name=fake.company()
        routing_keys = ['app' + '.company.' + name, 'question']
      else:
        #name = random.choice(names_fiz)
        name=fake.name()
        routing_keys = ['app', 'question']
    
      #случайный выбор одного из ключей маршрутизации (из routing keys)
      subject=random.choice(routing_keys)
    
      #добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки
      if subject=='question':
        theme = random.choice(questions)
        content = 'Hello, I have a question about ' + theme
        part=0 #все вопросы записывать в раздел 0
      else :
        theme ='app'
        content = random.choice(products) + ' ' + str(random.randint(0,100))
        if corp==1 :
          part=1 #все корпоративные заявки записывать в раздел 1
        else:
          part=2 #заявки от частных лиц записывать в раздел 2  
        
      #задаем ключ сообщения для Kafka
      mes_key = str.encode(subject+name)
      
      #создаем полезную нагрузку в JSON 
      data = {'id': id, 'name': name, 'subject': subject, 'content': content}
    
      #публикуем данные в Kafka
      future = producer.send('InputsTopic', value=data, partition=part, key=mes_key)
      record_metadata = future.get(timeout=60)
      print(f' [x] Sent {record_metadata}')
      print(f' [x] Corp =  {corp}')
      print(f' [x] Payload {data}')
    
      # сериализуем данные в формат JSON и вычисляем размер сообщения
      message_size = len(json.dumps(data).encode('utf-8'))
      
      # выводим размер сообщения на консоль
      print(f"Message size: {message_size} bytes")
    
      #повтор через 3 секунды
      time.sleep(3)

    Запустив этот скрипт в Google Colab, получаем сообщения, опубликованные в Kafka согласно ранее заданной логике распределения по разделам:

    Kafka Python Colab
    Публикация данных в Kafka

    Проверить, что сообщения опубликованы, можно в GUI-интерфейсе платформы Upstash:

    GUI Upstash Kafka
    Опубликованные сообщения в Kafka

    Предположим, в приложении-потребителе, которое обрабатывает заявки от частных лиц, случился сбой и ему необходимо повторно считать данные с момента 17:30. Как это сделать, рассмотрим далее.

    Повторное потребление с определенного момента

    Несмотря на наличие поля отметки времени (timestamp), которое проставляется автоматически при публикации сообщения в Kafka, хронологический порядок потребления сообщения привязан не к этому параметру, а к смещению (offset). Поэтому в коде приложения-потребителя следует указать нужную позицию начала чтения сообщений из раздела, используя метод потребителя seek(), в параметрах которого указывается раздел и смещение. Например, считать с позиции смещения 300:

    offset=300 # задание нужного смещения
    topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
    consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел
    consumer.seek(topic_partition, offset)

    Однако, на практике мы обычно знаем примерное время, но не точное смещение сообщений. Поэтому необходимо выполнить небольшое преобразование от момента, заданного в привычной людям форме, к смещению. Например, нужно считать данные по заявкам частных лиц, которые были опубликованы в Kafka после 17:30  МСК 11 июня 2023 года. За это отвечает следующий участок кода:

    topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
    consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел
    
    # определяем момент времени, с которого начнется чтение
    timestamp = datetime(2023, 6, 11, 17, 30, 0).timestamp() * 1000 # переводим в миллисекунды
    
    # определяем смещение для каждой партиции в топике
    offsets = consumer.offsets_for_times({topic_partition: timestamp})
    
    # устанавливаем позицию для каждой партиции в топике
    if offsets[topic_partition] is not None:
       consumer.seek(topic_partition, offsets[topic_partition].offset)

    Весь код приложения-потребителя, который считывает данные по заявкам частных лиц и заносит в определенный лист Google-таблицы, выглядит так:

    ####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
    #установка библиотек 
    !pip install kafka-python 
    
    #импорт модулей 
    from google.colab import auth
    auth.authenticate_user()
    import gspread
    from google.auth import default
    creds, _ = default()
    
    import json
    import random
    from datetime import datetime
    
    from kafka import KafkaConsumer
    from json import loads
    from kafka.structs import TopicPartition
    
    ####################################ячейка в Google Colab №2 - потребление из Kafka###########################################
    #объявление потребителя Kafka
    consumer = KafkaConsumer(
      bootstrap_servers=[kafka_url],
      sasl_mechanism='SCRAM-SHA-256',
      security_protocol='SASL_SSL',
      sasl_plain_username=username,
      sasl_plain_password=password,
      group_id='gr1',
      auto_offset_reset='earliest',
      enable_auto_commit=True
    )
    topic='InputsTopic'
    
    #Google Sheets Autentificate
    gc = gspread.authorize(creds)
    
    # подписка потребителя на определенный раздел topic partition
    #все вопросы лежат в разделе 0
    #все корпоративные заявки лежат в разделе 1
    #заявки от частных лиц лежат в разделе 2
    part=2 #задание раздела
    topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
    consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел
    
    # определяем момент времени, с которого начнется чтение
    timestamp = datetime(2023, 6, 11, 17, 30, 0).timestamp() * 1000 # переводим в миллисекунды
    
    # определяем смещение для каждой партиции в топике
    offsets = consumer.offsets_for_times({topic_partition: timestamp})
    
    # устанавливаем позицию для каждой партиции в топике
    if offsets[topic_partition] is not None:
       consumer.seek(topic_partition, offsets[topic_partition].offset)
    
    #Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
    sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
    wks = sh.worksheet("Partition_"+ str(part)) #в какой лист гугл-таблиц будем записывать
    
    #начальный номер строки для записи данных
    x=1
    
    #считывание из топика Kafka
    for message in consumer:
      payload=message.value.decode("utf-8")
      data=json.loads(payload)
      
      print(f"Offset: {message.offset}, Value: {message.value}")
      print(consumer.position(topic_partition)) # выводим текущее смещение
    
      print(f"Timestamp: {message.timestamp}, Value: {message.value}")# выводим текущую метку времени
    
      timestamp = message.timestamp / 1000.0 # переводим метку времени в секунды
      datetime_object = datetime.fromtimestamp(timestamp) # преобразуем метку времени в объект datetime
      formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f') # форматируем вывод метки времени
      print(f"Timestamp: {formatted_timestamp}, Value: {message.value}")
    
      #вывод исходных данных в консоль Goggle Colab
      print(data)
      
      #парсинг JSON-сообщения полезной нагрузки
      id=data['id']
      name=data['name']
      subject = data['subject']
      content = data['content']
      
      #вывод распарсенных данных в консоль Google Colab
      print('Заявка № ', id, ', клиент ', name, ', тема: ", subject, ', 'содержимое: ', content)
    
      #обновление данных в в Гугл-таблице
      global x
      #переход на следующую строку в гугл-таблицах
      x = x + 1
    
      #запись данных в ячейки гугл-таблицы
      wks.update_cell(x,1,id)
      wks.update_cell(x,2,name)
      wks.update_cell(x,3,content)

    Запуск этого скрипта в Google Colab выводит результаты считывания на консоль:

    Kafka consumer python colab
    Потребленные сообщения и некоторые их метаданные

    В гугл-таблице потребленные данные также отображаются:

    Google Sheets API
    Запись потребленных данных в Google-таблицу

    Таким образом, используя всего несколько дополнительных методов библиотеки kafka-python, можно считать данные из Kafka повторно с нужного смещения или даже заданного момента времени.

    Еще больше полезных приемов администрирования и эксплуатации Apache Kafka для потоковой аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    [elementor-template id=»13619″]