Содержание
Сегодня я на практическом примере покажу тонкости настройки конфигураций JDBC-коннектора источника, передающий новые записи из таблицы PostgreSQL в топик Apache Kafka.
Настройка JDBC-коннектора и отправка в Kafka Connect
Как я упоминала вчера, помимо CDC-коннектор Debezium, передать данные из реляционной базы данных PostgreSQL в Apache Kafka, также есть JDBC-коннектор от Confluent: Kafka Connect JDBC Source. Он основан на JDBC-драйвере и периодических SQL-запросах к таблицам, указанным в конфигурации коннектора. Этот JDBC-коннектор настраивается намного проще, чем CDC-коннектор Debezium, не требуя изменений в конфигурации базы данных, таких как включение логической репликации и создания слота репликации, что я показывала здесь.
Сперва создадим конфигурацию коннектора. В качестве примера я, как обычно, возьму интернет-магазин. Предположим, необходимо передавать изменения данных из таблицы с продуктами (product) PostgreSQL в Kafka.

В отличие от CDC-коннектора, режим обновлений в JDBC-коннекторе гораздо менее гибок: параметр mode конфигурации коннектора может принимать только одно из следующих значений:
- bulk –массовая загрузка данных всей таблицы каждый раз при ее опросе;
- incrementing — инкрементный столбец, который поможет обнаружить только новые строки. Этот режим не обнаружит изменения или удаления существующих строк.
- timestamp – столбец с отметкой времени для обнаружения новых и измененных строк. Предполагается, что этот столбец в базе данных обновляется при каждой записи, его значения монотонно увеличиваются, но не обязательно уникальны.
- timestamp+incrementing – комбинациях двух столбцов, отметки времени, который обнаруживает новые и измененные строки, и инкремента, который предоставляет глобальный уникальный идентификатор для обновлений, чтобы каждой строке можно было назначить уникальное смещение потока.
В моей таблице product отсутствует столбец отметки времени, поэтому параметр mode конфигурации коннектора установлю в значение incrementing. Полная конфигурация коннектора выглядит так:
{
"name": "pg-jdbc-source-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "jdbc:postgresql://my-db-host:port/database",
"connection.user": "my-user",
"connection.password": "my-pass",
"table.whitelist": "ishop.product",
"mode": "incrementing",
"incrementing.column.name": "id",
"topic.prefix": "ishop-product-",
"poll.interval.ms": "10000",
"numeric.mapping": "best_fit",
"dialect.name": "PostgreSqlDatabaseDialect"
}
}
В этой конфигурации заданы следующие параметры:
- name — имя коннектора pg-jdbc-source-connector;
- class —класс коннектора io.confluent.connect.jdbc.JdbcSourceConnector, который будет использоваться для подключения к источнику данных через JDBC;
- max – максимальное количество задач, которые могут выполняться параллельно. В данном случае это одно подключение.
- url — URL для подключения к базе данных PostgreSQL;
- user – имя пользователя для подключения к базе данных;
- password — пароль пользователя для подключения к базе данных;
- whitelist — таблицы, которые будут использоваться коннектором, т.е. таблица product в схеме ishop.
- mode — режим извлечения данных (incrementing), инкрементальное извлечение данных на основе указанного столбца;
- column.name – название столбца (id), который будет использоваться для инкрементального извлечения данных. Обычно это первичный ключ или автоинкрементное поле. В моем случае это первичный ключ, который автоматически инкрементируется при вставке новой записи в таблицу.
- prefix – префикс (ishop-product-), который будет добавлен к имени Kafka-топика, куда будут отправляться данные из таблицы;
- interval.ms — интервал в миллисекундах между опросами базы данных на наличие новых данных (в данном случае 10000 равно 10 секунд);
- mapping — параметр преобразования числовых типов данных. Значение best_fit означает, что коннектор будет пытаться подобрать наиболее подходящий тип данных. Вообще параметр numeric.mapping может принимать значения none, precision_only, best_fit или best_fit_eager_double. Значение none устанавливается, если все столбцы NUMERIC должны быть представлены логическим типом DECIMAL Connect. Значение best_fit указывает, что столбцы NUMERIC должны быть преобразованы в INT8, INT16, INT32, INT64 или FLOAT64 на основе точности и масштаба столбца. Этот параметр может по-прежнему представлять значение NUMERIC как Connect DECIMAL, если его нельзя преобразовать в собственный тип без потери точности. Например, тип NUMERIC(20) с точностью 20 не сможет поместиться в собственный INT64 без переполнения и, таким образом, будет сохранен как DECIMAL. Значение best_fit_eager_double используется, когда надо всегда приводить числовые столбцы к типу данных Connect FLOAT64, несмотря на возможную потерю точности. Наконец, precision_only используется для сопоставления числовых столбцов только на основе точности столбца, когда масштаб столбца равен 0. По умолчанию используется none, что может привести к проблемам сериализации в AVRO, поскольку тип DECIMAL Connect сопоставляется с его двоичным представлением. Рекомендуется использовать best_fit, который сопоставляет данные с наиболее подходящим примитивным типом.
- name — диалекта базы данных (для PostgreSQL это PostgreSqlDatabaseDialect).
Чтобы зарегистрировать коннектор в Kafka Connect, надо оправить его HTTP-запросом POST в веб-сервер Kafka Connect. Поскольку платформа потоковой передачи у меня развернута в Docker на локальном хосте, в моем случае этот запрос будет выглядеть так:
curl -X POST -H "Content-Type: application/json" --data @pg-jdbc-source-connector.json http://localhost:8083/connectors

Это команда выполняется в командной строке, запуская утилиту curl с параметрами:
- -H «Content-Type: application/json», который добавляет HTTP-заголовок к запросу, указывая, JSON-формат передаваемых данных, чтобы сервер верно мог интерпретировать тело запроса;
- —data @pg-jdbc-source-connector.json — файл, который содержит данные JSON, отправляемые в теле POST-запроса. Символ @ перед именем файла указывает curl на то, что данные нужно прочитать из файла pg-jdbc-source-connector.json.
Если ошибок нет, коннектор после отправки сразу запустится, что сразу видно в GUI.

Передача новых данных из PostgreSQL с помощью коннектора
Как я уже отметила, JDBC-коннектор не требует никаких настроек на стороне источника данных, т.е. базы. После запуска коннектора автоматически создается топик Kafka, куда будут отправляться события вставки новых данных. Однако, из-за того, что поле цена (price) у меня типа numeric, изначально было некорректное сопоставление, когда параметр коннектора numeric.mapping был установлен в значение none.

Исправить это оказалось не так просто. Оказалось, что помимо автоматического создания топика, Kafka Connect также создает в реестре схем схему данных, которая описывает поля публикуемого в топике сообщения и их типы.

Эту схему данных в формате AVRO я отредактировала вручную, указав следующие параметры:
{
"type": "record",
"name": "product",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string"
},
{
"name": "provider",
"type": "int"
},
{
"name": "price",
"type": {
"type": "bytes",
"logicalType": "decimal",
"precision": 10,
"scale": 2
}
},
{
"name": "quantity",
"type": "int"
}
],
"connect.name": "product"
}
Изменения заняли несколько итераций, поскольку при первых запусках коннектора тип поля price был строковым, что не конвертировалось в вещественный. При этом пришлось изменить совместимость схемы в Schema Registry, временно установив ее на NONE, чтобы зарегистрировать новую версию. Это позволило сохранить старые данные, записывая новые данные с использованием новой схемы.
После добавления новой записи в таблицу PostgreSQL, она попадает в топик Kafka.

Однако, несмотря на верно указанное преобразование типов данных, вещественное значение преобразуется в целочисленное. Исправить это мне пока не удалось.

Подводя итоги работы с JDBC-коннектором, отмечу, что он понравился мне меньше, чем CDC-коннектор Debezium, поскольку имеет больше ограничений и меньше возможностей.
Освойте администрирование и эксплуатацию Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Apache Kafka в Kubernetes
Источники


