Ранее мы писали про проблемы повышения производительности Apache AirFlow и каковы их причины. В продолжение этой темы сегодня рассмотрим, как настроить этот ETL-оркестратор, чтобы избежать подобных ситуаций и масштабировать кластер в соответствии с нагрузкой.
Настройка AirFlow на уровне среды
Как мы уже отмечали, Apache AirFlow отлично масштабируется, обеспечивая высокую производительность пакетных задач. Для этого можно настроить оркестратор на 3-х уровнях: всей среды, а также отдельно взятого DAG или задачи.
Например, настройки уровня среды влияют на всю среду AirFlow, т.е. все DAG. Все они имеют значения по умолчанию, которые можно переопределить, установив соответствующую переменную среды или изменив файл конфигурации airflow.cfg. Проверить текущие значения для существующей среды можно в веб-интерфейсе AirFlow.
Основные настройки контролируют количество одновременно выполняемых процессов и продолжительность их работы во всей среде. Связанные переменные среды для всех параметров в этом разделе имеют формат AIRFLOW__CORE__PARAMETER_NAME, где PARAMETER_NAME может принимать следующие значения:
- parallelism – максимальное количество задач, которые могут одновременно выполняться в каждом планировщике в одной среде AirFНапример, если для этого параметра установлено значение 32 и есть 2 планировщика, то во всех DAG одновременно может находиться не более 64 задач в состоянии выполнения или в очереди. Если задачи остаются в запланированном состоянии в течение длительного периода времени, это надо значение. По умолчанию параллелизм равен 32.
- max_active_tasks_per_dag (ранее dag_concurrency) – максимальное количество задач, которые можно запланировать одновременно для каждого DAG. Этот параметр, по умолчанию равный 16, помогает избежать ситуации, когда какой-то DAG занимает слишком много доступных слотов из-за параллелизма или пулов. Если объем ресурсов, доступных для AirFlow, т.е. рабочих процессов Celery или ресурсов Kubernetes, увеличен, но задачи по-прежнему не выполняются должным образом, придется увеличить значения parallelism иmax_active_tasks_per_dag.
- max_active_runs_per_dag – максимальное количество активных запусков каждого DAG, которые планировщик может создать одновременно. В AirFlow запуск DAG представляет собой создание экземпляра DAG. Настройка этого параметра, по умолчанию равного 16, актуальна, если необходимо заполнить пропущенные запуски DAG.
- dag_file_processor_timeout – лимит времени, пока DagFileProcessor может работать файл DAG. По умолчанию этот параметр равен 50 секунд.
- dagbag_import_timeout – лимит времени, пока dagbag будет импортировать объекты DAG. Этот параметр, по умолчанию, равный 30 секунд, должен быть меньше значения, установленного для dag_file_processor_timeout. Это значение надо увеличить, если в журналах обработки DAG отображаются тайм-ауты, DAG не отображается в списке конвейеров или возникают ошибки импорта. Можно также увеличить это значение, если задачи не выполняются, поскольку рабочим процесса надо заполнить поле dagbag, пока задачи выполняются.
Параметры конфигурации планировщика управляют тем, как планировщик анализирует файлы DAG и создает запуски DAG. Связанные переменные среды для всех параметров в этом разделе имеют формат AIRFLOW__SCHEDULER__PARAMETER_NAME, где PARAMETER_NAME принимает следующие значения:
- min_file_process_interval – частота анализа каждого файла DAG в секундах, после чего происходит обновление DAG. Значение по умолчанию — 30 секунд. Слишком частое обновление увеличивает загрузку ЦП планировщика. Для динамических DAG, созданных с помощью сложного кода, можно увеличить это значение, чтобы повысить производительность планировщика.
- dag_dir_list_interval — частота сканирования каталога DAG на наличие новых файлов в секундах, по умолчанию 300. Чем ниже это значение, тем быстрее обрабатываются DAG, но увеличивается нагрузка на ЦП.
- dag_processing.total_parse_time – время анализа DAG, которое позволяет выбрать оптимальные значения min_file_process_interval иdag_dir_list_interval. Если dag_dir_list_interval меньше, чем время, необходимое для анализа каждого DAG, могут возникнуть проблемы с производительностью. Для небольшого количества DAG (менее 200), можно безопасно установить значение переменной среды AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL равным 30 для производственного развертывания.
- parsing_processes (ранее max_threads) – количество процессов, которое планировщик может запускать параллельно для анализа DAG. По умолчанию этот параметр равен 2. Рекомендуется задавать это значение в 2 раза больше количества доступных виртуальных ЦП, чтобы более эффективно сериализовать большое количество DAG. При использовании нескольких планировщиков, это значение применяется к каждому из них.
- file_parsing_sort_mode – режим сортировки файлов DAG, который определяет, как планировщик перечисляет и сортирует их для определения порядка анализа. Этот параметр может принимать значения по времени изменения (modified_time),случайное по хосту (random_seeded_by_host) или по алфавиту (alphabetical).По умолчанию задано значение modified_time.
- scheduler_heartbeat_sec – интервал тактового heartbeat-сигнала, который определяет, как часто должен запускаться планировщик для запуска новых задач. Значение по умолчанию — 5 секунд.
- max_dagruns_to_create_per_loop — максимальное количество DAG для создания запусков DAG в каждом цикле планировщика. По умолчанию равен 10. Чтобы освободить ресурсы для планирования задач, это значение надо уменьшить.
- max_tis_per_query – размер пакета запросов к метахранилищу в основном цикле планирования. Более высокое значение позволяет обрабатывать больше данных за один запрос, но этот запрос усложняется и может вызвать проблемы с производительностью. По умолчанию равно 16. Важно, чтобы значение max_tis_per_query было ниже значения параллелизма (core.parallelism).
Настройки на уровне DAG и задач
Настройки уровня DAG применяются только к определенным DAG и определяются в коде DAG. Рекомендуется менять параметры по умолчанию, чтобы настроить производительность конкретного DAG, когда в нем есть задачи обращения к внешней системе по веб-API или к базе данных. Если параметр существует как на уровне DAG, так и на уровне среды, параметр уровня DAG имеет более высокий приоритет.
Существует 3 основных параметра AirFlow на уровне DAG, которые можно определить в коде:
- max_active_runs (max_active_runs_per_dag) – максимальное количество активных запусков DAG. При превышении этого предела планировщик не будет создавать новые активные запуски DAG. Если этот параметр не определен, предполагается значение параметра уровня среды. Для catchup- или backfill-сценариев надо определить этот параметр так, чтобы избежать случайную инициацию большого количества запусков DAG.
- max_active_tasks (max_active_tasks_per_dag) – общее количество задач, которые могут выполняться одновременно для данного запуска DAG. По сути, он контролирует параллелизм внутри DAG. Если этот параметр не определен, используется значение, установленное на уровне среды.
- concurrency – максимальное количество экземпляров задач, которые разрешено одновременно запускать во всех активных запусках DAG. Это разрешает одному DAG запускать, например, 32 задачи одновременно, а другому – только 16. Если этот параметр не определен, используется значение, установленное на уровне среды.
Можно определить любые параметры уровня DAG. Например, следующая инструкция допускает запуск максимум 10 экземпляров задач одновременно в 3-х активных запусках DAG:
with DAG(«my_dag_id», concurrency=10, max_active_runs=3):
Параметры уровня задачи определяются их операторами, которые можно использовать для реализации дополнительных настроек производительности. В частности, дата-инженер может настроить AirFlow на уровне задачи, определив в коде следующие параметры:
- max_active_tis_per_dag (ранее task_concurrency) – максимальное количество раз, которое одна и та же задача может выполняться одновременно во всех запусках DAG. Например, если задача связана с извлечением данных из внешнего ресурса, такого как таблица БД, которая не должна изменяться несколькими задачами одновременно, можно установить это значение равным 1.
- pool – количество пулов, доступных для задачи. Пулы — это способ ограничить количество одновременных экземпляров произвольной группы задач. Этот параметр полезен, когда много рабочих процессов или одновременно работающих DAG, и надо избежать ограничения скорости API или ситуации с перегрузкой источника или приемника данных. Поскольку пулы используются для ограничения параллельного выполнения набора задач, они пригодятся для уменьшения сбоев, вызванных всплесками трафика. Хотя пулы являются полезным инструментом для обеспечения изоляции задач, управлять ими может быть непросто, поскольку только администраторы имеют доступ к их редактированию через веб-интерфейс Apache AirFlow. Обойти это ограничение можно, написав специальный DAG, который синхронизирует пулы в среде с состоянием, указанным в карте конфигурации Kubernetes, с помощью нескольких простых ORM-запросов. Это позволяет управлять пулами вместе с остальной частью конфигурации развертывания AirFlow, давая пользователям возможность обновлять пулы с помощью проверенного запроса без необходимости менять права доступа на расширенные.
Поскольку все эти параметры унаследованы от BaseOperator, их можно установить в любом определении оператора, например, в Python-операторе:
def t1_func(): pass t1 = PythonOperator( task_id="t1", python_callable=t1_func, pool="my_custom_pool", max_active_tis_per_dag=14 )
В заключение отметим параметр priority_weight, который позволяет назначить более высокий приоритет отдельной задаче в очереди исполнителя. Хотя это и не является прямым решением проблемы конкуренции за ресурсы, установка более высокого priority_weight может пригодиться для того, чтобы критические задачи, чувствительные к задержке, выполнялись раньше других. Например, чтобы базовый DAG, который генерирует простые метрики и активирует некоторые оповещения, работал как можно быстрее. Однако, для работы с priority_weight нужна согласованная шкала установки приоритета. По умолчанию эффективность параметра priority_weight для задачи, используемой при принятии решений по планированию, представляет собой сумму ее собственного веса и веса всех последующих задач. Это означает, что восходящие задачи в больших DAG будут предпочтительнее задач в меньших DAG. Поэтому для использования priority_weight требуются знания о других DAG, работающих в среде AirFlow.
Узнайте больше про Apache AirFlow и его практическое использование в дата-инженерии, машинном обучении и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники