Развивая наши курсы по Apache AirFlow для дата-инженеров и администраторов, сегодня рассмотрим, как автоматизировать обслуживание этого фреймворка, запуская поддерживающие операции как рабочие задачи по расписанию. В этой статье разбираем опыт дата-инженеров американской ИТ-компании Clairvoyant, предложивших сообществу 3 разных DAG по обслуживанию Apache AirFlow в виде open-source проектов, доступных для свободного скачивания.
3 операции сопровождения Big Data конвейеров в Apache Airflow
Дата-инженера нужно не просто строить эффективные конвейеры обработки данных с помощью Apache Airflow, Dagster или другого оркестратора batch-процессов, но и поддерживать созданные pipeline’ы. В AirFlow можно выделить 3 эксплуатационных операции, которые следует периодически повторять [1]:
- очистка хранилища метаданных (Metastore), где хранятся сведения о запущенных DAG’ах, экземплярах задач, переменных XCom, логах и прочая оперативная информация. Если экземпляр Airflow работает в течение длительного периода времени, ежедневно планируя сотни и тысячи заданий, объем Metastore должен быть довольно большим. Однако, на практике обращение к таким историческим данным происходит редко, поэтому нет необходимости хранить их в течение долгого времени, заполняя БД и сокращая место на диске. Поэтому дата-инженеры Clairvoyant разработали DAG под названием db-cleanup, который периодически удаляет записи из хранилища метаданных старше установленного срока [2].
- очистка логов Иногда облачные хранилища, такие как Amazon S3 или Azure Blob для сохранения журналов бывают недоступны, поэтому приходится записывать их на локальный диск, что чревато его переполнением и нехваткой свободного места. Многие популярные библиотеки логов типа log4j, предлагают стратегии ротации для очистки старых журналов. Но Airflow по умолчанию ничего подобного не использует, поэтому дата-инженеру приходится самостоятельно выбирать ненужные журналы и очищать их. На этот случай дата-инженеры Clairvoyant разработали DAG под названием log-cleanup, который периодически удаляет с локального диска старые записи логов [3].
- удаление остановленных задач, которые продолжали работать в фоновом режиме при удалении TaskInstance из базы данных AirFlow. Хотя обычно это действие делается, чтобы завершить задачу: при удалении экземпляра задачи TaskInstance она должна прекращать работать, но иногда такие задачи продолжают выполняться на отдельных узлах до завершения. Поэтому дата-инженеры Clairvoyant создали DAG под названием kill-halted-tasks. Он периодически завершает выполнение задач, для которых нет соответствующего экземпляра задачи в хранилище метаданных [4].
Что именно представляют собой все эти решения, а также как их использовать, мы рассмотрим далее.
Автоматизация обслуживания AirFlow от Clairvoyant: принцип действия
Все 3 вышеописанных операции обслуживания Apache AirFlow дата-инженеры компании Clairvoyant предлагают автоматизировать следующим образом [2, 3, 4]:
- все задачи очистки описаны в виде DAG’ов и упакованы в Python файлы (*.py). Эти py-файлы нужно загрузить с Github и скопировать в каталог DAG;
- далее нужно обновить глобальные переменные (SCHEDULE_INTERVAL, DAG_OWNER_NAME, ALERT_EMAIL_ADDRESSES, ENABLE_DELETE и AIRFLOW_HOSTS) в DAG желаемыми значениями;
- затем необходимо создать и установить переменные на веб-сервере Airflow (Admin -> Variables):
- airflow_db_cleanup__max_db_entry_age_in_days – длительность хранения файлов журнала (в днях), не заданная в конфигурационном файле. Например, если установлено значение 30, будут удалены файлы старше этого срока. Этот параметр нужен для DAG под названием db-cleanup, который периодически удаляет записи из хранилища метаданных старше установленного срока [2].
- airflow_log_cleanup__max_log_age_in_days – длительность хранения лог-файлов, не заданная в конфигурационном файле. Это параметр DAG’а под названием log-cleanup, который периодически удаляет с локального диска старые записи логов [3].
- airflow_log_cleanup__enable_delete_child_log — логическое значение (True / False), определяющее, следует ли удалять файлы из каталога дочернего журнала, заданного в [scheduler] конфигурационного файла airflow.cfg. Это тоже параметр DAG’а под названием log-cleanup [3].
- наконец, нужно включить DAG на веб-сервере Airflow.
В эту последовательность действий для очистки Metastore между шагами 2 и 3 также добавляется изменение списка DATABASE_OBJECTS по мере необходимости со следующими параметрами:
- airflow_db_model — модель, импортированная из airflow.models, соответствующая таблице в БД метаданных;
- age_check_column — столбец в таблице для расчета максимальной даты удаления данных;
- keep_last — логическое значение, указывающее, следует ли сохранять последний запускаемый экземпляр. Параметр keep_last_filters позволяет определить список фильтров для предотвращения удаления данных во время очистки, таких как запущенные DAG, когда для внешнего триггера установлено значение 0. А параметр keep_last_group_by дает возможность указать столбец, по которому следует группировать записи базы данных и выполнять агрегатные функции.
Для задачи очистки лог-файлов дата-инженеры Clairvoyant предлагают не только файл airflow-log-cleanup.py, который позволяет удалять журналы, указав количество рабочих узлов, но и его SSH-модификацию. Файл airflow-log-cleanup-pwdless-ssh.py удаляет логи из списка рабочих узлов по их имени хоста при наличии у пользователя фреймворка беспарольного ssh-доступа ко всем узлам, перечисленных в AIRFLOW_HOSTS. Для этого нужен SSH-ключ с открытым и закрытым ключами. А содержимое открытого ключа должно быть добавлено в файл ~/.ssh/authorized_keys на всех узлах AirFlow-кластера [3].
Узнайте, как автоматизировать свои задачи дата-инженерии с помощью Apache AirFlow, создавать сложные конвейеры аналитики больших данных с Hadoop и Spark и эффективно управлять ими. Пройдите краткосрочные курсы по AirFlow для разработчиков, ИТ-архитекторов, дата-инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в нашем лицензированном учебном центре обучения и повышения квалификации в Москве:
Источники
- https://medium.com/softwaresanders/automated-maintenance-for-apache-airflow-c4495e0f98e1
- https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/db-cleanup
- https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/log-cleanup
- https://github.com/teamclairvoyant/airflow-maintenance-dags/tree/master/kill-halted-tasks