ETL по расписанию: 4 способа планирования запусков DAG в Apache AirFlow

Apache AirFlow примеры курсы обучение, Apache AirFlow развертывание администрирование оптимизация, Apache AirFlow для дата-инженеров и администраторов, Школа Больших данных Учебный центр Коммерсант

Чем планирование запуска DAG в Apache AirFlow с объектом timedelta отличается от использования cron-выражений, в чем разница CronTriggerTimetable и CronDataIntervalTimetable, а также как создать собственный класс расписания.

Объект timedelta vs cron-выражение: задание расписания запуска DAG в Apache AirFlow

Apache AirFlow идеально подходит для классических пакетных ETL-сценариев, например, когда надо извлечь данные из транзакционных систем, преобразовать и загрузить их в корпоративное хранилище (DWH, Data WareHouse). Обычно такие процессы запускаются по расписанию и часто выполняются глубокой ночью, чтобы не мешать работе бизнес-пользователей. Airflow дает возможность запускать пакетные конвейеры обработки данных по расписанию, позволяя определять интервал данных и логическую дату каждого запуска DAG. Для этого фреймворк поддерживает 3 способа задания расписания:

  • использование timedelta-объекта Python-модуля datetime;
  • использование Linux-утилиты cron;
  • пользовательские классы.

DAG, запланированные с помощью timedelta-объекта Python-модуля datetime или Linux-утилиты cron, внутренне преобразуются для постоянного использования расписания. Для запуска программ или скриптов в заданное время или по расписанию утилита cron использует таблицу crontab. Это текстовый файл со списком задач и расписаний их выполнения. Каждая задача определяет команду, которую надо выполнить, и временные интервалы выполнения. В каждой строке crontab-таблицы первые пять полей определяют минуты, часы, дни месяца, месяцы и дни недели. А в последнем необязательном поле указывается пользователь, от имени которого будет выполняться задача. Также утилита cron предоставляет механизм логирования, который записывает результаты выполнения задач в специальные лог-файлы, чтобы отслеживать и анализировать результаты, а также оперативно разбираться с проблемами. Airflow анализирует cron-выражения с помощью библиотеки croniter, которая поддерживает расширенный синтаксис для строк cron-таблицы.

Например, следующее объявление DAG включает его запуск по расписанию в формате cron. Этот DAG будет запускаться каждый день в полночь (00:00):

dag = DAG("regular_interval_cron_example", schedule="0 0 * * *", ...)

А здесь используется предустановленный шаблон расписания, который также означает ежедневный запуск:

dag = DAG("regular_interval_cron_preset_example", schedule="@daily", ...)

В этом примере расписание запуска DAG задается с использованием объекта timedelta из Python-модуля datetime. DAG будет запускаться каждый день с интервалом в 1 день:

dag = DAG("regular_interval_timedelta_example", schedule=datetime.timedelta(days=1), ...)

Независимо от способа задания расписания, DAG создается с соответствующей записью запуска в бэкэнде базы данных Airflow.

Расписание, которое планирует интервалы данных с дельтой времени, т.е. с использованием datetime.timedelta фокусируется на значении интервала данных и не обязательно сопоставляет даты выполнения с произвольными границами, такими как начало дня или часа. Расписание, созданное с использованием cron-выражения, создает интервалы данных в соответствии с интервалом между точками срабатывания cron и запускает запуск DAG в конце каждого интервала данных.

Разница между планирования запуска DAG с помощью timedelta и cron заключается в гибкости, возможностях и контексте использования. К преимуществам timedelta относится простая интеграция с данными и зависимостями между задачами, поддержка встроенных инструментов мониторинга и уведомлений, гибкость настройки временных интервалов и возможность изменения расписания на лету, без перезапуска системы. Однако, этот способ требует больше ресурсов по сравнению с cron-выражениями.

Linux-утилита cron более проста в использовании и настройке, требует минимальных ресурсов, является надежным и устойчивым инструментом, который работает годами и хорошо протестирован в различных средах. Однако, у него нет встроенных инструментов для мониторинга и уведомлений. Также с cron труднее управлять сложными зависимостями между задачами, и он статичен: изменение расписания требует редактирования cron-файлов и перезагрузки конфигурации.

Таким образом, рекомендуется задавать расписание запуска DAG в Apache Airflow с использованием объекта timedelta в следующих условиях:

  • нужно управлять сложными зависимостями между задачами;
  • нужны инструменты мониторинга и уведомлений;
  • нужна гибкость в расписании и возможность быстро вносить изменения;
  • есть ресурсы и навыки для поддержки Airflow.

Утилиту cron можно использовать, если:

  • нужно простое и надежное решение для выполнения задач по расписанию;
  • задачи не требуют сложного управления зависимостями;
  • не нужны сложные инструменты мониторинга;
  • ресурсы ограничены.

2 способа работы с cron-выражениями: CronTriggerTimetable vs CronDataIntervalTimetable

Примечательно, что Airflow поддерживает 2 способа работы с cron-выражениями: CronTriggerTimetable и CronDataIntervalTimetable. Разница между ними в том, что CronTriggerTimetable не учитывает интервал данных, а CronDataIntervalTimetable учитывает. Также отметка времени в run_id для logical_date определяется по-разному в зависимости от того, как CronTriggerTimetable и CronDataIntervalTimetable обрабатывают расписание запуска DAG. Поскольку CronTriggerTimetable не включает интервал данных, значение переменных data_interval_start,  data_interval_end и legacy execution_date одинаковы. В случае CronDataIntervalTimetable эти параметры различаются: data_interval_start — это время, когда запускается DAG, а data_interval_end — это конец интервала запуска.

Например, DAG задан в 15:00 31 января с использованием cron-выражения @daily. CronTriggerTimetable инициирует новый запуск DAG в 12:00 1 февраля. А CronDataIntervalTimetable немедленно инициирует новый запуск DAG, поскольку запуск DAG для ежедневного временного интервала, начинающегося в 12:00 31 января, еще не произошел. Временная метка — полночь 31 января, поскольку это начало интервала данных 0 0 * * *.

Работа с пропусками также отличается. Если остановить DAG в 15:00 31 января и снова включить в 15:00 2 февраля, CronTriggerTimetable пропустит запуски DAG, которые должны были запуститься 1 и 2 февраля, запустив DAG в следующий раз в 12:00 3 февраля. А CronDataIntervalTimetable пропустит запуски DAG только за 1 февраля. Запуск DAG для 2 февраля будет запущен немедленно после того, как DAG, использующий этот способ планирования, снова будет включен.

Таким образом, CronTriggerTimetable запускает задачи точно в указанные моменты времени, определенные cron-выражением. Это полезно, когда нужно запускать задачи в конкретные моменты времени, независимо от того, сколько времени потребуется на их выполнение. Например, следующий DAG будет запускаться каждый день ровно в 12:00:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.timetables.trigger import CronTriggerTimetable
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
}

dag = DAG(
    'cron_trigger_timetable_example',
    default_args=default_args,
    schedule_interval=CronTriggerTimetable('0 12 * * *'),  # каждый день в 12:00
)

task = DummyOperator(
    task_id='dummy_task',
    dag=dag,
)

CronDataIntervalTimetable задает интервалы данных, которые задача должна обработать, на основе cron-выражения. Это полезно, когда важно обработать все данные за определенные интервалы времени, даже если предыдущие задачи еще не завершены. Например, следующий DAG тоже будет запускаться каждый день в 12:00, но каждая задача будет обрабатывать данные за предыдущие 24 часа:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.timetables.interval import CronDataIntervalTimetable
from datetime import datetime

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 1, 1),
}

dag = DAG(
    'cron_data_interval_timetable_example',
    default_args=default_args,
    schedule_interval=CronDataIntervalTimetable('0 12 * * *'),  # каждый день в 12:00
)

task = DummyOperator(
    task_id='dummy_task',
    dag=dag,
)

Таким образом, имеет смысл выбирать CronTriggerTimetable, когда нужно запускать задачи строго в определенное время, независимо от интервала данных или времени выполнения предыдущих задач. Например, ежедневные отчеты или задачи, которые должны выполняться в определенный момент. А CronDataIntervalTimetable пригодится, когда важно обработать все данные за определенные интервалы времени, особенно если задачи могут быть длительными или нужно обрабатывать данные за весь интервал времени. Например, обработка логов или данных, поступающих с регулярными интервалами.

Гибкое планирование

Сочетание условных выражений наборов данных с временными расписаниями повышает гибкость планирования. Для этого в Airflow есть DatasetOrTimeSchedule – специализированное расписание, которое позволяет планировать DAG на основе как расписаний, основанных на времени, так и событий набора данных. Оно также облегчает создание запланированных запусков, согласно традиционным расписаниям, и запусков, запускаемых набором данных, которые работают независимо. Эта функция особенно полезна в сценариях, где DAG необходимо запускать при обновлениях набора данных, а также с периодическими интервалами. Она гарантирует, что рабочий процесс остается отзывчивым к изменениям данных и последовательно запускает регулярные проверки или обновления. Например, следующий код настраивает DAG, который будет запускаться по расписанию cron каждую среду в 01:00 UTC и при обновлении указанных датасетов:

from airflow.timetables.datasets import DatasetOrTimeSchedule
from airflow.timetables.trigger import CronTriggerTimetable


@dag(
    schedule=DatasetOrTimeSchedule(
        timetable=CronTriggerTimetable("0 1 * * 3", timezone="UTC"), datasets=(dag1_dataset & dag2_dataset)
    )
    # Additional arguments here, replace this comment with actual arguments
)
def example_dag():
    # DAG tasks go here
    pass

Объект класса DatasetOrTimeSchedule объединяет два типа расписаний:

  • CronTriggerTimetable(«0 1 * * 3″, timezone=»UTC») – расписание по cron-выражению, которое означает выполнение DAG каждую среду в 01:00 UTC;
  • datasets=(dag1_dataset & dag2_dataset) – расписание, основанное на датасетах. DAG будет запущен при обновлении обоих датасетов: dag1_dataset и dag2_dataset.

Помимо относительно простых сценариев запуска DAG по расписанию, заданному с помощью cron-выражению или timedelta, дата-инженер может создать в AirFlow свой собственный класс расписания и передать его аргументу DAG schedule. Это пригодится, например, при периодичности запуска, не соответствующего григорианскому календарю, а также в случае скользящего временного окна или перекрывающихся интервалах. Например, надо запускать DAG каждый день, но так, чтобы каждый запуск охватывал период предыдущих семи дней. Это можно реализовать с помощью cron-выражения, но пользовательский интервал данных обеспечивает более естественное представление, поскольку он не непрерывный, в отличие от непрерывного интервала в cron-выражении и timedelta.

Чтобы создать пользовательский класс расписания в Apache Airflow, надо наследовать его от класса BaseTimetable. Например, класс SevenDayIntervalTimetable является потомком класса BaseTimetable и реализует методы infer_manual_data_interval и next_dagrun_info, которые используются для определения интервалов данных для запуска DAG:

from airflow.timetables.base import BaseTimetable, DagRunInfo, DataInterval
from datetime import datetime, timedelta
from typing import Optional

class SevenDayIntervalTimetable(BaseTimetable):
    def infer_manual_data_interval(self, run_after: datetime) -> DataInterval:
        start = run_after - timedelta(days=7)
        end = run_after
        return DataInterval(start=start, end=end)

    def next_dagrun_info(self, *, last_automated_data_interval: Optional[DataInterval], restriction) -> Optional[DagRunInfo]:
        if not last_automated_data_interval:
            # если нет предыдущих запусков, запуск с даты неделю назад 
            start = restriction.earliest or (datetime.now() - timedelta(days=7))
        else:
            start = last_automated_data_interval.end

        end = start + timedelta(days=7)
        if restriction.latest and end > restriction.latest:
            return None  # Do not schedule if end is beyond the latest allowed time

        return DagRunInfo.interval(start, end)

Используя этот класс, можно создать DAG, который будет запускаться ежедневно, но каждый запуск будет охватывать период прошедшей недели:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime

# Импорт своего класса расписания
from your_module import SevenDayIntervalTimetable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 1,
}

dag = DAG(
    'seven_day_interval_dag',
    default_args=default_args,
    timetable=SevenDayIntervalTimetable(),
    catchup=False,
)

start = DummyOperator(task_id='start', dag=dag)

Кроме того, Airflow позволяет писать пользовательские расписания в плагинах и использовать их в DAG. При этом в коде рекомендуется обращаться к переменным, соединениям или чему-либо еще, что требует доступа к базе данных метаданных, как можно позже, чтобы сократить нагрузку на базу данных метаданных.

Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/timetable.html
  2. https://www.nic.ru/help/gid-po-nastrojke-cron-zadaniya-na-vydelennom-servere_11621.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту