Кто кому заплатил: пример поиска банковских транзакций в Neo4j

Кто кому заплатил: пример поиска банковских транзакций в Neo4j

    Чтобы показать еще один вариант использования графовой базы данных Neo4j, сегодня реализуем небольшое Python-приложение, которое генерирует граф знаний в облачной платформе Aura DB. Ищем финансовые переводы между компаниями и физическими лицами, считаем общую сумму и визуализируем найденные транзакции с помощью библиотеки Networkx.

    Python-приложение для работы с Neo4j в AuraDB

    Как и в прошлой статье, для развертывания Neo4j я буду использовать облачный сервис AuraDB, тарифная политика предлагает вариант бесплатного использования. Поскольку бесплатный тариф не включает библиотек  со специальными графовыми алгоритмами, таких как Graph Data Science и APOC, придется самостоятельно писать функции для работы с графами на внутреннем языке запросов Cypher. В качестве примера возьмем ситуацию поиска финансовых транзакций между компаниями и физическими лицами. На практике это может быть востребовано в случаях финансового мониторинга и налоговых проверок.

    Как обычно, пишу и запускаю Python-программу в интерактивной среде Google Colab, которая будет выполнять следующие действия:

    1. Подключаться к облачному инстансу графовой базы данных Neo4j , развернутом в сервисе AuraDB;
    2. Создавать граф знаний, состоящий из вершин разных типов, например, компания (Company) и физической лицо (Person), которые могут переводить друг другу деньги на расчетные счета. Такие транзакции и будут отношениями между узлами, которые имеют вес в виде суммы денежного перевода.
    3. Искать все пути между заданными вершинами с указанием общей суммы финансовых переводов.

    Сперва установим необходимые библиотеки и импортируем нужные модули с помощью следующего кода:

    ##############################################ячейка №1 в Google Colab##############################################
    #установка библиотке
    !pip install neo4j
    !pip install neo4jupyter
    !pip install faker
    
    #импорт модулей
    from neo4j import GraphDatabase
    import logging
    from neo4j.exceptions import ServiceUnavailable
    import pandas as pd
    import networkx as nx
    import matplotlib.pyplot as plt
    import pandas as pd
    
    # Импорт модуля faker
    from faker import Faker
    from faker.providers.address.ru_RU import Provider

    Далее напишем код создания направленного графа в Neo4j:

    ##############################################ячейка №2 в Google Colab##############################################
    # Создание объекта Faker с использованием провайдера адресов для России
    fake = Faker('ru_RU')
    fake.add_provider(Provider)
    
    # Определение класса
    class App:
    
        # Конструктор класса
        def __init__(self, uri, user, password):
            # Инициализация драйвера Neo4j с переданными параметрами
            self.driver = GraphDatabase.driver(uri, auth=(user, password))        
    
        # Метод класса для закрытия соединения с базой данных Neo4j
        def close(self):
            # Не забудьте закрыть соединение драйвера, когда закончите работу с ним
            self.driver.close()
    
        # Метод класса для создания графа
        def create_graph(self):
            with self.driver.session() as session:
                # Выполнение запроса на создание графа
                session.execute_write(self._create_graph)
                
        # Статический метод, который содержит запрос на создание графа 
        @staticmethod
        def _create_graph(tx):
            tx.run(f"""
            CREATE (A:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
            CREATE (B:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
            CREATE (C:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
            CREATE (D:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
            CREATE (E:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
            CREATE (F:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
            CREATE (G:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
            CREATE (H:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
            CREATE (I:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}'}})
            CREATE (K:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
            CREATE (L:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
            CREATE (M:Company {{ name: '{fake.company()}', checking_account: '{fake.checking_account()}' }})
            CREATE (N:Person {{ name: '{fake.name()}', checking_account: '{fake.checking_account()}' }})
            CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-01-05')}}]->(B)
            CREATE (B)-[:transaction {{ summa: 8102, date: date('2023-11-02')}}]->(C)
            CREATE (C)-[:transaction {{ summa: 7345, date: date('2023-03-20')}}]->(I)
            CREATE (A)-[:transaction {{ summa: 9340, date: date('2023-03-03')}}]->(D)
            CREATE (B)-[:transaction {{ summa: 5752, date: date('2023-04-30')}}]->(D)
            CREATE (D)-[:transaction {{ summa: 3501, date: date('2023-05-26')}}]->(E)
            CREATE (D)-[:transaction {{ summa: 3103, date: date('2023-04-20')}}]->(B)
            CREATE (E)-[:transaction {{ summa: 1201, date: date('2023-05-20')}}]->(I)
            CREATE (B)-[:transaction {{ summa: 6130, date: date('2023-02-10')}}]->(F)
            CREATE (F)-[:transaction {{ summa: 27450, date: date('2023-03-30')}}]->(G)
            CREATE (F)-[:transaction {{ summa: 7612, date: date('2023-05-12')}}]->(E)
            CREATE (G)-[:transaction {{ summa: 4020, date: date('2023-04-20')}}]->(H)
            CREATE (H)-[:transaction {{ summa: 2004, date: date('2023-05-20')}}]->(I)
            CREATE (I)-[:transaction {{ summa: 2004, date: date('2023-04-02')}}]->(A)
            CREATE (K)-[:transaction {{ summa: 804, date: date('2023-04-02')}}]->(A)
            CREATE (K)-[:transaction {{ summa: 2004, date: date('2023-05-05')}}]->(D)
            CREATE (K)-[:transaction {{ summa: 812, date: date('2023-03-02')}}]->(H)
            CREATE (N)-[:transaction {{ summa: 9012, date: date('2023-02-22')}}]->(E)
            CREATE (N)-[:transaction {{ summa: 1090, date: date('2023-03-16')}}]->(B)
            CREATE (M)-[:transaction {{ summa: 1083, date: date('2023-02-19')}}]->(C)
            CREATE (E)-[:transaction {{ summa: 8065, date: date('2023-05-29')}}]->(M)
            CREATE (L)-[:transaction {{ summa: 2398, date: date('2023-05-18')}}]->(G)
            """)
            
    # Основная программа
    if __name__ == "__main__":
        # Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s"
        uri = "neo4j+s://ваш_адрес_в_Aura_db"
        user = "имя_вашего_пользователя_в_Aura_db"
        password = "пароль_вашего_пользователя_в_Aura_db"
        # Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j
        app = App(uri, user, password)
        # Вызов метода класса для создания графа
        app.create_graph()
        # Закрытие соединения с базой данных Neo4j
        app.close()

    В этом Python-коде Cypher-скрипт для создания вершин и ребер является аргументом метода run объекта tx в UDF-функции create_graph(self). Метод run() запускает Cypher-запрос в транзакции с автоматической фиксацией. Запрос отправляется, и заголовок результата будет получен немедленно, но содержимое результата будет извлекаться отложено, по мере использования клиентским приложением.

    Чтобы не писать собственный словарь случайных значений с названиями компаний и ФИО клиентов Банка, воспользуемся библиотекой Faker, о которой я недавно писала здесь. Эта библиотека имеет методы генерации нужных данных. А благодаря наличию локализованных провайдеров, можно генерировать русскоязычные ФИО, названия компаний и примеры банковских расчетных счетов. В результате выполнения этого запроса в Neo4j создастся граф знаний, который в AuraDB визуализируется так:

    Neo4j транзакции денежные переводы пример
    Граф финансовых транзакций в Neo4j

    Далее напишем скрипт поиска кратчайшего пути, например, между узлами «ЗАО «Сазонова Ершова» и «Терентьев Андрей Филиппович». Для визуализации найденных путей в области вывода Google Colab снова пригодится библиотека Networkx. Чтобы улучшить наглядность результатов, каждый из найденных путей будем отображать новым цветом. Для этого можно использовать метод safe_color_name() объекта Faker, но уже без локализации на русский язык. В результате Python-скрипт будет выглядеть так:

    ##############################################ячейка №3 в Google Colab##############################################
    fake_color = Faker()
    # Определение класса
    class App:
    
        # Конструктор класса
        def __init__(self, uri, user, password):
            self.driver = GraphDatabase.driver(uri, auth=(user, password))
    
        # Метод класса для закрытия соединения с базой данных Neo4j
        def close(self):
            # Don't forget to close the driver connection when you are finished with it
            self.driver.close()
    
        # Метод для поиска всех путей между узлами графа
        def find_all_paths(self, from_node, to_node):
          with self.driver.session() as session:
            result = session.run("""
                MATCH (from:{from_label} {{ name: $from_name }}), (to:{to_label} {{ name: $to_name }}), 
                      path = (from)-[*]->(to) 
                RETURN distinct nodes(path) AS pathNodes, 
                    length(path) AS pathLength,  
                    reduce(sum = 0, r in relationships(path) | sum+r.summa) AS totalDistance
                ORDER BY pathLength
            """.format(from_label=from_node['label'], to_label=to_node['label']),
            from_name=from_node['name'], to_name=to_node['name']) # запрос к БД
    
            # добавляем эту строку, чтобы потреблять все результаты запроса
            result_list = list(result)
    
          paths = [] # список для хранения путей между узлами графа
          if result_list:
            for r in result_list:
                path = r['pathNodes']
                nodes = [p['name'] for p in path]
                distance = r['totalDistance']
    
                for i in range(len(nodes)-1):
                    source = nodes[i]
                    target = nodes[i+1]
                    paths.append({'Source': source, 'Target': target, 'Weight': distance}) # Изменение Distance на Weight
                
                df = pd.DataFrame(paths)
                df = df.drop_duplicates() # удаление дубликатов
    
                graph=nx.DiGraph()
                graph.add_weighted_edges_from(df[['Source', 'Target', 'Weight']].values) # добавление весов на каждое ребро графа
    
                pos = nx.spring_layout(graph, k=1000)
                edges = [(u, v, d['weight']) for (u, v, d) in graph.edges(data=True) if d['weight'] > 1] # Изменение списка ребер для добавления веса
                
                color=fake_color.safe_color_name()
    
                labels = nx.get_edge_attributes(graph, 'weight') # Получение атрибута weight для ребер
                nx.draw_networkx_nodes(graph, pos, node_size=500, node_color=color)
                nx.draw_networkx_labels(graph, pos)
                nx.draw_networkx_edges(graph, pos, edgelist=edges, edge_color=color, arrows=True)
                nx.draw_networkx_edge_labels(graph, pos, edge_labels=labels) # Добавление подписей к ребрам
    
                plt.show() # вывод графа с найденным путем
                print('Найдены все возможные транзакции между узлами: ', nodes[0],' и ', nodes[len(nodes)-1], 'на общую сумму ', distance) #вывод результата
                k=0;
                for node in nodes:
                  k=k+1;
                  print(k,') ', node)
            return
          else:
            print("Транзакций между этими точками не найдено")
            return pd.DataFrame()
    
    # Основная программа
    if __name__ == "__main__":
        # Aura запросы используют зашифрованное соединение с использованием схемы URI "neo4j+s"
        uri = "neo4j+s://ваш_адрес_в_Aura_db"
        user = "имя_вашего_пользователя_в_Aura_db"
        password = "пароль_вашего_пользователя_в_Aura_db"
        # Создание экземпляра класса, передача параметров для инициализации драйвера Neo4j
        app = App(uri, user, password)
        # Вызов метода класса для поиска путей между начальным и конечным узлом
        start_node='название_стартового_узла'
        finish_node='название_конечного_узла'
        from_node = {'name': start_node, 'label': 'Company'} #начальный узел
        to_node = {'name': finish_node, 'label': 'Person'} #конечный узел
        result = app.find_all_paths(from_node, to_node)
        print(result)
        # Закрытие соединения с базой данных Neo4j
        app.close()

    Результаты вывода в Google Colab:

    Google Colab Networkx, Python графы
    Визуализация графа в Google Colab с помощью Networkx

    Можно получить подобную визуализацию не только в области вывода Colab, а в интерактивном дэшборде NeoDash, о чем я рассказываю в этом материале. В этой статье я показываю, как решить аналогичную задачу анализа банковских транзакций путем применения графовых алгоритмов Networkx к pandas-датафреймам. А здесь вы узнаете, почему этот способ не эффективен для обработки большого объема данных.

    Освоить эти и другими инструментами работы с графами для практического использования в реальных проектах аналитики больших данных вам помогут специализированные курсы нашего лицензированного учебного центра обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    [elementor-template id=»13619″]

    Источники

    1. https://github.com/AnnaVichugova/PythonApps/blob/main/Neo4j_Bank
    2. https://faker.readthedocs.io/en/master/providers/