24 августа вышел новый релиз Apache AirFlow. Знакомимся с новинками версии 2.10: гибкая настройка исполнителей для всей среды, конкретного DAG и отдельных задач, а также динамическое планирование набора данных и улучшения GUI.
Гибкая настройка исполнителей
Одной из самых главных новинок Apache AirFlow 2.10 стала конфигурация гибридного исполнения, позволяющая использовать несколько исполнителей одновременно в рамках одной среды. Раньше среда Airflow ограничивалась одним исполнителем, который является свойством конфигурации планировщика, управляющим запуском запланированных рабочих процессов. В AirFlow есть локальный Local, а также удаленные исполнители Sequential, Celery, Kubernetes, а также 2 экспериментальных исполнителя AWS: ECS Executor и Batch Executor. Каждый из этих исполнителей имеет свои достоинства и недостатки, о которых мы писали здесь. Раньше дата-инженеру приходилось выбирать между ними, чтобы найти компромисс между задержкой, изоляцией и вычислительной эффективностью. Однако, с версии 2.10 можно одновременно использовать несколько исполнителей в одной среде Airflow. Эта гибкость позволяет использовать преимущества сильных сторон различных исполнителей для разных задач, повышая общую эффективность и смягчая слабые стороны.
Пользователи могут задать исполнителя по умолчанию для всей среды и, при необходимости, назначить определенных исполнителей для отдельных DAG или задач. Для этого надо передать список, разделенный запятыми, в конфигурацию Airflow. Первый исполнитель в списке будет исполнителем по умолчанию для среды. Например, следующий код в конфигурационном файле airflow.cfg:
[core] executor = 'LocalExecutor,CeleryExecutor'
будет устанавливать локального исполнителя для среды по молчанию, а также Celery-исполнителя как альтернативу. Можно назначить исполнителя на конкретную задачу, указав это прямо в коде DAG, например, следующий оператор с исполнителем Kubernetes выводит приветствие на экран:
@task(executor="KubernetesExecutor") def hello(): print("Привет!")
Также можно указать исполнителя на уровне DAG, задав это в аргументах по умолчанию. Указанный оператор будет применен ко всем задачам в этом DAG, например:
def hello(): print("Привет!") def hello_again(): print("Привет еще раз!") with DAG( dag_id="hellos", default_args={"executor": " CeleryExecutor"}, ) as dag: hw = hello() hw_again = hello_again()
Изменения датасетов
Наборы данных и планирование с учетом данных были изначально выпущены в Airflow 2.4. Они позволяют организовать связи между DAG, которые обращаются к тем же данным, и планировать их на основе обновлений этих наборов данных. Благодаря этому можно сделать конвейеры обработки данных более атомарными и разделить их по разным командам дата-инженеров, сохраняя прозрачную зависимость между ними. В предыдущих версиях Airflow входы и выходы наборов данных должны были быть установлены во время анализа DAG, т.е. были статическими. Это позволяло избегать некорректно сформированных URI наборов данных, но исключало возможность гибкой настройки входов и выходов во время выполнения задачи. Чтобы повысить гибкость использования датасетов, Airflow 2.10 предлагает новый класс DatasetAlias, который может принимать значения набора данных и разрешается во время выполнения. Псевдоним позволяет определять нисходящие расписания или входы, не зная заранее точное имя динамического набора данных. Чтобы использовать псевдоним набора данных, его надо установить как выход для задачи, а затем связать с ним события набора данных, определяя outlet_events. Например, в следующем коде
@task(outlets=[DatasetAlias("my-task-outputs")]) def my_task(*, ds, outlet_events): outlet_events["my-task-outputs"].add(Dataset(f"s3://bucket/my-task/{ds}") )
часть ds набора данных URI будет заполнена во время выполнения на основе информации, переданной задаче. Поскольку эта информация не известна заранее, можно запланировать нисходящий DAG на основе псевдонима:
DAG ( . . . , schedule = DatasetAlias ( "my-task-outputs" ) )
Эта функция очень гибкая и предназначена для работы со старыми реализациями наборов данных. Даже при использовании псевдонимов, DAG можно планировать на основе URI набора данных, добавляя несколько событий к одному псевдониму.
Еще одним преимуществом новой функции псевдонима набора данных является возможность прикреплять метаданные к событию, используя дополнительный параметр или класс Metadata.
@task(outlets=[DatasetAlias("my-task-outputs")]) def my_task(*, ds): s3_dataset = Dataset(f"s3://bucket/my-task/{ds}") yield Metadata(s3_dataset, extra={"k": "v"}, alias="my-task-outputs")
Это позволяет сохранять информацию о данных, которые были обработаны, например, количество записей, обработанных в этой задаче, новую оценку точности модели после обучения или имена файлов любых обработанных файлов. Эти метаданные также могут использоваться задачами в нижестоящих DAG, которые взаимодействуют с тем же набором данных.
Обновления GUI и другие улучшения Apache Airflow в релизе 2.10
Для поддержки новой функции псевдонима набора данных был обновлен веб-интерфейс AirFlow. Новое представление содержит более подробную информацию о каждом событии набора данных, включая источник, запуски DAG, которые были вызваны этим набором данных, и дополнительные сведения.
Граф зависимостей и список всех наборов данных в экземпляре Airflow теперь находятся на отдельных вкладках, что стало намного понятнее и удобнее для навигации. События набора данных теперь также отображаются на вкладке «Подробности» каждого запуска DAG и на графе DAG. Еще релиз 2.10 веб-интерфейс Airflow поддерживает очень модный темный режим. Еще в GUI добавлена новая кнопка для повторного анализа DAG по требованию благодаря добавлению конечной точки повторного анализа в API. В целом GUI версии 2.10 стал более наглядным, в частности, отображаются зависимости невыполненных задач на странице сведений и улучшена визуализация XCom-объектов.
В заключение отметим еще несколько важных обновлений Airflow 2.10:
- отсроченные операторы теперь могут выполняться напрямую из триггера, не обращаясь к worker’у. Для некоторых операторов, таких как датчики (сенсоры) это особенно эффективно.
- история экземпляров задач теперь сохраняется для всех попыток, а не только для самой последней. Это обновление является частью разработки управления версиями DAG, что появится в следующих выпусках Airflow.
- логи исполнителя теперь отправляются в журналы задач. Если исполнитель не может запустить задачу, соответствующие сообщения об ошибках будут доступны пользователю в журналах задач, что значительно упрощает отладку.
Таким образом, самый популярный ETL-оркестратор пакетных процессов стал еще удобнее и полезнее для практических задач реальной дата-инженерии.
Узнайте больше про администрирование и эксплуатацию Apache AirFlow на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники