Динамическое сопоставление задач в Apache AirFlow 2.3

динамическое сопоставление задач AirFlow 2.3 новый релиз обзор, AirFlow обучение примеры курсы, AirFlow для дата-инженера, обучение инженер данных AirFlow, AirFlow конвейер обработки данных примеры курсы обучение, data pipeline AirFlow, Школа Больших Данных Учебный Центр Коммерсант

Недавно мы писали про Apache AirFlow 2.3.0 от 30 апреля 2022 года. Сегодня более подробно разберем одну из главных новинок этого релиза – динамическое сопоставление задач. Что это такое, как работает и зачем нужно дата-инженеру.

Что такое динамическое сопоставление задач в ETL-конвейере

Напомним, динамическое сопоставление задач (Dynamic Task Mapping) считается одной из ключевых фич свежего выпуска самого популярного batch-оркестратора. Эта новая функция позволяет программно через API запускать задачи на основе непредсказуемых входных условий API динамического сопоставления задач. Такая возможность нужна, когда заранее неизвестно, сколько элементов в коллекции (файлы в каталоге, контейнер в хранилище объектов или корзина в AWS S3) нужно запланировать и автоматически запустить на исполнение n задач для всех этих элементов с помощью AirFlow. Подобной функции нет в Spark на платформе Databricks, о чем мы рассказываем здесь.

До версии 2.3 реализовать это можно было следующими способами:

  • выделить фиксированное количество рабочих процессов AirFlow и поручить им извлечение и обработку элементов коллекции. Эта схема допускала параллельную обработку, но с неэффективным распараллеливанием из-за невозможности точно указать количество элементов для обработки. Администратору кластера AirFlow приходилось перераспределять, worker’ов для выполнения этого задания или разделять рабочую нагрузку между доступными worker’ами и обеспечивать их занятость. Но этот подход не позволял оптимально использовать встроенный параллелизм AirFlow и приводил к удорожанию и сложности из-за необходимости разработки и поддержки пользовательского кода.
  • создать монолитную задачу для обработки всех элементов коллекции по одному, используя только одного worker’а AirFlow вместо параллельной обработки всеми доступными рабочими процессами.

Динамическое сопоставление задач в новой версии позволяет планировщику AirFlow запускать задачи в зависимости от контекста. Причем само количество задач может меняться в зависимости от числа элементов в коллекции. К примеру, можно применить динамическое сопоставление задач для подключения к AWS S3, получения списка всех новых файлов и запуска отдельных экземпляров одной и той же задачи для каждого из них. При этом дата-инженеру не нужно самостоятельно распределять нагрузку по worker’ам: планировщик AirFlow автоматически оптимизирует доступный параллелизм.

Динамическое сопоставление задач позволяет не только распараллелить операции между доступными рабочими процессами AirFlow, но и упрощает повторный запуск отдельных задач (например, в случае сбоя), предоставляя улучшенное представление их результата. Это особенно удобно при анализе выполнения монолитной задачи обработки всех файлов в корзине S3, когда один или несколько шагов в ней не выполнились. В прошлых версиях AirFlow приходилось анализировать лог-файл, созданный этой монолитной задачей, чтобы определить, какой шаг не удался и почему. В версии 2.3 при сбое динамически сопоставляемой задачи создается дискретное предупреждение этого шага. Это ускоряет процесс отладки, позволяя дата-инженеру сосредоточиться на конкретной задаче, а не проверять весь конвейер целиком. При необходимости можно повторно поставить отказавшую задачу в очередь и запустить ее снова.

Рассмотрим еще один пример практический пример типового ETL-конвейера, который включает извлечение данных из облачного сервиса, их преобразование и загрузку в реляционную базу данных. Например, нужно получить список игровых персонажей вместе с их отличительными способностями через вызовы REST API, чтобы загрузить эти данные в таблицу хранилища данных Snowflake. Это можно реализовать несколькими задачами:

  • первая задача получает символьные данные (список пар «имя-значение») из REST API и сериализует этот список в CSV-файл в корзине S3;
  • вторая задача анализирует список CSV, чтобы извлечь каждую пару «имя-значение» персонажей, и передает их, например, через API AirFlow TaskFlow следующей динамически отображаемой задаче.

На этом этапе планировщик AirFlow ставит в очередь одну и ту же задачу множество раз  — столько, сколько персонажей имеется в полученном списке, т.е. для каждой пары «имя-значение». Благодаря динамическому сопоставлению каждая задача выполняется параллельно, а планировщик распределяет их между доступными рабочими процессами. Каждый рабочий процесс сериализует свою пару «имя-значение» в виде отдельный CSV-файл в подкаталоге той же корзины S3. Оттуда планировщик ставит в очередь n CSV-файлов как одну задачу и использует оператор S3toSnowflakeOperator для извлечения пар «имя-значение» и передачи их в REST API Snowflake. Наконец, база данных Snowflake создает экземпляры этих данных в указанной таблице.

Таким образом, теперь дата-инженер может сопоставлять задачи с переменными, конфигурациями DAG и таблицами базы данных — вместо ручного вызова оператора для выполнения задач, AirFlow динамически изменяет топологию пользовательского рабочего процесса во время выполнения. Какие изменения были реализованы для этого, рассмотрим далее.

Подкапотные изменения AirFlow 2.3 для реализации Dynamic Task Mapping

Для поддержки динамического сопоставления задач шаблоны по умолчанию для логирования экземпляров задач (лог-файлы и elasticsearch log_id) были изменены. Если конфигурация содержит старые значения по умолчанию, они будут обновлены на месте. Для использования новых значений конфигурации по умолчанию следует удалить эти параметры в конфигурационном файле airflow.cfg:

  • [core] log_filename_template: {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log
  • [elasticsearch] log_id_template: {dag_id}-{task_id}-{execution_date}-{try_number}

В AirFlow 2.3 параметр [core] log_filename_template по умолчанию использует стиль партиционирования Hive dag_id=<id>/run_id=<id>, что может вызвать проблемы в некоторых старых файловых системах FAT. Чтобы избежать этого, нужно изменить шаблон лога.

После настройки шаблонов следует убедиться, что они содержат {{ ti.map_index }}, чтобы использовать динамическое сопоставление задач.

Кроме того, в рамках динамического сопоставления задач добавлен map_index в строку XCom для поддержки удаленной части API. Для этого изменен интерфейс для BaseOperatorLink, чтобы принимать TaskInstanceKey в качестве аргумента ключевого слова ti_key, поскольку сочетание execute_date + task больше не является уникальным для сопоставленных операторов. Соответствующим образом изменен метод get_link BaseOperatorLink(), чтобы принимать аргумент ключевого слова ti_key.

Читайте в нашей новой статье, как функция динамического сопоставления задач AirFlow полезна в MLOps для оркестрации конвейеров машинного обучения.  

Освойте все тонкости администрирования и эксплуатации Apache AirFlow для организации ETL/ELT-процессов в аналитике больших данных вы можете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.astronomer.io/blog/apache-AirFlow-2-3-everything-you-need-to-know
  2. https://AirFlow.apache.org/blog/AirFlow-2.3.0/
  3. https://AirFlow.apache.org/docs/apache-AirFlow/2.3.0/release_notes.html#AirFlow-2-3-0-2022-04-30
Поиск по сайту