AI SDK для Apache AirFlow: оркестрация LLM-задач

курсы дата-инженеров Airflow AI, искусственный интеллект в инженерии данных, ИИ LLM Airflow, Школа Больших Данных

Как LLM упрощают работу дата-инженера: новые декораторы TaskFlow API в Apache Airflow для внедрения больших языковых моделей в DAG. Обзор Airflow AI SDK на основе Pydantic AI с практическим примером про анализ отзывов.

ИИ в инженерии данных

Мультимодальность современных инструментов машинного обучения, когда одна ML-модель может принимать на вход данные разного формата и характера (изображения, текст и аудио) позволяет быстрее получить результат прогнозирования или классификации благодаря расширению контекста. Благодаря этому можно создавать сложные конвейеры обработки огромных объемов данных, которые почти мгновенно извлекают структурированные идеи из неструктурированных источников.

В качестве примера рассмотрим процесс анализа отзывов, который состоит из следующих шагов:

  • сбор данных из различных источников (веб-формы, email, соцсети);
  • предварительная обработка — очистка и нормализация данных;
  • классификация отзывов по категориям (качество продукта, обслуживание, цена);
  • анализ тональности каждого отзыва;
  • определение дальнейших действий, например, эскалация критически негативных отзывов;
  • генерация отчетов и рекомендаций;
  • распределение задач по обработке специальных запросов, например, технические или методические вопросы;
  • генерация ответов;
  • оценка сгенерированных ответов для повышения их качества и релевантности;
  • сохранение обработанных данных и отчетов в хранилище данных
  • отправка уведомлений о результатах анализа.
Процесс анализа отзывов
Процесс анализа отзывов

Большие языковые модели (LLM, Large Language Model) отлично справляются с анализом информации и генерацией выводов, обеспечивая предсказуемую гибкость. Поэтому можно выделить следующие паттерны применения LLM в рабочих процессах обработки данных:

  • цепочка промптов — последовательные шаги, в которых каждая LLM обрабатывает выходные данные предыдущего шага;
  • маршрутизация — классификатор LLM направляет входные данные в специализированные последующие процессы;
  • распараллеливание — одновременно выполняется несколько вызовов LLM, после чего результаты суммируются;
  • распределение задач, когда центральная LLM распределяет задачи между исполнителями — специализированными LLM;
  • оценщик-оптимизатор, где одна LLM генерирует ответы, а другая оценивает и уточняет их.

Эти шаблоны дополняют друг друга и настраиваются для решения сложных бизнес-задач, сохраняя контроль и наблюдаемость, которые требуются в производстве. Применительно к нашему примеру конвейер обработки данных с использованием LLM-паттернов может выглядеть так:

  • сбор данных и предварительная обработка являются стартовыми этапами конвейера;
  • цепочка промптов включает в себя последовательные вызовы LLM 1 и LLM 2, где каждая следующая модель обрабатывает выходные данные предыдущей;
  • маршрутизация осуществляется через LLM 3, которая определяет дальнейшие действия на основе анализа данных;
  • на этапе распараллеливания одновременно выполняются LLM 4 и LLM 5, после чего их результаты объединяются и создаются задачи для последующих ML-моделей;
  • распределение задач выполняется с помощью LLM 6, которая направляет специфические запросы к специализированным моделям LLM A и LLM B;
  • оценщик-оптимизатор состоит из LLM 7, генерирующей ответы, и LLM 8, которая оценивает и улучшает эти ответы;
  • конвейер завершается хранением результатов и отправкой уведомлений.
Конвейер обработки данных с использованием LLM-паттернов
Конвейер обработки данных с использованием LLM-паттернов

Чтобы использовать эти паттерны применения LLM в рабочих процессах, компания Astro разработала Airflow AI SDK, который расширяет Apache Airflow с помощью специфических для LLM возможностей на основе Pydantic AI. PydanticAI — это фреймворк Python-агентов, упрощающий создание промышленных приложений с использованием генеративного ИИ. Он поддерживает ML-модели от OpenAI, Anthropic, Gemini, Deepseek, Ollama, Groq, Cohere и Mistral, а также имеет простой интерфейс для реализации поддержки моделей других провайдеров. Благодаря бесшовной интеграции с Pydantic Logfire обеспечивается отладка приложений на основе LLM в реальном времени, мониторинг производительности и отслеживание поведения. Фреймворк обеспечивает типобезопасность и возможность непрерывной потоковой передачи выходных данных LLM с немедленной проверкой, гарантируя быстрые и точные результаты. Как он работает, я покажу в другой раз, когда получу ключ API для какой-нибудь LLM-модели, а пока вернемся к Airflow.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
2 июня, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Как устроен Airflow AI SDK

Airflow AI SDK включает 3 основных декоратора:

  1. @task.llm — определение задач, вызывающих языковые модели с автоматическим анализом выходных данных;
  2. @task.agent – организует многошаговые рассуждения ИИ с помощью специальных инструментов;
  3. @task.llm_branch — изменение потока управления рабочим процессом на основе выходных данных LLM.

Функция, предоставляемая каждому декоратору, является функцией перевода, которая преобразует ввод задачи Airflow во ввод LLM. Используя эти декораторы TaskFlow API, можно написать DAG Airflow, состоящий из задач вызова LLM для различных вариантов использования, от модерации контента до извлечения данных и поддержки принятия решений. Например, для определения задач в примере с анализом отзывов можно использовать декоратор @task.llm:

class ProductFeedbackSummary(ai_sdk.BaseModel):
    summary: str
    sentiment: Literal["positive", "negative", "neutral"]
    feature_requests: list[str]

@task.llm(
    model="gpt-4o-mini",
    result_type=ProductFeedbackSummary,
    system_prompt="Extract the summary, sentiment, and feature requests from the product feedback.",
)
def summarize_product_feedback(feedback: str | None = None) -> ProductFeedbackSummary:
    """
    This task summarizes the product feedback. You can add logic here to transform the input to the LLM, to (for example) perform PII redaction.
    """
    feedback_anonymized = mask_pii(feedback)
    return feedback_anonymized

В этом коде определена задача Airflow, которая с помощью упрощенной версии GPT-4 извлекает из отзыва краткое содержание, тональность (позитивная, негативная, нейтральная) и запросы на улучшение или добавление функций продукта. Перед обработкой отзыва выполняется анонимизация для защиты персональных данных пользователей.

Для более сложных сценариев Airflow AI SDK поддерживает рабочие процессы на основе агентов, которые выполняют специализированные задачи внутри общего процесса. Каждый агент обладает определёнными функциями и может взаимодействовать с другими агентами для достижения комплексных целей. Это позволяет создавать более гибкие и масштабируемые рабочие процессы, легко адаптирующиеся к изменениям и сложным требованиям.

При этом сам Airflow обеспечивает логирование и наблюдение за выполнение DAG, позволяя обойтись без специализированных инструментов мониторинга LLM. Airflow AI SDK автоматически организует каждый вызов инструмента в собственную группу журналов, чтобы видеть каждый шаг агента.

Таким образом, Apache AirFlow в очередной раз подтверждает статус самого популярного оркестратора рабочих процессов, который постоянно развивается и упрощает работу дата-инженера. Новый AI SDK на базе Pydantic AI значительно расширяет возможности Airflow, делая его более мощным и гибким инструментом для инженерии данных. Это позволяет не только автоматизировать рутинные задачи, но и внедрять современные ИИ-технологии для решения сложных бизнес-задач.

Освойте все эти и другие тонкости администрирования и эксплуатации Apache AirFlow вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://www.astronomer.io/blog/workflows-then-agents/
  2. https://github.com/astronomer/airflow-ai-sdk
  3. https://ai.pydantic.dev/
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.