Почему триггеры отсроченных операторов Apache AirFlow не могут быть блокирующими и как сделать их асинхронными с помощью Python-библиотеки asyncio.
Создание своего отсроченного оператора в Apache AirFlow
О том, что такое отсроченные операторы, как они связаны с триггерами и асинхронными Python-вызовами в Apache AirFlow, мы недавно говорили здесь. Помимо использования существующих отсроченных операторов, дата-инженер может написать свой собственный, соблюдая следующие рекомендации:
- оператор должен отсрочить себя с помощью триггера, включенного в ядро Airflow, или пользовательского. При разработке пользовательского триггера необходимо перезапустить его, чтобы применить любые внесенные изменения, поскольку триггер кэширует триггерные классы. Кроме того, вся информация, которая передается между триггером и рабочим процессом, должна быть сериализуемой в формате JSON.
- поскольку отсроченный оператор останавливается и удаляется из своего рабочего процесса без автоматического сохранения состояния, для stateful-сценариев можно сохранить состояние, указав Airflow возобновить работу оператора определенным методом или передав определенные именованные аргументы в kwargs;
- оператор может откладываться неоднократно, до или после выполнения задачи или соблюдения конкретных условий;
- отсроченным может стать любой оператор, не только сенсор;
- если надо добавить оператор или датчик, который поддерживает как отсроченный, так и обычный режим, лучше добавить соответствующий метод, который будет принимать решение о режиме запуска. Можно настроить значение по умолчанию для всех операторов и датчиков, которые поддерживают переключение между отсроченным и обычным режимами:
deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False)__init__deferrabledefault_deferrableoperator
Например, следующий код определяет пользовательский сенсор под названием WaitOneHourSensor. Он основан на BaseSensorOperator, который является базовым классом для всех сенсоров в Airflow. Метод __init__ является конструктором класса, он принимает параметр deferrable, который определяет отсроченный режим выполнения. Значение по умолчанию для deferrable берется из конфигурации Airflow. Конструктор также принимает любые дополнительные именованные аргументы с помощью **kwargs и передает их в базовый класс с помощью super().init(**kwargs).
import time from datetime import timedelta from typing import Any from airflow.configuration import conf from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import TimeDeltaTrigger from airflow.utils.context import Context class WaitOneHourSensor(BaseSensorOperator): def __init__( self, deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False), **kwargs ) -> None: super().__init__(**kwargs) self.deferrable = deferrable def execute(self, context: Context) -> None: if self.deferrable: self.defer( trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete", ) else: time.sleep(3600) def execute_complete( self, context: Context, event: dict[str, Any] | None = None, ) -> None: return
Метод execute() выполняется при запуске сенсора. В отсроченном режиме, т.е. если deferrable установлено в значение True, сенсор откладывает выполнение на один час с помощью TimeDeltaTrigger. Иначе он просто приостанавливает выполнение на 1 час с помощью time.sleep(3600). Метод execute_complete вызывается после завершения отложенного выполнения, просто возвращая None. Таким образом, подобный сенсор можно использовать в DAG для ожидания в течение одного часа перед выполнением следующих задач.
Data Pipeline на Apache Airflow
Код курса
AIRF
Ближайшая дата курса
19 марта, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Разработка собственного триггера
Чтобы вызвать отсрочку в любом месте пользовательского оператора, надо указать это в параметрах метода defer:
self.defer(trigger, method_name, kwargs, timeout)
где trigger – это экземпляр триггера, который надо отсрочить. Он будет сериализован в базу данных метаданных. Также указывается method_name – имя метода оператора, который Airflow должен вызывать при возобновлении работы. Опционально можно указать именнованные ключевые аргументы kwargs. Тайм-аут timeout тоже необязательный параметр, указывающий время ожидания, после которого отсрочка оператора не выполняется, и экземпляр задачи не запускается. По умолчанию тайм-аут отсутствует. При вызове отсрочки, оператор прекратит выполнение и удаляется из текущего рабочего процесса без сохранения состояния, включая локальные переменные или атрибуты, установленные на self. Когда отсроченный оператор возобновляется, он возобновляется как новый экземпляр. Чтобы передать состояние из старого экземпляра оператора в новый, используются method_name и kwargs. Когда оператор возобновляет работу, Airflow добавляет объект context и event в kwargs, переданные методу по имени method_name. Объект event содержит полезную нагрузку из события триггера, которое возобновило работу оператора. Если оператор возвращается из своего первого метода execute(), когда он новый, либо из последующего метода, указанного с помощью method_name, он считается завершенным и завершит выполнение. Можно установить значение method_name равным execute, для одной точки входа в оператор. Также необходимо принимать в качестве необязательного аргумента ключевое слово event.
Следующий пример кода показывает, как датчик может вызвать отсрочку запуска оператора:
from datetime import timedelta from typing import Any from airflow.sensors.base import BaseSensorOperator from airflow.triggers.temporal import TimeDeltaTrigger from airflow.utils.context import Context class WaitOneHourSensor(BaseSensorOperator): def execute(self, context: Context) -> None: self.defer(trigger=TimeDeltaTrigger(timedelta(hours=1)), method_name="execute_complete") def execute_complete(self, context: Context, event: dict[str, Any] | None = None) -> None: Return
По сути, этот сенсор представляет собой обертку вокруг триггера, подчиняясь ему и указывая другой метод возврата при срабатывании триггера. При возврате значения он отмечает датчик как успешный. Вызов self.defer вызывает исключение TaskDeferred, поэтому он может работать в любом месте кода оператора, даже если в него вложено много вызовов глубоко внутри execute(). Можно даже вызвать исключение TaskDeferred вручную, используя те же аргументы, что и self.defer. Тайм-аут выполнения execution_timeout на операторы определяется из общего времени выполнения, а не отдельных выполнений между отсрочками. Поэтому когда execution_timeout установлен, оператор может выйти из строя, пока он отсрочен или пока он выполняется после отсрочки, даже если он был возобновлен всего на несколько секунд.
Чтобы написать пользовательский триггер, надо наследовать от класса BaseTrigger три метода:
- __init__ — метод получения аргументов от операторов, создающих его экземпляр;
- Run — асинхронный метод, который выполняет свою логику и выдает один или несколько экземпляров TriggerEventв качестве асинхронного генератора;
- Serialize — возвращает информацию, необходимую для повторного построения этого триггера, в виде кортежа classpath и ключевых аргументов для передачи в __init__.
При написании собственного триггера следует учитывать некоторые ограничения:
- метод должен быть асинхронным, т.е. использовать возможности Python-библиотеки asyncio и корректно запускаться при выполнении блокирующей операции .await;
- запуск (run) должен генерировать события триггера (yield), а не возвращать их. Если он возвращается до того, как выдаст хотя бы одно событие, Airflow выдаст исключение и завершит все зависимые экземпляры задач.
- Поскольку экземпляр триггера может запускаться более одного раза, надо предусмотреть обработку идемпотентных запросов, например, чтобы не дублировать данные при вставке записей в базу. Поэтому если триггер предназначен для генерации более одного события, то оно должносодержать полезную нагрузку, которую можно использовать для дедупликации, если триггер запущен в нескольких местах. При однократном запуске события без возврата ответа оператору можно просто установить полезную нагрузку на None. Кроме того, триггер может быть внезапно удален из одной службы триггера и запущен на новой, к примеру, при изменении подсетей. В идеальном случае следует реализовать очистку – метод cleanup, который всегда вызывается после run, независимо от успешности его завершения.
- Чтобы любые изменения триггера отразились, триггернеобходимо перезапускать каждый раз при изменении триггера.
Пока триггеры используются только до своего первого события, поскольку они предназначены только для возобновления отложенных задач, а задачи возобновляются после срабатывания первого события. В будущем Airflow планирует разрешить запуск DAG из триггеров с помощью поддержки нескольких событий.
Следующий пример показывает реализацию триггера, который ждет наступления определенного момента времени и затем генерирует событие.
import asyncio from airflow.triggers.base import BaseTrigger, TriggerEvent from airflow.utils import timezone class DateTimeTrigger(BaseTrigger): def __init__(self, moment): super().__init__() self.moment = moment def serialize(self): return ("airflow.triggers.temporal.DateTimeTrigger", {"moment": self.moment}) async def run(self): while self.moment > timezone.utcnow(): await asyncio.sleep(1) yield TriggerEvent(self.moment)
Методы __init__и serialize записываются как пара. Экземпляр триггера создается один раз, когда он отправляется оператором как часть его запроса на отсрочку, затем сериализуется и повторно инстанцируется в любом процессе триггера, который запускает триггер. Метод run должен быть асинхронным, чтобы не блокировать процесс, поэтому он использует async defasyncio.sleeptime.sleep. Когда он генерирует событие, оно упаковывается в self.moment, чтобы его можно было дедуплицировать, если этот триггер запустится на нескольких хостах.
Хотя триггеры могут работать в режиме высокой доступности и автоматически распределяются между хостами, на которых они запущены, рекомендуется делать их stateless, т.е. без сохранения постоянного состояния. Триггеры должны получать все необходимое из своего метода инициализации __init__, чтобы их можно было сериализовать и свободно перемещать.
При разработке собственного метода run() надо соблюдать осторожность. В Python асинхронный код может заблокировать весь процесс, если он некорректно выполняет блокирующую операцию await. При обнаружении подобного кода блокировки процесса Airflow попытается предупредить пользователя об этом в журналах триггера. Дата-инженер может включить дополнительные проверки Python, установив переменную PYTHONASYNCIODEBUG=1, чтобы убедиться в отсутствии блокировок в асинхронном коде. Особенно внимательно нужно делать это при обращении к базовой файловой системе, которая может быть блокирующей, например, при сетевом разделении.
Airflow с использованием Yandex Managed Service for Apache Airflow™
Код курса
YARF
Ближайшая дата курса
2 апреля, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Освойте администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники