Сегодня рассмотрим, как выполнить DAG Apache AirFlow, запустив его в интерактивной среде Colab и получив доступ в веб-GUI этого фреймворка, создав туннель локального хоста на публичный URL с помощью утилиты ngrok. В качестве примера построим простой конвейер из 5 задач.
Запуск Apache AirFlow в Google Colab
Чтобы не повторять содержимое прошлой статьи, где я подробно рассказывала, как настроить AirFlow в Google Cloud Platform, и запускать DAG-файлы из Colab, используя удаленный исполнитель. Сразу перейдем к коду, позволяющему это сделать. Напомню, что при запуске в Colab веб-сервер AirFlow запускается на удаленной машине и, чтобы достучаться до веб-интерфейса фреймворка, необходимо прокинуть туннель, который делает локальный хост доступным извне. Как обычно, для этого я использую утилиту ngrok, которая позволяет открыть доступ к внутренним ресурсам машины, на которой он запущен, из внешней сети путем создания публичного URL-адреса, все запросы на который будут переброшены на локальный адрес и заданный порт удаленной машины.
Сперва надо установить набор необходимых библиотек и импортировать модули:
###############################################ячейка №1 в Google Colab############################# #Установка Apache Airflow и инициализация базы данных. !pip install apache-airflow !airflow initdb #Установка инструмента ngrok для создания безопасного туннеля для доступа к веб-интерфейсу Airflow из любого места !wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip !unzip ngrok-stable-linux-amd64.zip !pip install pyngrok #импорт модулей from pyngrok import ngrok import sys import os import datetime from airflow import DAG from airflow.operators.bash import BashOperator from google.colab import drive drive.mount('/content/drive') os.makedirs('/content/airflow/dags', exist_ok=True) dags_folder = os.path.join(os.path.expanduser("~"), "airflow", "dags") os.makedirs(dags_folder, exist_ok=True)
Поскольку я буду запускать собственный DAG, его нужно положить в папку /airflow/dags/, скопировав из пользовательской папки в Colab. Для более удобной отладки проще всего подключить Google-диск, чтобы быстро обновлять содержимое файла с DAG, просто удаляя предыдущий и загружая новой. Именно поэтому в вышеприведенном участке кода выполняется импорт и монтирование Гугл-диска, а также создание каталога для хранения DAG.
Далее пробросим туннель с помощью ngrok, используя свой токен аутентификации, полученный ранее:
###############################################ячейка №2 в Google Colab############################# #Задание переменной auth_token для аутентификации в сервисе ngrok. auth_token = "…здесь ваш токен аутентификации в сервисе ngrok…" # Since we can't access Colab notebooks IP directly we'll use # ngrok to create a public URL for the server via a tunnel # Authenticate ngrok # https://dashboard.ngrok.com/signup # Then go to the "Your Authtoken" tab in the sidebar and copy the API key #Аутентификация в сервисе ngrok с помощью auth_token os.system(f"ngrok authtoken {auth_token}") #Запуск ngrok, который создаст публичный URL для сервера через туннель #для доступа к веб-интерфейсу Airflow из любого места. #addr="8888" указывает на порт, на котором запущен веб-сервер Airflow, а proto="http" указывает на использование протокола HTTP public_url = ngrok.connect(addr="8888", proto="http") #Вывод публичного URL для доступа к веб-интерфейсу Airflow print("Адрес Airflow GUI:", public_url)
В области вывода показана ссылка, которая нужна для доступа к веб-серверу AirFlow:
Затем следует инициализировать базу данных AirFlow – по умолчанию это легковесная резидентная СУБД SQLite, которая хранится в памяти, а не на жестком диске. Также я создала пользователя anna, под логином и паролем которого можно будет войти в веб-интерфейс AirFlow:
#############################ячейка №3 в Google Colab######################### !airflow db init #Инициализация базы данных Airflow !airflow upgradedb #Обновление базы данных Airflow #Создание нового пользователя в Apache Airflow с именем пользователя anna, именем Anna, фамилией Anna, адресом электронной почты anna.doe@example.com и паролем password. #Этот пользователь будет иметь роль Admin, которая дает полный доступ к интерфейсу Airflow. !airflow users create --username anna --firstname Anna --lastname Anna --email anna@example.com --role Admin --password password
Запуск пользовательского DAG
Далее напишем код свой простой конвейер — DAG (Directed Acyclic Graph) из нескольких задач и сохраним его в виде отдельного py-файла:
import random import math import os from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from google.colab import files from datetime import datetime, timedelta default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime.now(), 'email_on_failure': True, 'email_on_success': True, 'email': ['vichugova.anna@gmail.com'], 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG( 'ANNA_DAG_SaveFile', default_args=default_args, schedule_interval=timedelta(days=1), ) def write_to_file(**context): execution_date = context['execution_date'].strftime('%Y-%m-%d') result = context['ti'].xcom_pull(task_ids='task4') filename = f'/content/airflow/results/{execution_date}.txt' with open(filename, 'a') as f: f.write(result + '\n') # Ниже определяются операторы, которые будут выполняться в DAG # Формирование текста с номером запуска DAG в task1, вывод текущей даты в task2, # вывод текущего времени в task3, вывод результатов выполнения task2 и task3 в task4, # и запись результатов в txt-файл с названием текущей даты в task5 task1 = BashOperator( task_id='task1', bash_command='echo "Запуск DAG {{ dag_run.run_id }}" | tee /tmp/dag_run_id.txt', dag=dag, ) task2 = BashOperator( task_id='task2', bash_command='echo "Сегодня $(date +%d-%m-%Y)"', dag=dag, ) task3 = BashOperator( task_id='task3', bash_command='echo "В $(date +%H:%M:%S)"', dag=dag, ) task4 = BashOperator( task_id='task4', bash_command='echo "{{ ti.xcom_pull(task_ids=\'task1\') }}. {{ ti.xcom_pull(task_ids=\'task2\') }}. {{ ti.xcom_pull(task_ids=\'task3\') }}"', dag=dag, ) task5 = PythonOperator( task_id='task5', python_callable=write_to_file, dag=dag ) task1 >> [task2, task3] >> task4 >> task5
Представленный код определяет набор задач, связанных между собой в определенном порядке. В коде определен DAG с названием «ANNA_DAG_SaveFile», который запускается ежедневно и состоит из следующих операторов:
- task1 – BashOperator выводит номер запуска DAG;
- task2 – BashOperator выводит текущую дату;
- task3 – BashOperator выводит текущее время;
- task4 — BashOperator выводит результаты выполнения предыдущих задач;
- task5 – PythonOperator записывает результаты выполнения DAG в файл с названием текущей даты.
DAG определен так, что каждый следующий оператор зависит от результатов выполнения предыдущего оператора. Для передачи данных между операторами используется механизм XCom, который позволяет обмениваться данными между задачами внутри DAG. Также в коде определена функция write_to_file, которая используется в задаче task5 для записи результатов выполнения DAG в файл. Функция получает текущую дату и результаты выполнения оператора task4 с помощью механизма XCom и записывает их в файл с названием текущей даты в папке /content/airflow/results, которую я создала заранее.
Сохранив код DAG в виде py-файла, я загрузила его в папку /content/airflow/dags. Чтобы он попал в AirFlow, нужно скопировать этот файл в папку /airflow/dags/, которая находится в операционной системе Linux, используемой в Colab. Для этого следует в отдельной ячейке Colab выполнить такой код:
!cp /content/airflow/dags/ANNA_DAG_SaveFile.py ~/airflow/dags/ANNA_DAG_SaveFile.py !airflow dags unpause ANNA_DAG_SaveFile
Далее можно, наконец, запустить веб-сервер Airflow, чтобы управлять задачами DAG через веб-интерфейс. Сделаем это на порту 8888 в фоновом режиме (daemon mode). Команда !airflow webserver запускает веб-сервер Airflow, который обрабатывает HTTP-запросы и позволяет управлять задачами DAG через веб-интерфейс:
!airflow webserver --port 8888 --daemon
Аргумент —port 8888 задают порт, на котором будет запущен веб-сервер, а аргумент —daemon запускает сервер в фоновом режиме, чтобы процесс выполнялся в фоновом режиме и не блокировал терминал, позволяя запускать другие ячейки Colab. После выполнения этой команды можно открыть веб-браузер и перейти на страницу URL-адреса, полученного ранее с помощью утилиты ngrok, которая перебрасывает локальный хост http://localhost:8888 на внешний URL-адрес. Теперь можно войти в веб-интерфейс Apache AirFlow, используя логин и пароль ранее созданного пользователя.
Чтобы в увидеть все запланированные, исполняющиеся и выполненные DAG в разделе DAGs, необходимо запустить планировщик AirFlow. Удобнее всего это сделать тоже в фоновом режиме с помощью аргумента —daemon:
!airflow scheduler --daemon
В разделе DAG открылись цепочки задач, которые создаются по умолчанию в качестве демонстрационных и обучающих материалов, а также мой пользовательский DAG:
Кликнув на свой DAG, его можно запустить на исполнение с помощью команды Trigger DAG:
Мой простой DAG выполнился успешно несколько раз, что отображается в сводной статистике по запускам и в деталях отдельно взятого конвейера, включая его графическое отображение:
Результаты выполнения, как и задано в DAG, записываются в текстовый файл:
В завершение хочу показать сохраненные переменные XCom для обмена данными между задачами, которые можно посмотреть в соответствующем разделе веб-GUI:
По завершении работы с веб-интерфейсом AirFlow надо освободить занятый порт с помощью следующего кода:
!pkill -f ngrok #поиск и завершение процесса, который запущен с помощью команды ngrok # отключение публичного URL, созданного с помощью ngrok для веб-сервера Airflow ngrok.disconnect(public_url=public_url)
В новой статье я расскажу, как добавить собственное соединение в Apache AirFlow с помощью Python-кода в Colab, и использовать его при отправке результатов выполнения задач DAG в свой чат-бот Телеграм.
Узнайте больше про использование Apache AirFlow для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве: