Зачем нужны плагины в Apache AirFlow, как их создать и встроить в пакетный оркестратор для внедрения пользовательских операторов, хуков, датчиков или интерфейсов взаимодействия с внешними системами.
Плагины AirFlow
Продолжая недавний разговор про добавление пользовательского кода в Apache AirFlow, сегодня разберемся, как расширить функциональные возможности этого ETL-оркестратора с помощью встраиваемых модулей – плагинов. Плагины можно рассматривать как простой способ создания, распространения и активации новых функций, а также инструмент взаимодействия с различными типами данных и метаданных.
Airflow имеет простой встроенный менеджер плагинов, который может интегрировать внешние функции в свое ядро, просто перетаскивая файлы в папку plugins рабочей директории фреймворка. После импорта Python-модулей в папку plugins макросы и веб-представления интегрируются в основные коллекции Airflow и становятся доступными для использования. Плагины могут вводить новые операторы, хуки, датчики или интерфейсы для внешних систем, расширяя возможности фреймворка. Начиная с версии 2.0 импорт операторов, датчиков, хуков, добавленных в плагины через airflow.{operators,sensors,hooks}.<plugin_name> больше не поддерживается: эти расширения должны быть импортированы как обычные модули Python.
Для плагинов поддерживается отложенная загрузка, после которой они никогда не перезагружаются, кроме плагинов пользовательского интерфейса, которые автоматически загружаются в веб-сервер. Чтобы загружать их в начале каждого процесса Airflow, надо установить в [core]-разделе конфигурационного файла airflow.cfg конфигурацию lazy_load_plugins = False. Это означает, что при внесении изменения в плагины придется вручную перезагрузить веб-сервер или планировщик, использующий измененный код. Однако, это не отразится на новых запущенных задачах, пока не загрузится планировщик. По умолчанию выполнение задачи использует разветвление. Это позволяет избежать замедления, связанного с созданием нового интерпретатора Python и повторным анализом всего кода Airflow и процедур запуска. Такой подход дает значительные преимущества, особенно для коротких задач. Также придется перезапустить рабочий процесс, если используется Celery-исполнитель, или планировщик для локальных или Sequence-исполнителей. Альтернативой будет установка параметра конфигурации core.execute_tasks_new_python_interpreter в значение True, что приведет к запуску совершенно нового интерпретатора Python для задач. Плагины, импортированные только с помощью файлов DAG, не сталкиваются с этой проблемой, поскольку файлы DAG не загружаются и не анализируются ни в одном длительном процессе Airflow. Чтобы включить автоматическую перезагрузку веб-сервера при обнаружении изменений в каталоге с плагинами, необходимо в разделе [webserver] конфигурационного файла airflow.cfg установить параметр reload_on_plugin_change в значение True.
Чтобы создать плагин, надо определить класс, который наследует AirflowPlugin и включить нужные компоненты как атрибуты класса:
from airflow.plugins_manager import AirflowPlugin class MyCustomPlugin(AirflowPlugin): name = "my_custom_plugin" operators = [] hooks = [] executors = [] admin_views = [] flask_blueprints = [] menu_links = []
В этом примере все параметры определены как атрибуты класса, но можно определить их как свойства, если нужно выполнить дополнительную инициализацию. Обязательно нужно указать название, заполнив атрибут name.
Чтобы встроить плагин в Apache Airflow, надо сперва сохранить его в правильный каталог. Обычно структура каталогов выглядит так:
airflow / └── plugins/ └── my_plugin/ ├── __init__.py ├── my_plugin.py ├── templates/ │ └── test_plugin/ │ └── test.html └── static/ └── test_plugin/ └── example_static_file.css
В этой структуре каталогов:
- Airflow — это рабочая директория фреймворка;
- plugins — это папка, в которой Airflow ищет плагины;
- my_plugin — это папка для плагина, которую можно назвать как угодно;
- __init__.py — этот файл может быть пустым, но он нужен, чтобы Python распознал папку как пакет;
- my_plugin.py — файл с кодом плагина;
- templates и static — папки для HTML-шаблонов и статических файлов, которые использует фреймворк Flask. В частности, именно в папке templates надо сохранить HTML-файл страницы, если предполагается визуальная часть представления в плагине.
После создания структуры каталогов и добавления кода следует перезапустить веб-сервер и планировщик Airflow. После перезапуска веб-сервера и планировщика пользовательский плагин будет зарегистрирован, и изменения будут видны в веб-интерфейсе фреймворка. Как это сделать, я покажу на практическом примере завтра в следующей статье, а пока отмечу, что при создании и интеграции своих плагинов следует убедиться, что имена плагинов не конфликтуют с существующими компонентами фреймворка. В частности, во избежание конфликтов надо соблюдать соглашения об именования, чтобы обеспечить читаемость и простоту обслуживания своего кода. Разумеется, перед интеграцией нового плагина в производственную среду его необходимо тщательно протестировать, чтобы не допустить сбоев в рабочих процессах. Эти рекомендации помогут гибко адаптировать Apache Airflow к реальным задачам дата-инженерии.
Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники
- https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html
- https://www.restack.io/docs/airflow-knowledge-apache-plugins-list