14 августа 2023 года вышел очередной релиз Apache AirFlow . Разбираем его самые главные новые возможности, улучшения и исправления ошибок: отказ от Python 3.7, задачи установки/демонтажа, встроенная поддержка спецификации OpenLineage, обновления интерфейса, упрощение управления сложными зависимостями и другие фичи Apache AirFlow 2.7.
Задачи установки/демонтажа
Apache AirFlow 2.7 содержит более 35 новых функций, около 45 улучшений и примерно 50 исправлений ошибок. Многие функции и улучшения в этой версии сфокусированы на расширении возможностей поддержки управления ресурсами, тестирования DAG и мониторинга как самих конвейеров обработки данных, так и кластера этого оркестратора пакетных процессов.
Главной особенностью AirFlow 2.7 являются задачи установки/демонтажа. Многие рабочие процессы требуют создания ресурса, его использования для выполнения некоторой задачи, а затем устранение этого ресурса. В производственных средах AirFlow рекомендуется настраивать ресурсы и конфигурации до запуска определенных задач, а затем отключать ресурсы, даже если задачи не выполняются, чтобы сократить потребление ресурсов и расходы.
Поэтому в AirFlow 2.7 был добавлен специальный тип задачи для создания и удаления ресурсов, чтобы эффективно выполнять действия по настройке и демонтажу инфраструктуры в DAG. Например, подготовка кластера AWS EMR для запуска DAG: можно создать кластер в DagRun перед первой задачей самого конвейера обработки данных, а затем отключить его после завершения последней задачи, даже в случае сбоев. Эта функция настройки и демонтажа также полезна при обработке данных внутри AirFlow , т.к. данные между задачами передаются через объекты XCom, которые никогда не очищаются и потребляют ресурсы. Именно поэтому не рекомендуется передавать через XCom большие объемы данных.
Задачи установки/демонтажа гарантируют, что необходимые ресурсы для запуска задачи AirFlow будут настроены до выполнения задачи и что эти ресурсы будут удалены после завершения задачи, независимо от каких-либо сбоев задачи. Это полезно не только для выполнения инфраструктурных работ, но и в тестировании качества данных, оркестрации моделей машинного обучения, организации ETL-процессов. Ранее в AirFlow все эти варианты использования реализовывались с помощью сложных правил триггеров, дополнительных DAG и других обходных путей. Теперь дата-инженеру не требуется писать дополнительный код, чтобы воспользоваться преимуществами этой функции для экономии средств и ресурсов.
Любая существующая задача AirFlow может быть обозначена как задача установки/демонтажа, с особым поведением и дополнительной видимостью отношений установки/демонтажа в пользовательском интерфейсе AirFlow . Также это можно сделать в коде следующим образом:
create_cluster >> run_query1 >> run_query2 >> delete_cluster.as_teardown(setups=create_cluster)
В одном DAG может быть несколько независимых наборов задач установки и демонтажа. Например, один рабочий процесс задач устанавливает и отключает кластер, а другой устанавливает и отключает временную базу данных.
Поскольку задачи установки/демонтажа нужны для обычных рабочих задач, целевых для самого конвейера обработки данных, они удаляются при удалении связанной рабочей задачи. По умолчанию задачи демонтажа игнорируются при оценке состояния выполнения DAG, т.е. если только задача демонтажа не удалась, запуск DAG все равно будет выполнен успешно. Задача демонтажа будет запущена, если ее задача установки прошла успешно, даже если рабочие задачи завершились неудачей. Это приводит к «умной» реализации управления ресурсами в DAG, снижению потребления ресурсов и стоимости, а также экономии времени на управление ресурсами.
Для реализации этой новой функции был немного изменен API AirFlow . В частности, для условий выполнения демонтажа добавлено новое правило триггера ALL_DONE_SETUP_SUCCESS, которое выполняется, если все восходящие потоки выполнены и хотя бы одна из его настроек выполнена успешно. Напомним, правила триггера учитывают только прямые восходящие потоки. Подробнее о триггерных правилах мы ранее писали здесь.
Встроенная поддержка OpenLineage
Поскольку AirFlow используется для проектирования конвейеров обработки данных, вопросы отслеживания их происхождения при выполнении задач извлечения и преобразования становятся особенно актуальными. С версии 2.7 фреймворк поддерживает спецификацию OpenLineage – открытую платформу для сбора и анализа метаданных о происхождении данных, включая метаданные о наборах данных, заданиях и запусках. Это позволяет предоставить дата-инженерам информацию, необходимую для выявления основной причины сложных проблем и понимания влияния изменений. OpenLineage содержит открытый стандарт для сбора данных о происхождении, эталонную реализацию репозитория метаданных (Marquez), библиотеки для распространенных языков и интеграцию с инструментами конвейера данных.
Публикация операционной истории происхождения данных через интеграцию OpenLineage была основной возможностью AirFlow для устранения неполадок и сценариев использования управления. До версии 2.7 выпуск метаданных OpenLineage был возможен только с помощью реализации плагина, поддерживаемого в проекте OpenLineage, который зависел от AirFlow и внутренних компонентов оператора. Теперь встроенная поддержка OpenLineage в AirFlow делает публикацию метаданных через экосистему OpenLineage более простой и надежной. Это реализовано путем перемещения пакета openlineage-AirFlow из проекта OpenLineage к провайдеру AirFlow-openlineage в базовом образе AirFlow Docker, где его можно включить с помощью конфигурации, включая логику извлечения происхождения вместе с модульными тестами, что в большинстве случаев устраняет необходимость в дополнительных экстракторах. Наличие логики извлечения в каждом провайдере обеспечивает стабильность контракта происхождения в каждом операторе и упрощает добавление покрытия происхождения к пользовательским операторам.
Изменения в GUI и другие обновления Apache AirFlow 2.7
В пользовательский интерфейс AirFlow добавлено представления активности кластера, которое показывает полезные метрики для мониторинга, включая все, что происходит с запусками DAG и экземплярами задач (сколько из них запущено, неудачно, запланировано и т.д.), а также состояния инфраструктуры (планировщик, база данных метаданных), статус и время выполнения DAG. Также в веб-GUI AirFlow 2.7 добавлены фильтры состояния выполнения и сбоя DAG, что упрощает мониторинг большого количества конвейеров. Теперь можно просто отфильтровать все DAG, чтобы найти все запущенные и выполняющиеся конвейеры, или, наоборот, завершившиеся со сбоем.
Еще одним, связанным с мониторингом улучшением стало добавление провайдера Apprise, который позволяет отправлять уведомления в несколько сервисов, включая Teams, Twitter, Reddit и пр. Чтобы использовать провайдер Apprise, можно импортировать уведомитель и использовать его как функцию обратного вызова. Сервис отправки уведомления, определяется соединением, передаваемым в apprise_conn_id, подобно тому, как мы рассматривали здесь пример с месенджером Телеграм.
В AirFlow 2.7 добавлены опции быстрой остановки, которые можно включить с помощью нового параметра fail_stop на уровне DAG. Если для этого параметра установлено значение true, при сбое задачи любые другие текущие задачи также завершатся сбоем. Это ускоряет разработку и тестирование DAG, поскольку можно устранять любые сбои, не дожидаясь ненужного завершения выполнения других задач в этом DAG. Включение и отключение этой функции задается как установка параметра в DAG:
@dag( start_date=datetime(2023, 8, 1), schedule=None, catchup=False, fail_stop=True, )
Важно, что при использовании этого параметра все задачи в DAG должны использовать правило триггера all_success.
В заключение отметим, что еще в AirFlow 2.7 добавлена новая функция chain_linear для реализации сложных зависимостей, чтобы отмечать группы задач как успешные или неудачные и параметр конфигурации default_deferrable для упрощения реализации отложенных операторов. Также можно отключить тестирование соединений в пользовательском интерфейсе, API и CLI, что отключено по умолчанию в целях безопасности. Наконец, отключена поддержка Python 3.7, вместо команд db init, db upgrade и параметра конфигурации load_default_connections используется команда airflow db migrate для создания или обновления базы данных метаданных. Эта команда не будет создавать соединения по умолчанию: чтобы сделать это, надо явно запустить команду airflow connection create-default-connections после запуска airflow db migrate.
В случае SSL-соединения SMTP-контекст теперь использует контекст по умолчанию (default_ssl_contest) – контекст Python вместо ранее использовавшегося none. Это обеспечивает баланс между безопасностью и совместимостью. Когда сертификаты старые, самозаверяющие или неправильно настроенные, это может не работать. Это можно настроить, установив ssl_context в конфигурации электронной почты Airflow. Установка значения none не рекомендуется из соображений безопасности, поскольку это отключает проверку сертификатов и разрешает MITM-атаки, о чем мы рассказываем в новой статье про уязвимости фреймворка, выявленные в 2023 году.
Также из соображений безопасности конечная точка API /dags/*/dagRuns/*/taskInstances/*/xcomEntries/* отключает возможность десериализации произвольных значений XCom на веб-сервере. Для обратной совместимости администратор сервера может установить для этой конфигурации enable_xcom_deserialize_support значение True, чтобы включить флаг и восстановить обратную совместимость. В производственном развертывании рекомендуется не включать эту функцию и вместо этого выполнять десериализацию на стороне клиента.
Наконец, изменено имени приложения Celery по умолчанию с airflow.executors.celery_executor на airflow.providers.celery.executors.celery_executor.
Узнайте больше про Apache AirFlow и его практическое использование в дата-инженерии и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники