Потоковое обогащение данных с Flink SQL данными из внешнего сервиса по REST API

Flink SQL примеры курсы обучение, Apache Flink для дата-инженеров разработчиков и аналитиков примеры курсы обучение, REST API FLINK SQL, Apache Flink дата-инженер, потоковая обработка данных Flink, обучение дата-инженеров и разработчиков курсы примеры, Школа Больших Данных Учебный Центр Коммерсант

В этой статье для обучения дата-инженеров и разработчиков распределенных приложений рассмотрим, как Flink SQL может обогатить ML-модель данными из внешней системы в режиме реального времени с использованием REST API. Что представляет собой http-flink-connector с открытым исходным кодом, разработанный GetInData на основе концепции Lookup Joins.

Обогащение данных c SQL: достоинства использования и трудности реализации

В проектах аналитики больших данных часто требуется обрабатывать интенсивные потоки данных в реальном времени, не просто выполняя преобразования, а обогащая данные сведениями из внешних источников, включая соединение потоков и сложные агрегации. Распределенный фреймворк для создания потоковых stateful-приложений Apache Flink отлично справляется с этой задачей, что мы рассматриваем здесь. Благодаря наличию SQL-библиотеки, Flink могут использовать дата-аналитики для обогащения потоков данных информацией из сторонних хранилищ, доступ к которым осуществляется через по HTTP через вызовы REST API. Flink предоставляет SQL API, совместимый со стандартом ANSI, который можно использовать для определения конвейеров обработки данных и выражения источников данных, приемников и функций преобразования данных, включая распознавание образов.

Рассмотрим типичный сценарий использования Flink SQL в задаче машинного обучения, когда надо обогатить ML-модель данными в режиме реального времени для дальнейшей обработки. Данные передаются через HTTP-запрос GET к внешнему веб-сервису. К примеру, это нужно, чтобы расширить входящие потоки данных о кредитах подробными пользовательскими метаданными. На практике обогащение внешними данными обычно реализуется с помощью определяемой пользователем функции (UDF). В нашем примере с ML это может выглядеть примерно так:

SELECT genericEnrichment(orderId, "http://3rdpartyservice.com/service/ml"") FROM orders

Однако, для реализации этого простого способа пользователь должен знать, что выделенный OrdersEnrichmen существует и необходимо указать ожидаемые от него параметры. Другими словами, пользователь должен знать, как использовать эту UDF, что не всегда возможно на практике, если пользователем является не разработчик, а аналитик данных, который умеет использовать SQL-запросы, но не пишет код на Java.

Другая проблема заключается в повторном использовании выделенных OrdersEnrichmen. Если сделать эту UDF очень конкретной, она станет более удобным для пользователя. Потребуется меньше параметров, что упростит запрос, но для каждого нового случая потребуется новая пользовательская функция. А для реализации новых UDF потребуется разработчик кода на Java, что увеличит срок выхода новой функциональной возможности на рынок.

Таким образом, использование типового SQL-запроса с JOIN гораздо проще и удобнее в эксплуатации, поскольку отсутствует необходимость переключения контекста между диалектами этого языка и специфической настройки параметров конфигурации распределенного приложения. Итак, имея таблицу с исходными данными ( заказы) и с данными ML-модели (ML_Data), записи из обеих таблиц можно соединить по ключу следующим SQL-запросом:

SELECT Orders.\*, ML_Data.\* FROM Orders AS o JOIN ML_Data AS ml ON o.id = ml.id

Как реализовать это, используя Flink SQL и open-source коннектор http-flink от GetInData, рассмотрим далее.

HTTP-коннектор для Apache Flink

Чтобы удовлетворить, прежде всего, собственную потребность в возможности использования Flink SQL для доступа к данным во внешних системах в режиме реального времени с использованием REST API, компания GetInData разработала специальный коннектор на основе концепции Lookup Joins. Он позволяет извлекать данные из внешней системы с помощью HTTP-запросов GET и методов приемника (Sink), чтобы применять его в операторе Flink SQL в качестве стандартной таблицы, которую впоследствии можно соединить с другим потоком через SQL-запрос.

Пока коннектор HTTP TableLookup поддерживает только строковые типы данных, соединения поиска и ожидает JSON в качестве тела ответа. HTTP Sink поддерживает как Streaming API при создании с помощью HttpSinkBuilder, так и Table API при создании в HttpDynamicTableSinkFactory.

Коннектор от GetInData с открытым исходным кодом flink-http-connector позволяет определять таблицы Flink SQL, которые действуют как источник данных для обогащения. На такую таблицу можно ссылаться в SQL-запросе с JOIN. Например, так можно определить таблицу данных следующим образом:

CREATE TABLE ML_Data (
  id STRING,
  id2 STRING,
  msg STRING,
  uuid STRING,
  isActive STRING,
  balance STRING
) WITH (
  'connector' = 'rest-lookup',
  'url' = 'http://localhost:8080/client'
)

Эта таблица является новым источником данных, созданным с использованием чистого Flink SQL без использования какого-либо языка программирования типа Java или Scala. Достаточно только SQL и нескольких параметров конфигурации, чтобы указать, что эта таблица поддерживается внешней веб-службой. Можно определить множество таких источников, каждый из которых будет иметь разные схемы и использовать разные внешние сервисы. Все эти источники будут действовать как стандартные реляционные таблицы, с которыми умеет работать каждый аналитик.

Параметр url определяет базовый URL-адрес для REST API, используемый для получения данных из внешней системы. В настоящее время коннектор flink-http-connector поддерживает только HTTP-метод GET.

Возвращаясь к рассматриваемому примеру, отметим, что SQL-запрос Flink, который соответствует этому сценарию использования, должен использовать так называемое соединение поиска (Lookup Join), которое передает коннектору аргументы JOIN для создания HTTP-запроса.

SQL для обогащения данных через соединение двух аргументов id и id2 с таблицей, поддерживаемой http-коннектором, будет выглядеть как типовой SQL-запрос, использующий стандартную концепцию JOIN:

SELECT o.id, o.id2, c.msg, ml.uuid, ml.isActive FROM Orders AS o
JOIN ML_Data FOR SYSTEM_TIME AS OF o.proc_time AS ml ON o.id = ml.id AND o.id2 = ml.id2

Таким образом, обогащение данными из внешней системы с использованием ее REST API может быть выражено в виде SQL-запроса Flink с использованием коннектора http-flink-connector, который компания GetInData сделала общедоступным под лицензией с открытым исходным кодом. Как именно это было реализовано, читайте в нашей новой статье.

Узнайте больше про использование возможностей Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://getindata.com/blog/data-enrichment-flink-sql-http-connector-flink-sql-part-one/
  2. https://github.com/getindata/flink-http-connector
Поиск по сайту