Как разработать свой плагин Apache AirFlow: пошаговое руководство с наглядной демонстрацией. Добавляем свои пункты меню в веб-интерфейс фреймворка и встраиваем пользовательскую HTML-страницу с новым эскизом Flask.
Разработка своего плагина для AirFlow
Вчера я рассказывала, как расширить функциональные возможности Apache AirFlow с помощью плагинов. Сегодня рассмотрим, как это сделать на практике. Чтобы вы могли повторить это несложное упражнение, в качестве среды развертывания Airflow возьму Google Colab, туннелировав веб-сервер с помощью утилиты ngrok. Сперва установим необходимые библиотеки и импортируем модули, а также создадим директории для размещения компонентов плагина.
# Установка Apache Airflow версии 2.7.3 и провайдера для Google !pip install "apache-airflow==2.7.3" apache-airflow-providers-google==10.12.0 # Установка провайдера для Amazon !pip install apache-airflow-providers-amazon # Установка Flask-Session версии 0.5 !pip install flask-session==0.5 # Установка Connexion с поддержкой Swagger UI !pip install connexion[swagger-ui] # Установка Pyngrok для создания публичных URL через ngrok !pip install pyngrok # Импорт необходимых модулей from pyngrok import ngrok import sys import os import json import datetime from airflow import DAG # Монтирование Google Диска к Colab from google.colab import drive drive.mount('/content/drive') # Создание директории для DAGs Airflow os.makedirs('/content/airflow/dags', exist_ok=True) # Установка переменной окружения AIRFLOW_HOME os.environ['AIRFLOW_HOME'] = '/content/airflow' # Создание директории для DAGs в домашней папке dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags") os.makedirs(dags_folder, exist_ok=True) # Установка и аутентификация ngrok с использованием токена !ngrok authtoken 'ngrok-token' # Создание директорий для плагинов Airflow !mkdir -p /content/airflow/plugins/my_plugin/ !mkdir -p /content/airflow/plugins/my_plugin/static/test_plugin # Создание файла __init__.py для инициализации плагина !touch /content/airflow/plugins/my_plugin/__init__.py
Отдельно хочется сказать про пакет apache-airflow-providers-amazon: хотя напрямую он нигде в моем коде не используется, без него попытка зарегистрировать плагин в Airflow вызывала ошибку
Broken plugin: [/content/airflow/plugins/my_plugin/my_plugin.py] No module named 'airflow.providers.amazon'
Скорей всего, проблема связана с зависимостями Airflow, где многие интеграции вынесены в отдельные провайдеры, пакеты которых нужно вручную устанавливать.
Чтобы использовать пользовательский плагин, его нужно сохранить в правильный каталог. В моем случае структура каталогов выглядит так:
airflow / └── plugins/ └── my_plugin/ ├── __init__.py ├── my_plugin.py ├── templates/ │ └── test_plugin/ │ └── test.html └── static/ └── test_plugin/ └── example_static_file.css
В этой структуре каталогов:
- Airflow — рабочая директория фреймворка;
- plugins — папка, в которой Airflow ищет плагины;
- my_plugin — папка плагина;
- __init__.py — пустой файл, который нужен, чтобы Python распознал папку как пакет;
- my_plugin.py — файл с кодом плагина;
- templates и static — папки для HTML-шаблонов и статических файлов, которые использует фреймворк Flask. Ведь именно Flask использует Airflow в качестве основного веб-фреймворка для своего пользовательского интерфейса. В папке templates надо сохранить HTML-файл страницы для визуального представления пользовательского плагина.
Плагин будет отображать в меню веб-интерфейса фреймворка пользовательские элементы и HTML-страницу. Предположим, я хочу встроить в меню AirFlow ссылки на сайты нашей Школы Больших Данных и Школы прикладного бизнес-анализа и проектирования информационных систем. Также добавим пункт меню «Мой плагин», который будет перенаправлять на HTML-страницу визуального представления плагина. Код такого пользовательского плагина выглядит следующим образом:
# путь к файлу file_path = '/content/airflow/plugins/my_plugin/my_plugin.py' # код плагина plugin_code = """ # Импорт модулей from airflow.plugins_manager import AirflowPlugin from airflow.security import permissions from airflow.www.auth import has_access from flask import Blueprint from flask_appbuilder import expose, BaseView as AppBuilderBaseView from airflow.hooks.base import BaseHook # Определение пользовательского хука, который наследуется от BaseHook class PluginHook(BaseHook): pass # Определение пользовательского макроса def plugin_macro(): pass # Flask Blueprint для интеграции шаблонов и статических файлов bp = Blueprint( "test_plugin", __name__, template_folder="templates", # регистрирует airflow/plugins/templates как папку шаблонов Jinja static_folder="static", static_url_path="/static/test_plugin", ) # Определение класса представления Flask AppBuilder с меню class TestAppBuilderBaseView(AppBuilderBaseView): default_view = "test" @expose("/") @has_access( [ (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), ] ) def test(self): # Возвращает HTML-шаблон с контентом return self.render_template("test_plugin/test.html", content="HTML-страница представления пользовательского плагина") # Определение класса представления Flask AppBuilder без меню class TestAppBuilderBaseNoMenuView(AppBuilderBaseView): default_view = "test" @expose("/") @has_access( [ (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE), ] ) def test(self): # Возвращает HTML-шаблон с контентом return self.render_template("test_plugin/test.html", content="HTML-страница представления пользовательского плагина") # Создание экземпляра представления с меню v_appbuilder_view = TestAppBuilderBaseView() # Пакет для регистрации представления с меню v_appbuilder_package = { "name": "Перейти на страницу плагина", "category": "Мой плагин", "view": v_appbuilder_view, } # Создание экземпляра представления без меню v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView() # Пакет для регистрации представления без меню v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view} # Элементы меню Flask AppBuilder appbuilder_mitem_1 = { "name": "Перейти на сайт", "href": "https://bigdataschool.ru/", "category": "Школа Больших Данных", } appbuilder_mitem_2 = { "name": "Перейти на сайт", "href": "https://babok-school.ru/", "category": "Школа прикладного бизнес-анализа и проектирования ИС", } # Определение класса плагина Airflow class AirflowTestPlugin(AirflowPlugin): name = "test_plugin" hooks = [PluginHook] # Регистрация пользовательского хука macros = [plugin_macro] # Регистрация пользовательского макроса flask_blueprints = [bp] # Регистрация Flask Blueprint appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package] # Регистрация представлений appbuilder_menu_items = [appbuilder_mitem_1, appbuilder_mitem_2] # Регистрация элементов меню """ # Запись кода в файл with open(file_path, 'w') as file: file.write(plugin_code) print(f"Плагин сохранён в {file_path}")
Этот плагин добавляет несколько расширений и интеграций в веб-интерфейс Airflow. AirflowPlugin является базовым классом для создания плагинов, Blueprint используется для создания эскизов Flask, чтобы управлять шаблонами и статическими файлами, BaseHook нужен для создания собственных хуков для подключения к различным сервисам. Flask использует концепцию эскизов (blueprint) для создания компонентов приложений и поддержки общих шаблонов внутри приложения или между приложениями. По сути, Blueprint — это способ организации Flask-приложения, который позволяет разбивать приложение на более мелкие и управляемые модули со своим маршрутом, обработчиками ошибок и пр. Так можно делить код на логические части, создавая отдельные файлы для различных функциональных областей Flask-приложения, таких как аутентификация, администрирование и пр.
Класс AirflowTestPlugin представляет собой главный класс плагина, который наследуется от AirflowPlugin. Он определяет различные компоненты плагина, такие как хуки, макросы, Flask Blueprints, представления и элементы меню, которые будут зарегистрированы в Airflow при загрузке плагина.
Класс PluginHook наследуется от BaseHook и его можно расширить для создания пользовательских хуков. Функция plugin_macro() – это макрос, который можно использовать в шаблонах Airflow. Он зарегистрирован как macros.test_plugin.plugin_macro. Чтобы интегрировать пользовательские страницы и статические ресурсы в веб-интерфейс Airflow, создается Blueprint с именем test_plugin, который указывает на папки с шаблонами и статическими файлами. Классы TestAppBuilderBaseView и TestAppBuilderBaseNoMenuView наследуются от AppBuilderBaseView и используются для создания пользовательских страниц во Flask AppBuilder. Методы test() отображают шаблон test_plugin/test.html с параметром content, содержащим данные для отображения в HTML-странице. Чтобы контролировать доступ к этим HTML-страницам, используется декоратор @has_access.
Для создания представлений и меню создаются v_appbuilder_view и v_appbuilder_nomenu_view как экземпляры соответствующих классов представлений. Они упаковываются в словари v_appbuilder_package и v_appbuilder_nomenu_package, которые затем регистрируются в плагине. Также создаются элементы меню appbuilder_mitem_1 и appbuilder_mitem_2, которые добавляют ссылки на внешние сайты в меню веб-интерфейса Airflow.
Разметка HTML-страницы для визуального представления пользовательского плагина выглядит так:
#HTML-страница представления пользовательского плагина html_content = """ <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>Test Plugin View</title> <style> body { font-family: Arial, sans-serif; } .container { margin: 20px; } h1 { color: #333; } </style> </head> <body> <div class="container"> <h1>Это представление пользовательского плагина</h1> <p>{{ content }}</p> <img src="{{ url_for('test_plugin.static', filename='aiplug_0.png') }}" style="display: block; margin-top: 20px;"> <a href='/'>Вернуться назад в интерфейс AirFlow</a> </div> </body> </html> """ # Путь к директории directory_path = '/content/airflow/plugins/my_plugin/templates/test_plugin/' # Создаем директорию, если она не существует import os os.makedirs(directory_path, exist_ok=True) # Путь к файлу file_path = os.path.join(directory_path, 'test.html') # Записываем HTML-контент в файл with open(file_path, 'w') as file: file.write(html_content)
Поскольку в этом примере я решила отображать картинку на HTML-странице, путь к ней надо указать относительно директории, где размещен плагин в структуре проекта Airflow. Обычно, статические файлы для плагинов помещаются в папку static внутри каталога плагина.
Сохранив плагин, HTML-страницу и файл картинки в соответствующих папках, можно запускать Airflow. Я сделаю это локально на порту 8888, туннелировав веб-сервер фреймворка с помощью ngrok:
# Запуск Airflow в режиме демона !airflow standalone --daemon # Инициализация базы данных Airflow !airflow db init # Запуск веб-сервера Airflow на порте 8888 get_ipython().system_raw('airflow webserver --port 8888 &') # Запуск ngrok для проксирования локального порта 8080 в публичный URL get_ipython().system_raw('./ngrok http 8080 &') !curl -s http://localhost:4040/api/tunnels | python3 -c \ "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])" # Установка порта для доступа к Airflow addr = 8888 # Создание публичного URL для доступа к веб-интерфейсу Airflow через ngrok public_url = ngrok.connect(addr) # Вывод публичного URL для доступа к веб-интерфейсу Airflow print("Адрес Airflow GUI:", public_url) # Обновление базы данных Airflow до последней версии !airflow db upgrade # Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, # адресом электронной почты anna@a.ru и паролем pass. Этот пользователь будет иметь роль Admin, # которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@a.ru --role Admin --password pass
Просмотреть зарегистрированные таблицы можно в CLI-интерфейсе Colab, вызвав команду
#просмотр установленных плагинов !airflow plugins
Также успешно зарегистрированные плагины отображаются в веб-интерфейсе Airflow:
Благодаря плагину, меню Airflow изменилось: в него добавлены новые пункты:
Переход на страницу добавленного плагина отображает ее визуальное представление:
При разработке и отладке этого плагина мне приходилось постоянно перезапускать веб-сервер и планировщик фреймворка. Чтобы сделать это быстро в Google Colab, просто останавливала процессы и запускала фреймворк в standalone-режиме снова. Для остановки процессов использовала команду !pkill:
!pkill -f "airflow webserver" !pkill -f "airflow scheduler" !pkill -f "ngrok"
Таким образом, добавить свой плагин в Airflow оказалось не так сложно, однако, нужно очень внимательно сохранять все добавляемые компоненты в правильных папках согласно структуре каталогов Airflow-проекта с учетом особенностей как самого ETL-оркестратора, так и используемого им фреймворка Flask.
Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: