Асинхронные Python-вызовы и отсроченные операторы в Apache AirFlow

триггеры AirFlow, асинхронные операторы AirFlow, отложенные операторы AirFlow, асинхронное программирование AirFlow, AirFlow для дата-инженера и разработчика, Школа Больших Данных Учебный Центр Коммерсант

Что общего между триггерами, отсроченными операторами и асинхронными Python-вызовами в Apache AirFlow, чем они отличаются от стандартных операторов и сенсоров, для чего их использовать и как это сделать.

Асинхронные вызовы и отсроченные операторы в Apache AirFlow

В синхронном коде задачи выполняются последовательно, одна за другой. Причем каждая задача должна завершиться до того, как начнется следующая. Поэтому если одна задача занимает много времени, например, ожидает ответа от внешней системы или читает много данных из большого файла, весь процесс блокируется до завершения этой медленной задачи. Асинхронное программирование позволяет выполнять задачи параллельно, не блокируя основной поток выполнения. В Python это реализуется с помощью библиотеки asyncio и ключевых слов async и await. Асинхронный код особенно полезен для ввода-вывода и сетевых операций, где можно ожидать завершения задачи, выполняя другие задачи параллельно.

Выбор между синхронным и асинхронным кодом зависит от особенностей приложения. При работе с задачами, которые часто блокируются, например, сетевые запросы или операции дискового ввода-вывода, асинхронный подход может сильно улучшить производительность. Однако, разработка такого кода немного сложнее.

На практике асинхронное программирование сегодня используется довольно широко. Например, недавно мы рассматривали, как устроены асинхронные операции ввода/вывода в Apache Flink. Впрочем, асинхронный подход актуален не только для потоковой обработки данных. В пакетных ETL-конвейерах, реализуемых с помощью Apache AirFlow тоже часто возникает потребность в асинхронном выполнении операторов. Например, если DAG содержит задачи обращения к внешним системам или выполняет HTTP-запросы к сторонним сервисам подобно проверки доступности сайта с помощью веб-хуков, как я показывала здесь. Когда таких задач немного, задумываться об асинхронном выполнении нет смысла. Однако, если пакетный ETL-конвейер начинает тормозить из-за интегральной задержки медленных задач, дата-инженеру приходится принимать меры ускорения DAG. Одной из них может быть внедрение асинхронного кода в операторы, реализующие задачи.

Например, можно использовать библиотеку asyncio в PythonOperator для неблокирующих HTTP-запросов к внешним ресурсам. Именно это и делают триггеры и отсроченные операторы, которые появились в Airflow 2.2. Напомним, термином триггер в AirFlow называется служба, похожая на планировщик или рабочий процесс (worker), которая запускает цикл событий asyncio в среде этого пакетного оркестратора. Запуск триггера необходим для использования отсроченных (deferred) операторов. Асинхронные разделы Python-кода эффективно сосуществуют в одном триггерном процессе. Отсроченный оператор отличается от обычного тем, что он имеет такое состояние задачи, указывающее, что задача приостановила свое выполнение, освободила рабочий слот и отправила триггер для обработки процессом-триггером.

Стандартные операторы и датчики (сенсоры) AirFlow занимают полный рабочий слот на все время работы, даже если они простаивают. Например, если есть только 100 рабочих слотов, доступных для запуска задач, и имеется 100 DAG, ожидающих датчика, который в данный момент работает, но простаивает, нельзя запустить ничего другого, даже если весь кластер Airflow по сути простаивает. Поскольку рабочие слоты заняты, задачи ставятся в очередь, а время начала задерживается. Режим reschedule для датчиков решает часть этой проблемы, позволяя им работать только с фиксированными интервалами, но он негибкий и оперирует только временем для возобновления работы. Напомним, в Airflow датчики ждут выполнения определенных условий, прежде чем приступить к последующим задачам. Датчики имеют два варианта управления периодами простоя: mode=’reschedule’ и deferrable=True. Поскольку mode=’reschedule’ – это параметр, специфичный для BaseSensorOperator в Airflow, он позволяет датчику перепланировать себя, если условие не выполнено. А ‘deferrable=True’ – это соглашение, используемое некоторыми операторами для указания того, что задачу можно повторить или отсрочить позже, но это не встроенный параметр или режим в Airflow. Фактическое поведение повтора задачи зависит от конкретной реализации оператора.

С отсроченными операторами рабочие слоты освобождаются, когда задача опрашивает статус задания. Когда задача откладывается, процесс опроса выгружается как триггер для триггера, и рабочий слот становится доступным. Триггер может одновременно запускать много асинхронных задач опроса, предотвращает блокировку рабочих ресурсов задачами опроса. Когда получен конечный статус задания, задача возобновляется, занимая рабочий слот, пока она завершается.

Таким образом, при отсутствии возможностей выполнять операции, отсроченный оператор может приостановить себя и освободить рабочий процесс для других процессов, отсрочив собственное выполнение. При этом выполнение переходит к триггерному процессу, где будет запущен триггер, указанный оператором. Триггерный процесс может выполнить опрос или ожидание, требуемые отсроченным оператором. Затем, когда триггерный процесс заканчивает опрос или ожидание, он отправляет сигнал оператору возобновить выполнение. Во время отсроченной фазы выполнения, поскольку работа была выгружена на триггер, задача больше не занимает слот рабочего процесса, и не потребляет ресурсов. По умолчанию задачи в отсроченном состоянии не занимают слоты пула. Это поведение можно изменить, отредактировав соответствующий пул.

Последовательность работы отсроченных операторов в AirFlow можно представить так:

  • экземпляр задачи (работающий оператор) достигает точки, где он должен ждать других операций или условий, и откладывает себя с триггером, привязанным к событию, чтобы возобновить его. Это освобождает рабочий процесс для запуска другой задачи;
  • новый экземпляр триггера регистрируется Airflow и подхватывается триггерным процессом;
  • как только триггер сработает, его исходная задача будет перепланирована планировщиком;
  • планировщик ставит задачу в очередь для возобновления выполнения на рабочем узле.
триггеры в Apache AirFlow
Работа триггеров в Apache AirFlow

Рекомендуется использовать отсроченные операторы для задач, которые занимают рабочий слот при опросе состояния во внешней системе. Это повысит эффективность пакетного конвейера обработки данных и снизит эксплуатационные расходы.

Как устроены триггеры

Как уже было отмечено выше, отсроченные операторы основаны на триггерах. Триггеры разработаны для обеспечения высокой доступности Airflow, что реализуется запуском нескольких копий triggerer-процесса на разных хостах. По умолчанию каждый triggerer имеет емкость 1000 триггеров, которые он может попытаться запустить одновременно. Дата-инженер может изменить количество триггеров, которые будут запускаться одновременно, с помощью аргумента —capacity. Если количество триггеров, пытающихся запуститься, больше емкости во всех triggerer-процессах, некоторые триггеры будут отсрочены от запуска, пока другие не завершатся. Airflow пытается запускать триггеры только в одном месте одновременно и поддерживает тактовый сигнал для всех triggerer-процессов, которые в данный момент запущены. Если такой процесс остановлен или потерял соединение с база данных метаданных, Airflow автоматически перепланирует триггеры, которые были на этом хосте, для запуска в другом месте.

При этом фреймворк ожидает истечения тайм-аута, равного двухкратному значению параметра triggerer.job_heartbeat_sec, пока узел снова появится, прежде чем перепланировать триггеры. Это означает, что триггеры могут запускаться в нескольких местах одновременно. Это очень маловероятно, но заложено в контракт триггера и является ожидаемым поведением. Airflow дедуплицирует события, которые запускаются, когда триггер запускается в нескольких местах одновременно, обеспечивая прозрачность процесса для операторов. Разумеется, каждый triggerer-запуск приводит к созданию дополнительного постоянного соединения с базой данных метаданных, которая может иметь ограниченное количество таких подключений, например, как у PostgreSQL. Начиная с Airflow 2.9.0, аргументы триггеров kwargs сериализуются и шифруются перед сохранением в базе данных. Поэтому любая конфиденциальная информация, которая передается триггеру, сохраняется в зашифрованном виде и расшифровывается при чтении из базы данных метаданных.

Чтобы использовать существующие в Airflow отсроченные операторы, например, TimeSensorAsync, следует убедиться, что среда запускает обычный планировщик и хотя бы один triggerer-процесс. Далее можно работать с отсроченными операторами и сенсорами в DAG. Airflow автоматически обрабатывает и реализует процессы отсрочки выполнения для традиционных операторов на основе классов. Это означает, что нельзя использовать возможность отсрочки выполнения внутри пользовательских функций PythonOperator или TaskFlow Python. Впрочем, можно написать свои собственные отсроченные операторы. Как это сделать, мы рассмотрим в следующий раз.

Освойте администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/deferring.html
  2. https://www.astronomer.io/docs/learn/deferrable-operators
  3. https://betterprogramming.pub/making-async-api-calls-with-airflow-dynamic-task-mapping-d0cbd3066ebb?gi=6fb0e010fbdc
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту