Кто кому заплатил: пример поиска банковских транзакций в Neo4j

Cypher Python Neo4j AuraDB графы примеры курсы обучение, обучение Neo4j примеры, курсы дата-аналитик Neo4j Python примеры обучение, обучение аналитике больших данных, Neo4j задачи на графах бизнес приложения примеры, Cypher Python Neo4j примеры курсы обучение, обучение большим данным, Школа Больших Данных Учебный Центр Коммерсант

Чтобы показать еще один вариант использования графовой базы данных Neo4j, сегодня реализуем небольшое Python-приложение, которое генерирует граф знаний в облачной платформе Aura DB. Ищем финансовые переводы между компаниями и физическими лицами, считаем общую сумму и визуализируем найденные транзакции с помощью библиотеки Networkx.

Python-приложение для работы с Neo4j в AuraDB

Как и в прошлой статье, для развертывания Neo4j я буду использовать облачный сервис AuraDB, тарифная политика предлагает вариант бесплатного использования. Поскольку бесплатный тариф не включает библиотек  со специальными графовыми алгоритмами, таких как Graph Data Science и APOC, придется самостоятельно писать функции для работы с графами на внутреннем языке запросов Cypher. В качестве примера возьмем ситуацию поиска финансовых транзакций между компаниями и физическими лицами. На практике это может быть востребовано в случаях финансового мониторинга и налоговых проверок.

Как обычно, пишу и запускаю Python-программу в интерактивной среде Google Colab, которая будет выполнять следующие действия:

  1. Подключаться к облачному инстансу графовой базы данных Neo4j , развернутом в сервисе AuraDB;
  2. Создавать граф знаний, состоящий из вершин разных типов, например, компания (Company) и физической лицо (Person), которые могут переводить друг другу деньги на расчетные счета. Такие транзакции и будут отношениями между узлами, которые имеют вес в виде суммы денежного перевода.
  3. Искать все пути между заданными вершинами с указанием общей суммы финансовых переводов.

Сперва установим необходимые библиотеки и импортируем нужные модули с помощью следующего кода:

##############################################ячейка №1 в Google Colab##############################################
#установка библиотке
!pip install neo4j
!pip install neo4jupyter
!pip install faker

#импорт модулей
from neo4j import GraphDatabase
import logging
from neo4j.exceptions import ServiceUnavailable
import pandas as pd
import networkx as nx
import matplotlib.pyplot as plt
import pandas as pd

# Импорт модуля faker
from faker import Faker
from faker.providers.address.ru_RU import Provider

Далее напишем код создания направленного графа в Neo4j:

##############################################ячейка №2 в Google Colab##############################################
# Создание объекта Faker с использованием провайдера адресов для России
fake = Faker('ru_RU')
fake.add_provider(Provider)

# Определение класса
class App:

    # Конструктор класса
    def __init__(self, uri, user, password):
        # Инициализация драйвера Neo4j с переданными параметрами
        self.driver = GraphDatabase.driver(uri, auth=(user, password))        

    # Метод класса для закрытия соединения с базой данных Neo4j
    def close(self):
        # Не забудьте закрыть соединение драйвера, когда закончите работу с ним
        self.driver.close()

    # Метод класса для создания графа
    def create_graph(self):
        with self.driver.session() as session:
            # Выполнение запроса на создание графа
            session.execute_write(self._create_graph)
            
    # Статический метод, который содержит запрос на создание графа 
    @staticmethod
    def _create_graph(tx):
        tx.run(f"""
        CREATE (A:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (B:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (C:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (D:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (E:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (F:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (G:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (H:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (I:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}'}})
        CREATE (K:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (L:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (M:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
        CREATE (N:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
        CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-01-05')}}]->(B)
        CREATE (B)-[:transaction {{ summa: 8102, date: date('2023-11-02')}}]->(C)
        CREATE (C)-[:transaction {{ summa: 7345, date: date('2023-03-20')}}]->(I)
        CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-03-03')}}]->(D)
        CREATE (B)-[:transaction {{ summa: 5752, date: date('2023-04-30')}}]->(D)
        CREATE (D)-[:transaction {{ summa: 3501, date: date('2023-05-26')}}]->(E)
        CREATE (D)-[:transaction {{ summa: 3103, date: date('2023-04-20')}}]->(B)
        CREATE (E)-[:transaction {{ summa: 1201, date: date('2023-05-20')}}]->(I)
        CREATE (B)-[:transaction {{ summa: 6130, date: date('2023-02-10')}}]->(F)
        CREATE (F)-[:transaction {{ summa: 27450, date: date('2023-03-30')}}]->(G)
        CREATE (F)-[:transaction {{ summa: 7612, date: date('2023-05-12')}}]->(E)
        CREATE (G)-[:transaction {{ summa: 4020, date: date('2023-04-20')}}]->(H)
        CREATE (H)-[:transaction {{ summa: 2004, date: date('2023-05-20')}}]->(I)
        CREATE (I)-[:transaction {{ summa: 2004, date: date('2023-04-02')}}]->(A)
        CREATE (K)-[:transaction {{ summa: 804, date: date('2023-04-02')}}]->(A)
        CREATE (K)-[:transaction {{ summa: 2004, date: date('2023-05-05')}}]->(D)
        CREATE (K)-[:transaction {{ summa: 812, date: date('2023-03-02')}}]->(H)
        CREATE (N)-[:transaction {{ summa: 9012, date: date('2023-02-22')}}]->(E)
        CREATE (N)-[:transaction {{ summa: 1090, date: date('2023-03-16')}}]->(B)
        CREATE (M)-[:transaction {{ summa: 1083, date: date('2023-02-19')}}]->(C)
        CREATE (E)-[:transaction {{ summa: 8065, date: date('2023-05-29')}}]->(M)
        CREATE (L)-[:transaction {{ summa: 2398, date: date('2023-05-18')}}]->(G)
        """)
        
# Основная программа
if __name__ == "__main__":
    # Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s"
    uri = "neo4j+s://ваш_адрес_в_Aura_db"
    user = "имя_вашего_пользователя_в_Aura_db"
    password = "пароль_вашего_пользователя_в_Aura_db"
    # Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j
    app = App(uri, user, password)
    # Вызов метода класса для создания графа
    app.create_graph()
    # Закрытие соединения с базой данных Neo4j
    app.close()

В этом Python-коде Cypher-скрипт для создания вершин и ребер является аргументом метода run объекта tx в UDF-функции create_graph(self). Метод run() запускает Cypher-запрос в транзакции с автоматической фиксацией. Запрос отправляется, и заголовок результата будет получен немедленно, но содержимое результата будет извлекаться отложено, по мере использования клиентским приложением.

Чтобы не писать собственный словарь случайных значений с названиями компаний и ФИО клиентов Банка, воспользуемся библиотекой Faker, о которой я недавно писала здесь. Эта библиотека имеет методы генерации нужных данных. А благодаря наличию локализованных провайдеров, можно генерировать русскоязычные ФИО, названия компаний и примеры банковских расчетных счетов. В результате выполнения этого запроса в Neo4j создастся граф знаний, который в AuraDB визуализируется так:

Neo4j транзакции денежные переводы пример
Граф финансовых транзакций в Neo4j

Далее напишем скрипт поиска кратчайшего пути, например, между узлами «ЗАО «Сазонова Ершова» и «Терентьев Андрей Филиппович». Для визуализации найденных путей в области вывода Google Colab снова пригодится библиотека Networkx. Чтобы улучшить наглядность результатов, каждый из найденных путей будем отображать новым цветом. Для этого можно использовать метод safe_color_name() объекта Faker, но уже без локализации на русский язык. В результате Python-скрипт будет выглядеть так:

##############################################ячейка №3 в Google Colab##############################################
fake_color = Faker()
# Определение класса
class App:

    # Конструктор класса
    def __init__(self, uri, user, password):
        self.driver = GraphDatabase.driver(uri, auth=(user, password))

    # Метод класса для закрытия соединения с базой данных Neo4j
    def close(self):
        # Don't forget to close the driver connection when you are finished with it
        self.driver.close()

    # Метод для поиска всех путей между узлами графа
    def find_all_paths(self, from_node, to_node):
      with self.driver.session() as session:
        result = session.run("""
            MATCH (from:{from_label} {{ name: $from_name }}), (to:{to_label} {{ name: $to_name }}), 
                  path = (from)-[*]->(to) 
            RETURN distinct nodes(path) AS pathNodes, 
                length(path) AS pathLength,  
                reduce(sum = 0, r in relationships(path) | sum+r.summa) AS totalDistance
            ORDER BY pathLength
        """.format(from_label=from_node['label'], to_label=to_node['label']),
        from_name=from_node['name'], to_name=to_node['name']) # запрос к БД

        # добавляем эту строку, чтобы потреблять все результаты запроса
        result_list = list(result)

      paths = [] # список для хранения путей между узлами графа
      if result_list:
        for r in result_list:
            path = r['pathNodes']
            nodes = [p['name'] for p in path]
            distance = r['totalDistance']

            for i in range(len(nodes)-1):
                source = nodes[i]
                target = nodes[i+1]
                paths.append({'Source': source, 'Target': target, 'Weight': distance}) # Изменение Distance на Weight
            
            df = pd.DataFrame(paths)
            df = df.drop_duplicates() # удаление дубликатов

            graph=nx.DiGraph()
            graph.add_weighted_edges_from(df[['Source', 'Target', 'Weight']].values) # добавление весов на каждое ребро графа

            pos = nx.spring_layout(graph, k=1000)
            edges = [(u, v, d['weight']) for (u, v, d) in graph.edges(data=True) if d['weight'] > 1] # Изменение списка ребер для добавления веса
            
            color=fake_color.safe_color_name()

            labels = nx.get_edge_attributes(graph, 'weight') # Получение атрибута weight для ребер
            nx.draw_networkx_nodes(graph, pos, node_size=500, node_color=color)
            nx.draw_networkx_labels(graph, pos)
            nx.draw_networkx_edges(graph, pos, edgelist=edges, edge_color=color, arrows=True)
            nx.draw_networkx_edge_labels(graph, pos, edge_labels=labels) # Добавление подписей к ребрам

            plt.show() # вывод графа с найденным путем
            print('Найдены все возможные транзакции между узлами: ', nodes[0],' и ', nodes[len(nodes)-1], 'на общую сумму ', distance) #вывод результата
            k=0;
            for node in nodes:
              k=k+1;
              print(k,') ', node)
        return
      else:
        print("Транзакций между этими точками не найдено")
        return pd.DataFrame()

# Основная программа
if __name__ == "__main__":
    # Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s"
    uri = "neo4j+s://ваш_адрес_в_Aura_db"
    user = "имя_вашего_пользователя_в_Aura_db"
    password = "пароль_вашего_пользователя_в_Aura_db"
    # Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j
    app = App(uri, user, password)
    # Вызов метода класса для поиска путей между начальным и конечным узлом
    start_node='название_стартового_узла'
    finish_node='название_конечного_узла'
    from_node = {'name': start_node, 'label': 'Company'} #начальный узел
    to_node = {'name': finish_node, 'label': 'Person'} #конечный узел
    result = app.find_all_paths(from_node, to_node)
    print(result)
    # Закрытие соединения с базой данных Neo4j
    app.close()

Результаты вывода в Google Colab:

Google Colab Networkx, Python графы
Визуализация графа в Google Colab с помощью Networkx

Можно получить подобную визуализацию не только в области вывода Colab, а в интерактивном дэшборде NeoDash, о чем я рассказываю в этом материале. В этой статье я показываю, как решить аналогичную задачу анализа банковских транзакций путем применения графовых алгоритмов Networkx к pandas-датафреймам. А здесь вы узнаете, почему этот способ не эффективен для обработки большого объема данных.

Освоить эти и другими инструментами работы с графами для практического использования в реальных проектах аналитики больших данных вам помогут специализированные курсы нашего лицензированного учебного центра обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://github.com/AnnaVichugova/PythonApps/blob/main/Neo4j_Bank
  2. https://neo4j.com/cloud/platform/aura-graph-database/
  3. https://neo4j.com/docs/api/python-driver/current/api.html
  4. https://faker.readthedocs.io/en/master/providers.html
Поиск по сайту