Расширение возможностей Apache AirFlow с помощью плагинов

Apache AirFlow примеры курсы обучение, Apache AirFlow развертывание администрирование оптимизация, Apache AirFlow для дата-инженеров и администраторов, Школа Больших данных Учебный центр Коммерсант

Зачем нужны плагины в Apache AirFlow, как их создать и встроить в пакетный оркестратор для внедрения пользовательских операторов, хуков, датчиков или интерфейсов взаимодействия с  внешними системами.

Плагины AirFlow

Продолжая недавний разговор про добавление пользовательского кода в Apache AirFlow, сегодня разберемся, как расширить функциональные возможности этого ETL-оркестратора с помощью встраиваемых модулей – плагинов. Плагины можно рассматривать как простой способ создания, распространения и активации новых функций, а также инструмент взаимодействия с различными типами данных и метаданных.

Airflow имеет простой встроенный менеджер плагинов, который может интегрировать внешние функции в свое ядро, просто перетаскивая файлы в папку plugins рабочей директории фреймворка. После импорта Python-модулей в папку plugins макросы и веб-представления интегрируются в основные коллекции Airflow и становятся доступными для использования. Плагины могут вводить новые операторы, хуки, датчики или интерфейсы для внешних систем, расширяя возможности фреймворка. Начиная с версии 2.0 импорт операторов, датчиков, хуков, добавленных в плагины через airflow.{operators,sensors,hooks}.<plugin_name> больше не поддерживается: эти расширения должны быть импортированы как обычные модули Python.

Для плагинов поддерживается отложенная загрузка, после которой они никогда не перезагружаются, кроме плагинов пользовательского интерфейса, которые автоматически загружаются в веб-сервер. Чтобы загружать их в начале каждого процесса Airflow, надо установить в [core]-разделе конфигурационного файла airflow.cfg конфигурацию lazy_load_plugins = False. Это означает, что при внесении изменения в плагины придется вручную перезагрузить веб-сервер или планировщик, использующий измененный код. Однако, это не отразится на новых запущенных задачах, пока не загрузится планировщик. По умолчанию выполнение задачи использует разветвление. Это позволяет избежать замедления, связанного с созданием нового интерпретатора Python и повторным анализом всего кода Airflow и процедур запуска. Такой подход дает значительные преимущества, особенно для коротких задач. Также придется перезапустить рабочий процесс, если используется Celery-исполнитель, или планировщик для локальных или Sequence-исполнителей. Альтернативой будет установка параметра конфигурации core.execute_tasks_new_python_interpreter в значение True, что приведет к запуску совершенно нового интерпретатора Python для задач. Плагины, импортированные только с помощью файлов DAG, не сталкиваются с этой проблемой, поскольку файлы DAG не загружаются и не анализируются ни в одном длительном процессе Airflow. Чтобы включить автоматическую перезагрузку веб-сервера при обнаружении изменений в каталоге с плагинами, необходимо в разделе [webserver] конфигурационного файла airflow.cfg установить параметр reload_on_plugin_change в значение True.

Чтобы создать плагин, надо определить класс, который наследует AirflowPlugin и включить нужные компоненты как атрибуты класса:

from airflow.plugins_manager import AirflowPlugin 
class MyCustomPlugin(AirflowPlugin): 
name = "my_custom_plugin" 
operators = [] 
hooks = [] 
executors = [] 
admin_views = [] 
flask_blueprints = [] 
menu_links = []

В этом примере все параметры определены как атрибуты класса, но можно определить их как свойства, если нужно выполнить дополнительную инициализацию. Обязательно нужно указать название, заполнив атрибут name.

Чтобы встроить плагин в Apache Airflow, надо сперва сохранить его в правильный каталог. Обычно структура каталогов выглядит так:

airflow /
  └── plugins/
      └── my_plugin/
          ├── __init__.py
          ├── my_plugin.py
          ├── templates/
          │   └── test_plugin/
          │       └── test.html
          └── static/
              └── test_plugin/
                  └── example_static_file.css

В этой структуре каталогов:

  • Airflow — это рабочая директория фреймворка;
  • plugins — это папка, в которой Airflow ищет плагины;
  • my_plugin — это папка для плагина, которую можно назвать как угодно;
  • __init__.py — этот файл может быть пустым, но он нужен, чтобы Python распознал папку как пакет;
  • my_plugin.py — файл с кодом плагина;
  • templates и static — папки для HTML-шаблонов и статических файлов, которые использует фреймворк Flask. В частности, именно в папке templates надо сохранить HTML-файл страницы, если предполагается визуальная часть представления в плагине.

После создания структуры каталогов и добавления кода следует перезапустить веб-сервер и планировщик Airflow. После перезапуска веб-сервера и планировщика пользовательский плагин будет зарегистрирован, и изменения будут видны в веб-интерфейсе фреймворка. Как это сделать, я покажу на практическом примере завтра в следующей статье, а пока отмечу, что при создании и интеграции своих плагинов следует убедиться, что имена плагинов не конфликтуют с существующими компонентами фреймворка. В частности, во избежание конфликтов надо соблюдать соглашения об именования, чтобы обеспечить читаемость и простоту обслуживания своего кода. Разумеется, перед интеграцией нового плагина в производственную среду его необходимо тщательно протестировать, чтобы не допустить сбоев в рабочих процессах. Эти рекомендации помогут гибко адаптировать Apache Airflow к реальным задачам дата-инженерии.

Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html
  2. https://www.restack.io/docs/airflow-knowledge-apache-plugins-list

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту