Быстрая и простая обработка потоков сообщений в одном приложении с Quix Streams вместо Kafka Streams: практический пример на Python с обогащением и фильтрацией данных.
Практический пример потокового конвейера с Apache Kafka и Quix Streams
Сегодня я познакомилась с Quix Streams — очередной замечательной библиотекой для создания потоковых конвейеров обработки данных на Python. Она работает подобно Kafka Streams, но использует Python вместо Java. Quix Streams позволяет строить потоковые конвейеры обработки данных в реальном времени, используя знакомые каждому дата-инженеру датафреймы с гарантиями масштабируемости, отказоустойчивости и долговечности Kafka. Помимо API потоковых данных датафреймов, включая оконные и агрегатные функции, библиотека поддерживает API сериализаторов JSON, Avro, Protobuf и реестра схем, а также stateful-обработку со встроенным бэкендом RocksDB. Гарантии однократной обработки с помощью транзакций Kafka позволяют использовать Quix Streams для создания простых приложений продюсеров и потребителей, а также разрабатывать сложные потоковые системы, управляемые событиями.
Рассмотрим в качестве примера приложение, которое потребляет входные данные в потоковый датафрейм из одного топика Kafka (InputsTopic), обогащает их и записывает в другой топик (CorpAppsTopic). Таким образом, одно и то же приложение одновременно является и продюсером и потребителем Kafka.

Основным интерфейсом для определения конвейеров потоковой обработки в Quix Streams является StreamingDataFrame. Он позволяет обрабатывать и преобразовывать данные из одного формата в другой, обновляя и фильтруя их прямо в потоке. Операции с потоковыми данными при этом похожи на работу с обычным датафреймом pandas. StreamingDataFrame поддерживает концепцию отложенных (ленивых) вычислений и является конвейером, который содержит все преобразования для входящих сообщений. При этом он не сохраняет фактически потребленные данные в памяти, а только объявляет, как они должны быть преобразованы. Все манипуляции с StreamingDataFrame могут быть добавлены как по отдельности, так и в виде цепочки. Также можно разветвить конвейер для создания нескольких путей выполнения. Все потоковые датафреймы, сгенерированные с помощью конструкций Applicationwith, Application.dataframe() будут автоматически обнаружены и запущены при запуске потокового приложения Application.run().
В моем примере внешнее приложение-продюсер публикует в топик InputsTopic с входящие события пользовательского поведения: заявки на покупку продуктов в интернет-магазине или вопросы покупателей. Приложение app подключается к брокеру Kafka, считывает эти данные в потоковый датафрейм sdf, и, если это заявка на покупку продукта, добавляет к ней номер, а потом записывает это дополненное сообщение в топик CorpAppsTopic.
Код потокового приложения с библиотекой Quix Streams выглядит так:
!pip install quixstreams
from quixstreams import Application
from quixstreams.kafka.configuration import ConnectionConfig
connection = ConnectionConfig(
bootstrap_servers=kafka_url,
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-256",
sasl_username=username,
sasl_password=password
)
# приложение для Kafka
app = Application(broker_address=connection)
# определение топиков
inputs_topic = app.topic("InputsTopic", value_deserializer="json")
outputs_topic = app.topic("CorpAppsTopic", value_serializer="json")
# потоковый датафрейм из входного топика Kafka
sdf = app.dataframe(topic=inputs_topic)
# Счётчик для нумерации заявок
app_counter = 0
# Функция для выявления заявок
def extract_apps(message):
global app_counter
try:
content_parts = message["content"].rsplit(' ', 1)
if len(content_parts) != 2:
raise ValueError("Это не заявка на покупку")
product, quantity_str = content_parts
quantity = int(quantity_str)
app_counter += 1 # Увеличиваем счётчик при успешной обработке
return {
"id": message["id"],
"name": message["name"],
"subject": message["subject"],
"product": product,
"quantity": quantity,
"app_number": app_counter # Добавляем номер заявки
}
except Exception as e:
# запись ошибок в лог-файл на Google Диске
error_str = f"Ошибка: {str(e)}, Сообщение: {message}\n"
with open("dlq.txt", "a") as f:
f.write(error_str)
print(f"Ошибка: {str(e)}")
return None # Возвращаем None, если произошла ошибка
# применяем функцию к датафрейму
sdf_apps = sdf.apply(extract_apps)
# Фильтруем только успешные преобразования
sdf_valid_apps = sdf_apps.filter(lambda x: x is not None)
sdf_valid_apps.print()
# Записываем преобразованные данные в выходной топик Kafka
sdf_valid_apps.to_topic(outputs_topic)
# Запускаем потоковое приложение
app.run()
При запуске потокового приложения сперва выполняется проверка подключения, затем валидация наличия указанных топиков и возможности работы с ними, а также инициализация хранилища состояний, о котором я расскажу в другой раз. После всех проверок, наконец, начинается потребление сообщений в потоковый датафрейм и его обработка. Для записи данных в другой топик создается новый потоковый датафрейм, каждая строка которого публикуется в Kafka методом to_topic().

Таким образом, по сравнению с классическими клиентами Kafka, такими как kafka-python или confluent-kafka, Quix Streams предоставляет довольно удобный интерфейс для работы с потоками данных, скрывая сложность низкоуровневых настроек этой платформы потоковой передачи событий. Возможность работать с потоками данных как с обычным датафреймом упрощает разработку и позволяет быстрее создавать приложения. Из интересных и полезных функций этой библиотеки стоит особенно отметить stateful-обработку и обновление потоковых данных в реальном времени. Об этом я расскажу в следующих статьях.
Узнайте больше про построение сложных архитектур данных и использование Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники


