Устранение зависших в очереди задач: новинки будущего релиза Apache AirFlow 2.6.0

зависание задач в очереди Apache AirFlow, очереди задач Apache AirFlow, AirFlow executors, Celery Executor Kubernetes Apache Airflow, CeleryExecutor Airflow, CeleryExecutorKubernetes Apache Airflow, KubernetesExecutor Apache Airflow, обучение Apache Airflow, курсы Airflow, как работает Apache Airflow, исполнители задач Airflow, Школа Больших Данных Учебный Центр Коммерсант

Мы уже рассказывали про задачи-зомби в Apache AirFlow и способы их устранения. Продолжая тему управления распределенными процессами, сегодня поговорим про задачи, зависшие в очереди и универсальное решение для борьбы с ними, которое будет реализовано в выпуске Apache AirFlow 2.6.0, о других новинках которого читайте здесь.

Жизненный цикл задачи в Apache AirFlow

Задача является элементарной единицей конвейера обработки данных (DAG, Directed Acyclic Graph) в Apache AirFlow. Она имеет свой жизненный цикл, согласно которому ставится в очередь для выполнения на основе их зависимостей и ограничений планирования. В течение своего жизненного цикла состояние задачи меняется от запланированной к поставленной в очередь выполнения. Граф состояний задачи в AirFlow выглядит так:

задачи Apache AirFlow состояния жизненный цикл
Жизненный цикл задачи Apache AirFlow

Эта диаграмма создана в PlantUML следующим скриптом:

@startuml
[*] -> none
state Scheduler {
none --> scheduled
none --> removed
removed --> [*]
upstream_failed --> [*]
none --> upstream_failed
}
state Executor {
scheduled -> queued
}
state Worker {
queued --> running
}
running --> up_for_reschedule
up_for_reschedule --> Scheduler
running ---> success
running --> shutdown : Mark Failed
shutdown --> failed
failed --> [*]
running -> failed : Error without retry
running -> up_to_retry : Error with retry
up_to_retry  -> Scheduler
shutdown --> restarting
restarting --> up_to_retry 
success -> [*]
@enduml

Состояния задачи в Apache AirFlow могут быть следующими:

  • none – задача еще не поставлена в очередь на выполнение, ее зависимости еще не выполнены;
  • scheduled — планировщик определил, что зависимости задачи выполнены, и она должна быть запущена;
  • queued — задача назначена исполнителю и ожидает worker’а;
  • running — задача выполняется на worker’е или на локальном/синхронном исполнителе;
  • success — задача завершена без ошибок
  • shutdown — задача была запрошена извне на завершение работы во время ее выполнения;
  • restarting — задача была запрошена извне для перезапуска во время ее выполнения;
  • failed — во время выполнения задачи произошла ошибка, и ее не удалось запустить;
  • skipped — задача была пропущена из-за ветвления, LatestOnly или аналогичного;
  • upstream_failed – не удалось выполнить восходящую задачу, которая нужна согласно правилу триггера;
  • up_for_retry – задача не удалась, но остались попытки повторной попытки, и она будет перепланирована;
  • up_for_reschedule – задача представляет собой сенсор (особый тип оператора), находящийся в режиме перепланирования;
  • deferred – задача была отложена до триггера;
  • removed — задача удалена из DAG с момента запуска.

Очереди задач

Согласно жизненному циклу задачи в Apache AirFlow сперва планировщик определяет, что пришло время запуска задачи и других зависимостей, включая завершение вышестоящих задач. В это время задача переходит в состояние «запланирована» (scheduled). Когда задача назначается исполнителю, она переходит в состояние очереди (queued). Когда исполнитель, наконец, берет задачу и worker начинает выполнять ее, задача переходит в состояние выполнения (running). Иногда задачи зависают в очереди. Это может быть преднамеренным, например, достигнут параллелизм, или непреднамеренным, когда что-то выходит из строя между исполнителем и планировщиком.

Самый простой способ увидеть это — использовать диаграмму Ганта в GUI AirFlow.

GUI AirFlow Gant diagram
Исполнение задач на диаграмме Ганта в GUI AirFlow

При использовании исполнителя Celery (CeleryExecutor), о котором мы писали здесь, задачи зависали в очереди довольно часто, несмотря на параметр конфигурации AIRFLOW__CELERY__STALLED_TASK_TIMEOUT. Этот параметр означает время в секундах, по истечении которого задачи, поставленные в очередь Celery, считаются остановленными и автоматически перепланируются. Вместо этого с версии AirFlow 2.3.1 принятые задачи используют параметр task_adoption_timeout, если он указан. При нулевом значении этого параметра автоматическая очистка зависших задач отключена.

Хотя конфигурации stalled_task_timeout и task_adoption_timeout устраняли большинство проблем с задачами, застрявшими в очереди, некоторые из них все равно оставались в состоянии очереди в течение нескольких часов, до завершения работы текущего планировщика задачи, и другой планировщик не мог принять такие зависшие задачи.

Чтобы устранить эту ситуацию, дата-инженеры компании Astronomer, коммерциализирующей Apache AirFlow, написали запрос в базу данных для получения задач, зависших в очереди:

SELECT
    dag_id,
    run_id,
    task_id,
    queued_dttm
FROM
    task_instance
WHERE
    queued_dttm < current_timestamp - interval '30 minutes'
AND state = 'queued'

Оказалось, что это простой и логичный способ обнаружения застрявших задач в очереди, поскольку запрашивать базу данных AirFlow из планировщика гораздо удобнее, чем пытаться реализовать индивидуальную логику для каждого исполнителя. Поэтому в будущих релизах AirFlow ожидается следующее решение задач, застрявшие в очереди:

  • запросить в базе данных задачи, которые находились в очереди дольше, чем время, заданное параметром конфигурации task_queued_timeout;
  • отправить любые такие задачи исполнителю для обработки;

Это решение поддерживается только для удаленных исполнителей CeleryExecutor и KubernetesExecutor, т.к. лишь они поддерживают данный параметр конфигурации (scheduler.task_queued_timeout). Для унификации этого решения в Apache AirFlow 2.6.0 реализован единый механизм обнаружения и обработки зависших задач в очереди независимо от используемого исполнителя, а параметры kubernetes.worker_pods_pending_timeout, celery.stalled_task_timeout и celery.task_adoption_timeout будут объединены в единую конфигурацию scheduler.task_queued_timeout. Читайте в нашей новой статье, как настроить приоритет задач в очереди исполнителя.

Освойте все возможности Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html
  2. https://medium.com/apache-airflow/unsticking-airflow-stuck-queued-tasks-are-no-more-in-2-6-0-6f40a1a22835
Поиск по сайту