Apache Airflow – мощный инструмент современной дата-инженерии. Этот оркестровщик batch-процессов позволяет запускать цепочки задач в виде направленного ациклического графа (DAG) по расписанию. Однако, планировщик Airflow имеет некоторые специфические особенности, которые необходимо знать каждому разработчику Data Flow. Об этом мы сегодня поговорим.
Планирование запуска DAG в Apache AirFlow: краткий ликбез
Запуски DAG планируются в основном на основе двух параметров: schedule_interval и start_date. Всякий раз, когда создается объект DAG, schedule_interval определяется явно или неявно. Значение по умолчанию schedule_interval — один день. Это означает, что новые запуски DAG будут планироваться каждый день, начиная с start_date. Также можно использовать переменную end_date, чтобы указать, когда планировщик прекращает планирование новых запусков DAG. Рекомендуется установить статическое значение start_date, а не динамическое, начиная с версии 1.8.0.
Однако, DAG будет выполняться тогда и только тогда, когда его start_date уже в прошлом. Поэтому следует ставить фиксированное время в прошлое и отменить перехват, установив параметр Catchupв значение False. Если установить start_date = datetime.now(), DAG перестанет работать. Airflow интерпретирует datetime.now() как текущую отметку времени и предполагает, что он не готов к запуску. DAG запускается после наступления schedule_interval. Например, если schedule_interval равен @hourly, нужно подождать 1 час, прежде чем запустится DAG. А если schedule_interval равен @daily, запуска DAG придется ждать целый день. Например, следующий код запустит DAG начнется 01.01.2022 в 00:00:00 и он запускаться каждый час:
from datetime import datetime from airflow import DAG default_args = { "start_date": datetime(2022, 1, 1), } with DAG("example_dag", default_args=default_args, schedule_interval="@hourly") as dag:
В вышеприведенном участке кода DAG запустится в первый раз в 01:00:00 2022-01-01 01:00:00, поскольку Airflow ожидает окончания интервала расписания, чтобы убедиться в наличии всех данных. Кроме того, стоит помнить, что Airflow хранит информацию о дате и времени в формате UTC. Поэтому фактическое время запуска DAG может не совпадать с локальным часовым поясом.
Чтобы избежать сбоя, когда start_date ссылается на более раннее время, чем фактическое время выполнения DAG Run, в Airflow есть концепция перехвата (Catchup). Если перехват включен на уровне DAG, планировщик запускает его заново для каждого интервала, который не был запущен во время триггера.
Такой перехват позволяет запускать рабочие процессы на основе временных интервалов и сохранять атомарные определения задач, что особенно полезно при работе с атомарными наборами данных, которые можно легко разделить на определенные временные рамки.
Когда рабочий процесс запускается путем запуска пользовательского интерфейса вручную или с помощью планировщика, концепция перехвата может запустить несколько автоматических запусков DAG для разных временных интервалов (Catchup=enabled). Тогда для разных запусков DAG их start_date сливается в одно значение. Различать их поможет параметр logical_date или execution_date в веб-интерфейсе Airflow. Параметр execution_date находится в контексте интервалов расписания и представляет время начала соответствующего интервала расписания. Поэтому даже у одновременно выполняемых запусков DAG переменная execution_date будет отличаться. Чтобы показать, как Airflow справляется с планированием запусков DAG, далее рассмотрим простой пример.
Практический пример
В следующем фрагменте кода для параметра thestart_date установлено значение первого дня 2019 года, для параметра schedule_interval — годовой интервал, а для параметра Catchup — значение True. Каждый раз при запуске рабочего процесса планировщиком, он создает 3 разных запуска DAG. Если это делать вручную через пользовательский интерфейс, будет создано 4 разных запуска DAG.
from datetime import datetime from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python_operator import PythonOperatordef print_hello(): print("Hello world!")default_args = { 'owner': 'irem.ertuerk', 'start_date': datetime(2019, 1, 1), }dag = DAG('dag-example', description='Simple Dag Example', schedule_interval="@yearly", default_args = default_args, catchup=True)dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)dummy_operator >> hello_operator
Сведения о каждом запуске DAG включают информацию о logical_date (execution_date), start_date и end_date. И дата начала, и дата окончания представляют собой фактическое время выполнения. А execution_date представляет время, когда ожидается выполнение DAG Run.
Поскольку start_date указывает на 2019 год, то при инициации первого запуска DAG в 2022 году, планировщик понимает, что запуски DAG отсутствуют на 2019, 2020 и 2021 годы, и полностью заполняет их и устанавливает соответствующие логические даты для каждого запуска. Однако, на 2022 год запуск DAG отсутствует, т.к. интервал начала, запланированный на 1 января, уже пройден. Поскольку Airflow предназначен для ETL-конвейеров, имеет смысл запустить рабочий процесс на 2022 год, когда все данные будут собраны, что означает в конце интервала графика, т.е. в конце года. Поэтому запуск DAG на 2022 год должен быть запланирован на 1 января 2023–01 годы, 00:00.
При этом начало и конец интервала данных определяются планировщиком и не обязательно коррелируют с реальным временем начала и окончания, указанным в UTC.
В заключение отметим еще одну неочевидную особенность. Иногда необходимо запустить новую задачу, добавленную в существующий DAG, где параметр аргументов по умолчанию (default_args) depend_on_past установлен в значение True. Эта новая задача не будет запускаться автоматически и, т.к. она зависит от прошлого, ее невозможно выполнить. Обойти это ограничение можно запустив в GUI команду игнорирования зависимостей (Ignore All Deps). Также можно отметить предыдущий запуск задачи успешным, даже если фактически она не запускалась.
Код курса
ADH-AIR
Ближайшая дата курса
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Все подробности администрирования и эксплуатации Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники