Как измерить задержку потребления из Apache Kafka: простой пример

публикация сообщений в Kafka и задержка потребления, как измерить задержку потребления в Apache Kafka, потребление из Kafka в реальном времени, обучение Apache Kafka, Apache Kafka для дата-инженеров и разработчиков, примеры сжатия сообщения Apache Kafka, курсы по Apache Kafka, Apache Kafka разработчик примеры курсы обучение, Школа Больших Данных Учебный центр Коммерсант

Насколько быстро работает Apache Kafka в облачной платформе Upstash: пишем простой пример для пары продюсер-потребитель на Python и измеряем задержку. Миллисекундное отставание при публикации и минутная задержка обработки данных на потребителе.

Задержка публикации сообщений в Kafka

Чтобы измерить задержку асинхронного обмена данными в системе с EDA-архитектурой из продюсера и потребителя Apache Kafka, я написала пару простых Python-приложений. Задержка – это разница между моментом происхождения события в реальном мире, материализованным в виде публикации сообщения продюсером и фактическим потреблением этих данных приложением-потребителем. На самом деле эта задержка складывается из двух компонентов:

  • задержка публикации – разница между моментом происхождения события и его записью в топике Kafka;
  • задержка потребления – разница между получением сообщения из Kafka и его фактической обработкой.

Чтобы сфокусироваться именно на этих временных показателях, логика публикации сообщений очень простая, без распределения сообщений по разделам топика Kafka. Приложение-продюсер отправляет в явно заданный раздел топика Kafka сообщение, которое содержит случайное название компании, сгенерированное с помощью метода fake.company() из библиотеки Faker. Для отслеживания времени происхождения события в реальном мире, т.е. когда продюсер его сгенерировал, используется переменная producer_publish_time, полученная из преобразования результата функции time.time(), которая возвращает текущее время в секундах, прошедших с начала эпохи Unix (1 января 1970 года 00:00:00 UTC). Это значение называется временной меткой (timestamp). Для наглядности в рамках цикла публикации присвоим каждому сообщению итерируемый порядковый номер. Таким образом, полезная нагрузка, упакованная в формате JSON, имеет следующую схему:

{
    "$schema": "http://json-schema.org/draft-06/schema#",
    "$ref": "#/definitions/Welcome4",
    "definitions": {
        "Welcome4": {
            "type": "object",
            "additionalProperties": false,
            "properties": {
                "number": {
                    "type": "integer"
                },
                "producer_publish_time": {
                    "type": "string"
                },
                "content": {
                    "type": "string"
                }
            },
            "required": [
                "number",
                "content",
                "producer_publish_time"
            ],
        }
    }
}

Код приложения-продюсера, который отправляет данные в 1-ый раздел топика Apache Kafka с названием test, запущенный в Google Colab, выглядит так:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python
!pip install faker

#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer

# Импорт модуля faker
from faker import Faker
####################################ячейка в Google Colab №2 - публикация сообщений в Kafka###########################################
# объявление продюсера Kafka
producer = KafkaProducer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  #batch_size=300
)
topic='test'

fake=Faker()

number=0

#бесконечный цикл публикации данных
while True:
  start_time = time.time()  # запоминаем время начала отправки сообщения

  #подготовка данных для публикации в JSON-формате
  producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time))

  number=number+1

  #задаем распределение по разделам
  part=1

  name=fake.company()
  content = str('сообщение от ' + name)

  #создаем полезную нагрузку в JSON
  data = {'number': number, 'producer_publish_time': producer_publish_time,'content': content}

  # публикуем данные в Kafka
  future = producer.send(topic, value=data, partition=part, key=None)
  record_metadata = future.get(timeout=60)
  print(f' [x] Sent {record_metadata}')
  print(f' [x] Payload {data}')

  end_time = time.time()  # запоминаем время окончания отправки сообщения
  package_size = len(json.dumps(data).encode('utf-8'))  # размер пакета в байтах
  delay = end_time - start_time  # задержка в секундах

  print(f'Размер пакета в байтах : {package_size}')
  print(f'Задержка: {delay} секунд')

####################################ячейка в Google Colab №3 - закрытие соединения###########################################
#Закрываем соединения
producer.close()
Kafka Python-producer Google Colab
Публикация данных в Kafka

Задержка публикации для пакета сообщений 150-170 байт составила около 0,02 секунды, кроме самого первого сообщения. Далее напишем код потребителя, чтобы понять задержку потребления.

Задержка потребления сообщений

Код потребителя тоже будет очень простым без сложных преобразований полученных данных. Для наглядности приложение-потребитель будет записывать потребленные сообщения в Google-таблицу, добавляя отметку времени приема сообщения в Kafka и текущее время, т.е. момент, когда фактически потребитель обрабатывает полученные данные. Текущее время вычисляется с помощью функции now(), которая возвращает текущую дату и время в формате объекта datetime. Python-код приложения-потребителя будет выглядеть так:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python

#импорт модулей
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()

import json
import random
from datetime import datetime

from kafka import KafkaConsumer
from json import loads
from kafka.structs import TopicPartition
####################################ячейка в Google Colab №2 - потребление из Kafka###########################################
#объявление потребителя Kafka
consumer = KafkaConsumer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  group_id='gr1',
  auto_offset_reset='earliest',
  enable_auto_commit=True
)
topic='test'

#Google Sheets Autentificate
gc = gspread.authorize(creds)

# подписка потребителя на определенный раздел topic partition
part=1 #задание раздела
topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел

#Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
wks = sh.worksheet("test_"+ str(part)) #в какой лист гугл-таблиц будем записывать

#начальный номер строки для записи данных
x=1

# обработка сообщений из Kafka
for message in consumer:
    try:
        # распаковка сообщения
        payload = message.value.decode("utf-8")
        data = json.loads(payload)

        # вывод сообщения в консоль
        print(f"Offset: {message.offset}, Value: {message.value}")
        print(consumer.position(topic_partition))
        print(f"Timestamp: {message.timestamp}, Value: {message.value}")
        timestamp = message.timestamp / 1000.0
        datetime_object = datetime.fromtimestamp(timestamp)
        formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f"Timestamp: {formatted_timestamp}, Value: {message.value}")

        # парсинг сообщения
        number = data['number']
        producer_publish_time = data['producer_publish_time']
        content = data['content']

        now=datetime.now()
        consuming_time=now.strftime("%m/%d/%Y %H:%M:%S")

        # вывод распарсенных данных в консоль
        print(f'Сообщение № {number}, время публикации: {producer_publish_time}, Kafka timestamp {formatted_timestamp}, сейчас: {consuming_time}, содержимое: {content}')

        # обновление данных в Google Sheets
        x += 1
        wks.update_cell(x, 1, number)
        wks.update_cell(x, 2, producer_publish_time)
        wks.update_cell(x, 3, formatted_timestamp)
        wks.update_cell(x, 4, consuming_time)
        wks.update_cell(x, 5, content)
    except Exception as e:
        # запись ошибок в лог-файл на Google Диске
        error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n"
        with open("dlq.txt", "a") as f:
            f.write(error_str)
        print(f"Error: {str(e)}")
###################################ячейка в Google Colab №3 - закрытие соединения###########################################
#отписываем потребителя и закрываем соединение
consumer.unsubscribe()
consumer.close()

Чтобы избежать сбоя потребителя из-за изменения схемы данных полезной нагрузки, цикл потребления сообщений включает конструкцию try-except. При возникновении ошибки парсинга, т.е. исключения сообщение с измененной схемой записывается в лог-файл на Google Диске. Посмотрим вывод отладочной информации в консоли после запуска этого скрипта в Google Colab:

Kafka Python-consumer Google Colab
Потребление сообщений из Kafka

Таким образом, видно, что потребитель уже с первых сообщений начинает отставать от продюсера. Наиболее наглядно этот вывод подтверждает визуализация результатов потребления в Google-таблице:

from Kafka to Google Colab
Вывод данных с отметками времени публикации и потребления в Google-таблицу

Визуализация показывает, что за 1 секунду продюсер отправляет в Kafka примерно 43 сообщения. А потребитель потребляет лишь 1 сообщение в секунду. Поэтому задержка между моментом происхождения события, его публикации в топике и моментом фактической обработки данных нарастает интегрально и приводит к тому, что общая задержка системы возрастает и уже измеряется минутами. Например, мой Python-скрипт продюсера опубликовал в Kafka 1108 сообщений, проработав с 10:12:50 до 10:13:16, т.е. всего минуту и 16 секунд. Однако, фактическая обработка всех этих сообщений приложением-потребителем завершилась в 10:41:57, т.е. интегральная задержка составила 28 минут и 41 секунду. Это уже совсем не обработка в реальном времени и даже не near real-time. Впрочем, скорей всего такое расхождение связано с долгой записью данных в Google-таблицы, которая выполняется для каждого полученного сообщения и включает накладные расходы установки соединения. Поэтому для потоковой записи событий в системы-приемники обычно используются специализированные API в библиотеках или коннекторах, например, Kafka Connect Neo4j Connector, PostgreSQL Source (JDBC) Connector for Confluent Cloud и пр. А вообще общая задержка системы зависит от многих факторов:

  • где и как развернута Apache Kafka (локально или в облаке);
  • объем передаваемых данных и формат сериализации сообщения;
  • сложность бизнес-логики потребителя, который должен не просто считать сообщение из Kafka, но и преобразовать эти данные или отреагировать на них как-то иначе, чтобы потом перейти к следующему сообщению;
  • каков фактор репликации, ожидает ли продюсер подтверждений, о том, что все брокеры-подписчики получили свою копию данных с брокера-лидера;
  • криптографические операции (шифрование на строне продюсера и расшифровка на стороне потребителя), пример которых разбирается в новой статье
  • пропускная способность сетей передачи данных, как между продюсером и Kafka, так и между потребителем и Kafka.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту