Содержание
В этой статье рассмотрим, как добавить собственное соединение в Apache AirFlow, запустив его в интерактивной среде Colab с помощью Python-кода, и использовать его при отправке результатов выполнения задач DAG в свой чат-бот Телеграм.
Постановка задачи: DAG с отправкой данных в Телеграм
Недавно я подробно рассказывала, как настроить AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель. Чтобы не повторяться, сразу перейду к коду и проблемам, с которыми я столкнулась при попытке реализовать DAG (Directed Acyclic Graph) с задачей отправки данных в чат-бот Телеграмм. Как обычно, мой конвейер обработки данных очень прост – DAG под названием ANNA_DAG_TG состоит из 5 задач и состоит из следующих операторов:
- task1 – BashOperator выводит номер запуска DAG;
- task2 – BashOperator выводит текущую дату;
- task3 – BashOperator выводит текущее время;
- task4 — BashOperator выводит результаты выполнения предыдущих задач;
- task5 – TelegramOperator отправляет результаты выполнения предыдущей задачи в мой чат-бот Телеграм.

Код этого DAG выглядит следующим образом:
import datetime
import os
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime.now(),
'email_on_failure': True,
'email_on_success': True,
'email': ['vichugova.anna@gmail.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
'ANNA_DAG_TG',
default_args=default_args,
schedule_interval=timedelta(days=1),
)
# Ниже определяются операторы, которые будут выполняться в DAG
# Формирование текста с номером запуска DAG в task1, вывод текущей даты в task2,
# вывод текущего времени в task3, вывод результатов выполнения task2 и task3 в task4,
# отправка результатов в телеграм-чат в task 5
task1 = BashOperator(
task_id='task1',
bash_command='echo "Запуск DAG {{ dag_run.run_id }}" | tee /tmp/dag_run_id.txt',
dag=dag,
)
task2 = BashOperator(
task_id='task2',
bash_command='echo "Сегодня $(date +%d-%m-%Y)"',
dag=dag,
)
task3 = BashOperator(
task_id='task3',
bash_command='echo "в $(date +%H:%M:%S)"',
dag=dag,
)
task4 = BashOperator(
task_id='task4',
bash_command='echo "{{ ti.xcom_pull(task_ids=\'task1\') }}. {{ ti.xcom_pull(task_ids=\'task2\') }} {{ ti.xcom_pull(task_ids=\'task3\') }}. Выполнено успешно"',
dag=dag,
)
telegram_token = 'мой ТГ-токен'
telegram_chat_id = 'ИД моего чат-бота'
task5 = TelegramOperator(
task_id='task5',
token=telegram_token,
chat_id=telegram_chat_id,
text='{{ ti.xcom_pull(key="return_value", task_ids="task4") }}',
dag=dag
)
#Определение порядка выполнения операторов в DAG: task1 должен выполниться первым,
#затем task2 и task3 могут выполняться параллельно, после чего task4, а затем task5 в конце
task1 >> [task2, task3] >> task4 >> task5
Для передачи данных между операторами в DAG используется механизм XCom, который позволяет обмениваться данными между задачами внутри DAG. В этом коде явно передаются параметры моего заранее созданного чат-бота Телеграм. Код работает, будучи оформленным в файл ANNA_DAG_TG.py и загруженным в папку /content/airflow/dags. Чтобы он попал в AirFlow, нужно скопировать этот файл в папку /airflow/dags/, которая находится в операционной системе Linux, используемой в Colab. Для этого следует в отдельной ячейке Colab выполнить такой код:
!cp /content/airflow/dags/ANNA_DAG_TG.py ~/airflow/dags/ANNA_DAG_TG.py !airflow dags unpause ANNA_DAG_TG
Чтобы использовать собственный DAG, его нужно положить в папку /airflow/dags/, скопировав из пользовательской папки в Colab, которой можно пользоваться, подключив Google-диск. Это очень удобно для отладки DAG: можно просто удалить предыдущий Python-файл и загрузить новый. Поэтому перед выполнением копирования файла DAG необходимо выполнить импорт и монтирование Гугл-диска, а также создать каталога для хранения DAG. Как это сделать, рассмотрим далее.
Запуск Apache AirFlow в Google Colab
Поскольку при использовании Colab веб-сервер AirFlow запускается на удаленной машине, чтобы получить доступ к веб-интерфейсу фреймворка, необходимо прокинуть туннель, который делает локальный хост доступным извне. Как всегда, для этого я использую утилиту ngrok, которая позволяет открыть доступ к внутренним ресурсам машины, на которой он запущен, из внешней сети путем создания публичного URL-адреса, все запросы на который будут переброшены на локальный адрес и заданный порт удаленной машины.
Чтобы использовать ngrok и другие библиотеки и модули, включая AirFlow, их необходимо установить и импортировать в Colab:
#Установка Apache Airflow и инициализация базы данных.
!pip install apache-airflow
!airflow initdb
#Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok
#импорт модулей
from pyngrok import ngrok
import sys
import os
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator
from google.colab import drive
drive.mount('/content/drive')
os.makedirs('/content/airflow/dags', exist_ok=True)
dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
os.makedirs(dags_folder, exist_ok=True)
#Задание переменной auth_token для аутентификации в сервисе ngrok.
auth_token = "…здесь ваш токен аутентификации в сервисе ngrok…"
# Since we can't access Colab notebooks IP directly we'll use
# ngrok to create a public URL for the server via a tunnel
# Authenticate ngrok
# https://dashboard.ngrok.com/signup
# Then go to the "Your Authtoken" tab in the sidebar and copy the API key
#Аутентификация в сервисе ngrok с помощью auth_token
os.system(f"ngrok authtoken {auth_token}")
#Запуск ngrok, который создаст публичный URL для сервера через туннель
#для доступа к веб-интерфейсу Airflow из любого места.
#addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
public_url = ngrok.connect(addr="8888", proto="http")
#Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)
В области вывода показана ссылка для доступа к веб-серверу GUI AirFlow.

Чтобы войти в веб-интерфейс фреймворка, нужно создать пользователя, предварительно инициализировав базу данных AirFlow – по умолчанию это легковесная резидентная СУБД SQLite, которая хранится в памяти, а не на жестком диске. Например, я создала пользователя anna, под логином и паролем которого можно будет войти в веб-интерфейс AirFlow:
#############################ячейка №3 в Google Colab######################### !airflow db init #Инициализация базы данных Airflow !airflow upgradedb #Обновление базы данных Airflow #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password. #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password
Далее можно, наконец, запустить веб-сервер Airflow, чтобы управлять задачами DAG через веб-интерфейс. Сделаем это на порту 8888 в фоновом режиме (daemon mode). Команда !airflow webserver запускает веб-сервер Airflow, который обрабатывает HTTP-запросы и позволяет управлять задачами DAG через веб-интерфейс:
!airflow webserver --port 8888 --daemon
Аргумент —port 8888 задают порт, на котором будет запущен веб-сервер, а аргумент —daemon запускает сервер в фоновом режиме, чтобы процесс выполнялся в фоновом режиме и не блокировал терминал, позволяя запускать другие ячейки Colab. После выполнения этой команды можно открыть веб-браузер и перейти на страницу URL-адреса, полученного ранее с помощью утилиты ngrok, которая перебрасывает локальный хост http://localhost:8888 на внешний URL-адрес. Теперь можно войти в веб-интерфейс Apache AirFlow, используя логин и пароль ранее созданного пользователя.
Чтобы увидеть все запланированные, исполняющиеся и выполненные DAG в разделе DAGs, необходимо запустить планировщик AirFlow. Удобнее всего это сделать тоже в фоновом режиме с помощью аргумента —daemon:
!airflow scheduler --daemon
В разделе DAG открылись цепочки задач, которые создаются по умолчанию в качестве демонстрационных и обучающих материалов, а также мой пользовательский DAG:

Используя команду Trigger DAG в веб-интерфейсе, я запустила свой конвейер задач, который успешно выполнился с 3-го раза.

Первые 2 запуска моего DAG не удались, т.к. я задавала параметры соединения с Телеграм не явно, а в списке всех соединений, обращаясь к ним с помощью следующего кода в DAG:
config = ConfigParser()
config.read(config_file)
telegram_token = config.get('telegram', 'token')
telegram_chat_id= config.get('telegram', 'chat_id')
task5 = TelegramOperator(
task_id='task5',
telegram_conn_id='telegram_conn',
text='The execution results are: {{ ti.xcom_pull(key="return_value", task_ids="task4") }}',
chat_id=telegram_chat_id,
dag=dag
)
Но оказалось, что Airflow worker, где запускается DAG, не имел доступа к конфигурационному файлу с этими параметрами, т.к. я передала их только в Colab. Чтобы добавить свое соединение в AirFlow, пришлось выполнить несколько действий, причем, как оказалось, напрасно. Однако, теперь я знаю, как добавить свое соединение и покажу вам.
Как добавить свое соединение в Apache AirFlow
Просмотреть перечень поддерживаемых подключений к внешним сервисам в Apache AirFlow прямо можно в веб-интерфейсе. Разумеется, изначально подключение к Телеграм там отсутствует.

Причем, веб-интерфейс AirFlow даже не позволяет создать нужное соединение, ограничивая выбор типа:

Исправить это можно, запустив в Colab следующий код:
from airflow.models import Connection
from airflow import settings
conn = Connection(
conn_id='telegram',
conn_type='Telegram',
login='',
password='',
extra='{"token": "мой ТГ-токен", "chat_id": "ИД моего чата"}'
)
session = settings.Session()
session.add(conn)
session.commit()
В результате выполнения этого кода соединение с Телеграм появится в списке:

Внутренние параметры соединения можно посмотреть в деталях:

Впрочем, как я уже отметила выше, эти действия были излишними, если явно задать параметры подключения в коде DAG, который после этого успешно выполнился, отправив сообщения в мой ТГ-бот.

Как обычно, по завершении работы с веб-интерфейсом AirFlow надо освободить занятый порт с помощью следующего кода:
!pkill -f ngrok #поиск и завершение процесса, который запущен с помощью команды ngrok # отключение публичного URL, созданного с помощью ngrok для веб-сервера Airflow ngrok.disconnect(public_url=public_url)
О недостатках работы с AirFlow в Colab читайте в новой статье.
Узнайте больше про использование Apache AirFlow для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:


