Сегодня усложним пример из прошлой статьи с простым ETL-конвейером, который добавлял в базу данных интернет-магазина новые записи о клиентах, сгенерированные с помощью библиотеки Faker. Разбираем, как удалить из PostgreSQL данные об успешно доставленных заказах за прошлый месяц, предварительно сохранив их в JSON-файл с многоуровневой структурой. Пишем и запускаем DAG Apache AirFlow в Google Colab.
Постановка задачи
Экземпляр базы данных интернет-магазина, пример проектирования которой от концептуальной до физической модели для PostgreSQL я рассматривала здесь, развернут в облаке serverless-платформе Neon. Схема физической модели данных выглядит следующим образом:
База данных уже наполнена записями. Предположим, чтобы «разгрузить» базу данных, ежемесячно необходимо удалять из нее сведения об успешно доставленных заказах за прошлый месяц, вместе с данными о продуктах, которые входят в этот заказа, а также, клиентах, сделавших заказы. Разумеется, перед удалением эти данные следует сохранить, записав их в архивное файловое хранилище. Далее сделаем это с помощью ETL-конвейера Apache AirFlow, но сперва проверим наличие в базе данных, отвечающих условиям выборки с помощью следующего SQL-запроса:
SELECT orders.id, orders.date, orders.sum, TRIM(product.name), product.price, order_product.quantity, TRIM(provider.name), TRIM(customer.name), TRIM(customer.phone), TRIM(customer.email), TRIM(customer_states.name), delivery.date, TRIM(delivery.address), delivery.price FROM orders JOIN order_product ON order_product.order = orders.id JOIN product ON product.id=order_product.product JOIN provider ON provider.id = product.provider JOIN customer ON orders.customer = customer.id JOIN customer_states ON customer_states.id = customer.state JOIN order_states ON order_states.id = orders.state JOIN delivery ON orders.delivery = delivery.id WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE))
В этом запросе использована функция TRIM() для обрезки «лишних» пробелов, которые возникают в полях таблиц с типом данных char. Сперва запустим это запрос в веб-интерфейсе платформы Neon, где развернут экземпляр PostgreSQL. В результате получается большая сводная таблица с выполненными заказами за прошлый месяц от текущей даты (номер заказа, дата, сумма), включая данные о входящих в заказ товарах (название, цена, количество, название поставщика), а также сделавших эти заказы клиентах (имя, емейл, телефон).
Убедившись, что данные, отвечающие нужным условиям, в базе присутствуют, далее напишем и запустим конвейер удаления этих данных из базы с их предварительным сохранением в JSON-файл. Оформим эту последовательность действий в виде DAG для Apache AirFlow.
Запуск AirFlow в Colab
Как обычно, я буду разворачивать AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель, о чем ранее писала здесь и здесь. Чтобы сделать это, следует пробросить туннель с локального хоста удаленной машины во внешний URl-адрес, поскольку Colab представляет собой удаленную среду в Google Cloud Platform, а не локальный хост разработчика. Такое туннелирование можно сделать с помощью утилиты ngrok, которую нужно установить в Colab вместе с другими библиотеками. Для этого нужно написать и запустить в ячейке Colab следующий код:
#Установка Apache Airflow и инициализация базы данных. !pip install apache-airflow !airflow initdb #Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip !unzip ngrok-stable-linux-amd64.zip !pip install pyngrok #импорт модулей from pyngrok import ngrok import sys import os import datetime from airflow import DAG from airflow.operators.bash import BashOperator from google.colab import drive drive.mount('/content/drive') os.makedirs('/content/airflow/dags', exist_ok=True) dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags") os.makedirs(dags_folder, exist_ok=True) #Получение версии установленного Apache Airflow !airflow version
Затем запустим веб-сервер AirFlow на порту 8888 в виде демона — системной службы, которая запускает сервер в фоновом режиме, исполняя процесс без блокировки терминала, т.е. с возможностью запускать другие ячейки Colab.
#запуска веб-сервера Apache Airflow на порту 8888. Веб-сервер Airflow предоставляет пользовательский интерфейс для управления DAGами, #просмотра логов выполнения задач, мониторинга прогресса выполнения !airflow webserver --port 8888 --daemon
Пробросим туннель с локального хоста удаленной машины Colab ко внешнему URL с помощью ngrok, используя свой (ранее полученный) токен аутентификации:
#Задание переменной auth_token для аутентификации в сервисе ngrok. auth_token = "……….ваш токен аутентификации……….." #@param {type:"string"} # Since we can't access Colab notebooks IP directly we'll use # ngrok to create a public URL for the server via a tunnel # Authenticate ngrok # https://dashboard.ngrok.com/signup # Then go to the "Your Authtoken" tab in the sidebar and copy the API key #Аутентификация в сервисе ngrok с помощью auth_token os.system(f"ngrok authtoken {auth_token}") #Запуск ngrok, который создаст публичный URL для сервера через туннель #для доступа к веб-интерфейсу Airflow из любого места. #addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP public_url = ngrok.connect(addr="8888", proto="http") #Вывод публичного URL для доступа к веб-интерфейсу Airflow print("Адрес Airflow GUI:", public_url)
Инициализируем базу данных метаданных AirFlow, которой по умолчанию является легковесная СУБД SQLite. Она резидентная, т.е. хранится в памяти, а не на жестком диске. Для доступа в веб-интерфейс AirFlow, доступном по URL-адресу, полученному с помощью ngrok, нужно создать пользователя, под логином и паролем которого можно будет войти в веб-интерфейс ETL-оркестратора:
#############################ячейка №3 в Google Colab######################### !airflow db init #Инициализация базы данных Airflow !airflow upgradedb #Обновление базы данных Airflow #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna@example.com и паролем password. #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password
Теперь можно войти в GUI веб-сервера Apache AirFlow, указав логин и пароль ранее созданного пользователя.
Пока не запущен планировщик AirFlow в разделе DAGs пусто, т.е не отображается ни один конвейер. Лучше всего запустить планировщик AirFlow тоже в фоновом режиме с помощью аргумента —daemon:
!airflow scheduler --daemon
После выполнения этой команды в ячейке Colab, в веб-интерфейсе AirFlow в разделе DAGs будут показаны демонстрационные цепочки задач, которые создаются автоматически. Однако, нас интересует не обучающий типовой материал, а запуск собственного ETL-процесса для своей базы данных. Для этого напишем соответствующий пользовательский DAG.
ETL для PostgreSQL
Чтобы сократить количество ручных операций с Python-файлами, таких как копирование и загрузка, оформим код конвейера в виде текста, который генерируется кодом, запускаемым в Colab. Следующий код записывает DAG в файл, копирует его в соответствующий каталог в установке AirFlow, а также активирует его выполнение.
import psycopg2 from google.colab import files code = ''' from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime, timedelta import json import os import psycopg2 import requests default_args = { 'owner': 'airflow', 'start_date': datetime.now() - timedelta(days=1), 'retries': 1 } dag = DAG( dag_id='ANNA_DAG_Clean_Orders', default_args=default_args, #schedule_interval='0 0 1 * *' # Повторять каждый первый день месяца в 00:00 schedule_interval='@daily' ) def read_postgres(**kwargs): connection_string='postgres://Yor_user_name:your_password@your_host.neon.tech/neondb' conn = psycopg2.connect(connection_string) select_query = """ SELECT orders.id, orders.date, orders.sum, TRIM(product.name), product.price, order_product.quantity, TRIM(provider.name), TRIM(customer.name), TRIM(customer.phone), TRIM(customer.email), TRIM(customer_states.name), delivery.date, TRIM(delivery.address), delivery.price FROM orders JOIN order_product ON order_product.order = orders.id JOIN product ON product.id=order_product.product JOIN provider ON provider.id = product.provider JOIN customer ON orders.customer = customer.id JOIN customer_states ON customer_states.id = customer.state JOIN order_states ON order_states.id = orders.state JOIN delivery ON orders.delivery = delivery.id WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE)) """ cur = conn.cursor() cur.execute(select_query) results = cur.fetchall() cur.close() conn.close() return results def write_to_file(**kwargs): results = kwargs['ti'].xcom_pull(task_ids='read') orders = [] order_dict = {} for result in results: order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result if order_id not in order_dict: order_dict[order_id] = { 'order_id': order_id, 'order_date': str(order_date), 'order_sum': order_sum, 'products': [], 'customer': { 'customer_name': customer_name, 'customer_phone': customer_phone, 'customer_email': customer_email, 'customer_status': customer_status }, 'delivery': { 'delivery_date': str(delivery_date), 'delivery_address': delivery_address, 'delivery_price': delivery_price } } product = { 'product_name': product_name, 'product_price': product_price, 'product_quantity': product_quantity, 'product_provider': product_provider } order_dict[order_id]['products'].append(product) orders = list(order_dict.values()) filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json") filepath = os.path.join('/content/airflow/files/delivered', filename) with open(filepath, 'w') as f: json.dump(orders, f) f.close() def delete_from_database(): connection_string='postgres://Yor_user_name:your_password@your_host.neon.tech/neondb' conn = psycopg2.connect(connection_string) delete_query = """ DELETE FROM order_product WHERE order_product.order IN ( SELECT orders.id FROM orders WHERE order_product.order = orders.id AND (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE)) ); DELETE FROM orders WHERE id IN ( SELECT orders.id FROM orders WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE)) ); """ cur = conn.cursor() cur.execute(delete_query) conn.commit() cur.close() conn.close() read_task = PythonOperator( task_id='read', python_callable=read_postgres, dag=dag ) write_task = PythonOperator( task_id='write', python_callable=write_to_file, provide_context=True, dag=dag ) delete_task = PythonOperator( task_id='delete', python_callable=delete_from_database, provide_context=True, dag=dag ) read_task >> write_task >> delete_task ''' with open('/root/airflow/dags/ANNA_DAG_Clean_Orders.py', 'w') as f: f.write(code) !cp ~/airflow/dags/ANNA_DAG_Clean_Orders.py /content/airflow/dags/ANNA_DAG_Clean_Orders.py !airflow dags unpause ANNA_DAG_Clean_Orders
В этом коде выполняется импорт библиотек psycopg2 для работы с PostgreSQL и requests для выполнения HTTP-запросов. Также определяется переменная code, где хранится сам код DAG для Apache Airflow под названием ANNA_DAG_Clean_Orders. Целесообразно запускать этот DAG раз в месяц, однако, для тестирования и отладки я поставила расписание ежедневно, задав значение @daily в интервале запуска schedule_interval=’@daily’ вместо schedule_interval=’0 0 1 * *’, что означало бы повторять каждый первый день месяца в 00:00.
В этом DAG определены следующие функции:
- read_postgres — функция подключения к базе данных PostgreSQL, которая выполняет SELECT-запрос, выбирая определенные поля из таблицы с заказами (orders) и связанных с ней таблиц, согласно условию статуса успешной доставки и попадания даты заказа в прошлый месяц от текущей даты (WHERE (orders.state=5) AND (orders.date BETWEEN (CURRENT_DATE-30) AND (CURRENT_DATE)));
- write_to_file – функция, которая записывает результаты выполненного SQL-запроса в JSON-файл в формате;
- delete_from_database, которая выполняет удаление данных из таблиц order_product и orders на основе заданного выше словия.
Вышеприведенный код создает экземпляры класса PythonOperator для каждой задачи с указанием ее идентификатора, вызываемой Python-функции (python_callable) и DAG, к которому принадлежит задача. За передачу данных между задачами отвечает параметр provide_context=True, позволяющий предоставлять контекстные переменные (dag_run, execution_date, task_instance, task и пр.), которые нужны для выполнения представленного конвейера. Как упростить этот DAG-файл, описав каждую задачу в отдельном Python-скрипте, читайте в моей новой статье.
После выполнения кода, создающего Python-файл с DAG, этот ETL-конвейер появится в веб-интерфейсе AirFlow.
Запустив свой DAG с помощью команды Trigger, можно посмотреть, как каждая задача успешно выполняется. Граф задач выглядит так:
В интерфейсе платформы Neon, где развернут экземпляр PostgreSQL, можно проверить, что данные действительно удалены. Для этого снова запустим тот же самый запрос. В результате не выведено ни одной строки.
Удаленные данные сохранены в JSON-файл в папке, определенной в коде DAG. В моем случае это каталог /content/airflow/files/delivered в пространстве Colab. Имя файла содержит дату и время, когда был выполнен этот DAG. Поскольку в процессе разработки я очень много тестировала и отлаживала код, еще до операции удаления данных, этих JSON-файлов получилось больше, чем один.
JSON-файл содержит сведения об удаленных заказах, товарах и клиентах согласно следующей схеме:
{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "Generated schema for Root", "type": "array", "items": { "type": "object", "properties": { "order_id": { "type": "number" }, "order_date": { "type": "string" }, "order_sum": { "type": "number" }, "products": { "type": "array", "items": { "type": "object", "properties": { "product_name": { "type": "string" }, "product_price": { "type": "number" }, "product_quantity": { "type": "number" }, "product_provider": { "type": "string" } }, "required": [ "product_name", "product_price", "product_quantity", "product_provider" ] } }, "customer": { "type": "object", "properties": { "customer_name": { "type": "string" }, "customer_phone": { "type": "string" }, "customer_email": { "type": "string" }, "customer_status": { "type": "string" } }, "required": [ "customer_name", "customer_phone", "customer_email", "customer_status" ] }, "delivery": { "type": "object", "properties": { "delivery_date": { "type": "string" }, "delivery_address": { "type": "string" }, "delivery_price": { "type": "number" } }, "required": [ "delivery_date", "delivery_address", "delivery_price" ] } }, "required": [ "order_id", "order_date", "order_sum", "products", "customer", "delivery" ] } }
В самом файле данные отображаются не очень удобно для чтения (сплошным текстом), однако их можно переформатировать в любом онлайн-конвертере.
Таким образом, этот простой пример показал, как можно регулярно работать с удаленной РСУБД с помощью Apache AirFlow. Узнать больше про этот эффективный оркестратор рабочих процессов и его практическое использование в дата-инженерии и аналитике больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: