Содержание
- Что такое Connection и почему это гениально
- Операторы против Хуков: вечная путаница
- Практика: готовим инфраструктуру
- Пишем DAG: SQL-трансформации
- Продвинутый уровень: Использование Hooks
- Безопасность: Переменные окружения
- Troubleshooting - когда связи рвутся
- Помощь Cursor в работе с SQL
- Использованные референсы и материалы
- Полный перечень статей Бесплатного курса "Apache Airflow для начинающих"
Оркестратор сам по себе бесполезен. Apache Airflow — это дирижер, а не музыкант. Его задача — не хранить данные и не (всегда) обрабатывать их, а говорить другим системам, что делать. «Postgres, выполни этот запрос«, «Spark, посчитай эту витрину«, «S3, отдай файл«.
Но чтобы сказать «Postgres, выполни запрос«, Airflow должен знать, где этот Postgres находится и какой пароль использовать. Здесь новички часто совершают классическую ошибку: пишут параметры подключения прямо в коде DAG-а.
# Так делать НЕЛЬЗЯ
conn = psycopg2.connect("dbname=test user=postgres password=secret host=localhost")
Почему это плохо? Во-первых, вы «светите» пароли в репозитории кода (привет, служба безопасности). Во-вторых, если у вас изменится адрес базы данных, вам придется переписывать 50 файлов с дагами. В Airflow для этого придумана элегантная абстракция — Connections.
Что такое Connection и почему это гениально
Представьте, что Connection (соединение) — это запись в записной книжке вашего телефона. Вы не запоминаете наизусть номер друга, вы просто ищете его по имени «Вася». Если Вася сменит номер, вы поменяете его в одном месте — в контактах, и сможете продолжать звонить ему, просто нажимая на имя.
В Airflow это работает так же. Вы создаете соединение с идентификатором (Conn ID), например my_prod_postgres. Внутри вы прописываете хост, порт, логин и пароль. А в коде DAG-а вы просто говорите: «Используй my_prod_postgres».
Это дает нам три мощных преимущества:
- Безопасность: Пароли хранятся в мета-базе Airflow в зашифрованном виде (при правильной настройке) и не попадают в Git.
- Гибкость: Вы можете переключить DAG с тестовой базы на продуктовую, просто изменив настройки в веб-интерфейсе, не трогая ни строчки кода.
- Чистота кода: Ваш Python-скрипт занимается логикой, а не конфигурацией инфраструктуры.
Операторы против Хуков: вечная путаница
Когда вы начинаете работать с базами данных в Airflow, вы сталкиваетесь с двумя понятиями: Operator и Hook. Понять разницу критически важно.
Operator (Оператор) — это готовое действие. Это «кирпич», из которого строится стена. Пример: PostgresOperator. Вы даете ему SQL-запрос, и он его выполняет. Вам не нужно думать, как открывать соединение, как закрывать курсор, что делать при ошибке. Оператор — это высокий уровень абстракции: «Сделай это и забудь».
Hook (Хук) — это инструмент или ключ от двери. Это низкоуровневая обертка над библиотекой драйвера (в случае Postgres — над psycopg2). Хук сам по себе ничего не делает в DAG-е. Он используется внутри Python-кода (например, в PythonOperator), когда вам нужна сложная логика. Пример: Вам нужно прочитать данные из базы, проверить, есть ли там цифра 5, и если есть — отправить письмо, а если нет — ничего не делать. PostgresOperator тут не поможет (он умеет только выполнять SQL). Вам нужен PostgresHook, чтобы вытащить данные в переменную Python и написать if/else.
Простое правило:
- Нужно просто выполнить SQL (CREATE TABLE, INSERT, DELETE)? Берите Operator.
- Нужно вытащить данные и обработать их средствами Python (Pandas, циклы)? Берите Hook.
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
2 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Практика: готовим инфраструктуру
В прошлых статьях мы развернули Postgres для метаданных самого Airflow. В реальной жизни бизнес-данные и метаданные оркестратора должны лежать в разных местах. Но для обучения мы схитрим и создадим отдельную базу данных внутри того же контейнера Postgres, который у нас уже есть.
Нам не нужно менять docker-compose.yaml. Достаточно подключиться к контейнеру базы и создать БД для экспериментов.
Выполните в терминале:
# Заходим внутрь контейнера с базой docker exec -it airflow-course-postgres-1 psql -U airflow # Внутри SQL-консоли создаем базу данных CREATE DATABASE data_warehouse; \q
Примечание: имя контейнера может отличаться, проверьте его через docker ps.
Теперь научим Airflow видеть эту новую базу.
- Зайдите в веб-интерфейс Airflow (localhost:8080).
- В верхнем меню выберите Admin -> Connections.
- Нажмите синий плюс (+).
- Заполните поля:
| Connection Id | my_dwh | (это имя мы будем использовать в коде) |
| Connection Type | Postgres | (Важно! Мы используем имя сервиса из docker-compose, а не localhost) |
| Host | postgres | |
| Database | data_warehouse | |
| Login | airflow | |
| Password | airflow | |
| Port | 5432 |
- Нажмите «Test» (не всегда и не для всех Connection может быть доступна) или «Save«. Если увидите зеленую плашку — связь установлена.
Тонкий момент с Docker Networking
Почему в поле Host мы написали postgres, а не localhost? Это самая частая ошибка новичков.
- localhost для контейнера Airflow — это сам контейнер Airflow. Там нет базы данных.
- База живет в соседнем контейнере.
- В сети Docker контейнеры видят друг друга по именам сервисов, указанным в docker-compose.yaml. Так как мы назвали сервис базы postgres, именно так к нему и нужно обращаться.
Apache Airflow для инженеров данных
Код курса
AIRF
Ближайшая дата курса
2 марта, 2026
Продолжительность
24 ак.часов
Стоимость обучения
76 800
Пишем DAG: SQL-трансформации
Давайте создадим процесс, который моделирует простую витрину данных.
- Создать таблицу users, если её нет.
- Очистить её (на случай перезапуска).
- Наполнить её случайными данными.
Создайте файл dags/postgres_etl.py. Мы начнем с использования PostgresOperator.
from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime
# Определяем SQL запросы прямо здесь или выносим в отдельные файлы
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(50),
signup_date DATE
);
"""
insert_data_sql = """
INSERT INTO users (name, signup_date) VALUES
('Alice', '2023-01-01'),
('Bob', '2023-01-02'),
('Charlie', '2023-01-03');
"""
with DAG(
dag_id="simple_postgres_etl",
start_date=datetime(2023, 1, 1),
schedule=None,
catchup=False
) as dag:
# Шаг 1: Создаем структуру
create_task = PostgresOperator(
task_id="create_table",
postgres_conn_id="my_dwh", # Ссылаемся на Connection, который создали в UI
sql=create_table_sql
)
# Шаг 2: Чистим старые данные (для идемпотентности)
clean_task = PostgresOperator(
task_id="clean_table",
postgres_conn_id="my_dwh",
sql="TRUNCATE TABLE users;"
)
# Шаг 3: Наливаем данные
fill_task = PostgresOperator(
task_id="fill_table",
postgres_conn_id="my_dwh",
sql=insert_data_sql
)
# Задаем порядок выполнения
create_task >> clean_task >> fill_task
Запустите этот DAG. Если все квадратики стали зелеными, поздравляю — вы только что управляли базой данных, не написав ни строчки кода подключения внутри пайплайна.
Продвинутый уровень: Использование Hooks
А теперь представим задачу, которую SQL-ем решить сложно. Например, нам нужно подсчитать количество пользователей, и если их больше 2-х — вывести радостное сообщение в лог. Здесь нам понадобится Python и PostgresHook. Добавим в тот же файл новую задачу.
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
def check_user_count():
# Инициализируем хук, используя тот же conn_id
pg_hook = PostgresHook(postgres_conn_id="my_dwh")
# Хук дает нам метод get_records для выполнения SQL и получения результата
records = pg_hook.get_records(sql="SELECT COUNT(*) FROM users;")
# Результат возвращается как список кортежей: [(3,)]
count = records[0][0]
print(f"Всего пользователей в базе: {count}")
if count > 2:
print("База успешно наполнена!")
else:
raise ValueError("Что-то пошло не так, данных слишком мало!")
# Создаем задачу PythonOperator
check_task = PythonOperator(
task_id="check_data_quality",
python_callable=check_user_count
)
# Добавляем в конец цепочки
fill_task >> check_task
Что здесь произошло? Мы не создавали соединение вручную через psycopg2. Мы попросили Airflow: «Дай мне хук для my_dwh«. Хук сам сходил в мета-базу, расшифровал пароль, открыл соединение, выполнил запрос и вернул данные. Это безопасно и чисто.
В последнем случае с ошибкой мы сократили в коде количество вставляемых записей чтобы вызвать ошибку исполнения DAG.
Безопасность: Переменные окружения
Создавать Connections через веб-интерфейс удобно для тестов, но в продакшене это «ручная работа«, которую инженеры не любят. Как автоматизировать этот процесс?
Airflow умеет читать настройки соединений из переменных окружения. Это «золотой стандарт» для CI/CD. Формат переменной такой: AIRFLOW_CONN_{CONN_ID_В_ВЕРХНЕМ_РЕГИСТРЕ}.
Если вы добавите в свой docker-compose.yaml (в раздел environment для всех сервисов) такую строку:
AIRFLOW_CONN_MY_DWH=postgresql://airflow:airflow@postgres:5432/data_warehouse
…то соединение my_dwh появится в Airflow автоматически. Его даже не будет видно в списке в UI (из соображений безопасности), но DAG-и смогут его использовать. Это идеальный способ передавать секреты, не сохраняя их в коде и не накликивая мышкой.
Troubleshooting — когда связи рвутся
Работа с сетью — главный источник головной боли.
Ошибка 1: psycopg2.OperationalError: connection to server at «localhost» … failed
- Диагноз: Вы пытаетесь стучаться на localhost изнутри контейнера.
- Лечение: Замените хост на имя сервиса Postgres из docker-compose (обычно postgres или db).
Ошибка 2: password authentication failed for user «airflow»
- Диагноз: Либо опечатка в пароле, либо вы пытаетесь подключиться не к той базе данных.
- Лечение: Проверьте, что в Connection вы указали верную схему (Database). По умолчанию Postgres создает базу, совпадающую с именем пользователя, но мы создавали data_warehouse вручную. Убедитесь, что она существует.
Ошибка 3: Table «users» does not exist
- Диагноз: Вы подключились не к той базе данных.
- Лечение: В Postgres кластере может быть много баз. Проверьте поле Schema в настройках соединения.
Помощь Cursor в работе с SQL
Вам не обязательно быть гуру SQL, чтобы писать сложные пайплайны. Cursor отлично справляется с ролью DBA.
Сценарий 1: Генерация DDL Выделите список полей в Python (например, словарь или датакласс) и напишите Cursor:
"Напиши SQL-запрос CREATE TABLE для Postgres, который соответствует этой структуре данных. Добавь поля created_at и updated_at."
Для пояснения:
- Фрагмент выделенного кода, который вы хотите исправить и добавление его в чат Cursor AI ( комбинация Ctrl+L)
- Включенный из VIM редактора код с 12 по 25 строку и Промпт на простом языке что вы хотите сделать.
Сценарий 2: Конвертация кода Если у вас есть старый скрипт на pandas и sqlalchemy, покажите его Cursor и попросите:
"Перепиши этот код, используя Airflow PostgresHook вместо прямого создания engine sqlalchemy. Убедись, что соединение берется из conn_id='my_dwh'."
В этой статье мы научили Airflow выходить за пределы своего контейнера и управлять реальными данными. Мы разобрали фундаментальную разницу между Операторами (для действий) и Хуками (для данных).
Теперь, когда мы умеем подключаться к базам данных, нам становится тесно в рамках структурированных таблиц. В следующей статье мы сделаем шаг в мир Big Data: подключим объектное хранилище S3 (на примере MinIO) и научимся перекладывать файлы между облаком и локальной системой, используя уже знакомые принципы Connections и Hooks.
Готовы расширять горизонты до облачных хранилищ?
Использованные референсы и материалы
- Managing Connections in Airflow
https://airflow.apache.org/docs/apache-airflow/stable/howto/connection.html
Детальный разбор того, как Airflow хранит креды, и как использовать Environment Variables вместо GUI. - Apache Airflow Provider for PostgreSQL
https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/index.html
Полный список операторов и хуков для работы с Postgres. - Psycopg2 — PostgreSQL adapter for Python
https://www.psycopg.org/docs/
Документация драйвера, который работает «под капотом» у Airflow при общении с БД.
Полный перечень статей Бесплатного курса «Apache Airflow для начинающих»
Урок 1. Apache Airflow с нуля: Архитектура, отличие от Cron и запуск в Docker
Урок 2. Масштабирование Airflow: Настройка CeleryExecutor и Redis в Docker Compose
Урок 3. Работа с базами данных в Airflow: Connections, Hooks и PostgresOperator
Урок 4. Airflow и S3: Интеграция с MinIO и Yandex Object Storage
Урок 5. Airflow и Hadoop: Настройка WebHDFS и работа с сенсорами (Sensors)
Урок 6. Запуск Apache Spark из Airflow: Гайд по SparkSubmitOperator
Урок 7. Airflow и Dask: Масштабирование тяжелых Python-задач и Pandas
Урок 8. Event-Driven Airflow: Запуск DAG по событиям из Apache Kafka
Урок 9. Загрузка данных в ClickHouse через Airflow: Быстрый ETL и батчинг
Урок 10. Airflow Best Practices: Динамические DAGи, TaskFlow API и Алертинг








