Swagger для асинхрона: составляем спецификацию AsyncAPI на примере Apache Kafka

спецификация AsyncAPI Kafka для аналитиков и разработчиков примеры курсы обучение, Kafka EDA архитектура, Kafka для архитекторов и разработчиков, архитектура данных и приложений Big Data Kafka обучение примеры курсы, разработка Kafka-приложений, обучение Kafka, курсы Kafka, Apache Kafka для инженеров и разработчиков, обучение разработчиков Kafka и дата-инженеров, проектирование архитектуры приложений Kafka, Kafka EDA примеры курсы обучение, обучение большим данным, Школа Больших Данных Учебный Центр Коммерсант

Что такое AsyncAPI, зачем документировать  спецификацию для EDA-архитектур и как это сделать. Создаем свою спецификацию для Apache Kafka с помощью веб-инструмента AsynсAPI Studio.

Что такое AsyncAPI

Подобно тому, как Swagger (OpenAPI ) стал стандартом де-факто для описания синхронного REST API, включая HTTP-методы запросов и ответы приложения на них со структурами данных полезной нагрузки, AsyncAPI является таким же аналогом для асинхронных архитектур. Это инициатива с открытым исходным кодом, представляющая собой машиночитаемый язык спецификаций с открытым исходным кодом для асинхронных API и архитектур, ориентированных на события (EDA, Event Driven Architecture). Ключевыми понятиями AsyncAPI являются следующие:

  • Сервер – система обмена сообщениями, обеспечивающая связь между продюсером и потребителем. Сервер может быть брокером сообщений или сервисом с API WebSocket, который обеспечивает обмен сообщениями между браузером и сервером или между сервером.
  • Продюсер – приложение генерирует и публикует события в виде сообщений, информируя об изменении существующих или создании новых данных. Приложение может быть микросервисом или IoT-устройством, может быть написано на любом языке программирования, если он поддерживает выбранный протокол. Приложение должно использовать протокол, поддерживаемый сервером, для подключения и обмена сообщениями.
  • Потребитель – приложение, которое подписывается на канал сервера, потребляет новые сообщения и реагирует на них;
  • Канал – механизм, созданный сервером для организации и передачи сообщений. Пользователи могут определять каналы как топик или очередь, включая ключ маршрутизации и путь в зависимости от используемого протокола. Серверы могут поддерживать множество экземпляров каналов, позволяя отправлять сообщения с разным содержимым в разные каналы. В зависимости от реализации сервера канал может быть включен в сообщение через заголовки, определенные протоколом.
  • Приложение – в EDA-архитектуре приложение является продюсером или потребителем или тем и другим, как, например, в случае Kafka Streams. Приложения должны использовать протоколы, поддерживаемые сервером, чтобы подключаться к каналам и обмениваться сообщениями.
  • Протокол – набор правил, определяющих способ обмена информацией между приложениями и/или серверами. Например, AMQP, HTTP, JMS, Kafka, Anypoint MQ, MQTT, Solace, STOMP, Mercure, WebSocket, Google Pub/Sub, Pulsar пр. Каждый раз, когда продюсер публикует сообщения, протокол передает их в канал, а затем потребителю, обеспечивая асинхронную передачу сообщений. Протокол имеет привязку (binding) —механизм определения информации, специфичной для протокола.
  • Сообщениекоммуникационный актив, используемый для обмена информацией от отправителя (продюсера) к получателю (потребителю) через каналы. Одно сообщение может использоваться несколькими независимыми получателями, а также может быть определено как событие или команда. Отправитель отправляет полезную нагрузку данных, которые получателю необходимо обработать и сериализовать в соответствующий формат, например JSON, XML, двоичный и пр. Сообщение также может включать метаданные, т.е. его заголовки или свойства. Примечательно, что не все сообщения фактически являются событиями: некоторые из них могут быть запросами или командами, реализуя архитектурный паттерн CQRS. О проектировании событий в EDA-архитектуре мы писали здесь.

Ключевые понятия AsyncAPI и EDA-архитектуры
Ключевые понятия AsyncAPI и EDA-архитектуры
Виды сообщений
Виды сообщений

Также как OpenAPI, AsyncAPI не выступает в качестве уровня абстракции кода для выполнения вызовов самих систем обмена сообщениями, а представляет собой язык описания метаданных. Он используется для создания описаний ресурсов, таких как топики и очереди, события, производители и потребители, в формате YAML или JSON. Допускается только подмножество формата YAML, которое соответствует возможностям JSON. Кроме того, что AsyncAPI позволяет описывать приложения и их операции публикации и потребления потоков данных, экосистема инструментов AsyncAPI также включает генераторы кода, валидаторы, анализаторы и прочие полезные для разработчика средства.

Помимо структурных различий AsyncAPI от OpenAPI, важно отметить, что:

  • AsyncAPI совместим со схемами OpenAPI;
  • Полезная нагрузка сообщения в AsyncAPI может быть любым значением, а не только схемой AsyncAPI или OpenAPI. Например, это может быть схема AVRO;
  • Серверный объект AsyncAPI почти идентичен своему аналогу OpenAPI, однако, вместе scheme используется protocol и есть свойство его версии (protocolVersion);
  • Параметры пути OpenAPI и параметры канала AsyncAPI немного отличаются, поскольку в AsyncAPI нет понятия запроса (request) и cookie, а параметры заголовка могут быть определены в объекте сообщения. Таким образом, параметры канала AsyncAPI эквивалентны параметрам пути OpenAPI.
OpenAPI 3 и AsyncAPI 2
Сравнение структуры OpenAPI 3 и AsyncAPI 2

Последней текущей версией AsyncAPI на сегодня является 2.6.0, а 3.0.0 пока еще находится в статусе пред-релиза. Хотя спецификация AsyncAPI не предполагает какой-либо топологии, архитектуры или шаблона программного обеспечения, механизм привязок позволяет предоставить более конкретную информацию о протоколе, обеспечивающем передачу сообщений. Познакомившись с базовыми понятиями AsyncAPI, составим собственную спецификацию на примере Apache Kafka.

Практический пример: как составить свою спецификацию

В качестве практического примера я возьму ранее реализованную систему из приложения-продюсера и приложения-потребителя, которые обмениваются данными через экземпляр Apache Kafka, развернутый в облачной serverless-платформе Upstash. Исходный код этих Python-приложений рассмотрен в предыдущей статье.

Приложение-продюсер отправляет в Kafka сообщения – обращения клиентов интернет-магазина с данными в JSON-структуре. Полезная нагрузка сообщения может содержать вопросы по работе магазина, оплате, доставке или скидкам. Также обращение клиента вместо вопроса может содержать заявку на покупка товара – от физлица (индивидуальная заявка) или от юрлица (корпоративная заявка). Распределение сообщений по разделам топика Kafka обусловлена бизнес-логикой: все вопросы должны попадать в раздел 0, заявки на покупку товара от корпоративных клиентов – в раздел 1, а от частных лиц – в раздел 2.

Таким образом, Kafka в терминологии AsyncAPI является протоколом, брокер этой распределенной платформы обмена сообщениями – сервером, а раздел топика – каналом. На основании постановки рассматриваемой задачи, исходного кода своих Python-приложений, настроек экземпляра Kafka и структуры AsyncAPI 2.6, я составила следующую YAML-спецификацию асинхронного обмена данными:

asyncapi: '2.6.0'
id: 'urn:my_async_producer_app:server'
info:
  title: Async API specification for Kafka
  version: '1.0.0'
  description: 'Спецификация асинхронного обмена данными через Kafka'

servers:
  dev:
    url: my-kafka.upstash.io:9092
    protocol: kafka-secure
    description: 'параметры брокера Kafka'
    security:
      - saslScram: [SCRAM-SHA-256]

defaultContentType: application/json

channels:
  publishing.{eventId}:
    description: приложение-продюсер
    parameters:
      eventId:
        $ref: '#/components/parameters/eventId'
    publish:
      summary: 'Генерация события, отправляемого с клиентского приложения'
      operationId: PublishNewEvent
      traits:
        - $ref: '#/components/operationTraits/kafka'
      message:
        oneOf:
          - $ref: '#/components/messages/question'
          - $ref: '#/components/messages/ind_app'
          - $ref: '#/components/messages/corp_app'
    bindings:
      kafka:
        key:
          type: string
          enum: ['question', 'ind_app', 'corp_app']
  
  questions:
    description: 'раздел для вопросов'
    subscribe:
      operationId: questions_performing
      traits:
        - $ref: '#/components/operationTraits/kafka'
      message:
        $ref: '#/components/messages/question'

  ind_apps:
    description: 'раздел для индивидуальных заявок'
    subscribe:
      operationId: ind_apps_performing
      traits:
        - $ref: '#/components/operationTraits/kafka'
      message:
        $ref: '#/components/messages/ind_app'

  corp_apps:
    description: 'раздел для корпоративных заявок'
    subscribe:
      operationId: corp_apps_performing
      traits:
        - $ref: '#/components/operationTraits/kafka'
      message:
        $ref: '#/components/messages/corp_app'

components:
  messages:
    question:
      name: question
      title: question
      summary: 'в раздел 0 попадают все вопросы'
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/question_header'
      payload:
        $ref: "#/components/schemas/question"
    ind_app:
      name: ind_app
      title: ind_app
      summary: 'в раздел 1 попадают все индивидуальные заявки'
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/ind_app_header'
      payload:
        $ref: "#/components/schemas/ind_app"
    corp_app:
      name: corp_app
      title: corp_app
      summary: 'в раздел 1 попадают корпоративные заявки'
      contentType: application/json
      traits:
        - $ref: '#/components/messageTraits/corp_app_header'
      payload:
        $ref: "#/components/schemas/corp_app"    

  schemas:
    question:
      type: object
      description: 'вопрос по работе магазина'
      properties:
        id:
          type: string
          description: 'q-x-xx-xx-xxxx'
        name:
          type: string
        subject:
          type: string
        content:
          type: string
        sentAt:
          $ref: "#/components/schemas/sentAt"
    ind_app:
      type: object
      description: 'индивидуальная заявка на покупку продукта'
      properties:
        id:
          type: string
          description: 'ia-x-xx-xxx-xxx'
        client_name:
          type: string
        subject:
          type: string
        content:
          type: object
          properties:
            product:
              type: string
              $ref: "#/components/schemas/products"
            quantity:
              type: integer
    corp_app:
      type: object
      description: 'корпоративная заявка на покупку продукта'
      properties:
        id:
          type: string
          description: 'ca-x-xx-xxx-xxx'
        company_name:
          type: string
        subject:
          type: string
        content:
          type: object
          properties:
            product:
              type: string
              $ref: "#/components/schemas/products"
            quantity:
              type: integer            
        sentAt:
          $ref: "#/components/schemas/sentAt"      
    sentAt:
      type: string
      format: date-time
      description: Date and time when the message was sent.
    products:
      type: string
      enum:
        - milk
        - water
        - cofee
        - tea
        - apple
        - orange
        - bread

  securitySchemes:
    saslScram:
      type: scramSha256
      description: протокол безопасности SASL_SSL, механизм безопасности SCRAM-SHA-256

  parameters:
    eventId:
      description: идентификатор события
      schema:
        type: string
        description: 'x-xx:xx-xx-xx-xxx'

  messageTraits:
    question_header:
      headers:
        type: object
        properties:            
          header:
            type: string
            enum:
              - payment
              - delivery
              - discount
              - vip
              - staff
        description: 'тема вопроса'     
    ind_app_header:
      headers:
        type: object
        properties:            
          header:
            type: string
            enum:
              - ind_app_buy
        description: 'заголовок индивидуальной заявки'                 
    corp_app_header:
      headers:
        type: object
        properties:            
          header:
            type: string
            enum:
              - corp_app_buy            
        description: 'заголовок корпоративной заявки'     			

  operationTraits:
    kafka:
      bindings:
        kafka:
          topic: 'inputs_topic'
          partitions: 3
          replicas: 3
          topicConfiguration:
          cleanup.policy: ["delete", "compact"]
          retention.ms: 604800000
          retention.bytes: 1000000000
          delete.retention.ms: 86400000
          max.message.bytes: 1048588
        bindingVersion: '0.4.0'

Для валидации этой спецификации и ее визуализации я использовала замечательный веб-инструмент AsyncAPI Studio, который позволяет даже без регистрации составить свой документ и проверить его на наличие ошибок. Для корректной спецификации редактор выводит UI, похожий на Swagger.

Редактор AsynсAPI Studio
Редактор AsynсAPI Studio

Визуализированная в UI спецификация очень удобна для тестирования: за счет полей, описывающих метаданные, например, summary или description, можно документировать структуры данных и особенности протокола, сервера и приложения, в т.ч. с примерами. Задаваемые через enum перечисления также отображаются и используются для автогенерации примеров.

Визуализация спецификации AsynсAPI
Визуализация спецификации AsynсAPI

Наконец, еще одной полезной функцией AsyncAPI Studio является визуализация потокового конвейера отправки сообщений от продюсера через сервер к каналам, на которые может подписаться потребитель. Примечательно, что это диаграмма строится автоматически, на основании заданной YAML-спецификации.

Потоковый конвейер на Apache Kafka, EDA Kafka
Визуализация потокового конвейера на Apache Kafka

Таким образом, личный опыт показал, что AsyncAPI Studio является удобным (но не единственным) инструментом для валидации и визуализации спецификации асинхронного обмена данными. А сама спецификация AsyncAPI – отличный способ документировать не только структуры данных в полезной нагрузке сообщения, но и параметры сервера, включая определение разделов и различных конфигураций брокера.

Про аналог подобной спецификации для событий происхождения данных читайте в нашей новой статье.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.asyncapi.com/docs/reference/specification/v2.6.0
  2. https://dalelane.co.uk/blog/?p=4219
  3. https://docs.confluent.io/cloud/current/stream-governance/async-api.html
  4. https://github.com/asyncapi/bindings/tree/master/kafka

 

Поиск по сайту