Кастомизация Apache Airflow: мониторинг исполнения Big Data pipeline’ов со своими KPI

курсы по Airflow, администрирование и поддержка Airflow, Airflow maintenance operations, Apache Airflow обучение, курсы дата-инженеров и администраторов, обучение инженеров Big Data, инженерия больших данных, AirFlow применение, data pipeline, AirFlow инженерия данных обучение, Школа Больших Данных Учебный центр Коммерсант

Добавляя в наши курсы по Apache AirFlow еще больше полезных практик, сегодня разберем опыт дата-инженеров американской компании Groupon по настройке этого фреймворка. Читайте далее, как добавить собственные KPI исполнения конвейеров обработки данных в эту workflow-платформу, делая его веб-GUI более наглядным и удобным для управления DAG’ами.

Типовые возможности веб-GUI Apache Airflow для просмотра DAG и что с ними не так

Несмотря на широкую популярность Apache Airflow в качестве мощного средства управления рабочими процессами, его пользовательский интерфейс не очень нагляден по сравнению веб-GUI Dagster, о чем мы писали здесь и здесь. Поэтому дата-инженеры стремятся расширить типовой пользовательский UI Airflow, который изначально не слишком удобен для просмотра статусов заданий в конвейерах обработки данных. Особенно это проявляется по мере увеличения количества data pipeline’ов и их сложности. Чтобы смягчить эту проблему, дата-инженеры американской ИТ-компании Groupon, разработавшей одноименной сервис коллективных скидок, предлагают способ настройки веб-GUI Airflow через внесение небольших изменений в базовый код фреймворка [1].

Изначально типовой пользовательский интерфейс Airflow включает всего 3 вкладки для отображения статуса DAG [2]:

  • все запущенные цепочки задач (All);
  • активные в настоящее время (Active);
  • остановленные (Paused).

Для фильтрации DAG’ов можно добавить теги в каждый из них. Фильтр сохраняется в файле cookie и может быть сброшен.

Apache AirFlow GUI
Типовой веб-интерфейс Apache AirFlow

Однако, на практике для оперативного анализа и мониторинга исполнения большего числа конвейеров возможностей этого лаконичного интерфейса не всегда хватает. Например, нужно посмотреть, сколько DAG’ов завершились аварийно (failed) или выполняются слишком долго. Для этого дата-инженеры Groupon предлагают расширить UI AirFlow собственными ключевыми показателями эффективности (KPI, Key Performance Indicators), настроив всего 2 файла: dags.html и views.py. Как именно это сделать, мы рассмотрим далее.

Кастомизация пользовательского интерфейса

Чтобы добавить новую вкладку для просмотра DAG’ов с определенным статусом, следует внести изменения в файл dags.html. Например, чтобы внести KPI для «провалившихся» заданий, здесь следует добавить дополнительную строку, которая создаст в веб-GUI новую вкладку Failed:

<a href=»{{ url_for(‘Airflow.index’, status=’failed‘, search=request.args.get(‘search’, None), tags=request.args.get(‘tags’, None)) }}» class=»btn {{‘btn-primary’ if status_filter == ‘failed‘ else ‘btn-default’}}» title=»Show only failed DAGS»>Failed <span class=»badge»>{{ «{:,}».format(status_count_failed) }}</span></a>

Само содержимое вкладок определяется в файле views.py с помощью предопределенного ORM-сопоставлений таблицы метаданных Airflow, которые используются для запроса данных и передачи результатов в пользовательский интерфейс. Чтобы получить данные в пользовательском интерфейсе, нужно выполнить три следующих шага [2]:

  • подготовить условие фильтра для KPI;
  • создать объект для счетчика KPI;
  • вернуть объект фильтра обратно в базовый вид Airflow.

В нашем примере информация о Failed DAG приходит из объекта DagRun ORM, что эквивалентно выполнению запроса к метаданным Airflow на стороне сервера.

Для подготовки условия фильтра этого KPI, т.е. чтобы отфильтровать отображаемые данные, будут использоваться ORM DagRun и DagModel, сопоставленные с метаданными Airflow. Объект failed_dag создается путем объединения данных между этими двумя ORM [2]:

failed_dags = dags_query.filter(DagRun.dag_id == DagModel.dag_id, DagRun.state.is_(‘failed’))

Переменная счетчика KPI нужна для отображения количества результатов вместе с именем вкладки в пользовательском интерфейсе. Следующий код показывает, как это реализуется через непосредственное выполнение SQL-запроса к таблице метаданных AirFow:

status_count_failed = session.execute(

‘SELECT count(distinct dag_id) from dag_run where state IN («failed») and execution_date>

current_timestamp -90′).scalar()

Далее следует вернуть объект фильтра и счетчик обратно в базовый вид Airflow. Для этого в условии фильтра arg_status_filter следует добавить две строки, выделенные жирным шрифтом. Они отвечают за назначение созданного объекта и переменной счетчика типовым переменным.

if arg_status_filter == ‘active’:

current_dags = active_dags

num_of_all_dags = status_count_active

elif arg_status_filter == ‘paused’:

current_dags = paused_dags

num_of_all_dags = status_count_paused

elif arg_status_filter == ‘failed’:

current_dags = failed_dags

num_of_all_dags = status_count_failed

else:

current_dags = all_dags

num_of_all_dags = all_dags_count

Наконец, необходимо присвоить переменную счетчика:

if arg_status_filter else None),

num_runs=num_runs,
tags=tags,
state_color=state_color_mapping,
status_filter=arg_status_filter,
status_count_all=all_dags_count,
status_count_active=status_count_active,
status_count_paused=status_count_paused,
status_count_failed=status_count_failed)

Аналогичным образом можно отслеживать долго выполняющиеся задания, добавив соответствующие KPI, каждый из которых будет извлекать нужные данные из таблиц метаданных. Получать дополнительную информацию на основе различных критериев помогут более сложные SQL-запросы.

Чтобы посмотреть работу внесенных изменений, нужно выполнить 3 простых шага [2]:

  • в корневом каталоге репозитория кода Airflow запустить команду python setup.py install;
  • запустить веб-сервер Airflow через команду ./build/airflow webserver —p 8080;
  • проверить localhost после успешного запуска веб-сервера: http://localhost:8080

В результате всех вышеописанных действий пользовательский веб-GUI Apache AirFlow был расширен 3-мя дополнительными вкладками: Failed, Running и LongRunning, накоторых отображаются аварийно завершившиеся, выполняющиеся сейчас и зависающие DAG’и, выполняющиеся слишком долго [2].

Apache AirFlow GUI
Кастомизированный веб-интерфейс Apache AirFlow

Освойте все тонкостей современной дата-инженерии и практического применения Apache AirFlow для разработки сложных конвейеров аналитики больших данных с Hadoop и Spark на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/ui.html
  2. https://medium.com/groupon-eng/how-to-add-custom-kpis-to-airflow-ac09eb1bf3e1

 

Поиск по сайту