Чтобы показать еще один вариант использования графовой базы данных Neo4j, сегодня реализуем небольшое Python-приложение, которое генерирует граф знаний в облачной платформе Aura DB. Ищем финансовые переводы между компаниями и физическими лицами, считаем общую сумму и визуализируем найденные транзакции с помощью библиотеки Networkx.
Python-приложение для работы с Neo4j в AuraDB
Как и в прошлой статье, для развертывания Neo4j я буду использовать облачный сервис AuraDB, тарифная политика предлагает вариант бесплатного использования. Поскольку бесплатный тариф не включает библиотек со специальными графовыми алгоритмами, таких как Graph Data Science и APOC, придется самостоятельно писать функции для работы с графами на внутреннем языке запросов Cypher. В качестве примера возьмем ситуацию поиска финансовых транзакций между компаниями и физическими лицами. На практике это может быть востребовано в случаях финансового мониторинга и налоговых проверок.
Как обычно, пишу и запускаю Python-программу в интерактивной среде Google Colab, которая будет выполнять следующие действия:
- Подключаться к облачному инстансу графовой базы данных Neo4j , развернутом в сервисе AuraDB;
- Создавать граф знаний, состоящий из вершин разных типов, например, компания (Company) и физической лицо (Person), которые могут переводить друг другу деньги на расчетные счета. Такие транзакции и будут отношениями между узлами, которые имеют вес в виде суммы денежного перевода.
- Искать все пути между заданными вершинами с указанием общей суммы финансовых переводов.
Сперва установим необходимые библиотеки и импортируем нужные модули с помощью следующего кода:
##############################################ячейка №1 в Google Colab############################################## #установка библиотке !pip install neo4j !pip install neo4jupyter !pip install faker #импорт модулей from neo4j import GraphDatabase import logging from neo4j.exceptions import ServiceUnavailable import pandas as pd import networkx as nx import matplotlib.pyplot as plt import pandas as pd # Импорт модуля faker from faker import Faker from faker.providers.address.ru_RU import Provider
Далее напишем код создания направленного графа в Neo4j:
##############################################ячейка №2 в Google Colab##############################################
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker('ru_RU')
fake.add_provider(Provider)
# Определение класса
class App:
# Конструктор класса
def __init__(self, uri, user, password):
# Инициализация драйвера Neo4j с переданными параметрами
self.driver = GraphDatabase.driver(uri, auth=(user, password))
# Метод класса для закрытия соединения с базой данных Neo4j
def close(self):
# Не забудьте закрыть соединение драйвера, когда закончите работу с ним
self.driver.close()
# Метод класса для создания графа
def create_graph(self):
with self.driver.session() as session:
# Выполнение запроса на создание графа
session.execute_write(self._create_graph)
# Статический метод, который содержит запрос на создание графа
@staticmethod
def _create_graph(tx):
tx.run(f"""
CREATE (A:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
CREATE (B:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
CREATE (C:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
CREATE (D:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
CREATE (E:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
CREATE (F:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
CREATE (G:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
CREATE (H:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
CREATE (I:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}'}})
CREATE (K:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
CREATE (L:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
CREATE (M:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
CREATE (N:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-01-05')}}]->(B)
CREATE (B)-[:transaction {{ summa: 8102, date: date('2023-11-02')}}]->(C)
CREATE (C)-[:transaction {{ summa: 7345, date: date('2023-03-20')}}]->(I)
CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-03-03')}}]->(D)
CREATE (B)-[:transaction {{ summa: 5752, date: date('2023-04-30')}}]->(D)
CREATE (D)-[:transaction {{ summa: 3501, date: date('2023-05-26')}}]->(E)
CREATE (D)-[:transaction {{ summa: 3103, date: date('2023-04-20')}}]->(B)
CREATE (E)-[:transaction {{ summa: 1201, date: date('2023-05-20')}}]->(I)
CREATE (B)-[:transaction {{ summa: 6130, date: date('2023-02-10')}}]->(F)
CREATE (F)-[:transaction {{ summa: 27450, date: date('2023-03-30')}}]->(G)
CREATE (F)-[:transaction {{ summa: 7612, date: date('2023-05-12')}}]->(E)
CREATE (G)-[:transaction {{ summa: 4020, date: date('2023-04-20')}}]->(H)
CREATE (H)-[:transaction {{ summa: 2004, date: date('2023-05-20')}}]->(I)
CREATE (I)-[:transaction {{ summa: 2004, date: date('2023-04-02')}}]->(A)
CREATE (K)-[:transaction {{ summa: 804, date: date('2023-04-02')}}]->(A)
CREATE (K)-[:transaction {{ summa: 2004, date: date('2023-05-05')}}]->(D)
CREATE (K)-[:transaction {{ summa: 812, date: date('2023-03-02')}}]->(H)
CREATE (N)-[:transaction {{ summa: 9012, date: date('2023-02-22')}}]->(E)
CREATE (N)-[:transaction {{ summa: 1090, date: date('2023-03-16')}}]->(B)
CREATE (M)-[:transaction {{ summa: 1083, date: date('2023-02-19')}}]->(C)
CREATE (E)-[:transaction {{ summa: 8065, date: date('2023-05-29')}}]->(M)
CREATE (L)-[:transaction {{ summa: 2398, date: date('2023-05-18')}}]->(G)
""")
# Основная программа
if __name__ == "__main__":
# Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s"
uri = "neo4j+s://ваш_адрес_в_Aura_db"
user = "имя_вашего_пользователя_в_Aura_db"
password = "пароль_вашего_пользователя_в_Aura_db"
# Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j
app = App(uri, user, password)
# Вызов метода класса для создания графа
app.create_graph()
# Закрытие соединения с базой данных Neo4j
app.close()
В этом Python-коде Cypher-скрипт для создания вершин и ребер является аргументом метода run объекта tx в UDF-функции create_graph(self). Метод run() запускает Cypher-запрос в транзакции с автоматической фиксацией. Запрос отправляется, и заголовок результата будет получен немедленно, но содержимое результата будет извлекаться отложено, по мере использования клиентским приложением.
Чтобы не писать собственный словарь случайных значений с названиями компаний и ФИО клиентов Банка, воспользуемся библиотекой Faker, о которой я недавно писала здесь. Эта библиотека имеет методы генерации нужных данных. А благодаря наличию локализованных провайдеров, можно генерировать русскоязычные ФИО, названия компаний и примеры банковских расчетных счетов. В результате выполнения этого запроса в Neo4j создастся граф знаний, который в AuraDB визуализируется так:

Далее напишем скрипт поиска кратчайшего пути, например, между узлами «ЗАО «Сазонова Ершова» и «Терентьев Андрей Филиппович». Для визуализации найденных путей в области вывода Google Colab снова пригодится библиотека Networkx. Чтобы улучшить наглядность результатов, каждый из найденных путей будем отображать новым цветом. Для этого можно использовать метод safe_color_name() объекта Faker, но уже без локализации на русский язык. В результате Python-скрипт будет выглядеть так:
##############################################ячейка №3 в Google Colab##############################################
fake_color = Faker()
# Определение класса
class App:
# Конструктор класса
def __init__(self, uri, user, password):
self.driver = GraphDatabase.driver(uri, auth=(user, password))
# Метод класса для закрытия соединения с базой данных Neo4j
def close(self):
# Don't forget to close the driver connection when you are finished with it
self.driver.close()
# Метод для поиска всех путей между узлами графа
def find_all_paths(self, from_node, to_node):
with self.driver.session() as session:
result = session.run("""
MATCH (from:{from_label} {{ name: $from_name }}), (to:{to_label} {{ name: $to_name }}),
path = (from)-[*]->(to)
RETURN distinct nodes(path) AS pathNodes,
length(path) AS pathLength,
reduce(sum = 0, r in relationships(path) | sum+r.summa) AS totalDistance
ORDER BY pathLength
""".format(from_label=from_node['label'], to_label=to_node['label']),
from_name=from_node['name'], to_name=to_node['name']) # запрос к БД
# добавляем эту строку, чтобы потреблять все результаты запроса
result_list = list(result)
paths = [] # список для хранения путей между узлами графа
if result_list:
for r in result_list:
path = r['pathNodes']
nodes = [p['name'] for p in path]
distance = r['totalDistance']
for i in range(len(nodes)-1):
source = nodes[i]
target = nodes[i+1]
paths.append({'Source': source, 'Target': target, 'Weight': distance}) # Изменение Distance на Weight
df = pd.DataFrame(paths)
df = df.drop_duplicates() # удаление дубликатов
graph=nx.DiGraph()
graph.add_weighted_edges_from(df[['Source', 'Target', 'Weight']].values) # добавление весов на каждое ребро графа
pos = nx.spring_layout(graph, k=1000)
edges = [(u, v, d['weight']) for (u, v, d) in graph.edges(data=True) if d['weight'] > 1] # Изменение списка ребер для добавления веса
color=fake_color.safe_color_name()
labels = nx.get_edge_attributes(graph, 'weight') # Получение атрибута weight для ребер
nx.draw_networkx_nodes(graph, pos, node_size=500, node_color=color)
nx.draw_networkx_labels(graph, pos)
nx.draw_networkx_edges(graph, pos, edgelist=edges, edge_color=color, arrows=True)
nx.draw_networkx_edge_labels(graph, pos, edge_labels=labels) # Добавление подписей к ребрам
plt.show() # вывод графа с найденным путем
print('Найдены все возможные транзакции между узлами: ', nodes[0],' и ', nodes[len(nodes)-1], 'на общую сумму ', distance) #вывод результата
k=0;
for node in nodes:
k=k+1;
print(k,') ', node)
return
else:
print("Транзакций между этими точками не найдено")
return pd.DataFrame()
# Основная программа
if __name__ == "__main__":
# Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s"
uri = "neo4j+s://ваш_адрес_в_Aura_db"
user = "имя_вашего_пользователя_в_Aura_db"
password = "пароль_вашего_пользователя_в_Aura_db"
# Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j
app = App(uri, user, password)
# Вызов метода класса для поиска путей между начальным и конечным узлом
start_node='название_стартового_узла'
finish_node='название_конечного_узла'
from_node = {'name': start_node, 'label': 'Company'} #начальный узел
to_node = {'name': finish_node, 'label': 'Person'} #конечный узел
result = app.find_all_paths(from_node, to_node)
print(result)
# Закрытие соединения с базой данных Neo4j
app.close()
Результаты вывода в Google Colab:

Можно получить подобную визуализацию не только в области вывода Colab, а в интерактивном дэшборде NeoDash, о чем я рассказываю в этом материале. В этой статье я показываю, как решить аналогичную задачу анализа банковских транзакций путем применения графовых алгоритмов Networkx к pandas-датафреймам. А здесь вы узнаете, почему этот способ не эффективен для обработки большого объема данных.
Освоить эти и другими инструментами работы с графами для практического использования в реальных проектах аналитики больших данных вам помогут специализированные курсы нашего лицензированного учебного центра обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
[elementor-template id=»13619″]
Источники


