В прошлый раз мы говорили о способе взаимодействия задач между собой в Apache Airflow. Сегодня поговорим о таких сущностях, как соединение (connections) и хуки (hooks). Читайте в этой статье: что такое хук и соединение, как создать и скачать соединение, а также как подключить базу данных в Airflow.
Что такое связи и хуки в Apache Airflow
Соединение (connection) – это набор параметров (логин, пароль, хост) и некая внешняя система c уникальным именем (conn_id
), к которой и происходит подключение. Такой системой может быть базы и хранилища данных, электронная почта, AWS, Google Cloud, Telegram и т.д. Всякий раз, когда вам нужно изнутри Airflow подключиться к БД, то будут использоваться соединения и хуки.
Список соединений доступен в пользовательском интерфейсе во вкладке Admin->Connection. Там же можно добавить новое соединение. Если у вас нет нужного типа соединения, то его следует установить. Для этого используется Airflow Provider, список доступных пакетов находится в документации. А установить их (по крайней мере в Airflow 2.0) можно через обычный pip, например, вот так можно получить соединение с PostgreSQL:
$ pip install apache-airflow-providers-postgres # или что то же самое: $ pip install apache-airflow[postgres]
Некоторые типы соединений отсутствуют в списке conn type, например, Telegram. Тогда единственным способом создать новое соединение становится использование CLI, как это показано в документации.
Data Pipeline на Apache Airflow
Код курса
AIRF
Ближайшая дата курса
27 ноября, 2024
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.
Соединение на примере SQLite
SQLite — легковесная СУБД, которая будет служить примером организации задач Airflow с базой данных. Создадим БД под названием tmp.db
и поместим её в директорию $HOME/airflow
. В этой БД создадим таблицу example
с двумя полями. Вот так все это выглядит:
$ cd airflow $ sqlite3 tmp.db sqlite> TABLE example(id INT, exec_time TEXT);
Ниже на рисунке показано, как создать соединение в пользовательском интерфейсе. Мы указали идентификатор связи, название БД и хост. Если бы это был PostgreSQL, то необходимо было бы добавить порт (например, 5432), логин и пароль (если есть). Термин scheme может отличаться в других БД.
Теперь построим такую задачу Airflow, которая будет добавлять значения полей.
Хук — это SQLOperator, который можно использовать в функциях
Хук (hook) предоставляет интерфейс для взаимодействия с внешней системой в пределах одного графа. Например, некоторые задачи требуют доступа к MySQL, и чтобы не устанавливать связь каждый раз в нужной задаче, можно использовать хук. Хуки также позволяют не хранить параметры аутентификации в графе. По сути своей, хук позволяет использовать возможности SQLOperator
внутри PythonOperator
.
Для баз данных используется единый API, поэтому методы для работы с ними будут одинаковыми. О хуках у нас есть статья. Итак, хук соединения SQLite импортируется и реализуется следующим образом:
from airflow.providers.sqlite.hooks.sqlite import SqliteHook sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn')
Хуки баз данных наследуются от класса DbApiHook, поэтому если вам понадобятся узнать о методах, то лучше заглядывать в него.
Код курса
ADH-AIR
Ближайшая дата курса
по запросу
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Пусть имеются две задачи: первая добавляет значения в таблицу базы данных, а вторая выдает эти значения. Первая задача будет использовать хук, вторая — SQLOperator
, который будет иметь то же самый идентификатор соединения. Добавление значений полей в базу данных осуществляется через метод insert_rows
. Ниже продемонстрирован код в Python. На это примере можно заметить, что хук представляет собой SQLOperator, который можно использовать внутри функции.
from airflow import DAG from airflow.utils.dates import days_ago from airflow.operators.python import PythonOperator from airflow.providers.sqlite.operators.sqlite import SqliteOperator from airflow.providers.sqlite.hooks.sqlite import SqliteHook def insert(**kwargs): exec_time = kwargs['ts'] sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn') rows = [(1, exec_time),] fields = ['id', 'exec_time'] sqlite_hook.insert_rows( table='example', rows=rows, target_fields=fields, ) with DAG( 'conhook', schedule_interval=None, start_date=days_ago(2), ) as dag: insert_into_example = PythonOperator( task_id='insert_into_example', python_callable=insert, ) print_rows = SqliteOperator( task_id='print_rows', sqlite_conn_id='sqlite_conn', sql='SELECT id, exec_time from example' ) insert_into_example >> print_rows
В некоторых версиях Airflow метод insert_rows
не работает для SQLite из-за проблем в исходном коде [1]. Данная проблема связана с тем, что добавляются значения в базу данных SQLite не совсем правильно, поскольку вместо указаний типов данных следует использовать знак ?
(подробности см. тут). Если возникают проблемы с БД, то можно выполнить запрос самостоятельно через метод run
, например так:
sql = """ INSERT INTO example (id, exec_time) values (%d, "%s") """ sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn') sqlite_hook.run(sql % (0, exec_time))
Можно ещё раз убедиться, что новые значения появляются в таблице через интерфейс командной строки, выполнив тот же самый Select
.
Ещё больше подробностей о соединениях, хуках в Apache Airflow вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:
Источники