Настройка JDBC-коннектора Kafka от Confluent: практический пример

Kafka курсы примеры обучение, Kafka для разработчика, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Сегодня я на практическом примере покажу тонкости настройки конфигураций JDBC-коннектора источника, передающий новые записи из таблицы PostgreSQL в топик Apache Kafka.

Настройка JDBC-коннектора и отправка в Kafka Connect

Как я упоминала вчера, помимо CDC-коннектор Debezium, передать данные из реляционной базы данных PostgreSQL в Apache Kafka, также есть JDBC-коннектор от Confluent: Kafka Connect JDBC Source. Он основан на JDBC-драйвере и периодических SQL-запросах к таблицам, указанным в конфигурации коннектора. Этот JDBC-коннектор настраивается намного проще, чем  CDC-коннектор Debezium, не требуя изменений в конфигурации базы данных, таких как включение логической репликации и создания слота репликации, что я показывала здесь.

Сперва создадим конфигурацию коннектора. В качестве примера я, как обычно, возьму интернет-магазин. Предположим, необходимо передавать изменения данных из таблицы с продуктами (product) PostgreSQL в Kafka.

Схема данных PostgreSQL
Схема данных PostgreSQL

В отличие от CDC-коннектора, режим обновлений в JDBC-коннекторе гораздо менее гибок: параметр mode конфигурации коннектора может принимать только одно из следующих значений:

  • bulk –массовая загрузка данных всей таблицы каждый раз при ее опросе;
  • incrementing — инкрементный столбец, который поможет обнаружить только новые строки. Этот режим не обнаружит изменения или удаления существующих строк.
  • timestamp – столбец с отметкой времени для обнаружения новых и измененных строк. Предполагается, что этот столбец в базе данных обновляется при каждой записи, его значения монотонно увеличиваются, но не обязательно уникальны.
  • timestamp+incrementing – комбинациях двух столбцов, отметки времени, который обнаруживает новые и измененные строки, и инкремента, который предоставляет глобальный уникальный идентификатор для обновлений, чтобы каждой строке можно было назначить уникальное смещение потока.

В моей таблице product отсутствует столбец отметки времени, поэтому параметр mode конфигурации коннектора установлю в значение incrementing. Полная конфигурация коннектора выглядит так:

{
  "name": "pg-jdbc-source-connector",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:postgresql://my-db-host:port/database",
    "connection.user": "my-user",
    "connection.password": "my-pass",
    "table.whitelist": "ishop.product",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "ishop-product-",
    "poll.interval.ms": "10000",
    "numeric.mapping": "best_fit",
    "dialect.name": "PostgreSqlDatabaseDialect"
  }
}

В этой конфигурации заданы следующие параметры:

  • name — имя коннектора pg-jdbc-source-connector;
  • class —класс коннектора io.confluent.connect.jdbc.JdbcSourceConnector, который будет использоваться для подключения к источнику данных через JDBC;
  • max – максимальное количество задач, которые могут выполняться параллельно. В данном случае это одно подключение.
  • url — URL для подключения к базе данных PostgreSQL;
  • user – имя пользователя для подключения к базе данных;
  • password — пароль пользователя для подключения к базе данных;
  • whitelist — таблицы, которые будут использоваться коннектором, т.е. таблица product в схеме ishop.
  • mode — режим извлечения данных (incrementing), инкрементальное извлечение данных на основе указанного столбца;
  • column.name – название столбца (id), который будет использоваться для инкрементального извлечения данных. Обычно это первичный ключ или автоинкрементное поле. В моем случае это первичный ключ, который автоматически инкрементируется при вставке новой записи в таблицу.
  • prefix – префикс (ishop-product-), который будет добавлен к имени Kafka-топика, куда будут отправляться данные из таблицы;
  • interval.ms — интервал в миллисекундах между опросами базы данных на наличие новых данных (в данном случае 10000 равно 10 секунд);
  • mapping — параметр преобразования числовых типов данных. Значение best_fit означает, что коннектор будет пытаться подобрать наиболее подходящий тип данных. Вообще параметр numeric.mapping может принимать значения none, precision_only, best_fit или best_fit_eager_double. Значение none устанавливается, если все столбцы NUMERIC должны быть представлены логическим типом DECIMAL Connect. Значение best_fit указывает, что столбцы NUMERIC должны быть преобразованы в INT8, INT16, INT32, INT64 или FLOAT64 на основе точности и масштаба столбца. Этот параметр может по-прежнему представлять значение NUMERIC как Connect DECIMAL, если его нельзя преобразовать в собственный тип без потери точности. Например, тип NUMERIC(20) с точностью 20 не сможет поместиться в собственный INT64 без переполнения и, таким образом, будет сохранен как DECIMAL. Значение best_fit_eager_double используется, когда надо всегда приводить числовые столбцы к типу данных Connect FLOAT64, несмотря на возможную потерю точности. Наконец, precision_only используется для сопоставления числовых столбцов только на основе точности столбца, когда масштаб столбца равен 0. По умолчанию используется none, что может привести к проблемам сериализации в AVRO, поскольку тип DECIMAL Connect сопоставляется с его двоичным представлением. Рекомендуется использовать best_fit, который сопоставляет данные с наиболее подходящим примитивным типом.
  • name — диалекта базы данных (для PostgreSQL это PostgreSqlDatabaseDialect).

Чтобы зарегистрировать коннектор в Kafka Connect, надо оправить его HTTP-запросом POST в веб-сервер Kafka Connect. Поскольку платформа потоковой передачи у меня развернута в Docker на локальном хосте, в моем случае этот запрос будет выглядеть так:

curl -X POST -H "Content-Type: application/json" --data @pg-jdbc-source-connector.json http://localhost:8083/connectors
Отправка конфигурации коннектора в Kafka Connect
Отправка конфигурации коннектора в Kafka Connect

Это команда выполняется в командной строке, запуская утилиту curl с параметрами:

  • -H «Content-Type: application/json», который добавляет HTTP-заголовок к запросу, указывая, JSON-формат передаваемых данных, чтобы сервер верно мог интерпретировать тело запроса;
  • —data @pg-jdbc-source-connector.json — файл, который содержит данные JSON, отправляемые в теле POST-запроса. Символ @ перед именем файла указывает curl на то, что данные нужно прочитать из файла pg-jdbc-source-connector.json.

Если ошибок нет, коннектор после отправки сразу запустится, что сразу видно в GUI.

Запущенный коннектор в Kafka Connect
Запущенный коннектор в Kafka Connect

Передача новых данных из PostgreSQL с помощью коннектора

Как я уже отметила, JDBC-коннектор не требует никаких настроек на стороне источника данных, т.е. базы. После запуска коннектора автоматически создается топик Kafka, куда будут отправляться события вставки новых данных. Однако, из-за того, что поле цена (price) у меня типа numeric, изначально было некорректное сопоставление, когда параметр коннектора numeric.mapping был установлен в значение none.

Некорректное преобразование типа данных numeric из PostgreSQL в Kafka Connect
Некорректное преобразование типа данных numeric из PostgreSQL в Kafka Connect

Исправить это оказалось не так просто. Оказалось, что помимо автоматического создания топика, Kafka Connect также создает в реестре схем схему данных, которая описывает поля публикуемого в топике сообщения и их типы.

Реестр схем Kafka
Реестр схем Kafka

Эту схему данных в формате AVRO я отредактировала вручную, указав следующие параметры:

{
  "type": "record",
  "name": "product",
  "fields": [
    {
      "name": "id",
      "type": "int"
    },
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "provider",
      "type": "int"
    },
    {
      "name": "price",
      "type": {
        "type": "bytes",
        "logicalType": "decimal",
        "precision": 10,
        "scale": 2
      }
    },
    {
      "name": "quantity",
      "type": "int"
    }
  ],
  "connect.name": "product"
}

Изменения заняли несколько итераций, поскольку при первых запусках коннектора тип поля price был строковым, что не конвертировалось в вещественный. При этом пришлось изменить совместимость схемы в Schema Registry, временно установив ее на NONE, чтобы зарегистрировать новую версию. Это позволило сохранить старые данные, записывая новые данные с использованием новой схемы.

После добавления новой записи в таблицу PostgreSQL, она попадает в топик Kafka.

Добавление новой записи в PostgreSQL
Добавление новой записи в PostgreSQL

Однако, несмотря на верно указанное преобразование типов данных, вещественное значение преобразуется в целочисленное. Исправить это мне пока не удалось.

Новое событие в топике Kafka
Новое событие в топике Kafka

Подводя итоги работы с JDBC-коннектором, отмечу, что он понравился мне меньше, чем CDC-коннектор Debezium, поскольку имеет больше ограничений и меньше возможностей.

Освойте администрирование и эксплуатацию Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options.html
  2. https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#bytes-decimals-numerics-and-oh-my
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту