Потоковый конвейер на Apache Kafka с библиотекой Quix Streams

Kafka курсы примеры обучение, Kafka для разработчика, Kafka примеры курсы обучение дата-инженеров, Школа Больших Данных Учебный Центр Коммерсант

Быстрая и простая обработка потоков сообщений в одном приложении с Quix Streams вместо Kafka Streams: практический пример на Python с обогащением и фильтрацией данных.

Практический пример потокового конвейера с Apache Kafka и Quix Streams

Сегодня я познакомилась с Quix Streams — очередной замечательной библиотекой для создания потоковых конвейеров обработки данных на Python. Она работает подобно Kafka Streams, но использует Python вместо Java. Quix Streams позволяет строить потоковые конвейеры обработки данных в реальном времени, используя знакомые каждому дата-инженеру датафреймы с гарантиями масштабируемости, отказоустойчивости и долговечности Kafka. Помимо API потоковых данных датафреймов, включая оконные и агрегатные функции, библиотека поддерживает API сериализаторов JSON, Avro, Protobuf и реестра схем, а также stateful-обработку со встроенным бэкендом RocksDB. Гарантии однократной обработки с помощью транзакций Kafka позволяют использовать Quix Streams для создания простых приложений продюсеров и потребителей, а также разрабатывать сложные потоковые системы, управляемые событиями.

Рассмотрим в качестве примера приложение, которое потребляет входные данные в потоковый датафрейм из одного топика Kafka (InputsTopic), обогащает их и записывает в другой топик (CorpAppsTopic). Таким образом, одно и то же приложение одновременно является и продюсером и потребителем Kafka.

Схема потокового конвейера
Схема потокового конвейера

Основным интерфейсом для определения конвейеров потоковой обработки в Quix Streams является StreamingDataFrame. Он позволяет обрабатывать и преобразовывать данные из одного формата в другой, обновляя и фильтруя их прямо в потоке. Операции с потоковыми данными при этом похожи на работу с обычным датафреймом pandas. StreamingDataFrame поддерживает концепцию отложенных (ленивых) вычислений и является конвейером, который содержит все преобразования для входящих сообщений. При этом он не сохраняет фактически потребленные данные в памяти, а только объявляет, как они должны быть преобразованы. Все манипуляции с StreamingDataFrame могут быть добавлены как по отдельности, так и в виде цепочки. Также можно разветвить конвейер для создания нескольких путей выполнения. Все потоковые датафреймы, сгенерированные с помощью конструкций Applicationwith, Application.dataframe() будут автоматически обнаружены и запущены при запуске потокового приложения Application.run().

В моем примере внешнее приложение-продюсер публикует в топик InputsTopic с входящие события пользовательского поведения: заявки на покупку продуктов в интернет-магазине или вопросы покупателей. Приложение app подключается к брокеру Kafka, считывает эти данные в потоковый датафрейм sdf, и, если это заявка на покупку продукта, добавляет к ней номер, а потом записывает это дополненное сообщение в топик CorpAppsTopic.

Код потокового приложения с библиотекой Quix Streams выглядит так:

!pip install quixstreams
from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig

connection = ConnectionConfig(
    bootstrap_servers=kafka_url,
    security_protocol="SASL_SSL",
    sasl_mechanism="SCRAM-SHA-256",
    sasl_username=username,
    sasl_password=password
)

# приложение для Kafka
app = Application(broker_address=connection)

# определение топиков
inputs_topic = app.topic("InputsTopic", value_deserializer="json")
outputs_topic = app.topic("CorpAppsTopic", value_serializer="json")

# потоковый датафрейм из входного топика Kafka
sdf = app.dataframe(topic=inputs_topic)

# Счётчик для нумерации заявок
app_counter = 0

# Функция для выявления заявок
def extract_apps(message):
    global app_counter
    try:
        content_parts = message["content"].rsplit(' ', 1)
        if len(content_parts) != 2:
            raise ValueError("Это не заявка на покупку")
        product, quantity_str = content_parts
        quantity = int(quantity_str)
        app_counter += 1  # Увеличиваем счётчик при успешной обработке
        return {
            "id": message["id"],
            "name": message["name"],
            "subject": message["subject"],
            "product": product,
            "quantity": quantity,
            "app_number": app_counter  # Добавляем номер заявки
        }
    except Exception as e:
        # запись ошибок в лог-файл на Google Диске
        error_str = f"Ошибка: {str(e)}, Сообщение: {message}\n"
        with open("dlq.txt", "a") as f:
            f.write(error_str)
        print(f"Ошибка: {str(e)}")
        return None  # Возвращаем None, если произошла ошибка

# применяем функцию к датафрейму
sdf_apps = sdf.apply(extract_apps)

# Фильтруем только успешные преобразования
sdf_valid_apps = sdf_apps.filter(lambda x: x is not None)

sdf_valid_apps.print()

# Записываем преобразованные данные в выходной топик Kafka
sdf_valid_apps.to_topic(outputs_topic)

# Запускаем потоковое приложение
app.run()

При запуске потокового приложения сперва выполняется проверка подключения, затем валидация наличия указанных топиков и возможности работы с ними, а также инициализация хранилища состояний, о котором я расскажу в другой раз. После всех проверок, наконец, начинается потребление сообщений в потоковый датафрейм и его обработка. Для записи данных в другой топик создается новый потоковый датафрейм, каждая строка которого публикуется в Kafka методом to_topic().

Запуск потокового конвейера на Apache Kafka и Quix Streams
Запуск потокового конвейера на Apache Kafka и Quix Streams

Таким образом, по сравнению с классическими клиентами Kafka, такими как kafka-python или confluent-kafka, Quix Streams предоставляет довольно удобный интерфейс для работы с потоками данных, скрывая сложность низкоуровневых настроек этой платформы потоковой передачи событий. Возможность работать с потоками данных как с обычным датафреймом упрощает разработку и позволяет быстрее создавать приложения. Из интересных и полезных функций этой библиотеки стоит особенно отметить stateful-обработку и обновление потоковых данных в реальном времени. Об этом я расскажу в следующих статьях.

Узнайте больше про построение сложных архитектур данных и использование Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://www.kai-waehner.de/blog/2023/05/28/quix-streams-stream-processing-with-kafka-and-python/
  2. https://quix.io/docs/quix-streams/processing.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту