Разработка и добавление своего плагина в Apache AirFlow: практический пример

Apache AirFlow примеры курсы обучение, Apache AirFlow развертывание администрирование оптимизация, Apache AirFlow для дата-инженеров и администраторов, Школа Больших данных Учебный центр Коммерсант

Как разработать свой плагин Apache AirFlow: пошаговое руководство с наглядной демонстрацией. Добавляем свои пункты меню в веб-интерфейс фреймворка и встраиваем пользовательскую HTML-страницу с новым эскизом Flask.

Разработка своего плагина для AirFlow

Вчера я рассказывала, как расширить функциональные возможности Apache AirFlow с помощью плагинов. Сегодня рассмотрим, как это сделать на практике. Чтобы вы могли повторить это несложное упражнение, в качестве среды развертывания Airflow возьму Google Colab, туннелировав веб-сервер с помощью утилиты ngrok. Сперва установим необходимые библиотеки и импортируем модули, а также создадим директории для размещения компонентов плагина.

# Установка Apache Airflow версии 2.7.3 и провайдера для Google
!pip install "apache-airflow==2.7.3" apache-airflow-providers-google==10.12.0

# Установка провайдера для Amazon
!pip install apache-airflow-providers-amazon

# Установка Flask-Session версии 0.5
!pip install flask-session==0.5

# Установка Connexion с поддержкой Swagger UI
!pip install connexion[swagger-ui]

# Установка Pyngrok для создания публичных URL через ngrok
!pip install pyngrok

# Импорт необходимых модулей
from pyngrok import ngrok
import sys
import os
import json
import datetime
from airflow import DAG

# Монтирование Google Диска к Colab
from google.colab import drive
drive.mount('/content/drive')

# Создание директории для DAGs Airflow
os.makedirs('/content/airflow/dags', exist_ok=True)

# Установка переменной окружения AIRFLOW_HOME
os.environ['AIRFLOW_HOME'] = '/content/airflow'

# Создание директории для DAGs в домашней папке
dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
os.makedirs(dags_folder, exist_ok=True)

# Установка и аутентификация ngrok с использованием токена
!ngrok authtoken 'ngrok-token'

# Создание директорий для плагинов Airflow
!mkdir -p /content/airflow/plugins/my_plugin/
!mkdir -p /content/airflow/plugins/my_plugin/static/test_plugin

# Создание файла __init__.py для инициализации плагина
!touch /content/airflow/plugins/my_plugin/__init__.py

Отдельно хочется сказать про пакет apache-airflow-providers-amazon: хотя напрямую он нигде в моем коде не используется, без него попытка зарегистрировать плагин в Airflow вызывала ошибку

Broken plugin: [/content/airflow/plugins/my_plugin/my_plugin.py] No module named 'airflow.providers.amazon'

Скорей всего, проблема связана с зависимостями Airflow, где многие интеграции вынесены в отдельные провайдеры, пакеты которых нужно вручную устанавливать.

Чтобы использовать пользовательский плагин, его нужно сохранить в правильный каталог. В моем случае структура каталогов выглядит так:

airflow /
  └── plugins/
      └── my_plugin/
          ├── __init__.py
          ├── my_plugin.py
          ├── templates/
          │   └── test_plugin/
          │       └── test.html
          └── static/
              └── test_plugin/
                  └── example_static_file.css

В этой структуре каталогов:

  • Airflow — рабочая директория фреймворка;
  • plugins — папка, в которой Airflow ищет плагины;
  • my_plugin — папка плагина;
  • __init__.py — пустой файл, который нужен, чтобы Python распознал папку как пакет;
  • my_plugin.py — файл с кодом плагина;
  • templates и static — папки для HTML-шаблонов и статических файлов, которые использует фреймворк Flask. Ведь именно Flask использует Airflow в качестве основного веб-фреймворка для своего пользовательского интерфейса. В папке templates надо сохранить HTML-файл страницы для визуального представления пользовательского плагина.

Плагин будет отображать в меню веб-интерфейса фреймворка пользовательские элементы и HTML-страницу. Предположим, я хочу встроить в меню AirFlow ссылки на сайты нашей Школы Больших Данных и Школы прикладного бизнес-анализа и проектирования информационных систем. Также добавим пункт меню «Мой плагин», который будет перенаправлять на HTML-страницу визуального представления плагина. Код такого пользовательского плагина выглядит следующим образом:

# путь к файлу
file_path = '/content/airflow/plugins/my_plugin/my_plugin.py'

# код плагина
plugin_code = """
# Импорт модулей
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access

from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

from airflow.hooks.base import BaseHook

# Определение пользовательского хука, который наследуется от BaseHook
class PluginHook(BaseHook):
    pass

# Определение пользовательского макроса
def plugin_macro():
    pass

# Flask Blueprint для интеграции шаблонов и статических файлов
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # регистрирует airflow/plugins/templates как папку шаблонов Jinja
    static_folder="static",
    static_url_path="/static/test_plugin",
)

# Определение класса представления Flask AppBuilder с меню
class TestAppBuilderBaseView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        # Возвращает HTML-шаблон с контентом
        return self.render_template("test_plugin/test.html", content="HTML-страница представления пользовательского плагина")

# Определение класса представления Flask AppBuilder без меню
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        # Возвращает HTML-шаблон с контентом
        return self.render_template("test_plugin/test.html", content="HTML-страница представления пользовательского плагина")

# Создание экземпляра представления с меню
v_appbuilder_view = TestAppBuilderBaseView()
# Пакет для регистрации представления с меню
v_appbuilder_package = {
    "name": "Перейти на страницу плагина",
    "category": "Мой плагин",
    "view": v_appbuilder_view,
}

# Создание экземпляра представления без меню
v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
# Пакет для регистрации представления без меню
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}

# Элементы меню Flask AppBuilder
appbuilder_mitem_1 = {
    "name": "Перейти на сайт",
    "href": "https://bigdataschool.ru/",
    "category": "Школа Больших Данных",
}
appbuilder_mitem_2 = {
    "name": "Перейти на сайт",
    "href": "https://babok-school.ru/",
    "category": "Школа прикладного бизнес-анализа и проектирования ИС",
}

# Определение класса плагина Airflow
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    hooks = [PluginHook]  # Регистрация пользовательского хука
    macros = [plugin_macro]  # Регистрация пользовательского макроса
    flask_blueprints = [bp]  # Регистрация Flask Blueprint
    appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]  # Регистрация представлений
    appbuilder_menu_items = [appbuilder_mitem_1, appbuilder_mitem_2]  # Регистрация элементов меню
"""

# Запись кода в файл
with open(file_path, 'w') as file:
    file.write(plugin_code)

print(f"Плагин сохранён в {file_path}")

Этот плагин добавляет несколько расширений и интеграций в веб-интерфейс Airflow. AirflowPlugin является базовым классом для создания плагинов, Blueprint используется для создания эскизов Flask, чтобы управлять шаблонами и статическими файлами, BaseHook нужен для создания собственных хуков для подключения к различным сервисам. Flask использует концепцию эскизов (blueprint) для создания компонентов приложений и поддержки общих шаблонов внутри приложения или между приложениями. По сути, Blueprint — это способ организации Flask-приложения, который позволяет разбивать приложение на более мелкие и управляемые модули со своим маршрутом, обработчиками ошибок и пр. Так можно делить код на логические части, создавая отдельные файлы для различных функциональных областей Flask-приложения, таких как аутентификация, администрирование и пр.

Класс AirflowTestPlugin представляет собой главный класс плагина, который наследуется от AirflowPlugin. Он определяет различные компоненты плагина, такие как хуки, макросы, Flask Blueprints, представления и элементы меню, которые будут зарегистрированы в Airflow при загрузке плагина.

Класс PluginHook наследуется от BaseHook и его можно расширить для создания пользовательских хуков. Функция plugin_macro() – это макрос, который можно использовать в шаблонах Airflow. Он зарегистрирован как macros.test_plugin.plugin_macro. Чтобы интегрировать пользовательские страницы и статические ресурсы в веб-интерфейс Airflow, создается Blueprint с именем test_plugin, который указывает на папки с шаблонами и статическими файлами. Классы TestAppBuilderBaseView и TestAppBuilderBaseNoMenuView наследуются от AppBuilderBaseView и используются для создания пользовательских страниц во Flask AppBuilder. Методы test() отображают шаблон test_plugin/test.html с параметром content, содержащим данные для отображения в HTML-странице. Чтобы контролировать доступ к этим HTML-страницам, используется декоратор @has_access.

Для создания представлений и меню создаются v_appbuilder_view и v_appbuilder_nomenu_view как экземпляры соответствующих классов представлений. Они упаковываются в словари v_appbuilder_package и v_appbuilder_nomenu_package, которые затем регистрируются в плагине. Также создаются элементы меню appbuilder_mitem_1 и appbuilder_mitem_2, которые добавляют ссылки на внешние сайты в меню веб-интерфейса Airflow.

Разметка HTML-страницы для визуального представления пользовательского плагина выглядит так:

#HTML-страница представления пользовательского плагина
html_content = """
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Test Plugin View</title>
    <style>
        body {
            font-family: Arial, sans-serif;
        }
        .container {
            margin: 20px;
        }
        h1 {
            color: #333;
        }
    </style>
</head>
<body>
    <div class="container">
        <h1>Это представление пользовательского плагина</h1>
        <p>{{ content }}</p>
        <img src="{{ url_for('test_plugin.static', filename='aiplug_0.png') }}" style="display: block; margin-top: 20px;">
        <a href='/'>Вернуться назад в интерфейс AirFlow</a>
    </div>
</body>
</html>
"""

# Путь к директории
directory_path = '/content/airflow/plugins/my_plugin/templates/test_plugin/'

# Создаем директорию, если она не существует
import os
os.makedirs(directory_path, exist_ok=True)

# Путь к файлу
file_path = os.path.join(directory_path, 'test.html')

# Записываем HTML-контент в файл
with open(file_path, 'w') as file:
    file.write(html_content)

Поскольку в этом примере я решила отображать картинку на HTML-странице, путь к ней надо указать относительно директории, где размещен плагин в структуре проекта Airflow. Обычно, статические файлы для плагинов помещаются в папку static внутри каталога плагина.

Сохранив плагин, HTML-страницу и файл картинки в соответствующих папках, можно запускать Airflow. Я сделаю это локально на порту 8888, туннелировав веб-сервер фреймворка с помощью ngrok:

# Запуск Airflow в режиме демона
!airflow standalone --daemon

# Инициализация базы данных Airflow
!airflow db init

# Запуск веб-сервера Airflow на порте 8888
get_ipython().system_raw('airflow webserver --port 8888 &')

# Запуск ngrok для проксирования локального порта 8080 в публичный URL
get_ipython().system_raw('./ngrok http 8080 &')

!curl -s http://localhost:4040/api/tunnels | python3 -c \
    "import sys, json; print(json.load(sys.stdin)['tunnels'][0]['public_url'])"

# Установка порта для доступа к Airflow
addr = 8888

# Создание публичного URL для доступа к веб-интерфейсу Airflow через ngrok
public_url = ngrok.connect(addr)

# Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)

# Обновление базы данных Airflow до последней версии
!airflow db upgrade

# Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna,
# адресом электронной почты anna@a.ru и паролем pass. Этот пользователь будет иметь роль Admin,
# которая дает полный доступ к интерфейсу Airflow.
!airflow users create --username anna --firstname Anna --lastname Anna --email anna@a.ru --role Admin --password pass

Просмотреть зарегистрированные таблицы можно в CLI-интерфейсе Colab, вызвав команду

#просмотр установленных плагинов
!airflow plugins
Структура каталогов с элементами пользовательского плагина Airflow
Структура каталогов с элементами пользовательского плагина Airflow

Также успешно зарегистрированные плагины отображаются в веб-интерфейсе Airflow:

Отображение пользовательских плагинов в веб-интерфейсе Airflow
Отображение пользовательских плагинов в веб-интерфейсе Airflow

Благодаря плагину, меню Airflow изменилось: в него добавлены новые пункты:

Новые пункты меню в веб-интерфейсе Airflow
Новые пункты меню в веб-интерфейсе Airflow

Переход на страницу добавленного плагина отображает ее визуальное представление:

Визуальное представление пользовательского плагина
Визуальное представление пользовательского плагина

При разработке и отладке этого плагина мне приходилось постоянно перезапускать веб-сервер и планировщик фреймворка. Чтобы сделать это быстро в Google Colab, просто останавливала процессы и запускала фреймворк в standalone-режиме снова. Для остановки процессов использовала команду !pkill:

!pkill -f "airflow webserver"
!pkill -f "airflow scheduler"
!pkill -f "ngrok"

Таким образом, добавить свой плагин в Airflow оказалось не так сложно, однако, нужно очень внимательно сохранять все добавляемые компоненты в правильных папках согласно структуре каталогов Airflow-проекта с учетом особенностей как самого ETL-оркестратора, так и используемого им фреймворка Flask.

Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту