Сегодня рассмотрим опыт международной компании Emumba, которая специализируется на инженерии и аналитике больших данных. Читайте далее, как выгодно масштабировать конвейер потоковой передачи данных от миллионов устройств интернета вещей, используя Apache Kafka, KStream и Druid в облачной инфраструктуре AWS.
Архитектура PoC для потоковой передачи событий от миллионов IoT-устройств
Миллионы устройств интернета вещей регулярно отправляют данные в облачную платформу для обработки, где также выполняются операции обогащения данных, чтобы сделать их доступными для бизнес-аналитики. Из-за большого количества отправителей данных к центральной платформе выдвигаются специальные требования. Однажды в компании Emumba, которая создает аналитические конвейеры обработки данных, случился проект по разработке IoT-прототипа, где несколько десятков миллионов устройств регулярно отправляют небольшую полезную нагрузку в виде JSON-файлов размером 1,2 КБ в центральную платформу. Задача состояла в том, чтобы обеспечить масштабируемую потоковую обработку, прием и запрос данных в надежной и рентабельной архитектуре.
Проанализировав требования к платформе, архитекторы Emumba начали разрабатывать проект решения. Значения показателей ожидаемой нагрузки составили 250 событий весом около 0,3 ГБ в секунду. В день это выходило примерно 26 ТБ, что аналогично 15 тысяч HD-фильмов, которые необходимо было обрабатывать при приеме и потоковой обработке в реальном времени, чтобы вывести аналитику на дэшборды. Redshift и Snowflake в сочетании с AWS Kinesis не могли справиться с таким масштабом за приемлемую стоимость. Подробнее про сравнение Apache Kafka И AWS Kinesis читайте в нашей новой статье. Поэтому было решено выбрать архитектуру из следующих трех компонентов:
- Apache Kafka для приема данных, разделенных по составному ключу, что приводит к отдельным временным потокам;
- KStream — это абстракция потока записей пар ключ-значение в Kafka Streams, где каждая запись является независимой сущностью/событием в реальном мире. Это нужно для обработки потоков с отслеживанием состояния.
- Apache Druid в качестве хранилища данных запросов в реальном времени и модернизация этой NoSQL-СУБД в виде продукта Imply, который нужен для создания наглядных дэшбордов. Подробнее о том, что такое Apache Druid, мы писали здесь.
Эта архитектура была реализована и протестирована как подтверждение концепции (PoC, Prof of Concept) с данными, смоделированными только для 100 000 устройств. Установка PoC включала отдельный кластер Kafka и KStream с использованием экземпляров m5.xlarge с томами EBS. Чтобы предоставлять один пакет за отчетный интервал, была использована функция подавления в KStreams API и включен RocksDB в качестве хранилища ключей и значений для сохранения состояния потоковой передачи.
Поскольку все компоненты системы (Kafka, KStream и Druid) изначально масштабируемы, инженеры Emumba предположили, что с горизонтальным масштабированием настройки PoC не будет никаких проблем. Простая интерполяция показала, что кластера из 8 узлов Kafka и KStream будет достаточно, поэтому был создан кластер большего размера, чтобы справиться масштабами примерно в 30 раз больше. Также была включена репликация на разделах Kafka. Но тестирование показало, что PoC перестал справляться с нагрузкой при передаче данных со скоростью около 250 тысяч событий в секунду. Как это было исправлено, мы рассмотрим далее.
Решение проблем с Apache Kafka в облачной инфраструктуре AWS
Анализ системных логов показал, что скорость потребления вместо 250 000 событий в секунду для потоковых приложений периодически падала до нуля. Хотя сама платформа Kafka работала без сбоев, приложение KStream периодически останавливало процессор сервера обработки данных без видимых причин, при отсутствии узких мест в диске/сети. Потоковое приложение периодически простаивало. Попытки масштабировать потоковый кластер по вертикали (8 экземпляров m5.2xlarge) и по горизонтали (13 экземпляров m5.xlarge) вместе с увеличением количества разделов Kafka не исправили ситуацию.
Просмотрев использование памяти, было решено сбалансировать размер кучи JVM на узлах KStream, чтобы удовлетворить потребности процессора в памяти, а также разрешить использование памяти RocksDB, написанной на C++, для хранения данных окна. Поэтому были выполнены расчеты объема памяти, которая нужна узлу KStream за пределами кучи, с учетом размера данных, хранящихся в памяти, для каждого раздела для 15-минутного окна.
Это решило проблему, но по-прежнему возникали задержки, связанные с дисковым вводом-выводом, и скорость потребления потокового приложения периодически падала до нуля, когда оно работало в течение некоторого периода времени. Поэтому, чтобы справиться с ограничением дискового ввода-вывода, было решено использовать инстанс i3 en.xlarge, оптимизированный для хранения данных. Благодаря мониторингу системных метрик было замечено, что при использовании функции подавления приложению KStream требовалось значительно больше памяти и дискового ввода-вывода, чем требовалось stateful-вычислений Emumba. Поэтому была разработана и внедрена собственная UDF-подавления, которая значительно улучшила производительность. Однако, проблемы с блокировкой остались: скорость потребления с очень высокой (около 350 000 событий в секунду) снижалась до неприемлемо низкого уровня примерно через 30 минут работы приложения.
Углубившись в детали программно-аппаратных ограничений инстансов AWS, инженеры Emumba обнаружили, что большинство типов экземпляров поддерживают высокую скорость передачи трафика, но имеют низкую базовую пропускную способность. Для выбранного i3en.xlarge при пакетной скорости 25 Гбит/с базовая (постоянная) пропускная способность составляла 4,2 Гбит/с. Инстансы этого типа могут поддерживать максимальную производительность в течение 30 минут не реже одного раза в 24 часа. Поэтому инстанс i3en.xlarge, оптимизированный для хранения, не поддерживал устойчивые дисковые операции ввода-вывода.
Чтобы выбрать подходящий инстанс, следовало правильно оценить пропускную способность сети и диска в масштабе всего кластера для рассматриваемого сценария потоковой передачи с учетом архитектурных особенностей и стратегии репликации для Kafka и KStream. В рабочем развертывании кластер Kafka с минимальным коэффициентом репликации 2, который обеспечивает реплицирование данных на разные узлы, приводит к удвоению требований к сети (0,3 Гбит/с). А, поскольку Kafka записывает данные на диск, аналогичное удвоение относится и дисковому вводу-выводу.
С приложениями KStream отказоустойчивость достигается путем отправки данных хранилища состояний в раздел журнала изменений в кластере Kafka на каждом узле. Размер состояния приложения Emumba для каждого устройства оставлял около 1,2 КБ, т.е. это еще 0,3 Гбит/с нагрузки на сеть и дисковый ввод-вывод для записи в Kafka. Дополнительную нагрузку создает NoSQL-СУБД Druid, куда считываются данные из выходного топика Kafka. В итоге нагрузка на сеть и диски вырастает до 3 Гбит/с или 24 Гбит/с в масштабе кластера. Чтобы обеспечить эти требования, для кластера Kafka и KStream было решено использовать инстансы i3.8xlarge со следующими характеристиками:
- 32 ядра ЦП;
- 244 ГБ памяти;
- 10 Гбит/с устойчивая пропускная способность сети;
- 4 диска по 1,9 ТБ со скоростью ввода-вывода 875 МБ/с.
Поскольку потоковое приложение KStream потребляло много ресурсов ЦП, а Kafka требовала много дискового пространства, было решено объединить узлы брокера Kafka и KStream в 3-узловом кластере. Однако, тестирование показало, что один из узлов кластера имеет очень плохую производительность диска, хотя потоковые данные были равномерно сбалансированы, а на всех узлах выполнялись аналогичные операции. После удаления этого экземпляра и создания нового инстанса с такими же характеристиками проблема была решена. Таким образом, итоговая программно-аппаратная архитектура PoC на базе Apache Kafka, KStream и Druid показала возможность обработки более 250 000 событий в секунду в течение длительного периода времени.
Пример Emumba показывает \ключевые выводы для архитектора и дата-инженера, которые хотят создавать масштабируемые системы потоковой передачи:
- Следует все заранее просчитывать потребляемые ресурсы с учетом используемых технологий: простая интерполяция не работает в масштабировании инфраструктуры. Важно глубоко понимать архитектуру базовых распределенных систем, их стратегии масштабирования и взаимодействие между ними при больших объемах и скорости потоковых данных.
- Важно иметь понимание особенностей пропускной способности диска и сети. Базовая нагрузка для сетевого и дискового ввода-вывода исходит из расчетов вычислений и понимания поведения распределенной системы при производственной нагрузке. Как правило, в облачной инфраструктуре этот расчет подразумевает использование высокопроизводительных экземпляров, поддерживающих устойчивые скорости. Это довольно дорогостоящие инстансы, характеристики которых (память, ЦП и диск) сильно влияют на окончательную стоимость облачной инфраструктуры.
- Надо учитывать дополнительные рабочие нагрузки, чтобы оптимизировать утилизацию вычислительных ресурсов. В частности, в рассмотренном примере размещение Kafka и KStream на одном узле было по сути оптимизацией ресурсов. Аналогичный прием можно использовать для Kubernetes или EKS.
- Наконец, необходимо сравнивать технические и экономические характеристики возможных решений. Например, вместо Kafka и KStreams можно было выбрать AWS Kinesis и Kinesis Data Analytics, что помогло бы избежать трудностей с определением оптимальной облачной инфраструктуры. Однако, стоимость такого решения была бы намного выше. Важна совокупная стоимость владения (TCO, Total Cost Ownership), включающая все затраты, т.е. финансовые и трудовые ресурсы.
Освойте администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники