Для чего нужен контекст задачи Apache AirFlow, что он собой представляет, какие включает объекты, как получить к ним доступ и чем они полезны дата-инженеру.
Что такое контекст задачи Apache AirFlow
В разработке ПО контекстом называется среда, в которой существует объект. Это понятие очень важно при использовании специализированных фреймворков. Например, в Apache Spark контекст является точкой входа в функциональные возможности фреймворка, позволяя получить доступ к кластеру с помощью диспетчера ресурсов. В Apache AirFlow контекст также используется дата-инженером, предоставляя ему информацию о работающем DAG и его среде Airflow, доступ к которой можно получить из задачи.
Контекст Airflow представляет собой словарь, который доступен во всех задачах. Одним из наиболее распространенных значений, извлекаемых из этого словаря, является ключевое слово ti (сокращение от task_instance, экземпляра задачи), которое позволяет получить доступ к атрибутам и методам этого объекта. Так можно организовать передачу данных между задачами, используя результаты выполнения одной задачи как входные параметры другой, например, с помощью механизма XCom ti.xcom_pull(task_ids=»previous_task«). Это поддерживается как в традиционном, так и в TaskFlow API, о чем мы недавно писали здесь.
Таким образом, контекст Airflow позволяет использовать параметры уровня DAG в задачах, например, чтобы применять логическую дату запуска DAG в задаче как часть имени файла или выполнить действие в задаче, обусловленное настройкой конкретной конфигурации фреймворка.
Получить доступ к информации из контекста можно следующими способами:
- передать аргумент **context функции, используемой в декорированной задаче @task или PythonOperator-операторе;
- использовать шаблоны Jinja в традиционных операторах Airflow. Напомним, фреймворк использует Jinja, инфраструктуру шаблонов Python, в качестве механизма шаблонов.
- обратиться к контексту kwarg в методе execute() любого традиционного или пользовательского оператора.
Важно помнить, что получить доступ к контекстному словарю Airflow вне задачи нельзя. Это логично, т.к. само понятие контекст в AirFlow неразрывно связано с задачей.
Чтобы получить доступ к контексту Airflow в TaskFlow API декорированной задачи @task или задаче с оператором PythonOperator, необходимо добавить **contex— аргумент в функцию задачи. Это сделает контекст доступным в виде словаря задачи. Например, в TaskFlow API вывести на экран полный словарь данных контекста задачи можно так:
from pprint import pprint @task def print_context(**context) pprint(context)
В традиционном API, который намного многословнее TaskFlow API, аналогичный код выглядит следующим образом:
from pprint import pprint def print_context_func(**context): pprint(context) print_context = PythonOperator( task_id="print_context", python_callable=print_context_func, )
Получить доступ к контексту задачи можно через шаблоны Jinja, которые позволяют передавать динамическую информацию в экземпляры задач во время выполнения, заключая переменные в двойные фигурных скобки {{ }}. Содержимое этих фигурных скобок — это шаблонный код, который оценивается во время выполнения. Получить список всех параметров, допускающих шаблоны для любого оператора, можно через его атрибут template_fields. Например, чтобы получить доступ к логической дате запуска DAG в формате, YYYY-MM-DD, можно использовать шаблон {{ ds }} в bash_command-параметре BashOperator:
task_id="print_logical_date", bash_command="echo {{ ds }}", )
Также можно использовать шаблоны Jinja для доступа к значениям XCom-объекта в параметрах традиционной задачи. В следующем фрагменте кода первая задача first_task отправит извлеченные данные first_data в XCom-объект, а вторая задача second_task будет использовать шаблон Jinja для извлечения этого значения из ti-объекта экземпляра задач, т.е. контекста Airflow, контенкации строк и вывода объединенных данных.
@task def first(): return "first_data" second = BashOperator( task_id="second_task", bash_command="echo '{{ ti.xcom_pull(task_ids='first_task') }} + second_data'", ) first() >> second
В традиционном операторе контекст Airflow всегда передается методу execute() с использованием context-аргумента ключевого слова. При разработке собственного оператора, дата-инженеру необходимо включить context в метод execute() объекта kwarg:
class PrintDAGIDOperator(BaseOperator): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) def execute(self, context): print(context["dag"].dag_id)
Что внутри контекста задачи: ключи словаря
Ключ ti, который содержит объект экземпляра задачи TaskInstance – не единственный ключ в словаре контекста. Хотя его атрибуты xcom_pull и xcom_push, которые позволяют отправлять и получать XCom-объекты, используются довольно часта, на практике дата-инженеру приходится работать и с другими ключами. В частности, ключ ts поможет информацию о планировании DAG. А ключ conf содержит объект AirflowConfigParser с информацией о конфигурации фреймворка, т.е. настройках среды. Получив программный доступ к этим настройкам, можно сделать DAG зависимым от какой-либо настройки конфигурации, например, от исполнителя.
Ключ dag содержит объект DAG, который имеет множество полезных атрибутов и методов. К примеру, метод get_run_dates() позволяет получить список всех отметок времени, с которыми DAG будет работать в течение определенного периода времени. Это особенно полезно для отладки ошибок в конвейерах обработки данных со сложными расписаниями запуска. Например, вывести на экран все DAG, старт которых запланирован с 1-го июня по 10 июля 2024 года, можно используя следующий код:
@task def get_dagrun_dates(**context): run_dates = context["dag"].get_run_dates( start_date=datetime(2024, 06, 1), end_date=datetime(2024, 07, 10) ) print(run_dates)
Ключ dag_run содержит объект запуска DAG. Из методов часто используется active_runs_of_dags(), который возвращает количество активных в данный момент запусков конкретного DAG. А атрибут external_trigger возвращает значение True, если запуск DAG был запущен вне обычного расписания. Вывести эту информацию на экран можно с помощью следующего кода:
@task def print_dagrun_info(**context): print(context["dag_run"].active_runs_of_dags()) print(context["dag_run"].external_trigger)
Ключ params содержит словарь всех параметров уровня DAG и задачи, которые были переданы конкретному экземпляру задачи. Доступ к отдельным параметрам можно получить, используя соответствующий ключ.
@task def print_param(**context): print(context["params"]["my_favorite_param"])
Ключ var содержит все переменные экземпляра Airflow, которые представляют собой пары ключ-значение. Они обычно используются для хранения информации на уровне экземпляра, которая редко меняется. Просмотреть их поможет следующий код:
@task def get_var_from_context(**context): print(context["var"]["value"].get("my_regular_var")) print(context["var"]["json"].get("my_json_var")["num2"])
Airflow с использованием Yandex Managed Service for Apache Airflow™
Код курса
YARF
Ближайшая дата курса
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Научитесь использовать Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Data Pipeline на Apache AirFlow и Apache Hadoop
- AIRFLOW с использованием Yandex Managed Service for Apache Airflow™
Источники