Практическое обучение дата-инженеров – это не просто курсы по основам Big Data, а полезные рекомендации с реальными примерами. Поэтому сегодня рассмотрим, как работать с DAG в Apache AirFlow еще эффективнее с помощью параметров конфигурации, плагинов, меток, шаблонов, переменных и еще 10 различных инструментов.
15 лучших практики для DAG в Apache AirFlow
Напомним, конвейер обработки данных (data pipeline) в Airflow — это просто Python-скрипт, который определяет объект DAG (Directed Acyclic Graph). DAG AirFlow – это цепочка задач для запланированного запуска по расписанию в виде направленного ациклического графа. Проектируя DAG, инженер данных как разработчик Data Flow, определяет набор операторов, с помощью которых будут выполняться входящие в граф задачи. Поэтому знание лучших практик работы с DAG очень полезно пользователю Apache AirFlow. В этом случае наиболее простыми, но весьма эффективными рекомендациями будут следующие [1]:
- рассматривайте DAG как файл конфигурации;
- Используйте систему плагинов;
- не выполняйте обработку данных в файлах DAG;
- делегируйте операторам вызовы API-интерфейсов или подключения к базам данных;
- делайте DAG-файлы и задачи идемпотентными;
- используйте одну переменную для каждого DAG;
- маркируйте DAG;
- не злоупотребляйте XCom;
- используйте промежуточное хранилище между задачами;
- контролируйте доступ на уровне DAG;
- используйте возможности шаблонов Jinja;
- установите повторные попытки выполнения задач на уровне DAG;
- определите согласованную структуру файлов;
- выберите последовательный метод для зависимостей задач;
- имейте стратегию уведомлений о сбоях.
Что именно представляет собой каждая рекомендуемая практика, мы рассмотрим далее.
DAG как файл конфигурации
Планировщик Airflow сканирует и компилирует файлы DAG при каждом такте. Это занимает довольно много ресурсов для тяжеловесных файлов с большим количеством верхнеуровневого кода. Поэтому целесообразно делать DAG максимально понятными с т.н. «чистым кодом», что они были похожи на файлы конфигурации. Для этого пригодится YAML/JSON-определение рабочего процесса (workflow), чтобы затем на его основе создавать DAG. Такой подход даст как минимум следующие преимущества:
- DAG, которые создаются автоматически (программным способом), будут согласованными и воспроизводимыми в любое время;
- доступность для пользователей, не работающих с Python.
Более того, блоки кода, не связанные с конфигурацией, можно отделить от определения DAG и использовать атрибут template_searchpath для их добавления. Например, если требуется выполнить некоторый SQL-запрос, подключившись к источнику данных, эта команда SQL должна быть загружена из файла. А расположение этого файла следует указать в template_searchpath. Аналогичное эмпирическое правило подходит и для запросов Hive (.hql).
Используйте в систему плагинов Airflow
Организуйте качественный репозиторий плагинов и поддерживайте его для создания пользовательских плагинов. Создавайте плагин по единому (универсальному) образцу, чтобы его можно было многократно использовать в разных сценариях использования. Это позволит управлять версиями плагинов, а также поддерживать порядок в рабочих процессах с помощью параметров их конфигурации, а не логики реализации. Вставляйте операции внутри метода выполнения, а не при инициализации класса.
Не выполняйте обработку данных в файлах DAG
Поскольку файлы Directed Acyclic Graph представляют собой Python-скрипты, может возникнуть соблазн использовать pandas или аналогичные библиотеки обработки данных. Однако, не стоит это делать: помните, что Airflow — это оркестратор рабочих процессов, а не среда их исполнения. Все вычисления должны выполняться в специализированной целевой системе.
Делегируйте операторам API-вызовы и подключения к базам данных
Вызов API или соединение с БД, выполненное на верхнем уровне кода в файлах DAG, перегружает веб-сервер. Эти вызовы, определенные вне оператора, вызываются при каждом такте. Поэтому рекомендуется передать их оператору util/common.
Сделайте DAG-файлы и задачи идемпотентными
DAG должен выдавать одни и те же данные при каждом запуске, например, чтение из раздела и запись в него, должны быть неизменными. Чтобы избежать внезапных ошибок, создавайте и удаляйте сами разделы, не трогая DAG.
Data Pipeline на Apache Airflow
Код курса
AIRF
Ближайшая дата курса
2 июня, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Используйте одну переменную для каждого DAG
Каждый раз при обращении к переменным Directed Acyclic Graph, создается соединение с базой данных, чтобы считать метаданные. Это может перегрузить СУБД, особенно при нескольких DAG, каждый из которых вызывает более одной переменной. Поэтому лучше использовать одну переменную для каждого графа с объектом JSON в рамках единого соединения. А, проанализировать этот JSON, можно получить требуемую пару ключ-значение.
Маркируйте DAG
Наличие тегов помогает фильтровать и группировать Directed Acyclic Graph. Поэтому стоит маркировать цепочки задач в соответствии с системой тегов, характерной для вашей инфраструктуры. Например, теги могут базироваться на проекте, категории приложения и прочих особенностях экосистемы data pipeline’ов, принятых в компании. Также это может помочь в управлении множеством взаимозависимых DAG: об этой проблеме и способах ее решения читайте в нашей новой статье.
Не злоупотребляйте XCom
Напомним, XCom (cross-communication) в Apache AirFlow используется как канал обмена данными между задачами в одном DAG, с помощью пары ключ-значение и названием задачи-отправителя. XCom создаётся в операторе Python на основании возвращаемого им значения или вручную с помощью функции xcom_push. После выполнения задачи значение сохраняется в контексте, чтобы его через функцию xcom_pull приняла следующая задача в другом Python-операторе или из шаблона jinja внутри любой предобработанной строки. При этом данные хранятся в серверной СУБД метаданных, хотя в Airflow 2.0 сама операция XCom скрыта внутри Python-оператора и полностью абстрагируется от разработчика DAG. Подробнее об этом и других новинках нового релиза Apache AirFlow мы писали здесь. Тем не менее, несмотря на отмеченное улучшение с инкапсуляцией XCom, при передаче большого количества данных между задачами или слишком частого выполнения этой процедуры, серверная СУБД с метаданными будет перегружена.
Используйте промежуточное хранилище между задачами
Чтобы избежать вышеуказанной проблемы с XCom и организовать быстрый обмен большим объемом данных между задачами, имеет смысл сохранить их в промежуточной хранилище. И передавать последующей задаче ссылку на эти данные, не пересылая их самих.
Используйте возможности шаблонов Jinja
Airflow использует возможности Jinja Templating и предоставляет разработчику Data Flow готовый набор встроенных параметров и макросов, позволяя также самостоятельно определять их и создавать новые шаблоны. Напомним, Jinja — это язык шаблонов для Python-разработчиков, похожий на шаблоны Django. Он быстрый, популярный и безопасный благодаря дополнительной изолированной среде выполнения шаблонов (песочнице) и автоматической системе экранирования HTML для предотвращения XSS [2].
Многие операторы AirFlow поддерживают template_fields – кортежный объект (tuple), который определяет, какие поля будут преобразованы.
class PythonOperator(BaseOperator): template_fields = ('templates_dict', 'op_args', 'op_kwargs')
При создании пользовательского оператора достаточно переопределить атрибут template_fields [1]:
class CustomBashOperator(BaseOperator): template_fields = ('file_name', 'command', 'dest_host')
В этом примере поля «file_name», «command», «dest_host» доступны для создания шаблонов Jinja.
Контролируйте доступ на уровне DAG
Мы уже упоминали, что в Apache AirFlow 2.0 добавлены новые кластерные политики, которые предоставляют интерфейс для работы с каждой задачей или DAG во время его загрузки, а также непосредственно перед выполнением задачи. Поэтому следует использовать эти возможности по максимуму, определив в пользовательских настройках airflow_local_settings политики dag_policy, task_policy и task_instance_mutation_hook. Разумеется, предварительно следует создать настраиваемую роль – пользователя Linux, который будет выполнять разрешенные действия с графами и задачами.
Еще 5 полезных практик в работе с AirFlow
- используйте статическую дату начала start_date, чтобы корректно определить расписание запуска DAG;
- при глобальных структурных изменениях переименуйте файл с цепочкой задач, создав его новую версию, чтобы сохранить всю историю. При этом в частности и при создании любых файлов вообще придерживайтесь согласованной и понятной структуры их именования и хранения.
- придерживайтесь последовательного метода организации зависимостей между задачами;
- разработайте и внедрите стратегию уведомления о сбоях;
- установите повторные попытки выполнения отказавших задач на уровне DAG.
О том, какие еще практические рекомендации помогут дата-инженеру разрабатывать и поддерживать Big Data Pipeline’ы, мы рассказываем в новой статье.
Код курса
ADH-AIR
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Как использовать все эти рекомендации на практике, чтобы эффективно применять Apache AirFlow для простой разработки сложных конвейеров аналитики больших данных с Hadoop и Spark, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники
- https://medium.com/swlh/airflow-dag-best-practices-716ac95b82d1
- https://jinja.palletsprojects.com/en/2.11.x/
- https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html