Пример потокового конвейера из Kafka в Elasticsearch на платформе Decodable

Kafka курсы примеры обучение, NoSQL обучение примеры курсы, инженерия данных с Elasticsearch и Kafka, использование Kafka Elasticsearch pipeline, интеграция Kafka пример, Школа Больших Данных Учебный Центр Коммерсант

Практическая демонстрация потокового SQL-конвейера, который преобразует данные, потребленные из Apache Kafka, и записывает результаты в Elasticsearch, используя Debezium-коннекторы и задания Apache Flink в облачной платформе Decodable.

Потребление сообщений из Apache Kafka

Я уже показывала пример интеграции Apache Kafka и Elasticsearch с помощью sink-коннектора, а также конвейер с ClickHouse Cloud. Сегодня совместим некоторые из этих прежних работ, чтобы протестировать облачную платформу Decodable на базе Apache Flink и Debezium. Она является полностью управляемым сервисом, который обеспечивает обработку данных в реальном времени. С Decodable можно получать данные из различных источников и направлять их в разные системы-приемники, а также преобразовывать и дополнять эти потоковые данные с помощью SQL или языка программирования на основе JVM (Java, Scala). Я буду использовать именно SQL в качестве основного средства построения потокового конвейера, суть которого состоит в извлечении данных из топика Kafka, их преобразовании и записи в Elasticsearch.

Архитектура потокового конвейера
Архитектура потокового конвейера

Как обычно, данные в Kafka, развернутой на платформе Upstash, буду публиковать с помощью простого Python-приложения, запускаемого в Google Colab, со следующим исходным кодом:

#установка библиотек
!pip install kafka-python
!pip install faker

#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer

# Импорт модуля faker
from faker import Faker
# объявление продюсера Kafka
producer = KafkaProducer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
  #batch_size=300
)
topic='test'
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker()

#списки полей в заявке
products = ['яблоки желтые', 'малина', 'вода', 'хлеб белый','хлеб серый', 'креветки', 'форель', 'апельсины', 'кета','курица','яйцо перепелиное','яйцо куриное','лаваш',
            'булка сдобная','булка сахарная','помидоры бакинские','помидоры чери','огурцы','перец сладкий','перец острый','перец болгарский','мандарины','укроп свежий',
            'укроп сушеный','клубника свежая','клубника мороженная','мороженое','картошка','морковь', 'свекла','пангасиус','семга','кальмар замороженный','горошек зеленый',
            'смородина черная','смородина красная','соль поваренная пищевая йодированная','чай черный байховый','чай зеленый','чай красный','кофе','кофе с молоком','какао',
            'молоко','кефир','сыр с плесенью','сыр плавленый','сыр твердый','сыр мягкий','яблоки красные','яблоки зеленые','яблоки сушеные','икра красная',
            'икра черная','икра заморская баклажанная','масло сливочное','масло оливковое','масло подсолнечное','масло кокосовое','орех грецкий','орех бразильский',
            'лист лавровый','куркума','кукуруза','печенье сладкое','пряники сдобные','тесто слоеное','варенц','ряженка','снежок','шоколад молочный']

questions = ['payment', 'delivery', 'discount', 'vip', 'staff']

#бесконечный цикл публикации данных
while True:
  #подготовка данных для публикации в JSON-формате
  now=datetime.now()
  producer_timestamp=now.strftime("%Y-%m-%d %H:%M:%S")

  content = ''
  theme = ''
  corp = 0

  #виды заявок - корпоративная или личная
  corp = random.choice([1,0])

  if corp==1 :
    name=fake.company()
  else:
    name=fake.name()

  #случайный выбор заявка или вопрос
  subject=random.choice(['app', 'question'])

  # Добавление дополнительных данных для заголовка и тела сообщения в зависимости от темы заявки
  if subject == 'question':
        theme = random.choice(questions)
        content = 'Hello, I have a question about ' + theme
  else:
        theme = 'app'
        kol = random.randint(1, 10) # Число позиций в заказе
        items = [] # Инициализация списка для элементов заказа
        for i in range(kol):
            items.append({'name': random.choice(products), 'quantity': str(random.randint(1, 100))})
        content = items

  # Создаем полезную нагрузку в JSON
  data = {"producer_timestamp": producer_timestamp, "name": name, "subject": subject, "content": content}

  #публикуем данные в Kafka
  future = producer.send(topic, value=data)
  record_metadata = future.get(timeout=60)
  print(f' [x] Sent {record_metadata}')
  print(f' [x] Payload {data}')

  #повтор через 3 секунды
  time.sleep(3)

Этот  Python-скрипт каждые 3 секунды генерирует и публикует в Kafka клиентские обращения от компаний или частных лиц: заявки на покупку товаров в интернет-магазине или вопросы по работе магазина.

Python-скрипт публикации сообщений в Kafka
Python-скрипт публикации сообщений в Kafka

Схема полезной нагрузки сообщения в формате JSON выглядит так:

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "Generated schema for Root",
  "type": "object",
  "properties": {
    "producer_timestamp": {
      "type": "string"
    },
    "name": {
      "type": "string"
    },
    "subject": {
      "type": "string"
    },
    "content": {
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "name": {
            "type": "string"
          },
          "quantity": {
            "type": "string"
          }
        },
        "required": [
          "name",
          "quantity"
        ]
      }
    }
  },
  "required": [
    "producer_timestamp",
    "name",
    "subject",
    "content"
  ]
}

Далее получим эти данные с помощью Debezium-коннектора в платформе Decodable. Поскольку Decodable основана на Apache Flink, она использует именно его термины при создании потокового конвейера. В частности, сперва нужно создать коннекторы к источникам и приемникам данных, настроив учетные записи подключения к внешним системам. Decodeable создает сетевые подключения к ресурсам, которые указаны в подключениях. Я создала 2 коннектора: Kafka как источник данных и Elasticsearch как приемник.

Подключения к внешним системам: источник и приемник
Подключения к внешним системам: источник и приемник

Преобразование потока данных и публикация в Elasticsearch

Предположим, нужно отправлять в индекс Elasticsearch под названием apps_index не все подряд обращения, а только те, которые являются заявками на покупку, а также количество позиций в каждой заявке. Чтобы сделать это, необходимо создать новый поток, который извлекает данные из исходного потока from_test_kafka, немного преобразует и обогащает их с помощью следующего SQL-запроса:

insert into apps_from_kafka
select producer_timestamp, name, CARDINALITY(content) AS items_quantity, content
from from_test_kafka
WHERE subject='app'
Потоки в Decodable
Потоки в Decodable

Протестировав и настроив источник и приемник данных, следует запустить оба подключения и сам настроенный конвейер, чтобы он работал непрерывно.

Потоковый конвейер обработки данных
Потоковый конвейер обработки данных

Статистика по принятым из источника и отправленным в приемник данным отображается в свойствах созданного конвейера.

Статистика работы потокового конвейера
Статистика работы потокового конвейера

Проверить, что данные, потребленные из Kafka, попадают в систему-приемник, т.е. Elasticsearch, можно из интерфейса самой это NoSQL-СУБД с помощью API поиска, отправив POST-запрос к /apps_index/_search с полезной нагрузкой для просмотра всех результатов:

{
   "query":{
      "match_all":{}
   }
}
Содержимое индекса Elasticsearch
Содержимое индекса Elasticsearch

Более наглядный вид содержимого индекса показывает веб-интерфейс панелей Kibana, интегрированный с Elasticsearch

Просмотр данных в Kibana
Просмотр данных в Kibana

Разумеется, вся информация о публикации и потреблении данных отображается в GUI веб-развертывания Kafka.

Визуализация пропускной способности публикации и потребления сообщений в Kafka
Визуализация пропускной способности публикации и потребления сообщений в Kafka

Разумеется, для практического использования полученной из Kafka информации, конвейер должен включать больше операций преобразования данных. В частности, чтобы построить поминутную или почасовую гистограмму получения заявок в Kibana, необходимо преобразовать типы полей индекса, чтобы поле producer_timestamp было не строкой, а датой и временем. Кроме того, следует преобразовать поле content, рассматривая его как документ, чтобы выполнять поиск по названию и количеству заказанных продуктов, а также строить по ним различные визуализации. Для этого SQL-запрос, преобразующий поток исходных данных из Kafka, будет намного сложнее. Аналогичный практический пример потоковой агрегации данных из Kafka  с помощью SQL-запросов читайте в моей новой статье про потоковую базу данных RisingWave.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://upstash.com/docs/kafka/integrations/decodable
  2. https://docs.decodable.co/pipelines/function-reference.html
Поиск по сайту