Чтобы показать еще один вариант использования графовой базы данных Neo4j, сегодня реализуем небольшое Python-приложение, которое генерирует граф знаний в облачной платформе Aura DB. Ищем финансовые переводы между компаниями и физическими лицами, считаем общую сумму и визуализируем найденные транзакции с помощью библиотеки Networkx.
Python-приложение для работы с Neo4j в AuraDB
Как и в прошлой статье, для развертывания Neo4j я буду использовать облачный сервис AuraDB, тарифная политика предлагает вариант бесплатного использования. Поскольку бесплатный тариф не включает библиотек со специальными графовыми алгоритмами, таких как Graph Data Science и APOC, придется самостоятельно писать функции для работы с графами на внутреннем языке запросов Cypher. В качестве примера возьмем ситуацию поиска финансовых транзакций между компаниями и физическими лицами. На практике это может быть востребовано в случаях финансового мониторинга и налоговых проверок.
Как обычно, пишу и запускаю Python-программу в интерактивной среде Google Colab, которая будет выполнять следующие действия:
- Подключаться к облачному инстансу графовой базы данных Neo4j , развернутом в сервисе AuraDB;
- Создавать граф знаний, состоящий из вершин разных типов, например, компания (Company) и физической лицо (Person), которые могут переводить друг другу деньги на расчетные счета. Такие транзакции и будут отношениями между узлами, которые имеют вес в виде суммы денежного перевода.
- Искать все пути между заданными вершинами с указанием общей суммы финансовых переводов.
Сперва установим необходимые библиотеки и импортируем нужные модули с помощью следующего кода:
##############################################ячейка №1 в Google Colab############################################## #установка библиотке !pip install neo4j !pip install neo4jupyter !pip install faker #импорт модулей from neo4j import GraphDatabase import logging from neo4j.exceptions import ServiceUnavailable import pandas as pd import networkx as nx import matplotlib.pyplot as plt import pandas as pd # Импорт модуля faker from faker import Faker from faker.providers.address.ru_RU import Provider
Далее напишем код создания направленного графа в Neo4j:
##############################################ячейка №2 в Google Colab############################################## # Создание объекта Faker с использованием провайдера адресов для России fake = Faker('ru_RU') fake.add_provider(Provider) # Определение класса class App: # Конструктор класса def __init__(self, uri, user, password): # Инициализация драйвера Neo4j с переданными параметрами self.driver = GraphDatabase.driver(uri, auth=(user, password)) # Метод класса для закрытия соединения с базой данных Neo4j def close(self): # Не забудьте закрыть соединение драйвера, когда закончите работу с ним self.driver.close() # Метод класса для создания графа def create_graph(self): with self.driver.session() as session: # Выполнение запроса на создание графа session.execute_write(self._create_graph) # Статический метод, который содержит запрос на создание графа @staticmethod def _create_graph(tx): tx.run(f""" CREATE (A:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }}) CREATE (B:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }}) CREATE (C:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }}) CREATE (D:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }}) CREATE (E:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }}) CREATE (F:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }}) CREATE (G:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }}) CREATE (H:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }}) CREATE (I:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}'}}) CREATE (K:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }}) CREATE (L:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }}) CREATE (M:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }}) CREATE (N:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }}) CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-01-05')}}]->(B) CREATE (B)-[:transaction {{ summa: 8102, date: date('2023-11-02')}}]->(C) CREATE (C)-[:transaction {{ summa: 7345, date: date('2023-03-20')}}]->(I) CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-03-03')}}]->(D) CREATE (B)-[:transaction {{ summa: 5752, date: date('2023-04-30')}}]->(D) CREATE (D)-[:transaction {{ summa: 3501, date: date('2023-05-26')}}]->(E) CREATE (D)-[:transaction {{ summa: 3103, date: date('2023-04-20')}}]->(B) CREATE (E)-[:transaction {{ summa: 1201, date: date('2023-05-20')}}]->(I) CREATE (B)-[:transaction {{ summa: 6130, date: date('2023-02-10')}}]->(F) CREATE (F)-[:transaction {{ summa: 27450, date: date('2023-03-30')}}]->(G) CREATE (F)-[:transaction {{ summa: 7612, date: date('2023-05-12')}}]->(E) CREATE (G)-[:transaction {{ summa: 4020, date: date('2023-04-20')}}]->(H) CREATE (H)-[:transaction {{ summa: 2004, date: date('2023-05-20')}}]->(I) CREATE (I)-[:transaction {{ summa: 2004, date: date('2023-04-02')}}]->(A) CREATE (K)-[:transaction {{ summa: 804, date: date('2023-04-02')}}]->(A) CREATE (K)-[:transaction {{ summa: 2004, date: date('2023-05-05')}}]->(D) CREATE (K)-[:transaction {{ summa: 812, date: date('2023-03-02')}}]->(H) CREATE (N)-[:transaction {{ summa: 9012, date: date('2023-02-22')}}]->(E) CREATE (N)-[:transaction {{ summa: 1090, date: date('2023-03-16')}}]->(B) CREATE (M)-[:transaction {{ summa: 1083, date: date('2023-02-19')}}]->(C) CREATE (E)-[:transaction {{ summa: 8065, date: date('2023-05-29')}}]->(M) CREATE (L)-[:transaction {{ summa: 2398, date: date('2023-05-18')}}]->(G) """) # Основная программа if __name__ == "__main__": # Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s" uri = "neo4j+s://ваш_адрес_в_Aura_db" user = "имя_вашего_пользователя_в_Aura_db" password = "пароль_вашего_пользователя_в_Aura_db" # Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j app = App(uri, user, password) # Вызов метода класса для создания графа app.create_graph() # Закрытие соединения с базой данных Neo4j app.close()
В этом Python-коде Cypher-скрипт для создания вершин и ребер является аргументом метода run объекта tx в UDF-функции create_graph(self). Метод run() запускает Cypher-запрос в транзакции с автоматической фиксацией. Запрос отправляется, и заголовок результата будет получен немедленно, но содержимое результата будет извлекаться отложено, по мере использования клиентским приложением.
Чтобы не писать собственный словарь случайных значений с названиями компаний и ФИО клиентов Банка, воспользуемся библиотекой Faker, о которой я недавно писала здесь. Эта библиотека имеет методы генерации нужных данных. А благодаря наличию локализованных провайдеров, можно генерировать русскоязычные ФИО, названия компаний и примеры банковских расчетных счетов. В результате выполнения этого запроса в Neo4j создастся граф знаний, который в AuraDB визуализируется так:
Далее напишем скрипт поиска кратчайшего пути, например, между узлами «ЗАО «Сазонова Ершова» и «Терентьев Андрей Филиппович». Для визуализации найденных путей в области вывода Google Colab снова пригодится библиотека Networkx. Чтобы улучшить наглядность результатов, каждый из найденных путей будем отображать новым цветом. Для этого можно использовать метод safe_color_name() объекта Faker, но уже без локализации на русский язык. В результате Python-скрипт будет выглядеть так:
##############################################ячейка №3 в Google Colab############################################## fake_color = Faker() # Определение класса class App: # Конструктор класса def __init__(self, uri, user, password): self.driver = GraphDatabase.driver(uri, auth=(user, password)) # Метод класса для закрытия соединения с базой данных Neo4j def close(self): # Don't forget to close the driver connection when you are finished with it self.driver.close() # Метод для поиска всех путей между узлами графа def find_all_paths(self, from_node, to_node): with self.driver.session() as session: result = session.run(""" MATCH (from:{from_label} {{ name: $from_name }}), (to:{to_label} {{ name: $to_name }}), path = (from)-[*]->(to) RETURN distinct nodes(path) AS pathNodes, length(path) AS pathLength, reduce(sum = 0, r in relationships(path) | sum+r.summa) AS totalDistance ORDER BY pathLength """.format(from_label=from_node['label'], to_label=to_node['label']), from_name=from_node['name'], to_name=to_node['name']) # запрос к БД # добавляем эту строку, чтобы потреблять все результаты запроса result_list = list(result) paths = [] # список для хранения путей между узлами графа if result_list: for r in result_list: path = r['pathNodes'] nodes = [p['name'] for p in path] distance = r['totalDistance'] for i in range(len(nodes)-1): source = nodes[i] target = nodes[i+1] paths.append({'Source': source, 'Target': target, 'Weight': distance}) # Изменение Distance на Weight df = pd.DataFrame(paths) df = df.drop_duplicates() # удаление дубликатов graph=nx.DiGraph() graph.add_weighted_edges_from(df[['Source', 'Target', 'Weight']].values) # добавление весов на каждое ребро графа pos = nx.spring_layout(graph, k=1000) edges = [(u, v, d['weight']) for (u, v, d) in graph.edges(data=True) if d['weight'] > 1] # Изменение списка ребер для добавления веса color=fake_color.safe_color_name() labels = nx.get_edge_attributes(graph, 'weight') # Получение атрибута weight для ребер nx.draw_networkx_nodes(graph, pos, node_size=500, node_color=color) nx.draw_networkx_labels(graph, pos) nx.draw_networkx_edges(graph, pos, edgelist=edges, edge_color=color, arrows=True) nx.draw_networkx_edge_labels(graph, pos, edge_labels=labels) # Добавление подписей к ребрам plt.show() # вывод графа с найденным путем print('Найдены все возможные транзакции между узлами: ', nodes[0],' и ', nodes[len(nodes)-1], 'на общую сумму ', distance) #вывод результата k=0; for node in nodes: k=k+1; print(k,') ', node) return else: print("Транзакций между этими точками не найдено") return pd.DataFrame() # Основная программа if __name__ == "__main__": # Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s" uri = "neo4j+s://ваш_адрес_в_Aura_db" user = "имя_вашего_пользователя_в_Aura_db" password = "пароль_вашего_пользователя_в_Aura_db" # Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j app = App(uri, user, password) # Вызов метода класса для поиска путей между начальным и конечным узлом start_node='название_стартового_узла' finish_node='название_конечного_узла' from_node = {'name': start_node, 'label': 'Company'} #начальный узел to_node = {'name': finish_node, 'label': 'Person'} #конечный узел result = app.find_all_paths(from_node, to_node) print(result) # Закрытие соединения с базой данных Neo4j app.close()
Результаты вывода в Google Colab:
Можно получить подобную визуализацию не только в области вывода Colab, а в интерактивном дэшборде NeoDash, о чем я рассказываю в этом материале. В этой статье я показываю, как решить аналогичную задачу анализа банковских транзакций путем применения графовых алгоритмов Networkx к pandas-датафреймам. А здесь вы узнаете, почему этот способ не эффективен для обработки большого объема данных.
Освоить эти и другими инструментами работы с графами для практического использования в реальных проектах аналитики больших данных вам помогут специализированные курсы нашего лицензированного учебного центра обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники