Содержание
Простой пример шифрования полезной нагрузки с чувствительными данными на стороне продюсера и их расшифровка на потребителе Apache Kafka: пишем и запускаем Python-код в Google Colab.
Публикация данных в Kafka: шифрование на стороне продюсера
Apache Kafka часто используется для обмена данными между несколькими системами внутри предприятия. Однако, даже при работе во внутреннем контуре некоторые данные являются очень чувствительными к утечке. Например, персональные данные клиентов или сотрудников. Поэтому их следует защитить от несанкционированного доступа. Для разграничения прав к данным в Kafka применяются списки доступа (ACL, Access Control List). Этот метод информационной безопасности можно сочетать с простым, но довольно эффективным шифрованием полезной нагрузки, что мы и рассмотрим далее.
Предположим, в топик Kafka под названием test приложение-продюсер каждые 3 секунды отправляет следующую полезную нагрузку:
- номер сообщения;
- время генерации события в реальном мире;
- данные об отправителе сообщения с названием компании или фамилией и именем клиента, сгенерированные с помощью библиотеки Faker.
Фамилия и имя частного лица являются персональными данными, поэтому именно их и будем шифровать. А названия компаний пусть публикуются в открытом виде. Для шифрования клиентских данных мой Python-код приложения-продюсера будет включать криптографическую библиотеку с пакетом Fernet, который гарантирует целостность зашифрованного сообщения: его нельзя прочитать без ключа. Fernet — это реализация симметричной криптографии с аутентификацией. Код приложения-продюсера, написанный на Python и запущенный в Google Colab, выглядит так:
####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python
!pip install faker
#импорт модулей
import json
import random
from datetime import datetime
import time
from time import sleep
from kafka import KafkaProducer
# Импорт модуля faker
from faker import Faker
from cryptography.fernet import Fernet
import base64
import os
###################################ячейка в Google Colab №2 - публикация сообщений в Kafka###########################################
# Объявление ключа шифрования
key = base64.urlsafe_b64encode(os.urandom(32))
cipher_suite = Fernet(key)
print(f'Ключ шифрования: ', key)
with open("/content/drive/MyDrive/Colab Notebooks/key.txt", "w") as f:
f.write(key.decode('utf-8'))
# объявление продюсера Kafka
producer = KafkaProducer(
bootstrap_servers=[kafka_url],
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=username,
sasl_plain_password=password,
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)
topic='test'
fake=Faker()
number=0
#бесконечный цикл публикации данных
while True:
start_time = time.time() # запоминаем время начала отправки сообщения
#подготовка данных для публикации в JSON-формате
producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time))
number=number+1
#задаем случайный выбор для генерации клиентских данных (юрлицо без шифрования, физлицо с шифрованием)
cr = random.choice([1,0])
if cr==1 :
content = str('сообщение от ' + fake.company())
else:
# Шифруем данные
content = ("encrypted:"+ cipher_suite.encrypt(str('сообщение от ' + fake.name()).encode('utf-8')).decode('utf-8'))
#создаем полезную нагрузку в JSON
data = {'number': number, 'producer_publish_time': producer_publish_time,'content': content}
# публикуем данные в Kafka
part=0
future = producer.send(topic, value=data, partition=part, key=None)
record_metadata = future.get(timeout=60)
print(f' [x] Sent {record_metadata}')
print(f' [x] Payload {data}')
end_time = time.time() # запоминаем время окончания отправки сообщения
delay = end_time - start_time # задержка в секундах
print(f'Задержка: {delay} секунд')
# повтор через 3 секунды
time.sleep(3)
####################################ячейка в Google Colab №3 - закрытие соединения###########################################
#Закрываем соединения
producer.close()
Поскольку я запускаю скрипты продюсера и потребителя в отдельных блокнотах Colab, у них разное пространство для сохранения временных данных (директория /content). Поэтому ключ шифрования, сгенерированный с помощью инструкции
key = base64.urlsafe_b64encode(os.urandom(32))
надо где-то сохранить. Для этого примера сделаю запись ключа в текстовом файле key.txt в постоянном хранилище Google-диска, каталоге /content/drive/MyDrive/Colab Notebooks/. Оттуда этот секрет сможет считать приложение-потребитель, чтобы расшифровать зашифрованную полезную нагрузку, помеченную флагом ‘encrypted: ’. Как это будет сделано, посмотрим далее, а пока посмотрим область вывода запущенного продюсера:

Расшифровка чувствительных данных в потребителе
Чтобы наглядно показать, какие данные были зашифрованы, а потом успешно расшифрованы, я снова будут использовать прием с Google-таблицами. Приложение-потребитель будет считывать из сообщения из топика Kafka, которая, как обычно, развернута у меня в облачной serverless-платформе Upstash, и записывать данные в Google-таблицу, немного обогащая их. В частности, будет добавлено текущее время считывания. Сам код приложения-потребителя выглядит так:
####################################ячейка в Google Colab №1 - установка и импорт библиотек###########################################
#установка библиотек
!pip install kafka-python
#импорт модулей
from google.colab import auth
auth.authenticate_user()
import gspread
from google.auth import default
creds, _ = default()
import json
import random
from datetime import datetime
from kafka import KafkaConsumer
from json import loads
from kafka.structs import TopicPartition
####################################ячейка в Google Colab №2 - потребление из Kafka###########################################
# импортируем необходимые модули для расшифровки
from cryptography.fernet import Fernet
import base64
#объявление потребителя Kafka
consumer = KafkaConsumer(
bootstrap_servers=[kafka_url],
sasl_mechanism='SCRAM-SHA-256',
security_protocol='SASL_SSL',
sasl_plain_username=username,
sasl_plain_password=password,
group_id='gr5',
auto_offset_reset='latest',
enable_auto_commit=True
)
topic='test'
#Google Sheets Autentificate
gc = gspread.authorize(creds)
# подписка потребителя на определенный раздел topic partition
part=0 #задание раздела
topic_partition = TopicPartition(topic, part) # указываем имя топика и номер раздела
consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел
#Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM)
sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM')
wks = sh.worksheet("test_"+ str(part)) #в какой лист гугл-таблиц будем записывать
#начальный номер строки для записи данных
x=1
# функция для расшифровки значения content
def decrypt_content(content):
# Объявление ключа шифрования (тот же ключ, что и у продюсера)
with open("/content/drive/MyDrive/Colab Notebooks/key.txt", "r") as f:
key = f.read()
print(f'Ключ шифрования: ', key)
cipher_suite = Fernet(key)
decrypted_content = cipher_suite.decrypt(content.encode('utf-8')).decode('utf-8')
print(f'РАСШИФРОВАНО: ', {decrypted_content})
return decrypted_content
# обработка сообщений из Kafka
for message in consumer:
try:
# распаковка сообщения
payload = message.value.decode("utf-8")
data = json.loads(payload)
# вывод сообщения в консоль
print(f"Offset: {message.offset}, Value: {message.value}")
print(consumer.position(topic_partition))
print(f"Timestamp: {message.timestamp}, Value: {message.value}")
timestamp = message.timestamp / 1000.0
datetime_object = datetime.fromtimestamp(timestamp)
formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f')
print(f"Timestamp: {formatted_timestamp}, Value: {message.value}")
# парсинг сообщения
number = data['number']
producer_publish_time = data['producer_publish_time']
content = data['content']
now = datetime.now()
consuming_time = now.strftime("%m/%d/%Y %H:%M:%S")
# вывод распарсенных данных в консоль
print(f'Сообщение № {number}, время публикации: {producer_publish_time}, Kafka timestamp {formatted_timestamp}, сейчас: {consuming_time}, содержимое: {content}')
# обновление данных в Google Sheets
x += 1
wks.update_cell(x, 1, number)
wks.update_cell(x, 2, producer_publish_time)
wks.update_cell(x, 3, formatted_timestamp)
wks.update_cell(x, 4, consuming_time)
wks.update_cell(x, 5, content)
# проверка на зашифрованное содержимое
if content.startswith('encrypted:'):
print(f'ЗАШИФРОВАНО: ', {content})
content = content.replace('encrypted:', '')
print(f'ПРЕОБРАЗОВАНО: ', {content})
decrypted_content = decrypt_content(content)
wks.update_cell(x, 6, decrypted_content)
else:
decrypted_content = content
print(f'ИСХОДНОЕ НЕЗАШИФРОВАНО: ', {decrypted_content})
except Exception as e:
# запись ошибок в лог-файл на Google Диске
error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n"
with open("dlq.txt", "a") as f:
f.write(error_str)
print(f"Error: {str(e)}")
###################################ячейка в Google Colab №3 - закрытие соединения###########################################
#отписываем потребителя и закрываем соединение
consumer.unsubscribe()
consumer.close()
При запуске приложения в Google Colab отладочная информация выводится в области вывода:

Окончательные результаты сохраняются в Google-таблице:

Таким образом, довольно простой, но эффективный прием с шифрованием позволил защитить данные, опубликованные в топике Kafka, и использовать их в потребителе. Разумеется, общая задержка обработки данных в системе продюсер-Kafka-потребитель выросло за счет криптографических операций. В частности, в сравнении с предыдущим экспериментом без шифрования задержка на стороне продюсера была 0.02 секунды против 0,06 секунд с шифрованием. Почему представленный подход с шифрованием/расшифровкой на стороне продюсера неудобен в крупных EDA-экосистемах с Kafka и как обеспечить защиту конфиденциальных данных, улучшив управляемость, читайте в новой статье про Java-приложение Conduktor Gateway. А здесь вы узнаете про сквозное шифрования на уровне полей для Apache Kafka Connect с open-source библиотекой Kryptonite.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Arenadata Streaming Kafka
[elementor-template id=»13619″]


