Простой пример объединения нескольких задач, описанных в разных Python-файлах, в единый DAG Apache AirFlow на кейсе выгрузки из реляционной базы PostgreSQL данных о выполненных заказах за последние 100 дней. Разработка и запуск кода в Google Colab.
Объединение задач из отдельных Python-файлах в один DAG AirFlow
Я уже показывала, как построить простой ETL-конвейер в Apache AirFlow на примере выгрузки старых заказов из базы данных интернет-магазина на PostgreSQL. В тот раз файл DAG включал у меня не только последовательность задач, но и их описание. Управлять таким длинным файлом совершенно неудобно: лучше описывать каждую задачу в отдельном py-файле, выстраивая их последовательность в общем конвейере. Как это сделать в Google Colab, рассмотрим далее.
Поскольку экземпляр Apache AirFlow у меня будет развернут на удаленной машине Google Colab, придется прокинуть туннель, чтобы сделать ее локальный хост доступным извне по URL. Как обычно, для этого я использую утилиту ngrok.
В этот раз мой конвейер, т.е. DAG состоит из следующих задач:
- start_task — фиктивный оператор, представляющий начало DAG;
- read_task – оператор Python, который выполняет функцию read_from_PostgreSQL_function для чтения данных из базы данных PostgreSQL. Для параметра Provide_context установлено значение True, чтобы получить доступ к контексту выполнения.
- write_task – оператор Python, который выполняет функцию write_to_JSON_function для записи данных, ранее считанных из БД, в файл JSON. Для параметра Provide_context также установлено значение True, поскольку результаты задачи read_task будут входом для write_task.
- end_task – фиктивный оператор, представляющий конец DAG.
Код DAG-файл выглядит следующим образом:
from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python import BranchPythonOperator from datetime import datetime, timedelta from ANNA_read_task import read_from_PostgreSQL_function from ANNA_write_task import write_to_JSON_function default_args = { 'owner': 'airflow', 'start_date': datetime.now() - timedelta(days=1), 'retries': 1 } dag = DAG( dag_id='ANNA_DAG_from_PotgreSQL_2_JSON_file', default_args=default_args, schedule_interval='@daily' ) start_task = DummyOperator(task_id='start_task', dag=dag) read_task = PythonOperator( task_id='read_task', provide_context=True, python_callable=read_from_PostgreSQL_function, dag=dag ) write_task = PythonOperator( task_id='write_task', provide_context=True, python_callable=write_to_JSON_function, dag=dag ) end_task = DummyOperator(task_id='end_task', dag=dag) start_task >> read_task >> write_task >> end_task
Задачи read_task и write_task описаны в отдельных файлах. Например, код задачи чтения из базы данных выполненных заказов (со статусом 5) за последние 100 дней выглядит так:
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import os import psycopg2 import requests def read_from_PostgreSQL_function(**kwargs): connection_string = 'postgres://my-database' conn = psycopg2.connect(connection_string) select_query = """ SELECT orders.id, orders.date, orders.sum, TRIM(product.name), product.price, order_product.quantity, TRIM(provider.name), TRIM(customer.name), TRIM(customer.phone), TRIM(customer.email), TRIM(customer_states.name), delivery.date, TRIM(delivery.address), delivery.price FROM orders JOIN order_product ON order_product.order = orders.id JOIN product ON product.id=order_product.product JOIN provider ON provider.id = product.provider JOIN customer ON orders.customer = customer.id JOIN customer_states ON customer_states.id = customer.state JOIN order_states ON order_states.id = orders.state JOIN delivery ON orders.delivery = delivery.id WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-100) AND (CURRENT_DATE)) """ cur = conn.cursor() cur.execute(select_query) results = cur.fetchall() cur.close() conn.close() return results
А код записи этих данных в JSON-файл в задаче write_task выглядит так:
from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import json import os import psycopg2 def write_to_JSON_function(**kwargs): results = kwargs['ti'].xcom_pull(task_ids='read_task') orders = [] order_dict = {} for result in results: order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result if order_id not in order_dict: order_dict[order_id] = { 'order_id': order_id, 'order_date': str(order_date), 'order_sum': order_sum, 'products': [], 'customer': { 'customer_name': customer_name, 'customer_phone': customer_phone, 'customer_email': customer_email, 'customer_status': customer_status }, 'delivery': { 'delivery_date': str(delivery_date), 'delivery_address': delivery_address, 'delivery_price': delivery_price } } product = { 'product_name': product_name, 'product_price': product_price, 'product_quantity': product_quantity, 'product_provider': product_provider } order_dict[order_id]['products'].append(product) orders = list(order_dict.values()) filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json") filepath = os.path.join('/content/airflow/files/delivered', filename) with open(filepath, 'w') as f: json.dump(orders, f) f.close()
Если необходимо модифицировать эти задачи или добавить в DAG новые, это удобнее сделать, описав каждую задачу в отдельном файле. А вместе они запускаются в пакетном конвейере в виде DAG Apache AirFlow.
Реализация в Colab
Поскольку я запускаю код в интерактивной среде Google Colab, он будет более многословным. Например, сперва надо установить все необходимые библиотеки и импортировать пакеты:
###############################################ячейка №1 в Google Colab############################# #Установка Apache Airflow и инициализация базы данных. !pip install apache-airflow !airflow initdb #Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip !unzip ngrok-stable-linux-amd64.zip !pip install pyngrok #импорт модулей from pyngrok import ngrok import sys import os import datetime from airflow import DAG from airflow.operators.bash import BashOperator from google.colab import drive drive.mount('/content/drive') os.makedirs('/content/airflow/dags', exist_ok=True) dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags") os.makedirs(dags_folder, exist_ok=True) #Получение версии установленного Apache Airflow !airflow version
Затем следует запустить веб-сервер AirFlow в режиме демона, чтобы выполнение кода в этой ячейке Colab не блокировало другие:
!airflow webserver --port 8888 --daemon
Далее запускаем тунелирование с ngrok, задав токен аутентификации этой службы:
#Задание переменной auth_token для аутентификации в сервисе ngrok. auth_token = " токен аутентификации, взятый с https://dashboard.ngrok.com/signup " #Аутентификация в сервисе ngrok с помощью auth_token os.system(f"ngrok authtoken {auth_token}") #Запуск ngrok, который создаст публичный URL для сервера через туннель #для доступа к веб-интерфейсу Airflow из любого места. #addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP public_url = ngrok.connect(addr="8888", proto="http") #Вывод публичного URL для доступа к веб-интерфейсу Airflow print("Адрес Airflow GUI:", public_url)
В результате в области вывода Colab появится URL-адрес, по которому будет доступен ранее запущенный веб-сервер AirFlow. Потом необходимо создать пользователя Apache AirFlow, установить логин и пароль. Моего пользователя традиционно зовут anna с ролью администратора и паролем password:
!airflow db init #Инициализация базы данных Airflow !airflow upgradedb #Обновление базы данных Airflow #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password. #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password
Теперь можно войти в веб-интерфейс пакетного оркестратора, используя эти учетные данные.
Чтобы управлять задачами и самим DAG программным образом, я дополнила их код инструкциями записи в файл, сохранения в рабочую директорию AirFlow и копирования в пользовательскую папку для просмотра. Например, Python-код задачи чтения данных из PostgreSQL выглядит так:
########################Задача чтения из PostgreSQL####################################### import psycopg2 from google.colab import files code = ''' from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import os import psycopg2 import requests def read_from_PostgreSQL_function(**kwargs): connection_string = 'postgres://my-database' conn = psycopg2.connect(connection_string) select_query = """ SELECT orders.id, orders.date, orders.sum, TRIM(product.name), product.price, order_product.quantity, TRIM(provider.name), TRIM(customer.name), TRIM(customer.phone), TRIM(customer.email), TRIM(customer_states.name), delivery.date, TRIM(delivery.address), delivery.price FROM orders JOIN order_product ON order_product.order = orders.id JOIN product ON product.id=order_product.product JOIN provider ON provider.id = product.provider JOIN customer ON orders.customer = customer.id JOIN customer_states ON customer_states.id = customer.state JOIN order_states ON order_states.id = orders.state JOIN delivery ON orders.delivery = delivery.id WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-100) AND (CURRENT_DATE)) """ cur = conn.cursor() cur.execute(select_query) results = cur.fetchall() cur.close() conn.close() return results ''' with open('/root/airflow/dags/ANNA_read_task.py', 'w') as f: f.write(code) !cp ~/airflow/dags/ANNA_read_task.py /content/airflow/dags/ANNA_read_task.py
А задача записи результатов поиска в JSON-файл так:
############################задача записи в JSON-файл################################## from google.colab import files code = ''' from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import json import os def write_to_JSON_function(**kwargs): results = kwargs['ti'].xcom_pull(task_ids='read_task') orders = [] order_dict = {} for result in results: order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result if order_id not in order_dict: order_dict[order_id] = { 'order_id': order_id, 'order_date': str(order_date), 'order_sum': order_sum, 'products': [], 'customer': { 'customer_name': customer_name, 'customer_phone': customer_phone, 'customer_email': customer_email, 'customer_status': customer_status }, 'delivery': { 'delivery_date': str(delivery_date), 'delivery_address': delivery_address, 'delivery_price': delivery_price } } product = { 'product_name': product_name, 'product_price': product_price, 'product_quantity': product_quantity, 'product_provider': product_provider } order_dict[order_id]['products'].append(product) orders = list(order_dict.values()) filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json") filepath = os.path.join('/content/airflow/files/delivered', filename) with open(filepath, 'w') as f: json.dump(orders, f) f.close() ''' with open('/root/airflow/dags/ANNA_write_task.py', 'w') as f: f.write(code) !cp ~/airflow/dags/ANNA_write_task.py /content/airflow/dags/ANNA_write_task.py
Сам DAG-файл создается следующим кодом:
########################из разных файлов DAG чтения из PostgreSQL и формирования JSON-файла############################ from google.colab import files code = ''' from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.python import BranchPythonOperator from datetime import datetime, timedelta from ANNA_read_task import read_from_PostgreSQL_function from ANNA_write_task import write_to_JSON_function default_args = { 'owner': 'airflow', 'start_date': datetime.now() - timedelta(days=1), 'retries': 1 } dag = DAG( dag_id='ANNA_DAG_from_PotgreSQL_2_JSON_file', default_args=default_args, schedule_interval='@daily' ) start_task = DummyOperator(task_id='start_task', dag=dag) read_task = PythonOperator( task_id='read_task', provide_context=True, python_callable=read_from_PostgreSQL_function, dag=dag ) write_task = PythonOperator( task_id='write_task', provide_context=True, python_callable=write_to_JSON_function, dag=dag ) end_task = DummyOperator(task_id='end_task', dag=dag) start_task >> read_task >> write_task >> end_task ''' with open('/root/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py', 'w') as f: f.write(code) !cp ~/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py /content/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py !airflow dags unpause ANNA_DAG_from_PotgreSQL_2_JSON_file
Чтобы увидеть этот DAG в списке всех конвейеров, нужно запустить планировщик AirFlow также в фоновом режиме:
!airflow scheduler --daemon
При отсутствии ошибок DAG появится в веб-интерфейсе:
Передача данных между задачами происходит через механизм XCom, список всех этих объектов что можно также посмотреть в веб-интерфейсе:
После успешного выполнения DAG в пользовательской папке Colab появятся JSON-файлы с данными:
Продолжение работы с этими скриптами показано в новой статье на примере загрузки JSON-документов в документо-ориентированное NoSQL-хранилище Elasticsearch, развернутое в облачной платформе bonsai.io
Узнайте больше про Apache AirFlow и его практическое использование в дата-инженерии и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: