Дополняя наши курсы по Apache Kafka практическими примерами, сегодня рассмотрим, как загрузить в топик данные из ответа REST API или HTTP-запроса. Читайте далее, что такое cURL и какие команды нужно отправить через эту утилиту, чтобы записать в Kafka сообщения из JSON-файла.
REST API, HTTP и сURL
Импорт данных из REST API в топики Kafka обычно включает разработку специального продюсера для чтения данных и записи. Это не так-то просто при работе с несколькими конечными точками REST, ответами и аутентификациями, поэтому существуют различные RESTful-коннекторы [1]. По этой же причине компания Confluent разработала прокси-сервер с RESTful-интерфейсом – REST Proxy к Apache Kafka, о котором мы рассказывали здесь и здесь.
Однако, чтобы просто записать содержимое REST API или HTTP-ответа в Apache Kafka, например, с какого-то веб-сайта, вовсе необязательно прибегать к подобным дополнительным компонентам. Можно воспользоваться свободно распространяемой утилитой cURL (Client URL) – кроссплатформенной служебной программой командной строки. Она позволяет взаимодействовать с различными серверами по множеству протоколов (FTP, FTPS, HTTP, HTTPS, TFTP, SCP, SFTP, Telnet, DICT, LDAP, POP3, IMAP и SMTP) с синтаксисом URL, поддерживая сертификаты HTTPS, методы HTTP POST, HTTP PUT, загрузку на FTP, загрузку через формы HTTP, а также аутентификацию (базовая, дайджест, NTLM и Negotiate для HTTP, Kerberos для FTP) [2].
Запись данных REST API или HTTP-ответа в Apache Kafka с cURL: 4 простых шага
Утилита cURL поддерживает скачивание и удаление файлов, а ее основной синтаксис выглядит следующим образом: curl [OPTIONS] [URL]. С использованием cURL последовательность шагов для загрузки данных из содержимого REST API или HTTP-ответа в топики Apache Kafka, развернутой в Docker-контейнере, строится так [3]:
- чтение содержимого ответа HTTP/REST API в файл JSON с помощью команды
curl -L -o assessment-attempts-nested.json https://site
В этой команде опция –L, действительная для HTTP и HTTPS, означает возможность повторить запрос, если сервер сообщает, что запрошенная страница перемещена в другое место (обозначенное заголовком Location: и кодом ответа 3XX). При аутентификации curl отправляет свои учетные данные только начальному хосту и, если включено перенаправление curl не сможет перехватить имя пользователя и пароль. Ограничить количество последующих перенаправлений поможет параметр — max-redirs. Опция –o означает запись вывода в файл assessment-attempts-nested.json. Вместо https://site следует подставить URL-адрес, откуда нужно получить данные.
- объединение полученных файлов в стандартный вывод с командой
cat assessment-attempts-nested.json | jq ‘.[]’ –c
Опция | jq ‘.[]’ –c означает, что для чтения JSON-файла используется jq – легкий и гибкий JSON-процессор командной строки, который выделяет каждый индекс в массиве JSON в новую строку, сохраняя форматирование.
- отображение количества строк полученного JSON-массива, каждая из которых будет сообщением в топике Kafka, с помощью команды cat assessment-attempts-nested.json | jq ‘.[]’ —c | wc –l. Опция | wc -l позволяет взять стандартный вывод предыдущего шага, который представляет собой отформатированные и извлеченные строки jq из массива JSON, и напечатать счетчик новых строк. Результатом этой команды будет конкретное число, означающее количество сообщений, которые окажутся в топике Kafka.
- непосредственная запись строк полученного JSON-массива в топик Kafka под названием «assessment-attempts». Здесь будет использоваться docker-compose, т.к. Kafka запущена в Docker-контейнере: docker-compose exec container1 bash -c «cat assessment-attempts-nested.json | jq ‘.[]’ -c | kafkacat -P -b kafka:29092 -t assessment-attempts
docker-compose exec запускает команду в bash-оболочке контейнере с именем «container1». Опция -c позволяет читать команды из следующей строки, которая объединяет содержимое файла assessment-plays-nested.json в стандартный вывод и передает его в jq ‘. []’ –C. Она получает все содержимое вывода в формате JSON и извлекает каждый индекс массива в новую строку. А команда kafkacat -P -b kafka: 29092 -t Assessment-sessions запускает утилиту в режиме продюсера, который генерирует сообщения со стандартного ввода (stdin). Брокер Kafka задается через опцию -b kafka, имя и хост которого настроен в файле docker-compose.yml. Опция -t нужна, чтобы указать название топика Apache Kafka, куда будут записываться данные. В рассматриваемом примере это топик с названием «assessment-attempts». Как использовать cURL в задачах интеграции с Apache Kafka Connect, читайте в нашей новой статье.
Узнайте больше про администрирование кластеров Apache Kafka и разработку распределенных приложений потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники
- https://www.progress.com/tutorials/jdbc/import-data-from-any-rest-api-to-kafka-incrementally-using-jdbc
- https://ru.wikipedia.org/wiki/CURL
- https://medium.com/geeks-for-tech/consuming-data-through-apis-into-kafka-c3d3562093dd