Сегодня в рамках обучения дата-инженеров рассмотрим, как программно запустить DAG в Apache AirFlow через вызовы REST API. А также повторим основы интеграционного взаимодействия ИС через отправку HTTP-запросов к конечным точкам.
Как устроен REST API в Apache AirFlow
Напомним, начиная с выпуска 2 Apache Airflow включает стабильный RESTfull API версии 1.0.0 для доступа к своему объекту или запуска DAG через отправку HTTP-запросов к конечным точкам. Это пригодится, например, в задаче интеграционного тестирования DAG, о чем мы писали здесь. Чтобы упростить работу дата-инженера, Apache Airflow поддерживает ряд конечных точек REST API для своих объектов. Большинство из них принимают JSON в качестве входных данных и возвращают ответы в этом же формате. Поэтому надо добавить в запрос следующие заголовки:
Content-type: application/json Accept: application/json
Напомним, REST как самоописательный стиль интеграции информационных систем, работает по принципу запрос-ответ по протоколу HTTP. Запросы посылаются к конечным точкам (URL), которые предоставляют доступ к ресурсам. В REST API Apache AirFlow термин ресурс относится к одному типу объекта в метаданных фреймворка. API разбивается по соответствующему ресурсу его конечной точки. Имя ресурса обычно имеет множественное число и выражается в стиле camelCase, например, dagRuns. Имена ресурсов используются как часть URL-адресов конечных точек, а также в параметрах и ответах API. Между именем параметра URL и именем поля должна быть согласованность. Имена полей выражаются в стиле snake_case, например:
{ "name": "string", "slots": 0, "occupied_slots": 0, "used_slots": 0, "queued_slots": 0, "open_slots": 0 }
Как и любой RESTfull-сервис, Apache AirFlow поддерживает CRUD-операции (Create, Read, Update и Delete) для большинства ресурсов, хотя некоторые конечные точки имеют особое поведение в виде исключений. В общем случае реализация CRUD-операций в REST API выглядит следующим образом:
- чтобы создать ресурс, надо отправить HTTP-запрос POST с необходимыми метаданными ресурса в теле запроса. Ответ возвращает код ответа 201 Created при успешном использовании метаданных ресурса, включая его внутренний идентификатор, в теле ответа.
- Для чтения ресурса или для вывода списка ресурсов используется HTTP-запрос GET. Для чтения определенного ресурса в запросе отправляется его идентификатор. Ответ обычно возвращает код ответа 200 OK в случае успеха с метаданными ресурса в теле ответа. Если запрос GET не включает конкретный идентификатор ресурса, он рассматривается как запрос списка. Ответ обычно возвращает код ответа 200 OK в случае успеха с объектом, содержащим список метаданных ресурсов в теле ответа. В параметрах запроса можно задать смещение, после которого надо возвращать объекты (offset) и максимальное количество объектов для выборки (limit), по умолчанию равное 25. Например,
v1/connections?limit=25&offset=25
- Для обновления ресурса его идентификатор нужно указать в HTTP-запросе PATCH с полями, которые следует изменить в теле запроса. Ответ обычно возвращает код ответа 200 OK в случае успеха с информацией об измененном ресурсе в теле ответа.
- Удалить ресурс можно, указав его идентификатор в HTTP-запросе DELETE. Ответ обычно возвращает код ответа 204 No Content в случае успеха.
Маска обновления доступна в качестве параметра запроса в конечных точках для PATCH-запросов. Она используется для уведомления API, какие поля нужно обновить. Использование update_mask упрощает обновление объектов, точечно указывая серверу, какие поля в объекте следует обновить, вместо того обновления всех полей. Запрос на обновление игнорирует любые поля, не указанные в маске поля, оставляя их текущими значениями, например:
resource = request.get('/resource/my-id').json() resource['my_field'] = 'new-value' request.patch('/resource/my-id?update_mask=my_field', data=json.dumps(resource))
Вспомнив основы REST API и особенности его реализации в Apache AirFlow, рассмотрим, как использовать это на практическом примере для запуска DAG.
Программный запуск DAG
Чтобы работать с REST API в Apache AirFlow, следует прежде всего включить его, отредактировав файл конфигурации, так как по умолчанию фреймворк не принимает никаких запросов через этот интерфейс. В файле конфигурации airflow.cfg и следует установить нужный бэкенд аутентификации auth_backends. Например, базовая аутентификация по логину и паролю для пользователей, созданных с помощью входа в систему LDAP или в базе данных метаданных Airflow с использованием пароля, устанавливается следующим образом:
# auth_backends = airflow.api.auth.backend.session auth_backends = airflow.api.auth.backend.basic_auth
Напомним, Airflow поддерживает несколько методов аутентификации (Kerberos, Basic), и можно добавить свой собственный. Проверить, какой метод аутентификации используется сейчас, поможет следующая команда:
$ airflow config get-value api auth_backends airflow.api.auth.backend.basic_auth
Далее следует включить совместное использование ресурсов между источниками (Cross-Origin resource Sharing (CORS) – это функция безопасности браузера, которая ограничивает HTTP-запросы, инициированные сценариями, запущенными в браузере. Для этого в конфигурационном файле airflow.cfg следует установить значения параметрам access_control_allow_headers, access_control_allow_methods и access_control_allow_origins в разделе [api], например:
[api] access_control_allow_headers = origin, content-type, accept access_control_allow_methods = POST, GET, OPTIONS, DELETE access_control_allow_origins = https://exampleclientapp1.com https://exampleclientapp2.com
Далее можно протестировать API, перечислив все доступные DAG через GET-запрос на конечной точке /api/v1/dags. При использовании AirFlow на локальном хосте GET-запрос надо направить на конечную точку http://localhost:8080/api/v1/dags, предварительно установив авторизацию.
Затем нужно создать Python-файл DAG и поместить его в папку dags. Идентификатор запуска DAG (dag_run_id) может быть любым (строка или текст). Наконец, нужно получить доступ к конфигурации в коде или Python-функции. Можно получить доступ к переменным, которые установлены в конфигурации, переданной в запросе. Для этого следует написать Provide_context=True в функции PythonOperator и получить к ней доступ с помощью kwargs[‘dag_run’].conf.get(«имя переменной»). Чем Apache AirFlow Отличается от AWS Step functions, читайте в нашей новой статье.
Код курса
ADH-AIR
Ближайшая дата курса
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Освойте администрирование и эксплуатацию Apache AirFlow для аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники