Взаимодействие ksqlDB на Docker: примеры работы с CLI и REST API

ksqlDB CLI Docker WSL Kafka SQL, Kafka курсы примеры обучение, Kafka для разработчика, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Аналитика данных из топиков Kafka с помощью SQL-запросов: обращение к ksqlDB в Docker через CLI-интерфейс и REST API в Postman с SSH-тунелированием сервера потоковой базы данных. Практическое руководство с примерами и иллюстрациями.

CLI-интерфейс ksqldb

Docker-образ Confluent Kafka включает дополнительные компоненты этой платформы: ksqlDB, Kafka Connect, REST Proxy, Schema Registry). Сегодня я покажу, как работать с ksqlDB – базой данных для потоковой обработки сообщений в топиках Kafka с помощью SQL-запросов. После запуска Docker-контейнеров с Kafka и другими компонентами в WSL на Windows, о чем я писала здесь и здесь, запустим публикацию сообщений в топик и обработаем их через SQL-запросы.

Согласно моему конфигурационному YAML-файлу развертывания докеризированных сервисов, сервер ksqlDB работает на localhost:8088. Для выполнения запросов к ksqlDB потребуется консольный интерфейс ksqlDB или веб-интерфейс. В коммерческую лицензию Confluent Kafka входит Cofluent Command Center, который позволяет работать с ksqlDB. В community-версии, которую я использую, такого GUI нет. AKHQ работает только с самой Kafka и Kafka Connect, не поддерживая ksqlDB. Искать и дополнительно настраивать новый GUI мне не хотелось, поэтому решила обойтись CLI и Postman для обращения к REST API потоковой базы данных.

Для выполнения операций с данными в топиках Kafka через ksqlDB сперва в этой БД нужно создать поток (stream), который будет потреблять данные из нужного топика. После этого можно использовать этот поток для выполнения SQL-запросов. Чтобы запустить CLI-интерфейс ksqlDB, нужно подключиться к серверу в shell- или bash-оболочке. Поскольку я использую Docker в WSL, нужно запустить CLI в работающем Docker-контейнере. Для этого надо выполнить команду

docker exec –it

где

  • docker exec указывает Docker на выполнение команды в активном контейнере;
  • -i — флаг интерактивного режима, который позволяет поддерживать стандартный ввод (stdin) открытым, чтобы взаимодействовать с процессом в контейнере;
  • -t — флаг псевдотерминала, который обеспечивает терминал для сессии, обеспечивая читабельный вывод и позволяя форматировать текст.

Таким образом, чтобы запустить CLI-интерфейс ksqlDB в Docker, надо выполнить команду

docker exec -it <ksqldb-server-container-id> ksql http://localhost:8088

<ksqldb-server-container-id> — это идентификатор контейнера с ksqlDB-сервером. Посмотреть его значение можно, выполнив команду

docker ps
Просмотр запущенных Docker-контейнеров в WSL
Просмотр запущенных Docker-контейнеров

После выполнения команды

docker exec -it <ksqldb-server-container-id> ksql http://localhost:8088

откроется CLI-интерфейс ksqlDB, где можно создать поток для обращения к данным в топике Kafka.

Поскольку данные у меня в формате JSON публикуются в топик под названием test, DDL-запрос создания потока в ksqlDB будет таким:

CREATE STREAM test_stream (
    event_timestamp VARCHAR,
    user VARCHAR,
    page VARCHAR,
    event VARCHAR
) WITH (
    KAFKA_TOPIC='test',
    VALUE_FORMAT='JSON'
);

После этого можно обращаться к этим данным. Например, посчитаем количество событий разного типа, выполнив запрос с группировкой по полю event:

SELECT event, COUNT(*) AS event_count
FROM test_stream
GROUP BY event
EMIT CHANGES;

ksqlDB CLI Docker WSL Kafka SQLЗапуск CLI-интерфейса ksqlDB в WSLВ SELECT-запросе потоковой обработки данных используется оператор EMIT CHANGES, который указывает на постоянное уточнение результатов, поскольку данные  меняются постоянно, а не статичны. Это означает, что каждый раз при поступлении новых данных в поток test_stream, результат запроса будет обновляться и выводить изменения.

Помимо CLI-интерфейса, ksqlDB также имеет REST API, в т.ч. для выполнения SQL-запросов. Как это сделать, рассмотрим далее.

Apache Kafka для инженеров данных

Код курса
DEVKI
Ближайшая дата курса
20 января, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

HTTP-запросы к REST API потоковой базы данных платформы Kafka Confluent

Конечной точкой HTTP API по умолчанию является http://0.0.0.0:8088/. Однако, чтобы обратиться к этому URL-адресу извне, например, из облачной службы Postman – интерфейса тестирования REST API, надо туннелировать порт 8088. Можно, конечно, запустить Postman локально, но он потребляет очень много ресурсов. Поэтому я обычно использую облачную службу. Для тунелирования порта на локальной машине, как и в прошлый раз, буду использовать SSH-сервер Serveo. Он не только формирует URL-адрес для обращения к службе на локальном хосте извне, но и дает возможность сделать его запоминающимся, как собственный поддомен Serveo. Например, для ksqlDB я решила зарегистрировать URL-адрес ksqldb.serveo.net, выполнив команду

ssh -R ksqldb:80:localhost:8088 serveo.net

После этого можно обращаться к конечным точкам REST API сервера ksqlDB в облачной службе Postman. Примечательно, что эти конечные точки доступны только при использовании HTTP 2. Сперва посмотрим просто опубликованные сообщения, выполнив обращение к ресурсу /query серверного приложения ksqlDB. Ресурс /query позволяет передавать выходные результаты выполнения SQL-оператора  SELECT. Ответ передается до тех пор, пока LIMIT не достигнет указанное в операторе значение или клиент не закроет соединение. Сам SQL-запрос передается как тело HTTP-запроса в виде JSON-объекта в кодировке UTF-8. Кроме тела HTTP-запроса методом POST нужно еще настроить Accept-заголовок, указав тип данных, который сможет принимать клиент от сервера. В данном случае в этом ключе надо указать значение application/vnd.ksql.v1+json. Это означает, что клиент ожидает получить ответ от сервера в формате JSON, который соответствует 1-ой версии API ksqlDB.

REST API ksqlDB Postman
Запрос к REST API ksqlDB через Postman

Помимо ресурса /query в ksqlDB есть ресурс /query-stream. Эта конечная точка чаще используется для выполнения потоковых запросов, которые возвращают данные в режиме реального времени, когда важно получать непрерывный поток данных по мере их появления. Хотя /query тоже позволяет получить постоянно обновляемые результаты с помощью EMIT CHANGES, обычно этот ресурс используют для фиксированного опроса таблицы.

Запрос к /query-stream выдает ответы с двумя возможными типами контента: application/json и application/vnd.ksqlapi.delimited.v1 (по умолчанию). Если задать заголовок HTTP-запроса Accept равным application/vnd.ksqlapi.delimited.v1, то результаты будут возвращаться как объект JSON, за которым следует ноль или более массивов, разделенных символами новой строки. Такой вывод легко анализируется и не требуют потокового анализатора JSON на клиенте, если надо вывести промежуточные результаты.

REST API ksqlDB Postman
Другой запрос к REST API ksqlDB через Postman

Наконец, посчитаем количество событий разных типов, отправив POST-запрос к ресурсу /query с телом

{
     "ksql": "SELECT EVENT, COUNT(*) AS EVENT_COUNT FROM test_stream GROUP BY EVENT EMIT CHANGES;",
     "streamsProperties": {
       "ksql.streams.auto.offset.reset": "earliest"
     }
   }

Параметр streamsProperties с настройкой «ksql.streams.auto.offset.reset»: «earliest» указывает, что ksqlDB должна начать считывание данных с самого начала потока, т.е. с первого доступного сообщения.

Аналитика опубликованных в Kafka сообщений через запрос к REST API ksqlDB в Postman
Аналитика опубликованных в Kafka сообщений через запрос к REST API ksqlDB в Postman

Если отправить этот же запрос к ресурсу /query-stream, вывод будет более лаконичным и удобным для последующей обработки, о которой я расскажу в другой раз.

Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://docs.confluent.io/platform/current/ksqldb/installing.html
  2. https://docs.ksqldb.io/en/latest/developer-guide/api/
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту