Как клиенты Apache Kafka общаются с брокерами: протоколы и интерфейсы

разработчик Apache Kafka конфигурации примеры курсы обучение, обучение Apache Kafka, курсы Apache Kafka, Apache Kafka Streams курсы примеры обучение, потоковая и пакетная обработка данных примеры, обучение большим данным, Apache Kafka для дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Чтобы сделать наши курсы по Apache Kafka еще более полезными, сегодня рассмотрим, какие интерфейсы и протоколы для связи клиента с брокером использует эта платформа потоковой передачи событий. А также рассмотрим, что обеспечивает двунаправленную совместимость API.

Протоколы и интерфейсы Apache Kafka для общения клиентов с брокерами

Apache Kafka использует бинарный протокол поверх TCP, который имеет около 70 API. Он определяет все API как пары сообщений «запрос-ответ», которые ограничены по размеру и состоят из примитивных типов (boolean, int, float, string, bytes и пр.). Эти пары сообщений схематизированы и имеют версии с использованием управления версиями API, чтобы гарантировать возможность совместимость и взаимодействия клиентов с брокерами. Подробнее о том, как клиентское приложение-продюсер публикует сообщения в топик Kafka, читайте в нашей новой статье.

Клиент инициирует подключение к сокету – виртуальной конструкции из IP-адреса и номера порта, которая позволяет организовать обмен данными между разными процессами и системами, а затем записывает последовательность сообщений-запросов и считывает соответствующее ответное сообщение. При подключении или отключении не требуется рукопожатие (handshake) — обмен данными между клиентом и брокером в рамках установления соединения. TCP позволяет поддерживать постоянные соединения, используемые для многих запросов, чтобы амортизировать стоимость этого рукопожатия.

Однако, клиенту требуется поддерживать соединение с несколькими брокерами, поскольку данные в кластере Apache Kafka распределены по нескольким узлам, и клиентам нужно будет взаимодействовать с каждым сервером, на котором находятся данные. Но при этом нет необходимости поддерживать пул соединений, т.е. несколько соединений с одним брокером из одного экземпляра клиента.

Сервер гарантирует, что при одном TCP-соединении запросы будут обрабатываться в том порядке, в котором они были отправлены, и ответы будут возвращаться в том же порядке. Обработка запросов брокера допускает только один запрос в пути для каждого соединения, чтобы гарантировать этот порядок. Клиенты могут использовать асинхронные вызовы, т.е. неблокирующий ввод-вывод для реализации конвейерной обработки запросов и достижения более высокой пропускной способности. Это означает, что клиенты могут отправлять запросы, даже в период ожидания ответов на предыдущие запросы, поскольку невыполненные запросы буферизуются в базовом буфере сокета операционной системы. Все запросы инициируются клиентом и приводят к соответствующему ответному сообщению от сервера, если не указано иное. Сервер имеет настраиваемый максимальный предел размера запроса, и любой запрос, превышающий это ограничение, приведет к отключению сокета.

Двунаправленная совместимость и средства ее реализации

Начиная с версии 0.10.0.0, клиенты и брокеры Kafka изначально совместимы в двух направлениях, т.е. новые клиенты могут общаться со старыми брокерами, а старые клиенты могут общаться с новыми брокерами. А, поскольку Kafka по своей сути является распределенной системой, клиентам нужно поддерживать соединение с несколькими брокерами, чтобы направлять свои запросы на производство или выборку соответствующему брокеру в зависимости от топика или разделов, в которые они отправляют или из которых получают данные. Чтобы работать с несколькими версиями брокера, клиенты узнают, какие версии различных API поддерживает брокер, а затем используют в своих запросах самую старшую версию из них. Список поддерживаемых версий API предоставляется клиентам брокерами через ApiVersionsRequest.

Для обеспечения двунаправленной и совместимой связи с брокерами клиенты выполняют последовательность шагов:

  • Определение последней версии API, поддерживаемой как клиентом, так и брокером. После установления соединения с брокером и перед отправкой любого запроса клиент отправляет ApiVersionsRequest брокеру. В ответ брокер возвращает полный список поддерживаемых ApiKeys и версий. Клиент выбирает самую старшую версию API, поддерживаемую как им самим, так и брокером. Если такой версии не существует, клиент сообщает об ошибке.
  • Получение метаданных кластера. Клиент (Producer, Consumer или AdminClient) отправляет MetadataRequest брокеру через настроенную конечную точку сервера начальной загрузки Kafka. Клиент должен использовать конечную точку подсистемы балансировки нагрузки кластера, где она доступна, или список статически настроенных хостов Kafka в качестве конечной точки сервера начальной загрузки. Когда любой из посредников получает MetadataRequest, он возвращает метаданные кластера, описывающие текущее состояние кластера, топики с их разделами, лидеров для этих разделов топика, а также информацию о хосте и порте для посредников. Получив метаданные, клиент кэширует их и не запрашивает повторно обновление из кластера, а время от времени обновляет его с интервалом max.age.ms или до тех пор, пока не получит ошибку при обработке запроса, например, NotLeaderForPartition, которая указывает, что кэшированные метаданные устарели. Если это произошло, клиент снова обновляет свои метаданные, отправляя запрос MetadataRequest, а затем опять пытается выполнить неудачный запрос.
  • Обновление метаданных кластера. Все брокеры в кластере Kafka могут отвечать на запрос MetadataRequest, поскольку все они поддерживают локальный кеш метаданных кластера. Брокер, назначенный контроллером, отвечает за создание метаданных кластера, а также за инициализацию и обновление кэша метаданных других брокеров через UpdateMetadataRequest. Контроллер отправляет UpdateMetadataRequest всем брокерам при добавлении нового брокера, удалении или модификации существующего, выборе нового лидера раздела, уведомлении об изменении синхронизации реплик.

Перед отправкой каждого запроса клиент отправляет ключ API и версию API. Эти два 16-битных числа, взятые вместе, однозначно идентифицируют схему следующего сообщения. При общении с конкретным брокером данный клиент должен использовать самую старшую версию API, поддерживаемую обоими, и указывать эту версию в своих запросах. Сервер будет отклонять запросы с версией, которую он не поддерживает, и всегда будет отвечать клиенту точно в том формате протокола, который он ожидает, исходя из версии, которую он включил в свой запрос.

Чтобы повысить производительность, API-интерфейсы объединяют несколько сообщений в один пакет, сокращая накладные расходы на передачу метаданных, о чем мы писали здесь. Умный клиент может использовать это и поддерживать асинхронный режим, объединяя сообщения и отправляя их большими пакетами. Разрешена группировка по нескольким топикам и разделам, поэтому запрос на создание может содержать данные для добавления ко многим разделам, а запрос на выборку может извлекать данные из многих разделов одновременно. Впрочем, разработчик клиента может отправлять сообщения по одному.

При аутентификации SASL используется следующая последовательность:

  • Клиент может отправить Kafka запрос ApiVersionsRequest для получения диапазонов версий запросов, поддерживаемых брокером;
  • Запрос SaslHandshakeRequest, содержащий механизм SASL для аутентификации, отправляется клиентом в Kafka. Если запрошенный механизм не включен на сервере, сервер отвечает списком поддерживаемых механизмов и закрывает клиентское соединение. Если этот механизм включен на сервере, сервер отправляет успешный ответ и продолжает аутентификацию SASL.
  • Далее выполняется фактическая аутентификация SASL. Если версия SaslHandshakeRequest v0, ряд токенов клиента и сервера SASL, соответствуют механизму, это отправляется в виде непрозрачных пакетов без упаковки сообщений в заголовки протокола Kafka. Для версии SaslHandshakeRequest v1 используется запрос/ответ SaslAuthenticate, где фактические токены SASL заключены в протокол Kafka. Код ошибки в окончательном сообщении от брокера укажет, прошла ли аутентификация успешно или нет.
  • Если аутентификация прошла успешно, последующие пакеты обрабатываются как запросы API Kafka. Иначе клиентское соединение закрывается.
  • Для взаимодействия с клиентами версии 0.9.0.x первый пакет, полученный сервером, обрабатывается как клиентский токен SASL/GSSAPI, если он не является действительным запросом Kafka. Начиная с этого пакета выполняется аутентификация SASL/GSSAPI выполняется.

Больше подробностей про администрирование и эксплуатацию Apache Kafka в системах аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://kafka.apache.org/protocol
  2. https://medium.com/@darefamuyiwa/kafka-bidirectional-client-broker-compatibility-21c1b26e9d61
Поиск по сайту