Простой пример шифрования полезной нагрузки с чувствительными данными на стороне продюсера и их расшифровка на потребителе Apache Kafka: пишем и запускаем Python-код в Google Colab.
Публикация данных в Kafka: шифрование на стороне продюсера
Apache Kafka часто используется для обмена данными между несколькими системами внутри предприятия. Однако, даже при работе во внутреннем контуре некоторые данные являются очень чувствительными к утечке. Например, персональные данные клиентов или сотрудников. Поэтому их следует защитить от несанкционированного доступа. Для разграничения прав к данным в Kafka применяются списки доступа (ACL, Access Control List). Этот метод информационной безопасности можно сочетать с простым, но довольно эффективным шифрованием полезной нагрузки, что мы и рассмотрим далее.
Предположим, в топик Kafka под названием test приложение-продюсер каждые 3 секунды отправляет следующую полезную нагрузку:
- номер сообщения;
- время генерации события в реальном мире;
- данные об отправителе сообщения с названием компании или фамилией и именем клиента, сгенерированные с помощью библиотеки Faker.
Фамилия и имя частного лица являются персональными данными, поэтому именно их и будем шифровать. А названия компаний пусть публикуются в открытом виде. Для шифрования клиентских данных мой Python-код приложения-продюсера будет включать криптографическую библиотеку с пакетом Fernet, который гарантирует целостность зашифрованного сообщения: его нельзя прочитать без ключа. Fernet — это реализация симметричной криптографии с аутентификацией. Код приложения-продюсера, написанный на Python и запущенный в Google Colab, выглядит так:
####################################ячейка в Google Colab №1 - установка и импорт библиотек########################################### #установка библиотек !pip install kafka-python !pip install faker #импорт модулей import json import random from datetime import datetime import time from time import sleep from kafka import KafkaProducer # Импорт модуля faker from faker import Faker from cryptography.fernet import Fernet import base64 import os ###################################ячейка в Google Colab №2 - публикация сообщений в Kafka########################################### # Объявление ключа шифрования key = base64.urlsafe_b64encode(os.urandom(32)) cipher_suite = Fernet(key) print(f'Ключ шифрования: ', key) with open("/content/drive/MyDrive/Colab Notebooks/key.txt", "w") as f: f.write(key.decode('utf-8')) # объявление продюсера Kafka producer = KafkaProducer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, value_serializer=lambda v: json.dumps(v).encode('utf-8'), ) topic='test' fake=Faker() number=0 #бесконечный цикл публикации данных while True: start_time = time.time() # запоминаем время начала отправки сообщения #подготовка данных для публикации в JSON-формате producer_publish_time = time.strftime("%m/%d/%Y %H:%M:%S", time.localtime(start_time)) number=number+1 #задаем случайный выбор для генерации клиентских данных (юрлицо без шифрования, физлицо с шифрованием) cr = random.choice([1,0]) if cr==1 : content = str('сообщение от ' + fake.company()) else: # Шифруем данные content = ("encrypted:"+ cipher_suite.encrypt(str('сообщение от ' + fake.name()).encode('utf-8')).decode('utf-8')) #создаем полезную нагрузку в JSON data = {'number': number, 'producer_publish_time': producer_publish_time,'content': content} # публикуем данные в Kafka part=0 future = producer.send(topic, value=data, partition=part, key=None) record_metadata = future.get(timeout=60) print(f' [x] Sent {record_metadata}') print(f' [x] Payload {data}') end_time = time.time() # запоминаем время окончания отправки сообщения delay = end_time - start_time # задержка в секундах print(f'Задержка: {delay} секунд') # повтор через 3 секунды time.sleep(3) ####################################ячейка в Google Colab №3 - закрытие соединения########################################### #Закрываем соединения producer.close()
Поскольку я запускаю скрипты продюсера и потребителя в отдельных блокнотах Colab, у них разное пространство для сохранения временных данных (директория /content). Поэтому ключ шифрования, сгенерированный с помощью инструкции
key = base64.urlsafe_b64encode(os.urandom(32))
надо где-то сохранить. Для этого примера сделаю запись ключа в текстовом файле key.txt в постоянном хранилище Google-диска, каталоге /content/drive/MyDrive/Colab Notebooks/. Оттуда этот секрет сможет считать приложение-потребитель, чтобы расшифровать зашифрованную полезную нагрузку, помеченную флагом ‘encrypted: ’. Как это будет сделано, посмотрим далее, а пока посмотрим область вывода запущенного продюсера:
Расшифровка чувствительных данных в потребителе
Чтобы наглядно показать, какие данные были зашифрованы, а потом успешно расшифрованы, я снова будут использовать прием с Google-таблицами. Приложение-потребитель будет считывать из сообщения из топика Kafka, которая, как обычно, развернута у меня в облачной serverless-платформе Upstash, и записывать данные в Google-таблицу, немного обогащая их. В частности, будет добавлено текущее время считывания. Сам код приложения-потребителя выглядит так:
####################################ячейка в Google Colab №1 - установка и импорт библиотек########################################### #установка библиотек !pip install kafka-python #импорт модулей from google.colab import auth auth.authenticate_user() import gspread from google.auth import default creds, _ = default() import json import random from datetime import datetime from kafka import KafkaConsumer from json import loads from kafka.structs import TopicPartition ####################################ячейка в Google Colab №2 - потребление из Kafka########################################### # импортируем необходимые модули для расшифровки from cryptography.fernet import Fernet import base64 #объявление потребителя Kafka consumer = KafkaConsumer( bootstrap_servers=[kafka_url], sasl_mechanism='SCRAM-SHA-256', security_protocol='SASL_SSL', sasl_plain_username=username, sasl_plain_password=password, group_id='gr5', auto_offset_reset='latest', enable_auto_commit=True ) topic='test' #Google Sheets Autentificate gc = gspread.authorize(creds) # подписка потребителя на определенный раздел topic partition part=0 #задание раздела topic_partition = TopicPartition(topic, part) # указываем имя топика и номер раздела consumer.assign([topic_partition]) #назначаем потребителя на этот топик и раздел #Открытие заранее созданного файла Гугл-таблицы по идентификатору (взять из его URL, например, у меня это https://docs.google.com/spreadsheets/d/1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM) sh = gc.open_by_key('1ZQuotMVhaOuOtnZ56mvd1zX-5JOhsXc1WTG6GTBjzzM') wks = sh.worksheet("test_"+ str(part)) #в какой лист гугл-таблиц будем записывать #начальный номер строки для записи данных x=1 # функция для расшифровки значения content def decrypt_content(content): # Объявление ключа шифрования (тот же ключ, что и у продюсера) with open("/content/drive/MyDrive/Colab Notebooks/key.txt", "r") as f: key = f.read() print(f'Ключ шифрования: ', key) cipher_suite = Fernet(key) decrypted_content = cipher_suite.decrypt(content.encode('utf-8')).decode('utf-8') print(f'РАСШИФРОВАНО: ', {decrypted_content}) return decrypted_content # обработка сообщений из Kafka for message in consumer: try: # распаковка сообщения payload = message.value.decode("utf-8") data = json.loads(payload) # вывод сообщения в консоль print(f"Offset: {message.offset}, Value: {message.value}") print(consumer.position(topic_partition)) print(f"Timestamp: {message.timestamp}, Value: {message.value}") timestamp = message.timestamp / 1000.0 datetime_object = datetime.fromtimestamp(timestamp) formatted_timestamp = datetime_object.strftime('%Y-%m-%d %H:%M:%S.%f') print(f"Timestamp: {formatted_timestamp}, Value: {message.value}") # парсинг сообщения number = data['number'] producer_publish_time = data['producer_publish_time'] content = data['content'] now = datetime.now() consuming_time = now.strftime("%m/%d/%Y %H:%M:%S") # вывод распарсенных данных в консоль print(f'Сообщение № {number}, время публикации: {producer_publish_time}, Kafka timestamp {formatted_timestamp}, сейчас: {consuming_time}, содержимое: {content}') # обновление данных в Google Sheets x += 1 wks.update_cell(x, 1, number) wks.update_cell(x, 2, producer_publish_time) wks.update_cell(x, 3, formatted_timestamp) wks.update_cell(x, 4, consuming_time) wks.update_cell(x, 5, content) # проверка на зашифрованное содержимое if content.startswith('encrypted:'): print(f'ЗАШИФРОВАНО: ', {content}) content = content.replace('encrypted:', '') print(f'ПРЕОБРАЗОВАНО: ', {content}) decrypted_content = decrypt_content(content) wks.update_cell(x, 6, decrypted_content) else: decrypted_content = content print(f'ИСХОДНОЕ НЕЗАШИФРОВАНО: ', {decrypted_content}) except Exception as e: # запись ошибок в лог-файл на Google Диске error_str = f"Error: {str(e)}, Offset: {message.offset}, Value: {message.value}\n" with open("dlq.txt", "a") as f: f.write(error_str) print(f"Error: {str(e)}") ###################################ячейка в Google Colab №3 - закрытие соединения########################################### #отписываем потребителя и закрываем соединение consumer.unsubscribe() consumer.close()
При запуске приложения в Google Colab отладочная информация выводится в области вывода:
Окончательные результаты сохраняются в Google-таблице:
Таким образом, довольно простой, но эффективный прием с шифрованием позволил защитить данные, опубликованные в топике Kafka, и использовать их в потребителе. Разумеется, общая задержка обработки данных в системе продюсер-Kafka-потребитель выросло за счет криптографических операций. В частности, в сравнении с предыдущим экспериментом без шифрования задержка на стороне продюсера была 0.02 секунды против 0,06 секунд с шифрованием. Почему представленный подход с шифрованием/расшифровкой на стороне продюсера неудобен в крупных EDA-экосистемах с Kafka и как обеспечить защиту конфиденциальных данных, улучшив управляемость, читайте в новой статье про Java-приложение Conduktor Gateway. А здесь вы узнаете про сквозное шифрования на уровне полей для Apache Kafka Connect с open-source библиотекой Kryptonite.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для инженеров данных
- Администрирование кластера Kafka
- Администрирование Arenadata Streaming Kafka