Apache Airflow

    В этой статье я бы хотел рассказать об основных концепциях Airflow и как с ним работать.

    Что такое Airflow?

    Airflow – это open-source оркестратор для управления процессами загрузки и обработки данных. Если у вас есть большое количество задач, запускаемых на cron, особенно, если между ними есть зависимости, то Airflow может вам сильно помочь.

    Основные его преимущества – это несложная инсталяция и первые шаги, хорошая визуализация, а также возможность автоматически создавать большое число задач и широкие возможности кастомизации.

    Основной объект Airflow – это направленный ацикличный граф (DAG). Узлы DAG – это task (задачи, которые выполняют основную работу). Между task’ами есть связи. Как следует из определения, циклов в зависимостях быть не может.

    Пример визуализации DAG
    Рис.1 (Пример визуализации DAG)

    DAG в Airflow может состоять из множества веток, различных ветвлений и т.п. Также можно устанавливать зависимости не только внутри одного DAG, но и между несколькими DAG’ами. Часть задач можно пропускать (skip) в зависимости от условий, статусов завершения предыдущих задач и т.п.

    Также для задач можно выставлять различные приоритеты (priority_weight).

    Основные концепции

    DAG состоит из task и связей между ними.

    Каждый task – это, по сути, экземпляр Operator класса с заданным списком параметров (для каждого оператора они разные).

    Когда DAG запускается, Airflow создает экземпляр DAG Run.

    Когда в контексте DAG запускается task, создается task Instance, которые и выполняет различные действия с данными.

    Более подробно об основных концепциях Airflow можно почтить в официальной документации.

    Установка Airflow

    Airflow написан на Python 2.7, но уже достаточно давно в Production решениях используется Python 3. В целом проблем это не вызывает, но есть некоторые библиотеки, несовместимые с Python 3 (например MySQLdb нужно будет заменить на pymysql).

    Установка библиотек Airflow:

    sudo apt update
    sudo apt install python3
    sudo apt install python3-pip
    pip3 install --upgrade setuptools
    export SLUGIFY_USES_TEXT_UNIDECODE=yes
    sudo pip3 install apache-airflow

    Настройка окружения:

    mkdir airflow
    AIRFLOW_HOME=~/airflow
    airflow initdb

    Запуск сервисов airflow

    airflow scheduler
    airflow webserver
    airflow worker

    Также можно настроить запуск сервисов Airflow через systemd (пример из репозитория Airflow) или docker (https://github.com/puckel/docker-airflow).

    После запуска webserver будет доступен по ссылке http://localhost:8080/admin/

    Создадим свой первый DAG

    from datetime import datetime
    from airflow import DAG
    from airflow.operators.dummy_operator import DummyOperator
    from airflow.operators.python_operator import PythonOperator
    # Python функция, которая у нас будет использоваться в дальнейшем
    def print_hello():
        return ‘Hello world!’
    # Создадим экземпляр DAG (контекст для наших task)
    dag = DAG(‘hello_world’, description=‘Simple tutorial DAG’,
              schedule_interval=‘0 12 * * *’, # расписание
              start_date=datetime(2017, 3, 20), catchup=False) # дата начала работы DAG

    dummy_operator = DummyOperator(task_id=dummy_task, retries=3, dag=dag) # Пустой оператор, ничего не делает

    hello_operator = PythonOperator(task_id=hello_task, python_callable=print_hello, dag=dag) # Оператор, который вызывает нашу функцию print_hello

    dummy_operator >> hello_operator # Установим зависимости между task’ами

    Запуск DAG

    Сначала нужно положить  *.py файлик в директорию dags_folder. Найти её можно в ~/airflow/airflow.cfg

    Airflow сам увидит DAG и подтянет его в web-интерфейс.

    Настройка WEB интерфейса Apache AirFlow
    Рис.2 DAG в web-интерфейсе

    Для того, чтобы DAG стал активен, нужно нажать на переключатель on/off (стрелка 1). Тогда scheduler увидит, что этот DAG нужно запустить.

    Если нажать на кнопку 2, то можно будет увидеть структуру DAG. Выглядеть в случае нашего примера hello_world будет так:

    DAG в Apache AirFlow
    Рис. 3 Пример структуры DAG

    Зеленая рамка вокруг первого task означает, что он завершился успешно. Серая вокруг второго, что он стоит в очереди на выполнение.

    Более подробно почитать про визуализацию можно в официальной документации.

    После успешного завершения hello_task, DAG перейдет в статус success.

    Заключение

    В этой статье я рассказал об основных особенностях в airflow и о том, как начать с ним работать. В следующих статьях я расскажу о том, как генерировать большое количество тасков автоматически, писать собственные операторы, использовать airflow connections для хранения паролей и многое другое.

    Приглашаем вас на наши курсы по Apache Airflow в учебном центре « Школа Больших Данных»