Интеграция Neo4j с Apache Spark: обзор коннектора 4.1

Neo4j Apache Spark интеграция коннектор примеры обучение курсы, обучение графовые алгоритмы Neo4j Apache Spark, курсы Neo4j, обучение Apache Spark, курсы Neo4j с Apache Spark GraphFrames GraphX, графовые алгоритмы в бизнес-приложениях курсы примеры обучение, Школа Больших Данных Учебный Центр Коммерсант

Продвигая наш новый курс по графовой аналитике больших данных в бизнес-приложениях, сегодня заглянем под капот коннектора Neo4j к Apache Spark. Сценарии использования, принципы работы, поддержка потоковой передачи Spark и другие новинки версии 4.1 для построения эффективных аналитических коннекторов с помощью алгоритмов на графах.

Как работает коннектор Neo4j к Apache Spark: краткий обзор

Осенью 2021 года Neo4j выпустил новую версию коннектора к Apache Spark, основными новинками которой стала доступность потоковой передачи Spark, как источника, так и приемника. Этот коннектор работает с Neo4j 3.5 и всей серией 4+, независимо от того, запускается ли он как отдельный экземпляр, в режиме кластера или как управляемая служба в облачном решении AuraDB. Neo4j Connector для Apache Spark версии 4.1 поддерживает Spark 2.4.5+ с Scala 2.11 и Scala 2.12 и Spark 3.0+ с Scala 2.12. В зависимости от комбинации версий Spark и Scala могут понадобиться разные файлы JAR-пакетов, которые имеют вид neo4j-connector-apache-spark _ $ {scala.version} _ $ {spark.version} _ $ {connector.version}.

Коннектор позволяет создавать или потреблять данные в Neo4j AuraDB и из него с любым источником, совместимым с Apache Spark. Коннектор Neo4j для Apache Spark упрощает интеграцию графов со Spark, позволяя считывать любой набор узлов или отношений как датафрейм в Spark или, наоборот, записывать любой DataFrame в Neo4j как набор узлов или отношений. Можно также использовать операторы языка запросов Cypher для обработки записей в датафрейм в выбранном пользователем шаблоне графа.

Примечательно, что этот коннектор не основан на Cypher для Apache Spark/Morpheus, предусматривающий предоставление интерпретатора, выполняющий запросы Cypher в среде Spark. Коннектор предоставляет собственное графовое представление для Spark и выполняет роль средства интеграции между Neo4j и Spark, фокусируясь на чтении и записи. Через этот коннектор весь код Cypher выполняется строго внутри Neo4j, а среда Spark работает как обычно, с датафреймами, причем коннектор не предоставляет примитивы графового API для Spark.

Поскольку коннектор основан на новом Spark DataSource API, другие интерпретаторы для Python и R, также будут работать: API остается прежним, потребуются лишь незначительные изменения синтаксиса, чтобы учесть различия между Python и Scala.

Изначально коннектор Spark записывал данные в Neo4j пакетами. Поскольку Neo4j является транзакционной СУБД, все изменения производятся внутри транзакции, которые имеют накладные расходы. Уменьшить эти накладные расходы и повысить производительность можно следующими способами:

  • увеличить размер пакета (size) — чем больше пакет, тем меньше транзакций выполняется для записи всех данных и тем меньше накладных расходов на транзакции;
  • установить в экземпляре Neo4j достаточный размер свободной кучи и кэша страниц, т.к. недостаток памяти не позволят зафиксировать большие пакеты, замедляя импорт.

Ранее коннектор Neo4j для Apache Spark позволял выполняться пакетную загрузку и перемещение данных из таких систем, как BigQuery, Snowflake, Azure Synapse и других в графы этой NoSQL-СУБД. Коннектор позволяет проектировать конвейеры графовой аналитики больших данных для многих вариантов использования. Однако, пакетные операции не работают с другими источниками потоковой передачи, такими как AWS Kinesis, Google Pubsub, Apache Kafka, Flume и пр. Поэтому в новой версии поддерживается потоковая передача Spark, которая пригодится для множества бизнес-сценариев. О них мы поговорим далее.

Поддержка потоковой передачи Spark в новой версии

Двунаправленная потоковая передача с помощью Spark актуальна для следующих бизнес-сценариев:

  • потоковый ETL, когда необходимо непрерывно собирать данные, очищать, агрегировать их и помещать в граф знаний Neo4j, чтобы обнаружить аномальное поведение почти в реальном времени;
  • обогащение данных – потоковая передача Spark может обогатить живые данные и связать их с другими статическими датасетами, добавляя нужные значения к графу с помощью Cypher;
  • обнаружение триггерных событийSpark Streaming позволяет быстро обнаруживать редкие или необычные («триггерные») события, которые могут указывать на потенциально серьезную проблему, и быстро реагировать на них. Например, банки используют триггеры для обнаружения мошеннических транзакций и предотвращения их, а в производстве часто приходится отправлять автоматические предупреждения, чтобы предотвратить поломки оборудования. Подробнее о применении графовых алгоритмах в бизнес-приложениях мы писали здесь.
  • машинное обучениедля ML-процессов, реализованных в Spark, потоковая передача изменений данных из графа знаний поможет в развертывании моделей онлайн-прогнозирования.

Поскольку потоковая передача работает в обоих направлениях, можно использовать поддержку Spark Streaming, чтобы добавить возможности графа в любой поток потоковых данных. Достаточно передать данные в Neo4j, обогатить их или преобразовать с помощью Cypher и других подходов, а затем быстро передать в нужное место назначения.

Neo4j Apache Spark интеграция коннектор примеры обучение курсы
Потоковая интеграция с Neo4j с Apache Spark

Коннектор просто повторно использует API Spark Streaming, рассматривая Neo4j как источник потоковой передачи. При наличии опыта работы с коннектором чтения/записи данных из/в Neo4j с помощью пакетного API, разработчик может использовать функции Apache Spark Structured Streaming, добавив несколько дополнительных параметров, например, указав момент считывания потока.

Например, следующий код на Scala показывает чтение данных о событиях пользовательского поведения из Neo4j и их запись в Spark датафрейм. Сперва создается поток, к которому запускается запрос, а результат его выполнения записывается во временную таблицу в памяти под названием «testReadStream». Далее идет запрос к этой таблице, чтобы увидеть, какие данные возвращаются.

graph_stream = (

  spark.readStream

    .format(«org.neo4j.spark.DataSource»)

    .option(«authentication.type», «basic»)

    .option(«url», url)

    .option(«authentication.basic.username», user)

    .option(«authentication.basic.password», password)

    .option(«streaming.property.name», «lastUpdated»)

    .option(«streaming.from», «ALL»)

    .option(«labels», «Person»)

    .load()

)

query = (graph_stream.writeStream

    .format(«memory»)

    .queryName(«testReadStream»)

    .start())

spark.sql(«select * from testReadStream»).show(1000, False)

 

А запись потока данных о событиях пользовательского поведения из AWS Kinesis в Neo4j с помощью Apache Spark Structured Streaming следующим Scala-кодом:

val kinesisQuery = kinesisStream

 .writeStream

 .format(«org.neo4j.spark.DataSource»)

 // Neo4j Aura connection options

 .option(«url», «neo4j+s://abcd.databases.neo4j.io»)

 .option(«authentication.type», «basic»)

 .option(«authentication.basic.username», «neo4j»)

 .option(«authentication.basic.password», «password»)

 .option(«checkpointLocation», «/tmp/kinesis2Neo4jCheckpoint»)

 // end connection options

 .option(«save.mode», «Append»)

 .option(«relationship», «CHECKED_IN»)

 .option(«relationship.save.strategy», «keys»)

 .option(«relationship.properties», «user_checkin_time:at»)

 .option(«relationship.source.labels», «:Attendee»)

 .option(«relationship.source.save.mode», «Overwrite»)

 .option(«relationship.source.node.keys», «user_name:name»)

 .option(«relationship.target.labels», «:Event»)

 .option(«relationship.target.save.mode», «Overwrite»)

 .option(«relationship.target.node.keys», «event_name:name»)

 .start()

 

Завтра мы продолжим разговор про интеграции этой NoSQL-СУБД с другими системами и рассмотрим, как связать ее с Apache Kafka. А о том, как разработчики Cypher хотели внедрить его в Spark, но так и не реализовали эту идею до конца, читайте в нашей новой статье

Еще больше интересных примеров графовой аналитики больших данных в бизнес-приложениях вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве:

Источники

  1. https://neo4j.com/blog/apache-spark-for-neo4j-auradb/
  2. https://neo4j.com/developer/spark/faq/
Контакты авторизированного учебного центра
«Школа Больших Данных»
Адрес:
127576, г. Москва, м. Алтуфьево, Илимская ул. 5 корпус 2, офис 319, БЦ «Бизнес-Депо»
Часы работы:
Понедельник - Пятница: 09.00 – 18.00
Остались вопросы?
Звоните нам +7 (495) 414-11-21 или отправьте сообщение через контактную форму. Также вы можете найти ответы на ваши вопросы в нашем сборнике часто задаваемых вопросов.
Оставьте сообщение, и мы перезвоним вам в течение рабочего дня
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Или напишите нам в соц.сетях
Поиск по сайту