Как добавить свое соединение в AirFlow на Colab: DAG с отправкой данных в Телеграм-чат

Телеграм AirFlow, DAG Apache AirFlow Colab пример, Apache AirFlow GUI Google Colab, обучение Apache AirFlow, курсы дата-инженеров, обучение разработчиков Big Data, разработка AirFlow конвейеров, Школа Больших Данных Учебный Центр Коммерсант

В этой статье рассмотрим, как добавить собственное соединение в Apache AirFlow, запустив его в интерактивной среде Colab с помощью Python-кода, и использовать его при отправке результатов выполнения задач DAG в свой чат-бот Телеграм.

Постановка задачи: DAG с отправкой данных в Телеграм

Недавно я подробно рассказывала, как настроить AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель. Чтобы не повторяться, сразу перейду к коду и проблемам, с которыми я столкнулась при попытке реализовать DAG (Directed Acyclic Graph) с задачей отправки данных в чат-бот Телеграмм. Как обычно, мой конвейер обработки данных очень прост – DAG под названием ANNA_DAG_TG состоит из 5 задач и состоит из следующих операторов:

  • task1 – BashOperator выводит номер запуска DAG;
  • task2 – BashOperator выводит текущую дату;
  • task3 – BashOperator выводит текущее время;
  • task4 — BashOperator выводит результаты выполнения предыдущих задач;
  • task5 – TelegramOperator отправляет результаты выполнения предыдущей задачи в мой чат-бот Телеграм.
DAG AirFlow
DAG AirFlow

Код этого DAG выглядит следующим образом:

import datetime
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email_on_failure': True,
    'email_on_success': True,
    'email': ['vichugova.anna@gmail.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ANNA_DAG_TG',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

# Ниже определяются операторы, которые будут выполняться в DAG
# Формирование текста с номером запуска DAG в task1, вывод текущей даты в task2,
# вывод текущего времени в task3, вывод результатов выполнения task2 и task3 в task4,
# отправка результатов в телеграм-чат в task 5
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "Запуск DAG {{ dag_run.run_id }}" | tee /tmp/dag_run_id.txt',
    dag=dag,
)

task2 = BashOperator(
    task_id='task2',
    bash_command='echo "Сегодня $(date +%d-%m-%Y)"',
    dag=dag,
)

task3 = BashOperator(
    task_id='task3',
    bash_command='echo "в $(date +%H:%M:%S)"',
    dag=dag,
)

task4 = BashOperator(
    task_id='task4',
    bash_command='echo "{{ ti.xcom_pull(task_ids=\'task1\') }}. {{ ti.xcom_pull(task_ids=\'task2\') }} {{ ti.xcom_pull(task_ids=\'task3\') }}. Выполнено успешно"',
    dag=dag,
)

telegram_token = 'мой ТГ-токен'
telegram_chat_id = 'ИД моего чат-бота'


task5 = TelegramOperator(
    task_id='task5', 
    token=telegram_token, 
    chat_id=telegram_chat_id, 
    text='{{ ti.xcom_pull(key="return_value", task_ids="task4") }}',
    dag=dag  
)

#Определение порядка выполнения операторов в DAG: task1 должен выполниться первым,
#затем task2 и task3 могут выполняться параллельно, после чего task4, а затем task5 в конце
task1 >> [task2, task3] >> task4 >> task5

Для передачи данных между операторами в DAG используется механизм XCom, который позволяет обмениваться данными между задачами внутри DAG. В этом коде явно передаются параметры моего заранее созданного чат-бота Телеграм. Код работает, будучи оформленным в файл ANNA_DAG_TG.py и загруженным в папку /content/airflow/dags. Чтобы он попал в AirFlow, нужно скопировать этот файл в папку /airflow/dags/, которая находится в операционной системе Linux, используемой в Colab. Для этого следует в отдельной ячейке Colab выполнить такой код:

!cp /content/airflow/dags/ANNA_DAG_TG.py ~/airflow/dags/ANNA_DAG_TG.py
!airflow dags unpause ANNA_DAG_TG

Чтобы использовать собственный DAG, его нужно положить в папку /airflow/dags/, скопировав из пользовательской папки в Colab, которой можно пользоваться, подключив Google-диск. Это очень удобно для отладки DAG: можно просто удалить предыдущий Python-файл и загрузить новый. Поэтому перед выполнением копирования файла DAG необходимо выполнить импорт и монтирование Гугл-диска, а также создать каталога для хранения DAG. Как это сделать, рассмотрим далее.

Запуск Apache AirFlow в Google Colab

Поскольку при использовании Colab веб-сервер AirFlow запускается на удаленной машине, чтобы получить доступ к веб-интерфейсу фреймворка, необходимо прокинуть туннель, который делает локальный хост доступным извне. Как всегда, для этого я использую утилиту ngrok, которая позволяет открыть доступ к внутренним ресурсам машины, на которой он запущен, из внешней сети путем создания публичного URL-адреса, все запросы на который будут переброшены на локальный адрес и заданный порт удаленной машины.

Чтобы использовать ngrok и другие библиотеки и модули, включая AirFlow, их необходимо установить и импортировать в Colab:

#Установка Apache Airflow и инициализация базы данных.
!pip install apache-airflow
!airflow initdb

#Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok

#импорт модулей
from pyngrok import ngrok
import sys
import os
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

from google.colab import drive
drive.mount('/content/drive')
os.makedirs('/content/airflow/dags', exist_ok=True)
dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
os.makedirs(dags_folder, exist_ok=True)

#Задание переменной auth_token для аутентификации в сервисе ngrok.
auth_token = "…здесь ваш токен аутентификации в сервисе ngrok…"
# Since we can't access Colab notebooks IP directly we'll use
# ngrok to create a public URL for the server via a tunnel

# Authenticate ngrok
# https://dashboard.ngrok.com/signup
# Then go to the "Your Authtoken" tab in the sidebar and copy the API key

#Аутентификация в сервисе ngrok с помощью auth_token
os.system(f"ngrok authtoken {auth_token}")

#Запуск ngrok, который создаст публичный URL для сервера через туннель
#для доступа к веб-интерфейсу Airflow из любого места.
#addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
public_url = ngrok.connect(addr="8888", proto="http")

#Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)

В области вывода показана ссылка для доступа к веб-серверу GUI AirFlow.

ngrok Colab AirFlow
Туннелирование с помощью ngrok

Чтобы войти в веб-интерфейс фреймворка, нужно создать пользователя, предварительно инициализировав базу данных AirFlow – по умолчанию это легковесная резидентная СУБД SQLite, которая хранится в памяти, а не на жестком диске. Например, я создала пользователя anna, под логином и паролем которого можно будет войти в веб-интерфейс AirFlow:

#############################ячейка №3 в Google Colab#########################
!airflow db init #Инициализация базы данных Airflow
!airflow upgradedb #Обновление базы данных Airflow

#Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password.
#Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow.
!airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password

Далее можно, наконец, запустить веб-сервер Airflow, чтобы управлять задачами DAG через веб-интерфейс. Сделаем это на порту 8888 в фоновом режиме (daemon mode). Команда !airflow webserver запускает веб-сервер Airflow, который обрабатывает HTTP-запросы и позволяет управлять задачами DAG через веб-интерфейс:

!airflow webserver --port 8888 --daemon

Аргумент —port 8888 задают порт, на котором будет запущен веб-сервер, а аргумент —daemon запускает сервер в фоновом режиме, чтобы процесс выполнялся в фоновом режиме и не блокировал терминал, позволяя запускать другие ячейки Colab. После выполнения этой команды можно открыть веб-браузер и перейти на страницу URL-адреса, полученного ранее с помощью утилиты ngrok, которая перебрасывает локальный хост http://localhost:8888 на внешний URL-адрес. Теперь можно войти в веб-интерфейс Apache AirFlow, используя логин и пароль ранее созданного пользователя.

Чтобы увидеть все запланированные, исполняющиеся и выполненные DAG в разделе DAGs, необходимо запустить планировщик AirFlow. Удобнее всего это сделать тоже в фоновом режиме с помощью аргумента —daemon:

!airflow scheduler --daemon

В разделе DAG открылись цепочки задач, которые создаются по умолчанию в качестве демонстрационных и обучающих материалов, а также мой пользовательский DAG:

Перечень DAG AirFlow
Перечень DAG в веб-интерфейсе AirFlow

Используя команду Trigger DAG в веб-интерфейсе, я запустила свой конвейер задач, который успешно выполнился с 3-го раза.

Запуски DAG AirFlow
Сводка по запускам DAG

Первые 2 запуска моего DAG не удались, т.к. я задавала параметры соединения с Телеграм не явно, а в списке всех соединений, обращаясь к ним с помощью следующего кода в DAG:

config = ConfigParser()
config.read(config_file)  
telegram_token = config.get('telegram', 'token')
telegram_chat_id= config.get('telegram', 'chat_id')

task5 = TelegramOperator(
    task_id='task5',  
    telegram_conn_id='telegram_conn',  
    text='The execution results are: {{ ti.xcom_pull(key="return_value", task_ids="task4") }}',    
    chat_id=telegram_chat_id,      
    dag=dag    
)

Но оказалось, что Airflow worker, где запускается DAG, не имел доступа к конфигурационному файлу с этими параметрами, т.к. я передала их только в Colab. Чтобы добавить свое соединение в AirFlow, пришлось выполнить несколько действий, причем, как оказалось, напрасно. Однако, теперь я знаю, как добавить свое соединение и покажу вам.

Как добавить свое соединение в Apache AirFlow

Просмотреть перечень поддерживаемых подключений к внешним сервисам в Apache AirFlow прямо можно в веб-интерфейсе. Разумеется, изначально подключение к Телеграм там отсутствует.

Подключения к внешним системам AirFlow
Подключения к внешним системам AirFlow

Причем, веб-интерфейс AirFlow даже не позволяет создать нужное соединение, ограничивая выбор типа:

Добавление нового подключения AirFlow
Добавление нового подключения

Исправить это можно, запустив в Colab следующий код:

from airflow.models import Connection
from airflow import settings

conn = Connection(
    conn_id='telegram',
    conn_type='Telegram',
    login='',
    password='',
    extra='{"token": "мой ТГ-токен", "chat_id": "ИД моего чата"}'
)

session = settings.Session()
session.add(conn)
session.commit()

В результате выполнения этого кода соединение с Телеграм появится в списке:

Добавленное подключение AirFlow
Добавленное подключение

Внутренние параметры соединения можно посмотреть в деталях:

Детали Телеграм-подключения AirFlow
Детали Телеграм-подключения

Впрочем, как я уже отметила выше, эти действия были излишними, если явно задать параметры подключения в коде DAG, который после этого успешно выполнился, отправив сообщения в мой ТГ-бот.

Телеграм AirFlow
Результаты выполнения задачи отправки данных в Телеграм-бот

Как обычно, по завершении работы с веб-интерфейсом AirFlow надо освободить занятый порт с помощью следующего кода:

!pkill -f ngrok #поиск и завершение процесса, который запущен с помощью команды ngrok
# отключение публичного URL, созданного с помощью ngrok для веб-сервера Airflow
ngrok.disconnect(public_url=public_url)

О недостатках работы с AirFlow в Colab читайте в новой статье.

Узнайте больше про использование Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту