Недавно мы разбирали, как дата-инженеру написать собственный оператор Apache AirFlow и использовать его в DAG. Сегодня посмотрим, каким образом с этой задачей справляется модный ИИ под названием ChatGPT.
GPT-генерация пользовательского оператора AirFlow
Хотя Apache AirFow предоставляет множество операторов для выполнения самых разных задач, иногда дата-инженеру приходится писать свои собственные Python-классы, решающие специфическую задачу в рабочем процессе, включая логику ее выполнения, входы, выходы и зависимости. Чтобы разработать собственный оператор, нужно создать новый класс Python, который наследуется от класса BaseOperator, затем зарегистрировать его и использовать в DAG.
Напомним, ChatGPT является частью семейства языковых моделей OpenAI GPT (Generative Pre-trained Transformer), который можно использовать для создания текстов, включая исходный код на популярных языках программирования, на основе заданной подсказки. API ChatGPT требует ключ и поддерживает ряд параметров, таких как максимальное количество генерируемых токенов, температура ответа и используемый движок нейросети.
В качестве подсказки для ChatGPT был задан следующий текст:
Write a custom airflow operator that utilizes the ChatGPT API from Open AI. The operator should take the users API Key and Prompt as input parameters
(Напиши пользовательский оператор AirFlow, который использует API ChatGPT от Open AI. В качестве входных параметров оператор должен использовать API-ключ и подсказку).
Сперва создается Python-файл под названием chatgpt_operator.py в каталоге проекта AirFlow. Далее определяется пользовательский класс оператора под названием ChatGPTOperator, который будет взаимодействовать с ChatGPT API.
from airflow.models import BaseOperator from airflow.utils.decorators import apply_defaults import openai class ChatGPTOperator(BaseOperator): @apply_defaults def __init__(self, api_key, prompt, *args, **kwargs): super(ChatGPTOperator, self).__init__(*args, **kwargs) self.api_key = api_key self.prompt = prompt def execute(self, context): # Set up the OpenAI API client openai.api_key = self.api_key # Generate a response using the ChatGPT API response = openai.Completion.create( engine="text-davinci-002", prompt=self.prompt, max_tokens=1024, n=1, stop=None, temperature=0.5, ) # Log the response and return it self.log.info(f"Generated response: {response.choices[0].text}") return response.choices[0].text
В 3-ей строке этого Python-скрипта выполняется импорт библиотеки OpenAI, которая обеспечивает удобный доступ к API OpenAI из Python-приложений. Она включает в себя предопределенный набор классов для ресурсов API, которые динамически инициализируются из ответов API OpenAI.
Протестировать этот пользовательский оператор можно, сгенерировав DAG с помощью того же ИИ:
from airflow import DAG from datetime import datetime, timedelta from chatgpt_operator import ChatGPTOperator default_args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2023, 4, 5), 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG('chatgpt_dag', default_args=default_args, schedule_interval=timedelta(days=1)) generate_response_task = ChatGPTOperator( task_id='generate_response', api_key='your_api_key', prompt='your_prompt_text', dag=dag, ) generate_response_task
Чтобы сделать созданный оператор доступным для практического использования, его следует поместить его в директорию /dags или добавить в файл /plugins/operators, если используется Astro CLI. После этого оператор входит в Python-путь (PYTHONPATH) и может быть использован. Также следует убедиться в наличии всех необходимых зависимостей. В частности, предварительно надо установить Python-библиотеку OpenAI. При работе с Astro CLI достаточно добавить эту зависимость в файл requirements.txt. После этого можно использовать самую популярную на сегодня ИИ-технологию в работе с Apache AirFlow.
Узнайте больше про применение Apache AirFlow для дата-инженерии и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники