Как изменить приоритет задачи в очереди исполнителя Apache AirFlow, на что влияет метод определения весов, каким образом можно балансировать нагрузку с помощью пулов и зачем настраивать количество слотов.
Как приоритизировать задачи в очереди Apache AirFlow
Дата-инженеры, которые используют Apache AirFlow для оркестрации пакетных процессов, знают, что задачи скапливаются в очереди исполнителя. Исполнитель — это свойство конфигурации планировщика, который управляет запуском запланированных рабочих процессов. В соответствии с жизненным циклом задачи в Apache AirFlow, о чем мы писали здесь, сначала планировщик определяет время запуска задачи и других зависимостей, включая завершение вышестоящих задач. При этом задача переходит в состояние запланирована (scheduled). После назначения задачи исполнителю, она переходит в состояние очереди (queued). Когда исполнитель берет задачу и рабочий процесс начинает ее выполнение, задача переходит в соответствующее состояние выполнения (running). Иногда задачи зависают в очереди: намеренно, если достигнут установленный параллелизм, или случайно в случае сбоя планировщика.
По умолчанию все задачи, скопившиеся в очереди исполнителя, равны между собой по приоритетам и запускаются в порядке планирования, определенном в DAG. Однако, дата-инженер может вручную приоритизировать задачу с помощью параметра priority_weight, который определяет приоритеты в очереди исполнителя. По умолчанию priority_weight равен 1, но его значение можно увеличить до любого целого числа.
Каждая задача имеет истинное значение параметра priority_weight, рассчитываемое на его основе правила weight_rule и метода определения весов, используемый для расчета эффективного общего веса приоритета задачи. Всего в Apache Airflow есть 3 метода определения весов:
- downstream, когда эффективный вес задачи вычисляется как общая сумма всех нижестоящих потомков. Вышестоящие задачи будут иметь более высокий вес и будут планироваться более агрессивно при использовании положительных значений веса. Это полезно, если есть несколько экземпляров запуска DAG и нужно, чтобы все вышестоящие задачи были выполнены для всех запусков, прежде чем каждый DAG сможет продолжить обработку нижестоящих задач. Этот метод используется в Airflow по умолчанию.
- upstream, когда эффективный вес задачи равен общей сумме ее вышестоящих родительских задач. При этом последующие задачи имеют более высокий вес и будут планироваться более агрессивно при использовании положительных значений веса. Такая настройка полезна, когда есть несколько экземпляров запуска DAG и нужно, чтобы каждый из них был завершен перед запуском вышестоящих задач других запусков DAG.
- При этом методе определения весов эффективный вес задачи соответствует заданному значению priority_weight без дополнительного изменения. Это позволяет точно знать приоритет задачи и дает дополнительный эффект значительного ускорения процесса создания задач для очень больших DAG.
Начиная с версии 2.9.0 d AirFlow можно реализовать свой собственный метод определения весов, расширив класс PriorityWeightStrategy и зарегистрировав его в плагине. Эта функция пока считается экспериментальной, однако воспользоваться ей можно уже сейчас. Например, следующий код объявляет класс стратегии, которая уменьшает вес приоритета с каждой попыткой выполнения задачи DAG.
class DecreasingPriorityStrategy(PriorityWeightStrategy): def get_weight(self, ti: TaskInstance): return max(3 - ti._try_number + 1, 1) class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin): name = "decreasing_priority_weight_strategy_plugin" priority_weight_strategies = [DecreasingPriorityStrategy]
Чтобы использовать эту стратегию, следует импортировать модуль CustomPriorityWeightStrategy:
from custom_weight_rule_module import CustomPriorityWeightStrategy
Далее можно создать экземпляр пользовательского класса и указать его в параметре задачи weight_rule:
task1 = PythonOperator(task_id="task", weight_rule=CustomPriorityWeightStrategy())
Также можно явно указать путь к пользовательскому классу:
task1 = BashOperator( task_id="task", weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy", )
Управление пулами
Параметр priority_weight можно использовать вместе с пулами для ограничения параллелизма выполнения произвольных наборов задач. Пулы в Airflow используются для управления ресурсами и предотвращения перегрузки систем. Параметр pool_slots указывает, сколько слотов будет занимать каждая задача, позволяя гибко управлять нагрузкой и распределением задач.
Список пулов управляется в пользовательском интерфейсе Airflow, присваивая пулам имена и назначая им несколько рабочих слотов. Там же дата-инженер может включить отложенные задачи при подсчете занятых слотов пула. Когда задача Airflow выполняется в рамках пула, она занимает определенное количество слотов. Как только все эти слоты будут заняты, выполняемые задачи будут поставлены в очередь, и их состояние будет отображаться в пользовательском интерфейсе фреймворка. Когда слоты освобождаются, задачи в очереди начинают выполняться на основе весов приоритета задачи и ее потомков.
Если задаче не присвоен новый пул, они назначаются пулу по умолчанию default_pool, который инициализируется со 128 слотами. Пул по умолчанию может быть изменен через пользовательский интерфейс или в CLI, но не может быть удален. Например, следующий код создает 2 легковесные и 1 тяжеловесную задачи в пуле from_PG_2_Elastic:
PythonOperator( task_id="extract_task", pool_slots=1, pool="from_PG_2_Elastic", ) BashOperator( task_id="transform_task", pool_slots=2, pool="from_PG_2_Elastic", ) BashOperator( task_id="load_task", pool_slots=1, pool="from_PG_2_Elastic", )
Таким образом, пулы позволяют балансировать нагрузку, которая неодинакова у разных задач. Например, задачи связанные со сложными вычислениями или обращению к внешним системам, могут быть потреблять больше системных ресурсов. Поэтому для более эффективной эксплуатации Apache Airflow дата-инженеру полезно знать неочевидные тонкости создания задач.
Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники