Сегодня на практическом примере посмотрим, как запускать в DAG Apache AirFlow параллельное исполнение нескольких задач, применим пару лучших практик реализации ETL-конвейера для работы с PostgreSQL, а также разберем неоднозначности программного добавления соединений с внешними системами.
Постановка задачи
Предположим, необходимо получить аналитику по продажам товаров интернет-магазина, выгрузив данные из PostgreSQL в виде табличных и графических PDF-отчетов. Пример простого Python-кода для этой задачи я показывала в блоге нашей Школы прикладного бизнес-анализа. Чтобы показать, как работает распараллеливание задач в рамках одного DAG Apache AirFlow, будем формировать 2 разных отчета: один с гистограммой распределения дохода по месяцам, а другой – по самым продаваемым товарам. После формирования обоих отчетов в Телеграм-бот должно приходить уведомление об успешном выполнении этих задач.
Как обычно, в качестве примера я возьму базу данных своего ранее спроектированного и реализованного интернет-магазина, которая имеет следующую схему для PostgreSQL
Итоговый отчет в виде неизменяемого PDF-файла должен содержать данные о помесячном доходе и количестве заказов, а также сведения о наиболее популярных, т.е. самых часто продаваемых товарах в текстовой и графической формах. Для визуализации помесячной выручки отлично подходят гистограммы, а для оценки популярности конкретного товара относительно других – круговая диаграмма. Все эти и другие виды графиков поддерживает Python-библиотека matplotlib. Помимо функций для работы с двумерной и трехмерной графикой, она также позволяет формировать PDF-документ с текстовыми, табличными, численными и графическими данными. Поэтому для решаемой задачи будем использовать именно ее методы API, формируя задачи для DAG AirFlow.
Реализация DAG AirFlow
Поскольку я запускаю AirFlow как Python-библиотеку в интерактивной среде Google Colab, сперва необходимо установить требуемые зависимости:
#Установка Apache Airflow и инициализация базы данных. !pip install apache-airflow !airflow initdb #Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip !unzip ngrok-stable-linux-amd64.zip !pip install pyngrok #установка провайдеров !pip install apache-airflow-providers-telegram !pip install psycopg2-binary !pip install apache-airflow-providers-postgres #импорт модулей from pyngrok import ngrok import sys import os import json import datetime from airflow import DAG from airflow.providers.telegram.operators.telegram import TelegramOperator
Создадим директории, где будут храниться Python-файлы отдельных задач и SQL-запросов.
from google.colab import drive drive.mount('/content/drive') os.makedirs('/content/airflow/dags', exist_ok=True) #создание директории с py-файлами os.makedirs('/content/airflow/files/reports', exist_ok=True) #создание директории с отчетами os.makedirs('/content/airflow/sql', exist_ok=True) #создание директории с отчетами dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags") os.makedirs(dags_folder, exist_ok=True) os.makedirs("/root/airflow/dags/sql", exist_ok=True)
Далее запустим веб-сервер AirFlow
#запуска веб-сервера Apache Airflow на порту 8888. Веб-сервер Airflow предоставляет пользовательский интерфейс для управления DAGами, #просмотра логов выполнения задач, мониторинга прогресса выполнения !airflow webserver --port 8888 --daemon
Для доступа к веб-серверу AirFlow извне необходимо пробросить туннель для раскрытия порта, на котором он запущен. Как обычно, я делаю это с помощью утилиты ngrok:
#Аутентификация в сервисе ngrok с помощью auth_token os.system(f"ngrok authtoken {auth_token}") #Запуск ngrok, который создаст публичный URL для сервера через туннель #для доступа к веб-интерфейсу Airflow из любого места. #addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP public_url = ngrok.connect(addr="8888", proto="http") #Вывод публичного URL для доступа к веб-интерфейсу Airflow print("Адрес Airflow GUI:", public_url)
Создадим пользователя AirFlow, задав ему логин и пароль:
!airflow db init #Инициализация базы данных Airflow !airflow upgradedb #Обновление базы данных Airflow #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password. #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password
Поскольку указывать параметры подключения к внешним системам, таким как СУБД PostgreSQL или Telegram непосредственно в коде DAG считается плохой практикой, зададим это программным образом. Например, мой экземпляр PostgreSQL развернут в облачной платформе Neon:
######################################добавляем соединение с PG################################# from airflow import settings from airflow.models import Connection from sqlalchemy.orm import sessionmaker # Установите параметры соединения conn_id = "my_postgres_conn" conn_type = "postgres" host = "my-v-host.aws.neon.tech" schema = "neondb" login = "username" password = "password" port = 5432 # Создайте объект соединения new_conn = Connection( conn_id=conn_id, conn_type=conn_type, host=host, schema=schema, login=login, password=password, port=port ) # Создайте сессию Session = sessionmaker(bind=settings.engine) session = Session() if not session.query(Connection).filter(Connection.conn_id == conn_id).first(): # Добавьте соединение в базу данных, если его ещё нет session.add(new_conn) session.commit() session.close()
Примечательно, что при задании параметров соединения в ключе schema надо указать название БД, а не схему:
Аналогичным образом определим параметры подключения к TG-боту:
######################################добавляем соединение с TG################################# from airflow import settings from airflow.models import Connection from sqlalchemy.orm import sessionmaker # Создайте объект соединения new_conn = Connection( conn_id='my_TG_conn', conn_type='http', login='login', password='API-token TG', host='telegram_chat_id', # telegram_chat_id schema='', # нет схемы port=None, # нет порта extra=json.dumps({"telegram_token": telegram_token, "telegram_chat_id": telegram_chat_id}) ) # Создайте сессию Session = sessionmaker(bind=settings.engine) session = Session() # Используйте conn_id из объекта соединения для проверки if not session.query(Connection).filter(Connection.conn_id == new_conn.conn_id).first(): # Добавьте соединение в базу данных, если его ещё нет session.add(new_conn) session.commit() session.close()
Созданные подключения отобразятся в GUI веб-сервера AirFlow:
Аналогично подключениям, вписывать текст SQL-запроса непосредственно в Python-файл задачи DAG считается плохой практикой. Лучше описать его в отдельном sql-файле, который потом будет использоваться как параметр для задачи оператора PostgreSQL. Чтобы считать данные из файла с SQL-запросами, AirFlow должен иметь доступ к нему, т.е. файл должен размещаться в той же среде, где работает AirFlow и должен быть доступен для чтения. При использовании PostgresOperator и указания пути к файлу с SQL-запросом, AirFlow ожидает, что он будет находиться внутри директории dags или в папке, которая указана в настройках sql для Airflow. По умолчанию SQL-файлы ищутся в папке dags и подпапках. Поэтому будем сохранять их в ранее созданной директории /root/airflow/dags/sql:
Файл с SQL-запросом получения помесячной выручки из БД создается таким образом:
###################################SQL-запрос получения помесячной выручки из БД (сумма всех заказов, сгруппированных по месяцам)################ from google.colab import files code = ''' SELECT to_char(orders.date, 'Month') AS mes, SUM(orders.sum) AS income, COUNT(orders.sum) AS quantity FROM orders GROUP BY mes ORDER BY (MIN(orders.date)); ''' with open('/root/airflow/dags/sql/SQL-zapros_month_income.sql', 'w') as f: f.write(code) !cp ~/airflow/dags/sql/SQL-zapros_month_income.sql /content/airflow/sql/SQL-zapros_month_income.sql
Файл с SQL-запросом получения наиболее популярных товаров создается так:
###################################SQL-запрос получения больше всего востребованных товаров из БД ######################(тех товаров во всех заказах, которых купили больше 500 единиц)################ from google.colab import files code = ''' SELECT product.name AS tovar, SUM(order_product.quantity) AS kolichestvo FROM order_product JOIN product on product.id=order_product.product JOIN orders on orders.id=order_product.order WHERE (EXTRACT(MONTH FROM orders.date) BETWEEN 1 AND 6) GROUP BY tovar HAVING SUM(order_product.quantity)>500 ORDER BY kolichestvo; ''' with open('/root/airflow/dags/sql/SQL-zapros_popular_products.sql', 'w') as f: f.write(code) !cp ~/airflow/dags/sql/SQL-zapros_popular_products.sql /content/airflow/sql/SQL-zapros_popular_products.sql
Создадим Python-файл с задачей формирования гистрограммы по данным из SQL-запроса помесячной выручки:
############################задача формирования PDF-документа с гистограммой################################## from google.colab import files code = ''' from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import matplotlib.pyplot as plt from matplotlib.backends.backend_pdf import PdfPages import pandas as pd import datetime import os def write_hist_to_pdf_function(**kwargs): results = kwargs['ti'].xcom_pull(task_ids='month_income_extract_task') for result in results: df = pd.DataFrame(results, columns=['МЕСЯЦ', 'ДОХОД', 'КОЛИЧЕСТВО ЗАКАЗОВ']) # Получаем текущую дату current_date = datetime.datetime.now() # Форматируем дату в строку (например, '2023-03-15') date_str = current_date.strftime('%Y-%m-%d') filename = '/content/airflow/files/HIST_report_' + date_str + '.pdf' with PdfPages(filename) as pdf: fig_table, ax_table = plt.subplots(figsize=(6, 10)) # Выберите подходящий размер фигуры # Задание заголовка ax_table.set_title(f"Доходы по месяцам") # Задание содержимого таблицы со статистикой ax_table.axis('tight') # задаем границы области для таблицы так, чтобы они плотно обрамляли содержимое ax_table.axis('off') # выключаем отображение осей для таблицы (нет границ и делений) table = ax_table.table(cellText=df.values, colLabels=df.columns, loc='center') #задаем содержимое ячеек таблицы, заголовки столбцов и расположение таблицы table.set_fontsize(10) # устанавливаем размер шрифта для текста в таблице вручную table.scale(1.2, 1.2) # Можно изменить масштаб таблицы для лучшего отображения # Сохранение в pdf и закрытие страницы в файле, чтобы освободить память, связанную с этим объектом Figure в Matplotlib pdf.savefig(fig_table) plt.close(fig_table) # Создание фигуры и осей для гистограммы fig_hist, ax_hist = plt.subplots(figsize=(14, 6)) # Выберите подходящий размер фигуры # Установим метки на оси X с названиями месяцев ax_hist.set_xticks(range(1+len(df['МЕСЯЦ']))) # Строим столбики гистограммы ax_hist.bar(df['МЕСЯЦ'], df['ДОХОД'], width=0.4, edgecolor="white", label='доход за месяц', linewidth=0.7) # Рисуем число заказов ax_hist.plot(df['МЕСЯЦ'], 5000*df['КОЛИЧЕСТВО ЗАКАЗОВ'], 'r', label='количество заказов (в масштабе x5000)', linewidth=2.0) # Задание содержимого таблицы со статистикой ax_hist.set_title(f"Доход от заказов и их количество по месяцам") #Задание осей гистограммы ax_hist.set_xlabel('Месяц') ax_hist.set_ylabel('Доход') # Добавляем легенду на график ax_hist.legend() #Сохранение в pdf и закрытие страницы в файле, чтобы освободить память, связанную с этим объектом Figure в Matplotlib pdf.savefig(fig_hist) plt.close(fig_hist) return current_date ''' with open('/root/airflow/dags/ANNA_HIST_report_PDF_task.py', 'w') as f: f.write(code) !cp ~/airflow/dags/ANNA_HIST_report_PDF_task.py /content/airflow/dags/ANNA_HIST_report_PDF_task.py
Python-файл с задачей формирования круговой диаграммы по данным из SQL-запроса о наиболее популярных товарах:
############################задача формирования PDF-документа с круговой диаграммой################################## from google.colab import files code = ''' from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import matplotlib.pyplot as plt from matplotlib.backends.backend_pdf import PdfPages import pandas as pd import datetime import os def write_circle_to_pdf_function(**kwargs): results = kwargs['ti'].xcom_pull(task_ids='popular_products_extract_task') for result in results: df = pd.DataFrame(results, columns=['ТОВАР', 'КОЛИЧЕСТВО']) # Получаем текущую дату current_date = datetime.datetime.now() # Форматируем дату в строку (например, '2023-03-15') date_str = current_date.strftime('%Y-%m-%d') filename = '/content/airflow/files/CIRCLE_report_' + date_str + '.pdf' with PdfPages(filename) as pdf: fig_table, ax_table = plt.subplots(figsize=(6, 10)) # Выберите подходящий размер фигуры #Задание заголовка ax_table.set_title(f"Самые часто покупаемые товары за 1-ые полгода") # Задание содержимого таблицы со статистикой ax_table.axis('tight') #задаем границы области для таблицы так, чтобы они плотно обрамляли содержимое ax_table.axis('off') #выключаем отображение осей для таблицы (нет границ и делений) table = ax_table.table(cellText=df.values, colLabels=df.columns, loc='center') #задаем содержимое ячеек таблицы, заголовки столбцов и расположение таблицы #table.auto_set_font_size(True) table.set_fontsize(8) #устанавливаем размер шрифта для текста в таблице вручную table.scale(1.2, 1.2) # Можно изменить масштаб таблицы для лучшего отображения #Сохранение в pdf и закрытие страницы в файле, чтобы освободить память, связанную с этим объектом Figure в Matplotlib pdf.savefig(fig_table) plt.close(fig_table) # Создание фигуры и осей для круговой диаграммы fig_circle, ax_circle = plt.subplots(figsize=(10, 8)) # 10 - ширина, 8 - высота фигуры в дюймах #Задание заголовка ax_circle.set_title(f"Самые часто покупаемые товары за 1-е полгода") # Строим круговую диаграмму с отображением данных ax_circle.pie(df['КОЛИЧЕСТВО'], labels=df['ТОВАР'], autopct='%1.1f%%', startangle=90) # Устанавливаем соотношение сторон диаграммы как 1, чтобы круг выглядел как круг, а не эллипс plt.axis('equal') #Сохранение в pdf и закрытие страницы в файле, чтобы освободить память, связанную с этим объектом Figure в Matplotlib pdf.savefig(fig_circle) plt.close(fig_circle) return current_date ''' with open('/root/airflow/dags/ANNA_popular_products_PDF_report_task.py', 'w') as f: f.write(code) !cp ~/airflow/dags/ANNA_popular_products_PDF_report_task.py /content/airflow/dags/ANNA_popular_products_PDF_report_task.py
Наконец, напишем Python-код для DAG, включив туда ранее созданные задачи:
########################из разных файлов DAG чтения из PostgreSQL и записи в PDF-файл############################ from google.colab import files code = ''' from airflow import DAG from airflow.operators.python import PythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.providers.telegram.operators.telegram import TelegramOperator from airflow.providers.postgres.operators.postgres import PostgresOperator from datetime import datetime, timedelta from ANNA_popular_products_PDF_report_task import write_circle_to_pdf_function from ANNA_HIST_report_PDF_task import write_hist_to_pdf_function default_args = { 'owner': 'airflow', 'start_date': datetime.now() - timedelta(days=1), 'retries': 1 } dag = DAG( dag_id='ANNA_DAG_PG_2_TG', default_args=default_args, schedule_interval='@daily' #schedule_interval='*/5 * * * *' ) start_task = DummyOperator(task_id='start_task', dag=dag) extract_hist_task = PostgresOperator( task_id='month_income_extract_task', postgres_conn_id='my_postgres_conn', sql="sql/SQL-zapros_month_income.sql", dag=dag ) extract_circle_task = PostgresOperator( task_id='popular_products_extract_task', postgres_conn_id='my_postgres_conn', sql="sql/SQL-zapros_popular_products.sql", dag=dag ) make_report_HIST_task = PythonOperator( task_id='write_hist_to_task', provide_context=True, python_callable=write_hist_to_pdf_function, dag=dag ) make_report_CIRCLE_task = PythonOperator( task_id='write_circle_to_pdf_task', provide_context=True, python_callable=write_circle_to_pdf_function, dag=dag ) now=datetime.now() send_notification_task = TelegramOperator( task_id='send_notification_task', telegram_conn_id='my_TG_conn', text='Reports has been created at {}'.format( datetime.now().strftime("%m/%d/%Y, %H:%M:%S") ), dag=dag ) end_task = DummyOperator( task_id='end', dag=dag, ) start_task >> extract_hist_task extract_hist_task >> make_report_HIST_task make_report_HIST_task >> send_notification_task start_task >> extract_circle_task extract_circle_task >> make_report_CIRCLE_task make_report_CIRCLE_task >> send_notification_task send_notification_task >> end_task ''' with open('/root/airflow/dags/ANNA_DAG_PG_2_TG.py', 'w') as f: f.write(code) !cp ~/airflow/dags/ANNA_DAG_PG_2_TG.py /content/airflow/dags/ANNA_DAG_PG_2_TG.py !airflow dags unpause ANNA_DAG_PG_2_TG
Чтобы созданный DAG отобразился в GUI веб-сервера, запустим планировщик AirFlow:
!airflow scheduler --daemon
При отсутствии ошибок DAG отобразится в списке всех конвейеров и его можно запустить:
В результате успешного выполнения этого конвейера из 7 задач в заданной директории временного пространства Google Colab создадутся необходимые отчеты.
А также сообщение об успешном создании уйдет в Telegram-бот
Сами PDF-файлы отчетов доступны по ссылкам:
Узнайте больше про Apache AirFlow и его практическое использование в дата-инженерии, машинном обучении и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: