Расширение возможностей Apache AirFlow с помощью плагинов

Расширение возможностей Apache AirFlow с помощью плагинов

Содержание

    Зачем нужны плагины в Apache AirFlow, как их создать и встроить в пакетный оркестратор для внедрения пользовательских операторов, хуков, датчиков или интерфейсов взаимодействия с  внешними системами.

    Плагины AirFlow

    Продолжая недавний разговор про добавление пользовательского кода в Apache AirFlow, сегодня разберемся, как расширить функциональные возможности этого ETL-оркестратора с помощью встраиваемых модулей – плагинов. Плагины можно рассматривать как простой способ создания, распространения и активации новых функций, а также инструмент взаимодействия с различными типами данных и метаданных.

    Airflow имеет простой встроенный менеджер плагинов, который может интегрировать внешние функции в свое ядро, просто перетаскивая файлы в папку plugins рабочей директории фреймворка. После импорта Python-модулей в папку plugins макросы и веб-представления интегрируются в основные коллекции Airflow и становятся доступными для использования. Плагины могут вводить новые операторы, хуки, датчики или интерфейсы для внешних систем, расширяя возможности фреймворка. Начиная с версии 2.0 импорт операторов, датчиков, хуков, добавленных в плагины через airflow.{operators,sensors,hooks}.<plugin_name> больше не поддерживается: эти расширения должны быть импортированы как обычные модули Python.

    Для плагинов поддерживается отложенная загрузка, после которой они никогда не перезагружаются, кроме плагинов пользовательского интерфейса, которые автоматически загружаются в веб-сервер. Чтобы загружать их в начале каждого процесса Airflow, надо установить в [core]-разделе конфигурационного файла airflow.cfg конфигурацию lazy_load_plugins = False. Это означает, что при внесении изменения в плагины придется вручную перезагрузить веб-сервер или планировщик, использующий измененный код. Однако, это не отразится на новых запущенных задачах, пока не загрузится планировщик. По умолчанию выполнение задачи использует разветвление. Это позволяет избежать замедления, связанного с созданием нового интерпретатора Python и повторным анализом всего кода Airflow и процедур запуска. Такой подход дает значительные преимущества, особенно для коротких задач. Также придется перезапустить рабочий процесс, если используется Celery-исполнитель, или планировщик для локальных или Sequence-исполнителей. Альтернативой будет установка параметра конфигурации core.execute_tasks_new_python_interpreter в значение True, что приведет к запуску совершенно нового интерпретатора Python для задач. Плагины, импортированные только с помощью файлов DAG, не сталкиваются с этой проблемой, поскольку файлы DAG не загружаются и не анализируются ни в одном длительном процессе Airflow. Чтобы включить автоматическую перезагрузку веб-сервера при обнаружении изменений в каталоге с плагинами, необходимо в разделе [webserver] конфигурационного файла airflow.cfg установить параметр reload_on_plugin_change в значение True.

    Чтобы создать плагин, надо определить класс, который наследует AirflowPlugin и включить нужные компоненты как атрибуты класса:

    from airflow.plugins_manager import AirflowPlugin 
    class MyCustomPlugin(AirflowPlugin): 
    name = "my_custom_plugin" 
    operators = [] 
    hooks = [] 
    executors = [] 
    admin_views = [] 
    flask_blueprints = [] 
    menu_links = []

    В этом примере все параметры определены как атрибуты класса, но можно определить их как свойства, если нужно выполнить дополнительную инициализацию. Обязательно нужно указать название, заполнив атрибут name.

    Чтобы встроить плагин в Apache Airflow, надо сперва сохранить его в правильный каталог. Обычно структура каталогов выглядит так:

    airflow /
      └── plugins/
          └── my_plugin/
              ├── __init__.py
              ├── my_plugin.py
              ├── templates/
              │   └── test_plugin/
              │       └── test/
              └── static/
                  └── test_plugin/
                      └── example_static_file.css

    В этой структуре каталогов:

    • Airflow — это рабочая директория фреймворка;
    • plugins — это папка, в которой Airflow ищет плагины;
    • my_plugin — это папка для плагина, которую можно назвать как угодно;
    • __init__.py — этот файл может быть пустым, но он нужен, чтобы Python распознал папку как пакет;
    • my_plugin.py — файл с кодом плагина;
    • templates и static — папки для HTML-шаблонов и статических файлов, которые использует фреймворк Flask. В частности, именно в папке templates надо сохранить HTML-файл страницы, если предполагается визуальная часть представления в плагине.

    После создания структуры каталогов и добавления кода следует перезапустить веб-сервер и планировщик Airflow. После перезапуска веб-сервера и планировщика пользовательский плагин будет зарегистрирован, и изменения будут видны в веб-интерфейсе фреймворка. Как это сделать, я покажу на практическом примере завтра в следующей статье, а пока отмечу, что при создании и интеграции своих плагинов следует убедиться, что имена плагинов не конфликтуют с существующими компонентами фреймворка. В частности, во избежание конфликтов надо соблюдать соглашения об именования, чтобы обеспечить читаемость и простоту обслуживания своего кода. Разумеется, перед интеграцией нового плагина в производственную среду его необходимо тщательно протестировать, чтобы не допустить сбоев в рабочих процессах. Эти рекомендации помогут гибко адаптировать Apache Airflow к реальным задачам дата-инженерии.

    Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    Источники

    1. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins/
    2. https://www.restack.io/docs/airflow-knowledge-apache-plugins-list

    [elementor-template id="13619"]