Автогенерация AsyncAPI-спецификации для Kafka с FastStream: практический пример

Kafka курсы примеры обучение, Kafka для администратора кластера, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как получить спецификацию AsyncAPI из кода с помощью декораторов функций публикации и потребления сообщений средствами Python-библиотеки FastStream: простой пример потокового конвейера на Apache Kafka.

Еще раз про FastStream и спецификацию AsyncAPI

Вчера я рассказывала про Python-библиотеку FastStream для разработки потоковых конвейеров на Apache Kafka, RabbitMQ, NATS и Redis. Помимо мощного, но довольно простого API многопоточной асинхронной обработки данных, FastStream также позволяет автоматически генерировать спецификацию AsyncAPI прямо из кода. Эта спецификация декларативно в формате YAML или JSON описывает ключевые сущности потоковой асинхронной интеграции приложений:

  • параметры сервера, который обеспечивает обмен сообщениями между продюсерами и потребителями;
  • операции публикации и потребления сообщений;
  • схемы публикуемых и потребляемых сообщений.

Спецификация AsyncAPI похожа на OpenAPI и основана на ней, кроме ключевых отличий синхронного и асинхронного взаимодействия. Параметры канала AsyncAPI эквивалентны параметрам пути OpenAPВ, понятия запроса и cookie отсутствуют, а параметры заголовка могут быть определены в объекте сообщения. FastStream позволяет автоматически сгенерировать такую спецификацию с использованием декораторов функций потребления и публикации сообщений: @broker.subscriber(…) и @broker.publisher(…).

Постановка задачи

Чтобы понять, как это работает, в качестве наглядного примера возьмем типичный кейс из прошлой статьи про публикацию клиентских обращений в топик Kafka: заявки на покупку продуктов в интернет-магазине от физических или юридических лиц или вопросы покупателей. Топик под названием InputsTopic разделен на 3 раздела, маршрутизация сообщений по которым зависит от типа обращения:

  • корпоративные заявки от юрлиц публикуются в раздел 0;
  • заявки от частных лиц публикуются в раздел 1;
  • все вопросы публикуются в раздел 2.
Схема потоковой публикации данных в Kafka
Схема потоковой публикации данных в Kafka

Данные обращений в формате JSON имеют похожий, но немного отличающийся набор полей.

Схемы публикуемых JSON-сообщений
Схемы публикуемых JSON-сообщений

Практическая реализация

В качестве среды разработки будем использовать Google Colab. Сначала установим необходимые пакеты и импортируем модули:

!pip install faststream[kafka]
!pip install faker
!pip install nest_asyncio
# Настройка логирования
logging.basicConfig(level=logging.INFO)

Затем сформируем Python-файл с определением потокового конвейера. В отличие от прошлой статьи, задекларируем здесь не только публикацию сообщений в Kafka, но и их потребление, чтобы отразить это в генерируемой спецификации AsyncAPI. А вот описывать саму логику формирования полезной нагрузки нет смысла. Поэтому код для автогенерации спецификации AsynAPI будет содержать только декларации сервера, операции публикации и потребления сообщений и описания их структур данных.

#файл для автогенерации спецификации AsyncAPI
%%writefile kafka_stream_pipeline.py
#импорт модулей
import json
import random
import time
import asyncio
import nest_asyncio
import logging
import ssl
from datetime import datetime
from time import sleep
from faststream import FastStream, Logger
from faststream.kafka import KafkaBroker, TopicPartition
from faststream.security import BaseSecurity, SASLScram256
from dataclasses import dataclass, asdict
from typing import Union

# Импорт модуля faker
from faker import Faker
from faker.providers.address.ru_RU import Provider

# Определение параметров подключения к Kafka
kafka_url="kafka-host:9092"
username="kafka-user"
password="kafka-user-password"

#Параметры подключения к Kafka
class SecurityConfig:
       def __init__(self, sasl_mechanism, security_protocol, sasl_plain_username, sasl_plain_password, use_ssl):
           self.sasl_mechanism = sasl_mechanism
           self.security_protocol = security_protocol
           self.sasl_plain_username = sasl_plain_username
           self.sasl_plain_password = sasl_plain_password
           self.use_ssl = use_ssl

ssl_context = ssl.create_default_context()
security = SASLScram256(
    ssl_context=ssl_context,
    username=username,
    password=password,
)

#Определение топика Kafka
topic = "InputsTopic"

# Конструктор KafkaBroker с объектом безопасности
broker = KafkaBroker([kafka_url], security=security)

# Инициализация FastStream приложения
app = FastStream(broker)

# Определение классов
# Базовый класс для запросов
@dataclass
class RequestData:
    moment: str
    name: str
    subject: str
    content: str

# Класс для корпоративных запросов
@dataclass
class CorporateRequest(RequestData):
    inn: str

# Класс для частных запросов
@dataclass
class PrivateRequest(RequestData):
    phone_number: str
    age: int

# Класс для заявок с темой "question"
@dataclass
class QuestionRequest(RequestData):
    priority: int

# Варианты схем публикуемых сообщений
RequestSchema = Union[CorporateRequest, PrivateRequest, QuestionRequest]

# Декорированная функция для публикации данных
@broker.publisher(topic, schema = RequestSchema)
async def publish_fake_data(msg: RequestSchema):
    logging.info(msg)

# Декорированная функция для потребления данных из раздела 0
@broker.subscriber(partitions=[TopicPartition(topic, 0)] )
async def questions_processing(msg: CorporateRequest):
    logging.info(msg)

# Декорированная функция для потребления данных из раздела 1
@broker.subscriber(partitions=[TopicPartition(topic, 1)] )
async def corp_apps_processing(msg: PrivateRequest):
    logging.info(msg)

# Декорированная функция для потребления данных из раздела 2
@broker.subscriber(partitions=[TopicPartition(topic, 2)] )
async def fiz_apps_processing(msg: QuestionRequest):
    logging.info(msg)

Выполним команду генерации  AsyncAPI-спецификации средствами библиотеки FastStream:

!faststream docs gen kafka_stream_pipeline:app

В результате выполнения этой команды получим JSON-файл сгенерированной спецификации.

Автоматическая генерация спецификации AsyncAPI с помощью библиотеки FastStream
Автоматическая генерация спецификации AsyncAPI с помощью библиотеки FastStream

Спецификация представляет собой JSON-документ по стандарту AsyncAPI 2.6.0:

{
  "asyncapi": "2.6.0",
  "defaultContentType": "application/json",
  "info": {
    "title": "FastStream",
    "version": "0.1.0",
    "description": ""
  },
  "servers": {
    "development": {
      "url": "kafka-host:9092",
      "protocol": "kafka-secure",
      "protocolVersion": "auto",
      "security": [
        {
          "scram256": []
        }
      ]
    }
  },
  "channels": {
    "InputsTopic:QuestionsProcessing": {
      "servers": [
        "development"
      ],
      "bindings": {
        "kafka": {
          "topic": "InputsTopic",
          "bindingVersion": "0.4.0"
        }
      },
      "subscribe": {
        "message": {
          "$ref": "#/components/messages/InputsTopic:QuestionsProcessing:Message"
        }
      }
    },
    "InputsTopic:CorpAppsProcessing": {
      "servers": [
        "development"
      ],
      "bindings": {
        "kafka": {
          "topic": "InputsTopic",
          "bindingVersion": "0.4.0"
        }
      },
      "subscribe": {
        "message": {
          "$ref": "#/components/messages/InputsTopic:CorpAppsProcessing:Message"
        }
      }
    },
    "InputsTopic:FizAppsProcessing": {
      "servers": [
        "development"
      ],
      "bindings": {
        "kafka": {
          "topic": "InputsTopic",
          "bindingVersion": "0.4.0"
        }
      },
      "subscribe": {
        "message": {
          "$ref": "#/components/messages/InputsTopic:FizAppsProcessing:Message"
        }
      }
    },
    "InputsTopic:Publisher": {
      "servers": [
        "development"
      ],
      "bindings": {
        "kafka": {
          "topic": "InputsTopic",
          "bindingVersion": "0.4.0"
        }
      },
      "publish": {
        "message": {
          "$ref": "#/components/messages/InputsTopic:Publisher:Message"
        }
      }
    }
  },
  "components": {
    "messages": {
      "InputsTopic:QuestionsProcessing:Message": {
        "title": "InputsTopic:QuestionsProcessing:Message",
        "correlationId": {
          "location": "$message.header#/correlation_id"
        },
        "payload": {
          "$ref": "#/components/schemas/CorporateRequest"
        }
      },
      "InputsTopic:CorpAppsProcessing:Message": {
        "title": "InputsTopic:CorpAppsProcessing:Message",
        "correlationId": {
          "location": "$message.header#/correlation_id"
        },
        "payload": {
          "$ref": "#/components/schemas/PrivateRequest"
        }
      },
      "InputsTopic:FizAppsProcessing:Message": {
        "title": "InputsTopic:FizAppsProcessing:Message",
        "correlationId": {
          "location": "$message.header#/correlation_id"
        },
        "payload": {
          "$ref": "#/components/schemas/QuestionRequest"
        }
      },
      "InputsTopic:Publisher:Message": {
        "title": "InputsTopic:Publisher:Message",
        "correlationId": {
          "location": "$message.header#/correlation_id"
        },
        "payload": {
          "$ref": "#/components/schemas/InputsTopic:Publisher:Message:Payload"
        }
      }
    },
    "schemas": {
      "CorporateRequest": {
        "properties": {
          "moment": {
            "title": "Moment",
            "type": "string"
          },
          "name": {
            "title": "Name",
            "type": "string"
          },
          "subject": {
            "title": "Subject",
            "type": "string"
          },
          "content": {
            "title": "Content",
            "type": "string"
          },
          "inn": {
            "title": "Inn",
            "type": "string"
          }
        },
        "required": [
          "moment",
          "name",
          "subject",
          "content",
          "inn"
        ],
        "title": "CorporateRequest",
        "type": "object"
      },
      "PrivateRequest": {
        "properties": {
          "moment": {
            "title": "Moment",
            "type": "string"
          },
          "name": {
            "title": "Name",
            "type": "string"
          },
          "subject": {
            "title": "Subject",
            "type": "string"
          },
          "content": {
            "title": "Content",
            "type": "string"
          },
          "phone_number": {
            "title": "Phone Number",
            "type": "string"
          },
          "age": {
            "title": "Age",
            "type": "integer"
          }
        },
        "required": [
          "moment",
          "name",
          "subject",
          "content",
          "phone_number",
          "age"
        ],
        "title": "PrivateRequest",
        "type": "object"
      },
      "QuestionRequest": {
        "properties": {
          "moment": {
            "title": "Moment",
            "type": "string"
          },
          "name": {
            "title": "Name",
            "type": "string"
          },
          "subject": {
            "title": "Subject",
            "type": "string"
          },
          "content": {
            "title": "Content",
            "type": "string"
          },
          "priority": {
            "title": "Priority",
            "type": "integer"
          }
        },
        "required": [
          "moment",
          "name",
          "subject",
          "content",
          "priority"
        ],
        "title": "QuestionRequest",
        "type": "object"
      },
      "InputsTopic:Publisher:Message:Payload": {
        "anyOf": [
          {
            "$ref": "#/components/schemas/CorporateRequest"
          },
          {
            "$ref": "#/components/schemas/PrivateRequest"
          },
          {
            "$ref": "#/components/schemas/QuestionRequest"
          }
        ],
        "title": "InputsTopic:Publisher:Message:Payload"
      }
    },
    "securitySchemes": {
      "scram256": {
        "type": "scramSha256"
      }
    }
  }
}

Наглядно посмотреть содержимое этой спецификации, подобно SwaggerUI для OpenAPI, можно в редакторе AsynAPIStudio, просто загрузив туда полученный JSON-файл. Редактор предлагает конвертировать спецификацию в версию 3.0 и делает это автоматически.

Просмотр сгенерированной AsynAPI-спецификации в AsynAPIStudio
Просмотр сгенерированной AsynAPI-спецификации в AsynAPIStudio

Поскольку в рассматриваемой задаче приложение-продюсер может публиковать в Kafka сообщения с разными структурами данных, в спецификации это необходимо отразить с помощью конструкции one of. Для этого в коде используется конструкция

RequestSchema = Union[CorporateRequest, PrivateRequest, QuestionRequest]

и декорированная функция публикации данных

# Декорированная функция для публикации данных
@broker.publisher(topic, schema = RequestSchema)
async def publish_fake_data():
    data, part = generate_fake_data()
    # Публикуем данные с указанием partition
    await broker.publish(topic=topic, message=data.json(), partition=part)
Просмотр схем сообщений в AsynAPIStudio
Просмотр схем сообщений в AsynAPIStudio

Таким образом, FastStream позволяет реализовать подход CodeFirst благодаря автоматической генерации спецификации AsyncAPI и устранить разрыв между проектированием и реализацией.

Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://faststream.airt.ai/latest/faststream/
  2. https://studio.asyncapi.com/

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту