Внешний датчик в Apache Airflow для поэтапной загрузки данных в таблицы DWH

Apache AirFlow примеры курсы обучение, обучение дата-инженеров, инженер данных курсы примеры обучение, external sensor airflow example, инженерия данных с Apache AirFlow пример, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Мы уже писали про датчики или сенсоры — особый тип операторов Apache AirFlow, предназначенных для ожидания какого-то события. Сегодня рассмотрим практический пример обучения дата-инженеров и разработчиков по использованию внешнего сенсора в рамках типовой задачи дата-инженерии по организации ETL/ELT-процессов при поэтапной загрузке данных в DWH для OLAP-систем.

Постановка задачи: поэтапная загрузка данных в таблицы DWH

Напомним, работа сенсоров в AirFlow сводится к проверке выполнения конкретных условий, например, загружен ли файл в корзину AWS S3 или добавлена ли новая запись в базу данных. Датчики могут стать отличным инструментом управления DAG’ами AirFlow по событиям. Рассмотрим пример реализации хранилища данных. Согласно устоявшимся подходам проектирования OLAP-систем, здесь чаще всего применяются схемы звезды или снежинки с таблицами измерений и фактов, куда переносятся данные из различных источников через таблицы предварительных этапов (stage). Также в процессе проектирования ETL/ELT создаются таблицы поиска, которые впоследствии используются в конвейерах данных. А таблицы Aggregate и Snapshot из таблицы фактов (Fact) нужны для агрегации результатов и их моментальных снимков. Все эти таблицы имеют определенный набор процессов. Например, чтобы заполнить моментальный снимок факта, нужно сначала заполнить таблицу Fact, для чего сперва следует поместить данные в stage и таблицу поиска. Аналогичные шаги необходимо проделать, чтобы внести данные в таблицы измерения.

Поэтому зависимость между порядком загрузки таблиц при настройке DWH становится очень важна. Можно изящно решить эту задачу с помощью внешнего датчика AirFlow, который проверяет состояние экземпляра задачи, находящегося в другом DAG. Если проверяемая задача успешно выполнена, то DAG с внешними датчиками просто идет вперед и выполняет последующей задачи конвейера. Так можно создать несколько DAG для загрузки разных таблиц, а затем применить к ним зависимости с помощью внешнего сенсора. Это делает код более модульным и простым в обслуживании.

Как использовать внешние датчики Apache AirFlow: практический пример

Внешний датчик как любой Python-оператор AirFlow принимает набор параметров. Для External sensor они такие:

  • external_dag_id (str) — указывает на DAG, которую следует дождаться;
  • external_task_id (str) — указывает на задачу, которую следует дождаться. Если не задано, сенсор ожидает DAG целиком.
  • allow_states (list) — список разрешенных состояний, по умолчанию «успешно» ([‘success’]);
  • execute_delta (datetime.timedelta) — разница во времени с предыдущим выполнением для просмотра. По умолчанию используется та же дата исполнения, что и у текущей задачи или DAG. Для вчерашнего дня используйте [positive!] datetime.timedelta(days=1). В ExternalTaskSensor можно передать execute_delta или execute_date_fn, но не оба одновременно.
  • execution_date_fn(callable) — функция, которая получает текущую дату выполнения и возвращает желаемые даты выполнения для запроса. В ExternalTaskSensor можно передать execute_delta или execute_date_fn, но не оба одновременно.
  • check_existence (bool) — значение True для этого параметра проверяет, существует ли внешняя задача, если задан external_task_id, или DAG, который нужно ожидать, если external_task_id имеет значение None. Если внешняя задача или DAG не существуют, ожидание немедленно прекращается. По умолчанию значение этого параметра равно False.

Если необходимо разработать собственный внешний сенсор AirFlow, он как Python-оператор будет ссылаться на существующий класс ExternalTaskSensor, который позволяет пользователям получать доступ к ожидаемому DAG. Класс имеет следующий синтаксис:

classairflow.sensors.external_task.ExternalTaskSensor(*, external_dag_id: str, external_task_id: Optional[str] = None, external_task_ids: Optional[Iterable[str]] = None, allowed_states: Optional[Iterable[str]] = None, failed_states: Optional[Iterable[str]] = None, execution_delta: Optional[datetime.timedelta] = None, execution_date_fn: Optional[Callable] = None, check_existence: bool = False, **kwargs)[source]

Итак, нужно, чтобы Task_A в DAG_A определял завершение Task_B в DAG_B.

AirFlow DAG
Зависимости между задачами в разных DAG

Если бы эти две задачи находились в одном DAG, можно было бы просто добавить эту строку кода в файл DAG: Task_A.set_upstream(Task_B). Но это решение не применимо, поскольку задачи находятся в разных файлах DAG. Поэтому определим ExternalTaskSensor в DAG_A, который определяет завершение Task_B в DAG_B.

external sensor AirFlow DAG example
Управление запуском задач в разных DAG с помощью внешнего сенсора Airflow

Пусть DAG_A находится в файле First.py, а DAG_B – соответственно, в файле Second.py. DAG_B, определенный в файле Second.py выполняется только после завершения DAG_A, определенного в файле First.py. Поэтому в Second.py задается датчик внешней задачи, который указывает на First.py.

Python-код для DAG_A (файл First.py):

import loggingimport airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperatorlogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)args = {“owner”: “airflow”, “start_date”: airflow.utils.dates.days_ago(1)}dag = DAG(
 dag_id=”First”, default_args=args, schedule_interval=’45 06 * * *’
)def pp():
 print(‘First Primary Task’)
with dag:
 first_task=PythonOperator(task_id=”first_task”, python_callable=pp,dag=dag)first_task

Python-код для DAG_B (файл Second.py):

import loggingimport airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
from datetime import datetime, timedeltalogging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)args = {“owner”: “airflow”, “start_date”: airflow.utils.dates.days_ago(1)}dag = DAG(
 dag_id=”Second”, default_args=args, schedule_interval=’55 06 * * *’
)def pp():
 print(‘Second Dependent Task’)
with dag:
 Second_Task=PythonOperator(task_id=”Second_Task”, python_callable=pp,dag=dag)ExternalTaskSensor(
 task_id=’Ext_Sensor_Task’,
 external_dag_id=’First’,
 external_task_id=’first_task’,
 execution_delta = timedelta(minutes=10),
 timeout=300,
 dag=dag)>>Second_Task

Когда запускается скрипт Second.py, он сначала проверяет выполнение скрипта First.py и, если тот завершен успешно, запускает задачу, определенную в Second.py.

Освойте администрирование и эксплуатацию Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@komal1491/airflow-external-sensor-a7e999ecadfd
  2. https://airflow.apache.org/docs/apache-airflow/stable/concepts/sensors.html
  3. https://medium.com/@fninsiima/sensing-the-completion-of-external-airflow-tasks-827344d03142
Поиск по сайту