Всего 2 cURL-вызова для потокового обновления данных с Apache Kafka Connect

курсы Apache Kafka Connect, обучение разработчиков курсы Apache Kafka, курсы по Kafka Conncect, обучение Kafka, разработка потоковых приложений Kafka, интеграция данных с Apache Kafka, обучение разработчиков Big Data, Школа Больших ДАнных Учебный центр Коммерсант

Сегодня в рамках обучения разработчиков распределенных приложений и дата-инженеров рассмотрим практический пример потоковой интеграции данных из 2-х разных источников с Apache Kafka. Читайте далее, как мгновенно передать данные между реляционными СУБД с помощью готовых JDBC-коннекторов через cURL-вызовы к REST API Kafka Connect.

Apache Kafka как средство потоковой интеграции данных

Интеграция данными между разными приложениями и облачными сервисами – одна из самых распространенных ИТ-задач. Apache Kafka может выступать в качестве сервисной шины предприятия (ESB, Enterprise Service Bus), предоставляя все необходимые компоненты для интеграции данных в соответствии с SOA-подходом [1]:

  • набор коннекторов для подключения к разным системам для приема и отправки данных;
  • очередь сообщений для организации промежуточного хранения сообщений в процессе их доставки;
  • платформа связи коннекторов с очередью, которая организует асинхронную передачу информации между источниками и приемниками с гарантированной доставкой сообщений и возможностью их трансформации.

Однако, при построении ESB не стоит встраивать в нее логику интеграции данных или бизнес-логику, чтобы избежать зависимости от связываемых сервисов. Подробнее об этом мы рассказывали здесь на примере кейса компании Авито.

Kafka Connect позволяет реализовать гибкую интеграцию между разными приложениями, предоставляя единое масштабируемое и надежное решение, которое заменяет медленные пакетные задания точечных интеграций между отдельными сервисами на быстрые потоковые обновления по всему ИТ-ландшафту предприятия. Рассмотрим типичный кейс, когда необходимо быстро обновить одну SQL-СУБД информацией из другой базы данных, например, при изменении статуса заказа требуется отразить это во внутренней базе CRM. Это означает, что каждый раз при изменении таблицы STATUS в системе исполнения заказов нужно обновить таблицу ORDER_STATUS в базе CRM. Для простоты предположим, что обе интегрируемые СУБД являются базами данных MySQL [2]. Как связать их с помощью Apache Kafka Connect, рассмотрим далее.

Apache Kafka для инженеров данных

Код курса
DEVKI
Ближайшая дата курса
20 января, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Интеграция SQL-СУБД с помощью Kafka Connect: практический пример

В общем случае для рассматриваемого кейса интеграции 2-х SQL-СУБД с помощью работающего кластера Apache Kafka и Kafka Connect требуется выполнить следующие шаги:

  • создать соединение-источник для чтения данных из таблицы STATUS из системы исполнения заказов;
  • создать соединение-приемник для записи данных в таблицу ORDER_STATUS базы данных CRM.

Проще всего это сделать через cURL-вызовы REST API Kafka Connect. Напомним, cURL (Client URL) – это свободно распространяемая утилита, кроссплатформенная служебная программа командной строки. Она позволяет взаимодействовать с разными серверами по множеству протоколов (FTP, FTPS, HTTP, HTTPS, TFTP, SCP, SFTP, Telnet, DICT, LDAP, POP3, IMAP и SMTP) с синтаксисом URL, поддерживая сертификаты HTTPS, методы HTTP POST, HTTP PUT, загрузку на FTP и через формы HTTP. Еще CURL поддерживает различные способы аутентификации: базовый, дайджест, NTLM и Negotiate для HTTP, а также Kerberos для FTP [3].

Таким образом, создание соединения-источника для Kafka Connect будет выглядеть так:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_source_mysql_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url": "jdbc:mysql://fulfillmentdbhost:3306/fulfillmentdb",
"connection.user": "fullfilment_user",
"connection.password": "<password>",
"topic.prefix": "order-status-update-",
"mode":"timestamp",
"table.whitelist" : "fulfullmentdb.status",
"timestamp.column.name": "LAST_UPDATED",
"validate.non.null": false
}
}'

Здесь команда cURL указывает Kafka Connect определенный тип исходного коннектора, а именно JdbcSourceConnector для подключения к базе данных MySQL по адресу fillmentdbhost: 3306/fillmentdb с использованием предоставленных учетных данных. Также выполняется настройка коннектора для отслеживания новых записей в таблице STATUS СУБД-источника на основе метки времени в столбце «LAST_UPDATED». В результате каждый раз при записи в таблицу новой строки Kafka Connect автоматически запишет соответствующее новое сообщение в топик Kafka «order-status-update-status». А второй вызов cURL обеспечивает соединение с приемником, который отслеживает изменения в указанном топике Kafka:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "jdbc_sink_mysql_01",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url": "jdbc:mysql://crmdbhost:3306/crmdb",
"connection.user": "crm_user",
"connection.password": "<password>",
"topics": "order-status-update-status",
"table.name.format" : "crmdb.order_status"
}
}'

Здесь команда cURL создает коннектор приемника JdbcSinkConnector, который подключается к другой СУБД — crmdbhost: 3306/crmdb. Этот коннектор будет отслеживать топик «order-status-update-status» на предмет новых сообщений и записывать их в таблицу ORDER_STATUS базы данных CRM (crmdb).

Аналогичным образом можно интегрировать любые приложения, для которых Kafka Connect предоставляет готовые коннекторы, или написать собственный. Далее нужно настроить коннекторы источника и приемника данных, и потоковая передача обновлений готова. Кроме того, после настройки коннектора-источника его можно использовать для связи сразу с несколькими приемниками. Например, в рассмотренном кейсе можно запустить третью команду cURL, чтобы записать все обновления статуса заказа в другую систему [2]. В следующей статье мы рассмотрим, что под капотом этих JDBC-коннекторов и разберем source-connctor Kafka Connect От Confluent.

Администрирование кластера Kafka

Код курса
KAFKA
Ближайшая дата курса
9 декабря, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Другие примеры интеграции систем с помощью Apache Kafka Connect мы рассматривали здесь и здесь. В заключение отметим, что существует множество готовых коннекторов к различным системам-приемникам и источникам. Но иногда разработчику приходится писать собственный код. Как это сделать, читайте в нашей новой статье

А освоить на практике эти и другие тонкости администрирования и эксплуатации Apache Kafka для разработки распределенных приложений потоковой аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. http://www.sovtex.ru/products/esb/
  2. https://levelup.gitconnected.com/how-to-use-kafka-connect-to-connect-two-data-sources-on-heroku-49104d693e4b
  3. https://ru.wikipedia.org/wiki/CURL
Поиск по сайту