Как получить спецификацию AsyncAPI из кода с помощью декораторов функций публикации и потребления сообщений средствами Python-библиотеки FastStream: простой пример потокового конвейера на Apache Kafka.
Еще раз про FastStream и спецификацию AsyncAPI
Вчера я рассказывала про Python-библиотеку FastStream для разработки потоковых конвейеров на Apache Kafka, RabbitMQ, NATS и Redis. Помимо мощного, но довольно простого API многопоточной асинхронной обработки данных, FastStream также позволяет автоматически генерировать спецификацию AsyncAPI прямо из кода. Эта спецификация декларативно в формате YAML или JSON описывает ключевые сущности потоковой асинхронной интеграции приложений:
- параметры сервера, который обеспечивает обмен сообщениями между продюсерами и потребителями;
- операции публикации и потребления сообщений;
- схемы публикуемых и потребляемых сообщений.
Спецификация AsyncAPI похожа на OpenAPI и основана на ней, кроме ключевых отличий синхронного и асинхронного взаимодействия. Параметры канала AsyncAPI эквивалентны параметрам пути OpenAPВ, понятия запроса и cookie отсутствуют, а параметры заголовка могут быть определены в объекте сообщения. FastStream позволяет автоматически сгенерировать такую спецификацию с использованием декораторов функций потребления и публикации сообщений: @broker.subscriber(…) и @broker.publisher(…).
Постановка задачи
Чтобы понять, как это работает, в качестве наглядного примера возьмем типичный кейс из прошлой статьи про публикацию клиентских обращений в топик Kafka: заявки на покупку продуктов в интернет-магазине от физических или юридических лиц или вопросы покупателей. Топик под названием InputsTopic разделен на 3 раздела, маршрутизация сообщений по которым зависит от типа обращения:
- корпоративные заявки от юрлиц публикуются в раздел 0;
- заявки от частных лиц публикуются в раздел 1;
- все вопросы публикуются в раздел 2.
Данные обращений в формате JSON имеют похожий, но немного отличающийся набор полей.
Практическая реализация
В качестве среды разработки будем использовать Google Colab. Сначала установим необходимые пакеты и импортируем модули:
!pip install faststream[kafka] !pip install faker !pip install nest_asyncio # Настройка логирования logging.basicConfig(level=logging.INFO)
Затем сформируем Python-файл с определением потокового конвейера. В отличие от прошлой статьи, задекларируем здесь не только публикацию сообщений в Kafka, но и их потребление, чтобы отразить это в генерируемой спецификации AsyncAPI. А вот описывать саму логику формирования полезной нагрузки нет смысла. Поэтому код для автогенерации спецификации AsynAPI будет содержать только декларации сервера, операции публикации и потребления сообщений и описания их структур данных.
#файл для автогенерации спецификации AsyncAPI %%writefile kafka_stream_pipeline.py #импорт модулей import json import random import time import asyncio import nest_asyncio import logging import ssl from datetime import datetime from time import sleep from faststream import FastStream, Logger from faststream.kafka import KafkaBroker, TopicPartition from faststream.security import BaseSecurity, SASLScram256 from dataclasses import dataclass, asdict from typing import Union # Импорт модуля faker from faker import Faker from faker.providers.address.ru_RU import Provider # Определение параметров подключения к Kafka kafka_url="kafka-host:9092" username="kafka-user" password="kafka-user-password" #Параметры подключения к Kafka class SecurityConfig: def __init__(self, sasl_mechanism, security_protocol, sasl_plain_username, sasl_plain_password, use_ssl): self.sasl_mechanism = sasl_mechanism self.security_protocol = security_protocol self.sasl_plain_username = sasl_plain_username self.sasl_plain_password = sasl_plain_password self.use_ssl = use_ssl ssl_context = ssl.create_default_context() security = SASLScram256( ssl_context=ssl_context, username=username, password=password, ) #Определение топика Kafka topic = "InputsTopic" # Конструктор KafkaBroker с объектом безопасности broker = KafkaBroker([kafka_url], security=security) # Инициализация FastStream приложения app = FastStream(broker) # Определение классов # Базовый класс для запросов @dataclass class RequestData: moment: str name: str subject: str content: str # Класс для корпоративных запросов @dataclass class CorporateRequest(RequestData): inn: str # Класс для частных запросов @dataclass class PrivateRequest(RequestData): phone_number: str age: int # Класс для заявок с темой "question" @dataclass class QuestionRequest(RequestData): priority: int # Варианты схем публикуемых сообщений RequestSchema = Union[CorporateRequest, PrivateRequest, QuestionRequest] # Декорированная функция для публикации данных @broker.publisher(topic, schema = RequestSchema) async def publish_fake_data(msg: RequestSchema): logging.info(msg) # Декорированная функция для потребления данных из раздела 0 @broker.subscriber(partitions=[TopicPartition(topic, 0)] ) async def questions_processing(msg: CorporateRequest): logging.info(msg) # Декорированная функция для потребления данных из раздела 1 @broker.subscriber(partitions=[TopicPartition(topic, 1)] ) async def corp_apps_processing(msg: PrivateRequest): logging.info(msg) # Декорированная функция для потребления данных из раздела 2 @broker.subscriber(partitions=[TopicPartition(topic, 2)] ) async def fiz_apps_processing(msg: QuestionRequest): logging.info(msg)
Выполним команду генерации AsyncAPI-спецификации средствами библиотеки FastStream:
!faststream docs gen kafka_stream_pipeline:app
В результате выполнения этой команды получим JSON-файл сгенерированной спецификации.
Спецификация представляет собой JSON-документ по стандарту AsyncAPI 2.6.0:
{ "asyncapi": "2.6.0", "defaultContentType": "application/json", "info": { "title": "FastStream", "version": "0.1.0", "description": "" }, "servers": { "development": { "url": "kafka-host:9092", "protocol": "kafka-secure", "protocolVersion": "auto", "security": [ { "scram256": [] } ] } }, "channels": { "InputsTopic:QuestionsProcessing": { "servers": [ "development" ], "bindings": { "kafka": { "topic": "InputsTopic", "bindingVersion": "0.4.0" } }, "subscribe": { "message": { "$ref": "#/components/messages/InputsTopic:QuestionsProcessing:Message" } } }, "InputsTopic:CorpAppsProcessing": { "servers": [ "development" ], "bindings": { "kafka": { "topic": "InputsTopic", "bindingVersion": "0.4.0" } }, "subscribe": { "message": { "$ref": "#/components/messages/InputsTopic:CorpAppsProcessing:Message" } } }, "InputsTopic:FizAppsProcessing": { "servers": [ "development" ], "bindings": { "kafka": { "topic": "InputsTopic", "bindingVersion": "0.4.0" } }, "subscribe": { "message": { "$ref": "#/components/messages/InputsTopic:FizAppsProcessing:Message" } } }, "InputsTopic:Publisher": { "servers": [ "development" ], "bindings": { "kafka": { "topic": "InputsTopic", "bindingVersion": "0.4.0" } }, "publish": { "message": { "$ref": "#/components/messages/InputsTopic:Publisher:Message" } } } }, "components": { "messages": { "InputsTopic:QuestionsProcessing:Message": { "title": "InputsTopic:QuestionsProcessing:Message", "correlationId": { "location": "$message.header#/correlation_id" }, "payload": { "$ref": "#/components/schemas/CorporateRequest" } }, "InputsTopic:CorpAppsProcessing:Message": { "title": "InputsTopic:CorpAppsProcessing:Message", "correlationId": { "location": "$message.header#/correlation_id" }, "payload": { "$ref": "#/components/schemas/PrivateRequest" } }, "InputsTopic:FizAppsProcessing:Message": { "title": "InputsTopic:FizAppsProcessing:Message", "correlationId": { "location": "$message.header#/correlation_id" }, "payload": { "$ref": "#/components/schemas/QuestionRequest" } }, "InputsTopic:Publisher:Message": { "title": "InputsTopic:Publisher:Message", "correlationId": { "location": "$message.header#/correlation_id" }, "payload": { "$ref": "#/components/schemas/InputsTopic:Publisher:Message:Payload" } } }, "schemas": { "CorporateRequest": { "properties": { "moment": { "title": "Moment", "type": "string" }, "name": { "title": "Name", "type": "string" }, "subject": { "title": "Subject", "type": "string" }, "content": { "title": "Content", "type": "string" }, "inn": { "title": "Inn", "type": "string" } }, "required": [ "moment", "name", "subject", "content", "inn" ], "title": "CorporateRequest", "type": "object" }, "PrivateRequest": { "properties": { "moment": { "title": "Moment", "type": "string" }, "name": { "title": "Name", "type": "string" }, "subject": { "title": "Subject", "type": "string" }, "content": { "title": "Content", "type": "string" }, "phone_number": { "title": "Phone Number", "type": "string" }, "age": { "title": "Age", "type": "integer" } }, "required": [ "moment", "name", "subject", "content", "phone_number", "age" ], "title": "PrivateRequest", "type": "object" }, "QuestionRequest": { "properties": { "moment": { "title": "Moment", "type": "string" }, "name": { "title": "Name", "type": "string" }, "subject": { "title": "Subject", "type": "string" }, "content": { "title": "Content", "type": "string" }, "priority": { "title": "Priority", "type": "integer" } }, "required": [ "moment", "name", "subject", "content", "priority" ], "title": "QuestionRequest", "type": "object" }, "InputsTopic:Publisher:Message:Payload": { "anyOf": [ { "$ref": "#/components/schemas/CorporateRequest" }, { "$ref": "#/components/schemas/PrivateRequest" }, { "$ref": "#/components/schemas/QuestionRequest" } ], "title": "InputsTopic:Publisher:Message:Payload" } }, "securitySchemes": { "scram256": { "type": "scramSha256" } } } }
Наглядно посмотреть содержимое этой спецификации, подобно SwaggerUI для OpenAPI, можно в редакторе AsynAPIStudio, просто загрузив туда полученный JSON-файл. Редактор предлагает конвертировать спецификацию в версию 3.0 и делает это автоматически.
Поскольку в рассматриваемой задаче приложение-продюсер может публиковать в Kafka сообщения с разными структурами данных, в спецификации это необходимо отразить с помощью конструкции one of. Для этого в коде используется конструкция
RequestSchema = Union[CorporateRequest, PrivateRequest, QuestionRequest]
и декорированная функция публикации данных
# Декорированная функция для публикации данных @broker.publisher(topic, schema = RequestSchema) async def publish_fake_data(): data, part = generate_fake_data() # Публикуем данные с указанием partition await broker.publish(topic=topic, message=data.json(), partition=part)
Таким образом, FastStream позволяет реализовать подход CodeFirst благодаря автоматической генерации спецификации AsyncAPI и устранить разрыв между проектированием и реализацией.
Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Apache Kafka в Kubernetes
Источники