Сегодня рассмотрим, как загружать большие объемы данных из REST API-сервисов с Apache Spark, написав на PySpark собственную UDF-функцию с преобразованием withColumn(), чтобы воспользоваться всеми преимуществами распределенных вычислений этого фреймворка.
Локальное исполнение на драйвере и распараллеливание REST-API вызовов в Apache Spark
Мы уже рассказывали, что конвертация Python-скрипта в распределенный код Apache Spark – не самая тривиальная задача. Например, выполнение кода Python вне контекста Dataframe на драйвере выполняется локально, а не распараллеливается по узлам кластера. Это становится проблемой при работе с большими объемами данных, например, когда нужно загрузить большие объемы данных через службу REST API.
Рассмотрим пример:
import requests import jsonres = Nonetry: res = requests.get(url, data=body, headers=headers) except Exception as e: print(e)if res != None and res.status_code == 200: print(json.loads(res.text))
Если просто запустить этот код на исполнение, он будет выполнен локально на драйвере, не используя преимущества распределенного фреймворка, такие как параллелизм и масштабирование. Это превращает код PySpark в просто однопоточную программу Python. Поэтому, чтобы использовать все возможности Apache Spark, необходимо найти альтернативное решение, например, написав пользовательскую функцию (UDF, User Defined Function) с оператором withColumn. В частности, можно создать DataFrame, в котором каждая строка представляет собой один запрос к службе REST. UDF используется для инкапсуляции HTTP-запроса, возвращая структурированный столбец, представляющий REST-ответ от API внешнего сервиса. К этому ответу можно применить синтаксический анализ (парсинг), чтобы разнести возвращаемые поля ключ-значения по столбцам датафрейма. Если возвращаемый JSON-файл содержит вложенные структуры типа массивов и объектов, их следует распарсить дополнительно, подобно тому, как мы писали здесь. Можно самостоятельно реализовать подобное решение, преобразовав многоуровневые сложные иерархические столбцы в неиерархическую версию самих себя. В преобразованном датафрейме не будет столбцов с данными сложного типа: всем вложенным атрибутам назначится собственный столбец.
Рассмотрим предложенное решение на примере бесплатной службы REST API правительства США, которая возвращает марки и модели транспортных средств, зарегистрированных в этой стране: https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json.
Реализация UDF-функции на PySpark
Для отправки HTTP-запросов можно использовать Python-библиотеку Requests, которая избавляет от необходимости вручную добавлять строки запроса к URL-адресам или кодировать данные в запросе POST. Предположим, нужно использовать данные из REST API, вызываемый несколько раз, чтобы получить требуемые данные. Чтобы воспользоваться параллелизмом Apache Spark, каждый вызов REST API будет инкапсулирован UDF-функцией, которая привязана к DataFrame. Каждая строка в DataFrame будет представлять собой один вызов службы REST API. После выполнения действия в DataFrame результат каждого отдельного вызова REST API будет добавлен к каждой строке в виде структурированного типа данных.
Используем библиотеку Requests для выполнения HTTP-запроса get или post. Ответ RESTful-сервиса передается обратно в виде объекта JSON.
import requests import json from pyspark.sql.functions import udf, col, explode from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType from pyspark.sql import Row def executeRestApi(verb, url, headers, body): # headers = { 'content-type': "application/json" } res = None # Make API request, get response object back, create dataframe from above schema. try: if verb == "get": res = requests.get(url, data=body, headers=headers) else: res = requests.post(url, data=body, headers=headers) except Exception as e: return e if res != None and res.status_code == 200: return json.loads(res.text) return None
Apache Spark позволяет выбирать, какие значения нужны из JSON, возвращаемого вызовом REST API. Следует всего лишь при объявлении схемы данных определить, какие части JSON нужны.
schema = StructType([ StructField("Count", IntegerType(), True), StructField("Message", StringType(), True), StructField("SearchCriteria", StringType(), True), StructField("Results", ArrayType( StructType([ StructField("Make_ID", IntegerType()), StructField("Make_Name", StringType()) ]) )) ])
Потоковая обработка в Apache Spark
Код курса
SPOT
Ближайшая дата курса
27 февраля, 2025
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
Затем необходимо объявить UDF, убедившись, что тип возвращаемого значения установлен в соответствии с заявленной схемой. Это гарантирует, что новый столбец, который используется для выполнения пользовательской функции, в конечном итоге будет содержать данные в виде структурированного объекта, а не простого текста в формате JSON. Действие аналогично использованию функции from_json, которая принимает схему в качестве второго параметра.
udf_executeRestApi = udf(executeRestApi, schema)
Наконец, необходимо создать DataFrame, где каждая строка представляет собой один вызов REST API. Количество столбцов в датафрейме может быть любым, один из которых должен содержать URL-адрес и/или параметры, необходимые для выполнения вызова REST API. В нашем примере создадим датафрейм следующим образом:
from pyspark.sql import Rowheaders = { 'content-type': "application/json" }body = json.dumps({ })RestApiRequestRow = Row("verb", "url", "headers", "body") request_df = spark.createDataFrame([ RestApiRequestRow("get", "https://vpic.nhtsa.dot.gov/api/vehicles/getallmakes?format=json", headers, body) ])
Класс Row используется для определения столбцов датафрейма, и с помощью метода createDataFrame экземпляр RestApiRequestRow объявляется для каждого отдельного вызова API. Далее можно использовать метод withColumn() в DataFrame для выполнения UDF и REST API.
result_df = request_df \ .withColumn("result", udf_executeRestApi(col("verb"), col("url"), col("headers"), col("body")))
Поскольку Apache Spark поддерживает ленивые или отложенные вычисления для преобразований, UDF для датафрейма будет выполняться после материализации запроса и вызове какого-либо действия, например, show(). При этом создается план запроса, но сами данные все еще находятся в хранилище и ожидают обработки. Spark будет распределять вызовы API между всеми рабочими процессами, прежде чем возвращать такие результаты HTTP-вызова с заголовками и телом результата. Если выполнить парсинг возвращаемого от RESTful-сервиса JSON-ответа с пользовательской UDF-функцией Collapse_columns, записывая в датафрейм только нужный атрибут, который определен как result.Results, создание итогового датафрейма будет выглядеть так:
df = request_df.select(explode(col("result.Results")).alias("results")) df.select(collapse_columns(df.schema)).show()
Графовые алгоритмы в Apache Spark
Код курса
GRAS
Ближайшая дата курса
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
Узнайте больше про использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники