Экономия места в Apache Kafka с форматом Parquet

Kafka курсы примеры обучение, обучение большим данным, Kafka форматы данных примеры курсы обучение, Apache Kafka Parquet для дата-инженеров примеры курсы обучение, Школа Больших Данных Учебный Центр Коммерсант

Недавно мы сравнивали разные форматы сериализации данных, поддерживаемые Apache Kafka. Однако, AVRO и JSON не могут похвастаться таким высоким коэффициентом сжатия, как колоночный бинарный формат Parquet. Читайте далее, как хранить больше потоковых данных на тех же ресурсах с помощью движка Deephaven и других open-source решений.

Apache Kafka и Parquet

Apache Kafka отлично справляется с высокими нагрузками, надежно обеспечивая прием и маршрутизацию данных между приложениями-продюсерами и потребителями в реальном времени. Однако, потоки данных из нескольких сотен тысяч сообщений в секунду могут очень быстро заполнить свободное место на жестких дисках даже очень объемного сервера. Поэтому вместо легко читаемого людьми JSON-формата имеет смысл хранить данные в более экономном виде, чтобы использовать аппаратные ресурсы более экономно. Например, выбрать открытый бинарный формат Apache Parquet, который часто применяется в экосистеме Hadoop для хранения больших данных. Parquet хранит данные по столбцам, обеспечивая эффективное сжатие данных и схемы кодирования с повышенной производительностью для обработки больших объемов сложных данных. Помимо самих данных (полезная нагрузка), бинарный Parquet-файл также содержит метаданные. Метаданные столбца хранятся в конце файла, что обеспечивает быструю запись за один проход. Parquet оптимизирован для парадигмы записи после многократного чтения (WORM) и отлично подходит для аналитических сценариев (OLAP-систем).

Колоночная организация хранения данных обеспечивает лучшее сжатие, поскольку данные более однородны, а также дает возможность разделения файлов. Parquet также поддерживает эволюцию схемы данных, но не читается человеком, а также не подходит для операций обновления. Parquet поддерживает следующие кодеки сжатия:

  • LZ4, основанный на алгоритме сжатия LZ4, но с дополнительной недокументированной схемой кадрирования — частью оригинальной библиотеки сжатия Hadoop, что изначально копировалось в parquet-mr, а затем эмулировалось со смешанными результатами в parquet-cpp.
  • LZO, основанный на библиотеке сжатия LZO и совместимый с ней;
  • GZIP, основанный на формате GZIP, определенной спецификацией RFC 1952;
  • Snappy, применяемый по умолчанию;
  • ZSTD — кодек с наивысшей степенью сжатия, основанный на стандартном формате Z, определенном в спецификации RFC 8478.

Чтобы сжать потоковые файлы для Kafka в Parquet-формате, можно воспользоваться методом writeTable() одноименной библиотеки движка Deephaven:

from deephaven.ParquetTools import writeTable
writeTable(table, "/data/FILE.parquet", "ZSTD")

Что такое Deephaven и чем он полезен в потоковой передаче событий с Apache Kafka, мы рассмотрим далее.

Что такое Deephaven и зачем это дата-инженеру

Deephaven — это механизм запросов, который отлично справляется с масштабной обработкой данных в реальном времени. Движок находится над источниками данных и используется для выполнения запросов и получения результатов из них, которые затем могут использоваться пользователями, приложениями или создаваться в виде материализованного представления. Deephaven также представляет собой набор технологий вокруг этого движка: интеграции, коннекторы, веб-интерфейсы и API, которые обеспечивают готовую структуру, позволяя дата-инженерам и аналитикам работать с данными.

Deephaven пригодится в следующих сценариях:

  • аналитическая обработка данных в режиме реального времени;
  • временные ряды и реляционные операции для обеспечения максимальной производительности на больших статических объемах и плотных периодических данных;
  • компиляция UDF-функций на Python, Java и C++ вместе с табличными операциями.

Такие варианты использования обычно возникают в системах Интернета вещей, финтехе, блокчейне, криптовалюте, играх, e-commerce, промышленной телеметрии, здравоохранении и госуслугах. Например, в потоковой передаче событий Deephaven можете подключиться к серии потоков Kafka, облегчая обогащение данных и выполнение вычислений, а также публикацию производных данных и их обновление далее по конвейеру.

Под капотом Deephaven модель обновления на основе графа, позволяющая эффективно выполнять инкрементный расчет результатов, а также единая абстракция для потоковых и пакетных данных. Высокопроизводительный движок Java тесно связан с родными CPython, NumPy и SciPy через мост JPy, а архитектура, ориентированная на массивы, которая позволяет выполнять векторизованные операции и быструю передачу данных. Гибкая архитектура выполняет пользовательский код в процессе вычислений без отправки данных клиенту. Для клиентов и других процессов ядра есть API на основе gRPC для расширения графа обновлений по сети, а расширение Arrow-Flight отлично подходит для датафреймов.

Веб-компоненты JS обеспечивают прокрутку и управление таблицами с миллиардами записей, включая фильтрацию, сортировку, добавление столбцов, изменение входных таблиц и агрегацию данных. В ближайшем будещем будет добавлено ядро Jupyter, чтобы использовать возможности Deephaven на стороне сервера, а также виджеты таблиц и графиков для взаимодействия с массивами и stateful-результатами в реальном времени. Также анонсирована интеграция с Python-библиотеками PyTorch и Tensor Flow для поддержки высокопроизводительного машинного обучения на данных в реальном времени без необходимости создания моментальных снимков данных.

Для работы с Apache Kafka движок Deephaven имеет предустановленные библиотеки, которые легко импортировать в своей проект. Например, следующий участок кода показывает, как можно потреблять данные из топика Kafka в виде таблицы Deephaven:

from deephaven import kafka_consumer as ck
from deephaven.stream.kafka.consumer import TableType, KeyValueSpec
import deephaven.dtypes as dht
result = ck.consume({'bootstrap.servers': 'redpanda:29092'}, 'test.topic')

Аналогичным образом можно записать любую таблицу и в поток Kafka с Deephaven:

from deephaven import time_table
from deephaven import kafka_producer as pk
from deephaven.stream.kafka.producer import KeyValueSpec
source = time_table('00:00:00.1').update(formulas = ["X = i"])
write_topic = pk.produce(source, {'bootstrap.servers': 'redpanda:29092'},'time-topic', pk.simple_spec('X'), KeyValueSpec.IGNORE)

Еще одно решение для использования Parquet-формата с  Apache Kafka

В заключение отметим, что Deephaven  — далеко не единственное решение, которое позволяет использовать формат Parquet в топиках Kafka. Также можно использовать open-source проект kafka-parquet-writer. Он читает записи из топика Kafka и записывает их в виде Parquet-файлов в локальной файловой системе или HDFS. Инструмент может записывать записи в несколько потоков. Поскольку запись в один Parquet-файл не может выполняться одновременно, каждый поток пишет в отдельный файл. Для чтения записей из Kafka используется Smart Commit Kafka Consumer — оболочку потребителя Kafka, которая реализует функцию умной фиксации. Клиент не фиксирует смещения вручную и не использует функцию автоматической фиксации по умолчанию, а подтверждает записи, в которых их процесс завершен. Затем смещения будут зафиксированы автоматически таким образом, чтобы гарантировать хотя бы однократную доставку. Таким образом, он отслеживает все смещения до тех пор, пока не получит их подтверждение. Но это скрыто от клиента: на самом деле смещения каждого раздела отслеживаются на нескольких страницах, каждая из которых отвечает за определенный диапазон смещений. Когда все смещения некоторых последовательных страниц подтверждены, последнее смещение последней полностью подтвержденной страницы будет подтверждено автоматически. Подробнее про механизмы работы со смещениями в Apache Kafka читайте в нашей новой статье.

При работе kafka-parquet-writer гарантируется доставка сообщений at least once и потребитель будет уведомлен о подтверждении записи только в том случае, если она записана в Parquet-файл и успешно сброшена на диск. Каждый поток создает Parquet-файлы, когда в выходном файле выполняются определенные критерии, например, размер файла достиг порогового значения или файл был открыт в течение определенного времени.

Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@deephavendatalabs/kafka-parquet-maximize-speed-minimize-storage-7e6337d6b0a1
  2. https://www.adaltas.com/en/2020/07/23/benchmark-study-of-different-file-format/
  3. https://deephaven.io/core/docs/conceptual/deephaven-overview/
  4. https://github.com/sahabpardaz/kafka-parquet-writer
Поиск по сайту