Почему не рекомендуется публиковать в Kafka сообщения больших размеров, и как это сделать, если очень нужно: когда приходится перенастраивать конфигурации продюсера, топика и потребителя, и какие это параметры.
Почему не нужно публиковать в Kafka сообщения больших размеров
Apache Kafka, как и другие брокеры сообщений, оптимизирована для передачи данных небольшого размера. Обычно это текстовые форматы (JSON, AVRO, protobuf, XML и пр.), а не мультимедиа. Это ограничение обусловлено самим назначением брокера сообщений, который должен обеспечить асинхронную интеграцию приложений в почти реальном времени. Обработка больших файлов существенно увеличивает нагрузку на сеть и на сам брокер, снижая общую производительность и масштабируемость потоковой системы. Передача больших объемов данных приводит к росту задержек, нарушая SLA по быстродействию (времени отклика).
Кроме того, Kafka хранит опубликованные сообщения на диске или в облаке благодаря KIP-405 (многоуровневое хранилище, что мы разбирали здесь и здесь). Оба варианта требуют значительных ресурсов памяти и дискового пространства, что может быть дорого. Наконец, передача больших файлов усложняет обеспечение надежности и соблюдение гарантий доставки сообщений из-за риска сетевых сбоев.
Поэтому максимальный размер сообщений в Apache Kafka по умолчанию равен 1 МБ. Превышать этот предел настоятельно не рекомендуется из-за следующих последствий:
- фрагментация кучи JMV на стороне брокера. Будучи Java-приложением, которое выполняется в виртуальной машине, Kafka использует куча JVM для хранения объектов во время выполнения. Фрагментация кучи затрудняет эффективное использование памяти. В результате могут возникать ситуации, когда даже при достаточном общем объеме свободной памяти JVM не может выделить непрерывный блок для нового большого сообщения. Это приводит к увеличению частоты сборки мусора (garbage collection), что является очень ресурсоемкой процедурой, которая сильно снижает производительность и увеличивает задержку обработки данных.
- нехватка страничного кэша операционной системы – области памяти, которая хранит недавно использованные данные с диска, чтобы ускорить доступ к ним. Kafka использует страничный кэш для быстрой записи сообщений на диск. Большие сообщения занимают больше места в страничном кэше. Если размер сообщений превышает оптимальный, кэш может быстро заполняться, и операционная система будет вынуждена чаще сбрасывать данные на диск или даже удалять полезные данные из кэша. Это увеличивает задержки записи и чтения, может привести к увеличению времени отклика при публикации сообщений и снижает общую пропускную способность системы.
- ограниченные размеры буфера на стороне клиентов, т.е. продюсера и потребителя. Клиенты Kafka используют буферы для временного хранения сообщений перед их публикацией или после потребления. Эти буферы имеют ограниченный размер для предотвращения чрезмерного потребления памяти. При отправке или получении больших сообщений буферы быстрее заполняются. Для продюсеров это чревато блокировками или задержками при публикации данных. Для потребителей это опасно увеличением времени обработки сообщений и даже может привести к сбоям, если буферы переполняются и не успевают освобождаться.
Несмотря на эти последствия, иногда приходится нарушать рекомендации по размеру публикуемого в Kafka сообщения. В каких случаях это происходит и как это сделать, рассмотрим далее.
И все-таки как это сделать, если очень нужно
На практике ситуации, когда надо публиковать в Kafka текстовые данные больших размеров, встречаются довольно часто. Например, когда сообщения содержат сложные или обширные структуры данных, такие как большие JSON- или XML-документы, что встречается в интеграциях между разными корпоративными системами при передаче детализированных отчетов или файлов конфигураций. Модели машинного обучения или наборы данных для их обучения тоже могут превышать 1 МБ. Если данные агрегируются в большие пакеты, это также приводит к увеличению размера сообщения. Устройства Интернета вещей (IoT, Internet of Things) и телеметрии могут генерировать большие объемы данных за одно сообщение, особенно если собирается детализированная информация с множества сенсоров одновременно. Наконец, подробные логи и отчеты о состоянии системы с высокой интенсивностью логирования, а также передача архивных данных тоже приводят к росту размера сообщений.
Поэтому в случаях, когда абсолютно необходимо увеличить рекомендуемый максимальный размер сообщения (1 МБ), надо менять параметры конфигурации продюсера, потребителя и топика:
- message.bytes — максимальный размер сообщения на уровне топика;
- request.size — максимальный размер сообщения на уровне продюсера;
- batch.size – размер пакета сообщений при публикации в Kafka, настройка на уровне продюсера;
- buffer.memory — общий объем памяти в байтах, который продюсер может использовать для буферизации записей, ожидающих отправки в Kafka;
- request.size — максимальный размер запроса на публикацию данных от продюсера (в байтах). Это фактически ограничение максимального размера несжатого пакета записей. Брокер имеет свой собственный предел размера пакета записей (после сжатия, если оно включено), который может отличаться от этого.
- max.bytes — максимальное количество байтов, возвращаемое в результате запроса на выборку данных, т.е. при потреблении сообщений на уровне потребителя;
- partition.fetch.bytes – максимальный объем данных на раздел, возвращаемый брокером в ответ на запрос потребителя. Потребитель извлекает записи пакетами. Если первый пакет записей в первом непустом разделе больше этого предела, пакет все равно будет возвращен, чтобы потребитель мог продолжить работу.
Например, если продюсер публикует в топик Kafka из 10 разделов сообщения размером 2 МБ, а потребители могут считывать максимум 5 записей за раз, необходимо настроить следующие конфигурации:
Уровень | Конфигурация | Значение |
Топик | max.message.bytes | 2097152 |
Продюсер | max.request.size | 2097152 |
batch.size | 10485760 | |
buffer.memory | 104857600 | |
Потребитель | max.partition.fetch.bytes | 10485760 |
fetch.max.bytes | 104857600 |
Поскольку потоковый конвейер начинается с продюсера, сперва нужно установить конфигурации на продюсере, затем на уровне топика, а потом на уровне потребителя. Можно задать максимальный размер сообщения на уровне брокера, определив предельное значение параметру message.max.bytes, но это не рекомендуется из-за неравномерного распределения нагрузки между узлами кластера и снизит его производительность.
Рекомендуемыми альтернативами вместо настройки Kafka на большой размер сообщений является их сжатие на уровне продюсера и/или разбиение большого сообщения на несколько более мелких, которые можно опубликовать в один раздел, задав ключ на стороне продюсера, чтобы гарантировать правильный порядок сообщений. Восстановить исходное большое сообщение из более мелких можно позже на стороне потребителя, если это необходимо.
Научитесь администрированию и эксплуатации Apache Kafka на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники