Пулы и приоритеты задач в Apache AirFlow

администрирование и настройка Apache AirFlow, курсы Apache AirFlow для дата-инженера, Apache AirFlow, инженерия данных примеры курсы обучение, ETL с Apache AirFlow, AirFlow для разработчика, Школа Больших Данных Учебный Центр Коммерсант

Как изменить приоритет задачи в очереди исполнителя Apache AirFlow, на что влияет метод определения весов, каким образом можно балансировать нагрузку с помощью пулов и зачем настраивать количество слотов.

Как приоритизировать задачи в очереди Apache AirFlow

Дата-инженеры, которые используют Apache AirFlow для оркестрации пакетных процессов, знают, что задачи скапливаются в очереди исполнителя. Исполнитель — это свойство конфигурации планировщика, который управляет запуском запланированных рабочих процессов. В соответствии с жизненным циклом задачи в Apache AirFlow, о чем мы писали здесь, сначала планировщик определяет время запуска задачи и других зависимостей, включая завершение вышестоящих задач. При этом задача переходит в состояние запланирована (scheduled). После назначения задачи исполнителю, она переходит в состояние очереди (queued). Когда исполнитель берет задачу и рабочий процесс начинает ее выполнение, задача переходит в соответствующее состояние выполнения (running). Иногда задачи зависают в очереди: намеренно, если достигнут установленный параллелизм, или случайно в случае сбоя планировщика.

По умолчанию все задачи, скопившиеся в очереди исполнителя, равны между собой по приоритетам и запускаются в порядке планирования, определенном в DAG. Однако, дата-инженер может вручную приоритизировать задачу с помощью параметра priority_weight, который определяет приоритеты в очереди исполнителя. По умолчанию priority_weight равен 1, но его значение можно увеличить до любого целого числа.

Каждая задача имеет истинное значение параметра priority_weight, рассчитываемое на его основе правила weight_rule и метода определения весов, используемый для расчета эффективного общего веса приоритета задачи. Всего в Apache Airflow есть 3 метода определения весов:

  • downstream, когда эффективный вес задачи вычисляется как общая сумма всех нижестоящих потомков. Вышестоящие задачи будут иметь более высокий вес и будут планироваться более агрессивно при использовании положительных значений веса. Это полезно, если есть несколько экземпляров запуска DAG и нужно, чтобы все вышестоящие задачи были выполнены для всех запусков, прежде чем каждый DAG сможет продолжить обработку нижестоящих задач. Этот метод используется в Airflow по умолчанию.
  • upstream, когда эффективный вес задачи равен общей сумме ее вышестоящих родительских задач. При этом последующие задачи имеют более высокий вес и будут планироваться более агрессивно при использовании положительных значений веса. Такая настройка полезна, когда есть несколько экземпляров запуска DAG и нужно, чтобы каждый из них был завершен перед запуском вышестоящих задач других запусков DAG.
  • При этом методе определения весов эффективный вес задачи соответствует заданному значению priority_weight без дополнительного изменения. Это позволяет точно знать приоритет задачи и дает дополнительный эффект значительного ускорения процесса создания задач для очень больших DAG.

Начиная с версии 2.9.0 d AirFlow можно реализовать свой собственный метод определения весов, расширив класс PriorityWeightStrategy и зарегистрировав его в плагине. Эта функция пока считается экспериментальной, однако воспользоваться ей можно уже сейчас. Например, следующий код объявляет класс стратегии, которая уменьшает вес приоритета с каждой попыткой выполнения задачи DAG.

class DecreasingPriorityStrategy(PriorityWeightStrategy):
    def get_weight(self, ti: TaskInstance):
        return max(3 - ti._try_number + 1, 1)

class DecreasingPriorityWeightStrategyPlugin(AirflowPlugin):
    name = "decreasing_priority_weight_strategy_plugin"
    priority_weight_strategies = [DecreasingPriorityStrategy]

Чтобы использовать эту стратегию, следует импортировать модуль CustomPriorityWeightStrategy:

from custom_weight_rule_module import CustomPriorityWeightStrategy

Далее можно создать экземпляр пользовательского класса и указать его в параметре задачи weight_rule:

task1 = PythonOperator(task_id="task", weight_rule=CustomPriorityWeightStrategy())

Также можно явно указать путь к пользовательскому классу:

task1 = BashOperator(
    task_id="task",
    weight_rule="custom_weight_rule_module.CustomPriorityWeightStrategy",
)

Управление пулами

Параметр priority_weight можно использовать вместе с пулами для ограничения параллелизма выполнения произвольных наборов задач. Пулы в Airflow используются для управления ресурсами и предотвращения перегрузки систем. Параметр pool_slots указывает, сколько слотов будет занимать каждая задача, позволяя гибко управлять нагрузкой и распределением задач.

Список пулов управляется в пользовательском интерфейсе Airflow, присваивая пулам имена и назначая им несколько рабочих слотов. Там же дата-инженер может включить отложенные задачи при подсчете занятых слотов пула. Когда задача Airflow выполняется в рамках пула, она занимает определенное количество слотов. Как только все эти слоты будут заняты, выполняемые задачи будут поставлены в очередь, и их состояние будет отображаться в пользовательском интерфейсе фреймворка. Когда слоты освобождаются, задачи в очереди начинают выполняться на основе весов приоритета задачи и ее потомков.

Если задаче не присвоен новый пул, они назначаются пулу по умолчанию default_pool, который инициализируется со 128 слотами. Пул по умолчанию может быть изменен через пользовательский интерфейс или в CLI, но не может быть удален. Например, следующий код создает 2 легковесные и 1 тяжеловесную задачи в пуле from_PG_2_Elastic:

PythonOperator(
    task_id="extract_task",
    pool_slots=1,
    pool="from_PG_2_Elastic",
)

BashOperator(
    task_id="transform_task",
    pool_slots=2,
    pool="from_PG_2_Elastic",
)

BashOperator(
    task_id="load_task",
    pool_slots=1,
    pool="from_PG_2_Elastic",
)

Таким образом, пулы позволяют балансировать нагрузку, которая неодинакова у разных задач. Например, задачи связанные со сложными вычислениями или обращению к внешним системам, могут быть потреблять больше системных ресурсов. Поэтому для более эффективной эксплуатации Apache Airflow дата-инженеру полезно знать неочевидные тонкости создания задач.

Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/overview.html
  2. https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/priority-weight.html
  3. https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/pools.html
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту