Параллельное выполнение задач в DAG Apache AirFlow: практический пример

Apache AirFlow примеры курсы обучение, Apache Airflow для дата-инженера и администратора кластера, обучение Apache Airflow, курсы Airflow, как работает Apache Airflow, исполнители задач Airflow, Школа Больших Данных Учебный Центр Коммерсант

Сегодня на практическом примере посмотрим, как запускать в DAG Apache AirFlow параллельное исполнение нескольких задач, применим пару лучших практик реализации ETL-конвейера для работы с PostgreSQL, а также разберем неоднозначности программного добавления соединений с внешними системами.

Постановка задачи

Предположим, необходимо получить аналитику по продажам товаров интернет-магазина, выгрузив данные из PostgreSQL в виде табличных и графических PDF-отчетов. Пример простого Python-кода для этой задачи я показывала в блоге нашей Школы прикладного бизнес-анализа. Чтобы показать, как работает распараллеливание задач в рамках одного DAG Apache AirFlow, будем формировать 2 разных отчета: один с гистограммой распределения дохода по месяцам, а другой – по самым продаваемым товарам. После формирования обоих отчетов в Телеграм-бот должно приходить уведомление об успешном выполнении этих задач.

Как обычно, в качестве примера я возьму базу данных своего ранее спроектированного и реализованного интернет-магазина, которая имеет следующую схему для PostgreSQL

Физическая модель данных интернет-магазина для PostgreSQL
Физическая модель данных интернет-магазина для PostgreSQL

Итоговый отчет в виде неизменяемого PDF-файла должен содержать данные о помесячном доходе и количестве заказов, а также сведения о наиболее популярных, т.е. самых часто продаваемых товарах в текстовой и графической формах. Для визуализации помесячной выручки отлично подходят гистограммы, а для оценки популярности конкретного товара относительно других – круговая диаграмма. Все эти и другие виды графиков поддерживает Python-библиотека matplotlib. Помимо функций для работы с двумерной и трехмерной графикой, она также позволяет формировать PDF-документ с текстовыми, табличными, численными и графическими данными. Поэтому для решаемой задачи будем использовать именно ее методы API, формируя задачи для DAG AirFlow.

Реализация DAG AirFlow

Поскольку я запускаю AirFlow как Python-библиотеку в интерактивной среде Google Colab, сперва необходимо установить требуемые зависимости:

#Установка Apache Airflow и инициализация базы данных.
!pip install apache-airflow
!airflow initdb

#Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok

#установка провайдеров
!pip install apache-airflow-providers-telegram
!pip install psycopg2-binary
!pip install apache-airflow-providers-postgres

#импорт модулей
from pyngrok import ngrok
import sys
import os
import json
import datetime
from airflow import DAG
from airflow.providers.telegram.operators.telegram import TelegramOperator

Создадим директории, где будут храниться Python-файлы отдельных задач и SQL-запросов.

from google.colab import drive
drive.mount('/content/drive')
os.makedirs('/content/airflow/dags', exist_ok=True) #создание директории с py-файлами

os.makedirs('/content/airflow/files/reports', exist_ok=True) #создание директории с отчетами

os.makedirs('/content/airflow/sql', exist_ok=True) #создание директории с отчетами

dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
os.makedirs(dags_folder, exist_ok=True)
os.makedirs("/root/airflow/dags/sql", exist_ok=True)

Далее запустим веб-сервер AirFlow

#запуска веб-сервера Apache Airflow на порту 8888. Веб-сервер Airflow предоставляет пользовательский интерфейс для управления DAGами,
#просмотра логов выполнения задач, мониторинга прогресса выполнения
!airflow webserver --port 8888 --daemon

Для доступа к веб-серверу AirFlow извне необходимо пробросить туннель для раскрытия порта, на котором он запущен. Как обычно, я делаю это с помощью утилиты ngrok:

#Аутентификация в сервисе ngrok с помощью auth_token
os.system(f"ngrok authtoken {auth_token}")

#Запуск ngrok, который создаст публичный URL для сервера через туннель
#для доступа к веб-интерфейсу Airflow из любого места.
#addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
public_url = ngrok.connect(addr="8888", proto="http")

#Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)
Создание публичного URl для доступа к веб-серверу AirFlow извне
Создание публичного URl для доступа к веб-серверу AirFlow извне

Создадим пользователя AirFlow, задав ему логин и пароль:

!airflow db init #Инициализация базы данных Airflow
!airflow upgradedb #Обновление базы данных Airflow

#Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password.
#Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow.
!airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password

Поскольку указывать параметры подключения к внешним системам, таким как СУБД PostgreSQL или Telegram непосредственно в коде DAG считается плохой практикой, зададим это программным образом. Например, мой экземпляр PostgreSQL развернут в облачной платформе Neon:

######################################добавляем соединение с PG#################################
from airflow import settings
from airflow.models import Connection
from sqlalchemy.orm import sessionmaker

# Установите параметры соединения
conn_id = "my_postgres_conn"
conn_type = "postgres"
host = "my-v-host.aws.neon.tech"
schema = "neondb"
login = "username"
password = "password"
port = 5432

# Создайте объект соединения
new_conn = Connection(
    conn_id=conn_id,
    conn_type=conn_type,
    host=host,
    schema=schema,
    login=login,
    password=password,
    port=port
)

# Создайте сессию
Session = sessionmaker(bind=settings.engine)
session = Session()

if not session.query(Connection).filter(Connection.conn_id == conn_id).first():
    # Добавьте соединение в базу данных, если его ещё нет
    session.add(new_conn)
    session.commit()

session.close()

Примечательно, что при задании параметров соединения в ключе schema надо указать название БД, а не схему:

Добавление подключения к PostgreSQL
Добавление подключения к PostgreSQL

Аналогичным образом определим параметры подключения к TG-боту:

######################################добавляем соединение с TG#################################
from airflow import settings
from airflow.models import Connection
from sqlalchemy.orm import sessionmaker

# Создайте объект соединения
new_conn = Connection(
    conn_id='my_TG_conn',
    conn_type='http',
    login='login',
    password='API-token TG',
    host='telegram_chat_id',  # telegram_chat_id
    schema='',  # нет схемы
    port=None,  # нет порта
    extra=json.dumps({"telegram_token": telegram_token, "telegram_chat_id": telegram_chat_id})
)

# Создайте сессию
Session = sessionmaker(bind=settings.engine)
session = Session()

# Используйте conn_id из объекта соединения для проверки
if not session.query(Connection).filter(Connection.conn_id == new_conn.conn_id).first():
    # Добавьте соединение в базу данных, если его ещё нет
    session.add(new_conn)
    session.commit()

session.close()

Созданные подключения отобразятся в GUI веб-сервера AirFlow:

Подключения к внешним системам
Подключения к внешним системам

Аналогично подключениям, вписывать текст SQL-запроса непосредственно в Python-файл задачи DAG считается плохой практикой. Лучше описать его в отдельном sql-файле, который потом будет использоваться как параметр для задачи оператора PostgreSQL. Чтобы считать данные из файла с SQL-запросами, AirFlow должен иметь доступ к нему, т.е. файл должен размещаться в той же среде, где работает AirFlow и должен быть доступен для чтения. При использовании PostgresOperator и указания пути к файлу с SQL-запросом, AirFlow ожидает, что он будет находиться внутри директории dags или в папке, которая указана в настройках sql для Airflow. По умолчанию SQL-файлы ищутся в папке dags и подпапках. Поэтому будем сохранять их в ранее созданной директории /root/airflow/dags/sql:

Файл с SQL-запросом получения помесячной выручки из БД создается таким образом:

###################################SQL-запрос получения помесячной выручки из БД (сумма всех заказов, сгруппированных по месяцам)################
from google.colab import files

code = '''
    SELECT to_char(orders.date, 'Month') AS mes, SUM(orders.sum) AS income, COUNT(orders.sum) AS quantity
            FROM orders
            GROUP BY mes
            ORDER BY (MIN(orders.date));
'''

with open('/root/airflow/dags/sql/SQL-zapros_month_income.sql', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/sql/SQL-zapros_month_income.sql /content/airflow/sql/SQL-zapros_month_income.sql

Файл с SQL-запросом получения наиболее популярных товаров создается так:

###################################SQL-запрос получения больше всего востребованных товаров из БД 
######################(тех товаров во всех заказах, которых купили больше 500 единиц)################
from google.colab import files

code = '''
SELECT product.name AS tovar, SUM(order_product.quantity) AS kolichestvo
            FROM order_product
            JOIN product on product.id=order_product.product
            JOIN orders on orders.id=order_product.order
            WHERE (EXTRACT(MONTH FROM orders.date) BETWEEN 1 AND 6)
            GROUP BY tovar
            HAVING SUM(order_product.quantity)>500
            ORDER BY kolichestvo;
'''

with open('/root/airflow/dags/sql/SQL-zapros_popular_products.sql', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/sql/SQL-zapros_popular_products.sql /content/airflow/sql/SQL-zapros_popular_products.sql

Создадим Python-файл с задачей формирования гистрограммы по данным из SQL-запроса помесячной выручки:

############################задача формирования PDF-документа с гистограммой##################################
from google.colab import files

code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import pandas as pd
import datetime
import os

def write_hist_to_pdf_function(**kwargs):
    results = kwargs['ti'].xcom_pull(task_ids='month_income_extract_task')

    for result in results:
        df = pd.DataFrame(results, columns=['МЕСЯЦ', 'ДОХОД', 'КОЛИЧЕСТВО ЗАКАЗОВ'])

    # Получаем текущую дату
    current_date = datetime.datetime.now()
    
    # Форматируем дату в строку (например, '2023-03-15')
    date_str = current_date.strftime('%Y-%m-%d')

    filename = '/content/airflow/files/HIST_report_' + date_str + '.pdf'

    with PdfPages(filename) as pdf:
            
            fig_table, ax_table = plt.subplots(figsize=(6, 10))  # Выберите подходящий размер фигуры
            
            # Задание заголовка
            ax_table.set_title(f"Доходы по месяцам")
            # Задание содержимого таблицы со статистикой
            ax_table.axis('tight') # задаем границы области для таблицы так, чтобы они плотно обрамляли содержимое
            ax_table.axis('off') # выключаем отображение осей для таблицы (нет границ и делений)
            table = ax_table.table(cellText=df.values, colLabels=df.columns, loc='center') #задаем содержимое ячеек таблицы, заголовки столбцов и расположение таблицы
            table.set_fontsize(10) # устанавливаем размер шрифта для текста в таблице вручную
            table.scale(1.2, 1.2)  # Можно изменить масштаб таблицы для лучшего отображения
            # Сохранение в pdf и закрытие страницы в файле, чтобы освободить память, связанную с этим объектом Figure в Matplotlib
            pdf.savefig(fig_table)
            plt.close(fig_table)

            # Создание фигуры и осей для гистограммы
            fig_hist, ax_hist = plt.subplots(figsize=(14, 6))  # Выберите подходящий размер фигуры
            # Установим метки на оси X с названиями месяцев
            ax_hist.set_xticks(range(1+len(df['МЕСЯЦ'])))
            # Строим столбики гистограммы
            ax_hist.bar(df['МЕСЯЦ'], df['ДОХОД'], width=0.4, edgecolor="white", label='доход за месяц', linewidth=0.7)
            # Рисуем число заказов
            ax_hist.plot(df['МЕСЯЦ'], 5000*df['КОЛИЧЕСТВО ЗАКАЗОВ'], 'r', label='количество заказов (в масштабе x5000)', linewidth=2.0)
            # Задание содержимого таблицы со статистикой
            ax_hist.set_title(f"Доход от заказов и их количество по месяцам")
            #Задание осей гистограммы
            ax_hist.set_xlabel('Месяц')
            ax_hist.set_ylabel('Доход')
            # Добавляем легенду на график
            ax_hist.legend()
            #Сохранение в pdf и закрытие страницы в файле, чтобы освободить память, связанную с этим объектом Figure в Matplotlib
            pdf.savefig(fig_hist)
            plt.close(fig_hist)
    
    return current_date

'''

with open('/root/airflow/dags/ANNA_HIST_report_PDF_task.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_HIST_report_PDF_task.py /content/airflow/dags/ANNA_HIST_report_PDF_task.py

Python-файл с задачей формирования круговой диаграммы по данным из SQL-запроса о наиболее популярных товарах:

############################задача формирования PDF-документа с круговой диаграммой##################################
from google.colab import files

code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import matplotlib.pyplot as plt
from matplotlib.backends.backend_pdf import PdfPages
import pandas as pd
import datetime
import os

def write_circle_to_pdf_function(**kwargs):
    results = kwargs['ti'].xcom_pull(task_ids='popular_products_extract_task')

    for result in results:
        df = pd.DataFrame(results, columns=['ТОВАР', 'КОЛИЧЕСТВО'])

    # Получаем текущую дату
    current_date = datetime.datetime.now()
    
    # Форматируем дату в строку (например, '2023-03-15')
    date_str = current_date.strftime('%Y-%m-%d')

    filename = '/content/airflow/files/CIRCLE_report_' + date_str + '.pdf'

    with PdfPages(filename) as pdf:

          fig_table, ax_table = plt.subplots(figsize=(6, 10))  # Выберите подходящий размер фигуры
      
          #Задание заголовка
          ax_table.set_title(f"Самые часто покупаемые товары за 1-ые полгода")    
          # Задание содержимого таблицы со статистикой
          ax_table.axis('tight') #задаем границы области для таблицы так, чтобы они плотно обрамляли содержимое
          ax_table.axis('off') #выключаем отображение осей для таблицы (нет границ и делений)
          table = ax_table.table(cellText=df.values, colLabels=df.columns, loc='center') #задаем содержимое ячеек таблицы, заголовки столбцов и расположение таблицы
          #table.auto_set_font_size(True)
          table.set_fontsize(8) #устанавливаем размер шрифта для текста в таблице вручную
          table.scale(1.2, 1.2)  # Можно изменить масштаб таблицы для лучшего отображения
          #Сохранение в pdf и закрытие страницы в файле, чтобы освободить память, связанную с этим объектом Figure в Matplotlib
          pdf.savefig(fig_table)
          plt.close(fig_table)

          # Создание фигуры и осей для круговой диаграммы
          fig_circle, ax_circle = plt.subplots(figsize=(10, 8)) # 10 - ширина, 8 - высота фигуры в дюймах
          #Задание заголовка
          ax_circle.set_title(f"Самые часто покупаемые товары за 1-е полгода")
          # Строим круговую диаграмму с отображением данных
          ax_circle.pie(df['КОЛИЧЕСТВО'], labels=df['ТОВАР'], autopct='%1.1f%%', startangle=90)
          # Устанавливаем соотношение сторон диаграммы как 1, чтобы круг выглядел как круг, а не эллипс
          plt.axis('equal')
          #Сохранение в pdf и закрытие страницы в файле, чтобы освободить память, связанную с этим объектом Figure в Matplotlib
          pdf.savefig(fig_circle)
          plt.close(fig_circle)
    
    return current_date

'''

with open('/root/airflow/dags/ANNA_popular_products_PDF_report_task.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_popular_products_PDF_report_task.py /content/airflow/dags/ANNA_popular_products_PDF_report_task.py

Наконец, напишем Python-код для DAG, включив туда ранее созданные задачи:

########################из разных файлов DAG чтения из PostgreSQL и записи в PDF-файл############################

from google.colab import files

code = '''

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta
from ANNA_popular_products_PDF_report_task import write_circle_to_pdf_function
from ANNA_HIST_report_PDF_task import write_hist_to_pdf_function

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='ANNA_DAG_PG_2_TG',
    default_args=default_args,
    schedule_interval='@daily'
    #schedule_interval='*/5 * * * *'
)

start_task = DummyOperator(task_id='start_task', dag=dag)

extract_hist_task = PostgresOperator(
    task_id='month_income_extract_task',
    postgres_conn_id='my_postgres_conn',
    sql="sql/SQL-zapros_month_income.sql",
    dag=dag
)

extract_circle_task = PostgresOperator(
    task_id='popular_products_extract_task',
    postgres_conn_id='my_postgres_conn',
    sql="sql/SQL-zapros_popular_products.sql",
    dag=dag
)

make_report_HIST_task = PythonOperator(
    task_id='write_hist_to_task',
    provide_context=True,
    python_callable=write_hist_to_pdf_function,
    dag=dag
)

make_report_CIRCLE_task = PythonOperator(
    task_id='write_circle_to_pdf_task',
    provide_context=True,
    python_callable=write_circle_to_pdf_function,
    dag=dag
)

now=datetime.now()

send_notification_task = TelegramOperator(
    task_id='send_notification_task',
    telegram_conn_id='my_TG_conn',
    text='Reports has been created at {}'.format(
        datetime.now().strftime("%m/%d/%Y, %H:%M:%S")
    ),
    dag=dag
)

end_task = DummyOperator(
    task_id='end',
    dag=dag,
)

start_task >> extract_hist_task
extract_hist_task >> make_report_HIST_task 
make_report_HIST_task >> send_notification_task 
start_task >> extract_circle_task
extract_circle_task >> make_report_CIRCLE_task
make_report_CIRCLE_task >> send_notification_task 
send_notification_task >> end_task

'''

with open('/root/airflow/dags/ANNA_DAG_PG_2_TG.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_DAG_PG_2_TG.py /content/airflow/dags/ANNA_DAG_PG_2_TG.py
!airflow dags unpause ANNA_DAG_PG_2_TG

Чтобы созданный DAG отобразился в GUI веб-сервера, запустим планировщик AirFlow:

!airflow scheduler --daemon

При отсутствии ошибок DAG отобразится в списке всех конвейеров и его можно запустить:

Исполнение DAG AirFlow
Исполнение DAG AirFlow

В результате успешного выполнения этого конвейера из 7 задач в заданной директории временного пространства Google Colab создадутся необходимые отчеты.

Сгенерированные PDF-отчеты
Сгенерированные PDF-отчеты

А также сообщение об успешном создании уйдет в Telegram-бот

Сообщения в ТГ-бот
Сообщения в ТГ-бот

Сами PDF-файлы отчетов доступны по ссылкам:

Узнайте больше про Apache AirFlow и его практическое использование в дата-инженерии, машинном обучении и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту