Как использовать в одном DAG Apache AirFlow задачи из разных Python-файлов

AirFlow PostreSQL, DAG Apache AirFlow PostreSQL Colab пример, Apache AirFlow GUI Google Colab, обучение Apache AirFlow, курсы дата-инженеров, обучение разработчиков Big Data, разработка AirFlow конвейеров, Школа Больших Данных Учебный Центр Коммерсант

Простой пример объединения нескольких задач, описанных в разных Python-файлах, в единый DAG Apache AirFlow на кейсе выгрузки из реляционной базы PostgreSQL данных о выполненных заказах за последние 100 дней. Разработка и запуск кода в Google Colab.

Объединение задач из отдельных Python-файлах в один DAG AirFlow

Я уже показывала, как построить простой ETL-конвейер в Apache AirFlow на примере выгрузки старых заказов из базы данных интернет-магазина на PostgreSQL. В тот раз файл DAG включал у меня не только последовательность задач, но и их описание. Управлять таким длинным файлом совершенно неудобно: лучше описывать каждую задачу в отдельном py-файле, выстраивая их последовательность в общем конвейере. Как это сделать в Google Colab, рассмотрим далее.

Поскольку экземпляр Apache AirFlow   у меня будет развернут на удаленной машине Google Colab, придется прокинуть туннель, чтобы сделать ее локальный хост доступным извне по URL. Как обычно, для этого я использую утилиту ngrok.

В этот раз мой конвейер, т.е. DAG состоит из следующих задач:

  • start_task — фиктивный оператор, представляющий начало DAG;
  • read_task – оператор Python, который выполняет функцию read_from_PostgreSQL_function для чтения данных из базы данных PostgreSQL. Для параметра Provide_context установлено значение True, чтобы получить доступ к контексту выполнения.
  • write_task – оператор Python, который выполняет функцию write_to_JSON_function для записи данных, ранее считанных из БД, в файл JSON. Для параметра Provide_context также установлено значение True, поскольку результаты задачи read_task будут входом для write_task.
  • end_task – фиктивный оператор, представляющий конец DAG.

Код DAG-файл выглядит следующим образом:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime, timedelta
from ANNA_read_task import read_from_PostgreSQL_function
from ANNA_write_task import write_to_JSON_function

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='ANNA_DAG_from_PotgreSQL_2_JSON_file',
    default_args=default_args,
    schedule_interval='@daily'
)

start_task = DummyOperator(task_id='start_task', dag=dag)

read_task = PythonOperator(
    task_id='read_task',
    provide_context=True,
    python_callable=read_from_PostgreSQL_function,
    dag=dag
)

write_task = PythonOperator(
    task_id='write_task',
    provide_context=True,
    python_callable=write_to_JSON_function,
    dag=dag
)

end_task = DummyOperator(task_id='end_task', dag=dag)

start_task >> read_task >> write_task >> end_task

Задачи read_task и write_task описаны в отдельных файлах. Например, код задачи чтения из базы данных выполненных заказов (со статусом 5) за последние 100 дней выглядит так:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os
import psycopg2
import requests

def read_from_PostgreSQL_function(**kwargs):
    connection_string = 'postgres://my-database'
    conn = psycopg2.connect(connection_string)

    select_query = """
    SELECT
        orders.id,
        orders.date,
        orders.sum,
        TRIM(product.name),
        product.price,
        order_product.quantity,
        TRIM(provider.name),
        TRIM(customer.name),
        TRIM(customer.phone),
        TRIM(customer.email),
        TRIM(customer_states.name),
        delivery.date,
        TRIM(delivery.address),
        delivery.price
    FROM
        orders
        JOIN order_product ON order_product.order = orders.id
        JOIN product ON product.id=order_product.product
        JOIN provider ON provider.id = product.provider
        JOIN customer ON orders.customer = customer.id
        JOIN customer_states ON customer_states.id = customer.state
        JOIN order_states ON order_states.id = orders.state
        JOIN delivery ON orders.delivery = delivery.id
     WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-100) AND (CURRENT_DATE))
    """

    cur = conn.cursor()
    cur.execute(select_query)
    results = cur.fetchall()

    cur.close()
    conn.close()

    return results

А код записи этих данных в JSON-файл в задаче write_task выглядит так:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json
import os
import psycopg2

def write_to_JSON_function(**kwargs):
    results = kwargs['ti'].xcom_pull(task_ids='read_task')

    orders = []
    order_dict = {}

    for result in results:
        order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result

        if order_id not in order_dict:
            order_dict[order_id] = {
                'order_id': order_id,
                'order_date': str(order_date),
                'order_sum': order_sum,
                'products': [],
                'customer': {
                    'customer_name': customer_name,
                    'customer_phone': customer_phone,
                    'customer_email': customer_email,
                    'customer_status': customer_status
                },
                'delivery': {
                    'delivery_date': str(delivery_date),
                    'delivery_address': delivery_address,
                    'delivery_price': delivery_price
                }
            }

        product = {
            'product_name': product_name,
            'product_price': product_price,
            'product_quantity': product_quantity,
            'product_provider': product_provider
        }

        order_dict[order_id]['products'].append(product)

    orders = list(order_dict.values())

    filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json")
    filepath = os.path.join('/content/airflow/files/delivered', filename)

    with open(filepath, 'w') as f:
        json.dump(orders, f)
        f.close()

Если необходимо модифицировать эти задачи или добавить в DAG новые, это удобнее сделать, описав каждую задачу в отдельном файле. А вместе они запускаются в пакетном конвейере в виде DAG Apache AirFlow.

DAG Apache AirFlow
Успешно выполненный DAG Apache AirFlow

Реализация в Colab

Поскольку я запускаю код в интерактивной среде Google Colab, он будет более многословным. Например, сперва надо установить все необходимые библиотеки и импортировать пакеты:

###############################################ячейка №1 в Google Colab#############################
#Установка Apache Airflow и инициализация базы данных.
!pip install apache-airflow
!airflow initdb

#Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok

#импорт модулей
from pyngrok import ngrok
import sys
import os
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

from google.colab import drive
drive.mount('/content/drive')
os.makedirs('/content/airflow/dags', exist_ok=True)
dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
os.makedirs(dags_folder, exist_ok=True)

#Получение версии установленного Apache Airflow
!airflow version

Затем следует запустить веб-сервер AirFlow в режиме демона, чтобы выполнение кода в этой ячейке Colab не блокировало другие:

!airflow webserver --port 8888 --daemon

Далее запускаем тунелирование с ngrok, задав токен аутентификации этой службы:

#Задание переменной auth_token для аутентификации в сервисе ngrok.
auth_token = " токен аутентификации, взятый с https://dashboard.ngrok.com/signup " 

#Аутентификация в сервисе ngrok с помощью auth_token
os.system(f"ngrok authtoken {auth_token}")

#Запуск ngrok, который создаст публичный URL для сервера через туннель
#для доступа к веб-интерфейсу Airflow из любого места.
#addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
public_url = ngrok.connect(addr="8888", proto="http")

#Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)

В результате в области вывода Colab появится URL-адрес, по которому будет доступен ранее запущенный веб-сервер AirFlow. Потом необходимо создать пользователя Apache AirFlow, установить логин и пароль. Моего пользователя традиционно зовут anna с ролью администратора и паролем password:

!airflow db init #Инициализация базы данных Airflow
!airflow upgradedb #Обновление базы данных Airflow

#Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password.
#Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow.
!airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password

Теперь можно войти в веб-интерфейс пакетного оркестратора, используя эти учетные данные.

Чтобы управлять задачами и самим DAG программным образом, я дополнила их код инструкциями записи в файл, сохранения в рабочую директорию AirFlow и копирования в пользовательскую папку для просмотра. Например, Python-код задачи чтения данных из PostgreSQL выглядит так:

########################Задача чтения из PostgreSQL#######################################
import psycopg2

from google.colab import files

code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os
import psycopg2
import requests

def read_from_PostgreSQL_function(**kwargs):
    connection_string = 'postgres://my-database'
    conn = psycopg2.connect(connection_string)

    select_query = """
    SELECT
        orders.id,
        orders.date,
        orders.sum,
        TRIM(product.name),
        product.price,
        order_product.quantity,
        TRIM(provider.name),
        TRIM(customer.name),
        TRIM(customer.phone),
        TRIM(customer.email),
        TRIM(customer_states.name),
        delivery.date,
        TRIM(delivery.address),
        delivery.price
    FROM
        orders
        JOIN order_product ON order_product.order = orders.id
        JOIN product ON product.id=order_product.product
        JOIN provider ON provider.id = product.provider
        JOIN customer ON orders.customer = customer.id
        JOIN customer_states ON customer_states.id = customer.state
        JOIN order_states ON order_states.id = orders.state
        JOIN delivery ON orders.delivery = delivery.id
     WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-100) AND (CURRENT_DATE))
    """

    cur = conn.cursor()
    cur.execute(select_query)
    results = cur.fetchall()

    cur.close()
    conn.close()

    return results

'''

with open('/root/airflow/dags/ANNA_read_task.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_read_task.py /content/airflow/dags/ANNA_read_task.py

А задача записи результатов поиска в JSON-файл так:

############################задача записи в JSON-файл##################################
from google.colab import files

code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json
import os

def write_to_JSON_function(**kwargs):
    results = kwargs['ti'].xcom_pull(task_ids='read_task')

    orders = []
    order_dict = {}

    for result in results:
        order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result

        if order_id not in order_dict:
            order_dict[order_id] = {
                'order_id': order_id,
                'order_date': str(order_date),
                'order_sum': order_sum,
                'products': [],
                'customer': {
                    'customer_name': customer_name,
                    'customer_phone': customer_phone,
                    'customer_email': customer_email,
                    'customer_status': customer_status
                },
                'delivery': {
                    'delivery_date': str(delivery_date),
                    'delivery_address': delivery_address,
                    'delivery_price': delivery_price
                }
            }

        product = {
            'product_name': product_name,
            'product_price': product_price,
            'product_quantity': product_quantity,
            'product_provider': product_provider
        }

        order_dict[order_id]['products'].append(product)

    orders = list(order_dict.values())

    filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json")
    filepath = os.path.join('/content/airflow/files/delivered', filename)

    with open(filepath, 'w') as f:
        json.dump(orders, f)
        f.close()

'''

with open('/root/airflow/dags/ANNA_write_task.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_write_task.py /content/airflow/dags/ANNA_write_task.py

Сам DAG-файл создается следующим кодом:

########################из разных файлов DAG чтения из PostgreSQL и формирования JSON-файла############################
from google.colab import files

code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python import BranchPythonOperator
from datetime import datetime, timedelta
from ANNA_read_task import read_from_PostgreSQL_function
from ANNA_write_task import write_to_JSON_function

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='ANNA_DAG_from_PotgreSQL_2_JSON_file',
    default_args=default_args,
    schedule_interval='@daily'
)

start_task = DummyOperator(task_id='start_task', dag=dag)

read_task = PythonOperator(
    task_id='read_task',
    provide_context=True,
    python_callable=read_from_PostgreSQL_function,
    dag=dag
)

write_task = PythonOperator(
    task_id='write_task',
    provide_context=True,
    python_callable=write_to_JSON_function,
    dag=dag
)

end_task = DummyOperator(task_id='end_task', dag=dag)

start_task >> read_task >> write_task >> end_task

'''

with open('/root/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py /content/airflow/dags/ANNA_DAG_from_PotgreSQL_2_JSON_file.py
!airflow dags unpause ANNA_DAG_from_PotgreSQL_2_JSON_file

Чтобы увидеть этот DAG в списке всех конвейеров, нужно запустить планировщик AirFlow также в фоновом режиме:

!airflow scheduler --daemon

При отсутствии ошибок DAG появится в веб-интерфейсе:

Перечень DAG AirFlow в веб-интерфейсе фреймворка
Перечень DAG AirFlow в веб-интерфейсе фреймворка

Передача данных между задачами происходит через механизм XCom, список всех этих объектов что можно также посмотреть в веб-интерфейсе:

Список объектов XCom
Список объектов XCom

После успешного выполнения DAG в пользовательской папке Colab появятся JSON-файлы с данными:

Запуск Python-скриптов для работы с Apache AirFlow в Google Colab
Запуск Python-скриптов для работы с Apache AirFlow в Google Colab

Продолжение работы с этими скриптами показано в новой статье на примере загрузки JSON-документов в документо-ориентированное NoSQL-хранилище Elasticsearch, развернутое в облачной платформе bonsai.io

Узнайте больше про Apache AirFlow  и его практическое использование в дата-инженерии и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту