Простой пример объединения нескольких задач, описанных в разных Python-файлах, в единый DAG Apache AirFlow на кейсе выгрузки из реляционной базы PostgreSQL данных о выполненных заказах за последние 100 дней. Разработка и запуск кода в Google Colab.
Объединение задач из отдельных Python-файлах в один DAG AirFlow
Я уже показывала, как построить простой ETL-конвейер в Apache AirFlow на примере выгрузки старых заказов из базы данных интернет-магазина на PostgreSQL. В тот раз файл DAG включал у меня не только последовательность задач, но и их описание. Управлять таким длинным файлом совершенно неудобно: лучше описывать каждую задачу в отдельном py-файле, выстраивая их последовательность в общем конвейере. Как это сделать в Google Colab, рассмотрим далее.
Поскольку экземпляр Apache AirFlow у меня будет развернут на удаленной машине Google Colab, придется прокинуть туннель, чтобы сделать ее локальный хост доступным извне по URL. Как обычно, для этого я использую утилиту ngrok.
В этот раз мой конвейер, т.е. DAG состоит из следующих задач:
- start_task — фиктивный оператор, представляющий начало DAG;
- read_task – оператор Python, который выполняет функцию read_from_PostgreSQL_function для чтения данных из базы данных PostgreSQL. Для параметра Provide_context установлено значение True, чтобы получить доступ к контексту выполнения.
- write_task – оператор Python, который выполняет функцию write_to_JSON_function для записи данных, ранее считанных из БД, в файл JSON. Для параметра Provide_context также установлено значение True, поскольку результаты задачи read_task будут входом для write_task.
- end_task – фиктивный оператор, представляющий конец DAG.
Код DAG-файл выглядит следующим образом:
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime, timedelta
from ANNA_read_task import read_from_PostgreSQL_function
from ANNA_write_task import write_to_JSON_function
default_args = {
'owner': 'airflow',
'start_date': datetime.now() - timedelta(days=1),
'retries': 1
}
dag = DAG(
dag_id='ANNA_DAG_from_PotgreSQL_2_JSON_file',
default_args=default_args,
schedule_interval='@daily'
)
start_task = DummyOperator(task_id='start_task', dag=dag)
read_task = PythonOperator(
task_id='read_task',
provide_context=True,
python_callable=read_from_PostgreSQL_function,
dag=dag
)
write_task = PythonOperator(
task_id='write_task',
provide_context=True,
python_callable=write_to_JSON_function,
dag=dag
)
end_task = DummyOperator(task_id='end_task', dag=dag)
start_task >> read_task >> write_task >> end_task
Задачи read_task и write_task описаны в отдельных файлах. Например, код задачи чтения из базы данных выполненных заказов (со статусом 5) за последние 100 дней выглядит так:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os
import psycopg2
import requests
def read_from_PostgreSQL_function(**kwargs):
connection_string = 'postgres://my-database'
conn = psycopg2.connect(connection_string)
select_query = """
SELECT
orders.id,
orders.date,
orders.sum,
TRIM(product.name),
product.price,
order_product.quantity,
TRIM(provider.name),
TRIM(customer.name),
TRIM(customer.phone),
TRIM(customer.email),
TRIM(customer_states.name),
delivery.date,
TRIM(delivery.address),
delivery.price
FROM
orders
JOIN order_product ON order_product.order = orders.id
JOIN product ON product.id=order_product.product
JOIN provider ON provider.id = product.provider
JOIN customer ON orders.customer = customer.id
JOIN customer_states ON customer_states.id = customer.state
JOIN order_states ON order_states.id = orders.state
JOIN delivery ON orders.delivery = delivery.id
WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-100) AND (CURRENT_DATE))
"""
cur = conn.cursor()
cur.execute(select_query)
results = cur.fetchall()
cur.close()
conn.close()
return results
А код записи этих данных в JSON-файл в задаче write_task выглядит так:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json
import os
import psycopg2
def write_to_JSON_function(**kwargs):
results = kwargs['ti'].xcom_pull(task_ids='read_task')
orders = []
order_dict = {}
for result in results:
order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result
if order_id not in order_dict:
order_dict[order_id] = {
'order_id': order_id,
'order_date': str(order_date),
'order_sum': order_sum,
'products': [],
'customer': {
'customer_name': customer_name,
'customer_phone': customer_phone,
'customer_email': customer_email,
'customer_status': customer_status
},
'delivery': {
'delivery_date': str(delivery_date),
'delivery_address': delivery_address,
'delivery_price': delivery_price
}
}
product = {
'product_name': product_name,
'product_price': product_price,
'product_quantity': product_quantity,
'product_provider': product_provider
}
order_dict[order_id]['products'].append(product)
orders = list(order_dict.values())
filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json")
filepath = os.path.join('/content/airflow/files/delivered', filename)
with open(filepath, 'w') as f:
json.dump(orders, f)
f.close()
Если необходимо модифицировать эти задачи или добавить в DAG новые, это удобнее сделать, описав каждую задачу в отдельном файле. А вместе они запускаются в пакетном конвейере в виде DAG Apache AirFlow.

Реализация в Colab
Поскольку я запускаю код в интерактивной среде Google Colab, он будет более многословным. Например, сперва надо установить все необходимые библиотеки и импортировать пакеты:
###############################################ячейка №1 в Google Colab#############################
#Установка Apache Airflow и инициализация базы данных.
!pip install apache-airflow
!airflow initdb
#Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok
#импорт модулей
from pyngrok import ngrok
import sys
import os
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from google.colab import drive
drive.mount('/content/drive')
os.makedirs('/content/airflow/dags', exist_ok=True)
dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
os.makedirs(dags_folder, exist_ok=True)
#Получение версии установленного Apache Airflow
!airflow version
Затем следует запустить веб-сервер AirFlow в режиме демона, чтобы выполнение кода в этой ячейке Colab не блокировало другие:
!airflow webserver --port 8888 --daemon
Далее запускаем тунелирование с ngrok, задав токен аутентификации этой службы:
#Задание переменной auth_token для аутентификации в сервисе ngrok.
auth_token = " токен аутентификации, взятый с https://dashboard.ngrok.com/signup "
#Аутентификация в сервисе ngrok с помощью auth_token
os.system(f"ngrok authtoken {auth_token}")
#Запуск ngrok, который создаст публичный URL для сервера через туннель
#для доступа к веб-интерфейсу Airflow из любого места.
#addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
public_url = ngrok.connect(addr="8888", proto="http")
#Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)
В результате в области вывода Colab появится URL-адрес, по которому будет доступен ранее запущенный веб-сервер AirFlow. Потом необходимо создать пользователя Apache AirFlow, установить логин и пароль. Моего пользователя традиционно зовут anna с ролью администратора и паролем password:
!airflow db init #Инициализация базы данных Airflow !airflow upgradedb #Обновление базы данных Airflow #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password. #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password
Теперь можно войти в веб-интерфейс пакетного оркестратора, используя эти учетные данные.
Чтобы управлять задачами и самим DAG программным образом, я дополнила их код инструкциями записи в файл, сохранения в рабочую директорию AirFlow и копирования в пользовательскую папку для просмотра. Например, Python-код задачи чтения данных из PostgreSQL выглядит так:
########################Задача чтения из PostgreSQL#######################################
import psycopg2
from google.colab import files
code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os
import psycopg2
import requests
def read_from_PostgreSQL_function(**kwargs):
connection_string = 'postgres://my-database'
conn = psycopg2.connect(connection_string)
select_query = """
SELECT
orders.id,
orders.date,
orders.sum,
TRIM(product.name),
product.price,
order_product.quantity,
TRIM(provider.name),
TRIM(customer.name),
TRIM(customer.phone),
TRIM(customer.email),
TRIM(customer_states.name),
delivery.date,
TRIM(delivery.address),
delivery.price
FROM
orders
JOIN order_product ON order_product.order = orders.id
JOIN product ON product.id=order_product.product
JOIN provider ON provider.id = product.provider
JOIN customer ON orders.customer = customer.id
JOIN customer_states ON customer_states.id = customer.state
JOIN order_states ON order_states.id = orders.state
JOIN delivery ON orders.delivery = delivery.id
WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-100) AND (CURRENT_DATE))
"""
cur = conn.cursor()
cur.execute(select_query)
results = cur.fetchall()
cur.close()
conn.close()
return results
'''
with open('/root/airflow/dags/ANNA_read_task.py', 'w') as f:
f.write(code)
!cp ~/airflow/dags/ANNA_read_task.py /content/airflow/dags/ANNA_read_task.py
А задача записи результатов поиска в JSON-файл так:
############################задача записи в JSON-файл##################################
from google.colab import files
code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json
import os
def write_to_JSON_function(**kwargs):
results = kwargs['ti'].xcom_pull(task_ids='read_task')
orders = []
order_dict = {}
for result in results:
order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result
if order_id not in order_dict:
order_dict[order_id] = {
'order_id': order_id,
'order_date': str(order_date),
'order_sum': order_sum,
'products': [],
'customer': {
'customer_name': customer_name,
'customer_phone': customer_phone,
'customer_email': customer_email,
'customer_status': customer_status
},
'delivery': {
'delivery_date': str(delivery_date),
'delivery_address': delivery_address,
'delivery_price': delivery_price
}
}
product = {
'product_name': product_name,
'product_price': product_price,
'product_quantity': product_quantity,
'product_provider': product_provider
}
order_dict[order_id]['products'].append(product)
orders = list(order_dict.values())
filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json")
filepath = os.path.join('/content/airflow/files/delivered', filename)
with open(filepath, 'w') as f:
json.dump(orders, f)
f.close()
'''
with open('/root/airflow/dags/ANNA_write_task.py', 'w') as f:
f.write(code)
!cp ~/airflow/dags/ANNA_write_task.py /content/airflow/dags/ANNA_write_task.py
Сам DAG-файл создается следующим кодом:
########################из разных файлов DAG чтения из PostgreSQL и формирования JSON-файла############################
from google.colab import files
code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime, timedelta
from ANNA_read_task import read_from_PostgreSQL_function
from ANNA_write_task import write_to_JSON_function
default_args = {
'owner': 'airflow',
'start_date': datetime.now() - timedelta(days=1),
'retries': 1
}
dag = DAG(
dag_id='ANNA_DAG_from_PotgreSQL_2_JSON_file',
default_args=default_args,
schedule_interval='@daily'
)
start_task = DummyOperator(task_id='start_task', dag=dag)
read_task = PythonOperator(
task_id='read_task',
provide_context=True,
python_callable=read_from_PostgreSQL_function,
dag=dag
)
write_task = PythonOperator(
task_id='write_task',
provide_context=True,
python_callable=write_to_JSON_function,
dag=dag
)
end_task = DummyOperator(task_id='end_task', dag=dag)
start_task >> read_task >> write_task >> end_task
'''
with open('/root/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py', 'w') as f:
f.write(code)
!cp ~/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py /content/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py
!airflow dags unpause ANNA_DAG_from_PotgreSQL_2_JSON_file
Чтобы увидеть этот DAG в списке всех конвейеров, нужно запустить планировщик AirFlow также в фоновом режиме:
!airflow scheduler --daemon
При отсутствии ошибок DAG появится в веб-интерфейсе:

Передача данных между задачами происходит через механизм XCom, список всех этих объектов что можно также посмотреть в веб-интерфейсе:

После успешного выполнения DAG в пользовательской папке Colab появятся JSON-файлы с данными:

Продолжение работы с этими скриптами показано в новой статье на примере загрузки JSON-документов в документо-ориентированное NoSQL-хранилище Elasticsearch, развернутое в облачной платформе bonsai.io
Узнайте больше про Apache AirFlow и его практическое использование в дата-инженерии и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:


