Как создать свой коннектор Apache Spark: пример интеграции с Tableau

курсы по Spark, обучение Apache Spark, Apache Spark for developres, Apache Spark integration, обработка данных, большие данные, Big Data, Spark, Data Lake, Greenplum, Python

Говоря про практическое обучение Apache Spark для дата-инженеров, сегодня рассмотрим особенности разработки собственного коннектора для этого фреймворка на примере его интеграции с BI-системой Tableau. Читайте далее, как конвертировать Spark RDD в нужный формат и сделать свой коннектор удобным для пользователей.

Интеграция Spark с внешними источниками данных через коннекторы

Apache Spark – отличный инструмент для быстрой обработки Big Data, однако он не является хранилищем, в отличие от озера данных на Hadoop HDFS или СУБД, таких как Cassandra, Mongo DB, Greenplum, Elasticsearch и прочие базы данных. Чтобы считывать информацию из Data Lake, СУБД или другого источника, в Спарк используются специальные коннекторы в виде интерфейсов для одной из основных структур данных этого фреймворка — RDD (Resilient Distributed Dataset, надежная распределенная коллекция данных типа таблицы). Полученная информация, как правило, подвергается аналитической обработке и визуализации, например, в BI-инструментах или других системах аналитики больших данных.

Фреймворк предоставляет множеств готовых коннекторов в Spark SQL API. Например, коннектор записи write позволяет легко конвертировать структуру данных DataFrame в CSV с помощью всего одной строки кода: dataframe.write.csv(‘mycsv.csv‘). Также готовые коннекторы Спарк поддерживают и другие распространенные форматы: текстовые файлы, JSON, Parquet, ORC и т.д. Крупные компании-разработчики программного обеспечения уровня Microsoft, MongoDB Inc.  Pivotal Software и другие вендоры популярных систем, предлагают собственные форматы хранения больших данных и коннекторы для их интеграции со Спарк. К примеру, популярный продукт для бизнес-аналитики Tableau использует Tableau Data Extract (.tde) илиHyper (.hyper) в качестве форматов хранения своих таблиц [1].

Разработчик аналитической MPP-СУБД Greenplum, на основе которой создана отечественная Arenadata DB, корпорация Pivotal Software также предлагает готовый Greenplum-Spark Connector в виде JAR-файла. Начать работу с ним можно выполнив команду spark-shell с параметром –jars, который определяет путь файловой системы к JAR-файлу Greenplum-Spark Connector [2]:

spark-user@spark-node$ export GSC_JAR=/path/to/greenplum-spark_-.jar

spark-user@spark-node$ spark-shell —jars $GSC_JAR

СУБД Mongo DB также предлагает свой коннектор для Spark, предоставляя доступ ко всем библиотекам этого Big Data фреймворка, включая Scala, Java, Python и R. Данные MongoDB материализованы в виде DataFrames и Datasets для анализа с помощью машинного обучения, графического, потокового и SQL API [3].

При всем многообразии готовых и уникальных коннекторов для Apache Spark, все они работают по одному принципу, который мы рассмотрим далее.

Как устроены коннекторы Спарк и при чем здесь RDD

Работу Spark-коннекторов можно представить в виде 3-х этапов [1]:

  1. преобразование датафрейма в целевой формат. При этом стоит помнить про особенности обработки больших данных в этом фреймворке. В частности, RDD распределены по разделам (parttion), не доступным напрямую драйверу кластера, в котором выполняется код Python. Поэтому для каждого раздела RDD нужно собрать данные для драйвера с помощью метода .collect(), а затем просмотреть собранный раздел и вставить его в целевой файл построчно. Пример такой манипуляции показывает следующий код на Python:

def add_partition_index(partition_index, partition_rows):   

yield (partition_index, list(partition_rows))

partitioned_rdd = rdd.mapPartitionWithIndex(add_partition_index)

for current_partition_index in range(rdd.getNumPartitions()):  

[(_, current_partition_rows)] = partitioned_rdd \       

.filter(lambda x: x[0] == current_partition_index) \       

.collect()    for row in current_partition_rows:       

convert_and_insert(row)

Функция convert_and_insert() будет предоставлена ​​поставщиками данных. К примеру, в случае BI-системы Tableau можно обратиться к Tableau SDK (для формата .tde) или Tableau API 2.0 (для формата .hyper), которые имеют C ++, Java и Python API.

Это предполагает полный контроль разработчика Big Data над следующими факторами:

  • память драйвера, которую можно настроить для текущего сеанса Спарк через spark.driver.memory, потому что все разделы, которые будут собраны в этот драйвер один за другим, должны соответствовать;
  • разбиение исходного датафрейма, поскольку ни один раздел не должен превышать объем памяти драйвера, чтобы избежать ошибок OutOfMemory, о которой мы говорили вчера.
  1. Экспорт исходного файла в облако, чтобы сделать данные доступными для пользователей. Например, в случае Tableau это выполняется с помощью REST API для публикации файлов этой BI-системы на ее сервере. Как правило, каждый вендор предоставляет разработчикам выделенные API для публикации данных в облаке. Однако, поскольку не все API достаточно подробно документированы, имеет смысл просмотреть код напрямую, чтобы понять, реализована нужная вам функция.
  2. Упаковка готового коннектора в удобный для пользователей вид. Например, можно реализовать интерфейс командной строки на Python поверх коннектора. Это позволит пользователю выбирать исходную и целевую среды, между которыми нужно передать данные. С учетом заданных параметров и файла конфигурации, запускаются сервисы преобразования и экспорта для публикации форматированных данных в облаке.
Apache Spark Connector
Принцип работы коннектора Apache Spark

Подробное видео о разработке собственного коннектора Спарк для MongoDB на Java, Python и R доступно для просмотра в [4]. Примечательно, что рассмотренный пример можно легко адаптировать для интеграции Apache Spark с другим источником данных благодаря универсальной последовательности действий по разработке уникального коннектора [4]:

  • привести распределенные данные к подходящему формату;
  • реализовать код коннектора на Scala или Java;
  • предоставить коннектору доступ к нужным функциям фреймворка Спарк, например, потоковая передача Spark Streaming и Spark SQL;
  • протестировать работу созданного коннектора;
  • расширить разработанный коннектор для поддержки Python и R (если необходимо);
  • опубликовать готовый коннектор для общего использования.

В следующей статье мы продолжим разговор про коннекторы Спарк с целью обучения инженеров больших данных и рассмотрим, как устроен Greenplum-Spark Connector, а также кейсы и примеры его практического использования.

А освоить практические особенности эксплуатации Apache Spark для аналитики больших данных вы сможете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

 

 

Источники

  1. https://www.sicara.ai/blog/2018-12-12-publish-data-outside-data-lake-spark-connector
  2. https://greenplum.org/introducing-pivotal-greenplum-spark-connector-integrating-apache-spark/
  3. https://www.mongodb.com/products/spark-connector
  4. https://databricks.com/session/how-to-connect-spark-to-your-own-datasource
Поиск по сайту