Настройка JDBC-коннектора Kafka от Confluent: практический пример

Настройка JDBC-коннектора Kafka от Confluent: практический пример

    Сегодня я на практическом примере покажу тонкости настройки конфигураций JDBC-коннектора источника, передающий новые записи из таблицы PostgreSQL в топик Apache Kafka.

    Настройка JDBC-коннектора и отправка в Kafka Connect

    Как я упоминала вчера, помимо CDC-коннектор Debezium, передать данные из реляционной базы данных PostgreSQL в Apache Kafka, также есть JDBC-коннектор от Confluent: Kafka Connect JDBC Source. Он основан на JDBC-драйвере и периодических SQL-запросах к таблицам, указанным в конфигурации коннектора. Этот JDBC-коннектор настраивается намного проще, чем  CDC-коннектор Debezium, не требуя изменений в конфигурации базы данных, таких как включение логической репликации и создания слота репликации, что я показывала здесь.

    Сперва создадим конфигурацию коннектора. В качестве примера я, как обычно, возьму интернет-магазин. Предположим, необходимо передавать изменения данных из таблицы с продуктами (product) PostgreSQL в Kafka.

    Схема данных PostgreSQL
    Схема данных PostgreSQL

    В отличие от CDC-коннектора, режим обновлений в JDBC-коннекторе гораздо менее гибок: параметр mode конфигурации коннектора может принимать только одно из следующих значений:

    • bulk –массовая загрузка данных всей таблицы каждый раз при ее опросе;
    • incrementing — инкрементный столбец, который поможет обнаружить только новые строки. Этот режим не обнаружит изменения или удаления существующих строк.
    • timestamp – столбец с отметкой времени для обнаружения новых и измененных строк. Предполагается, что этот столбец в базе данных обновляется при каждой записи, его значения монотонно увеличиваются, но не обязательно уникальны.
    • timestamp+incrementing – комбинациях двух столбцов, отметки времени, который обнаруживает новые и измененные строки, и инкремента, который предоставляет глобальный уникальный идентификатор для обновлений, чтобы каждой строке можно было назначить уникальное смещение потока.

    В моей таблице product отсутствует столбец отметки времени, поэтому параметр mode конфигурации коннектора установлю в значение incrementing. Полная конфигурация коннектора выглядит так:

    {
      "name": "pg-jdbc-source-connector",
      "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "tasks.max": "1",
        "connection.url": "jdbc:postgresql://my-db-host:port/database",
        "connection.user": "my-user",
        "connection.password": "my-pass",
        "table.whitelist": "ishop.product",
        "mode": "incrementing",
        "incrementing.column.name": "id",
        "topic.prefix": "ishop-product-",
        "poll.interval.ms": "10000",
        "numeric.mapping": "best_fit",
        "dialect.name": "PostgreSqlDatabaseDialect"
      }
    }

    В этой конфигурации заданы следующие параметры:

    • name — имя коннектора pg-jdbc-source-connector;
    • class —класс коннектора io.confluent.connect.jdbc.JdbcSourceConnector, который будет использоваться для подключения к источнику данных через JDBC;
    • max – максимальное количество задач, которые могут выполняться параллельно. В данном случае это одно подключение.
    • url — URL для подключения к базе данных PostgreSQL;
    • user – имя пользователя для подключения к базе данных;
    • password — пароль пользователя для подключения к базе данных;
    • whitelist — таблицы, которые будут использоваться коннектором, т.е. таблица product в схеме ishop.
    • mode — режим извлечения данных (incrementing), инкрементальное извлечение данных на основе указанного столбца;
    • column.name – название столбца (id), который будет использоваться для инкрементального извлечения данных. Обычно это первичный ключ или автоинкрементное поле. В моем случае это первичный ключ, который автоматически инкрементируется при вставке новой записи в таблицу.
    • prefix – префикс (ishop-product-), который будет добавлен к имени Kafka-топика, куда будут отправляться данные из таблицы;
    • interval.ms — интервал в миллисекундах между опросами базы данных на наличие новых данных (в данном случае 10000 равно 10 секунд);
    • mapping — параметр преобразования числовых типов данных. Значение best_fit означает, что коннектор будет пытаться подобрать наиболее подходящий тип данных. Вообще параметр numeric.mapping может принимать значения none, precision_only, best_fit или best_fit_eager_double. Значение none устанавливается, если все столбцы NUMERIC должны быть представлены логическим типом DECIMAL Connect. Значение best_fit указывает, что столбцы NUMERIC должны быть преобразованы в INT8, INT16, INT32, INT64 или FLOAT64 на основе точности и масштаба столбца. Этот параметр может по-прежнему представлять значение NUMERIC как Connect DECIMAL, если его нельзя преобразовать в собственный тип без потери точности. Например, тип NUMERIC(20) с точностью 20 не сможет поместиться в собственный INT64 без переполнения и, таким образом, будет сохранен как DECIMAL. Значение best_fit_eager_double используется, когда надо всегда приводить числовые столбцы к типу данных Connect FLOAT64, несмотря на возможную потерю точности. Наконец, precision_only используется для сопоставления числовых столбцов только на основе точности столбца, когда масштаб столбца равен 0. По умолчанию используется none, что может привести к проблемам сериализации в AVRO, поскольку тип DECIMAL Connect сопоставляется с его двоичным представлением. Рекомендуется использовать best_fit, который сопоставляет данные с наиболее подходящим примитивным типом.
    • name — диалекта базы данных (для PostgreSQL это PostgreSqlDatabaseDialect).

    Чтобы зарегистрировать коннектор в Kafka Connect, надо оправить его HTTP-запросом POST в веб-сервер Kafka Connect. Поскольку платформа потоковой передачи у меня развернута в Docker на локальном хосте, в моем случае этот запрос будет выглядеть так:

    curl -X POST -H "Content-Type: application/json" --data @pg-jdbc-source-connector.json http://localhost:8083/connectors
    Отправка конфигурации коннектора в Kafka Connect
    Отправка конфигурации коннектора в Kafka Connect

    Это команда выполняется в командной строке, запуская утилиту curl с параметрами:

    • -H «Content-Type: application/json», который добавляет HTTP-заголовок к запросу, указывая, JSON-формат передаваемых данных, чтобы сервер верно мог интерпретировать тело запроса;
    • —data @pg-jdbc-source-connector.json — файл, который содержит данные JSON, отправляемые в теле POST-запроса. Символ @ перед именем файла указывает curl на то, что данные нужно прочитать из файла pg-jdbc-source-connector.json.

    Если ошибок нет, коннектор после отправки сразу запустится, что сразу видно в GUI.

    Запущенный коннектор в Kafka Connect
    Запущенный коннектор в Kafka Connect

    Передача новых данных из PostgreSQL с помощью коннектора

    Как я уже отметила, JDBC-коннектор не требует никаких настроек на стороне источника данных, т.е. базы. После запуска коннектора автоматически создается топик Kafka, куда будут отправляться события вставки новых данных. Однако, из-за того, что поле цена (price) у меня типа numeric, изначально было некорректное сопоставление, когда параметр коннектора numeric.mapping был установлен в значение none.

    Некорректное преобразование типа данных numeric из PostgreSQL в Kafka Connect
    Некорректное преобразование типа данных numeric из PostgreSQL в Kafka Connect

    Исправить это оказалось не так просто. Оказалось, что помимо автоматического создания топика, Kafka Connect также создает в реестре схем схему данных, которая описывает поля публикуемого в топике сообщения и их типы.

    Реестр схем Kafka
    Реестр схем Kafka

    Эту схему данных в формате AVRO я отредактировала вручную, указав следующие параметры:

    {
      "type": "record",
      "name": "product",
      "fields": [
        {
          "name": "id",
          "type": "int"
        },
        {
          "name": "name",
          "type": "string"
        },
        {
          "name": "provider",
          "type": "int"
        },
        {
          "name": "price",
          "type": {
            "type": "bytes",
            "logicalType": "decimal",
            "precision": 10,
            "scale": 2
          }
        },
        {
          "name": "quantity",
          "type": "int"
        }
      ],
      "connect.name": "product"
    }

    Изменения заняли несколько итераций, поскольку при первых запусках коннектора тип поля price был строковым, что не конвертировалось в вещественный. При этом пришлось изменить совместимость схемы в Schema Registry, временно установив ее на NONE, чтобы зарегистрировать новую версию. Это позволило сохранить старые данные, записывая новые данные с использованием новой схемы.

    После добавления новой записи в таблицу PostgreSQL, она попадает в топик Kafka.

    Добавление новой записи в PostgreSQL
    Добавление новой записи в PostgreSQL

    Однако, несмотря на верно указанное преобразование типов данных, вещественное значение преобразуется в целочисленное. Исправить это мне пока не удалось.

    Новое событие в топике Kafka
    Новое событие в топике Kafka

    Подводя итоги работы с JDBC-коннектором, отмечу, что он понравился мне меньше, чем CDC-коннектор Debezium, поскольку имеет больше ограничений и меньше возможностей.

    Освойте администрирование и эксплуатацию Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    Источники

    1. https://docs.confluent.io/kafka-connectors/jdbc/current/source-connector/source_config_options/
    2. https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/#bytes-decimals-numerics-and-oh-my
    [elementor-template id="13619"]