15 советов по работе с DAG в Apache AirFlow: лучшие практики дата-инженера

курсы по Airflow, Apache Airflow обучение, курсы дата-инженеров, обучение инженеров Big Data, инженерия больших данных, AirFlow 2.0, DAG AirFlow best practices

Практическое обучение дата-инженеров – это не просто курсы по основам Big Data, а полезные рекомендации с реальными примерами. Поэтому сегодня рассмотрим, как работать с DAG в Apache AirFlow еще эффективнее с помощью параметров конфигурации, плагинов, меток, шаблонов, переменных и еще 10 различных инструментов.

15 лучших практики для DAG в Apache AirFlow

Напомним, конвейер обработки данных (data pipeline) в Airflow — это просто Python-скрипт, который определяет объект DAG (Directed Acyclic Graph). DAG AirFlow – это цепочка задач для запланированного запуска по расписанию в виде направленного ациклического графа. Проектируя DAG, инженер данных как разработчик Data Flow, определяет набор операторов, с помощью которых будут выполняться входящие в граф задачи. Поэтому знание лучших практик работы с DAG очень полезно пользователю Apache AirFlow. В этом случае наиболее простыми, но весьма эффективными рекомендациями будут следующие [1]:

  • рассматривайте DAG как файл конфигурации;
  • Используйте систему плагинов;
  • не выполняйте обработку данных в файлах DAG;
  • делегируйте операторам вызовы API-интерфейсов или подключения к базам данных;
  • делайте DAG-файлы и задачи идемпотентными;
  • используйте одну переменную для каждого DAG;
  • маркируйте DAG;
  • не злоупотребляйте XCom;
  • используйте промежуточное хранилище между задачами;
  • контролируйте доступ на уровне DAG;
  • используйте возможности шаблонов Jinja;
  • установите повторные попытки выполнения задач на уровне DAG;
  • определите согласованную структуру файлов;
  • выберите последовательный метод для зависимостей задач;
  • имейте стратегию уведомлений о сбоях.

Что именно представляет собой каждая рекомендуемая практика, мы рассмотрим далее.

DAG как файл конфигурации

Планировщик Airflow сканирует и компилирует файлы DAG при каждом такте. Это занимает довольно много ресурсов для тяжеловесных файлов с большим количеством верхнеуровневого кода. Поэтому целесообразно делать DAG максимально понятными с т.н. «чистым кодом», что они были похожи на файлы конфигурации. Для этого пригодится YAML/JSON-определение рабочего процесса (workflow), чтобы затем на его основе создавать DAG. Такой подход даст как минимум следующие преимущества:

  • DAG, которые создаются автоматически (программным способом), будут согласованными и воспроизводимыми в любое время;
  • доступность для пользователей, не работающих с Python.

Более того, блоки кода, не связанные с конфигурацией, можно отделить от определения DAG и использовать атрибут template_searchpath для их добавления. Например, если требуется выполнить некоторый SQL-запрос, подключившись к источнику данных, эта команда SQL должна быть загружена из файла. А расположение этого файла следует указать в template_searchpath. Аналогичное эмпирическое правило подходит и для запросов Hive (.hql).

Используйте в систему плагинов Airflow

Организуйте качественный репозиторий плагинов и поддерживайте его для создания пользовательских плагинов. Создавайте плагин по единому (универсальному) образцу, чтобы его можно было многократно использовать в разных сценариях использования. Это позволит управлять версиями плагинов, а также поддерживать порядок в рабочих процессах с помощью параметров их конфигурации, а не логики реализации. Вставляйте операции внутри метода выполнения, а не при инициализации класса.

Не выполняйте обработку данных в файлах DAG

Поскольку файлы Directed Acyclic Graph представляют собой Python-скрипты, может возникнуть соблазн использовать pandas или аналогичные библиотеки обработки данных. Однако, не стоит это делать: помните, что Airflow — это оркестратор рабочих процессов, а не среда их исполнения. Все вычисления должны выполняться в специализированной целевой системе.

Делегируйте операторам API-вызовы и подключения к базам данных

Вызов API или соединение с БД, выполненное на верхнем уровне кода в файлах DAG, перегружает веб-сервер. Эти вызовы, определенные вне оператора, вызываются при каждом такте. Поэтому рекомендуется передать их оператору util/common.

Сделайте DAG-файлы и задачи идемпотентными

DAG должен выдавать одни и те же данные при каждом запуске, например, чтение из раздела и запись в него, должны быть неизменными. Чтобы избежать внезапных ошибок, создавайте и удаляйте сами разделы, не трогая DAG.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
19 марта, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Используйте одну переменную для каждого DAG

Каждый раз при обращении к переменным Directed Acyclic Graph, создается соединение с базой данных, чтобы считать метаданные. Это может перегрузить СУБД, особенно при нескольких DAG, каждый из которых вызывает более одной переменной. Поэтому лучше использовать одну переменную для каждого графа с объектом JSON в рамках единого соединения. А, проанализировать этот JSON, можно получить требуемую пару ключ-значение.

Маркируйте DAG

Наличие тегов помогает фильтровать и группировать Directed Acyclic Graph. Поэтому стоит маркировать цепочки задач в соответствии с системой тегов, характерной для вашей инфраструктуры. Например, теги могут базироваться на проекте, категории приложения и прочих особенностях экосистемы data pipeline’ов, принятых в компании. Также это может помочь в управлении множеством взаимозависимых DAG: об этой проблеме и способах ее решения читайте в нашей новой статье.

Не злоупотребляйте XCom

Напомним, XCom (cross-communication) в Apache AirFlow используется как канал обмена данными между задачами в одном DAG, с помощью пары ключ-значение и названием задачи-отправителя. XCom создаётся в операторе Python на основании возвращаемого им значения или вручную с помощью функции xcom_push. После выполнения задачи значение сохраняется в контексте, чтобы его через функцию xcom_pull приняла следующая задача в другом Python-операторе или из шаблона jinja внутри любой предобработанной строки. При этом данные хранятся в серверной СУБД метаданных, хотя в Airflow 2.0 сама операция XCom скрыта внутри Python-оператора и полностью абстрагируется от разработчика DAG. Подробнее об этом и других новинках нового релиза Apache AirFlow мы писали здесь. Тем не менее, несмотря на отмеченное улучшение с инкапсуляцией XCom, при передаче большого количества данных между задачами или слишком частого выполнения этой процедуры, серверная СУБД с метаданными будет перегружена.

Используйте промежуточное хранилище между задачами

Чтобы избежать вышеуказанной проблемы с XCom и организовать быстрый обмен большим объемом данных между задачами, имеет смысл сохранить их в промежуточной хранилище. И передавать последующей задаче ссылку на эти данные, не пересылая их самих.

Используйте возможности шаблонов Jinja

Airflow использует возможности Jinja Templating и предоставляет разработчику Data Flow готовый набор встроенных параметров и макросов, позволяя также самостоятельно определять их и создавать новые шаблоны. Напомним, Jinja — это язык шаблонов для Python-разработчиков, похожий на шаблоны Django. Он быстрый, популярный и безопасный благодаря дополнительной изолированной среде выполнения шаблонов (песочнице) и автоматической системе экранирования HTML для предотвращения XSS [2].

Многие операторы AirFlow поддерживают template_fields – кортежный объект (tuple), который определяет, какие поля будут преобразованы.

class PythonOperator(BaseOperator):
template_fields = ('templates_dict', 'op_args', 'op_kwargs')

При создании пользовательского оператора достаточно переопределить атрибут template_fields [1]:

class CustomBashOperator(BaseOperator):
template_fields = ('file_name', 'command', 'dest_host')

 

В этом примере поля «file_name», «command», «dest_host» доступны для создания шаблонов Jinja.

Контролируйте доступ на уровне DAG

Мы уже упоминали, что в Apache AirFlow 2.0 добавлены новые кластерные политики, которые предоставляют интерфейс для работы с каждой задачей или DAG во время его загрузки, а также непосредственно перед выполнением задачи. Поэтому следует использовать эти возможности по максимуму, определив в пользовательских настройках airflow_local_settings политики dag_policy, task_policy и task_instance_mutation_hook. Разумеется, предварительно следует создать настраиваемую роль – пользователя Linux, который будет выполнять разрешенные действия с графами и задачами.

Еще 5 полезных практик в работе с AirFlow

  • используйте статическую дату начала start_date, чтобы корректно определить расписание запуска DAG;
  • при глобальных структурных изменениях переименуйте файл с цепочкой задач, создав его новую версию, чтобы сохранить всю историю. При этом в частности и при создании любых файлов вообще придерживайтесь согласованной и понятной структуры их именования и хранения.
  • придерживайтесь последовательного метода организации зависимостей между задачами;
  • разработайте и внедрите стратегию уведомления о сбоях;
  • установите повторные попытки выполнения отказавших задач на уровне DAG.

О том, какие еще практические рекомендации помогут дата-инженеру разрабатывать и поддерживать Big Data Pipeline’ы, мы рассказываем в новой статье.

Код курса
ADH-AIR
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.

Как использовать все эти рекомендации на практике, чтобы эффективно применять Apache AirFlow для простой разработки сложных конвейеров аналитики больших данных с Hadoop и Spark, вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/swlh/airflow-dag-best-practices-716ac95b82d1
  2. https://jinja.palletsprojects.com/en/2.11.x/
  3. https://airflow.apache.org/docs/apache-airflow/stable/tutorial.html

 

Поиск по сайту