Что такое AsyncAPI, зачем документировать спецификацию для EDA-архитектур и как это сделать. Создаем свою спецификацию для Apache Kafka с помощью веб-инструмента AsynсAPI Studio.
Что такое AsyncAPI
Подобно тому, как Swagger (OpenAPI ) стал стандартом де-факто для описания синхронного REST API, включая HTTP-методы запросов и ответы приложения на них со структурами данных полезной нагрузки, AsyncAPI является таким же аналогом для асинхронных архитектур. Это инициатива с открытым исходным кодом, представляющая собой машиночитаемый язык спецификаций с открытым исходным кодом для асинхронных API и архитектур, ориентированных на события (EDA, Event Driven Architecture). Ключевыми понятиями AsyncAPI являются следующие:
- Сервер – система обмена сообщениями, обеспечивающая связь между продюсером и потребителем. Сервер может быть брокером сообщений или сервисом с API WebSocket, который обеспечивает обмен сообщениями между браузером и сервером или между сервером.
- Продюсер – приложение генерирует и публикует события в виде сообщений, информируя об изменении существующих или создании новых данных. Приложение может быть микросервисом или IoT-устройством, может быть написано на любом языке программирования, если он поддерживает выбранный протокол. Приложение должно использовать протокол, поддерживаемый сервером, для подключения и обмена сообщениями.
- Потребитель – приложение, которое подписывается на канал сервера, потребляет новые сообщения и реагирует на них;
- Канал – механизм, созданный сервером для организации и передачи сообщений. Пользователи могут определять каналы как топик или очередь, включая ключ маршрутизации и путь в зависимости от используемого протокола. Серверы могут поддерживать множество экземпляров каналов, позволяя отправлять сообщения с разным содержимым в разные каналы. В зависимости от реализации сервера канал может быть включен в сообщение через заголовки, определенные протоколом.
- Приложение – в EDA-архитектуре приложение является продюсером или потребителем или тем и другим, как, например, в случае Kafka Streams. Приложения должны использовать протоколы, поддерживаемые сервером, чтобы подключаться к каналам и обмениваться сообщениями.
- Протокол – набор правил, определяющих способ обмена информацией между приложениями и/или серверами. Например, AMQP, HTTP, JMS, Kafka, Anypoint MQ, MQTT, Solace, STOMP, Mercure, WebSocket, Google Pub/Sub, Pulsar пр. Каждый раз, когда продюсер публикует сообщения, протокол передает их в канал, а затем потребителю, обеспечивая асинхронную передачу сообщений. Протокол имеет привязку (binding) —механизм определения информации, специфичной для протокола.
- Сообщение — коммуникационный актив, используемый для обмена информацией от отправителя (продюсера) к получателю (потребителю) через каналы. Одно сообщение может использоваться несколькими независимыми получателями, а также может быть определено как событие или команда. Отправитель отправляет полезную нагрузку данных, которые получателю необходимо обработать и сериализовать в соответствующий формат, например JSON, XML, двоичный и пр. Сообщение также может включать метаданные, т.е. его заголовки или свойства. Примечательно, что не все сообщения фактически являются событиями: некоторые из них могут быть запросами или командами, реализуя архитектурный паттерн CQRS. О проектировании событий в EDA-архитектуре мы писали здесь.
Также как OpenAPI, AsyncAPI не выступает в качестве уровня абстракции кода для выполнения вызовов самих систем обмена сообщениями, а представляет собой язык описания метаданных. Он используется для создания описаний ресурсов, таких как топики и очереди, события, производители и потребители, в формате YAML или JSON. Допускается только подмножество формата YAML, которое соответствует возможностям JSON. Кроме того, что AsyncAPI позволяет описывать приложения и их операции публикации и потребления потоков данных, экосистема инструментов AsyncAPI также включает генераторы кода, валидаторы, анализаторы и прочие полезные для разработчика средства.
Помимо структурных различий AsyncAPI от OpenAPI, важно отметить, что:
- AsyncAPI совместим со схемами OpenAPI;
- Полезная нагрузка сообщения в AsyncAPI может быть любым значением, а не только схемой AsyncAPI или OpenAPI. Например, это может быть схема AVRO;
- Серверный объект AsyncAPI почти идентичен своему аналогу OpenAPI, однако, вместе scheme используется protocol и есть свойство его версии (protocolVersion);
- Параметры пути OpenAPI и параметры канала AsyncAPI немного отличаются, поскольку в AsyncAPI нет понятия запроса (request) и cookie, а параметры заголовка могут быть определены в объекте сообщения. Таким образом, параметры канала AsyncAPI эквивалентны параметрам пути OpenAPI.
Последней текущей версией AsyncAPI на сегодня является 2.6.0, а 3.0.0 пока еще находится в статусе пред-релиза. Хотя спецификация AsyncAPI не предполагает какой-либо топологии, архитектуры или шаблона программного обеспечения, механизм привязок позволяет предоставить более конкретную информацию о протоколе, обеспечивающем передачу сообщений. Познакомившись с базовыми понятиями AsyncAPI, составим собственную спецификацию на примере Apache Kafka.
Практический пример: как составить свою спецификацию
В качестве практического примера я возьму ранее реализованную систему из приложения-продюсера и приложения-потребителя, которые обмениваются данными через экземпляр Apache Kafka, развернутый в облачной serverless-платформе Upstash. Исходный код этих Python-приложений рассмотрен в предыдущей статье.
Приложение-продюсер отправляет в Kafka сообщения – обращения клиентов интернет-магазина с данными в JSON-структуре. Полезная нагрузка сообщения может содержать вопросы по работе магазина, оплате, доставке или скидкам. Также обращение клиента вместо вопроса может содержать заявку на покупка товара – от физлица (индивидуальная заявка) или от юрлица (корпоративная заявка). Распределение сообщений по разделам топика Kafka обусловлена бизнес-логикой: все вопросы должны попадать в раздел 0, заявки на покупку товара от корпоративных клиентов – в раздел 1, а от частных лиц – в раздел 2.
Таким образом, Kafka в терминологии AsyncAPI является протоколом, брокер этой распределенной платформы обмена сообщениями – сервером, а раздел топика – каналом. На основании постановки рассматриваемой задачи, исходного кода своих Python-приложений, настроек экземпляра Kafka и структуры AsyncAPI 2.6, я составила следующую YAML-спецификацию асинхронного обмена данными:
asyncapi: '2.6.0' id: 'urn:my_async_producer_app:server' info: title: Async API specification for Kafka version: '1.0.0' description: 'Спецификация асинхронного обмена данными через Kafka' servers: dev: url: my-kafka.upstash.io:9092 protocol: kafka-secure description: 'параметры брокера Kafka' security: - saslScram: [SCRAM-SHA-256] defaultContentType: application/json channels: publishing.{eventId}: description: приложение-продюсер parameters: eventId: $ref: '#/components/parameters/eventId' publish: summary: 'Генерация события, отправляемого с клиентского приложения' operationId: PublishNewEvent traits: - $ref: '#/components/operationTraits/kafka' message: oneOf: - $ref: '#/components/messages/question' - $ref: '#/components/messages/ind_app' - $ref: '#/components/messages/corp_app' bindings: kafka: key: type: string enum: ['question', 'ind_app', 'corp_app'] questions: description: 'раздел для вопросов' subscribe: operationId: questions_performing traits: - $ref: '#/components/operationTraits/kafka' message: $ref: '#/components/messages/question' ind_apps: description: 'раздел для индивидуальных заявок' subscribe: operationId: ind_apps_performing traits: - $ref: '#/components/operationTraits/kafka' message: $ref: '#/components/messages/ind_app' corp_apps: description: 'раздел для корпоративных заявок' subscribe: operationId: corp_apps_performing traits: - $ref: '#/components/operationTraits/kafka' message: $ref: '#/components/messages/corp_app' components: messages: question: name: question title: question summary: 'в раздел 0 попадают все вопросы' contentType: application/json traits: - $ref: '#/components/messageTraits/question_header' payload: $ref: "#/components/schemas/question" ind_app: name: ind_app title: ind_app summary: 'в раздел 1 попадают все индивидуальные заявки' contentType: application/json traits: - $ref: '#/components/messageTraits/ind_app_header' payload: $ref: "#/components/schemas/ind_app" corp_app: name: corp_app title: corp_app summary: 'в раздел 1 попадают корпоративные заявки' contentType: application/json traits: - $ref: '#/components/messageTraits/corp_app_header' payload: $ref: "#/components/schemas/corp_app" schemas: question: type: object description: 'вопрос по работе магазина' properties: id: type: string description: 'q-x-xx-xx-xxxx' name: type: string subject: type: string content: type: string sentAt: $ref: "#/components/schemas/sentAt" ind_app: type: object description: 'индивидуальная заявка на покупку продукта' properties: id: type: string description: 'ia-x-xx-xxx-xxx' client_name: type: string subject: type: string content: type: object properties: product: type: string $ref: "#/components/schemas/products" quantity: type: integer corp_app: type: object description: 'корпоративная заявка на покупку продукта' properties: id: type: string description: 'ca-x-xx-xxx-xxx' company_name: type: string subject: type: string content: type: object properties: product: type: string $ref: "#/components/schemas/products" quantity: type: integer sentAt: $ref: "#/components/schemas/sentAt" sentAt: type: string format: date-time description: Date and time when the message was sent. products: type: string enum: - milk - water - cofee - tea - apple - orange - bread securitySchemes: saslScram: type: scramSha256 description: протокол безопасности SASL_SSL, механизм безопасности SCRAM-SHA-256 parameters: eventId: description: идентификатор события schema: type: string description: 'x-xx:xx-xx-xx-xxx' messageTraits: question_header: headers: type: object properties: header: type: string enum: - payment - delivery - discount - vip - staff description: 'тема вопроса' ind_app_header: headers: type: object properties: header: type: string enum: - ind_app_buy description: 'заголовок индивидуальной заявки' corp_app_header: headers: type: object properties: header: type: string enum: - corp_app_buy description: 'заголовок корпоративной заявки' operationTraits: kafka: bindings: kafka: topic: 'inputs_topic' partitions: 3 replicas: 3 topicConfiguration: cleanup.policy: ["delete", "compact"] retention.ms: 604800000 retention.bytes: 1000000000 delete.retention.ms: 86400000 max.message.bytes: 1048588 bindingVersion: '0.4.0'
Для валидации этой спецификации и ее визуализации я использовала замечательный веб-инструмент AsyncAPI Studio, который позволяет даже без регистрации составить свой документ и проверить его на наличие ошибок. Для корректной спецификации редактор выводит UI, похожий на Swagger.
Визуализированная в UI спецификация очень удобна для тестирования: за счет полей, описывающих метаданные, например, summary или description, можно документировать структуры данных и особенности протокола, сервера и приложения, в т.ч. с примерами. Задаваемые через enum перечисления также отображаются и используются для автогенерации примеров.
Наконец, еще одной полезной функцией AsyncAPI Studio является визуализация потокового конвейера отправки сообщений от продюсера через сервер к каналам, на которые может подписаться потребитель. Примечательно, что это диаграмма строится автоматически, на основании заданной YAML-спецификации.
Таким образом, личный опыт показал, что AsyncAPI Studio является удобным (но не единственным) инструментом для валидации и визуализации спецификации асинхронного обмена данными. А сама спецификация AsyncAPI – отличный способ документировать не только структуры данных в полезной нагрузке сообщения, но и параметры сервера, включая определение разделов и различных конфигураций брокера.
Про аналог подобной спецификации для событий происхождения данных читайте в нашей новой статье.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники
- https://www.asyncapi.com/docs/reference/specification/v2.6.0
- https://dalelane.co.uk/blog/?p=4219
- https://docs.confluent.io/cloud/current/stream-governance/async-api.html
- https://github.com/asyncapi/bindings/tree/master/kafka