Из Kafka в Elasticsearch с помощью sink-коннектора OpenSearch: практический пример

Kafka курсы примеры обучение, Kafka для разработчика, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Как передать JSON-документы из топика Kafka в Elasticsearch, используя  OpenSearch Sink Connector. Подробная демонстрация с настройкой и регистрацией коннектора в Kafka Connect.

Настройка sink-коннектора и отправка в Kafka Connect

Как передать данные из Kafka в Elasticsearch, я уже показывала здесь, развернув экземпляр Kafka в облаке на платформе Upstash. Однако, с 1 октября 2024 года Upstash перестала поддерживать коннекторы, а также с марта 2025 года вообще отключает инстансы этой платформы потоковой передачи событий. Поэтому мне пришлось развернуть образ от Confluent со всеми сопутствующими сервисами (GUI на AKHQ, Connect, Schema Registry) в Docker на своем локальном компьютере. На этом локальном развертывании, получив данные из PostgreSQL с помощью JDBC-коннектора источника, я решила передать их в облачный Elasticsearch 7.10.2, экземпляр которого у меня развернут в сервисе Bonsai.

Скачав с хаба Confluent архив Self-Hosted ElasticSearch Sink Connector confluentinc-kafka-connect-elasticsearch-14.1.2, его надо развернуть в папке jars, которая находится в директории с конфигурационным YAML-файлом контейнеров docker-compose. После этого можно настраивать конфигурацию коннектора и отправлять его в Connect, передав JSON-файл конфигурации с помощью POST-запроса.

Конфигурация моего коннектора к Elasticsearch 7.10.2, описанная в файле elastic-sink-connector.json, выглядела так:

{
  "name": "elastic-sink-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "ishop-product-product",
    "connection.url": "https://my-host:my-port",
    "type.name": "kafka-connect",
    "key.ignore": "true",
    "schema.ignore": "true",
    "batch.size": "2000",
    "max.retries": "5",
    "retry.backoff.ms": "1000",
    "max.buffered.records": "20000",
    "flush.timeout.ms": "10000",
    "linger.ms": "1000",
    "max.in.flight.requests": "5",
    "index.auto.create": "true",
    "behavior.on.malformed.documents": "warn",
    "connection.username": "my-user",
    "connection.password": "my-password"
  }
}

Но попытка отправить коннектор с этой конфигурацией в Kafka Connect с помощью инструкции

curl -X POST -H "Content-Type: application/json" --data @elastic-sink-connector.json http://localhost:8083/connectors

выдала ошибку

org.sourcelab.kafka.connect.apiclient.rest.exceptions.InvalidRequestException: Connector configuration is invalid and contains the following 4 error(s): Could not connect to Elasticsearch. Error message: Invalid or missing build flavor [oss] Failed to create client to verify connection. Invalid or missing build flavor [oss] Could not authenticate the user. Check the 'connection.username' and 'connection.password'. Error message: Invalid or missing build flavor [oss] Could not authenticate the user. Check the 'connection.username' and 'connection.password'. Error message: Invalid or missing build flavor [oss]

Эта ошибка связана с несовместимостью между используемой версией Elasticsearch и ожидаемой версией от коннектора. В частности, Confluent Elasticsearch Sink Connector требует коммерческую, а не OSS-версию Elasticsearch, которая поддерживает определённые функции. Поэтому вместо sink-коннектора к Elasticsearch пришлось использовать его открытую альтернативу: Opensearch. OpenSearch представляет собой вариацию Elasticsearch и Kibana, созданную после изменения лицензии Elasticsearch. Он предоставляет аналогичные функции для поиска и аналитики данных. Sink-коннектор OpenSearch работает аналогично Confluent Elasticsearch Sink Connector, позволяя передавать данные из топика в документо-ориентированную БД для индексации и поиска.

Скачав с Github архив opensearch-connector-for-apache-kafka-3.1.1 и распаковав его в папку jars, я настроила следующую конфигурацию коннектора:

{
  "name": "opensearch-sink-connector",
  "config": {
    "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
    "tasks.max": "1",
    "topics": "ishop-product-product",
    "connection.url": "https://my-host:my-port",
    "type.name": "kafka-connect",
    "key.ignore": "true",
    "schema.ignore": "true",
    "batch.size": "2000",
    "max.retries": "5",
    "retry.backoff.ms": "1000",
    "max.buffered.records": "20000",
    "flush.timeout.ms": "10000",
    "linger.ms": "1000",
    "max.in.flight.requests": "5",
    "index.auto.create": "true",
    "behavior.on.malformed.documents": "warn",
    "connection.username": "my-user",
    "connection.password": "my-password"
  }
}

Отправка коннектора с этой конфигурацией в Kafka Connect уже не выдает ошибок, а регистрирует коннектор и успешно запускает коннектор, что сразу видно в GUI.

Запущенный коннектор в Kafka Connect
Запущенный коннектор в Kafka Connect

Поскольку в топике ishop-product-product уже есть данные в виде JSON-документов, полученные с помощью JDBC-коннектора из PostgreSQL, они сразу же передаются в Elasticsearch. Посмотреть это можно в интерфейсе сервиса Bonsai, отправив POST-запрос к автоматически созданному индексу в API поиска Elasticsearch:

Содержимое индекса Elasticsearch
Содержимое индекса Elasticsearch

Таким образом, при использовании собственного развертывания Kafka Connect, надо очень тщательно обращать внимание на варианты коннекторов и их совместимость с разными версиями систем-источников и приемников.

Освойте администрирование и эксплуатацию Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту