Соединения и хуки в Apache Airflow: разбираем на примере SQLite

Соединения и хуки в Apache Airflow: разбираем на примере SQLite

    В прошлый раз мы говорили о способе взаимодействия задач между собой в Apache Airflow. Сегодня поговорим о таких сущностях, как соединение (connections) и хуки (hooks). Читайте в этой статье: что такое хук и соединение, как создать и скачать соединение, а также как подключить базу данных в Airflow.

    Что такое связи и хуки в Apache Airflow

    Соединение (connection) – это набор параметров (логин, пароль, хост) и некая внешняя система c уникальным именем (conn_id), к которой и происходит подключение. Такой системой может быть базы и хранилища данных, электронная почта, AWS, Google Cloud, Telegram и т.д. Всякий раз, когда вам нужно изнутри Airflow подключиться к БД, то будут использоваться соединения и хуки.

    Список соединений доступен в пользовательском интерфейсе во вкладке Admin->Connection. Там же можно добавить новое соединение. Если у вас нет нужного типа соединения, то его следует установить. Для этого используется Airflow Provider, список доступных пакетов находится в документации. А установить их (по крайней мере в Airflow 2.0) можно через обычный pip, например, вот так можно получить соединение с PostgreSQL:

    $ pip install apache-airflow-providers-postgres
    # или что то же самое:
    $ pip install apache-airflow[postgres]
    

    Некоторые типы соединений отсутствуют в списке conn type, например, Telegram. Тогда единственным способом создать новое соединение становится использование CLI, как это показано в документации.

    Apache Airflow для инженеров данных

    Код курса
    AIRF
    Ближайшая дата курса
    1 декабря, 2025
    Продолжительность
    24 ак.часов
    Стоимость обучения
    72 000

    Соединение на примере SQLite

    SQLite — легковесная СУБД, которая будет служить примером организации задач Airflow с базой данных. Создадим БД под названием tmp.db и поместим её в директорию $HOME/airflow. В этой БД создадим таблицу example с двумя полями. Вот так все это выглядит:

    $ cd airflow
    $ sqlite3 tmp.db
    sqlite> TABLE example(id INT, exec_time TEXT);
    

    Ниже на рисунке показано, как создать соединение в пользовательском интерфейсе. Мы указали идентификатор связи, название БД и хост. Если бы это был PostgreSQL, то необходимо было бы добавить порт (например, 5432), логин и пароль (если есть). Термин scheme может отличаться в других БД.

    Создание соединения в Apache Airflow
    Конфигурация соединения SQLite

    Теперь построим такую задачу Airflow, которая будет добавлять значения полей.

    Хук — это SQLOperator, который можно использовать в функциях

    Хук (hook) предоставляет интерфейс для взаимодействия с внешней системой в пределах одного графа. Например, некоторые задачи требуют доступа к MySQL, и чтобы не устанавливать связь каждый раз в нужной задаче, можно использовать хук. Хуки также позволяют не хранить параметры аутентификации в графе. По сути своей, хук позволяет использовать возможности SQLOperator внутри PythonOperator.

    Для баз данных используется единый API, поэтому методы для работы с ними будут одинаковыми. О хуках у нас есть статья. Итак, хук соединения SQLite импортируется и реализуется следующим образом:

    from airflow.providers.sqlite.hooks.sqlite import SqliteHook
    sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn')
    

    Хуки баз данных наследуются от класса DbApiHook, поэтому если вам понадобятся узнать о методах, то лучше заглядывать в него.

    Пусть имеются две задачи: первая добавляет значения в таблицу базы данных, а вторая выдает эти значения. Первая задача будет использовать хук, вторая — SQLOperator, который будет иметь то же самый идентификатор соединения. Добавление значений полей в базу данных осуществляется через метод insert_rows. Ниже продемонстрирован код в Python. На это примере можно заметить, что хук представляет собой SQLOperator, который можно использовать внутри функции.

    from airflow import DAG
    from airflow.utils.dates import days_ago
    from airflow.operators.python import PythonOperator
    from airflow.providers.sqlite.operators.sqlite import SqliteOperator
    
    from airflow.providers.sqlite.hooks.sqlite import SqliteHook
    
    def insert(**kwargs):
        exec_time = kwargs['ts']
        sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn')
        rows = [(1,  exec_time),]
        fields = ['id', 'exec_time']
        sqlite_hook.insert_rows(
            table='example',
            rows=rows,
            target_fields=fields,
        )
    
    with DAG(
        'conhook',
        schedule_interval=None,
        start_date=days_ago(2),
    ) as dag:
        insert_into_example = PythonOperator(
            task_id='insert_into_example',
            python_callable=insert,
        )
    
        print_rows = SqliteOperator(
            task_id='print_rows',
            sqlite_conn_id='sqlite_conn',
            sql='SELECT id, exec_time from example'
        )
    
        insert_into_example >> print_rows
    

    В некоторых версиях Airflow метод insert_rows не работает для SQLite из-за проблем в исходном коде [1]. Данная проблема связана с тем, что добавляются значения в базу данных SQLite не совсем правильно, поскольку вместо указаний типов данных следует использовать знак ? (подробности см. тут). Если возникают проблемы с БД, то можно выполнить запрос самостоятельно через метод run, например так:

    sql = """
    INSERT INTO example (id, exec_time)
    values (%d, "%s")
    """
    sqlite_hook = SqliteHook(sqlite_conn_id='sqlite_conn')
    sqlite_hook.run(sql % (0, exec_time))
    

    Можно ещё раз убедиться, что новые значения появляются в таблице через интерфейс командной строки, выполнив тот же самый Select.

     

    Ещё больше подробностей о соединениях, хуках в Apache Airflow вы узнаете на наших образовательных курсах в лицензированном учебном центре обучения и повышения квалификации руководителей и ИТ-специалистов (менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data) в Москве:

    Смотреть расписание

    Источники

    1. https://github.com/apache/airflow/pull/17695