FastStream для работы с Kafka: практический пример

Kafka курсы примеры обучение, Kafka для администратора кластера, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Чем хороша Python-библиотека FastStream и как ее использовать для потоковой публикации данных в Apache Kafka: практический пример асинхронной отправки JSON-сообщений.

О библиотеке FastStream

Для Python-разработчиков есть довольно много библиотек, позволяющих взаимодействовать с Apache Kafka: kafka-python, confluent-kafka, Quix Streams и другие клиенты. О сравнении kafka-python и confluent-kafka я писала здесь, а о Quix Streams – здесь. Сегодня хочу рассказать о FastStream  — еще одной интересной библиотеке для создания потоковых конвейеров на Python. Этот простой, но очень мощный фреймворк облегчает разработку продюсеров и потребителей, предоставляя унифицированный API для работы с несколькими брокерами сообщений: Kafka, RabbitMQ, NATS и Redis. Взаимодействие с Apache Kafka реализуется на основе библиотек AIOKafka и Confluent. Однако, в отличие от AIOKafka и Confluent, в FastStream основным объектом, вокруг которого строится потоковый конвейер, является брокер.

Брокеры FastStream предоставляют удобные декораторы функций: @broker.subscriber(…) и @broker.publisher(…), которые автоматизируют следующие функции при разработке кода:

  • публикация и потребление сообщений в очереди событий;
  • кодирование и декодирование сообщений в формате JSON;
  • генерация документации AsyncAPI, о чем я подробно рассказываю в новой статье.

Эти декораторы брокеров упрощают определение логики обработки для продюсеров и потребителей, позволяя сосредоточиться на основной бизнес-логике потокового приложения. Также FastStream использует Pydantic трансформации входных данных в формате JSON в объекты Python, что упрощает работу со структурированными данными, позволяя сериализовать входные сообщения, просто используя аннотации типов.

FastStream позволяет обрабатывать большие объемы данных с минимальными задержками, а также поддерживает многопоточность и параллельную обработку данных, что значительно ускоряет выполнение задач на многоядерных системах. Кроме того, фреймворк можно можно использовать как часть библиотеки FastAPI для быстрого создания веб-приложений в стиле REST. Для этого нужно всего лишь импортировать необходимый StreamRouter и объявить обработчик сообщений с теми же параметрами и декораторами: @router.subscriber(…) и @router.publisher(…). Об этом я расскажу в другой раз, а пока рассмотрим пример публикации данных в топик Kafka с помощью FastStream.

Пример публикации данных в Kafka

Как обычно, в качестве примера возьму публикацию клиентских обращений в топик InputsTopic: заявки на покупку продуктов в интернет-магазине от физических или юридических лиц или вопросы покупателей. Топик InputsTopic разделен на 3 раздела, маршрутизация сообщений по которым зависит от типа обращения:

  • корпоративные заявки от юрлиц публикуются в раздел 0;
  • заявки от частных лиц публикуются в раздел 1;
  • все вопросы публикуются в раздел 2.
Схема потоковой публикации данных в Kafka
Схема потоковой публикации данных в Kafka

Данные обращений в формате JSON имеют похожий, но немного отличающийся набор полей.

Схемы публикуемых JSON-сообщений
Схемы публикуемых JSON-сообщений

Чтобы вы могли легко повторить это упражнение, буду писать и запускать Python-код в интерактивной среде Google Colab. Сперва установим нужные пакеты и импортируем модули:

!pip install faststream[kafka]
!pip install faker
!pip install nest_asyncio

#импорт модулей
import json
import random
import time
import asyncio
import nest_asyncio
import logging
import ssl
from datetime import datetime
from time import sleep
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker
from pydantic import BaseModel, Field, PositiveInt, NonNegativeFloat
from google.colab import userdata
from typing import List
from faststream.security import BaseSecurity, SASLScram256
from dataclasses import dataclass, asdict

# Импорт модуля faker
from faker import Faker
from faker.providers.address.ru_RU import Provider

# Настройка логирования
logging.basicConfig(level=logging.INFO)

Затем зададим списки значений и определим параметры подключения к Kafka.

# Определение параметров подключения к Kafka
kafka_url = userdata.get('kafka_url')
username = userdata.get('username')
password = userdata.get('password')

# Списки полей в заявке
products = [
    "яблоки желтые", "малина", "вода", "хлеб белый","хлеб серый", "креветки", "форель",
    "апельсины", "кета","курица","яйцо перепелиное","яйцо куриное","лаваш",
    "булка сдобная","булка сахарная","помидоры бакинские","помидоры чери","огурцы",
    "перец сладкий","перец острый","перец болгарский","мандарины","укроп свежий",
    "укроп сушеный","клубника свежая","клубника мороженная","мороженое","картошка",
    "морковь", "свекла","пангасиус","семга","кальмар замороженный","горошек зеленый",
    "смородина черная","смородина красная","соль поваренная пищевая йодированная",
    "чай черный байховый","чай зеленый","чай красный","кофе","кофе с молоком","какао",
    "молоко","кефир","сыр с плесенью","сыр плавленый","сыр твердый","сыр мягкий",
    "яблоки красные","яблоки зеленые","яблоки сушеные","икра красная", "икра черная",
    "икра заморская баклажанная","масло сливочное","масло оливковое","масло подсолнечное",
    "масло кокосовое","орех грецкий","орех бразильский", "лист лавровый","куркума",
    "кукуруза","печенье сладкое","пряники сдобные","тесто слоеное","варенц",
    "ряженка","снежок","шоколад молочный"
]

questions = ["оплата", "доставка", "скидки", "сервис", "персонал"]

# Сопоставление темы с приоритетом
priority_map = {
    "оплата": 1,
    "доставка": 2,
    "скидки": 3,
    "сервис": 4,
    "персонал": 5
}

#Параметры подключения к Kafka
class SecurityConfig:
       def __init__(self, sasl_mechanism, security_protocol, sasl_plain_username, sasl_plain_password, use_ssl):
           self.sasl_mechanism = sasl_mechanism
           self.security_protocol = security_protocol
           self.sasl_plain_username = sasl_plain_username
           self.sasl_plain_password = sasl_plain_password
           self.use_ssl = use_ssl

ssl_context = ssl.create_default_context()
security = SASLScram256(
    ssl_context=ssl_context,
    username=username,
    password=password,
)

#Определение топика Kafka
topic = "InputsTopic"

# Конструктор KafkaBroker с объектом безопасности
broker = KafkaBroker([kafka_url], security=security)

В этот раз параметры для подключения к Kafka (URL-адрес bootstrap-сервера, логин и пароль) я сохранила в хранилище секретов Google Colab, получая в коде значения этих ключей.

Использование хранилища секретов Google Colab
Использование хранилища секретов Google Colab

Далее напишем код асинхронной публикации сообщений в Kafka, используя API FastStream на основе AIOKafka – python-клиента, который базируется на библиотеке kafka-python. Но, в отличие от kafka-python, AIOKafka работает асинхронно благодаря использованию asyncio, что позволяет выполнять операции без блокировки основного потока. Это особенно полезно для приложений, требующих высокой производительности и параллельности. AIOKafka предоставляет асинхронные интерфейсы для продюсеров и потребителей, что требует использования синтаксиса async/await.

Для типизации публикуемых данных будем использовать классы, чтобы избежать дублирования кода и поддерживать единообразие между различными типами клиентских обращений.

# Определение классов
# Базовый класс для запросов
@dataclass
class RequestData:
    moment: str
    name: str
    subject: str
    content: str

# Класс для корпоративных запросов
@dataclass
class CorporateRequest(RequestData):
    inn: str

# Класс для частных запросов
@dataclass
class PrivateRequest(RequestData):
    phone_number: str
    age: int

# Класс для заявок с темой "question"
@dataclass
class QuestionRequest(RequestData):
    priority: int

# Инициализация FastStream приложения
app = FastStream(broker)

# Создание продюсера для топика InputsTopic
publisher = broker.publisher(topic)

# Инициализация Faker
fake = Faker("ru_RU")

def generate_fake_data() -> RequestData:
    now = datetime.now()
    moment = now.strftime("%m/%d/%Y %H:%M:%S")

    corp = random.choice([1, 0])

    if corp == 1:
        name = fake.company()
    else:
        name = fake.name()

    subject = random.choice(["app", "question"])

    if subject == "question":
        theme = random.choice(questions)
        content = f"Здравствуйте, у меня вопрос про {theme}"
        part = 2  # все вопросы в раздел 2
    else:
        theme = "app"
        content = f"{random.choice(products)} {random.randint(0, 100)}"
        if corp == 1:
            part = 0  # корпоративные заявки в раздел 0
        else:
            part = 1  # частные заявки в раздел 1

    if theme == "app":
        if corp == 1:
            inn = fake.businesses_inn()
            request = CorporateRequest(
                moment=moment,
                name=name,
                subject=subject,
                content=content,
                inn=inn
            )
        else:
            phone_number = fake.phone_number()
            age = random.randint(18, 100)
            request = PrivateRequest(
                moment=moment,
                name=name,
                subject=subject,
                content=content,
                phone_number=phone_number,
                age=age
            )
    else:
        # Вычисление приоритета
        priority = priority_map.get(theme, 0)  # 0 - значение по умолчанию, если тема не найдена
        # Преобразуем в QuestionRequest, используя ранее определённые переменные
        request = QuestionRequest(
            moment=moment,
            name=name,
            subject=subject,
            content=content,
            priority=priority
        )
    return request, part

# Публикация данных в Kafka
async def publish_data():
    while True:
        try:
            request, part = generate_fake_data()
            msg = json.dumps(asdict(request), ensure_ascii=False)
            await publisher.publish(msg, partition = part)  # Используем publisher для публикации
            print(f"В раздел {part} опубликовано {msg}")
        except Exception as e:
            logging.error(f"Ошибка при публикации данных: {e}")
        await asyncio.sleep(2)  # Задержка перед следующим сообщением (5 секунд)

async def main():
    try:
        # Устанавливаем соединение с брокером
        await broker.connect()
        print("Соединение с Kafka установлено.")

        # Запускаем процесс публикации данных
        await publish_data()
    except Exception as e:
        logging.error(f"Ошибка при подключении или публикации: {e}")

if __name__ == "__main__":
    import nest_asyncio
    nest_asyncio.apply()
    try:
        asyncio.run(main())
    except RuntimeError as e:
        if "asyncio.run() cannot be called from a running event loop" in str(e):
            loop = asyncio.get_event_loop()
            loop.create_task(main())
        else:
            raise

В этом коде асинхронная функция publish_data() постоянно в бесконечном цикле каждые 2 секунды генерирует фейковые данные и вычисляет номер раздела топика Kafka, преобразует сообщение в JSON-строку, выводит информацию о публикации данных и, при возникновении ошибки логирует её.

Запуск FastStream-продюсера в Colab
Запуск FastStream-продюсера в Colab

Благодаря методу asdict из модуля dataclasses, объекты классов легко преобразуются в словари, что упрощает их сериализацию в формат JSON для отправки в брокер. Аннотации типов в @dataclass также используются для автоматической генерации спецификации AsyncApi, что я показываю здесь. А в заключение отмечу, что сообщения успешно опубликованы в Kafka.

Просмотр опубликованных в Kafka сообщений
Просмотр опубликованных в Kafka сообщений

Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://faststream.airt.ai/latest/faststream/
  2. https://github.com/aio-libs/aiokafka
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту