Всему свое время: запуск DAG Apache Airflow по расписанию

DAG Apache AirFlow, Apache AirFlow примеры курсы обучение, обучение дата-инженеров, инженер данных курсы примеры обучение, запуск DAG по расписанию airflow example, инженерия данных с Apache AirFlow пример, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Apache Airflow – мощный инструмент современной дата-инженерии. Этот оркестровщик batch-процессов позволяет запускать цепочки задач в виде направленного ациклического графа (DAG) по расписанию. Однако, планировщик Airflow имеет некоторые специфические особенности, которые необходимо знать каждому разработчику Data Flow. Об этом мы сегодня поговорим.

Планирование запуска DAG в Apache AirFlow: краткий ликбез

Запуски DAG планируются в основном на основе двух параметров: schedule_interval и start_date. Всякий раз, когда создается объект DAG, schedule_interval определяется явно или неявно. Значение по умолчанию schedule_interval — один день. Это означает, что новые запуски DAG будут планироваться каждый день, начиная с start_date. Также можно использовать переменную end_date, чтобы указать, когда планировщик прекращает планирование новых запусков DAG. Рекомендуется установить статическое значение start_date, а не динамическое, начиная с версии 1.8.0.

Однако, DAG будет выполняться тогда и только тогда, когда его start_date уже в прошлом. Поэтому следует ставить фиксированное время в прошлое и отменить перехват, установив параметр Catchupв значение False. Если установить start_date = datetime.now(), DAG перестанет работать. Airflow интерпретирует datetime.now() как текущую отметку времени и предполагает, что он не готов к запуску. DAG запускается после наступления schedule_interval. Например, если schedule_interval равен @hourly, нужно подождать 1 час, прежде чем запустится DAG. А если schedule_interval равен @daily, запуска DAG придется ждать целый день. Например, следующий код запустит DAG начнется 01.01.2022 в 00:00:00 и он запускаться каждый час:

from datetime import datetime
    from airflow import DAG
     
    default_args = {
      "start_date": datetime(2022, 1, 1),
    }
    
    with DAG("example_dag",
             default_args=default_args,
             schedule_interval="@hourly") as dag:

В вышеприведенном участке кода DAG запустится в первый раз в 01:00:00 2022-01-01 01:00:00, поскольку Airflow ожидает окончания интервала расписания, чтобы убедиться в наличии всех данных. Кроме того, стоит помнить, что Airflow хранит информацию о дате и времени в формате UTC. Поэтому фактическое время запуска DAG может не совпадать с локальным часовым поясом.

Чтобы избежать сбоя, когда start_date ссылается на более раннее время, чем фактическое время выполнения DAG Run, в Airflow есть концепция перехвата (Catchup). Если перехват включен на уровне DAG, планировщик запускает его заново для каждого интервала, который не был запущен во время триггера.

Airflow DAG run Catchup, запуск DAG в Apache AirFlow
Время запуска по расписанию в Airflow в зависимости от значения параметра перехвата Catchup

Такой перехват позволяет запускать рабочие процессы на основе временных интервалов и сохранять атомарные определения задач, что особенно полезно при работе с атомарными наборами данных, которые можно легко разделить на определенные временные рамки.

Когда рабочий процесс запускается путем запуска пользовательского интерфейса вручную или с помощью планировщика, концепция перехвата может запустить несколько автоматических запусков DAG для разных временных интервалов (Catchup=enabled). Тогда для разных запусков DAG их start_date сливается в одно значение. Различать их поможет параметр logical_date или execution_date в веб-интерфейсе Airflow. Параметр execution_date находится в контексте интервалов расписания и представляет время начала соответствующего интервала расписания. Поэтому даже у одновременно выполняемых запусков DAG переменная execution_date будет отличаться. Чтобы показать, как Airflow справляется с планированием запусков DAG, далее рассмотрим простой пример.

Практический пример

В следующем фрагменте кода для параметра thestart_date установлено значение первого дня 2019 года, для параметра schedule_interval — годовой интервал, а для параметра Catchup — значение True. Каждый раз при запуске рабочего процесса планировщиком, он создает 3 разных запуска DAG. Если это делать вручную через пользовательский интерфейс, будет создано 4 разных запуска DAG.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperatordef print_hello():
    print("Hello world!")default_args = {
   'owner': 'irem.ertuerk',
   'start_date': datetime(2019, 1, 1),
}dag = DAG('dag-example',
          description='Simple Dag Example',
          schedule_interval="@yearly",
          default_args = default_args,
          catchup=True)dummy_operator = DummyOperator(task_id='dummy_task',
                    retries=3, dag=dag)hello_operator = PythonOperator(task_id='hello_task',
                    python_callable=print_hello, dag=dag)dummy_operator >> hello_operator

 

Сведения о каждом запуске DAG включают информацию о logical_date (execution_date), start_date и end_date. И дата начала, и дата окончания представляют собой фактическое время выполнения. А execution_date представляет время, когда ожидается выполнение DAG Run.

DAG Run в веб-GUI Apache AirFlow
Сведения об объектах DAG Run в веб-GUI Apache AirFlow

Поскольку start_date указывает на 2019 год, то при инициации первого запуска DAG в 2022 году, планировщик понимает, что запуски DAG отсутствуют на 2019, 2020 и 2021 годы, и полностью заполняет их и устанавливает соответствующие логические даты для каждого запуска. Однако, на 2022 год запуск DAG отсутствует, т.к. интервал начала, запланированный на 1 января, уже пройден. Поскольку Airflow предназначен для ETL-конвейеров, имеет смысл запустить рабочий процесс на 2022 год, когда все данные будут собраны, что означает в конце интервала графика, т.е. в конце года. Поэтому запуск DAG на 2022 год должен быть запланирован на 1 января 2023–01 годы, 00:00.

DAG Run AirFlow
Детальные сведения запуске отдельного DAG в веб-GUI Apache AirFlow

При этом начало и конец интервала данных определяются планировщиком и не обязательно коррелируют с реальным временем начала и окончания, указанным в UTC.

В заключение отметим еще одну неочевидную особенность. Иногда необходимо запустить новую задачу, добавленную в существующий DAG, где параметр аргументов по умолчанию (default_args) depend_on_past установлен в значение True. Эта новая задача не будет запускаться автоматически и, т.к. она зависит от прошлого, ее невозможно выполнить. Обойти это ограничение можно запустив в GUI команду игнорирования зависимостей (Ignore All Deps). Также можно отметить предыдущий запуск задачи успешным, даже если фактически она не запускалась.

Действия над задачами в веб-GUI Apache AirFlow
Способ обойти ограничения запуска зависимых задач в веб-GUI Apache AirFlow

Все подробности администрирования и эксплуатации Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://medium.com/@irem.ertuerk/advancing-apache-airflow-5b02a34fe92b
  2. https://medium.com/@thehippieandtheboss/3-things-about-airflow-you-might-not-know-b7d48fde28cd
Поиск по сайту