В этой статье я бы хотел рассказать об основных концепциях Airflow и как с ним работать.
Что такое Airflow?
Airflow – это open-source оркестратор для управления процессами загрузки и обработки данных. Если у вас есть большое количество задач, запускаемых на cron, особенно, если между ними есть зависимости, то Airflow может вам сильно помочь.
Основные его преимущества – это несложная инсталяция и первые шаги, хорошая визуализация, а также возможность автоматически создавать большое число задач и широкие возможности кастомизации.
Основной объект Airflow – это направленный ацикличный граф (DAG). Узлы DAG – это task (задачи, которые выполняют основную работу). Между task’ами есть связи. Как следует из определения, циклов в зависимостях быть не может.
DAG в Airflow может состоять из множества веток, различных ветвлений и т.п. Также можно устанавливать зависимости не только внутри одного DAG, но и между несколькими DAG’ами. Часть задач можно пропускать (skip) в зависимости от условий, статусов завершения предыдущих задач и т.п.
Также для задач можно выставлять различные приоритеты (priority_weight).
Основные концепции
DAG состоит из task и связей между ними.
Каждый task – это, по сути, экземпляр Operator класса с заданным списком параметров (для каждого оператора они разные).
Когда DAG запускается, Airflow создает экземпляр DAG Run.
Когда в контексте DAG запускается task, создается task Instance, которые и выполняет различные действия с данными.
Более подробно об основных концепциях Airflow можно почтить в официальной документации.
Установка Airflow
Airflow написан на Python 2.7, но уже достаточно давно в Production решениях используется Python 3. В целом проблем это не вызывает, но есть некоторые библиотеки, несовместимые с Python 3 (например MySQLdb нужно будет заменить на pymysql).
Установка библиотек Airflow:
sudo apt update sudo apt install python3 sudo apt install python3-pip pip3 install --upgrade setuptools export SLUGIFY_USES_TEXT_UNIDECODE=yes sudo pip3 install apache-airflow
Настройка окружения:
mkdir airflow AIRFLOW_HOME=~/airflow airflow initdb
Запуск сервисов airflow
airflow scheduler airflow webserver airflow worker
Также можно настроить запуск сервисов Airflow через systemd (пример из репозитория Airflow) или docker (https://github.com/puckel/docker-airflow).
После запуска webserver будет доступен по ссылке http://localhost:8080/admin/
Создадим свой первый DAG
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
# Python функция, которая у нас будет использоваться в дальнейшем
def print_hello():
return ‘Hello world!’
# Создадим экземпляр DAG (контекст для наших task)
dag = DAG(‘hello_world’, description=‘Simple tutorial DAG’,
schedule_interval=‘0 12 * * *’, # расписание
start_date=datetime(2017, 3, 20), catchup=False) # дата начала работы DAG
dummy_operator = DummyOperator(task_id=‘dummy_task‘, retries=3, dag=dag) # Пустой оператор, ничего не делает
hello_operator = PythonOperator(task_id=‘hello_task‘, python_callable=print_hello, dag=dag) # Оператор, который вызывает нашу функцию print_hello
dummy_operator >> hello_operator # Установим зависимости между task’ами
Запуск DAG
Сначала нужно положить *.py файлик в директорию dags_folder. Найти её можно в ~/airflow/airflow.cfg
Airflow сам увидит DAG и подтянет его в web-интерфейс.
Для того, чтобы DAG стал активен, нужно нажать на переключатель on/off (стрелка 1). Тогда scheduler увидит, что этот DAG нужно запустить.
Если нажать на кнопку 2, то можно будет увидеть структуру DAG. Выглядеть в случае нашего примера hello_world будет так:
Зеленая рамка вокруг первого task означает, что он завершился успешно. Серая вокруг второго, что он стоит в очереди на выполнение.
Более подробно почитать про визуализацию можно в официальной документации.
После успешного завершения hello_task, DAG перейдет в статус success.
Заключение
В этой статье я рассказал об основных особенностях в airflow и о том, как начать с ним работать. В следующих статьях я расскажу о том, как генерировать большое количество тасков автоматически, писать собственные операторы, использовать airflow connections для хранения паролей и многое другое.
Приглашаем вас на наши курсы по Apache Airflow в учебном центре « Школа Больших Данных»