Урок 3. Базы данных и Connections: как научить Airflow общаться с PostgreSQL и хранить секреты

Урок 3. Базы данных и Connections: как научить Airflow общаться с PostgreSQL и хранить секреты

 

Оркестратор сам по себе бесполезен. Apache Airflow — это дирижер, а не музыкант. Его задача — не хранить данные и не (всегда) обрабатывать их, а говорить другим системам, что делать. «Postgres, выполни этот запрос«, «Spark, посчитай эту витрину«, «S3, отдай файл«.

Но чтобы сказать «Postgres, выполни запрос«, Airflow должен знать, где этот Postgres находится и какой пароль использовать. Здесь новички часто совершают классическую ошибку: пишут параметры подключения прямо в коде DAG-а.

# Так делать НЕЛЬЗЯ
conn = psycopg2.connect("dbname=test user=postgres password=secret host=localhost")

Почему это плохо? Во-первых, вы «светите» пароли в репозитории кода (привет, служба безопасности). Во-вторых, если у вас изменится адрес базы данных, вам придется переписывать 50 файлов с дагами. В Airflow для этого придумана элегантная абстракция — Connections.

 

Что такое Connection и почему это гениально

 

Представьте, что Connection (соединение) — это запись в записной книжке вашего телефона. Вы не запоминаете наизусть номер друга, вы просто ищете его по имени «Вася». Если Вася сменит номер, вы поменяете его в одном месте — в контактах, и сможете продолжать звонить ему, просто нажимая на имя.

В Airflow это работает так же. Вы создаете соединение с идентификатором (Conn ID), например my_prod_postgres. Внутри вы прописываете хост, порт, логин и пароль. А в коде DAG-а вы просто говорите: «Используй my_prod_postgres».

Интерфейс создания Connection к DB Postgres в Apache Airflow GUI

Это дает нам три мощных преимущества:

  • Безопасность: Пароли хранятся в мета-базе Airflow в зашифрованном виде (при правильной настройке) и не попадают в Git.
  • Гибкость: Вы можете переключить DAG с тестовой базы на продуктовую, просто изменив настройки в веб-интерфейсе, не трогая ни строчки кода.
  • Чистота кода: Ваш Python-скрипт занимается логикой, а не конфигурацией инфраструктуры.

 

Операторы против Хуков: вечная путаница 

 

Когда вы начинаете работать с базами данных в Airflow, вы сталкиваетесь с двумя понятиями: Operator и Hook. Понять разницу критически важно.

Operator (Оператор) — это готовое действие. Это «кирпич», из которого строится стена. Пример: PostgresOperator. Вы даете ему SQL-запрос, и он его выполняет. Вам не нужно думать, как открывать соединение, как закрывать курсор, что делать при ошибке. Оператор — это высокий уровень абстракции: «Сделай это и забудь».

Hook (Хук) — это инструмент или ключ от двери. Это низкоуровневая обертка над библиотекой драйвера (в случае Postgres — над psycopg2). Хук сам по себе ничего не делает в DAG-е. Он используется внутри Python-кода (например, в PythonOperator), когда вам нужна сложная логика. Пример: Вам нужно прочитать данные из базы, проверить, есть ли там цифра 5, и если есть — отправить письмо, а если нет — ничего не делать. PostgresOperator тут не поможет (он умеет только выполнять SQL). Вам нужен PostgresHook, чтобы вытащить данные в переменную Python и написать if/else.

Простое правило:

  • Нужно просто выполнить SQL (CREATE TABLE, INSERT, DELETE)? Берите Operator.
  • Нужно вытащить данные и обработать их средствами Python (Pandas, циклы)? Берите Hook.

 

Apache Airflow для инженеров данных

Код курса
AIRF
Ближайшая дата курса
2 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Практика: готовим инфраструктуру

 

В прошлых статьях мы развернули Postgres для метаданных самого Airflow. В реальной жизни бизнес-данные и метаданные оркестратора должны лежать в разных местах. Но для обучения мы схитрим и создадим отдельную базу данных внутри того же контейнера Postgres, который у нас уже есть.

Нам не нужно менять docker-compose.yaml. Достаточно подключиться к контейнеру базы и создать БД для экспериментов.

Выполните в терминале:

# Заходим внутрь контейнера с базой
docker exec -it airflow-course-postgres-1 psql -U airflow

# Внутри SQL-консоли создаем базу данных
CREATE DATABASE data_warehouse;
\q

Примечание: имя контейнера может отличаться, проверьте его через docker ps.

How to create connection for Airflow GUI

Теперь научим Airflow видеть эту новую базу.

  • Зайдите в веб-интерфейс Airflow (localhost:8080).
  • В верхнем меню выберите Admin -> Connections.
  • Нажмите синий плюс (+).
  • Заполните поля:

 

Connection Id my_dwh (это имя мы будем использовать в коде)
Connection Type Postgres (Важно! Мы используем имя сервиса из docker-compose, а не localhost)
Host postgres
Database data_warehouse
Login airflow
Password airflow
Port 5432
  • Нажмите «Test» (не всегда и не для всех Connection может быть доступна) или «Save«. Если увидите зеленую плашку — связь установлена.

Как создать Connection в Airflow пример

Тонкий момент с Docker Networking

Почему в поле Host мы написали postgres, а не localhost? Это самая частая ошибка новичков.

  • localhost для контейнера Airflow — это сам контейнер Airflow. Там нет базы данных.
  • База живет в соседнем контейнере.
  • В сети Docker контейнеры видят друг друга по именам сервисов, указанным в docker-compose.yaml. Так как мы назвали сервис базы postgres, именно так к нему и нужно обращаться.

 

Apache Airflow для инженеров данных

Код курса
AIRF
Ближайшая дата курса
2 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800

 

Пишем DAG: SQL-трансформации

 

Давайте создадим процесс, который моделирует простую витрину данных.

  • Создать таблицу users, если её нет.
  • Очистить её (на случай перезапуска).
  • Наполнить её случайными данными.

Создайте файл dags/postgres_etl.py. Мы начнем с использования PostgresOperator.

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime

# Определяем SQL запросы прямо здесь или выносим в отдельные файлы
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(50),
    signup_date DATE
);
"""

insert_data_sql = """
INSERT INTO users (name, signup_date) VALUES 
('Alice', '2023-01-01'),
('Bob', '2023-01-02'),
('Charlie', '2023-01-03');
"""

with DAG(
    dag_id="simple_postgres_etl",
    start_date=datetime(2023, 1, 1),
    schedule=None,
    catchup=False
) as dag:

    # Шаг 1: Создаем структуру
    create_task = PostgresOperator(
        task_id="create_table",
        postgres_conn_id="my_dwh", # Ссылаемся на Connection, который создали в UI
sql=create_table_sql
    )

    # Шаг 2: Чистим старые данные (для идемпотентности)
    clean_task = PostgresOperator(
        task_id="clean_table",
        postgres_conn_id="my_dwh",
        sql="TRUNCATE TABLE users;"
    )

    # Шаг 3: Наливаем данные
    fill_task = PostgresOperator(
        task_id="fill_table",
        postgres_conn_id="my_dwh",
        sql=insert_data_sql
    )

    # Задаем порядок выполнения
    create_task >> clean_task >> fill_task

Запустите этот DAG. Если все квадратики стали зелеными, поздравляю — вы только что управляли базой данных, не написав ни строчки кода подключения внутри пайплайна.

Логи выполнения Dag с Connection Postrgres AirFlow

 

Продвинутый уровень: Использование Hooks

 

А теперь представим задачу, которую SQL-ем решить сложно. Например, нам нужно подсчитать количество пользователей, и если их больше 2-х — вывести радостное сообщение в лог. Здесь нам понадобится Python и PostgresHook. Добавим в тот же файл новую задачу.

from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def check_user_count():
    # Инициализируем хук, используя тот же conn_id
    pg_hook = PostgresHook(postgres_conn_id="my_dwh")
    
    # Хук дает нам метод get_records для выполнения SQL и получения результата
    records = pg_hook.get_records(sql="SELECT COUNT(*) FROM users;")
    
    # Результат возвращается как список кортежей: [(3,)]
    count = records[0][0]
    print(f"Всего пользователей в базе: {count}")
    
    if count > 2:
        print("База успешно наполнена!")
    else:
        raise ValueError("Что-то пошло не так, данных слишком мало!")

# Создаем задачу PythonOperator
check_task = PythonOperator(
    task_id="check_data_quality",
    python_callable=check_user_count
)

# Добавляем в конец цепочки
fill_task >> check_task

 

Что здесь произошло? Мы не создавали соединение вручную через psycopg2. Мы попросили Airflow: «Дай мне хук для my_dwh«. Хук сам сходил в мета-базу, расшифровал пароль, открыл соединение, выполнил запрос и вернул данные. Это безопасно и чисто.

 

Использование Connection и Hook Operator в Apache Airflow

В последнем случае с ошибкой мы сократили в коде количество вставляемых записей чтобы вызвать ошибку исполнения DAG.

 

Безопасность: Переменные окружения

 

Создавать Connections через веб-интерфейс удобно для тестов, но в продакшене это «ручная работа«, которую инженеры не любят. Как автоматизировать этот процесс?

Airflow умеет читать настройки соединений из переменных окружения. Это «золотой стандарт» для CI/CD. Формат переменной такой: AIRFLOW_CONN_{CONN_ID_В_ВЕРХНЕМ_РЕГИСТРЕ}.

Если вы добавите в свой docker-compose.yaml (в раздел environment для всех сервисов) такую строку:

AIRFLOW_CONN_MY_DWH=postgresql://airflow:airflow@postgres:5432/data_warehouse

…то соединение my_dwh появится в Airflow автоматически. Его даже не будет видно в списке в UI (из соображений безопасности), но DAG-и смогут его использовать. Это идеальный способ передавать секреты, не сохраняя их в коде и не накликивая мышкой.

 

Troubleshooting — когда связи рвутся

 

Работа с сетью — главный источник головной боли.

Ошибка 1: psycopg2.OperationalError: connection to server at «localhost» … failed

  • Диагноз: Вы пытаетесь стучаться на localhost изнутри контейнера.
  • Лечение: Замените хост на имя сервиса Postgres из docker-compose (обычно postgres или db).

Ошибка 2: password authentication failed for user «airflow»

  • Диагноз: Либо опечатка в пароле, либо вы пытаетесь подключиться не к той базе данных.
  • Лечение: Проверьте, что в Connection вы указали верную схему (Database). По умолчанию Postgres создает базу, совпадающую с именем пользователя, но мы создавали data_warehouse вручную. Убедитесь, что она существует.

Ошибка 3: Table «users» does not exist

  • Диагноз: Вы подключились не к той базе данных.
  • Лечение: В Postgres кластере может быть много баз. Проверьте поле Schema в настройках соединения.

 

Помощь Cursor в работе с SQL

 

Вам не обязательно быть гуру SQL, чтобы писать сложные пайплайны. Cursor отлично справляется с ролью DBA.

Сценарий 1: Генерация DDL Выделите список полей в Python (например, словарь или датакласс) и напишите Cursor:

 

"Напиши SQL-запрос CREATE TABLE для Postgres, который соответствует этой структуре данных. Добавь поля created_at и updated_at."

 

 

Пример генерации кода CursorAI в DAG Airflow

Для пояснения:

  1. Фрагмент выделенного кода, который вы хотите исправить и добавление его в чат Cursor AI ( комбинация Ctrl+L)
  2. Включенный из VIM редактора код с 12 по 25 строку и Промпт на простом языке что вы хотите сделать.

 

Сценарий 2: Конвертация кода Если у вас есть старый скрипт на pandas и sqlalchemy, покажите его Cursor и попросите:

"Перепиши этот код, используя Airflow PostgresHook вместо прямого создания engine sqlalchemy. Убедись, что соединение берется из conn_id='my_dwh'."

В этой статье мы научили Airflow выходить за пределы своего контейнера и управлять реальными данными. Мы разобрали фундаментальную разницу между Операторами (для действий) и Хуками (для данных).

Теперь, когда мы умеем подключаться к базам данных, нам становится тесно в рамках структурированных таблиц. В следующей статье мы сделаем шаг в мир Big Data: подключим объектное хранилище S3 (на примере MinIO) и научимся перекладывать файлы между облаком и локальной системой, используя уже знакомые принципы Connections и Hooks.

Готовы расширять горизонты до облачных хранилищ?

 

Использованные референсы и материалы

 

Полный перечень статей Бесплатного курса «Apache Airflow для начинающих»

Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker

Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose

Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator

Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage

Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors)

Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator

Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas

Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka

Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг

Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг

Изменение базового тарифа с 1 января 2026 года Подробнее