Модульные тесты для DAG в Apache Airflow: примеры и лучшие практики

модульное тестирование DAG Apache AirFlow, Apache AirFlow примеры курсы обучение, обучение дата-инженеров, инженер данных курсы примеры обучение, обновления airflow example, инженерия данных с Apache AirFlow пример, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Благодаря возможности написать собственный Python-код для операторов и задач DAG’ов, Apache Airflow позволяет разработчикам Data Flow и инженерам данных создавать сложные и эффективные конвейеры пакетной обработки данных. Обеспечить надежность этого многообразия поможет качественное тестирование пользовательского кода. Рассмотрим примеры и рекомендации по написанию модульных тестов.

Зачем тестировать DAG AirFlow?

Модульные тесты являются основой тестирования: они должны быстро выполняться, покрывать как можно большую часть кода и описывать его поведение в различных вариантах использования, чтобы облегчить понимание написанного другим разработчикам.  Для конвейера обработки данных в виде единого направленного графа (DAG, Directed Acyclic Graph) Airflow также необходимы модульные тесты, чтобы упростить поддержку, добавление или изменение цепочки задач.

Чтобы убедиться, что код DAG’а Airflow написан корректно, необходимо проверить следующее:

  • DAG создается правильно, с верной установкой даты старта, окончания и других параметров (start_date, end_date, catchup и пр.);
  • задачи поставлены правильно;
  • задачи запускаются согласно правилам.

К примеру, если нужно создать DAG с определенными диапазонами дат, выполнить дополнительные прогоны или запустить их в конкретный интервал расписания, все эти условия могут быть явно зафиксированы модульными тестами. Модульные тесты могут подтвердить правильный порядок конвейера, что что задачи запускаются по верным правилам. Например, чтобы нижестоящая задача запускалась только после успешного завершения вышестоящей. По умолчанию правило триггера задачи Airflow установлено в значение all_success, что соответствует указанному правилу. Но можно также задокументировать это поведение с помощью модульных тестов.

Кроме того, Airflow позволяет запускать DAG с конфигурацией JSON, разрешая пользователю указывать различную конфигурацию времени выполнения для разных запусков цепочки задач. Во время модульных тестов нужно будет создать контекст запуска DAG для проверки различных потоков кода в зависимости от конфигурации.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
27 ноября, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

В качестве примера разберем несколько рекомендаций для модульных тестов пользовательских операторов и хуков Airflow, с помощью которых дата-инженер может решать специализированные задачи. На практике при разработке пользовательских операторов часто нужно настроить соединения, переменные или контексты запуска DAG с помощью конфигурации JSON. Иногда пользовательский оператор использует перехватчик (hook) Airflow, а разработчику нужно убедиться в его корректном использовании.

Хуки Airflow обычно используются операторами в качестве абстракции от внешних сервисов, используемых оператором. Соединения или подключения хранят конфиденциальную информацию, которая используется при аутентификации на внешних ресурсах. Каждое подключение имеет идентификатор (conn_id), который используется хуками. Обычно в хуке извлекается необходимое соединение после инициализации (__init__). Поскольку получение соединения Airflow с помощью функции get_connection означает подключение к базе данных, рекомендуется инициализировать хук только во время выполнения функции оператора, чтобы не перегружать СУБД дополнительными подключениями. На практике это означает, что во время модульных тестов оператора Airflow следует будет имитировать инициализацию хука Airflow. Как это сделать, рассмотрим далее.

Модульное тестирование хуков и соединений

Есть два способа организовать соединения Airflow в начале модульных тестов — с помощью переменных среды или с помощью ручного исправления функции get_connection. Ручной способ организации подключения не рекомендуется. В этом случае для получения соединения применяется метод get_connection, унаследованный от класса BaseHook: нужно написать много кода самостоятельно, что чревато ошибками при работе с несколькими подключениями в одном потоке тестируемого кода, чтобы обрабатывать возврат различных соединений на основе заданного conn_id.

Рекомендуемым способом для тестирования соединений является использование переменных среды. Переменные хранят неконфиденциальную информацию и работают как простое хранилище ключей и значений. Переменные называются по их ключу и часто используются для хранения настроек конфигурации или произвольного статического содержимого. Переменные среды хранятся в виде строк, поэтому соединение Airflow сохраняется в формате URI. При разработке тестов для кода, использующего переменные или соединение, следует убедиться, что они существуют при запуске тестов. Для этого их можно сохранить в базе данных, чтобы прочитать во время выполнения кода. Но чтение и запись объектов в базу данных требуют дополнительных затрат времени. Для ускорения выполнения теста стоит смоделировать существование этих объектов, не сохраняя их в БД. Для этого вы можете создать переменные среды с имитацией os.environ, используя unittest.mock.patch.dict(). Такое решение считается лучшей практикой согласно документации Airflow.

Для задания переменной используется AIRFLOW_VAR_{KEY}:

with mock.patch.dict("os.environ", AIRFLOW_VAR_KEY="env-value"):
assert "env-value" == Variable.get("key")

Для соединения используется AIRFLOW_CONN_{CONN_ID}:

conn = Connection(
conn_type="gcpssh",
login="cat",
host="conn-host",
)
conn_uri = conn.get_uri()
with mock.patch.dict("os.environ", AIRFLOW_CONN_MY_CONN=conn_uri):
assert "cat" == Connection.get("my_conn").login

Пример модульного теста для проверки корректной загрузки DAG может выглядеть следующим образом:

import pytest
from airflow.models import DagBag
@pytest.fixture()

def dagbag():
    return DagBag()

def test_dag_loaded(dagbag):
    dag = dagbag.get_dag(dag_id="hello_world")
    assert dagbag.import_errors == {}
    assert dag is not None
    assert len(dag.tasks) == 1

Читайте в нашей новой статье про тестирование структуры DAG и использование шаблонов Jinja. А освоить администрирование и эксплуатацию Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@jw_ng/writing-unit-tests-for-an-airflow-dag-78f738fe6bfc
  2. https://medium.com/@jw_ng/writing-unit-tests-for-airflow-custom-operators-sensors-and-hooks-9771d21fe3b9
  3. https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#mocking-variables-and-connections
Поиск по сайту