Программный запуск DAG Apache AirFlow через REST API

Программный запуск DAG Apache AirFlow через REST API

Сегодня в рамках обучения дата-инженеров рассмотрим, как программно запустить DAG в Apache AirFlow через вызовы REST API. А также повторим основы интеграционного взаимодействия ИС через отправку HTTP-запросов к конечным точкам.

 Как устроен REST API в Apache AirFlow

Напомним, начиная с выпуска 2 Apache Airflow включает стабильный RESTfull API версии 1.0.0 для доступа к своему объекту или запуска DAG через отправку HTTP-запросов к конечным точкам. Это пригодится, например, в задаче интеграционного тестирования DAG, о чем мы писали здесь. Чтобы упростить работу дата-инженера, Apache Airflow поддерживает ряд конечных точек REST API для своих объектов. Большинство из них принимают JSON в качестве входных данных и возвращают ответы в этом же формате. Поэтому надо добавить в запрос следующие заголовки:

Content-type: application/json
Accept: application/json

Напомним, REST как самоописательный стиль интеграции информационных систем, работает по принципу запрос-ответ по протоколу HTTP. Запросы посылаются к конечным точкам (URL), которые предоставляют доступ к ресурсам. В REST API Apache AirFlow термин ресурс относится к одному типу объекта в метаданных фреймворка. API разбивается по соответствующему ресурсу его конечной точки. Имя ресурса обычно имеет множественное число и выражается в стиле camelCase, например, dagRuns. Имена ресурсов используются как часть URL-адресов конечных точек, а также в параметрах и ответах API. Между именем параметра URL и именем поля должна быть согласованность. Имена полей выражаются в стиле snake_case, например:

{
  "name": "string",
  "slots": 0,
  "occupied_slots": 0,
  "used_slots": 0,
  "queued_slots": 0,
  "open_slots": 0
}

Как и любой RESTfull-сервис, Apache AirFlow поддерживает CRUD-операции (Create, Read, Update и Delete) для большинства ресурсов, хотя некоторые конечные точки имеют особое поведение в виде исключений. В общем случае реализация CRUD-операций в REST API выглядит следующим образом:

  • чтобы создать ресурс, надо отправить HTTP-запрос POST с необходимыми метаданными ресурса в теле запроса. Ответ возвращает код ответа 201 Created при успешном использовании метаданных ресурса, включая его внутренний идентификатор, в теле ответа.
  • Для чтения ресурса или для вывода списка ресурсов используется HTTP-запрос GET. Для чтения определенного ресурса в запросе отправляется его идентификатор. Ответ обычно возвращает код ответа 200 OK в случае успеха с метаданными ресурса в теле ответа. Если запрос GET не включает конкретный идентификатор ресурса, он рассматривается как запрос списка. Ответ обычно возвращает код ответа 200 OK в случае успеха с объектом, содержащим список метаданных ресурсов в теле ответа. В параметрах запроса можно задать смещение, после которого надо возвращать объекты (offset) и максимальное количество объектов для выборки (limit), по умолчанию равное 25. Например,
v1/connections?limit=25&offset=25
  • Для обновления ресурса его идентификатор нужно указать в HTTP-запросе PATCH с полями, которые следует изменить в теле запроса. Ответ обычно возвращает код ответа 200 OK в случае успеха с информацией об измененном ресурсе в теле ответа.
  • Удалить ресурс можно, указав его идентификатор в HTTP-запросе DELETE. Ответ обычно возвращает код ответа 204 No Content в случае успеха.

Маска обновления доступна в качестве параметра запроса в конечных точках для PATCH-запросов. Она используется для уведомления API, какие поля нужно обновить. Использование update_mask упрощает обновление объектов, точечно указывая серверу, какие поля в объекте следует обновить, вместо того обновления всех полей. Запрос на обновление игнорирует любые поля, не указанные в маске поля, оставляя их текущими значениями, например:

resource = request.get('/resource/my-id').json()
  resource['my_field'] = 'new-value'
  request.patch('/resource/my-id?update_mask=my_field', data=json.dumps(resource))

Вспомнив основы REST API и особенности его реализации в Apache AirFlow, рассмотрим, как использовать это на практическом примере для запуска DAG.

Программный запуск DAG

Чтобы работать с REST API в Apache AirFlow, следует прежде всего включить его, отредактировав файл конфигурации, так как по умолчанию фреймворк не принимает никаких запросов через этот интерфейс. В файле конфигурации airflow.cfg и следует установить нужный бэкенд аутентификации auth_backends. Например, базовая аутентификация по логину и паролю для пользователей, созданных с помощью входа в систему LDAP или в базе данных метаданных Airflow с использованием пароля, устанавливается следующим образом:

# auth_backends = airflow.api.auth.backend.session
auth_backends = airflow.api.auth.backend.basic_auth

Напомним, Airflow поддерживает несколько методов аутентификации (Kerberos, Basic), и можно добавить свой собственный. Проверить, какой метод аутентификации используется сейчас, поможет следующая команда:

$ airflow config get-value api auth_backends
airflow.api.auth.backend.basic_auth

Далее следует включить совместное использование ресурсов между источниками (Cross-Origin resource Sharing (CORS) – это функция безопасности браузера, которая ограничивает HTTP-запросы, инициированные сценариями, запущенными в браузере. Для этого в конфигурационном файле airflow.cfg следует установить значения параметрам access_control_allow_headers, access_control_allow_methods и access_control_allow_origins в разделе [api], например:

[api]
access_control_allow_headers = origin, content-type, accept
access_control_allow_methods = POST, GET, OPTIONS, DELETE
access_control_allow_origins = https://exampleclientapp1.com https://exampleclientapp2.com

Далее можно протестировать API, перечислив все доступные DAG через GET-запрос на конечной точке /api/v1/dags. При использовании AirFlow на локальном хосте GET-запрос надо направить на конечную точку http://localhost:8080/api/v1/dags, предварительно установив авторизацию.

Затем нужно создать Python-файл DAG и поместить его в папку dags. Идентификатор запуска DAG (dag_run_id) может быть любым (строка или текст). Наконец, нужно получить доступ к конфигурации в коде или Python-функции. Можно получить доступ к переменным, которые установлены в конфигурации, переданной в запросе. Для этого следует написать Provide_context=True в функции PythonOperator и получить к ней доступ с помощью kwargs[‘dag_run’].conf.get(«имя переменной»). Чем Apache AirFlow Отличается от AWS Step functions, читайте в нашей новой статье.

Освойте администрирование и эксплуатацию Apache AirFlow для аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

[elementor-template id=»13619″]

Источники

  1. https://deeputyagi39.medium.com/how-to-trigger-airflow-dag-using-rest-api-dd40e3f7a30d
  2. https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref/
  3. https://airflow.apache.org/docs/apache-airflow/stable/security/api/