Как настроить source-коннектор Kafka Connect, чтобы повысить его пропускную способность

обучение Apache Kafka, курсы Apache Kafka, Apache Kafka Connect конфигурации настройки примеры, обучение большим данным, Apache Kafka для дата-инженеров и администраторов, Школа Больших Данных Учебный Центр Коммерсант

Компоненты платформы Kafka Connect и их настройки для повышения скорости и объема данных, считываемых из внешних источников и публикуемых в топике Kafka. Разбираем на примере JDBC-коннектора для реляционной базы данных.

Проблемы и возможности коннекторов Kafka Connect

Kafka Connect — это инструмент интеграции данных с открытым исходным кодом, который упрощает процесс потоковой передачи данных между Apache Kafka и другими системами. Kafka Connect имеет два типа коннекторов:

  • коннекторы источников (Source connector) позволяют считывать данные из различных источников и записывать их в топики Kafka;
  • коннекторы приемников (Sink connector) отправляют данные из топиков Kafka в другую конечную точку.

Реализации класса Connector не выполняют копирование данных сами: их конфигурация описывает набор копируемых данных, а класс Connector отвечает за разбиение этого задания на набор задач, которые можно передать рабочим процессам (worker) Kafka Connect. Подобно разделению коннекторов на 2 категории, задачи также делятся на SourceTask и SinkTask. Опционально реализация класса Connector может отслеживать изменения данных внешних систем и запрашивать реконфигурацию задачи. Имея в наличии данные для копирования, каждая задача должна скопировать свое подмножество данных в Kafka или из Kafka. Данные, которые копирует коннектор, должны быть представлены в виде секционированного потока , аналогично модели топика Kafka, где каждый раздел представляет собой упорядоченную последовательность записей со смещениями. Каждой задаче назначается подмножество разделов для обработки. Иногда это сопоставление очевидно: каждый файл в наборе файлов журналов можно считать разделом, каждая строка в файле — это запись, а смещения — это просто позиция в файле.

В других случаях однозначно сопоставить данные с такой моделью не получается. Например, JDBC-коннектор может сопоставить каждую таблицу реляционной базы данных с разделом, но не смещение. Для этого можно использовать столбец метки времени, чтобы создавать запросы для постепенного возврата новых данных, а в качестве смещения использовать последнюю запрошенную метку времени.

Kafka Connect предназначен для определения заданий массового копирования данных, таких как копирование всей базы данных, а не создания множества заданий для копирования каждой таблицы по отдельности. Одним из последствий такой конструкции является то, что набор входных и выходных разделов разъема может меняться со временем. Поэтому source-коннекторам необходимо отслеживать изменения исходной системы, например, добавление или удаление таблиц в базе данных. Когда source-коннектор получает изменения, он уведомляет платформу о необходимости реконфигурации с использованием объекта ConnectorContext.

При этом платформа оперативно запрашивает новую информацию о конфигурации и обновляет задачи, позволяя им корректно зафиксировать прогресс перед их перенастройкой. Пока этот мониторинг оставлен на усмотрение реализации коннектора. Если для выполнения этого мониторинга нужен дополнительный поток, коннектор должен выделить его сам. Хотя в идеальном случае этот код мониторинга изменений должен быть изолирован от системы,  изменения могут повлиять на задачи. Обычно это случается, когда один из входных разделов уничтожается в системе ввода, например, при удалении таблицы из базы данных. Чтобы предупредить сбой потокового приложения, необходимо перехватывать и обрабатывать исключения, используя конструкцию try-catch или аналогичную.

Sink-коннектор обрабатывает только добавление разделов, что может привести к появлению новых записей в их выходных данных. Платформа Kafka Connect управляет любыми изменениями входных данных Kafka, например, когда набор топиков с входными данными изменяется из-за подписки на регулярное выражение. Задача приема может получить дополнительные разделы входных данных, что потребует создания новых ресурсов в системе-приемнике, например, новой таблицы в базе данных. Одной из наиболее сложных проблем с

Самой сложной ситуацией в этих случаях могут быть конфликты между несколькими пользователями, когда SinkTask впервые видит новый входной раздел, и одновременно пытается создать новый ресурс. Сам по себе Sink-коннектор не требует разработки дополнительного кода кода для обработки динамического набора разделов.

Kafka Connect позволяет проверять конфигурации соединителя перед отправкой коннектора на выполнение, а также может предоставлять отзывы об ошибках и рекомендуемых значениях. Для этого разработчику коннектора необходимо предоставить платформе реализацию с определением конфигурации.

Вспомнив, что такое коннектор Kafka Connect, далее рассмотрим, как его настроить, чтобы повысить пропускную способность платформы.

Настройка source-коннекторов

Рассмотрим пример JDBC-коннектора источника, который загружает данные из реляционной базы в топик Kafka с помощью коннектора, конвертера, преобразования и продюсера:

  • коннектор возвращает количество записей, полученных из источника;
  • конвертер преобразует записи в выбранный тип данных, например, AVRO. Преобразование записи в другой формат данных (AVRO/Protobuf/JSON) всегда занимает определенное время.
  • затем к записи применяются простые преобразования сообщений (SMT, Simple Message Transform), которые были определены в конфигурациях коннектора. Хотя обычно преобразование происходит довольно быстро, это занимает определенное время. Чтобы избежать задержки, можно не применять SMT-преобразования.
  • продюсер отправляет записи в топик Kafka.
Kafka Connect, коннекторы Apache Kafka
Принцип работы Kafka Connect

Таким образом, для повышения общей пропускной способности всей потоковой системы в Kafka Connect можно настроить следующие компоненты:

  • конфигурации коннектора;
  • конфигурации продюсера, например, batch.size и/или linger.ms, о чем мы писали здесь.

Настройка коннектора зависит от того, какие конфигурации он предоставляет. Если коннектор не предоставляет никаких конфигураций для получения дополнительных данных из конечной точки, то его пропускную способность невозможно изменить. Например, Source-коннектор Confluent JDBC имеет следующие конфигурации, предоставляемые коннектором:

  • bmax.rows – максимальное количество строк, включаемых в один пакет при опросе новых данных;
  • interval.ms – частота (в миллисекундах) опроса новых данных по каждой таблице.

Можно настроить коннектор на возвращение большего количества записей продюсеру, увеличив значение параметра batch.max.rows. Максимальное количество записей, которое коннектор может получить по каждому запросу, равно значению этой конфигурации. Коннектор можно настроить только в соответствии с его предоставленными параметрами конфигурации.

Помимо batch.size и linger.ms для продюсера, который будет записывать потребленные из внешнего источника данные в Kafka, можно настроить buffer.memory (общее количество байтов памяти для буферизации записей, ожидающих отправки на сервер, по умолчанию 33554432) и compression.type (тип сжатия топика).

Конфигурация buffer.memory представляет общее количество байтов памяти, которые продюсер может использовать для буферизации записей, ожидающих отправки на сервер. Этот параметр должен примерно соответствовать общему объему памяти, который будет использовать продюсер, но он не является жестко привязанным, поскольку не вся память, используемая продюсером, используется для буферизации. Например, продюсер временно не может отправлять сообщения брокеру из-за сбоя  Kafka. Продюсер начнет накапливать пакеты сообщений в буферной памяти (по умолчанию 32 МБ). Как только буфер заполнится, он будет ожидать max.block.ms (по умолчанию 60 000 мс), чтобы очистить буфер. Если буфер не очищается, продюсер выдаст исключение. Если для параметра buffer.memory установлено слишком малое значение, он заполнится мгновенно. Впрочем, слишком большое значение buffer.memory тоже может вызвать исключение нехватки памяти (OOM, Out Of Memory), если она будет израсходована.

Конфигурация compress.type определяет тип сжатия сообщений до их создания. Хотя сжатие отлично уменьшает размер сообщений, оно увеличивает время их доставки из-за предварительных вычислений перед отправкой данных в Kafka.

Как понять, что нужна настройка пропускной способности

Понять, что пропускную способность Kafka Connect надо увеличивать, помогут следующие метрики мониторинга JMX (Java Management Extensions, управленческие расширения Java). Это технология Java для контроля и управления приложениями, системными объектами, устройствами и компьютерными сетями. Метрики JMX для мониторинга пропускной способности Kafka Connect можно разделить на три категории: метрики коннектора, метрики брокера и метрики производителя.

На уровне коннектора надо следить за следующими метриками:

  • source-record-poll-rate — до применения преобразований это среднее количество записей в секунду, создаваемых или опрашиваемых задачей, принадлежащей именованному исходному соединителю в рабочем процессе. Оно сообщает среднее количество записей, созданных в секунду до преобразования.
  • poll-batch-avg-time-ms — среднее время в миллисекундах, затраченное этой задачей на опрос пакета исходных записей. Метрика, которая может сказать вам, сколько времени потребуется, чтобы запись была возвращена из конечной точки.
  • source-record-write-rate — после применения преобразований это среднее число записей в секунду, выводимых в результате преобразований и записываемых в Kafka для задачи, принадлежащей именованному source-коннектору в рабочем процессе (исключая любые записи, отфильтрованные преобразованиями). Полезно при применении преобразований для определения влияния преобразований сообщений о подключении.

На уровне брокера полезно отслеживать  kafka.server:type=BrokerTopicMetrics, name=BytesInPerSec — скорость поступления байтов от клиентов.

А на уровне продюсера необходимо обратить внимание на следующие:

  • record-size-avg — средний размер записи, метрика для вычисления  size;
  • batch-size-avg — среднее количество байтов, отправленных в пакет за запрос. Проверяет, что размер пакета (batch.size) был увеличен с помощью метрики, и продюсер получает ожидаемое количество записей на запрос.
  • records-per-request-avg — среднее количество записей на запрос. Проверяет, сколько записей отправляется в каждом пакете продюсера.
  • record-send-rate — среднее количество записей, отправляемых в секунду в топик.

Кроме отслеживания этих метрик, понять, что коннектор является узким местом, помогает проверка того, что пропускная способность не увеличивается, даже если конфигурации продюсера были изменены. Или скорость отправки продюсера остается неизменной и/или не увеличивается при его изменении.

Скорость отправки метрики JMX говорит, что в данный момент делает продюсер. Если скорость отправки не увеличивается или остается относительно неизменной, то это индикатор того, что продюсер ожидает записи от коннектора. То есть коннектор является узким местом, поскольку продюсер находится в режиме ожидания.

Если среднее количество записей на запрос не достигает количества записей, для которого установлено значение batch.max.rows, то значение records-per-request-avg установлено неоптимально, и продюсеру нужно ждать, чтобы заполнить пакет. В этом случае необходимо увеличить значение linger.ms.

Для коннектора Kafka Connect есть только две конфигурации, которые помогут увеличить его пропускную способность:

  • max.rows– максимальное количество строк, которые можно включить в один пакет при опросе новых данных. Этот параметр можно использовать для ограничения объема данных, буферизуемых внутри соединителя.
  • interval.ms – частота (в миллисекундах) для опроса новых данных в каждой таблице.

Методология устранения неполадок остается прежней: необходимо определить, является ли коннектор или продюсер узким местом. Далее следует настроить продюсер так, чтобы пакеты для публикации сообщений в Kafka были заполнены и настроить коннектор для более быстрой отправки большего количества сообщений продюсер.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://docs.confluent.io/platform/current/connect/devguide.html
  2. https://www.confluent.io/blog/how-to-increase-throughput-on-kafka-connect-source-connectors/
Контакты авторизированного учебного центра
«Школа Больших Данных»
Адрес:
127576, г. Москва, м. Алтуфьево, Илимская ул. 5 корпус 2, офис 319, БЦ «Бизнес-Депо»
Часы работы:
Понедельник - Пятница: 09.00 – 18.00
Остались вопросы?
Звоните нам +7 (495) 414-11-21 или отправьте сообщение через контактную форму. Также вы можете найти ответы на ваши вопросы в нашем сборнике часто задаваемых вопросов.
Оставьте сообщение, и мы перезвоним вам в течение рабочего дня
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Или напишите нам в соц.сетях
Поиск по сайту