Как создать свой оператор и использовать обратные вызовы в Apache AirFlow

callback AirFlow custom operator, обратные вызовы Apache AirFlow, пользовательский оператор Apache AirFlow, обучение AirFlow, курсы AirFlow администратор кластера, AirFlow операторы DAG примеры курсы обучение, обучение инженеров данных Big Data, курсы дата-инженеров, Школа Больших Данных Учебный центр Коммерсант

Как написать пользовательский оператор Apache AirFlow и использовать его в DAG. А также чем хороши функции обратного вызова вместо XCom, и когда их не следует применять.

Создаем свой оператор AirFlow и используем его в DAG

Однажды мы уже разбирали, как создать свой оператор Apache AirFlow на примере сенсора – оператора специального вида, который предназначен для выполнения строго одной задачи в виде ожидания, когда что-то произойдет, например, загрузится файл, будет получен ответ от внешней системы и пр. Сегодня поработаем с обычными операторами на примере задачи загрузки файла. Хотя Apache AirFow предоставляет множество операторов для выполнения различных задач, иногда дата-инженеру приходится писать свои собственные.

Оператор AirFlow представляет собой Python-класс, решающий одну задачу в рабочем процессе, включая логику ее выполнения, входы, выходы и зависимости.

Чтобы разработать собственный оператор, нужно создать новый класс Python, который наследуется от класса BaseOperator, предоставленного AirFlow. Например, необходимо считать файл из удаленного места и записать его в локальный каталог. Для разработки собственного оператора в виде Python-скрипта надо сперва импортировать необходимые зависимости:

from airflow.models.baseoperator import BaseOperator
from airflow.utils.decorators import apply_defaults
from urllib.request import urlretrieve

Затем можно объявить новый класс своего оператора с именем DownloadFileOperator, который является потомком класса BaseOperator и принимает два параметра: URL-адрес удаленного файла для загрузки и локальный каталог для его сохранения.

class DownloadFileOperator(BaseOperator):
    @apply_defaults
    def __init__(self, source_url, destination_dir, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.source_url = source_url
        self.destination_dir = destination_dir
    def execute(self, context):
        self.log.info('Downloading file from %s', self.source_url)
        file_path, _ = urlretrieve(self.source_url)
        self.log.info('File downloaded to %s', file_path)
        self.log.info('Moving file to %s', self.destination_dir)
        shutil.move(file_path, self.destination_dir)
        self.log.info('File moved to %s', self.destination_dir)

В этом участке кода метод __init__ принимает сообщение параметра и сохраняет его как переменную экземпляра. Метод execute выполняет действия по считыванию и переносу файла: загружает файл с помощью функции urlretrieve из библиотеки urllib, а затем перемещает загруженный файл в указанный локальный каталог с помощью библиотеки Shutil. Декоратор @apply_defaults используется для применения значений по умолчанию к параметрам, переданным конструктору. Использование декоратора необязательно, но может пригодиться для указания значений по умолчанию для параметров создаваемого оператора.

После определения своего оператора, его следует зарегистрировать в AirFlow, чтобы использовать в DAG. Для этого нужно импортировать созданный класс оператора и создать его экземпляр, передав все необходимые аргументы. Далее можно добавить этот экземпляр в качестве задачи в DAG. При этом, как и в любом Python-скрипте, надо сперва импортировать необходимые зависимости:

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime
from custom_operators import DownloadFileOperator

Затем можно определить свой конвейер обработки данных, т.е. DAG:

dag = DAG(
    'download_file_dag',
    description='Download a file from a remote location and save it to a local directory',
    schedule_interval=None,
    start_date=datetime(2023, 3, 26),
)
start = DummyOperator(task_id='start', dag=dag)
download_file = DownloadFileOperator(
    task_id='download_file',
    source_url='https://example.com/file.txt',
    destination_dir='/path/to/local/directory',
    dag=dag,
)
end = DummyOperator(task_id='end', dag=dag)
start >> download_file >> end

В этом примере создается DAG с именем download_file_dag с задачей с именем download_file и двумя фиктивными задачами в качестве начала и конца. Экземпляр DownloadFileOperator создается с параметрами URL-адреса источника (source_url) и директории назначения (destination_dir), для которых заданы значения https://example.com/file.txt и /path/to/local/directory соответственно.

В GUI Apache Airflow созданный конвейер обработки данных будет выглядеть следующим образом:

GUI AirFlow DAG
Отображение DAG в GUI Apache AirFlow

Таким образом, создание собственного пользовательского оператора AirFlow может расширить функциональные возможности этого ETL-фреймворка в соответствии с конкретными потребностями дата-инженера. Следует всего лишь определить новый класс оператора и реализовать метод execute(), чтобы создать пользовательскую задачу, которую можно использовать в DAG, как и любой другой оператор.

Однако, при этом стоит помнить о рекомендации делать оператор идемпотентным, чтобы результаты его повторного вызова не создавали побочных эффектов в виде создания нового файла или рекурсивного удаления объектов. Разумеется, перед использованием пользовательского оператора в реальном проекте, его следует тщательно протестировать, о чем мы говорили здесь.

Читайте в нашей новой статье, как использовать самую модную на сегодня ИИ-технологию ChatGPT для создания своего пользовательского оператора AirFlow, работающего с OpenAPI.

Обратный вызов вместо XCom

Кроме создания пользовательских операторов также дата-инженеру необходимо организовать взаимодействие между ними. Разумеется, для этого можно использовать механизм XCom (Cross Communication), который позволяет обмениваться небольшим количеством данных между задачами в AirFlow, храня эти промежуточные данные в метаданных фреймворка. Однако, этот способ не универсален и имеет ограничения, связанные с размером передаваемых данных. Предельный размер XCom в Apache AirFlow зависит от базы данных, которая используется для хранения метаданных. В частности, для встроенной SQLite этот лимит равен 2 ГБ, для PostgreSQL 1 ГБ и 64 КБ для MySQL. Подробнее об этом мы писали здесь.

Вместо XCom можно использовать функцию обратного вызова on_success_callback() как на уровне оператора, так и на уровне DAG. Этот подход позволяет сделать DAG максимально простым и может быть обобщен для применения в нескольких конвейеров обработки данных.

Возвращаясь к ранее рассмотренному примеру, добавим в DAG и созданный оператор функцию обратного вызова:

dag = DAG(
    'download_file_dag',
    description='Download a file from a remote location and save it to a local directory',
    on_success_callback=on_success_callback,
    schedule_interval=None,
    start_date=datetime(2023, 3, 26),
)
start = DummyOperator(task_id='start', dag=dag)
download_file = DownloadFileOperator(
    task_id='download_file',
    source_url='https://example.com/file.txt',
    destination_dir='/path/to/local/directory',
    dag=dag,
    on_success_callback=on_success_callback,
)
end = DummyOperator(task_id='end', dag=dag)
start >> download_file >> end

Теперь DAG и оператор загрузки файла определяют функцию on_success_callback() и оба ее используют. Разумеется, надо определить саму эту функцию. Для простоты в этом определении выведем идентификатор экземпляра задачи и индекс сопоставления:

def on_success_callback(context, session=None):
    task_instance = context["ti"]

    print("------------")
    print("ON_SUCCESS_CALLBACK CALLED")
    print(task_instance.task_id)
    print(task_instance.map_index)
    print("------------")

Результаты работы созданного оператора и DAG, в котором он используется, можно посмотреть в GUI Apache AirFlow. В заключение отметим, что не рекомендуется применять одну и ту же функцию обратного вызова для DAG и оператора, если его поведение зависит от какого-либо (сопоставленного) свойства экземпляра задачи. Как обратные вызовы могут пригодиться при отладке и мониторинге конвейера обработки данных. читайте в нашей новой статье.

Освойте все возможности Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://datageeks.medium.com/create-your-own-airflow-operator-ecbdf3f0b916
  2. https://medium.com/@MarinAgli1/a-quick-look-into-airflow-success-callback-functions-d140e60d3e67
Контакты авторизированного учебного центра
«Школа Больших Данных»
Адрес:
127576, г. Москва, м. Алтуфьево, Илимская ул. 5 корпус 2, офис 319, БЦ «Бизнес-Депо»
Часы работы:
Понедельник - Пятница: 09.00 – 18.00
Остались вопросы?
Звоните нам +7 (495) 414-11-21 или отправьте сообщение через контактную форму. Также вы можете найти ответы на ваши вопросы в нашем сборнике часто задаваемых вопросов.
Оставьте сообщение, и мы перезвоним вам в течение рабочего дня
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Или напишите нам в соц.сетях
Поиск по сайту