Создаем свой оператор Apache AirFlow с ChatGPT

AirFlow custom operator, ChatGPT operator Apache AirFlow, пользовательский оператор Apache AirFlow, машинное обучение дата-инженерия ChatGPT, обучение AirFlow, AirFlow операторы DAG примеры курсы обучение, обучение инженеров данных Big Data, курсы дата-инженеров, Школа Больших Данных Учебный центр Коммерсант

Недавно мы разбирали, как дата-инженеру написать собственный оператор Apache AirFlow и использовать его в DAG. Сегодня посмотрим, каким образом с этой задачей справляется модный ИИ под названием ChatGPT.

GPT-генерация пользовательского оператора AirFlow

Хотя Apache AirFow предоставляет множество операторов для выполнения самых разных задач, иногда дата-инженеру приходится писать свои собственные Python-классы, решающие специфическую задачу в рабочем процессе, включая логику ее выполнения, входы, выходы и зависимости. Чтобы разработать собственный оператор, нужно создать новый класс Python, который наследуется от класса BaseOperator, затем зарегистрировать его и использовать в DAG.

Напомним, ChatGPT является частью семейства языковых моделей OpenAI GPT (Generative Pre-trained Transformer), который можно использовать для создания текстов, включая исходный код на популярных языках программирования, на основе заданной подсказки. API ChatGPT требует ключ и поддерживает ряд параметров, таких как максимальное количество генерируемых токенов, температура ответа и используемый движок нейросети.

В качестве подсказки для ChatGPT был задан следующий текст:

Write a custom airflow operator that utilizes the ChatGPT API from Open AI. The operator should take the users API Key and Prompt as input parameters

(Напиши пользовательский оператор AirFlow, который использует API ChatGPT от Open AI. В качестве входных параметров оператор должен использовать API-ключ и подсказку).

Сперва создается Python-файл под названием chatgpt_operator.py в каталоге проекта AirFlow.  Далее определяется пользовательский класс оператора под названием ChatGPTOperator, который будет взаимодействовать с ChatGPT API.

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import openai


class ChatGPTOperator(BaseOperator):
    @apply_defaults
    def __init__(self, api_key, prompt, *args, **kwargs):
        super(ChatGPTOperator, self).__init__(*args, **kwargs)
        self.api_key = api_key
        self.prompt = prompt

    def execute(self, context):
        # Set up the OpenAI API client
        openai.api_key = self.api_key

        # Generate a response using the ChatGPT API
        response = openai.Completion.create(
            engine="text-davinci-002",
            prompt=self.prompt,
            max_tokens=1024,
            n=1,
            stop=None,
            temperature=0.5,
        )

        # Log the response and return it
        self.log.info(f"Generated response: {response.choices[0].text}")
        return response.choices[0].text

В 3-ей строке этого Python-скрипта выполняется импорт библиотеки OpenAI, которая обеспечивает удобный доступ к API OpenAI из Python-приложений. Она включает в себя предопределенный набор классов для ресурсов API, которые динамически инициализируются из ответов API OpenAI.

Протестировать этот пользовательский оператор можно, сгенерировав DAG с помощью того же ИИ:

from airflow import DAG
from datetime import datetime, timedelta 
from chatgpt_operator import ChatGPTOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 4, 5),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG('chatgpt_dag', default_args=default_args, schedule_interval=timedelta(days=1))

generate_response_task = ChatGPTOperator(
    task_id='generate_response',
    api_key='your_api_key',
    prompt='your_prompt_text',
    dag=dag,
)

generate_response_task

Чтобы сделать созданный оператор доступным для практического использования, его следует поместить его в директорию /dags или добавить в файл /plugins/operators, если используется Astro CLI. После этого оператор входит в Python-путь (PYTHONPATH) и может быть использован. Также следует убедиться в наличии всех необходимых зависимостей. В частности, предварительно надо установить Python-библиотеку OpenAI. При работе с Astro CLI достаточно добавить эту зависимость в файл requirements.txt. После этого можно использовать самую популярную на сегодня ИИ-технологию в работе с Apache AirFlow.

Узнайте больше про применение Apache AirFlow для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://openai.com/blog/chatgpt
  2. https://medium.com/apache-airflow/chatgpt-operator-for-airflow-the-operator-that-wrote-itself-2ce9e14c0537
  3. https://github.com/TheCodyRich/airflow-provider-thecodyrich
Поиск по сайту