Сегодня реализуем простой ETL-конвейер для реляционной СУБД PostgreSQL, запустив Apache AirFlow в интерактивной среде Google Colab. Пример DAG из 3-х задач: получить количество строк в одной из таблиц БД, сгенерировать новые строки и записать их, не нарушив ограничений уникальности первичного ключа.
Постановка задачи
Возьмем в качестве примера базу данных для интернет-магазина, пример проектирования которой от концептуальной до физической модели для PostgreSQL я рассматривала здесь. Интернет-магазин продает товары от разных поставщиков. Клиент может забрать каждый заказ самостоятельно или оформить платную доставку от магазина. В магазине действует программа лояльности для покупателей с присвоением им различных статусов в зависимости от общей суммы покупок.
Согласно проведенному анализу предметной области, был выполнен ряд итераций проектирования моделей данных. В результате создания физической модели и генерации DDL-скриптов в базе данных, развернутой в облачном экземпляре PostgreSQL на serverless-платформе Neon, были созданы следующие таблицы.
Таблица |
Сущность домена |
Поле |
Назначение |
Тип данных |
customer |
Клиент |
id |
Уникальный идентификатор, первичный ключ (PRIMARY KEY) |
int (целочисленный) |
name |
Имя |
сhar (символьный) |
||
|
Емейл |
сhar (символьный) |
||
phone |
Телефон |
сhar (символьный) |
||
state |
Ссылка на Статус Клиента, внешний ключ (FOREIGN KEY) |
int (целочисленный) |
||
|
||||
order |
Заказ |
id |
Уникальный идентификатор, первичный ключ (PRIMARY KEY) |
int (целочисленный) |
date |
Дата |
date (дата YYYY-MM-DD) |
||
sum |
Сумма |
Double precision (вещественное двойной точности) |
||
state |
Ссылка на Статус Заказа, внешний ключ (FOREIGN KEY) |
int (целочисленный) |
||
customer |
Ссылка на Клиент, внешний ключ (FOREIGN KEY) |
int (целочисленный) |
||
product |
Ссылка на Товары В Заказе, внешний ключ (FOREIGN KEY) |
int (целочисленный) |
||
|
||||
product |
Товар |
id |
Уникальный идентификатор, первичный ключ (PRIMARY KEY) |
int (целочисленный) |
name |
Название |
сhar (символьный) |
||
provider |
Ссылка на Поставщик, внешний ключ (FOREIGN KEY) |
int (целочисленный) |
||
price |
Цена |
Double precision (вещественное двойной точности) |
||
quantity |
Количество |
int (целочисленный) |
||
|
||||
provider |
Поставщик |
id |
Уникальный идентификатор, первичный ключ (PRIMARY KEY) |
int (целочисленный) |
name |
Название |
сhar (символьный) |
||
|
Емейл |
сhar (символьный) |
||
phone |
Телефон |
сhar (символьный) |
||
address |
Адрес |
text (текстовый) |
||
|
||||
delivery |
Доставка |
id |
Уникальный идентификатор, первичный ключ (PRIMARY KEY) |
int (целочисленный) |
date |
Дата |
date (дата YYYY-MM-DD) |
||
address |
Адрес |
text (текстовый) |
||
price |
Стоимость |
Double precision (вещественное двойной точности) |
||
|
||||
customer_states |
Статус Клиента |
id |
Уникальный идентификатор, первичный ключ (PRIMARY KEY) |
int (целочисленный) |
name |
Название |
сhar (символьный) |
||
|
||||
order_states |
Статус Заказа |
id |
Уникальный идентификатор, первичный ключ (PRIMARY KEY) |
int (целочисленный) |
name |
Название |
сhar (символьный) |
||
|
||||
order_product |
Товары в Заказе |
id |
Уникальный идентификатор, первичный ключ (PRIMARY KEY) |
int (целочисленный) |
order |
Ссылка на Заказ, внешний ключ (FOREIGN KEY) |
int (целочисленный) |
||
product |
Ссылка на Товар, внешний ключ (FOREIGN KEY) |
int (целочисленный) |
||
quantity |
Количество |
int (целочисленный) |
Схема физической модели данных выглядит следующим образом:
В таблице customer уже есть 100 записей, которые были добавлены туда заранее.
Для регистрации новой партии клиентов необходимо знать количество записей в таблице customer, чтобы соблюсти ограничения уникальности первичного ключа. Для этого необходимо сперва выполнить SQL-запрос к таблице customer
SELECT COUNT(*) FROM customer
Затем надо сгенерировать партию данных о клиентах и записать их в базу данных.
Рассмотрим эту последовательность действий как простой ETL-запрос, который можно представить в виде направленного ациклического графа (DAG) Apache AirFlow.
Генерировать данные о клиентах, как обычно, я буду с помощью Python-библиотеки Faker, разворачивать AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель, о чем ранее писала здесь и здесь.
Запуск AirFlow в Colab
Чтобы использовать AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, следует пробросить туннель с локального хоста удаленной машины во внешний URl-адрес. Это можно сделать с помощью утилиты ngrok, которую нужно установить в Colab вместе с другими библиотеками. Для этого я написала и запустила в ячейке Colab следующий код:
!pip install apache-airflow !airflow initdb #Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip !unzip ngrok-stable-linux-amd64.zip !pip install pyngrok #импорт модулей from pyngrok import ngrok import sys import os import datetime from airflow import DAG from airflow.operators.bash import BashOperator from google.colab import drive drive.mount('/content/drive') os.makedirs('/content/airflow/dags', exist_ok=True) dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags") os.makedirs(dags_folder, exist_ok=True)
Затем запустим веб-сервер AirFlow на порту 8888 в виде демона — системной службы, которая запускает сервер в фоновом режиме, исполняя процесс без блокировки терминала, т.е. с возможностью запускать другие ячейки Colab.
#запуск веб-сервера Apache Airflow на порту 8888. Веб-сервер Airflow предоставляет пользовательский интерфейс для управления DAGами, #просмотра логов выполнения задач, мониторинга прогресса выполнения !airflow webserver --port 8888 --daemon
Пробросим туннель с локального хоста удаленной машины Colab ко внешнему URL с помощью ngrok, используя свой (ранее полученный) токен аутентификации:
#Задание переменной auth_token для аутентификации в сервисе ngrok. auth_token = "……….ваш токен аутентификации……….." #@param {type:"string"} # Since we can't access Colab notebooks IP directly we'll use # ngrok to create a public URL for the server via a tunnel # Authenticate ngrok # https://dashboard.ngrok.com/signup # Then go to the "Your Authtoken" tab in the sidebar and copy the API key #Аутентификация в сервисе ngrok с помощью auth_token os.system(f"ngrok authtoken {auth_token}") #Запуск ngrok, который создаст публичный URL для сервера через туннель #для доступа к веб-интерфейсу Airflow из любого места. #addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP public_url = ngrok.connect(addr="8888", proto="http") #Вывод публичного URL для доступа к веб-интерфейсу Airflow print("Адрес Airflow GUI:", public_url)
Далее инициализируем базу данных метаданных AirFlow, которой по умолчанию является легковесная СУБД SQLite. Она резидентная, т.е. хранится в памяти, а не на жестком диске. Для доступа в веб-интерфейс AirFlow, доступном по URL-адресу, полученному с помощью ngrok, нужно создать пользователя, под логином и паролем которого можно будет войти в веб-интерфейс ETL-оркестратора:
#############################ячейка №3 в Google Colab######################### !airflow db init #Инициализация базы данных Airflow !airflow upgradedb #Обновление базы данных Airflow #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna@example.com и паролем password. #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password
Теперь можно войти в GUI веб-сервера Apache AirFlow, указав логин и пароль ранее созданного пользователя.
Пока не запущен планировщик AirFlow в разделе DAGs пусто, т.е не отображается ни один конвейер. Лучше всего запустить планировщик AirFlow тоже в фоновом режиме с помощью аргумента —daemon:
!airflow scheduler --daemon
После выполнения этой команды в ячейке Colab, в веб-интерфейсе AirFlow в разделе DAGs будут показаны демонстрационные цепочки задач, которые создаются автоматически. Однако, нас интересует не обучающий типовой материал, а запуск собственного ETL-процесса для своей базы данных. Для этого напишем соответствующий пользовательский DAG.
ETL для PostgreSQL
Чтобы сократить количество ручных операций с Python-файлами, таких как копирование и загрузка, сегодня я решила оформить код конвейера в виде текста, который генерируется кодом, запускаемым в Colab. Для этого сначала следует установить необходимые пакеты: Faker – для генерации случайных данных и psycopg2 — адаптер PostgreSQL для Python, нужный для автоматической подстройки типов при составлении SQL-запросов и получении ответов.
!pip install psycopg2 import random import datetime from faker import Faker from google.colab import files from faker.providers.address.ru_RU import Provider
Наконец, напишем код, который записывает DAG в файл, копирует его в соответствующий каталог в установке AirFlow, а также активирует его выполнение.
import psycopg2 from google.colab import files code = ''' from airflow import DAG from airflow.operators.python import PythonOperator from airflow.models import Connection from datetime import datetime, timedelta from faker import Faker from faker.providers import phone_number, person import psycopg2 default_args = { 'owner': 'airflow', 'start_date': datetime.now() - timedelta(days=1), 'retries': 1 } dag = DAG( dag_id='ANNA_DAG_Generate_Customer', default_args=default_args, schedule_interval='@daily' ) def read_postgres(**kwargs): connection_string='postgres://Yor_user_name:your_password@your_host.neon.tech/neondb' conn = psycopg2.connect(connection_string) # Specify the table name table_name = 'customer' select_query = f"SELECT COUNT(*) FROM {table_name};" # Create a cursor object cur = conn.cursor() # Execute the SQL query cur.execute(select_query) # Get the result of the query result = cur.fetchone() # Close the cursor and connection cur.close() conn.close() # Return the count of records return result[0] def generate_faker_data(**kwargs): # Получение аргумента 'of' из контекста of = kwargs['ti'].xcom_pull(task_ids='read') # Creating a Faker object with Russian locale fake = Faker('ru_RU') fake.add_provider(phone_number) fake.add_provider(person) # Specify the table name table_name = 'customer' # Specify the column names of the table column_names = ('id', 'name', 'email', 'phone', 'state') # Specify the number of rows to generate k = 10 # Create the string of column names columns_str = ', '.join(f'"{column}"' for column in column_names) insert_queries = [] for i in range(k): # Generate values for each column name = f"'{fake.unique.name()}'" # Generate a unique random name email = f"'{fake.unique.email()}'" # Generate a unique random email phone = f"'{fake.phone_number()}'" # Generate a random phone number state = fake.random_int(min=1, max=4) # Generate a random value for the foreign key # Create a list of values values = [int(of) + i + 1, name, email, phone, state] # Create a string of values values_str = ', '.join(str(value) for value in values) # Create and append the DML query insert_query = f"INSERT INTO {table_name} ({columns_str}) VALUES ({values_str});" insert_queries.append(insert_query) print(insert_query) # Print to console for debugging return insert_queries def write_to_postgres(**kwargs): insert_queries = kwargs['ti'].xcom_pull(task_ids='generate') connection_string='postgres://Yor_user_name:your_password@your_host.neon.tech/neondb' conn = psycopg2.connect(connection_string) # Create a cursor object cur = conn.cursor() for insert_query in insert_queries: # Execute the SQL query cur.execute(insert_query) # Commit the changes to the database conn.commit() # Close the cursor and connection cur.close() conn.close() read_task = PythonOperator( task_id='read', python_callable=read_postgres, provide_context=True, dag=dag ) generate_task = PythonOperator( task_id='generate', python_callable=generate_faker_data, provide_context=True, dag=dag ) write_task = PythonOperator( task_id='write', python_callable=write_to_postgres, provide_context=True, dag=dag ) read_task >> generate_task >> write_task ''' with open('/root/airflow/dags/ANNA_generate_customer_data.py', 'w') as f: f.write(code) !cp ~/airflow/dags/ANNA_generate_customer_data.py /content/airflow/dags/ANNA_DAG_Generate_Customer.py !airflow dags unpause ANNA_DAG_Generate_Customer
В этом коде определены 3 Python-функции для задач DAG:
- read_postgres — функция подключения к базе данных PostgreSQL, которая выполняет SELECT-запрос для подсчета количества записей в таблице customer и возвращает количество записей;
- generate_faker_data — функция генерации случайных данных о клиентах с помощью библиотеки Faker. Она получает количество записей из задачи read_postgres и генерирует 10 строк для таблицы customer согласно заданной схеме, т.е. названиям таблиц и их типам данных. В случае моей БД таблица с клиентами включает такие поля как идентификатор, имя, электронная почта, телефон и статус. Сгенерированные данные возвращаются в виде списка INSERT-запросов.
- write_to_postgres — функция записи данных в PostgreSQL, которая выполняет ранее INSERT-запросы, сгенерированные задачей generate_faker_data, для вставки данных в таблицу customer.
Вышеприведенный код создает экземпляры класса PythonOperator для каждой задачи с указанием ее идентификатора, вызываемой Python-функции (python_callable) и DAG, к которому принадлежит задача. За передачу данных между задачами отвечает параметр provide_context=True, позволяющий предоставлять контекстные переменные (dag_run, execution_date, task_instance, task и пр.), которые нужны для выполнения представленного конвейера.
После выполнения кода, создающего Python-файл с DAG, этот конвейер появится в веб-интерфейсе AirFlow.
Запустив свой DAG с помощью команды Trigger, можно посмотреть, как каждая задача успешно выполняется. В табличном виде запуски выглядят так:
Также доступен сам граф задач.
В интерфейсе платформы Neon, где развернут экземпляр PostgreSQL, можно проверить, что новые записи в таблицу успешно добавлены.
В заключение проверим, как данные между задачами передаются с помощью контекстных переменных, просмотрев список XCom-объектов. В частности, здесь отображается значение количество существующих записей в таблице БД и сгенерированные INSERT-запросы, которые используются для последней задачи DAG.
О том, как удалить из базы старые заказы, предварительно сохранив о них сведения в JSON-файл, читайте в новой статье.
Освойте применение Apache AirFlow для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: