Масштабирование Apache AirFlow: причины, риски и возможности

масштабирование Apache AirFlow, падение производительности AirFlow, почему падает производительность Apache Airflow и что делать, Apache Airflow для дата-инженера и администратора кластера, обучение Apache Airflow, курсы Airflow, как работает Apache Airflow, исполнители задач Airflow, Школа Больших Данных Учебный Центр Коммерсант

Когда и почему нужно повышать производительность Apache AirFlow, как исполнитель влияет на масштабирование этого ETL-оркестратора.

Почему падает производительность AirFlow и что с этим делать

Типичными проблемами, которые требуют масштабирования кластера AirFlow, являются медленный доступ к файлам, недостаточный контроль над возможностями DAG, нерегулярные уровни трафика и конкуренция за ресурсы между рабочими нагрузками. Будучи предназначенным для оркестрации конвейеров обработки данных, Apache AirFlow отлично масштабируется, обеспечивая высокую производительность пакетных задач. Для этого можно настроить оркестратор на 3-х уровнях: всей среды, а также отдельно взятого DAG или задачи, что мы детально рассмотрим в следующей статье. Вообще AirFlow имеет множество параметров, влияющих на его производительность. От этих параметров зависит производительность анализа DAG и планирования задач, а также параллелизм в среде. Поэтому оптимизация конфигураций AirFlow применима для самых разных сценариев использования. Например, администраторы кластера или инженеры DevOps могут настраивать параметры масштабирования на уровне среды, чтобы гарантировать, что поддерживающая инфраструктура не перегружена. А дата-инженеры, как разработчики DAG могут настраивать параметры масштабирования на уровне DAG или задач, чтобы конвейеры не перегружали внешние системы. Поэтому перед настройкой параметров масштабирования следует определить требования к варианту использования оркестратора.

Для этого перечислим типовые причины, почему производительность AirFlow падает, создавая потребность в масштабировании. Одной из них является медленный доступ к файлам при использовании облачного хранилища. Быстрый доступ к файлам очень важен для производительности и целостности среды ETL-оркестратора. Четко определенная стратегия доступа к файлам гарантирует, что планировщик сможет быстро обрабатывать файлы DAG и поддерживать актуальность заданий. AirFlow поддерживает актуальность своего внутреннего представления рабочих процессов, многократно сканируя и повторно анализируя все файлы в настроенном каталоге DAG. Эти файлы необходимо часто сканировать, чтобы поддерживать согласованность между источником достоверных данных на диске для каждой рабочей нагрузки и его представлением в базе данных. Поэтому содержимое каталога DAG должно быть единообразным для всех планировщиков и рабочих процессов в одной среде. Если для хранения файлов DAG используется облачное хранилище, это может снизить производительность из-за операций удаленного доступа. Например, при использовании Google Cloud Storage каждый модуль в среде должен монтировать корзину отдельно. Повысить производительность AirFlow поможет сервер сетевой файловой системы (NFS) в кластере Kubernetes. Смонтировав NFS-сервер как том с возможностью чтения и записи в рабочие поды и поды планировщика, можно синхронизировать состояние этого тома с GCS, так взаимодействовать с облачным хранилищем надо только для загрузки или управления DAG. Это сильно снижает число операций обращения к облачному хранилищу и удаленного чтения, увеличивая производительность оркестратора. Кроме того, в случае GCS можно использовать возможности IAM для управления идентификацией и доступом пользователей, чтобы контролировать, кто может загружать файлы в данную среду. Так, например, можно разрешить пользователям загружать DAG непосредственно в среду разработки или тестирования, но ограничить для производственной среды.

Еще одним фактором, связанным с медленным доступом к файлам, является производительность их обработки. AirFlow имеет несколько способов настроить фоновую обработку файлов, установив режим сортировки, параллелизм или тайм-аут. Это позволяет оптимизировать среду для интерактивной разработки DAG или производительности планировщика в зависимости от требований.

Следующей причиной, почему производительность AirFlow падает, может быть увеличение объемов метаданных. Это заметно по времени загрузки веб-интерфейса и особенно видно во время обновлений, когда миграция может занимать по несколько часов. Устранить это можно, реализовав DAG, который использует ORM-запросы объектно-реляционного сопоставления внутри PythonOperator для удаления строк из любых таблиц, содержащих исторические данные, например, DagRuns, TaskInstances, Logs, TaskRetries и пр. Временное окно для подобного сопоставления следует выбрать на основании длительности необходимой истории для управления инцидентами и отслеживания исторических результатов работы, чтобы хранить эти данные в базе данных метаданных в приемлемых объемах.

Однако, подобное решение означает, что функции AirFlow, которые полагаются на устойчивую историю заданий, например, длительные backfill-операции, не поддерживаются. В некоторых сценариях использования это может вызвать проблемы. Поэтому в качестве альтернативы пользовательскому DAG в версии AirFlow 2.3 добавлена поддержка команды db clean для удаления устаревших метаданных. 

Также производительность AirFlow может падать из-за неоптимальных DAG, которые трудно напрямую связать с пользователями и командами, чтобы быстро связаться с ними для исправления. Исправить эту ситуацию можно, если разворачивать все DAG непосредственно из одного репозитория с отслеживанием владельца задания. Для этого можно организовать реестр пространств имен AirFlow, который представляет собой YAML- файл, где пользователи регистрируют пространство имен для своих DAG, включая информацию о владельцах заданий, исходном GitHub-репозитории и/или месте хранилища, а также основные ограничения. Следует поддерживать подобный реестр манифест для каждой среды и загружать его в облачное хранилище вместе с DAG.

Кроме этого, с авторами DAG связана еще одна причина падения производительности этого ETL-оркестратора. Если все пользователи могут напрямую писать и загружать DAG в производственную среду, администраторы AirFlow физически не могут проверять все задания до того, как они попадут в производство. Чтобы ограничить возможности, следует реализовать политику DAG в файле AirflowClusterPolicyViolation, которая считывает конфигурацию из ранее упомянутого YAML-файла манифеста и отклоняет DAG, которые не соответствуют ограничениям их пространства имен. Правила политики прописываются в файле AirflowClusterPolicyViolation, и могут быть, например, следующими:

  • для права владения идентификатор DAG должен иметь префикс имени существующего пространства имен;
  • задачи DAG должны помещаться только в указанную очередь исполнителя;
  • задачи можно запускать только в определенных пулах, чтобы предотвратить конкуренцию за вычислительные мощности между разными рабочими нагрузками;
  • любой оператор KubernetesPodOperators в DAG должен запускать поды только в указанных пространствах имен, чтобы предотвратить доступ к секретам других пространств имен;
  • задачи в DAG могут запускать поды только в указанные наборы внешних кластеров Kubernetes.

Эту политику можно расширить, чтобы обеспечить соблюдение других правил, например, разрешить только ограниченный набор операторов, или даже изменить задачи, чтобы они соответствовали определенной спецификации, добавив тайм-аут выполнения в зависимости от пространства имен, для всех задач в DAG. Подобные проверки обеспечивают достаточную отслеживаемость, а также снижают риск влияния разных DAG друг на друга.

Наконец, необходимость масштабирования AirFlow может возникнуть из-за неравномерного распределения нагрузки. Самым простым решением является абсолютное значение для интервала расписания DAG (schedule_interval), но это приводит к пиковым нагрузкам, привязанным к значению timedelta, поскольку все DAGruns создаются одновременно. Это приводит к всплеску трафика, который может перегрузить планировщик AirFlow, а также любые внешние службы или инфраструктуру, которые использует задание, например, кластер внешнего хранилища данных. В итоге получается неоптимальное использование ресурсов и увеличение времени выполнения задач.

Избежать этого поможет детерминированный случайный интервал расписания для всех автоматически генерируемых DAG. Для рандомизации можно взять хэш-функцию от постоянного числа, например, dag_id. Так можно значительно сгладить нагрузку и повысить производительность оркестратора. 

Как исполнитель влияет на настройки масштабирования

Кроме того, также настройки масштабирования зависят от исполнителя среды AirFlow, о которых мы писали здесь. Если нужно, чтобы задачи выполнялись в отдельных средах, например, из-за зависимостей от разных Python-библиотек, более мощных ресурсов для интенсивных задач или с другим уровнем доступа, можно создать дополнительные очереди, в которые подмножество заданий отправляет задачи. Затем надо настроить отдельные наборы рабочих процессов для извлечения данных из отдельных очередей. Задачу можно поместить в отдельную очередь с помощью аргумента queue в операторах. Например, для исполнителя Celery запустить рабочий процесс, выполняющий задачи из другой очереди, поможет команда

bashAirflow celery worker –queues <list of queues>

Так можно обеспечить наличие достаточных ресурсов для чувствительных или высокоприоритетных рабочих нагрузок, поскольку они не будут конкурировать с другими рабочими нагрузками за вычислительные мощности.

Например, исполнитель Celery использует постоянных рабочие процессы для выполнения задач. Поэтому для масштабирование надо учитывать количество и размер рабочих процессов, доступных для Airflow: чем их больше, тем больше возможностей для одновременного выполнения задач. В частности, можно настроить свою переменную среды worker_concurrency, которая при использовании Celery называется AIRFLOW__CELERY__WORKER_CONCURRENCY. Она определяет, сколько задач каждый рабочий процесс Celery может выполнять в любой момент времени. По умолчанию исполнитель Celery одновременно выполняет максимум шестнадцать задач. Поэтому при увеличении worker_concurrency, может потребоваться дополнительный процессор и/или память для рабочих процессов.

Исполнитель Kubernetes запускает под в кластере Kubernetes для каждой задачи. Поэтому ресурсы можно указать на уровне отдельной задачи, учитывая инфраструктуру кластера Kubernetes. Можно включить автоматическое масштабирование в кластере, чтобы использовать все преимущества гибкости этой среды управления контейнерами. Для этого надо настроить переменную среды worker_pods_creation_batch_size, которая при использовании Kubernetes называется AIRFLOW__KUBERNETES__WORKER_PODS_CREATION_BATCH_SIZE. Она определяет, сколько подов можно создать за один цикл планировщика. По умолчанию установлено значение 1, но нужно увеличить это число для повышения производительности, особенно для параллельных задач. Максимальное значение определяется возможностями кластера Kubernetes.

Также уменьшить конкуренцию за ресурсы может изоляция рабочих процессов. Однако, не все ресурсы можно тщательно распределить в AirFlow: пропускная способность планировщика, емкость базы данных и IP-пространство Kubernetes — это конечные ресурсы, которые нельзя ограничить для каждой рабочей нагрузки без создания изолированных сред.

Таким образом, из-за того, что масштабирование AirFlow во многом зависит от поддерживающей инфраструктуры и самих DAG, при настройке можно столкнуться со следующими проблемами:

  • Задержка планирования задач слишком высока. Планировщику может не хватать ресурсов для анализа DAG и последующего планирования задач. В этом случае надо изменить значение переменной среды worker_concurrency (при использовании Celery) или параллелизм.
  • DAG остаются в состоянии очереди, но не выполняются. Это может случиться, когда количество запланированных задач превышает возможности инфраструктуры. Если при этом используется исполнитель Kubernetes, надо проверить наличие доступных ресурсов в пространстве имен и увеличить значение переменной среды worker_pods_creation_batch_size. При использовании исполнителя Celery надо, соответственно, увеличить значение переменной среды worker_concurrency.
  • У отдельного DAG возникают проблемы с параллельным выполнением задач, а другие не затронуты. Возможно, проблема в самом DAG. Можно изменить его, или поработать с параметром max_active_task_per_dag, пулами и общим параллелизмом.

Читайте в нашей следующей статье, как настроить Apache AirFlow, чтобы избежать проблем с производительностью и масштабировать кластер в соответствии с нагрузкой.

Узнайте больше про Apache AirFlow  и его практическое использование в дата-инженерии, машинном обучении и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://docs.astronomer.io/learn/airflow-scaling-workers
  2. https://shopify.engineering/lessons-learned-apache-airflow-scale
Поиск по сайту