Защита чувствительных данных в системе с Apache Kafka через криптографию

Python-продюсер Kafka, пример публикация данных в Apache Kafka, пример потребления данных из Apache Kafka, Python Produser Consumer API, шифрование данных для Kafka, публикация сообщений в Kafka и задержка потребления, как измерить задержку потребления в Apache Kafka, потребление из Kafka в реальном времени, обучение Apache Kafka, Apache Kafka для дата-инженеров и разработчиков, примеры сжатия сообщения Apache Kafka, курсы по Apache Kafka, Apache Kafka разработчик примеры курсы обучение, Школа Больших Данных Учебный центр Коммерсант

Простой пример шифрования полезной нагрузки с чувствительными данными на стороне продюсера и их расшифровка на потребителе Apache Kafka: пишем и запускаем Python-код в Google Colab.

Публикация данных в Kafka: шифрование на стороне продюсера

Apache Kafka часто используется для обмена данными между несколькими системами внутри предприятия. Однако, даже при работе во внутреннем контуре некоторые данные являются очень чувствительными к утечке. Например, персональные данные клиентов или сотрудников. Поэтому их следует защитить от несанкционированного доступа. Для разграничения прав к данным в Kafka применяются списки доступа (ACL, Access Control List). Этот метод информационной безопасности можно сочетать с простым, но довольно эффективным шифрованием полезной нагрузки, что мы и рассмотрим далее.

Предположим, в топик Kafka под названием test приложение-продюсер каждые 3 секунды отправляет следующую полезную нагрузку:

  • номер сообщения;
  • время генерации события в реальном мире;
  • данные об отправителе сообщения с названием компании или фамилией и именем клиента, сгенерированные с помощью библиотеки Faker.

Фамилия и имя частного лица являются персональными данными, поэтому именно их и будем шифровать. А названия компаний пусть публикуются в открытом виде. Для шифрования клиентских данных мой Python-код приложения-продюсера будет включать криптографическую библиотеку с пакетом Fernet, который гарантирует целостность зашифрованного сообщения: его нельзя прочитать без ключа. Fernet — это реализация симметричной криптографии с аутентификацией. Код приложения-продюсера, написанный на Python и запущенный в Google Colab, выглядит так:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python
!pip install faker

#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer

# Импорт модуля faker
from faker import Faker
from cryptography.fernet import Fernet
import base64
import os

###################################ячейка в Google Colab №2 - публикация сообщений в Kafka###########################################
# Объявление ключа шифрования
key = base64.urlsafe_b64encode(os.urandom(32))
cipher_suite = Fernet(key)
print(f'Ключ шифрования: ', key)
with open("/content/drive/MyDrive/Colab Notebooks/key.txt", "w") as f:
    f.write(key.decode('utf-8'))

# объявление продюсера Kafka
producer = KafkaProducer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
topic='test'

fake=Faker()

number=0

#бесконечный цикл публикации данных
while True:
  start_time = time.time()  # запоминаем время начала отправки сообщения

  #подготовка данных для публикации в JSON-формате
  producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time))

  number=number+1

  #задаем случайный выбор для генерации клиентских данных (юрлицо без шифрования, физлицо с шифрованием)
  cr = random.choice([1,0])
  if cr==1 :
    content = str('сообщение от ' + fake.company())
  else:
    # Шифруем данные
    content = ("encrypted:"+ cipher_suite.encrypt(str('сообщение от ' + fake.name()).encode('utf-8')).decode('utf-8'))

  #создаем полезную нагрузку в JSON
  data = {'number': number, 'producer_publish_time': producer_publish_time,'content': content}

  # публикуем данные в Kafka 
  part=0
  future = producer.send(topic, value=data, partition=part, key=None)
  record_metadata = future.get(timeout=60)
  print(f' [x] Sent {record_metadata}')
  print(f' [x] Payload {data}')

  end_time = time.time()  # запоминаем время окончания отправки сообщения
  delay = end_time - start_time  # задержка в секундах
  print(f'Задержка: {delay} секунд')

  # повтор через 3 секунды
  time.sleep(3)
####################################ячейка в Google Colab №3 - закрытие соединения###########################################
#Закрываем соединения
producer.close()

Поскольку я запускаю скрипты продюсера и потребителя в отдельных блокнотах Colab, у них разное пространство для сохранения временных данных (директория /content). Поэтому ключ шифрования, сгенерированный с помощью инструкции

key = base64.urlsafe_b64encode(os.urandom(32))

надо где-то сохранить. Для этого примера сделаю запись ключа в текстовом файле key.txt в постоянном хранилище Google-диска, каталоге /content/drive/MyDrive/Colab Notebooks/. Оттуда этот секрет сможет считать приложение-потребитель, чтобы расшифровать зашифрованную полезную нагрузку, помеченную флагом ‘encrypted: ’. Как это будет сделано, посмотрим далее, а пока посмотрим область вывода запущенного продюсера:

Python-продюсер Kafka, пример публикация данных в Apache Kafka
Публикация данных в Apache Kafka

Расшифровка чувствительных данных в потребителе

Чтобы наглядно показать, какие данные были зашифрованы, а потом успешно расшифрованы, я снова будут использовать прием с Google-таблицами. Приложение-потребитель будет считывать из сообщения из топика Kafka, которая, как обычно, развернута у меня в облачной serverless-платформе Upstash, и записывать данные в Google-таблицу, немного обогащая их. В частности, будет добавлено текущее время считывания. Сам код приложения-потребителя выглядит так:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python

#импорт модулей
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()

import json
import random
from datetime import datetime

from kafka import KafkaConsumer
from json import loads
from kafka.structs import TopicPartition
####################################ячейка в Google Colab №2 - потребление из Kafka###########################################
# импортируем необходимые модули для расшифровки
from cryptography.fernet import Fernet
import base64

#объявление потребителя Kafka
consumer = KafkaConsumer(
  bootstrap_servers=[kafka_url],
  sasl_mechanism='SCRAM-SHA-256',
  security_protocol='SASL_SSL',
  sasl_plain_username=username,
  sasl_plain_password=password,
  group_id='gr5',
  auto_offset_reset='latest',
  enable_auto_commit=True
)
topic='test'

#Google Sheets Autentificate
gc = gspread.authorize(creds)

# подписка потребителя на определенный раздел topic partition
part=0 #задание раздела
topic_partition = TopicPartition(topic, part)   # указываем имя топика и номер раздела
consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел

#Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
wks = sh.worksheet("test_"+ str(part)) #в какой лист гугл-таблиц будем записывать

#начальный номер строки для записи данных
x=1

# функция для расшифровки значения content
def decrypt_content(content):
    # Объявление ключа шифрования (тот же ключ, что и у продюсера)
    with open("/content/drive/MyDrive/Colab Notebooks/key.txt", "r") as f:
        key = f.read()
        print(f'Ключ шифрования: ', key)

    cipher_suite = Fernet(key)
    decrypted_content = cipher_suite.decrypt(content.encode('utf-8')).decode('utf-8')
    print(f'РАСШИФРОВАНО: ', {decrypted_content})
    return decrypted_content

# обработка сообщений из Kafka
for message in consumer:
    try:
        # распаковка сообщения
        payload = message.value.decode("utf-8")
        data = json.loads(payload)

        # вывод сообщения в консоль
        print(f"Offset: {message.offset}, Value: {message.value}")
        print(consumer.position(topic_partition))
        print(f"Timestamp: {message.timestamp}, Value: {message.value}")
        timestamp = message.timestamp / 1000.0
        datetime_object = datetime.fromtimestamp(timestamp)
        formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f')
        print(f"Timestamp: {formatted_timestamp}, Value: {message.value}")

        # парсинг сообщения
        number = data['number']
        producer_publish_time = data['producer_publish_time']
        content = data['content']

        now = datetime.now()
        consuming_time = now.strftime("%m/%d/%Y %H:%M:%S")

        # вывод распарсенных данных в консоль
        print(f'Сообщение № {number}, время публикации: {producer_publish_time}, Kafka timestamp {formatted_timestamp}, сейчас: {consuming_time}, содержимое: {content}')
        
        # обновление данных в Google Sheets
        x += 1
        wks.update_cell(x, 1, number)
        wks.update_cell(x, 2, producer_publish_time)
        wks.update_cell(x, 3, formatted_timestamp)
        wks.update_cell(x, 4, consuming_time)
        wks.update_cell(x, 5, content)

        # проверка на зашифрованное содержимое
        if content.startswith('encrypted:'):
            print(f'ЗАШИФРОВАНО: ', {content})
            content = content.replace('encrypted:', '')
            print(f'ПРЕОБРАЗОВАНО: ', {content})
            decrypted_content = decrypt_content(content)
            wks.update_cell(x, 6, decrypted_content)
        else:
            decrypted_content = content
            print(f'ИСХОДНОЕ НЕЗАШИФРОВАНО: ', {decrypted_content})    
    except Exception as e:
        # запись ошибок в лог-файл на Google Диске
        error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n"
        with open("dlq.txt", "a") as f:
            f.write(error_str)
        print(f"Error: {str(e)}")
###################################ячейка в Google Colab №3 - закрытие соединения###########################################
#отписываем потребителя и закрываем соединение
consumer.unsubscribe()
consumer.close()

При запуске приложения в Google Colab отладочная информация выводится в области вывода:

Потребление из Kafka
Потребление из Kafka

Окончательные результаты сохраняются в Google-таблице:

Python Kafka consumer Google Colab
Вывод данных, считанных из Kafka, в Google-таблице

Таким образом, довольно простой, но эффективный прием с шифрованием позволил защитить данные, опубликованные в топике Kafka, и использовать их в потребителе. Разумеется, общая задержка обработки данных в системе продюсер-Kafka-потребитель выросло за счет криптографических операций. В частности, в сравнении с предыдущим экспериментом без шифрования задержка на стороне продюсера была 0.02 секунды против 0,06 секунд с шифрованием. Почему представленный подход с шифрованием/расшифровкой на стороне продюсера неудобен в крупных EDA-экосистемах с Kafka и как обеспечить защиту конфиденциальных данных, улучшив управляемость, читайте в новой статье про Java-приложение Conduktor Gateway. А здесь вы узнаете про сквозное шифрования на уровне полей для Apache Kafka Connect с open-source библиотекой Kryptonite.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту