Как добавить свое соединение в AirFlow на Colab: DAG с отправкой данных в Телеграм-чат

Как добавить свое соединение в AirFlow на Colab: DAG с отправкой данных в Телеграм-чат

    В этой статье рассмотрим, как добавить собственное соединение в Apache AirFlow, запустив его в интерактивной среде Colab с помощью Python-кода, и использовать его при отправке результатов выполнения задач DAG в свой чат-бот Телеграм.

    Постановка задачи: DAG с отправкой данных в Телеграм

    Недавно я подробно рассказывала, как настроить AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель. Чтобы не повторяться, сразу перейду к коду и проблемам, с которыми я столкнулась при попытке реализовать DAG (Directed Acyclic Graph) с задачей отправки данных в чат-бот Телеграмм. Как обычно, мой конвейер обработки данных очень прост – DAG под названием ANNA_DAG_TG состоит из 5 задач и состоит из следующих операторов:

    • task1 – BashOperator выводит номер запуска DAG;
    • task2 – BashOperator выводит текущую дату;
    • task3 – BashOperator выводит текущее время;
    • task4 — BashOperator выводит результаты выполнения предыдущих задач;
    • task5 – TelegramOperator отправляет результаты выполнения предыдущей задачи в мой чат-бот Телеграм.
    DAG AirFlow
    DAG AirFlow

    Код этого DAG выглядит следующим образом:

    import datetime
    import os
    from datetime import datetime, timedelta
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator
    from airflow.providers.telegram.operators.telegram import TelegramOperator
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime.now(),
        'email_on_failure': True,
        'email_on_success': True,
        'email': ['vichugova.anna@gmail.com'],
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'ANNA_DAG_TG',
        default_args=default_args,
        schedule_interval=timedelta(days=1),
    )
    
    # Ниже определяются операторы, которые будут выполняться в DAG
    # Формирование текста с номером запуска DAG в task1, вывод текущей даты в task2,
    # вывод текущего времени в task3, вывод результатов выполнения task2 и task3 в task4,
    # отправка результатов в телеграм-чат в task 5
    task1 = BashOperator(
        task_id='task1',
        bash_command='echo "Запуск DAG {{ dag_run.run_id }}" | tee /tmp/dag_run_id.txt',
        dag=dag,
    )
    
    task2 = BashOperator(
        task_id='task2',
        bash_command='echo "Сегодня $(date +%d-%m-%Y)"',
        dag=dag,
    )
    
    task3 = BashOperator(
        task_id='task3',
        bash_command='echo "в $(date +%H:%M:%S)"',
        dag=dag,
    )
    
    task4 = BashOperator(
        task_id='task4',
        bash_command='echo "{{ ti.xcom_pull(task_ids=\'task1\') }}. {{ ti.xcom_pull(task_ids=\'task2\') }} {{ ti.xcom_pull(task_ids=\'task3\') }}. Выполнено успешно"',
        dag=dag,
    )
    
    telegram_token = 'мой ТГ-токен'
    telegram_chat_id = 'ИД моего чат-бота'
    
    
    task5 = TelegramOperator(
        task_id='task5', 
        token=telegram_token, 
        chat_id=telegram_chat_id, 
        text='{{ ti.xcom_pull(key="return_value", task_ids="task4") }}',
        dag=dag  
    )
    
    #Определение порядка выполнения операторов в DAG: task1 должен выполниться первым,
    #затем task2 и task3 могут выполняться параллельно, после чего task4, а затем task5 в конце
    task1 >> [task2, task3] >> task4 >> task5

    Для передачи данных между операторами в DAG используется механизм XCom, который позволяет обмениваться данными между задачами внутри DAG. В этом коде явно передаются параметры моего заранее созданного чат-бота Телеграм. Код работает, будучи оформленным в файл ANNA_DAG_TG.py и загруженным в папку /content/airflow/dags. Чтобы он попал в AirFlow, нужно скопировать этот файл в папку /airflow/dags/, которая находится в операционной системе Linux, используемой в Colab. Для этого следует в отдельной ячейке Colab выполнить такой код:

    !cp /content/airflow/dags/ANNA_DAG_TG.py ~/airflow/dags/ANNA_DAG_TG.py
    !airflow dags unpause ANNA_DAG_TG

    Чтобы использовать собственный DAG, его нужно положить в папку /airflow/dags/, скопировав из пользовательской папки в Colab, которой можно пользоваться, подключив Google-диск. Это очень удобно для отладки DAG: можно просто удалить предыдущий Python-файл и загрузить новый. Поэтому перед выполнением копирования файла DAG необходимо выполнить импорт и монтирование Гугл-диска, а также создать каталога для хранения DAG. Как это сделать, рассмотрим далее.

    Запуск Apache AirFlow в Google Colab

    Поскольку при использовании Colab веб-сервер AirFlow запускается на удаленной машине, чтобы получить доступ к веб-интерфейсу фреймворка, необходимо прокинуть туннель, который делает локальный хост доступным извне. Как всегда, для этого я использую утилиту ngrok, которая позволяет открыть доступ к внутренним ресурсам машины, на которой он запущен, из внешней сети путем создания публичного URL-адреса, все запросы на который будут переброшены на локальный адрес и заданный порт удаленной машины.

    Чтобы использовать ngrok и другие библиотеки и модули, включая AirFlow, их необходимо установить и импортировать в Colab:

    #Установка Apache Airflow и инициализация базы данных.
    !pip install apache-airflow
    !airflow initdb
    
    #Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
    !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
    !unzip ngrok-stable-linux-amd64.zip
    !pip install pyngrok
    
    #импорт модулей
    from pyngrok import ngrok
    import sys
    import os
    import datetime
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    from google.colab import drive
    drive.mount('/content/drive')
    os.makedirs('/content/airflow/dags', exist_ok=True)
    dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
    os.makedirs(dags_folder, exist_ok=True)
    
    #Задание переменной auth_token для аутентификации в сервисе ngrok.
    auth_token = "…здесь ваш токен аутентификации в сервисе ngrok…"
    # Since we can't access Colab notebooks IP directly we'll use
    # ngrok to create a public URL for the server via a tunnel
    
    # Authenticate ngrok
    # https://dashboard.ngrok.com/signup
    # Then go to the "Your Authtoken" tab in the sidebar and copy the API key
    
    #Аутентификация в сервисе ngrok с помощью auth_token
    os.system(f"ngrok authtoken {auth_token}")
    
    #Запуск ngrok, который создаст публичный URL для сервера через туннель
    #для доступа к веб-интерфейсу Airflow из любого места.
    #addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
    public_url = ngrok.connect(addr="8888", proto="http")
    
    #Вывод публичного URL для доступа к веб-интерфейсу Airflow
    print("Адрес Airflow GUI:", public_url)

    В области вывода показана ссылка для доступа к веб-серверу GUI AirFlow.

    ngrok Colab AirFlow
    Туннелирование с помощью ngrok

    Чтобы войти в веб-интерфейс фреймворка, нужно создать пользователя, предварительно инициализировав базу данных AirFlow – по умолчанию это легковесная резидентная СУБД SQLite, которая хранится в памяти, а не на жестком диске. Например, я создала пользователя anna, под логином и паролем которого можно будет войти в веб-интерфейс AirFlow:

    #############################ячейка №3 в Google Colab#########################
    !airflow db init #Инициализация базы данных Airflow
    !airflow upgradedb #Обновление базы данных Airflow
    
    #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password.
    #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow.
    !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password

    Далее можно, наконец, запустить веб-сервер Airflow, чтобы управлять задачами DAG через веб-интерфейс. Сделаем это на порту 8888 в фоновом режиме (daemon mode). Команда !airflow webserver запускает веб-сервер Airflow, который обрабатывает HTTP-запросы и позволяет управлять задачами DAG через веб-интерфейс:

    !airflow webserver --port 8888 --daemon

    Аргумент —port 8888 задают порт, на котором будет запущен веб-сервер, а аргумент —daemon запускает сервер в фоновом режиме, чтобы процесс выполнялся в фоновом режиме и не блокировал терминал, позволяя запускать другие ячейки Colab. После выполнения этой команды можно открыть веб-браузер и перейти на страницу URL-адреса, полученного ранее с помощью утилиты ngrok, которая перебрасывает локальный хост http://localhost:8888 на внешний URL-адрес. Теперь можно войти в веб-интерфейс Apache AirFlow, используя логин и пароль ранее созданного пользователя.

    Чтобы увидеть все запланированные, исполняющиеся и выполненные DAG в разделе DAGs, необходимо запустить планировщик AirFlow. Удобнее всего это сделать тоже в фоновом режиме с помощью аргумента —daemon:

    !airflow scheduler --daemon

    В разделе DAG открылись цепочки задач, которые создаются по умолчанию в качестве демонстрационных и обучающих материалов, а также мой пользовательский DAG:

    Перечень DAG AirFlow
    Перечень DAG в веб-интерфейсе AirFlow

    Используя команду Trigger DAG в веб-интерфейсе, я запустила свой конвейер задач, который успешно выполнился с 3-го раза.

    Запуски DAG AirFlow
    Сводка по запускам DAG

    Первые 2 запуска моего DAG не удались, т.к. я задавала параметры соединения с Телеграм не явно, а в списке всех соединений, обращаясь к ним с помощью следующего кода в DAG:

    config = ConfigParser()
    config.read(config_file)  
    telegram_token = config.get('telegram', 'token')
    telegram_chat_id= config.get('telegram', 'chat_id')
    
    task5 = TelegramOperator(
        task_id='task5',  
        telegram_conn_id='telegram_conn',  
        text='The execution results are: {{ ti.xcom_pull(key="return_value", task_ids="task4") }}',    
        chat_id=telegram_chat_id,      
        dag=dag    
    )

    Но оказалось, что Airflow worker, где запускается DAG, не имел доступа к конфигурационному файлу с этими параметрами, т.к. я передала их только в Colab. Чтобы добавить свое соединение в AirFlow, пришлось выполнить несколько действий, причем, как оказалось, напрасно. Однако, теперь я знаю, как добавить свое соединение и покажу вам.

    Как добавить свое соединение в Apache AirFlow

    Просмотреть перечень поддерживаемых подключений к внешним сервисам в Apache AirFlow прямо можно в веб-интерфейсе. Разумеется, изначально подключение к Телеграм там отсутствует.

    Подключения к внешним системам AirFlow
    Подключения к внешним системам AirFlow

    Причем, веб-интерфейс AirFlow даже не позволяет создать нужное соединение, ограничивая выбор типа:

    Добавление нового подключения AirFlow
    Добавление нового подключения

    Исправить это можно, запустив в Colab следующий код:

    from airflow.models import Connection
    from airflow import settings
    
    conn = Connection(
        conn_id='telegram',
        conn_type='Telegram',
        login='',
        password='',
        extra='{"token": "мой ТГ-токен", "chat_id": "ИД моего чата"}'
    )
    
    session = settings.Session()
    session.add(conn)
    session.commit()

    В результате выполнения этого кода соединение с Телеграм появится в списке:

    Добавленное подключение AirFlow
    Добавленное подключение

    Внутренние параметры соединения можно посмотреть в деталях:

    Детали Телеграм-подключения AirFlow
    Детали Телеграм-подключения

    Впрочем, как я уже отметила выше, эти действия были излишними, если явно задать параметры подключения в коде DAG, который после этого успешно выполнился, отправив сообщения в мой ТГ-бот.

    Телеграм AirFlow
    Результаты выполнения задачи отправки данных в Телеграм-бот

    Как обычно, по завершении работы с веб-интерфейсом AirFlow надо освободить занятый порт с помощью следующего кода:

    !pkill -f ngrok #поиск и завершение процесса, который запущен с помощью команды ngrok
    # отключение публичного URL, созданного с помощью ngrok для веб-сервера Airflow
    ngrok.disconnect(public_url=public_url)

    О недостатках работы с AirFlow в Colab читайте в новой статье.

    Узнайте больше про использование Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: