Кто кому заплатил: пример поиска банковских транзакций в Neo4j

Кто кому заплатил: пример поиска банковских транзакций в Neo4j

Чтобы показать еще один вариант использования графовой базы данных Neo4j, сегодня реализуем небольшое Python-приложение, которое генерирует граф знаний в облачной платформе Aura DB. Ищем финансовые переводы между компаниями и физическими лицами, считаем общую сумму и визуализируем найденные транзакции с помощью библиотеки Networkx.

Python-приложение для работы с Neo4j в AuraDB

Как и в прошлой статье, для развертывания Neo4j я буду использовать облачный сервис AuraDB, тарифная политика предлагает вариант бесплатного использования. Поскольку бесплатный тариф не включает библиотек  со специальными графовыми алгоритмами, таких как Graph Data Science и APOC, придется самостоятельно писать функции для работы с графами на внутреннем языке запросов Cypher. В качестве примера возьмем ситуацию поиска финансовых транзакций между компаниями и физическими лицами. На практике это может быть востребовано в случаях финансового мониторинга и налоговых проверок.

Как обычно, пишу и запускаю Python-программу в интерактивной среде Google Colab, которая будет выполнять следующие действия:

  1. Подключаться к облачному инстансу графовой базы данных Neo4j , развернутом в сервисе AuraDB;
  2. Создавать граф знаний, состоящий из вершин разных типов, например, компания (Company) и физической лицо (Person), которые могут переводить друг другу деньги на расчетные счета. Такие транзакции и будут отношениями между узлами, которые имеют вес в виде суммы денежного перевода.
  3. Искать все пути между заданными вершинами с указанием общей суммы финансовых переводов.

Сперва установим необходимые библиотеки и импортируем нужные модули с помощью следующего кода:

##############################################ячейка №1 в Google Colab##############################################
#установка библиотке
!pip install neo4j
!pip install neo4jupyter
!pip install faker

#импорт модулей
from neo4j import GraphDatabase
import logging
from neo4j.exceptions import ServiceUnavailable
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import pandas as pd

# Импорт модуля faker
from faker import Faker
from faker.providers.address.ru_RU import Provider

Далее напишем код создания направленного графа в Neo4j:

##############################################ячейка №2 в Google Colab##############################################
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker('ru_RU')
fake.add_provider(Provider)

# Определение класса
class App:

    # Конструктор класса
    def __init__(self, uri, user, password):
        # Инициализация драйвера Neo4j с переданными параметрами
        self.driver = GraphDatabase.driver(uri, auth=(user, password))        

    # Метод класса для закрытия соединения с базой данных Neo4j
    def close(self):
        # Не забудьте закрыть соединение драйвера, когда закончите работу с ним
        self.driver.close()

    # Метод класса для создания графа
    def create_graph(self):
        with self.driver.session() as session:
            # Выполнение запроса на создание графа
            session.execute_write(self._create_graph)
            
    # Статический метод, который содержит запрос на создание графа 
    @staticmethod
    def _create_graph(tx):
        tx.run(f"""
        CREATE (A:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (B:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (C:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (D:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (E:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (F:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (G:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (H:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (I:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}'}})
        CREATE (K:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (L:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (M:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (N:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-01-05')}}]->(B)
        CREATE (B)-[:transaction {{ summa: 8102, date: date('2023-11-02')}}]->(C)
        CREATE (C)-[:transaction {{ summa: 7345, date: date('2023-03-20')}}]->(I)
        CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-03-03')}}]->(D)
        CREATE (B)-[:transaction {{ summa: 5752, date: date('2023-04-30')}}]->(D)
        CREATE (D)-[:transaction {{ summa: 3501, date: date('2023-05-26')}}]->(E)
        CREATE (D)-[:transaction {{ summa: 3103, date: date('2023-04-20')}}]->(B)
        CREATE (E)-[:transaction {{ summa: 1201, date: date('2023-05-20')}}]->(I)
        CREATE (B)-[:transaction {{ summa: 6130, date: date('2023-02-10')}}]->(F)
        CREATE (F)-[:transaction {{ summa: 27450, date: date('2023-03-30')}}]->(G)
        CREATE (F)-[:transaction {{ summa: 7612, date: date('2023-05-12')}}]->(E)
        CREATE (G)-[:transaction {{ summa: 4020, date: date('2023-04-20')}}]->(H)
        CREATE (H)-[:transaction {{ summa: 2004, date: date('2023-05-20')}}]->(I)
        CREATE (I)-[:transaction {{ summa: 2004, date: date('2023-04-02')}}]->(A)
        CREATE (K)-[:transaction {{ summa: 804, date: date('2023-04-02')}}]->(A)
        CREATE (K)-[:transaction {{ summa: 2004, date: date('2023-05-05')}}]->(D)
        CREATE (K)-[:transaction {{ summa: 812, date: date('2023-03-02')}}]->(H)
        CREATE (N)-[:transaction {{ summa: 9012, date: date('2023-02-22')}}]->(E)
        CREATE (N)-[:transaction {{ summa: 1090, date: date('2023-03-16')}}]->(B)
        CREATE (M)-[:transaction {{ summa: 1083, date: date('2023-02-19')}}]->(C)
        CREATE (E)-[:transaction {{ summa: 8065, date: date('2023-05-29')}}]->(M)
        CREATE (L)-[:transaction {{ summa: 2398, date: date('2023-05-18')}}]->(G)
        """)
        
# Основная программа
if __name__ == "__main__":
    # Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s"
    uri = "neo4j+s://ваш_адрес_в_Aura_db"
    user = "имя_вашего_пользователя_в_Aura_db"
    password = "пароль_вашего_пользователя_в_Aura_db"
    # Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j
    app = App(uri, user, password)
    # Вызов метода класса для создания графа
    app.create_graph()
    # Закрытие соединения с базой данных Neo4j
    app.close()

В этом Python-коде Cypher-скрипт для создания вершин и ребер является аргументом метода run объекта tx в UDF-функции create_graph(self). Метод run() запускает Cypher-запрос в транзакции с автоматической фиксацией. Запрос отправляется, и заголовок результата будет получен немедленно, но содержимое результата будет извлекаться отложено, по мере использования клиентским приложением.

Чтобы не писать собственный словарь случайных значений с названиями компаний и ФИО клиентов Банка, воспользуемся библиотекой Faker, о которой я недавно писала здесь. Эта библиотека имеет методы генерации нужных данных. А благодаря наличию локализованных провайдеров, можно генерировать русскоязычные ФИО, названия компаний и примеры банковских расчетных счетов. В результате выполнения этого запроса в Neo4j создастся граф знаний, который в AuraDB визуализируется так:

Neo4j транзакции денежные переводы пример
Граф финансовых транзакций в Neo4j

Далее напишем скрипт поиска кратчайшего пути, например, между узлами «ЗАО «Сазонова Ершова» и «Терентьев Андрей Филиппович». Для визуализации найденных путей в области вывода Google Colab снова пригодится библиотека Networkx. Чтобы улучшить наглядность результатов, каждый из найденных путей будем отображать новым цветом. Для этого можно использовать метод safe_color_name() объекта Faker, но уже без локализации на русский язык. В результате Python-скрипт будет выглядеть так:

##############################################ячейка №3 в Google Colab##############################################
fake_color = Faker()
# Определение класса
class App:

    # Конструктор класса
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    # Метод класса для закрытия соединения с базой данных Neo4j
    def close(self):
        # Don't forget to close the driver connection when you are finished with it
        self.driver.close()

    # Метод для поиска всех путей между узлами графа
    def find_all_paths(self, from_node, to_node):
      with self.driver.session() as session:
        result = session.run("""
            MATCH (from:{from_label} {{ name: $from_name }}), (to:{to_label} {{ name: $to_name }}), 
                  path = (from)-[*]->(to) 
            RETURN distinct nodes(path) AS pathNodes, 
                length(path) AS pathLength,  
                reduce(sum = 0, r in relationships(path) | sum+r.summa) AS totalDistance
            ORDER BY pathLength
        """.format(from_label=from_node['label'], to_label=to_node['label']),
        from_name=from_node['name'], to_name=to_node['name']) # запрос к БД

        # добавляем эту строку, чтобы потреблять все результаты запроса
        result_list = list(result)

      paths = [] # список для хранения путей между узлами графа
      if result_list:
        for r in result_list:
            path = r['pathNodes']
            nodes = [p['name'] for p in path]
            distance = r['totalDistance']

            for i in range(len(nodes)-1):
                source = nodes[i]
                target = nodes[i+1]
                paths.append({'Source': source, 'Target': target, 'Weight': distance}) # Изменение Distance на Weight
            
            df = pd.DataFrame(paths)
            df = df.drop_duplicates() # удаление дубликатов

            graph=nx.DiGraph()
            graph.add_weighted_edges_from(df[['Source', 'Target', 'Weight']].values) # добавление весов на каждое ребро графа

            pos = nx.spring_layout(graph, k=1000)
            edges = [(u, v, d['weight']) for (u, v, d) in graph.edges(data=True) if d['weight'] > 1] # Изменение списка ребер для добавления веса
            
            color=fake_color.safe_color_name()

            labels = nx.get_edge_attributes(graph, 'weight') # Получение атрибута weight для ребер
            nx.draw_networkx_nodes(graph, pos, node_size=500, node_color=color)
            nx.draw_networkx_labels(graph, pos)
            nx.draw_networkx_edges(graph, pos, edgelist=edges, edge_color=color, arrows=True)
            nx.draw_networkx_edge_labels(graph, pos, edge_labels=labels) # Добавление подписей к ребрам

            plt.show() # вывод графа с найденным путем
            print('Найдены все возможные транзакции между узлами: ', nodes[0],' и ', nodes[len(nodes)-1], 'на общую сумму ', distance) #вывод результата
            k=0;
            for node in nodes:
              k=k+1;
              print(k,') ', node)
        return
      else:
        print("Транзакций между этими точками не найдено")
        return pd.DataFrame()

# Основная программа
if __name__ == "__main__":
    # Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s"
    uri = "neo4j+s://ваш_адрес_в_Aura_db"
    user = "имя_вашего_пользователя_в_Aura_db"
    password = "пароль_вашего_пользователя_в_Aura_db"
    # Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j
    app = App(uri, user, password)
    # Вызов метода класса для поиска путей между начальным и конечным узлом
    start_node='название_стартового_узла'
    finish_node='название_конечного_узла'
    from_node = {'name': start_node, 'label': 'Company'} #начальный узел
    to_node = {'name': finish_node, 'label': 'Person'} #конечный узел
    result = app.find_all_paths(from_node, to_node)
    print(result)
    # Закрытие соединения с базой данных Neo4j
    app.close()

Результаты вывода в Google Colab:

Google Colab Networkx, Python графы
Визуализация графа в Google Colab с помощью Networkx

Можно получить подобную визуализацию не только в области вывода Colab, а в интерактивном дэшборде NeoDash, о чем я рассказываю в этом материале. В этой статье я показываю, как решить аналогичную задачу анализа банковских транзакций путем применения графовых алгоритмов Networkx к pandas-датафреймам. А здесь вы узнаете, почему этот способ не эффективен для обработки большого объема данных.

Освоить эти и другими инструментами работы с графами для практического использования в реальных проектах аналитики больших данных вам помогут специализированные курсы нашего лицензированного учебного центра обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

[elementor-template id=»13619″]

Источники

  1. https://github.com/AnnaVichugova/PythonApps/blob/main/Neo4j_Bank
  2. https://faker.readthedocs.io/en/master/providers/