Что такое backfill в Apache AirFlow и зачем дата-инженеру запускать эту команду CLI-интерфейса при управлении DAG. Разбираемся с параметрами, возможностями и исключениями.
Что такое backfill в Apache AirFlow и чем это полезно при управлении DAG
Иногда при управлении конвейерами обработки данных в Apache AirFlow дата-инженеру необходимо вернуться в прошлое, чтобы выполнить весь DAG или его отдельные задачи за прошедшие дни. Например, выполнить инкрементную загрузку новых или измененных данных в корпоративное зеро или хранилище, что мы рассматриваем здесь. Или, если DAG работает с начала месяца и к нему добавилась новая задача, ее нужно выполнить за предыдущий период. Для таких случаев в Apache AirFlow есть специальная команда backfill, которая повторно запускает все экземпляры dag_id для всех интервалов в пределах указанных дат начала и окончания. Также можно повторно запускать определенные задачи в рамках DAG.
Команда backfill вызывается через CLI-интерфейс и имеет следующий синтаксис:
airflow dags backfill [-h] [-c CONF] [--continue-on-failures] [--delay-on-limit DELAY_ON_LIMIT] [--disable-retry] [-x] [-n] [-e END_DATE] [-i] [-I] [-l] [-m] [--pool POOL] [--rerun-failed-tasks] [--reset-dagruns] [-B] [-s START_DATE] [-S SUBDIR] [-t TASK_REGEX] [--treat-dag-as-regex] [-v] [-y] dag_id
Рассмотрим более подробно параметры этой команды:
- dag_id — идентификатор DAG;
- -c, —conf — строка JSON, которая добавляется в атрибут конфигурации DagRun;
- —continue-on-failures – продолжение выполнения, даже если некоторые задачи не удалось выполнить, по умолчанию отключено;
- —delay-on-limit — время ожидания в секундах, когда будет достигнуто ограничение на максимальное количество активных запусков dag (max_active_runs), прежде чем пытаться снова выполнить запуск DAG. По умолчанию этот период установлен в значение 1,0.
- —disable-retry – маркировка задач как невыполненных без повторной попытки их запуска, по умолчанию отключено;
- -x, —donot-pickle – запуск определенной версии DAG, указанной в pickle — нативном сериализованном объекте Python, который сохраняется в базе данных на время выполнения задания. Поскольку DAG может вызываться из разных мест (пользовательские репозитории, основной репозиторий и пр.), а также выполняться на разных исполнителях, нужен источником достоверности для выполнения backfill-задания. По умолчанию это поведение отключено.
- -n, —dry-run – выполнение пробного прогона для каждой задачи, который проходит только по полям шаблона, по умолчанию отключено;
- —e, —end-date – переопределение даты окончания, заданной в формате ГГГГ-ММ-ДД;
- —i, —ignore-dependencies – пропуск восходящих задач и запуск лишь тех, которые соответствуют регулярному выражению task_regex, по умолчанию отключено;
- -I, —ignore-first-depends-on-past – игнорирование зависимости depend_on_past только для первого набора задач, но с их учетом при последующих выполнениях, по умолчанию отключено;
- -l, —local – запуск задачи с помощью локального исполнителя (LocalExecutor), по умолчанию отключено;
- -m, —mark-success – маркировка задач как выполненных без их фактического запуска, по умолчанию отключено;
- —pool — пул ресурсов для использования;
- —rerun-failed-tasks – автоматический перезапуск всех невыполненных задач для заданного диапазона дат вместо создания исключений, по умолчанию отключено;
- —reset-dagruns – удаление существующих запусков DAG, связанных с backfill, и новый запуск с новыми DAGRun, по умолчанию отключено;
- -B, —run-backwards – выполнение задач в обратном порядке (с самого последнего дня). Но если есть задачи, которые имеют зависимости depend_on_past, эта опция, отключенная по умолчанию, вызовет исключение;
- —s, —start-date – переопределение даты начала (start_date) в формате ГГГГ-ММ-ДД;
- —S, —subdir — расположение файла или каталог, где находится файл DAG. По умолчанию [AIRFLOW_HOME]/dags, где [AIRFLOW_HOME] — это значение, установленное для конфигурации AIRFLOW_HOME в конфигурационном файле cfg.
- -t, —task-regex — регулярное выражение для фильтрации конкретных идентификаторов задач для backfill;
- —treat-dag-as-regex – определение dag_id как регулярного выражения вместо точной строки, по умолчанию отключено;
- -v, —verbose – подробное логгирование, по умолчанию отключено;
- -y, —yes – запрос подтверждения, по умолчанию отключено.
Таким образом, команда backfill позволяет запустить задачу или подраздел DAG для указанного диапазона дат. Если используется опция reset_dag_run, AirFlow сперва спросит дата-инженера, надо ли очистить все предыдущие экземпляры dag_run и task_instances в указанном диапазоне дат. При задании опции rerun_failed_tasks, backfill автоматически повторно запустит предыдущие неудачно выполненные экземпляры задачи в заданном диапазоне дат.
По умолчанию AirFlow будет запускать любые прошлые запланированные интервалы, которые не были запущены, заполняя прошлые данные. Чтобы избежать этого, нужно явно передать параметр catch=False в определении DAG:
dag = DAG( dag_id="dag_no_catchup", start_date=dt.datetime(year=2019, month=1, day=1), schedule_interval="@daily", end_date=dt.datetime(year=2019, month=1, day=5), catchup=False, )
Однако, команда backfill имеет очевидное ограничение доступности данных в источнике. Например, если источник данных предоставляет данные за период до 30 дней, можно запустить обработку данных только за эти 30 дней.
Backfill может пригодиться для повторной обработки данных, например, когда после изменения бизнес-логики надо вычислить новую статистику. Для этого в AirFlow следует очистить прошлый запуск задачи calc_statistics и запустить ее снова.
Как уже было отмечено выше, с помощью backfill можно запускать выполнение как всего DAG
airflow backfill my_example_dag -s [start_date] -e [end_date]
так и отдельных его задач, используя флаг –t:
airflow backfill my_example_dag -s [start_date] -e [end_date] -t [task_id]
А чтобы выполнить только одну указанную задачу и не запускать никаких восходящих задач, следует указать флаг –i, который работает только если указан флаг –t.
Для применения команды backfill к целому DAG или его отдельным задачам, он должен находиться в активном состоянии.
В заключение отметим, как бороться с исключением AirflowException(«You cannot use the —pickle option when using DAG.cli() method.»), которое возникает из-за наличия нескольких версий DAG в базе данных AirFlow. Чтобы решить эту проблему, следует указать флаг —x или —donot-pickle. С этой опцией AirFlow будет запускать команду backfill как есть, без учета чтения состояния DAG в базе данных, сохраненной в виде сериализованного Python-объекта pickle, хранящейся в базе данных на время выполнения задания.
Таким образом, backfill можно назвать очень полезной для дата-инженера функцией Apache AirFlow, которая позволяет повторно запускать целый DAG или его отдельные задачи за прошедший период. О другом значении термина backfilling в кластерной среде при управлении заданиями читайте в нашей новой статье.
Узнайте больше про администрирование и эксплуатацию Apache AirFlow для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники