Что такое AsyncAPI, зачем документировать спецификацию для EDA-архитектур и как это сделать. Создаем свою спецификацию для Apache Kafka с помощью веб-инструмента AsynсAPI Studio.
Что такое AsyncAPI
Подобно тому, как Swagger (OpenAPI ) стал стандартом де-факто для описания синхронного REST API, включая HTTP-методы запросов и ответы приложения на них со структурами данных полезной нагрузки, AsyncAPI является таким же аналогом для асинхронных архитектур. Это инициатива с открытым исходным кодом, представляющая собой машиночитаемый язык спецификаций с открытым исходным кодом для асинхронных API и архитектур, ориентированных на события (EDA, Event Driven Architecture). Ключевыми понятиями AsyncAPI являются следующие:
- Сервер – система обмена сообщениями, обеспечивающая связь между продюсером и потребителем. Сервер может быть брокером сообщений или сервисом с API WebSocket, который обеспечивает обмен сообщениями между браузером и сервером или между сервером.
- Продюсер – приложение генерирует и публикует события в виде сообщений, информируя об изменении существующих или создании новых данных. Приложение может быть микросервисом или IoT-устройством, может быть написано на любом языке программирования, если он поддерживает выбранный протокол. Приложение должно использовать протокол, поддерживаемый сервером, для подключения и обмена сообщениями.
- Потребитель – приложение, которое подписывается на канал сервера, потребляет новые сообщения и реагирует на них;
- Канал – механизм, созданный сервером для организации и передачи сообщений. Пользователи могут определять каналы как топик или очередь, включая ключ маршрутизации и путь в зависимости от используемого протокола. Серверы могут поддерживать множество экземпляров каналов, позволяя отправлять сообщения с разным содержимым в разные каналы. В зависимости от реализации сервера канал может быть включен в сообщение через заголовки, определенные протоколом.
- Приложение – в EDA-архитектуре приложение является продюсером или потребителем или тем и другим, как, например, в случае Kafka Streams. Приложения должны использовать протоколы, поддерживаемые сервером, чтобы подключаться к каналам и обмениваться сообщениями.
- Протокол – набор правил, определяющих способ обмена информацией между приложениями и/или серверами. Например, AMQP, HTTP, JMS, Kafka, Anypoint MQ, MQTT, Solace, STOMP, Mercure, WebSocket, Google Pub/Sub, Pulsar пр. Каждый раз, когда продюсер публикует сообщения, протокол передает их в канал, а затем потребителю, обеспечивая асинхронную передачу сообщений. Протокол имеет привязку (binding) —механизм определения информации, специфичной для протокола.
- Сообщение — коммуникационный актив, используемый для обмена информацией от отправителя (продюсера) к получателю (потребителю) через каналы. Одно сообщение может использоваться несколькими независимыми получателями, а также может быть определено как событие или команда. Отправитель отправляет полезную нагрузку данных, которые получателю необходимо обработать и сериализовать в соответствующий формат, например JSON, XML, двоичный и пр. Сообщение также может включать метаданные, т.е. его заголовки или свойства. Примечательно, что не все сообщения фактически являются событиями: некоторые из них могут быть запросами или командами, реализуя архитектурный паттерн CQRS. О проектировании событий в EDA-архитектуре мы писали здесь.


Также как OpenAPI, AsyncAPI не выступает в качестве уровня абстракции кода для выполнения вызовов самих систем обмена сообщениями, а представляет собой язык описания метаданных. Он используется для создания описаний ресурсов, таких как топики и очереди, события, производители и потребители, в формате YAML или JSON. Допускается только подмножество формата YAML, которое соответствует возможностям JSON. Кроме того, что AsyncAPI позволяет описывать приложения и их операции публикации и потребления потоков данных, экосистема инструментов AsyncAPI также включает генераторы кода, валидаторы, анализаторы и прочие полезные для разработчика средства.
Помимо структурных различий AsyncAPI от OpenAPI, важно отметить, что:
- AsyncAPI совместим со схемами OpenAPI;
- Полезная нагрузка сообщения в AsyncAPI может быть любым значением, а не только схемой AsyncAPI или OpenAPI. Например, это может быть схема AVRO;
- Серверный объект AsyncAPI почти идентичен своему аналогу OpenAPI, однако, вместе scheme используется protocol и есть свойство его версии (protocolVersion);
- Параметры пути OpenAPI и параметры канала AsyncAPI немного отличаются, поскольку в AsyncAPI нет понятия запроса (request) и cookie, а параметры заголовка могут быть определены в объекте сообщения. Таким образом, параметры канала AsyncAPI эквивалентны параметрам пути OpenAPI.

Последней текущей версией AsyncAPI на сегодня является 2.6.0, а 3.0.0 пока еще находится в статусе пред-релиза. Хотя спецификация AsyncAPI не предполагает какой-либо топологии, архитектуры или шаблона программного обеспечения, механизм привязок позволяет предоставить более конкретную информацию о протоколе, обеспечивающем передачу сообщений. Познакомившись с базовыми понятиями AsyncAPI, составим собственную спецификацию на примере Apache Kafka.
Практический пример: как составить свою спецификацию
В качестве практического примера я возьму ранее реализованную систему из приложения-продюсера и приложения-потребителя, которые обмениваются данными через экземпляр Apache Kafka, развернутый в облачной serverless-платформе Upstash. Исходный код этих Python-приложений рассмотрен в предыдущей статье.
Приложение-продюсер отправляет в Kafka сообщения – обращения клиентов интернет-магазина с данными в JSON-структуре. Полезная нагрузка сообщения может содержать вопросы по работе магазина, оплате, доставке или скидкам. Также обращение клиента вместо вопроса может содержать заявку на покупка товара – от физлица (индивидуальная заявка) или от юрлица (корпоративная заявка). Распределение сообщений по разделам топика Kafka обусловлена бизнес-логикой: все вопросы должны попадать в раздел 0, заявки на покупку товара от корпоративных клиентов – в раздел 1, а от частных лиц – в раздел 2.
Таким образом, Kafka в терминологии AsyncAPI является протоколом, брокер этой распределенной платформы обмена сообщениями – сервером, а раздел топика – каналом. На основании постановки рассматриваемой задачи, исходного кода своих Python-приложений, настроек экземпляра Kafka и структуры AsyncAPI 2.6, я составила следующую YAML-спецификацию асинхронного обмена данными:
asyncapi: '2.6.0'
id: 'urn:my_async_producer_app:server'
info:
title: Async API specification for Kafka
version: '1.0.0'
description: 'Спецификация асинхронного обмена данными через Kafka'
servers:
dev:
url: my-kafka.upstash.io:9092
protocol: kafka-secure
description: 'параметры брокера Kafka'
security:
- saslScram: [SCRAM-SHA-256]
defaultContentType: application/json
channels:
publishing.{eventId}:
description: приложение-продюсер
parameters:
eventId:
$ref: '#/components/parameters/eventId'
publish:
summary: 'Генерация события, отправляемого с клиентского приложения'
operationId: PublishNewEvent
traits:
- $ref: '#/components/operationTraits/kafka'
message:
oneOf:
- $ref: '#/components/messages/question'
- $ref: '#/components/messages/ind_app'
- $ref: '#/components/messages/corp_app'
bindings:
kafka:
key:
type: string
enum: ['question', 'ind_app', 'corp_app']
questions:
description: 'раздел для вопросов'
subscribe:
operationId: questions_performing
traits:
- $ref: '#/components/operationTraits/kafka'
message:
$ref: '#/components/messages/question'
ind_apps:
description: 'раздел для индивидуальных заявок'
subscribe:
operationId: ind_apps_performing
traits:
- $ref: '#/components/operationTraits/kafka'
message:
$ref: '#/components/messages/ind_app'
corp_apps:
description: 'раздел для корпоративных заявок'
subscribe:
operationId: corp_apps_performing
traits:
- $ref: '#/components/operationTraits/kafka'
message:
$ref: '#/components/messages/corp_app'
components:
messages:
question:
name: question
title: question
summary: 'в раздел 0 попадают все вопросы'
contentType: application/json
traits:
- $ref: '#/components/messageTraits/question_header'
payload:
$ref: "#/components/schemas/question"
ind_app:
name: ind_app
title: ind_app
summary: 'в раздел 1 попадают все индивидуальные заявки'
contentType: application/json
traits:
- $ref: '#/components/messageTraits/ind_app_header'
payload:
$ref: "#/components/schemas/ind_app"
corp_app:
name: corp_app
title: corp_app
summary: 'в раздел 1 попадают корпоративные заявки'
contentType: application/json
traits:
- $ref: '#/components/messageTraits/corp_app_header'
payload:
$ref: "#/components/schemas/corp_app"
schemas:
question:
type: object
description: 'вопрос по работе магазина'
properties:
id:
type: string
description: 'q-x-xx-xx-xxxx'
name:
type: string
subject:
type: string
content:
type: string
sentAt:
$ref: "#/components/schemas/sentAt"
ind_app:
type: object
description: 'индивидуальная заявка на покупку продукта'
properties:
id:
type: string
description: 'ia-x-xx-xxx-xxx'
client_name:
type: string
subject:
type: string
content:
type: object
properties:
product:
type: string
$ref: "#/components/schemas/products"
quantity:
type: integer
corp_app:
type: object
description: 'корпоративная заявка на покупку продукта'
properties:
id:
type: string
description: 'ca-x-xx-xxx-xxx'
company_name:
type: string
subject:
type: string
content:
type: object
properties:
product:
type: string
$ref: "#/components/schemas/products"
quantity:
type: integer
sentAt:
$ref: "#/components/schemas/sentAt"
sentAt:
type: string
format: date-time
description: Date and time when the message was sent.
products:
type: string
enum:
- milk
- water
- cofee
- tea
- apple
- orange
- bread
securitySchemes:
saslScram:
type: scramSha256
description: протокол безопасности SASL_SSL, механизм безопасности SCRAM-SHA-256
parameters:
eventId:
description: идентификатор события
schema:
type: string
description: 'x-xx:xx-xx-xx-xxx'
messageTraits:
question_header:
headers:
type: object
properties:
header:
type: string
enum:
- payment
- delivery
- discount
- vip
- staff
description: 'тема вопроса'
ind_app_header:
headers:
type: object
properties:
header:
type: string
enum:
- ind_app_buy
description: 'заголовок индивидуальной заявки'
corp_app_header:
headers:
type: object
properties:
header:
type: string
enum:
- corp_app_buy
description: 'заголовок корпоративной заявки'
operationTraits:
kafka:
bindings:
kafka:
topic: 'inputs_topic'
partitions: 3
replicas: 3
topicConfiguration:
cleanup.policy: ["delete", "compact"]
retention.ms: 604800000
retention.bytes: 1000000000
delete.retention.ms: 86400000
max.message.bytes: 1048588
bindingVersion: '0.4.0'
Для валидации этой спецификации и ее визуализации я использовала замечательный веб-инструмент AsyncAPI Studio, который позволяет даже без регистрации составить свой документ и проверить его на наличие ошибок. Для корректной спецификации редактор выводит UI, похожий на Swagger.

Визуализированная в UI спецификация очень удобна для тестирования: за счет полей, описывающих метаданные, например, summary или description, можно документировать структуры данных и особенности протокола, сервера и приложения, в т.ч. с примерами. Задаваемые через enum перечисления также отображаются и используются для автогенерации примеров.

Наконец, еще одной полезной функцией AsyncAPI Studio является визуализация потокового конвейера отправки сообщений от продюсера через сервер к каналам, на которые может подписаться потребитель. Примечательно, что это диаграмма строится автоматически, на основании заданной YAML-спецификации.

Таким образом, личный опыт показал, что AsyncAPI Studio является удобным (но не единственным) инструментом для валидации и визуализации спецификации асинхронного обмена данными. А сама спецификация AsyncAPI – отличный способ документировать не только структуры данных в полезной нагрузке сообщения, но и параметры сервера, включая определение разделов и различных конфигураций брокера. О том, как автоматически сгенерировать подобную спецификацию из кода с помощью библиотеки FastStream, я рассказываю здесь. А про аналог подобной спецификации для событий происхождения данных читайте в этой новой статье.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники


