Настройка планировщика Apache AirFlow

Apache AirFlow примеры курсы обучение, Apache AirFlow развертывание администрирование оптимизация, Apache AirFlow для дата-инженеров и администраторов, Школа Больших данных Учебный центр Коммерсант

Как устроен планировщик заданий Apache AirFlow, от чего зависит его производительность и какие конфигурации помогут ее улучшить: настройки, приемы, рекомендуемые значения и лучшие практики.

Как работает планировщик Apache AirFlow

Apache AirFlow как фреймворк оркестрации пакетных процессов включает несколько компонентов. Одним из них является планировщик (scheduler), который отслеживает все задачи и DAG, а также запускает экземпляры задач после завершения их зависимостей, используя настроенный исполнитель. При этом выполняется подпроцесс, который отслеживает и синхронизируется со всеми DAG в указанном каталоге DAG. По умолчанию раз в минуту планировщик собирает результаты анализа DAG и проверяет, можно ли запустить какие-либо активные задачи. При этом файлы Python, содержащиеся в папке DAG, преобразуются в объекты DAG, содержащие задачи для планирования.

Первый запуск DAG создается на основе минимальной даты start_date в его задачах. Последующие запуски DAG создаются в соответствии с расписанием DAG. Для DAG с расписанием cron или timedelta, что мы разбирали в прошлой статье, планировщик не будет запускать задачи, пока не закончится охватываемый им период. Например, задание с установленным значением @daily запускается после окончания дня. Это гарантирует, что все данные, необходимые для этого периода, будут полностью доступны до выполнения DAG. В пользовательском интерфейсе это выглядит так, как будто Airflow запускает задачи на день позже.

Если нужно использовать внешний триггер для запуска DAG в будущем, надо установить параметру allow_trigger_in_future значение True в разделе планировщика в конфигурационном файле airflow.cfg. Это работает, когда DAG определен с незаданным значением расписания, т.е. schedule=None. Если для параметра allow_trigger_in_future установлено значение False (по умолчанию), то при его ручном запуске с интервалами данных в будущем, планировщик не выполнит этот DAG, пока не наступит его дата интервала старта data_interval_start.

Планировщик является постоянно работающей службой в производственной среде Airflow и использует конфигурацию, указанную в конфигурационном файле airflow.cfg в разделе scheduler. Планировщик Airflow как сервис разработан таким образом, чтобы обеспечить высокую пропускную способность для ускорения планирования. Планировщик сам проверяет, сколько свободных слотов доступно в пуле, и планирует ровно столько или меньше экземпляров задач за одну итерацию. Про пулы задач и приоритеты мы говорили здесь. Приоритет задачи будет учитываться только тогда, когда запланированных задач будет больше, чем слотов в очереди. Поэтому может случиться, что задачи с низким приоритетом будут запланированы до задач с высоким приоритетом, если они разделяют один и тот же пакет.

Для повышения производительности и отказоустойчивости AirFlow может использовать несколько планировщиков. При этом каждый из них взаимодействует с базой данных метаданных (рекомендуется PostgreSQL 12+ или MySQL 8.0+) напрямую вместо средств обеспечения консенсуса между планировщиками (Raft, Paxos, Apache Zookeeper, Consul и пр.). Для высокой производительности и пропускной способности нужно гарантировать, что в один момент времени только один из планировщиков находится в критической секции, где TaskInstances переходят из запланированного состояния и ставятся в очередь к исполнителю, при этом гарантируя соблюдение различных ограничений параллелизма и пула. Критическая секция получается путем запроса блокировки записи на уровне строк для каждой строки таблицы pool, где хранится информация о пулах ресурсов. Для этого в Airflow используются блокировки на уровне строк базы данных с помощью запроса

SELECT * FROM slot_pool FOR UPDATE NOWAIT

PostgreSQL 12+ или MySQL 8.0+ имеют высокоэффективные внутренние механизмы управления такими блокировками, поэтому именно эти БД рекомендуются для использования в production, о чем мы писали здесь.

Как улучшить планирование DAG, настроив планировщик

Планировщик Airflow отвечает за две операции:

  • непрерывный анализ файлов DAG и синхронизация с DAG в базе данных метаданных;
  • постоянное планирование задач для выполнения.

Эти две задачи выполняются параллельно и работают независимо друг от друга в разных процессах. Поэтому для точной настройки планировщика нужно учитывать следующие факторы:

  • параметры развертывания, в т.ч. тип файловой системы и ее скорость в операциях чтения для совместного использования DAG несколькими планировщиками. Также важны количество памяти, загруженность процессора и пропускная способность сети.
  • Логика и определение структуры DAG, включая количество файлов, объем и сложность каждого из них DAG, в т.ч. импорт большого количества библиотек или тяжелую обработку данных;
  • конфигурация планировщика, в т.ч. их количество, количество процессов синтаксического анализа в каждом планировщике, время ожидания между повторными обработками одного и того же DAG, количество экземпляров задач в одном цикле, количество запусков DAG на цикл, а также частота очистки и проверки потерянных задач.

Для повышения производительности файловой системы, поскольку Airflow Scheduler постоянно считывает и повторно анализирует Python-файлы DAG, рекомендуется использовать распределенные версии (NFS, CIFS, EFS, GCS fuse, Azure File System). Оптимизировать количество подключений к базе данных метаданных поможет балансировщик нагрузки. Это особенно актуально для PostgreSQL, где обработка подключений основана на процессах, в отличии от MySQL, модель обработки подключений которого основана на потоках. Самым популярным балансировщиком нагрузки для PostgreSQL считается PGBouncer. Даже Helm Chart для развертывания Apache Airflow в в Kubernetes поддерживает PGBouncer.

Оптимизировать потребление ЦП, что особенно важно для процессов FileProcessors, которые анализируют и выполняют Python-файлы DAG можно, увеличив параметр min_file_process_interval. Но тогда изменения в этих файлах будут обрабатываться медленнее. Поэтому лучше перестроить DAG, избегая внешних источников данных. Также можно увеличить количество потоков обработки parsing_processes, добавить еще несколько планировщиков и настроить у них следующие параметры в конфигурационном файле:

  • max_dagruns_to_create_per_loop – количество DAG, которые блокируются каждым планировщиком при создании запусков DAG;
  • max_dagruns_per_loop_to_schedule – количество запусков DAG, которые должен проверять (и блокировать) планировщик при планировании и постановке задач в очередь. Увеличение этого предела увеличит пропускную способность для небольших DAG, но замедлит пропускную способность для больших DAG c 500 задачами и более. Установка слишком высокого значения при использовании нескольких планировщиков может привести к тому, что один планировщик возьмет на себя все запуски DAG, не оставив работы для других.
  • use_row_level_locking – использование блокировки на уровне строк для управления конкурентным доступом к задачам (при использовании нескольких экземпляров планировщика);
  • pool_metrics_interval – частота отправки статистики использования пула в StatsD (если включен statsd_on). Это довольно дорогостоящий запрос, поэтому этот параметр следует настроить на тот же период, что и период подсчета статистики StatsD.
  • orphaned_tasks_check – частота (в секундах) проверки потерянных задач или мертвых заданий SchedulerJobs, которые не отвечают после периода, заданного в scheduler_health_check_threshold. Потерянными считаются задачи, которые по каким-то причинам потеряли связь с основным процессом планировщика и продолжают висеть в статусе running или queued, хотя на самом деле они уже не выполняются или не должны выполняться. При обнаружении таких заданий все запущенные или поставленные ими в очередь задачи, будут контролироваться этим планировщиком. Scheduler периодически проверяет состояние задач, которые находятся в статусах running или queued. Если обнаруживается, что такие задачи больше не связаны с активными рабочими процессами или их запуск превысил допустимое время выполнения scheduler_health_check_threshold, Scheduler помечает эти задачи как failed или up_for_retry, в зависимости от настроек и логики обработки ошибок.
  • dag_dir_list_interval – частота сканирования каталога DAG на появление новых файлов (в секундах)
  • file_parsing_sort_mode – режим сортировки файлов DAG Для определения порядка их парсинга. Python-файлы DAG могут быть отсортированы по времени изменения (modified_time), случайным образом по нескольким планировщикам и с одним тем же порядком на одном хосте (random_seeded_by_host) или в алфавитном порядке по имени файла (alphabetical).
  • max_tis_per_query — размер пакета запросов в основном цикле планирования. Должен быть не больше parallelism. Если это значение слишком велико, то производительность SQL-запроса может снизиться из-за сложности предиката запроса блокировок или максимально допустимой длины запроса для базы данных метаданных.
  • min_file_process_interval – частота повторного анализа файлов DAG (в секундах);
  • parsing_processes – количество процессов, запущенных параллельно для синтаксического анализа файлов DAG;
  • scheduler_idle_sleep_time – неактивность планировщика между циклами;
  • schedule_after_task_execution — должен ли процесс Task supervisor выполнять мини-планирование, чтобы попытаться запланировать больше задач того же DAG. Если оставить это включенным, задачи в том же DAG будут выполняться быстрее, но это может негативно повлияют на другие DAG.

Таким образом, Apache AirFlow имеет довольно много конфигураций для гибкой настройки планировщика, позволяя эффективно управлять планированием множеством конвейеров пакетной обработки данных.

Узнайте больше про администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/scheduler.html#dag-file-processing
  2. https://www.run.ai/guides/scheduled-jobs/airflow-scheduling

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту