В одной из статей мы говорили о TaskFlow API, который позволяет задачам передавать и получить друг от друга информацию. За кулисами этого приема и передачи в Apache Airflow лежит XCom. Сегодня расскажем вам о том, как взаимодействуют задачи через XCom, а также, как задаются параметры конфигурации через особые переменные Apache Airflow.
Что такое XCom в Apache Airflow
XCom (сокр. от cross-communications, кросс-коммуникация) — это механизм взаимодействия между задачами. Задачи в Apache Airflow изолированы и могут запускаться на разных машинах, поэтому XCom является тем средством, которое позволяет им “разговаривать” друг с другом.
Перед тем, как мы начнем, важно помнить: XCom предназначен для передачи небольших данных. Не передавайте через него данные большого размера, например, какой-нибудь DataFrame. Это объясняется тем, что XCom-данные переносятся через СУБД, например, MySQL, который ограничен 64 КБ. Подробнее о других ограничениях этой технологии читайте в нашей новой статье.
XCom идентифицируются по ключу, у которого есть значение. Передача и прием сообщения (значение ключа) осуществляется через методы экземпляра задачи (task instance): xcom_push
и xcom_pull
. Допустим, что имеется 2 задачи, которые реализуют функции Python. Первая задача генерирует код сигнала такой, что если 0, то всё хорошо; все остальные значения говорят о том, что произошла ошибка. Вторая задача получает этот код и производит какие-то действия в ответ. Так вот код сигнала можно передавать по XCom, а задачи будут его проверять. Ниже на примере Python Operator показано, как это может выглядеть.
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.utils.dates import days_ago default_args = { "owner": "roman", } def get_signal():https://bigdataschool.ru/blog/dynamic-dag-generation-in-airflow.html return 0 def process_signal(**kwargs): ti = kwargs["ti"] ti.xcom_push("signal", get_signal()) def result(ti): status = ti.xcom_pull(task_ids="check-status", key="signal") if status == 0: print("Not ok") with DAG( "xcomguide", default_args=default_args, schedule_interval="daily", start_date=days_ago(2), ) as dag: check_status = PythonOperator( task_id="check-status", python_callable=process_signal ) res = PythonOperator( task_id="result", python_callable=result ) check_status >> res
В этом примере можно заметить, что экземпляр задачи можно достать либо через **kwargs
, либо первым аргументом функции. В параметре task_ids
метода xcom_pull
указывается не имя функции, которая передавала пару ключ-значение, а идентификатор задачи.
Data Pipeline на Apache Airflow
Код курса
AIRF
Ближайшая дата курса
19 марта, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Также XCom используется в TaskFlow API, о котором мы говорили тут. В отличие от традиционного XCom, передача информации осуществляется через возвращаемое значение, поэтому его называют более Pythonic — соответсвующий стилю программирования на Python.
Apache Airflow предоставляет ещё один механизм передачи и приема значений задачами под названием переменные (variables).
Что такое переменные в Apache Airflow и зачем они нужны
Возможность передачи/приема информации через XCom имеется у каждого экземпляра задачи. XCom предназначен для взаимодействия внутри одного DAG’а, в то время как переменные являются глобальными, предназначены для общей конфигурации и существуют только во время выполнения (runtime).
Как мы уже говорили в статье Apache Airflow позволяет создавать динамические графы. Примером динамического графа может быть создание задач на основе изменяющегося списка имен файлов, т.е., задачи создаются на основе этих имен файлов. Тогда где хранить этот список с именами? Наиболее простой вариант — в переменных.
Другой пример. Некоторые задачи требуют определенные параметры конфигурации, например, KubernetesPodOperator
. Вместо ручного создания значений параметров, можно просто определить переменную через JSON.
Итак, переменные полезны для хранения и извлечения данных во время выполнения и позволяют избежать ручного задания значений и повторений кода в DAG`ах.
Как создавать переменные
Переменные представляются также, как и XCom, в виде пары ключ-значение, а устанавливаются либо через пользовательский интерфейс, либо через командую строку.
Через пользовательский интерфейс переменные создаются через вкладку Admin->Variables. Можно также их задавать через командую строку. Например, установка (и создание, если переменной с таким ключом нет) переменной осуществляется через ключ set
:
airflow variables set my_key "1"
Теперь мы можем экспортировать эту переменную в файл JSON:
airflow variables export vars.json
Импорт из файла, очевидно, осуществляется через команду import
.
Использование переменных в коде
Достать переменные в самом коде можно через класс Variable
или через шаблоны Jinjа. Например, в функции, которая реализует одна из задач выше, можно дописать следующее:
from airflow.models import Variable def result(ti): my_var = Variable.get("my_key")
Тип получаемого значения является str
. Чтобы изменить его (если у вас не строка), установите параметр deserialize_json=True
, который распарсит возвращаемое значение.
Код курса
ADH-AIR
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Через шаблоны переменные также просто достаются:
ba = BashOperator( task_id="ba", bash_command="echo my_key = {{ var.value.my_key }}" )
Раз уж мы затронули Bash Operator, то значения XCom извлекаются схожим образом:
fetching_data = BashOperator( task_id='fetching_data', bash_command="echo {{ ti.xcom_pull(task_ids='check-status') }}", )
О том, как организовать обмен данными между задачами DAG в Apache AirFlow без традиционного XCom, читайте в нашей новой статье. А Ещё больше подробностей о работе Apache AirFlow на практических примерах вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
Источники