Потоковая публикация данных в REST API с Apache Spark Streaming

потоковая передача данных из Delta Lake во внешний REST API со Spark Structured Streaming, foreachBatch Spark Structured Streaming, Spark Structured Streaming примеры курсы обучение Delta Lake, обучение архитекторов и инженеров Big Data, Школа Больших Данных Учебный Центр Коммерсант

Как реализовать потоковую публикацию данных из приложения Apache Spark Structured Streaming во внешний REST API, используя метод foreachBatch(), зачем перераспределять датафрейм перед его упаковкой в полезную нагрузку HTTP-запроса, от чего зависит число вызовов, и какие приемы помогут избежать сбоев из-за ошибок.

6 шагов потоковой публикации данных в REST API с Apache Spark Structured Streaming

Apache Spark Structured Streaming лежит в основе потоковой передачи данных на платформе Databricks Lakehouse. Он эффективно выполняет разнообразную логическую обработку в различных объемах, от небольших ETL-процессов до крупнейших платформ. Spark Structured Streaming может обрабатывать множество источников и приемников данных. Помимо популярных СУБД, файловых хранилищ и платформ потоковой передачи типа Apache Kafka и т.д., Spark Structured Streaming поддерживает специализированный приемник, который может выполнять произвольную логику на выходе потокового запроса с помощью метода foreachBatch(). С помощью foreachBatch() любая цель вывода, адресуемая в коде приложения, может быть местом назначения потоковых данных. Именно этот метод используется для масштабируемой публикации потоковых данных в вызовы REST API. 

Подобный сценарий характерен, в т.ч. для маршрутизации записей потокового конвейера в REST API в дополнение к постоянным приемникам, когда уже существует конвейер обработки данных на основе Spark-приложений для аналитики или машинного обучения. Другим примером может быть простая автозагрузка потоковых данных во внешний REST API.

Чтобы реализовать это средствами Spark Structured Streaming, сперва создадим датафрейм, например, считав потоковые данные из таблицы Delta:

dfSource = (spark.readStream
                .format("delta")
                .table("samples.nyctaxi.trips"))

Далее для направления потоковых данных в REST API реализуем ряд шагов. Сперва необходимо использовать foreachBatch() для передачи входящих микропакетов методу-обработчику (callRestAPIBatch), который будет обрабатывать вызовы REST API.

streamHandle = (dfSource.writeStream
                       .foreachBatch(callRestAPIBatch)
                       .start())

Далее сгруппируем несколько строк из входных данных при каждом исходящем вызове REST API. В относительном выражении вызов API через HTTP будет медленной частью процесса. Чтобы достигать высокой пропускной способности будет значительно улучшена, если включить несколько сообщений/записей в тело каждого API-вызова. При этом надо определить максимум, который можно уместить в одном вызове целевого API. В методе, вызываемом foreachBatch(), есть подготовительный шаг для преобразования микропакетного датафрейма в предварительно пакетный датафрейм, где каждая строка содержит сгруппированные записи для одного вызова API. Этот шаг также дает возможность выполнить последнее преобразование записей в формат, ожидаемый целевым API. 

В большинстве случаев для достижения желаемого уровня пропускной способности потребуется выполнять вызовы API из параллельных задач. Можно контролировать степень параллелизма, вызывая перераспределение датафрейма методом repartition(), в параметрах которого передается количество параллельных задач, которые хотят вызвать API. Причем это делается с помощью всего одной строки кода, например, для запуска параллельных 8 задач:

new_df = pre_batched_df.repartition(8)

Однако, явное перераспределение больших наборов данных может снизить производительность, особенно если оно вызывает перетасовку данных между узлами в кластере, что влечет накладные расходы. Но в большинстве случаев вызова REST API размер данных любого микропакета достаточно мал. Поэтому на практике перераспределение данных в случае работы с REST API не должно вызвать проблемы. 

Затем надо преобразовать датафрейм, который вызывает вложенную функцию, предназначенную для выполнения одного вызова REST API. Входными данными для этой функции будет одна строка предварительно упакованных данных. В примере столбец payload содержит данные полезной нагрузки, которые нужно включить в один вызов:

submitted_df = new_df.withColumn("RestAPIResponseCode",\
                          callRestApiOnce(new_df["payload"])).\
                          collect()

Внутри вложенной функции, которая будет выполнять один вызов API, надо использовать библиотеки для отправки HTTP-запроса POST для REST API. Например, в Python это можно сделать с помощью библиотеки Requests.

Чтобы обработать потенциальные ошибки при вызове REST API, логику работы с ним нужно завернуть в блок try-except или проверять код ответа HTTP. Если вызов не удался, все задание может быть не выполнено путем создания исключения (для повтора задания или устранения неполадок), или отдельные записи могут быть перенаправлены в очередь недоставленных писем для исправления или последующей повторной попытки. Например,

if not (response.status_code==200 or response.status_code==201) :
 raise Exception("Response status : {} .Response message : {}".\
                 format(str(response.status_code),response.text))

Такая последовательность из 6-шагов поможет отправить потоковые данные в REST API с возможностью масштабирования пропускной способности и тщательной обработки ошибок. На практике эту последовательность шагов реализует следующий Python-код:

from pyspark.sql.functions import *
from pyspark.sql.window import Window
import math
import requests 
from requests.adapters import HTTPAdapter
 
def preBatchRecordsForRestCall(microBatchDf, batchSize):
    batch_count = math.ceil(microBatchDf.count() / batchSize)
    microBatchDf = microBatchDf.withColumn("content", to_json(struct(col("*"))))
    microBatchDf = microBatchDf.withColumn("row_number",\
                                            row_number().over(Window().orderBy(lit('A'))))
    microBatchDf = microBatchDf.withColumn("batch_id", col("row_number") % batch_count)
    return microBatchDf.groupBy("batch_id").\
                                          agg(concat_ws(",|", collect_list("content")).\
                                          alias("payload"))

  
def callRestAPIBatch(df, batchId):
  restapi_uri = "<REST API URL>"   
    
  @udf("string")
  def callRestApiOnce(x):
    session = requests.Session()
    adapter = HTTPAdapter(max_retries=3)
    session.mount('http://', adapter)
    session.mount('https://', adapter)
 
    #this code sample calls an unauthenticated REST endpoint; add headers necessary for auth    
    headers = {'Authorization':'abcd'}
    response = session.post(restapi_uri, headers=headers, data=x, verify=False)
    if not (response.status_code==200 or response.status_code==201) :
      raise Exception("Response status : {} .Response message : {}".\
                      format(str(response.status_code),response.text))
        
    return str(response.status_code)
  
  ### Call helper method to transform df to pre-batched df with one row per REST API call
  ### The POST body size and formatting is dictated by the target API; this is an example
  pre_batched_df = preBatchRecordsForRestCall(df, 10)
  
  ### Repartition pre-batched df for target parallelism of API calls
  new_df = pre_batched_df.repartition(8)
 
  ### Invoke helper method to call REST API once per row in the pre-batched df
  submitted_df = new_df.withColumn("RestAPIResponseCode",\
                                    callRestApiOnce(new_df["payload"])).collect()
 
     
dfSource = (spark.readStream
                .format("delta")
                .table("samples.nyctaxi.trips"))

streamHandle = (dfSource.writeStream
                       .foreachBatch(callRestAPIBatch)
                       .trigger(availableNow=True)
                       .start())

Однако, помимо используемых методов и других программных конструкций при масштабной публикации потоковых данных в REST API надо учитывать еще ряд факторов, о которых мы поговорим далее.

Что еще нужно учесть разработчику и дата-инженеру

Метод foreachBatch() в Spark Structured Streaming обеспечивает гарантию доставки хотя бы один раз (at least once), что отличается от строго однократной доставки, предоставляемой при записи в некоторые приемники, такие как дельта-таблица или файловые хранилища. Но, поскольку Spark Streaming для потоковой передачи использует микропакетный подход, это может вызвать некоторые проблемы. Например, 1000 записей поступают в микропакете, и метод foreachBatch() начинает вызывать REST API. Предположим, 900 вызовов выполнены успешно, а потом произошла ошибка. Когда поток перезапустится, обработка возобновится путем повторной передачи этого пакета. Без дополнительной логики в коде приложения-продюсера 900 уже обработанных вызовов будут повторяться. Поэтому важно определить, насколько такие повторные обращения допустимы, или необходимо предпринять дополнительные шаги для защиты от дублирующей обработки.

Поэтому при использовании foreachBatch() целевой приемник данные, т.е. REST API должен быть идемпотентным или необходимо дополнительно следить за вызовами с одними и теми же данными, чтобы предотвратить создание дублей. Например, HTTP-метод POST считается неидемпотентным, что может привести к нарушению целостности данных, если этот запрос предполагает создание новой сущности, например, записи в БД, объекта какого-либо класса, нового потока и т.д. А PUT-запрос, вызываемый с теми же самыми параметрами и телом запросам, может быть реализован как идемпотентный, т.е. будет возвращать один и тот же результат даже при повторных вызовах. Поскольку в большинстве случаев повлиять на внешнюю систему, которой является REST API, нет возможности, поэтому реализацию идемпотентности придется выполнять в коде приложения-продюсера Spark Structured Streaming, о чем мы ранее писали здесь.

Помимо этого, следует учесть требования целевого API к аутентификации в HTTP-запросах. Если REST API требует аутентификации, необходимо добавить учетные данные в заголовок HTTP-запроса, внеся соответствующие изменения в код приложения Spark Structured Streaming.

Также необходимо предусмотреть ситуацию, когда целевой REST API ограничивает количество вызовов, которые можно сделать к нему в секунду, например, для предотвращения DDoS-атак. Необходимо узнать этот предел, чтобы рассчитать допустимое количество вызовов для передачи потоковых данных в теле HTTP-запроса. При превышении этого предела необходимо реализовать в коде Spark-приложения соответствующую обработку ошибок, например, с ранее упомянутой конструкций try-except. Впрочем, в любом случае следует убедиться, что целевой REST API готов к нагрузке и пропускной способности, генерируемой потоковой структурированной передачей Spark.

Поскольку рабочие узлы в хост-кластере Spark будут выполнять HTTP-вызовы к конечной точке REST API, необходимо соответствующим образом настроить среду. Наконец, учитывая особенности вызова REST API с потоковыми данными, разработчику Spark-приложения нужно оценить, сколько параллельных исполнителей/задач необходимо для достижения требуемой пропускной способности, и выбрать соответствующий размер кластера.

Освойте использование Apache Spark в аналитике больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://www.databricks.com/blog/2023/03/02/scalable-spark-structured-streaming-rest-api-destinations.html
  2. https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html
Поиск по сайту