Сериализация в Apache AirFlow

Apache AirFlow примеры курсы обучение, Apache AirFlow развертывание администрирование оптимизация, Apache AirFlow для дата-инженеров и администраторов, Школа Больших данных Учебный центр Коммерсант

Как Apache AirFlow сериализует и десериализует данные, зачем с версии 2 включена обязательная сериализация DAG в JSON, почему для пользовательской сериализации рекомендуются словари или примитивы и что поможет сократить нагрузку на базу данных метаданных через настройку параметров сериализации в конфигурационном файле фреймворка.

Сериализация данных в Apache AirFlow

Чтобы сохранить данные в файле или передать по сети, их необходимо сериализовать, т.е. перевести из исходной структуры в линейную форму. Обратный процесс восстановления сериализованного объекта в исходную структуру данных называется десериализацией. Оба этих процесса многократно выполняются в любой распределенной системе. В Apache AirFlow данные сериализуются для обмена аргументами между задачами и десериализуются, когда это требуется в нижестоящей задаче. Сериализация также происходит так, что веб-серверу и планировщику не приходилось читать файл DAG. Python, на котором написан AirFlow, из коробки поддерживает только сериализацию примитивов, таких как str и int, и циклически проходит по итерируемым объектам. Для более сложных структур данных используется пользовательская сериализация. Airflow поддерживает три способа пользовательской сериализации:

  • примитивы возвращаются как есть, без дополнительного кодирования;
  • если это не примитив или его итерируемый объект, Airflow ищет зарегистрированный сериализатор и десериализатор в пространстве имен serialization.serializers. Если он не найден, фреймворк ищет метод serialize() или, в случае десериализации, метод deserialize(data, version: int).
  • если класс декорирован @dataclass или @attr.define, он будет использовать публичные методы для этих декораторов.

Если необходимо расширить Airflow новым сериализатором, нужно выбирать наиболее подходящий способ сериализации. Объекты, которые находятся под контролем Airflow, т. е. находящиеся в пространстве имен airflow., например, airflow.model.dag.DAG или под контролем разработчика, например, my.company.Foo, следует декорировать с помощью @attr.define или @dataclass. Если это невозможно, то следует самостоятельно реализовать методы serialize() и deserializeserialize(). Метод должен возвращать примитив или словарь (dict), ключи которого должны быть примитивными. Вообще в Airflow рекомендуется использовать встроенные функции словарей как можно чаще вместо классов и других сложных пользовательских структур данных. Напомним, в Python словари являются ассоциативными массивами или хэш-таблицами. Это неупорядоченные коллекции произвольных объектов с доступом по ключу, которые имеют множество специальных методов, а также поддерживают встроенные функции Python.

Объекты, которые не находятся под контролем Airflow, а относятся к общим библиотекам Python, например, numpy.int16, требуют зарегистрированного сериализатора и десериализатора, включая управление версиями. Примитивы, за исключением bytes, могут быть возвращены как словари, ключи которых должны быть примитивами. При этом значения таких словарей не должны быть сериализованы. В случае реализации зарегистрированных сериализаторов следует избегать циклических импортов, используя строки с указанием пространства имен для заполнения списка сериализаторов. Например, serializers = [«my.company.Foo»] вместо serializers = [Foo].

Однако, в AirFlow выполняется не только сериализация пользовательских типов данных. Также этот ETL-оркестратор выполняет сериализацию DAG, что мы разберем далее.

Сериализация DAG

Сериализация DAG — это механизм в Apache Airflow, который позволяет преобразовывать метаданные направленного ациклического графа (DAG) в формат JSON. Этот процесс полезен для снижения зависимости планировщика от файлов DAG, что повышает общую производительность и надежность планировщика. Когда функция сериализации DAG включена, веб-сервер Airflow считывает сериализованные DAG из базы метаданных вместо загрузки из файлов DAG. Такой подход значительно сокращает время загрузки веб-сервера Airflow, особенно при наличии большого количества DAG. Планировщик Airflow сохраняет сериализованные DAG в таблице serialized_dag базы метаданных. Затем веб-сервер извлекает сериализованные DAG из этой таблицы. Начиная с версии 2.0.0, планировщик использует сериализованные DAG для согласованности и принимает решения о планировании. Это обязательно и не может быть отключено. Без сериализации DAG и сохранения в БД, веб-серверу и планировщику требовался доступ к файлам DAG. С помощью сериализации DAG веб-сервер отделен от анализа DAG, что ускоряет его работу.

Сериализация DAG в Apache AirFlow
Сериализация DAG в Apache AirFlow

При использовании сериализации DagFileProcessorProcess планировщик анализирует файлы DAG, сериализует их в формате JSON и сохраняет в базе данных метаданных в качестве модели SerializedDagModel. Далее веб-сервер вместо повторного анализа файлов DAG считывает их сериализованные объекты в JSON, десериализует их, создает DagBag и использует его для отображения в пользовательском интерфейсе. Планировщику не нужны фактические DAG для принятия решений по планированию, поскольку сериализованные DAG содержат всю необходимую информацию. Вообще JSON является частью стандартной библиотеки Python с Python 2.5. Этот текстовый формат довольно популярен и часто используется для передачи данных по сети. Поскольку его система типов моделирует JavaScript, она довольно ограничена и не может автоматически сериализовать пользовательские классы. Справиться с этим ограничением позволяют пользовательские сериализаторы, реализуемые с помощью словарей, что было рассмотрено выше.

Возвращаясь к сериализации DAG, отметим, что она позволяет вместо загрузки всего DagBag при запуске веб-сервера AirFlow загружать каждый DAG только по требованию из таблицы serialized_dag базы данных метаданных. Это помогает сократить время запуска веб-сервера и потребление памяти. Такое сокращение особенно заметно при большом количестве DAG. Можно включить сохранение исходного кода в базе данных, чтобы сделать веб-сервер полностью независимым от файлов DAG. Это не обязательно, если файлы входят в Docker-образ или их можно иным образом предоставить веб-серверу. Данные хранятся в модели DagCode для отображения полей шаблона. Когда включена сериализация, шаблоны не отображаются в запросах, но копия содержимого поля сохраняется до выполнения задачи на рабочем процесcе. Эти данные хранятся в модели RenderedTaskInstanceFields. Чтобы ограничить чрезмерный рост базы данных, сохраняются только самые последние записи, а старые записи очищаются.

Чтобы настроить сериализацию DAG в AirFlow, необходимо внести изменения в конфигурационный файл airflow.cfg, установив там следующие параметры:

  • min_serialized_dag_update_interval – минимальный интервал (в секундах), после которого сериализованные DAG должны обновляться в ьазе данных метаданных. Это помогает снизить нагрузку на внутреннюю базу данных AirFlow.
  • min_serialized_dag_fetch_interval — частота повторной выборки сериализованного DAG из базы данных, когда он уже загружен в DagBag на веб-сервере. Этот параметр позволяет регулировать нагрузку на базу данных метаданных за счет отображения кэшированной версии DAG, которая может быть неактуальной на какой-то момент времени.
  • max_num_rendered_ti_fields_per_task — максимальное количество полей экземпляра визуализированной задачи (полей шаблона) для каждой задачи, которые необходимо сохранить в базе данных.
  • compress_serialized_dags – сжатие сериализованного DAG в базе данных. Это полезно, когда в кластере очень большие DAG. При включении этого параметра в значение True просмотр зависимостей DAG становится невозможен.

Чтобы использовать другую библиотеку JSON вместо стандартной библиотеки json, например ujson, необходимо ее сперва импортировать, а потом определить json-переменную в локальном файле настроек airflow_local_settings.py следующим образом:

import ujson 
json = ujson

Освойте администрирование и эксплуатацию Apache AirFlow для оркестрации пакетных процессов в задачах реальной дата-инженерии на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/serializers.html
  2. https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/dag-serialization.html
  3. https://www.restack.io/docs/airflow-faq-administration-and-deployment-dag-serialization-01
  4. https://proglib.io/p/kak-hranit-obekty-python-so-slozhnoy-strukturoy-moduli-pickle-i-dill-2020-04-30
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту