В этой статье рассмотрим, как добавить собственное соединение в Apache AirFlow, запустив его в интерактивной среде Colab с помощью Python-кода, и использовать его при отправке результатов выполнения задач DAG в свой чат-бот Телеграм.
Постановка задачи: DAG с отправкой данных в Телеграм
Недавно я подробно рассказывала, как настроить AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель. Чтобы не повторяться, сразу перейду к коду и проблемам, с которыми я столкнулась при попытке реализовать DAG (Directed Acyclic Graph) с задачей отправки данных в чат-бот Телеграмм. Как обычно, мой конвейер обработки данных очень прост – DAG под названием ANNA_DAG_TG состоит из 5 задач и состоит из следующих операторов:
- task1 – BashOperator выводит номер запуска DAG;
- task2 – BashOperator выводит текущую дату;
- task3 – BashOperator выводит текущее время;
- task4 — BashOperator выводит результаты выполнения предыдущих задач;
- task5 – TelegramOperator отправляет результаты выполнения предыдущей задачи в мой чат-бот Телеграм.
Код этого DAG выглядит следующим образом:
import datetime import os from datetime import datetime, timedelta from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.providers.telegram.operators.telegram import TelegramOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.now(), 'email_on_failure': True, 'email_on_success': True, 'email': ['vichugova.anna@gmail.com'], 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'ANNA_DAG_TG', default_args=default_args, schedule_interval=timedelta(days=1), ) # Ниже определяются операторы, которые будут выполняться в DAG # Формирование текста с номером запуска DAG в task1, вывод текущей даты в task2, # вывод текущего времени в task3, вывод результатов выполнения task2 и task3 в task4, # отправка результатов в телеграм-чат в task 5 task1 = BashOperator( task_id='task1', bash_command='echo "Запуск DAG {{ dag_run.run_id }}" | tee /tmp/dag_run_id.txt', dag=dag, ) task2 = BashOperator( task_id='task2', bash_command='echo "Сегодня $(date +%d-%m-%Y)"', dag=dag, ) task3 = BashOperator( task_id='task3', bash_command='echo "в $(date +%H:%M:%S)"', dag=dag, ) task4 = BashOperator( task_id='task4', bash_command='echo "{{ ti.xcom_pull(task_ids=\'task1\') }}. {{ ti.xcom_pull(task_ids=\'task2\') }} {{ ti.xcom_pull(task_ids=\'task3\') }}. Выполнено успешно"', dag=dag, ) telegram_token = 'мой ТГ-токен' telegram_chat_id = 'ИД моего чат-бота' task5 = TelegramOperator( task_id='task5', token=telegram_token, chat_id=telegram_chat_id, text='{{ ti.xcom_pull(key="return_value", task_ids="task4") }}', dag=dag ) #Определение порядка выполнения операторов в DAG: task1 должен выполниться первым, #затем task2 и task3 могут выполняться параллельно, после чего task4, а затем task5 в конце task1 >> [task2, task3] >> task4 >> task5
Для передачи данных между операторами в DAG используется механизм XCom, который позволяет обмениваться данными между задачами внутри DAG. В этом коде явно передаются параметры моего заранее созданного чат-бота Телеграм. Код работает, будучи оформленным в файл ANNA_DAG_TG.py и загруженным в папку /content/airflow/dags. Чтобы он попал в AirFlow, нужно скопировать этот файл в папку /airflow/dags/, которая находится в операционной системе Linux, используемой в Colab. Для этого следует в отдельной ячейке Colab выполнить такой код:
!cp /content/airflow/dags/ANNA_DAG_TG.py ~/airflow/dags/ANNA_DAG_TG.py !airflow dags unpause ANNA_DAG_TG
Чтобы использовать собственный DAG, его нужно положить в папку /airflow/dags/, скопировав из пользовательской папки в Colab, которой можно пользоваться, подключив Google-диск. Это очень удобно для отладки DAG: можно просто удалить предыдущий Python-файл и загрузить новый. Поэтому перед выполнением копирования файла DAG необходимо выполнить импорт и монтирование Гугл-диска, а также создать каталога для хранения DAG. Как это сделать, рассмотрим далее.
Запуск Apache AirFlow в Google Colab
Поскольку при использовании Colab веб-сервер AirFlow запускается на удаленной машине, чтобы получить доступ к веб-интерфейсу фреймворка, необходимо прокинуть туннель, который делает локальный хост доступным извне. Как всегда, для этого я использую утилиту ngrok, которая позволяет открыть доступ к внутренним ресурсам машины, на которой он запущен, из внешней сети путем создания публичного URL-адреса, все запросы на который будут переброшены на локальный адрес и заданный порт удаленной машины.
Чтобы использовать ngrok и другие библиотеки и модули, включая AirFlow, их необходимо установить и импортировать в Colab:
#Установка Apache Airflow и инициализация базы данных. !pip install apache-airflow !airflow initdb #Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip !unzip ngrok-stable-linux-amd64.zip !pip install pyngrok #импорт модулей from pyngrok import ngrok import sys import os import datetime from airflow import DAG from airflow.operators.bash import BashOperator from google.colab import drive drive.mount('/content/drive') os.makedirs('/content/airflow/dags', exist_ok=True) dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags") os.makedirs(dags_folder, exist_ok=True) #Задание переменной auth_token для аутентификации в сервисе ngrok. auth_token = "…здесь ваш токен аутентификации в сервисе ngrok…" # Since we can't access Colab notebooks IP directly we'll use # ngrok to create a public URL for the server via a tunnel # Authenticate ngrok # https://dashboard.ngrok.com/signup # Then go to the "Your Authtoken" tab in the sidebar and copy the API key #Аутентификация в сервисе ngrok с помощью auth_token os.system(f"ngrok authtoken {auth_token}") #Запуск ngrok, который создаст публичный URL для сервера через туннель #для доступа к веб-интерфейсу Airflow из любого места. #addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP public_url = ngrok.connect(addr="8888", proto="http") #Вывод публичного URL для доступа к веб-интерфейсу Airflow print("Адрес Airflow GUI:", public_url)
В области вывода показана ссылка для доступа к веб-серверу GUI AirFlow.
Чтобы войти в веб-интерфейс фреймворка, нужно создать пользователя, предварительно инициализировав базу данных AirFlow – по умолчанию это легковесная резидентная СУБД SQLite, которая хранится в памяти, а не на жестком диске. Например, я создала пользователя anna, под логином и паролем которого можно будет войти в веб-интерфейс AirFlow:
#############################ячейка №3 в Google Colab######################### !airflow db init #Инициализация базы данных Airflow !airflow upgradedb #Обновление базы данных Airflow #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password. #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password
Далее можно, наконец, запустить веб-сервер Airflow, чтобы управлять задачами DAG через веб-интерфейс. Сделаем это на порту 8888 в фоновом режиме (daemon mode). Команда !airflow webserver запускает веб-сервер Airflow, который обрабатывает HTTP-запросы и позволяет управлять задачами DAG через веб-интерфейс:
!airflow webserver --port 8888 --daemon
Аргумент —port 8888 задают порт, на котором будет запущен веб-сервер, а аргумент —daemon запускает сервер в фоновом режиме, чтобы процесс выполнялся в фоновом режиме и не блокировал терминал, позволяя запускать другие ячейки Colab. После выполнения этой команды можно открыть веб-браузер и перейти на страницу URL-адреса, полученного ранее с помощью утилиты ngrok, которая перебрасывает локальный хост http://localhost:8888 на внешний URL-адрес. Теперь можно войти в веб-интерфейс Apache AirFlow, используя логин и пароль ранее созданного пользователя.
Чтобы увидеть все запланированные, исполняющиеся и выполненные DAG в разделе DAGs, необходимо запустить планировщик AirFlow. Удобнее всего это сделать тоже в фоновом режиме с помощью аргумента —daemon:
!airflow scheduler --daemon
В разделе DAG открылись цепочки задач, которые создаются по умолчанию в качестве демонстрационных и обучающих материалов, а также мой пользовательский DAG:
Используя команду Trigger DAG в веб-интерфейсе, я запустила свой конвейер задач, который успешно выполнился с 3-го раза.
Первые 2 запуска моего DAG не удались, т.к. я задавала параметры соединения с Телеграм не явно, а в списке всех соединений, обращаясь к ним с помощью следующего кода в DAG:
config = ConfigParser() config.read(config_file) telegram_token = config.get('telegram', 'token') telegram_chat_id= config.get('telegram', 'chat_id') task5 = TelegramOperator( task_id='task5', telegram_conn_id='telegram_conn', text='The execution results are: {{ ti.xcom_pull(key="return_value", task_ids="task4") }}', chat_id=telegram_chat_id, dag=dag )
Но оказалось, что Airflow worker, где запускается DAG, не имел доступа к конфигурационному файлу с этими параметрами, т.к. я передала их только в Colab. Чтобы добавить свое соединение в AirFlow, пришлось выполнить несколько действий, причем, как оказалось, напрасно. Однако, теперь я знаю, как добавить свое соединение и покажу вам.
Как добавить свое соединение в Apache AirFlow
Просмотреть перечень поддерживаемых подключений к внешним сервисам в Apache AirFlow прямо можно в веб-интерфейсе. Разумеется, изначально подключение к Телеграм там отсутствует.
Причем, веб-интерфейс AirFlow даже не позволяет создать нужное соединение, ограничивая выбор типа:
Исправить это можно, запустив в Colab следующий код:
from airflow.models import Connection from airflow import settings conn = Connection( conn_id='telegram', conn_type='Telegram', login='', password='', extra='{"token": "мой ТГ-токен", "chat_id": "ИД моего чата"}' ) session = settings.Session() session.add(conn) session.commit()
В результате выполнения этого кода соединение с Телеграм появится в списке:
Внутренние параметры соединения можно посмотреть в деталях:
Впрочем, как я уже отметила выше, эти действия были излишними, если явно задать параметры подключения в коде DAG, который после этого успешно выполнился, отправив сообщения в мой ТГ-бот.
Как обычно, по завершении работы с веб-интерфейсом AirFlow надо освободить занятый порт с помощью следующего кода:
!pkill -f ngrok #поиск и завершение процесса, который запущен с помощью команды ngrok # отключение публичного URL, созданного с помощью ngrok для веб-сервера Airflow ngrok.disconnect(public_url=public_url)
О недостатках работы с AirFlow в Colab читайте в новой статье.
Узнайте больше про использование Apache AirFlow для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: