Как добавить пользовательский код в Apache AirFlow и где его хранить: лучшие практики и рекомендации для дата-инженера с примером создания и импорта своего пакета.
Как хранить общий код в AirFlow
Недавно мы писали про сложности управления DAG в многопользовательской среде Apache AirFlow. Однако, даже когда речь идет про однопользовательскую работу в AirFlow, необходимо грамотно управлять конвейерами обработки данных, желательно как кодом. Впрочем, для этого ETL-оркестратора это делается нативно, поскольку DAG представляет собой Python-код или YAML-файл. Чаще всего весь код развертывания Airflow хранится в одном репозитории. Однако, иногда имеет смысл разделить DAG на несколько проектов. В таких сценариях лучше всего иметь отдельное развертывание Airflow для каждого проекта по следующим причинам:
- контроль доступа на основе ролей (RBAC) на уровне развертывания Airflow, а не на уровне DAG. Если разные команды должны иметь доступ только к своим DAG, есть смысл разделить их на два проекта и отдельные среды развертывания.
- разная инфраструктура – некоторые DAG хорошо подходят для исполнителей Kubernetes, а другие – для Celery;
- изоляция и избегание конфликтов, когда разные DAG имеют конфликтующие зависимости Python или ОС.
Как мы уже отмечали, запуск DAG из нескольких проектов в одном развертывании Airflow не считается рекомендуемым шаблоном. Если же файлы из разных репозиториев разворачиваются вместе в одну среду Airflow, это должно быть под контролем CI/CD-инструментов, что мы разбирали здесь.
Python-код, такой как пользовательские хуки, операторы или шаблоны DAG, которые повторно используются между проектами Airflow, следует хранить в репозитории, который отделен от индивидуальных репозиториев проектов. Это гарантирует, что любые изменения в повторно используемом коде нужно будет вносить только один раз и они будут применены ко всем проектам, где этот код используется. При этом стоит придерживаться следующих рекомендаций по хранению общего кода Python:
- в одном файле – при повторном использовании кода только в пределах одного скрипта. Например, чтобы уменьшить нагрузку, связанную с многократным поддержанием одной и той же бизнес-логики, и иметь только один способ ее выполнения, можно оформить код в одну функцию в том же файле DAG и ссылаться на нее в нескольких задачах Airflow в одном и том же скрипте.
- в папке /include – при повторном использовании кода в нескольких скриптах, но в пределах одного репозитория Git;
- в пакете Python в отдельном проекте – при повторном использовании кода в нескольких проектах Git. Чтобы повторно использовать код в нескольких проектах, нужно сохранить его в отдельном репозитории Git, который может быть повторно использован несколькими проектами. Для этого надо создать собственный Python-пакет из репозитория, который следует сделать доступным для нескольких проектов. Это требует немного больше работы по настройке, но позволяет нескольким командам, использующим несколько репозиториев Git, поддерживать единый источник кода.
Как добавить пользовательский Python-код в Airflow
Airflow позволяет использовать собственные модули Python в DAG и в конфигурации фреймворка. Это можно сделать одним из следующих способов:
- добавить свои модули в одну из папок, которые Airflow автоматически добавляет в PYTHONPATH;
- добавить дополнительные папки, где хранится пользовательский код с PYTHONPATH;
- упаковать пользовательский код в пакет Python и установить его вместе с Airflow.
Список каталогов, из которых Python пытается загрузить модуль, задается переменной sys.path, которая инициализируется во время запуска программы. Первый приоритет отдается текущему каталогу, т. е. path[0], содержащему текущий скрипт, который использовался для вызова, или пустой строке, если это была интерактивная оболочка. Второй приоритет отдается PYTHONPATH, за которым следуют зависящие от установки пути по умолчанию, управляемые управляются модулем site. Значение переменной sys.path может быть изменено во время сеанса Python с помощью метода append(), например, sys.path.append(«/path/to/custom/package»)). Тогда Python начнет искать пакеты в новых путях после их добавления.
В переменной sys.path есть каталог site-packages, который содержит установленные внешние пакеты, установленные с помощью менеджера пакетов pip или anaconda, чтобы использовать их в Airflow. Airflow при динамическом запуске добавляет три каталога в sys.path:
- dags, что настраивается с помощью опции dags_folder в разделе [core];
- config, что настраивается путем установки переменной {AIRFLOW_HOME}/config;
- plugins, что настраивается с помощью параметра plugins_folder в разделе [core].
В Airflow версии 2 папка dags в не должна размещаться на веб-сервере, поскольку он не должен запускать код, который могут изменить пользователи, пишущие DAG. Если нужно предоставить общий доступ к какому-либо коду веб-серверу, рекомендуется предоставить его через папку config или plugins или через установленные пакеты Airflow. Эти папки обычно управляются и доступны администраторам или DevOps-инженерам, тогда как в папку dags дата-инженеры или специалисты по Data Science записывают свои конвейеры обработки данных. Папки config и plugins по умолчанию считаются безопасными, потому что являются частью конфигурации установки Airflow и контролируются специалистами, управляющими развертыванием.
Для всех пакетов и модулей рекомендуется задавать уникальные имена и избегать использования общих имен на уровне PYTHONPATH. Например, при добавлении папки airflow с __init__.py в папку dags, она будет конфликтовать с пакетом Airflow, откуда не получится ничего импортировать. Аналогично, не следует добавлять туда файл airflow.py.
Также общие имена, используемые стандартными библиотечными пакетами, такими как multiprocessing или logging и т. д., не должны использоваться на верхнем уровне, ни как пакеты, т. е. папки с __init__.py, ни как модули, т. е py-файлы. То же самое относится к папкам config и plugins, которые также находятся в папке PYTHONPATH, и ко всему, что туда добавляется вручную.
Не следует использовать относительный импорт, начинающийся с точки (.), добавленный в Python 3, поскольку в Airflow один и тот же файл DAG может анализироваться в разных контекстах (планировщиками, рабочими процессами или во время тестов). Всегда надо использовать полные пути пакетов Python при импорте в DAG.
При создании папок в них надо добавлять пустые файлы __init__.py. Хотя в в Python 3 есть концепция неявных пространств имен, где не нужно добавлять эти файлы в папку, Airflow ожидает их во всех пакетах.
Как уже было отмечено, создание и установка своего Python-пакета считается наиболее корректным способом добавления пользовательского кода в Airflow, поскольку так можно организовать свой подход к управлению версиями, развертывать код во всех экземплярах и контейнерах контролируемым образом системными администраторами и DevOps-инженерами, а не авторами DAG. Также можете установить свои пакеты плагинов и провайдеров как Python- пакеты, выполнив следующие действия:
- выбрать и установить инструмент сборки/упаковки, который будет соответствовать PEP-621 для хранения метаданных, например, setuptools, poety, hatch, flit.
- создать каталог пакета с помощью команды mkdir;
- создать файл py внутри пакета и добавить туда код, который надо выводить при импорте этого пакета;
- создать файл pyproject.toml, который содержит всю информацию о Python-проекте, и заполнить его конфигурацией инструмента сборки;
- создать свой Python-проект, упаковав его в wheel-пакет, который затем может быть использован для установки с помощью pip. Это можно сделать, например, с помощью команды, которая создаст whl-файл в папке dist:
-
hatch build -t wheel
- далее можно установить этот wheel-пакет, например, my_pack, с помощью менеджера пакетов, например, pip:
pip install dist/my_pack-0.0.0-py3-none-any.whl
- наконец, можно импортировать в файл DAG или задачи созданный пакет с помощью ключевого слова import:
import my_pack
Удалить ненужный пакет можно также с помощью менеджера pip, добавив команду uninstall:
pip uninstall my_pack
Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники