Устранение зависших в очереди задач: новинки будущего релиза Apache AirFlow 2.6.0

Устранение зависших в очереди задач: новинки будущего релиза Apache AirFlow 2.6.0

Мы уже рассказывали про задачи-зомби в Apache AirFlow и способы их устранения. Продолжая тему управления распределенными процессами, сегодня поговорим про задачи, зависшие в очереди и универсальное решение для борьбы с ними, которое будет реализовано в выпуске Apache AirFlow 2.6.0, о других новинках которого читайте здесь.

Жизненный цикл задачи в Apache AirFlow

Задача является элементарной единицей конвейера обработки данных (DAG, Directed Acyclic Graph) в Apache AirFlow. Она имеет свой жизненный цикл, согласно которому ставится в очередь для выполнения на основе их зависимостей и ограничений планирования. В течение своего жизненного цикла состояние задачи меняется от запланированной к поставленной в очередь выполнения. Граф состояний задачи в AirFlow выглядит так:

задачи Apache AirFlow состояния жизненный цикл
Жизненный цикл задачи Apache AirFlow

Эта диаграмма создана в PlantUML следующим скриптом:

@startuml
[*] -> none
state Scheduler {
none --> scheduled
none --> removed
removed --> [*]
upstream_failed --> [*]
none --> upstream_failed
}
state Executor {
scheduled -> queued
}
state Worker {
queued --> running
}
running --> up_for_reschedule
up_for_reschedule --> Scheduler
running ---> success
running --> shutdown : Mark Failed
shutdown --> failed
failed --> [*]
running -> failed : Error without retry
running -> up_to_retry : Error with retry
up_to_retry  -> Scheduler
shutdown --> restarting
restarting --> up_to_retry 
success -> [*]
@enduml

Состояния задачи в Apache AirFlow могут быть следующими:

  • none – задача еще не поставлена в очередь на выполнение, ее зависимости еще не выполнены;
  • scheduled — планировщик определил, что зависимости задачи выполнены, и она должна быть запущена;
  • queued — задача назначена исполнителю и ожидает worker’а;
  • running — задача выполняется на worker’е или на локальном/синхронном исполнителе;
  • success — задача завершена без ошибок
  • shutdown — задача была запрошена извне на завершение работы во время ее выполнения;
  • restarting — задача была запрошена извне для перезапуска во время ее выполнения;
  • failed — во время выполнения задачи произошла ошибка, и ее не удалось запустить;
  • skipped — задача была пропущена из-за ветвления, LatestOnly или аналогичного;
  • upstream_failed – не удалось выполнить восходящую задачу, которая нужна согласно правилу триггера;
  • up_for_retry – задача не удалась, но остались попытки повторной попытки, и она будет перепланирована;
  • up_for_reschedule – задача представляет собой сенсор (особый тип оператора), находящийся в режиме перепланирования;
  • deferred – задача была отложена до триггера;
  • removed — задача удалена из DAG с момента запуска.

Очереди задач

Согласно жизненному циклу задачи в Apache AirFlow сперва планировщик определяет, что пришло время запуска задачи и других зависимостей, включая завершение вышестоящих задач. В это время задача переходит в состояние «запланирована» (scheduled). Когда задача назначается исполнителю, она переходит в состояние очереди (queued). Когда исполнитель, наконец, берет задачу и worker начинает выполнять ее, задача переходит в состояние выполнения (running). Иногда задачи зависают в очереди. Это может быть преднамеренным, например, достигнут параллелизм, или непреднамеренным, когда что-то выходит из строя между исполнителем и планировщиком.

Самый простой способ увидеть это — использовать диаграмму Ганта в GUI AirFlow.

GUI AirFlow Gant diagram
Исполнение задач на диаграмме Ганта в GUI AirFlow

При использовании исполнителя Celery (CeleryExecutor), о котором мы писали здесь, задачи зависали в очереди довольно часто, несмотря на параметр конфигурации AIRFLOW__CELERY__STALLED_TASK_TIMEOUT. Этот параметр означает время в секундах, по истечении которого задачи, поставленные в очередь Celery, считаются остановленными и автоматически перепланируются. Вместо этого с версии AirFlow 2.3.1 принятые задачи используют параметр task_adoption_timeout, если он указан. При нулевом значении этого параметра автоматическая очистка зависших задач отключена.

Хотя конфигурации stalled_task_timeout и task_adoption_timeout устраняли большинство проблем с задачами, застрявшими в очереди, некоторые из них все равно оставались в состоянии очереди в течение нескольких часов, до завершения работы текущего планировщика задачи, и другой планировщик не мог принять такие зависшие задачи.

Чтобы устранить эту ситуацию, дата-инженеры компании Astronomer, коммерциализирующей Apache AirFlow, написали запрос в базу данных для получения задач, зависших в очереди:

SELECT
    dag_id,
    run_id,
    task_id,
    queued_dttm
FROM
    task_instance
WHERE
    queued_dttm < current_timestamp - interval '30 minutes'
AND state = 'queued'

Оказалось, что это простой и логичный способ обнаружения застрявших задач в очереди, поскольку запрашивать базу данных AirFlow из планировщика гораздо удобнее, чем пытаться реализовать индивидуальную логику для каждого исполнителя. Поэтому в будущих релизах AirFlow ожидается следующее решение задач, застрявшие в очереди:

  • запросить в базе данных задачи, которые находились в очереди дольше, чем время, заданное параметром конфигурации task_queued_timeout;
  • отправить любые такие задачи исполнителю для обработки;

Это решение поддерживается только для удаленных исполнителей CeleryExecutor и KubernetesExecutor, т.к. лишь они поддерживают данный параметр конфигурации (scheduler.task_queued_timeout). Для унификации этого решения в Apache AirFlow 2.6.0 реализован единый механизм обнаружения и обработки зависших задач в очереди независимо от используемого исполнителя, а параметры kubernetes.worker_pods_pending_timeout, celery.stalled_task_timeout и celery.task_adoption_timeout будут объединены в единую конфигурацию scheduler.task_queued_timeout. Читайте в нашей новой статье, как настроить приоритет задач в очереди исполнителя.

Освойте все возможности Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

[elementor-template id=»13619″]

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks/
  2. https://medium.com/apache-airflow/unsticking-airflow-stuck-queued-tasks-are-no-more-in-2-6-0-6f40a1a22835