Из PostgreSQL в Elasticsearch: пишем ETL-процесс в DAG AirFlow и запускаем в Colab

ETL from PostgreSQL to Elasticsearch AirFlow, Телеграм AirFlow, DAG Apache AirFlow Colab пример, Apache AirFlow GUI Google Colab, обучение Apache AirFlow, курсы дата-инженеров, обучение разработчиков Big Data, разработка AirFlow конвейеров, Школа Больших Данных Учебный Центр Коммерсант

Пример ETL-процесса в DAG Apache AirFlow: извлечение данных о выполненных заказах из PostgreSQL, преобразование в JSON-документ и загрузка в NoSQL-хранилище Elasticsearch в виде JSON-документа с отправкой уведомления в Telegram. Разработка и запуск кода в Google Colab.

Постановка задачи и проектирование конвейера в виде DAG AirFlow

О том, как построить простой ETL-конвейер в Apache AirFlow на примере выгрузки старых заказов из базы данных интернет-магазина на PostgreSQL, я уже здесь и здесь. Сегодня усложним прошлый вариант и будем выгружать данные о выполненных заказах не в файл на Google-диске, а в NoSQL-хранилище с мощным поисковым движком Elasticsearch.

Опишем каждую задачу в отдельном py-файле, выстраивая их последовательность в общем конвейере. Мой конвейер, т.е. DAG состоит из следующих задач:

  • start_task — фиктивный оператор, представляющий начало DAG;
  • extract_task — оператор Python, который вызывает функцию read_from_PostgreSQL_function для извлечения данных из PostgreSQL. Для параметра Provide_context установлено значение True, чтобы получить доступ к контексту выполнения.
  • transform_task – оператор Python, который вызывает функцию write_to_JSON_function для преобразования данных, ранее считанных из БД, в файл JSON. Для параметра Provide_context также установлено значение True, поскольку результаты задачи extract_task будут входом для transform_task.
  • load_task – оператор Python, который вызывает функцию write_to_ElasticDB_function для загрузки данных в ElasticDB. Для параметра Provide_context также установлено значение True, поскольку результаты задачи transform_task будут входом для load_task.
  • send_notification_task – оператор Telegram, т.е. задача, которая отправляет уведомление в Telegram о выполнении процесса ETL. В тексте уведомления используются значения текущей даты и времени, а также результат выполнения задачи load_task.
  • end_task – фиктивный оператор, представляющий конец DAG.

Задачи связаны между собой стрелками, которые указывают направление выполнения. Например, start_task выполняется перед extract_task, extract_task перед transform_task и т.д.

DAG Apache AirFlow
DAG Apache AirFlow

В результате выполнения данного DAG данные из PostgreSQL будут извлечены, преобразованы в формат JSON и загружены в Elasticserch, а затем будет отправлено уведомление о выполнении процесса.

Код DAG-файла выглядит следующим образом:

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator
from datetime import datetime, timedelta
from ANNA_extract_task import read_from_PostgreSQL_function
from ANNA_transform_task import write_to_JSON_function
from ANNA_load_task import write_to_ElasticDB_function

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='ANNA_DAG_ETL_from_PG_2_ElK',
    default_args=default_args,
    schedule_interval='@daily'
)

start_task = DummyOperator(task_id='start_task', dag=dag)

extract_task = PythonOperator(
    task_id='extract_task',
    provide_context=True,
    python_callable=read_from_PostgreSQL_function,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_task',
    provide_context=True,
    python_callable=write_to_JSON_function,
    dag=dag
)

load_task = PythonOperator(
    task_id='load_task',
    provide_context=True,
    python_callable=write_to_ElasticDB_function,
    dag=dag
)

telegram_token = 'my TG-token'
telegram_chat_id = 'my chat id'
now=datetime.now()

send_notification_task = TelegramOperator(
    task_id='send_notification_task',
    token=telegram_token,
    chat_id=telegram_chat_id,
    text='ETL-process has been done {} with result {}'.format(
        now.strftime("%m/%d/%Y, %H:%M:%S"),
        '{{ ti.xcom_pull(key="return_value", task_ids="load_task") }}'
    ),
    dag=dag
)

end_task = DummyOperator(task_id='end_task', dag=dag)

start_task >> extract_task >> transform_task >> load_task >> send_notification_task >> end_task

Задачи extract_task, transform_task и load_task описаны в отдельных файлах. Код задачи извлечения (extract_task), которая выполняет чтение из базы данных PostgreSQL заказов, выполненных за последние 3 месяца, т.е. 90 дней от текущей даты:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import psycopg2

def read_from_PostgreSQL_function(**kwargs):
    connection_string = 'postgres://my-database'
    conn = psycopg2.connect(connection_string)

    select_query = """
    SELECT
        orders.id,
        orders.date,
        orders.sum,
        product.name,
        product.price,
        order_product.quantity,
        provider.name,
        customer.name,
        customer.phone,
        customer.email,
        customer_states.name,
        delivery.date,
        delivery.address,
        delivery.price
    FROM
        orders
        JOIN order_product ON order_product.order = orders.id
        JOIN product ON product.id=order_product.product
        JOIN provider ON provider.id = product.provider
        JOIN customer ON orders.customer = customer.id
        JOIN customer_states ON customer_states.id = customer.state
        JOIN order_states ON order_states.id = orders.state
        JOIN delivery ON orders.delivery = delivery.id
     WHERE (orders.state=4) AND (orders.date BETWEEN (CURRENT_DATE-90) AND (CURRENT_DATE))
    """

    cur = conn.cursor()
    cur.execute(select_query)
    results = cur.fetchall()

    cur.close()
    conn.close()

    return results

В этом коде идет обращение к экземпляру PostgreSQL, развернутом в облачной платформе Hasura Cloud (Neon). Задача преобразования этих данных в JSON-файл transform_task выглядит так:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json
import os

def write_to_JSON_function(**kwargs):
    results = kwargs['ti'].xcom_pull(task_ids='extract_task')

    orders = []
    order_dict = {}

    for result in results:
        order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result

        if order_id not in order_dict:
            order_dict[order_id] = {
                'order_id': order_id,
                'order_date': str(order_date),
                'order_sum': order_sum,
                'products': [],
                'customer': {
                    'customer_name': customer_name,
                    'customer_phone': customer_phone,
                    'customer_email': customer_email,
                    'customer_status': customer_status
                },
                'delivery': {
                    'delivery_date': str(delivery_date),
                    'delivery_address': delivery_address,
                    'delivery_price': delivery_price
                }
            }

        product = {
            'product_name': product_name,
            'product_price': product_price,
            'product_quantity': product_quantity,
            'product_provider': product_provider
        }

        order_dict[order_id]['products'].append(product)

    orders = list(order_dict.values())

    data = {
        'orders': orders
    }

    filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json")
    filepath = os.path.join('/content/airflow/files/delivered', filename)

    with open(filepath, 'w') as f:
        json.dump(data, f)
        f.close()

    return data

Помимо формирования JSON-документов для их передачи в задачу load_task я также записываю результаты выполнения SQL-запроса в файл на Google-диске для отладки в директорию /content/airflow/files/delivered.

Google Colab AirFlow task
Формирование JSON-документов

Задача загрузки этих JSON-данных load_task в NoSQL-хранилище Elasticsearch выглядит так:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests
from elasticsearch7 import Elasticsearch

def write_to_ElasticDB_function(**kwargs):
  data = kwargs['ti'].xcom_pull(task_ids='transform_task')

  # Установка подключения к Elasticsearch
  es = Elasticsearch(
    hosts=[{'host': my_elasict_host.bonsaisearch.net', 'port': 443, 'use_ssl': True}],
    http_auth=('my-elastic-user', 'my-password')
   )

  # запись данных в индекс
  response = es.index(index='orders_index', body=data)

  # Проверка статуса ответа
  if response['result'] == 'created':
    print("Данные успешно записаны в индекс orders_index")
    notification = 'success'
  else:
    print("Не удалось записать данные в индекс")
    notification = 'failed'

  return  notification

Экземпляр Elasticsearch у меня развернут в облачной платформе bonsai. В этой документо-ориентированной СУБД с мощным поисковым движком аналогом таблицы реляционной базы данных является индекс. Поэтому я заранее создала индекс с помощью команды PUT^

Создание своего индекса в Elasticsearch в bonsai.io
Создание своего индекса в Elasticsearch в bonsai.io

В этот индекс будут загружаться JSON-документы, которые в хранятся в Elasticsearch подобно строкам в таблице реляционной базы данных. После загрузки JSON-документов их можно будет запрашивать в Kibana – веб-интерфейсе Elasticsearch. Чтобы сделать это, сперва необходимо сформировать и загрузить эти документы, что я буду делать с помощью задач в DAG Apache AirFlow.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
19 марта, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Реализация и запуск DAG AirFlow в Google Colab

Поскольку я запускаю код в интерактивной среде Google Colab, он будет более многословным. В частности, поскольку экземпляр Apache AirFlow   развернут на удаленной машине Google Colab, придется прокинуть туннель, чтобы сделать ее локальный хост доступным извне по URL. Как обычно, для этого я использую утилиту ngrok.

Сперва установим все необходимые библиотеки и импортируем пакеты:

###############################################ячейка №1 в Google Colab#############################
#Установка Apache Airflow и инициализация базы данных.
!pip install apache-airflow
!airflow initdb
!pip install elasticsearch7==7.10.1
from elasticsearch7 import Elasticsearch

#Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok

!pip install apache-airflow-providers-telegram

#импорт модулей
from pyngrok import ngrok
import sys
import os
import datetime
from airflow import DAG
from airflow.providers.telegram.operators.telegram import TelegramOperator

from google.colab import drive
drive.mount('/content/drive')
os.makedirs('/content/airflow/dags', exist_ok=True)
dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
os.makedirs(dags_folder, exist_ok=True)

Затем следует запустить веб-сервер AirFlow в режиме демона, чтобы выполнение кода в этой ячейке Colab не блокировало другие:

!airflow webserver --port 8888 --daemon

Запускаем тунелирование с ngrok, задав токен аутентификации этой службы:

#Задание переменной auth_token для аутентификации в сервисе ngrok.
auth_token = " токен аутентификации, взятый с https://dashboard.ngrok.com/signup " 

#Аутентификация в сервисе ngrok с помощью auth_token
os.system(f"ngrok authtoken {auth_token}")

#Запуск ngrok, который создаст публичный URL для сервера через туннель
#для доступа к веб-интерфейсу Airflow из любого места.
#addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
public_url = ngrok.connect(addr="8888", proto="http")

#Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)

В результате в области вывода Colab появится URL-адрес, по которому будет доступен ранее запущенный веб-сервер AirFlow. Потом необходимо создать пользователя Apache AirFlow, установить логин и пароль. Моего пользователя традиционно зовут anna с ролью администратора и паролем password:

!airflow db init #Инициализация базы данных Airflow
!airflow upgradedb #Обновление базы данных Airflow

#Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password.
#Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow.
!airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password

Теперь можно войти в веб-интерфейс пакетного оркестратора, используя эти учетные данные.

Чтобы управлять задачами и самим DAG программным образом, я дополнила их код инструкциями записи в файл, сохранения в рабочую директорию AirFlow и копирования в пользовательскую папку для просмотра. Например, Python-код задачи извлечения данных из PostgreSQL выглядит так:

########################Задача чтения из PostgreSQL#######################################
'postgres://my-database'

import psycopg2

from google.colab import files

code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import os
import psycopg2
#import requests

def read_from_PostgreSQL_function(**kwargs):
    connection_string = 'postgres://my-database'
    conn = psycopg2.connect(connection_string)

    select_query = """
    SELECT
        orders.id,
        orders.date,
        orders.sum,
        product.name,
        product.price,
        order_product.quantity,
        provider.name,
        customer.name,
        customer.phone,
        customer.email,
        customer_states.name,
        delivery.date,
        delivery.address,
        delivery.price
    FROM
        orders
        JOIN order_product ON order_product.order = orders.id
        JOIN product ON product.id=order_product.product
        JOIN provider ON provider.id = product.provider
        JOIN customer ON orders.customer = customer.id
        JOIN customer_states ON customer_states.id = customer.state
        JOIN order_states ON order_states.id = orders.state
        JOIN delivery ON orders.delivery = delivery.id
     WHERE (orders.state=4) AND (orders.date BETWEEN (CURRENT_DATE-90) AND (CURRENT_DATE))
    """

    cur = conn.cursor()
    cur.execute(select_query)
    results = cur.fetchall()

    cur.close()
    conn.close()

    return results

'''

with open('/root/airflow/dags/ANNA_extract_task.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_extract_task.py /content/airflow/dags/ANNA_extract_task.py

А задача их преобразования извлеченных данных в JSON-документ выглядит так:

############################задача формирования JSON-документов##################################
from google.colab import files

code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import json
import os

def write_to_JSON_function(**kwargs):
    results = kwargs['ti'].xcom_pull(task_ids='extract_task')

    orders = []
    order_dict = {}

    for result in results:
        order_id, order_date, order_sum, product_name, product_price, product_quantity, product_provider, customer_name, customer_phone, customer_email, customer_status, delivery_date, delivery_address, delivery_price = result

        if order_id not in order_dict:
            order_dict[order_id] = {
                'order_id': order_id,
                'order_date': str(order_date),
                'order_sum': order_sum,
                'products': [],
                'customer': {
                    'customer_name': customer_name,
                    'customer_phone': customer_phone,
                    'customer_email': customer_email,
                    'customer_status': customer_status
                },
                'delivery': {
                    'delivery_date': str(delivery_date),
                    'delivery_address': delivery_address,
                    'delivery_price': delivery_price
                }
            }

        product = {
            'product_name': product_name,
            'product_price': product_price,
            'product_quantity': product_quantity,
            'product_provider': product_provider
        }

        order_dict[order_id]['products'].append(product)

    orders = list(order_dict.values())

    data = {
        'orders': orders
    }

    filename = datetime.now().strftime("%Y-%m-%d_%H-%M-%S.json")
    filepath = os.path.join('/content/airflow/files/delivered', filename)

    with open(filepath, 'w') as f:
        json.dump(data, f)
        f.close()

    return data

'''

with open('/root/airflow/dags/ANNA_transform_task.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_transform_task.py /content/airflow/dags/ANNA_transform_task.py

Задача загрузки в Elasticsearch:

############################задача записи в Elasticsearch##################################
from google.colab import files

code = '''
from airflow import DAG
from airflow.operators.python import PythonOperator
import requests
from elasticsearch7 import Elasticsearch

def write_to_ElasticDB_function(**kwargs):
  data = kwargs['ti'].xcom_pull(task_ids='transform_task')

  # Установка подключения к Elasticsearch
  es = Elasticsearch(
    hosts=[{'host': my_elasict_host.bonsaisearch.net', 'port': 443, 'use_ssl': True}],
    http_auth=('my-elastic-user', 'my-password')
   )

  # Запись данных в индекс
  response = es.index(index='orders_index', body=data)

  # Проверка статуса ответа
  if response['result'] == 'created':
    print("Данные успешно записаны в индекс orders_index")
    notification = 'success'
  else:
    print("Не удалось записать данные в индекс")
    notification = 'failed'

  return  notification

'''

with open('/root/airflow/dags/ANNA_load_task.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_load_task.py /content/airflow/dags/ANNA_load_task.py

Сам DAG-файл создается следующим кодом:

########################из разных файлов DAG чтения из PostgreSQL и записи JSON в Elastic############################

from google.colab import files

code = '''

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator
from datetime import datetime, timedelta
from ANNA_extract_task import read_from_PostgreSQL_function
from ANNA_transform_task import write_to_JSON_function
from ANNA_load_task import write_to_ElasticDB_function

default_args = {
    'owner': 'airflow',
    'start_date': datetime.now() - timedelta(days=1),
    'retries': 1
}

dag = DAG(
    dag_id='ANNA_DAG_ETL_from_PG_2_ElK',
    default_args=default_args,
    schedule_interval='@daily'
)

start_task = DummyOperator(task_id='start_task', dag=dag)

extract_task = PythonOperator(
    task_id='extract_task',
    provide_context=True,
    python_callable=read_from_PostgreSQL_function,
    dag=dag
)

transform_task = PythonOperator(
    task_id='transform_task',
    provide_context=True,
    python_callable=write_to_JSON_function,
    dag=dag
)

load_task = PythonOperator(
    task_id='load_task',
    provide_context=True,
    python_callable=write_to_ElasticDB_function,
    dag=dag
)

telegram_token = 'my TG-token'
telegram_chat_id = 'my chat-id'

now=datetime.now()

send_notification_task = TelegramOperator(
    task_id='send_notification_task',
    token=telegram_token,
    chat_id=telegram_chat_id,
    text='ETL-process has been done {} with result {}'.format(
        now.strftime("%m/%d/%Y, %H:%M:%S"),
        '{{ ti.xcom_pull(key="return_value", task_ids="load_task") }}'
    ),
    dag=dag
)

end_task = DummyOperator(task_id='end_task', dag=dag)

start_task >> extract_task >> transform_task >> load_task >> send_notification_task >> end_task

'''

with open('/root/airflow/dags/ANNA_DAG_from_PotgreSQL_2_Elastic.py', 'w') as f:
    f.write(code)

!cp ~/airflow/dags/ANNA_DAG_ETL_from_PG_2_ElK.py /content/airflow/dags/ANNA_DAG_ETL_from_PG_2_ElK.py
!airflow dags unpause ANNA_DAG_ETL_from_PG_2_ElK

Чтобы увидеть этот DAG в списке всех конвейеров, нужно запустить планировщик AirFlow в фоновом режиме:

!airflow scheduler --daemon

При отсутствии ошибок DAG появится в веб-интерфейсе:

Перечень DAG Apache AirFlow
Перечень DAG Apache AirFlow

Передача данных между задачами происходит через механизм XCom, список всех этих объектов что можно также посмотреть в веб-интерфейсе.

XCom AirFlow
Объекты XCom для передачи данных между задачами AirFlow

Результаты выполнения запусков созданного DAG и каждой из его задач можно просмотреть в его деталях.

Выполнение DAG Airflow
Выполнение DAG Airflow

После успешного выполнения DAG в пользовательской папке Colab появятся JSON-файлы с данными, а в Elasticsearch – данные из этих JSON-документов. Проверить это можно, отправив POST-запрос к индексу, используя DSL-язык запросов Elasticsearch в консоли платформы bonsai. Запрос с использованием API поиска будет отправляться к индексу по маршруту /orders_index/_search, а в теле запроса будет выражение для возврата всех результатов:

{
   "query":{
      "match_all":{}
   }
}
Запрос к индексу Elasticsearch с API поиска и DSL-query
Запрос к индексу Elasticsearch с API поиска и DSL-query

Поскольку последней задачей в моем DAG является отправка уведомления о результатах ETL-процесса в Телеграмм-бот, сообщение окажется в чате.

Уведомление о выполнении DAG AIrFlow в чат-бот Telegram
Уведомление о выполнении DAG AIrFlow в чат-бот Telegram

Как улучшить этот DAG, реализовав программное подключение к внешним источникам и чтение SQL-запросов из файлов, читайте в нашей новой статье.

Data Pipeline на Apache Airflow

Код курса
AIRF
Ближайшая дата курса
19 марта, 2025
Продолжительность
24 ак.часов
Стоимость обучения
72 000 руб.

Освойте Apache AirFlow  и его применение в дата-инженерии и аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту