Разгружаем PostgreSQL: ETL-конвейер с Apache AirFlow в Google Colab

Разгружаем PostgreSQL: ETL-конвейер с Apache AirFlow в Google Colab

    Сегодня усложним пример из прошлой статьи с простым ETL-конвейером, который добавлял в базу данных интернет-магазина новые записи о клиентах, сгенерированные с помощью библиотеки Faker. Разбираем, как удалить из PostgreSQL данные об успешно доставленных заказах за прошлый месяц, предварительно сохранив их в JSON-файл с многоуровневой структурой. Пишем и запускаем DAG Apache AirFlow в Google Colab.

    Постановка задачи

    Экземпляр базы данных интернет-магазина, пример проектирования которой от концептуальной до физической модели для PostgreSQL я рассматривала здесь, развернут в облаке serverless-платформе Neon. Схема физической модели данных выглядит следующим образом:

    Физическая модель данных PostgreSQL
    Физическая модель данных для PostgreSQL

    База данных уже наполнена записями. Предположим, чтобы «разгрузить» базу данных, ежемесячно необходимо удалять из нее сведения об успешно доставленных заказах за прошлый месяц, вместе с данными о продуктах, которые входят в этот заказа, а также, клиентах, сделавших заказы. Разумеется, перед удалением эти данные следует сохранить, записав их в архивное файловое хранилище. Далее сделаем это с помощью ETL-конвейера Apache AirFlow, но сперва проверим наличие в базе данных, отвечающих условиям выборки с помощью следующего SQL-запроса:

    SELECT
        orders.id,
        orders.date,
        orders.sum,
        TRIM(product.name),
        product.price,
        order_product.quantity,
        TRIM(provider.name),
        TRIM(customer.name),
        TRIM(customer.phone),
        TRIM(customer.email),
        TRIM(customer_states.name),
        delivery.date,
        TRIM(delivery.address),
        delivery.price
    FROM
        orders 
        JOIN order_product ON order_product.order = orders.id
        JOIN product ON product.id=order_product.product
        JOIN provider ON provider.id = product.provider
        JOIN customer ON orders.customer = customer.id
        JOIN customer_states ON customer_states.id = customer.state
        JOIN order_states ON order_states.id = orders.state
        JOIN delivery ON orders.delivery = delivery.id
     WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE))

    В этом запросе использована функция TRIM() для обрезки «лишних» пробелов, которые возникают в полях таблиц с типом данных char. Сперва запустим это запрос в веб-интерфейсе платформы Neon, где развернут экземпляр PostgreSQL. В результате получается большая сводная таблица с выполненными заказами за прошлый месяц от текущей даты (номер заказа, дата, сумма), включая данные о входящих в заказ товарах (название, цена, количество, название поставщика), а также сделавших эти заказы клиентах (имя, емейл, телефон).

    Neon PostgreSQL
    Первоначальная выборка данных в GUI Neon

    Убедившись, что данные, отвечающие нужным условиям, в базе присутствуют, далее напишем и запустим конвейер удаления этих данных из базы с их предварительным сохранением в JSON-файл. Оформим эту последовательность действий в виде DAG для Apache AirFlow.

    Запуск AirFlow в Colab

    Как обычно, я буду разворачивать AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель, о чем ранее писала здесь и здесь. Чтобы сделать это, следует пробросить туннель с локального хоста удаленной машины во внешний URl-адрес, поскольку Colab представляет собой удаленную среду в Google Cloud Platform, а не локальный хост разработчика. Такое туннелирование можно сделать с помощью утилиты ngrok, которую нужно установить в Colab вместе с другими библиотеками. Для этого нужно написать и запустить в ячейке Colab следующий код:

    #Установка Apache Airflow и инициализация базы данных.
    !pip install apache-airflow
    !airflow initdb
    
    #Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
    !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
    !unzip ngrok-stable-linux-amd64.zip
    !pip install pyngrok
    
    #импорт модулей
    from pyngrok import ngrok
    import sys
    import os
    import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    from google.colab import drive
    drive.mount('/content/drive')
    os.makedirs('/content/airflow/dags', exist_ok=True)
    dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
    os.makedirs(dags_folder, exist_ok=True)
    
    #Получение версии установленного Apache Airflow
    !airflow version

    Затем запустим веб-сервер AirFlow на порту 8888 в виде демона — системной службы, которая запускает сервер в фоновом режиме, исполняя процесс без блокировки терминала, т.е. с возможностью запускать другие ячейки Colab.

    #запуска веб-сервера Apache Airflow на порту 8888. Веб-сервер Airflow предоставляет пользовательский интерфейс для управления DAGами,
    #просмотра логов выполнения задач, мониторинга прогресса выполнения
    !airflow webserver --port 8888 --daemon

    Пробросим туннель с локального хоста удаленной машины Colab ко внешнему URL с помощью ngrok, используя свой (ранее полученный) токен аутентификации:

    #Задание переменной auth_token для аутентификации в сервисе ngrok.
    auth_token = "……….ваш токен аутентификации……….." #@param {type:"string"}
    # Since we can't access Colab notebooks IP directly we'll use
    # ngrok to create a public URL for the server via a tunnel
    
    # Authenticate ngrok
    # https://dashboard.ngrok.com/signup
    # Then go to the "Your Authtoken" tab in the sidebar and copy the API key
    
    #Аутентификация в сервисе ngrok с помощью auth_token
    os.system(f"ngrok authtoken {auth_token}")
    
    #Запуск ngrok, который создаст публичный URL для сервера через туннель 
    #для доступа к веб-интерфейсу Airflow из любого места. 
    #addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
    public_url = ngrok.connect(addr="8888", proto="http")
    
    #Вывод публичного URL для доступа к веб-интерфейсу Airflow
    print("Адрес Airflow GUI:", public_url)

    Инициализируем базу данных метаданных AirFlow, которой по умолчанию является легковесная СУБД SQLite. Она резидентная, т.е. хранится в памяти, а не на жестком диске. Для доступа в веб-интерфейс AirFlow, доступном по URL-адресу, полученному с помощью ngrok, нужно создать пользователя, под логином и паролем которого можно будет войти в веб-интерфейс ETL-оркестратора:

    #############################ячейка №3 в Google Colab#########################
    !airflow db init #Инициализация базы данных Airflow
    !airflow upgradedb #Обновление базы данных Airflow
    
    #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna@example.com и паролем password.
    #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow.
    !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password

    Теперь можно войти в GUI веб-сервера Apache AirFlow, указав логин и пароль ранее созданного пользователя.

    веб-интерфейс Apache AirFlow
    Вход в веб-интерфейс Apache AirFlow

    Пока не запущен планировщик AirFlow в разделе DAGs пусто, т.е не отображается ни один конвейер. Лучше всего запустить планировщик AirFlow тоже в фоновом режиме с помощью аргумента —daemon:

    !airflow scheduler --daemon

    После выполнения этой команды в ячейке Colab, в веб-интерфейсе AirFlow в разделе DAGs будут показаны демонстрационные цепочки задач, которые создаются автоматически. Однако, нас интересует не обучающий типовой материал, а запуск собственного ETL-процесса для своей базы данных. Для этого напишем соответствующий пользовательский DAG.

    ETL для PostgreSQL

    Чтобы сократить количество ручных операций с Python-файлами, таких как копирование и загрузка, оформим код конвейера в виде текста, который генерируется кодом, запускаемым в Colab. Следующий код записывает DAG в файл, копирует его в соответствующий каталог в установке AirFlow, а также активирует его выполнение.

    import psycopg2
    
    from google.colab import files
    
    code = '''
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from datetime import datetime, timedelta
    import json
    import os
    import psycopg2
    import requests
    
    default_args = {
        'owner': 'airflow',
        'start_date': datetime.now() - timedelta(days=1),
        'retries': 1
    }
    
    dag = DAG(
        dag_id='ANNA_DAG_Clean_Orders',
        default_args=default_args,
        #schedule_interval='0 0 1 * *'  # Повторять каждый первый день месяца в 00:00
        schedule_interval='@daily'
    )
    
    def read_postgres(**kwargs):
        connection_string='postgres://Yor_user_name:your_password@your_host.neon.tech/neondb'
        conn = psycopg2.connect(connection_string)
    
        select_query = """
        SELECT
            orders.id,
            orders.date,
            orders.sum,
            TRIM(product.name),
            product.price,
            order_product.quantity,
            TRIM(provider.name),
            TRIM(customer.name),
            TRIM(customer.phone),
            TRIM(customer.email),
            TRIM(customer_states.name),
            delivery.date,
            TRIM(delivery.address),
            delivery.price
        FROM
            orders 
            JOIN order_product ON order_product.order = orders.id
            JOIN product ON product.id=order_product.product
            JOIN provider ON provider.id = product.provider
            JOIN customer ON orders.customer = customer.id
            JOIN customer_states ON customer_states.id = customer.state
            JOIN order_states ON order_states.id = orders.state
            JOIN delivery ON orders.delivery = delivery.id
         WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE))
        """
    
        cur = conn.cursor()
        cur.execute(select_query)
        results = cur.fetchall()
    
        cur.close()
        conn.close()
    
        return results
    
    def write_to_file(**kwargs):
        results = kwargs['ti'].xcom_pull(task_ids='read')
    
        orders = []
        order_dict = {}
    
        for result in results:
            order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result
    
            if order_id not in order_dict:
                order_dict[order_id] = {
                    'order_id': order_id,
                    'order_date': str(order_date),
                    'order_sum': order_sum,
                    'products': [],
                    'customer': {
                        'customer_name': customer_name,
                        'customer_phone': customer_phone,
                        'customer_email': customer_email,
                        'customer_status': customer_status
                    },
                    'delivery': {
                        'delivery_date': str(delivery_date),
                        'delivery_address': delivery_address, 
                        'delivery_price': delivery_price
                    }
                }
    
            product = {
                'product_name': product_name,
                'product_price': product_price,
                'product_quantity': product_quantity,
                'product_provider': product_provider
            }
    
            order_dict[order_id]['products'].append(product)
    
        orders = list(order_dict.values())
    
        filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json")
        filepath = os.path.join('/content/airflow/files/delivered', filename)
    
        with open(filepath, 'w') as f:
            json.dump(orders, f)
            f.close()
    
    def delete_from_database():
    
        connection_string='postgres://Yor_user_name:your_password@your_host.neon.tech/neondb'
        conn = psycopg2.connect(connection_string)
    
        delete_query = """
    DELETE FROM order_product
    WHERE order_product.order IN (
        SELECT orders.id
        FROM orders
        WHERE order_product.order = orders.id 
        AND (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE))
    );
    
    DELETE FROM orders
    WHERE id IN (
        SELECT orders.id
        FROM orders 
        WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE))
    );
        """
    
        cur = conn.cursor()
        cur.execute(delete_query)
        conn.commit()
    
        cur.close()
        conn.close()
    
    read_task = PythonOperator(
        task_id='read',
        python_callable=read_postgres,
        dag=dag
    )
    
    write_task = PythonOperator(
        task_id='write',
        python_callable=write_to_file,
        provide_context=True,
        dag=dag
    )
    
    delete_task = PythonOperator(
        task_id='delete',
        python_callable=delete_from_database,
        provide_context=True,
        dag=dag
    )
    
    read_task >> write_task >> delete_task
    '''
    
    with open('/root/airflow/dags/ANNA_DAG_Clean_Orders.py', 'w') as f:
        f.write(code)
    
    !cp ~/airflow/dags/ANNA_DAG_Clean_Orders.py /content/airflow/dags/ANNA_DAG_Clean_Orders.py
    !airflow dags unpause ANNA_DAG_Clean_Orders

    В этом коде выполняется импорт библиотек psycopg2 для работы с PostgreSQL и requests для выполнения HTTP-запросов. Также определяется переменная code, где хранится сам код DAG для Apache Airflow под названием ANNA_DAG_Clean_Orders. Целесообразно запускать этот DAG раз в месяц, однако, для тестирования и отладки я поставила расписание ежедневно, задав значение @daily в интервале запуска schedule_interval=’@daily’ вместо schedule_interval=’0 0 1 * *’, что означало бы повторять каждый первый день месяца в 00:00.

    В этом DAG определены следующие функции:

    • read_postgres — функция подключения к базе данных PostgreSQL, которая выполняет SELECT-запрос, выбирая определенные поля из таблицы с заказами (orders) и связанных с ней таблиц, согласно условию статуса успешной доставки и попадания даты заказа в прошлый месяц от текущей даты (WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE)));
    • write_to_file – функция, которая записывает результаты выполненного SQL-запроса в JSON-файл в формате;
    • delete_from_database, которая выполняет удаление данных из таблиц order_product и orders на основе заданного выше словия.

    Вышеприведенный код создает экземпляры класса PythonOperator для каждой задачи с указанием ее идентификатора, вызываемой Python-функции (python_callable) и DAG, к которому принадлежит задача. За передачу данных между задачами отвечает параметр provide_context=True, позволяющий предоставлять контекстные переменные (dag_run, execution_date, task_instance, task и пр.), которые нужны для выполнения представленного конвейера. Как упростить этот DAG-файл, описав каждую задачу в отдельном Python-скрипте, читайте в моей новой статье.

    После выполнения кода, создающего Python-файл с DAG, этот ETL-конвейер появится в веб-интерфейсе AirFlow.

    DAG AirFlow
    Перечень всех DAG

    Запустив свой DAG с помощью команды Trigger, можно посмотреть, как каждая задача успешно выполняется. Граф задач выглядит так:

    DAG AirFlow ETL
    Граф ETL-конвейера

    В интерфейсе платформы Neon, где развернут экземпляр PostgreSQL, можно проверить, что данные действительно удалены. Для этого снова запустим тот же самый запрос. В результате не выведено ни одной строки.

    Neon PostgreSQL
    Конечная выборка данных в GUI Neon

    Удаленные данные сохранены в JSON-файл в папке, определенной в коде DAG. В моем случае это каталог /content/airflow/files/delivered в пространстве Colab. Имя файла содержит дату и время, когда был выполнен этот DAG. Поскольку в процессе разработки я очень много тестировала и отлаживала код, еще до операции удаления данных, этих JSON-файлов получилось больше, чем один.

    Google Colab example AirFlow
    Сохраненные JSON-файлы

    JSON-файл содержит сведения об удаленных заказах, товарах и клиентах согласно следующей схеме:

    {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "title": "Generated schema for Root",
      "type": "array",
      "items": {
        "type": "object",
        "properties": {
          "order_id": {
            "type": "number"
          },
          "order_date": {
            "type": "string"
          },
          "order_sum": {
            "type": "number"
          },
          "products": {
            "type": "array",
            "items": {
              "type": "object",
              "properties": {
                "product_name": {
                  "type": "string"
                },
                "product_price": {
                  "type": "number"
                },
                "product_quantity": {
                  "type": "number"
                },
                "product_provider": {
                  "type": "string"
                }
              },
              "required": [
                "product_name",
                "product_price",
                "product_quantity",
                "product_provider"
              ]
            }
          },
          "customer": {
            "type": "object",
            "properties": {
              "customer_name": {
                "type": "string"
              },
              "customer_phone": {
                "type": "string"
              },
              "customer_email": {
                "type": "string"
              },
              "customer_status": {
                "type": "string"
              }
            },
            "required": [
              "customer_name",
              "customer_phone",
              "customer_email",
              "customer_status"
            ]
          },
          "delivery": {
            "type": "object",
            "properties": {
              "delivery_date": {
                "type": "string"
              },
              "delivery_address": {
                "type": "string"
              },
              "delivery_price": {
                "type": "number"
              }
            },
            "required": [
              "delivery_date",
              "delivery_address",
              "delivery_price"
            ]
          }
        },
        "required": [
          "order_id",
          "order_date",
          "order_sum",
          "products",
          "customer",
          "delivery"
        ]
      }
    }

    В самом файле данные отображаются не очень удобно для чтения (сплошным текстом), однако их можно переформатировать в любом онлайн-конвертере.

    JSON data formatted
    Отформатированный JSON-файл с удаленными из базы записями

    Таким образом, этот простой пример показал, как можно регулярно работать с удаленной РСУБД с помощью Apache AirFlow. Узнать больше про этот эффективный оркестратор рабочих процессов и его практическое использование в дата-инженерии и аналитике больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: