Отладка конвейеров Apache AirFlow: операторы, кластерные политики и обратные вызовы задач

callback AirFlow operator кластерная политика, обратные вызовы Apache AirFlow, пользовательский оператор Apache AirFlow, отладка и мониторинг конвейеров обработки данных Apache AirFlow, обучение AirFlow, курсы AirFlow администратор кластера дата-инженер, AirFlow операторы DAG примеры курсы обучение, обучение инженеров данных Big Data, курсы дата-инженеров, Школа Больших Данных Учебный центр Коммерсант

Проблемы отладки конвейеров обработки данных в Apache AirFlow и способы их решения средствами самого фреймворка. Как дата-инженеру настроить мониторинг системных событий на уровне DAG или отдельной задачи: операторы, кластерные политики и обратные вызовы.

Отладка конвейеров обработки данных в Apache AirFlow: проблемы и возможности

Практикующий дата-инженер знает, как бывает сложно найти ошибку во множестве конвейеров обработки данных, реализованных в виде DAG Apache AirFlow. Ошибка может скрываться как на уровне направленного ациклического графа, так и на уровне отдельно взятой задачи или оператора. Тем не менее, в большинстве случаев отладка сводится к следующим шагам:

  • собрать все объекты оператора;
  • собрать метаданных выполнения;
  • настроить панель оповещения и мониторинга на основе вышеупомянутых метаданных.

Для реализации этих шагов можно сделать собственное решение, используя инструменты с открытым исходным кодом, или воспользоваться внешним сервисом, который централизует метаданные, а также обеспечивает мониторинг и оповещения. Кроме того, AirFlow поддерживает несколько механизмов логирования и генерации метрик для сбора, обработки и визуализации в нижестоящих системах. В частности, AirFlow поддерживает уведомление об ошибках в режиме реального времени благодаря интеграции с Sentry – сервисом мониторинга производительности приложений и отслеживания ошибок, который можно развернуть в своей инфраструктуре или использовать в облачном варианте.

По умолчанию AirFlow поддерживает логирование в локальную файловую систему, записывая туда логи с веб-сервера, планировщика и worker’ов, исполняющих задачи. Это подходит для сред разработки и для быстрой отладки. Для облачных развертываний AirFlow есть обработчики ведения журнала в облачное хранилище AWS, Google Cloud и Azure. Параметры логирования задаются в файле конфигурации AirFlow, который должен быть доступен для всех процессов: веб-сервера, планировщика и worker’ов. Для производственных развертываний можно использовать FluentD для сбора логов и отправки их в места назначения, такие как Elasticsearch или Splunk, а также StatsD для сбора метрик и их отправки в Prometheus.

Однако, как уже было отмечено выше, чтобы собрать и визуализировать эти системные метрики, сперва необходимо поработать с операторами и задачами. В рамках AirFlow это реализуется средствами кластерной политики и обратных вызовов, чтобы отслеживать продолжительность и статусы задач, а также качество обрабатываемых данных. Как это сделать, мы рассмотрим далее.

Операторы AirFlow и политики кластера

Для массовой манипуляции с DAG и операторами на разных этапах AirFlow предоставляет инструмент кластерных политик. Airflow предоставляет несколько политик, каждая из которых представляет собой функцию, которая будет загружаться и вызываться фреймворком. Это очень удобно, когда надо изменить DAG или задачи на уровне всего кластера. Также инструмент политик кластера позволяет проверить, что весь DAG или отдельные задачи соответствуют определенному стандарту, установить для них аргументы или выполнить пользовательскую логику маршрутизации.

В AirFlow есть 3 вида кластерной политики:

  • dag_policy(dag), которая принимает DAG в качестве параметра и запускается во время загрузки DAG из DagBag – набора направленных ациклических графов, извлеченного из дерева папок с высокоуровневыми настройками конфигурации. В этих настройках задается, например, какую базу данных использовать в качестве серверной части и на каком исполнителе запускаются задачи. Это упрощает запуск сред развертывания по циклу разработки ПО (dev/test/prod) или для разных групп и профилей безопасности. Настройки уровня системы содержатся в DagBag, чтобы запускать несколько независимых развертываний.
  • task_policy(task), которая принимает задачу типа BaseOperator и выполняется, когда задача создается во время разбора задачи из DagBag во время загрузки. Это означает, что все определение задачи может быть изменено в ее политике, применяемой ко всем экземплярам задачи, которые будут выполняться в будущем, но не к той, что выполняется в DagRun в текущий момент.
  • task_instance_mutation_hook(task_instance), которая принимает экземпляр задачи типа TaskInstance и применяется не к задаче вообще, а к текущему экземпляру задачи, выполняющейся в настоящее время, т.е. находящемуся в DagRun. Он выполняется в worker’е, а не в файловом процессоре DAG, непосредственно перед выполнением экземпляра задачи.

Кластерные политики уровня DAG и задачи могут вызвать исключение AirflowClusterPolicyViolation, которое указывает, что переданный им DAG или задача не соответствует требованиям и не должен загружаться. Любые дополнительные атрибуты, установленные политикой кластера, имеют приоритет над теми, которые определены в файле DAG. Например, если задан SLA для задачи в файле DAG, а затем кластерная политика также задает это значение, именно значение кластерной политики будет иметь приоритет.

В Apache AirFlow есть 2 способа настроить кластерную политику:

  • создать файл airflow_local_settings.py в пути поиска python, например, папка config/ в $AIRFLOW_HOME, а затем добавить вызываемые объекты в файл, соответствующий какому-либо уровню политики кластера, например, dag_policy;
  • использовать точку входа setuptools в пользовательском модуле с помощью интерфейса Pluggy — ядра управления плагинами и вызова перехватчиков событий (хуков, hoock) для pytest. Pluggy позволяет более чем 500 плагинам расширять и настраивать поведение pytest по умолчанию. Даже сам pytest состоит из набора плагинов, которые вызываются последовательно в соответствии с четко определенным набором протоколов. Это позволяет дата-инженеру расширять или изменять поведение хост-программы путем установки плагина для нее. Код плагина будет выполняться как часть обычного выполнения программы, изменяя или улучшая некоторые ее аспекты. Этот метод требует навыков работы с пакетами Python: сперва следует создать свою функцию политики в модуле и добавить точку входа в спецификацию проекта, декорировав этот модуль или класс маркером @hookimpl.

После этого функции политики будут вызываться различными компонентами AirFlow. При этом точный порядок вызова плагинов не определен, а потому дата-инженеру нужно задать его самостоятельно. При этом для любого способа определения функций политики имена аргументов должны точно совпадать с теми, что определены в документации Apache AirFlow:

  • airflow.policies.task_policy(task) позволяет перепрограммировать некоторые параметры задачи task после ее загрузки в DagBag. Это пригодится, чтобы установить определенную очередь, чтобы убедиться их распределение по нужным worker’ам. Можно применить политику тайм-аута задачи, чтобы гарантировать длительность ее выполнения не дольше заданного порога.
  • airflow.policies.dag_policy(dag) позволяет перепрограммировать некоторые параметры DAG после их загрузки в DagBag. Это пригодится, например, чтобы применить пользователя по умолчанию для DAG или проверить настройку тегов для каждого конвейера обработки данных.
  • airflow.policies.task_instance_mutation_hook(task_instance) – этот параметр позволяет изменять экземпляры задач task_instance перед их постановкой в очередь планировщиком Airflow. Это можно использовать, например, для изменения экземпляра задачи во время повторных попыток.
  • airflow.policies.pod_mutation_hook(pod) позволяет изменить под Kubernetes, т.е. объект kubernetes.client.models.V1Pod перед планированием до того, как он будет передан клиенту Kubernetes для планирования. Это пригодится для добавления контейнеров sidecar или init в каждый рабочий под, запускаемый KubernetesExecutor или KubernetesPodOperator.
  • airflow.policies.get_airflow_context_vars(context) позволяет получить переменные контекста AirFlow, которые представляют собой пары ключ-значение и становятся доступными в качестве переменных среды при выполнении нужного экземпляра задачи task_instance с контекстом context.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
28 августа, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Сбор метаданных событий с помощью обратных вызовов задач

Для мониторинга системных событий при выполнении конвейеров данных Apache AirFlow их необходимо перехватывать. Это можно сделать следующими способами:

  • обернуть метод выполнения задач в Python-коде и добавить новые функции относительно исходного выполнения;
  • использовать обратные вызовы задачи, которые фреймворк вызывает для разных событий в процессе запуска экземпляра задачи.

Первый вариант годится для отладки, но не подходит для производственного использования на больших и сложных конвейерах обработки данных, поскольку манипуляции с методом execute() в пользовательском коде могут повлиять на логику обработки данных и привести к неожиданному поведению. Поэтому работа с обратными вызовами для перехвата системных событий при выполнении задачи более предпочтительна.

Обратные вызовы в Apache AirFlow бывают следующих видов:

  • pre_execute — запускается непосредственно перед запуском метода execute();
  • post_execute — запускается сразу после запуска метода execute() и только если не было ошибки;
  • on_failure_callback — запускается, если при выполнении произошла ошибка и задача не удалась;
  • on_success_callback — запускается, если в процессе выполнения не было ошибки и задача выполнена успешно;
  • on_retry_callback — запускается, если при выполнении произошла ошибка и задача настроена на повторную попытку;
  • on_kill — запускается, если истекло время выполнения метода, прямо перед возникновением ошибки тайм-аута или после получения системного сигнала SIGTERM.

Подробнее про использование обратных вызовов для управления задачи в Apache AirFlow мы писали здесь и здесь.

Эти обратные вызовы позволяют отслеживать продолжительность выполнения задачи, а также отловить проблемы с качеством данных, не заметные в явном виде. Почти каждый дата-инженер сталкивался с ситуаций, когда пользовательский интерфейс AirFlow отображает выполнение конвейера обработки данных зеленым цветом, но тогда данные доставлены в том виде, как это ожидалось. Это означает, что проблема в самих данных, а не в DAG, но у AirFlow нет прямого способа отследить это. Такое особенно часто случается, когда поставщики данных меняют свой API, что влечет изменение схемы данных, и необходимость исправления кода их обработки. Кластерные политики кластера и обратные вызовы задач помогут отслеживать подобные события и оценивать показатели качества данных.

В заключение отметим несколько лучших практик мониторинга конвейеров обработки и оценки качества данных в Apache AirFlow:

  • извлекать информацию о запросах из SQL-операторов (SnowflakeOperator, PostgresOperator и пр.) с помощью sqlparse – SQL-анализатора для Python, который обеспечивает поддержку синтаксического анализа, разделения и форматирования SQL-операторов;
  • оценивать статистику по каждому столбцу (количество пропусков и выбросов, как в абсолютном выражении, так и в процентилях), используя pydeequ или Great Expectation для проверки вывода операторов;
  • отслеживать схему данных;
  • собирать системные метрики (потребление памяти, процессора, диска и т.д.) с помощью filprofiler или аналогичных инструментов;
  • отслеживать значения шаблонных полей (template_fields) шаблонизатора Jinia (во время выполнения.

Узнайте больше про использование Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/apache-airflow/how-to-track-metadata-with-airflow-cluster-policies-and-task-callbacks-f80d42db9895
  2. https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/cluster-policies.html
Поиск по сайту