Как запустить DAG AirFlow в Google Colab: простой пример

DAG Apache AirFlow Colab пример, Apache AirFlow GUI Google Colab, обучение Apache AirFlow, курсы дата-инженеров, обучение разработчиков Big Data, разработка AirFlow конвейеров, Школа Больших Данных Учебный Центр Коммерсант

Сегодня рассмотрим, как выполнить DAG Apache AirFlow, запустив его в интерактивной среде Colab и получив доступ в веб-GUI этого фреймворка, создав туннель локального хоста на публичный URL с помощью утилиты ngrok. В качестве примера построим простой конвейер из 5 задач.

Запуск Apache AirFlow в Google Colab

Чтобы не повторять содержимое прошлой статьи, где я подробно рассказывала, как настроить AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель. Сразу перейдем к коду, позволяющему это сделать. Напомню, что при запуске в Colab веб-сервер AirFlow запускается на удаленной машине и, чтобы достучаться до веб-интерфейса фреймворка, необходимо прокинуть туннель, который делает локальный хост доступным извне. Как обычно, для этого я использую утилиту ngrok, которая позволяет открыть доступ к внутренним ресурсам машины, на которой он запущен, из внешней сети путем создания публичного URL-адреса, все запросы на который будут переброшены на локальный адрес и заданный порт удаленной машины.

Сперва надо установить набор необходимых библиотек и импортировать модули:

###############################################ячейка №1 в Google Colab#############################
#Установка Apache Airflow и инициализация базы данных.
!pip install apache-airflow
!airflow initdb

#Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
!pip install pyngrok

#импорт модулей
from pyngrok import ngrok
import sys
import os
import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

from google.colab import drive
drive.mount('/content/drive')
os.makedirs('/content/airflow/dags', exist_ok=True)
dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags")
os.makedirs(dags_folder, exist_ok=True)

Поскольку я буду запускать собственный DAG, его нужно положить в папку /airflow/dags/, скопировав из пользовательской папки в Colab. Для более удобной отладки проще всего подключить Google-диск, чтобы быстро обновлять содержимое файла с DAG, просто удаляя предыдущий и загружая новой. Именно поэтому в вышеприведенном участке кода выполняется импорт и монтирование Гугл-диска, а также создание каталога для хранения DAG.

Далее пробросим туннель с помощью ngrok, используя свой токен аутентификации, полученный ранее:

###############################################ячейка №2 в Google Colab#############################
#Задание переменной auth_token для аутентификации в сервисе ngrok.
auth_token = "…здесь ваш токен аутентификации в сервисе ngrok…"
# Since we can't access Colab notebooks IP directly we'll use
# ngrok to create a public URL for the server via a tunnel

# Authenticate ngrok
# https://dashboard.ngrok.com/signup
# Then go to the "Your Authtoken" tab in the sidebar and copy the API key

#Аутентификация в сервисе ngrok с помощью auth_token
os.system(f"ngrok authtoken {auth_token}")

#Запуск ngrok, который создаст публичный URL для сервера через туннель
#для доступа к веб-интерфейсу Airflow из любого места.
#addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP
public_url = ngrok.connect(addr="8888", proto="http")

#Вывод публичного URL для доступа к веб-интерфейсу Airflow
print("Адрес Airflow GUI:", public_url)

В области вывода показана ссылка, которая нужна для доступа к веб-серверу AirFlow:

ngrok Colab
Туннель к внешнему URl с помощью утилиты ngrok

Затем следует инициализировать базу данных AirFlow – по умолчанию это легковесная резидентная СУБД SQLite, которая хранится в памяти, а не на жестком диске. Также я создала пользователя anna, под логином и паролем которого можно будет войти в веб-интерфейс AirFlow:

#############################ячейка №3 в Google Colab#########################
!airflow db init #Инициализация базы данных Airflow
!airflow upgradedb #Обновление базы данных Airflow

#Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password.
#Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow.
!airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password

Запуск пользовательского DAG

Далее напишем код  свой простой конвейер — DAG (Directed Acyclic Graph) из нескольких задач и сохраним его в виде отдельного py-файла:

import random
import math
import os

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from google.colab import files
from datetime import datetime, timedelta


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime.now(),
    'email_on_failure': True,
    'email_on_success': True,
    'email': ['vichugova.anna@gmail.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    'ANNA_DAG_SaveFile',
    default_args=default_args,
    schedule_interval=timedelta(days=1),
)

def write_to_file(**context):
    execution_date = context['execution_date'].strftime('%Y-%m-%d')
    result = context['ti'].xcom_pull(task_ids='task4')
    filename = f'/content/airflow/results/{execution_date}.txt'
    with open(filename, 'a') as f:
        f.write(result + '\n')


# Ниже определяются операторы, которые будут выполняться в DAG
# Формирование текста с номером запуска DAG в task1, вывод текущей даты в task2,
# вывод текущего времени в task3, вывод результатов выполнения task2 и task3 в task4,
# и запись результатов в txt-файл с названием текущей даты в task5
task1 = BashOperator(
    task_id='task1',
    bash_command='echo "Запуск DAG {{ dag_run.run_id }}" | tee /tmp/dag_run_id.txt',
    dag=dag,
)

task2 = BashOperator(
    task_id='task2',
    bash_command='echo "Сегодня $(date +%d-%m-%Y)"',
    dag=dag,
)

task3 = BashOperator(
    task_id='task3',
    bash_command='echo "В $(date +%H:%M:%S)"',
    dag=dag,
)

task4 = BashOperator(
    task_id='task4',
    bash_command='echo "{{ ti.xcom_pull(task_ids=\'task1\') }}. {{ ti.xcom_pull(task_ids=\'task2\') }}. {{ ti.xcom_pull(task_ids=\'task3\') }}"',
    dag=dag,
)

task5 = PythonOperator(
    task_id='task5',
    python_callable=write_to_file,
    dag=dag
)

task1 >> [task2, task3] >> task4 >> task5

Представленный код определяет набор задач, связанных между собой в определенном порядке. В коде определен DAG с названием «ANNA_DAG_SaveFile», который запускается ежедневно и состоит из следующих операторов:

  • task1 – BashOperator выводит номер запуска DAG;
  • task2 – BashOperator выводит текущую дату;
  • task3 – BashOperator выводит текущее время;
  • task4 — BashOperator выводит результаты выполнения предыдущих задач;
  • task5 – PythonOperator записывает результаты выполнения DAG в файл с названием текущей даты.

DAG определен так, что каждый следующий оператор зависит от результатов выполнения предыдущего оператора. Для передачи данных между операторами используется механизм XCom, который позволяет обмениваться данными между задачами внутри DAG. Также в коде определена функция write_to_file, которая используется в задаче task5 для записи результатов выполнения DAG в файл. Функция получает текущую дату и результаты выполнения оператора task4 с помощью механизма XCom и записывает их в файл с названием текущей даты в папке /content/airflow/results, которую я создала заранее.

Сохранив код DAG в виде py-файла, я загрузила его в папку /content/airflow/dags. Чтобы он попал в AirFlow, нужно скопировать этот файл в папку /airflow/dags/, которая находится в операционной системе Linux, используемой в Colab. Для этого следует в отдельной ячейке Colab выполнить такой код:

!cp /content/airflow/dags/ANNA_DAG_SaveFile.py ~/airflow/dags/ANNA_DAG_SaveFile.py
!airflow dags unpause ANNA_DAG_SaveFile

Далее можно, наконец, запустить веб-сервер Airflow, чтобы управлять задачами DAG через веб-интерфейс. Сделаем это на порту 8888 в фоновом режиме (daemon mode). Команда !airflow webserver запускает веб-сервер Airflow, который обрабатывает HTTP-запросы и позволяет управлять задачами DAG через веб-интерфейс:

!airflow webserver --port 8888 --daemon

Аргумент —port 8888 задают порт, на котором будет запущен веб-сервер, а аргумент —daemon запускает сервер в фоновом режиме, чтобы процесс выполнялся в фоновом режиме и не блокировал терминал, позволяя запускать другие ячейки Colab. После выполнения этой команды можно открыть веб-браузер и перейти на страницу URL-адреса, полученного ранее с помощью утилиты ngrok, которая перебрасывает локальный хост http://localhost:8888 на внешний URL-адрес. Теперь можно войти в веб-интерфейс Apache AirFlow, используя логин и пароль ранее созданного пользователя.

Чтобы в увидеть все запланированные, исполняющиеся и выполненные DAG в разделе DAGs, необходимо запустить планировщик AirFlow. Удобнее всего это сделать тоже в фоновом режиме с помощью аргумента —daemon:

!airflow scheduler --daemon

В разделе DAG открылись цепочки задач, которые создаются по умолчанию в качестве демонстрационных и обучающих материалов, а также мой пользовательский DAG:

DAG Apache AirFlow
Перечень DAG

Кликнув на свой DAG, его можно запустить на исполнение с помощью команды Trigger DAG:

запуск DAG AirFlow
Просмотр и запуск DAG

Мой простой DAG выполнился успешно несколько раз, что отображается в сводной статистике по запускам и в деталях отдельно взятого конвейера, включая его графическое отображение:

DAG AirFLow пример
Графический вид DAG

Результаты выполнения, как и задано в DAG, записываются в текстовый файл:

Colab Python AirFlow пример
Запись результатов в текстовый файл

В завершение хочу показать сохраненные переменные XCom для обмена данными между задачами, которые можно посмотреть в соответствующем разделе веб-GUI:

XCom GUI AirFlow
Перечень сохраненных переменных XCom в GUI AirFlow

По завершении работы с веб-интерфейсом AirFlow надо освободить занятый порт с помощью следующего кода:

!pkill -f ngrok #поиск и завершение процесса, который запущен с помощью команды ngrok
# отключение публичного URL, созданного с помощью ngrok для веб-сервера Airflow
ngrok.disconnect(public_url=public_url)

В новой статье я расскажу, как добавить собственное соединение в Apache AirFlow с помощью Python-кода в Colab, и использовать его при отправке результатов выполнения задач DAG в свой чат-бот Телеграм.

Узнайте больше про использование Apache AirFlow  для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту