Потоковый конвейер на Apache Kafka с библиотекой Quix Streams

Потоковый конвейер на Apache Kafka с библиотекой Quix Streams

    Быстрая и простая обработка потоков сообщений в одном приложении с Quix Streams вместо Kafka Streams: практический пример на Python с обогащением и фильтрацией данных.

    Практический пример потокового конвейера с Apache Kafka и Quix Streams

    Сегодня я познакомилась с Quix Streams — очередной замечательной библиотекой для создания потоковых конвейеров обработки данных на Python. Она работает подобно Kafka Streams, но использует Python вместо Java. Quix Streams позволяет строить потоковые конвейеры обработки данных в реальном времени, используя знакомые каждому дата-инженеру датафреймы с гарантиями масштабируемости, отказоустойчивости и долговечности Kafka. Помимо API потоковых данных датафреймов, включая оконные и агрегатные функции, библиотека поддерживает API сериализаторов JSON, Avro, Protobuf и реестра схем, а также stateful-обработку со встроенным бэкендом RocksDB. Гарантии однократной обработки с помощью транзакций Kafka позволяют использовать Quix Streams для создания простых приложений продюсеров и потребителей, а также разрабатывать сложные потоковые системы, управляемые событиями.

    Рассмотрим в качестве примера приложение, которое потребляет входные данные в потоковый датафрейм из одного топика Kafka (InputsTopic), обогащает их и записывает в другой топик (CorpAppsTopic). Таким образом, одно и то же приложение одновременно является и продюсером и потребителем Kafka.

    Схема потокового конвейера
    Схема потокового конвейера

    Основным интерфейсом для определения конвейеров потоковой обработки в Quix Streams является StreamingDataFrame. Он позволяет обрабатывать и преобразовывать данные из одного формата в другой, обновляя и фильтруя их прямо в потоке. Операции с потоковыми данными при этом похожи на работу с обычным датафреймом pandas. StreamingDataFrame поддерживает концепцию отложенных (ленивых) вычислений и является конвейером, который содержит все преобразования для входящих сообщений. При этом он не сохраняет фактически потребленные данные в памяти, а только объявляет, как они должны быть преобразованы. Все манипуляции с StreamingDataFrame могут быть добавлены как по отдельности, так и в виде цепочки. Также можно разветвить конвейер для создания нескольких путей выполнения. Все потоковые датафреймы, сгенерированные с помощью конструкций Applicationwith, Application.dataframe() будут автоматически обнаружены и запущены при запуске потокового приложения Application.run().

    В моем примере внешнее приложение-продюсер публикует в топик InputsTopic с входящие события пользовательского поведения: заявки на покупку продуктов в интернет-магазине или вопросы покупателей. Приложение app подключается к брокеру Kafka, считывает эти данные в потоковый датафрейм sdf, и, если это заявка на покупку продукта, добавляет к ней номер, а потом записывает это дополненное сообщение в топик CorpAppsTopic.

    Код потокового приложения с библиотекой Quix Streams выглядит так:

    !pip install quixstreams
    from quixstreams import Application
    from quixstreams.kafka.configuration import ConnectionConfig
    
    connection = ConnectionConfig(
        bootstrap_servers=kafka_url,
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-256",
        sasl_username=username,
        sasl_password=password
    )
    
    # приложение для Kafka
    app = Application(broker_address=connection)
    
    # определение топиков
    inputs_topic = app.topic("InputsTopic", value_deserializer="json")
    outputs_topic = app.topic("CorpAppsTopic", value_serializer="json")
    
    # потоковый датафрейм из входного топика Kafka
    sdf = app.dataframe(topic=inputs_topic)
    
    # Счётчик для нумерации заявок
    app_counter = 0
    
    # Функция для выявления заявок
    def extract_apps(message):
        global app_counter
        try:
            content_parts = message["content"].rsplit(' ', 1)
            if len(content_parts) != 2:
                raise ValueError("Это не заявка на покупку")
            product, quantity_str = content_parts
            quantity = int(quantity_str)
            app_counter += 1  # Увеличиваем счётчик при успешной обработке
            return {
                "id": message["id"],
                "name": message["name"],
                "subject": message["subject"],
                "product": product,
                "quantity": quantity,
                "app_number": app_counter  # Добавляем номер заявки
            }
        except Exception as e:
            # запись ошибок в лог-файл на Google Диске
            error_str = f"Ошибка: {str(e)}, Сообщение: {message}\n"
            with open("dlq.txt", "a") as f:
                f.write(error_str)
            print(f"Ошибка: {str(e)}")
            return None  # Возвращаем None, если произошла ошибка
    
    # применяем функцию к датафрейму
    sdf_apps = sdf.apply(extract_apps)
    
    # Фильтруем только успешные преобразования
    sdf_valid_apps = sdf_apps.filter(lambda x: x is not None)
    
    sdf_valid_apps.print()
    
    # Записываем преобразованные данные в выходной топик Kafka
    sdf_valid_apps.to_topic(outputs_topic)
    
    # Запускаем потоковое приложение
    app.run()

    При запуске потокового приложения сперва выполняется проверка подключения, затем валидация наличия указанных топиков и возможности работы с ними, а также инициализация хранилища состояний, о котором я расскажу в другой раз. После всех проверок, наконец, начинается потребление сообщений в потоковый датафрейм и его обработка. Для записи данных в другой топик создается новый потоковый датафрейм, каждая строка которого публикуется в Kafka методом to_topic().

    Запуск потокового конвейера на Apache Kafka и Quix Streams
    Запуск потокового конвейера на Apache Kafka и Quix Streams

    Таким образом, по сравнению с классическими клиентами Kafka, такими как kafka-python или confluent-kafka, Quix Streams предоставляет довольно удобный интерфейс для работы с потоками данных, скрывая сложность низкоуровневых настроек этой платформы потоковой передачи событий. Возможность работать с потоками данных как с обычным датафреймом упрощает разработку и позволяет быстрее создавать приложения. Из интересных и полезных функций этой библиотеки стоит особенно отметить stateful-обработку и обновление потоковых данных в реальном времени. Об этом я расскажу в следующих статьях.

    Узнайте больше про построение сложных архитектур данных и использование Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    Источники

    1. https://www.kai-waehner.de/blog/2023/05/28/quix-streams-stream-processing-with-kafka-and-python/
    2. https://quix.io/docs/quix-streams/processing/
    [elementor-template id="13619"]