В этой статье для дата-инженеров и разработчиков распределенных приложений потоковой аналитики больших данных разберем пример перевода сервиса Strava с кластера Cassandra в облачное хранилище AWS S3 и какую роль в этом сыграл вычислительный движок Apache Spark.
Постановка задачи: слишком дорогая Cassandra
Strava – это глобальный сервис отслеживания активности велосипедистов, бегунов и других спортсменов с помощью мобильных устройств, включающий приложения, веб-сайт, базу данных сохранённых тренировок, API для доступа к ним и другие компоненты. Strava также позиционируется как соцсеть, где профессиональные спортсмены и энтузиасты-любители могут сравнивать свои результаты, делиться опытом, рассказывать об особенностях трасс и пр.
В Strava используется OpenStreetMaps (OSM), карта мира с открытым исходным кодом, которая регулярно обновляется силами сообщества. Одним из продуктов сервиса Strava является Metroview, основанный на наборе данных о деятельности спортсменов, сопоставленный с участками дорог в OSM. Данные GPS, загруженные в Strava, сопоставляются с базовой картой для набора данных Metroview. Этот датасет очень большой и включает около 6 миллиардов действий, каждое из которых проходит по многим участкам дорог. Обновление дорог происходит ежемесячно, что представляет собой огромную нагрузку на сервис, т.к. следует не только сравнить обновленные данные путей с базовой картой, но и записать эти изменения в базу данных.
Metroview предоставляет интерфейс для этого датасета, позволяя нарезать данные по времени, типу деятельности и цели, а также экспортировать данные для загрузки в геоинформационные системы, чтобы комбинировать данные Strava с информацией о границах стран, переписи населения, рельефе местности или локальных ограничениях на перемещения.
Изначально данные хранились в NoSQL-СУБД Cassandra, которая отличается надежностью и отлично подходит для задач, связанных с графами. Однако, при обновлении базовой карты и последующего сохранения в Cassandra измененных трасс возникла проблема слишком долгой записи. Предварительный расчет времени, проведенный дата-инженерами сервиса Strava, показал, что при текущей пропускной способности заданий на запись, это заполнение займет около месяца. При этом потребуется резко увеличить размер кластера Cassandra, что потенциально удвоит стоимость хранения датасета.
Поэтому возникла необходимость в более дешевом хранилище данных с однократной записью и быстрым чтением, оптимизированным для задач доступа к данным в продукте Metroview — поиск по пересечениям границ за определенный период времени. Как это было сделано, рассмотрим далее.
Экономия с AWS S3 и Apache Spark
Поскольку идентификаторы границ монотонно увеличиваются вместе с геохеш-кодированием местоположения граничного узла, добавление индекса в хранилище на граничном идентификаторе даст преимущества в производительности за счет локальности кэша. Поэтому нужен первичный индекс по идентификатору границы, где записи с одним и тем же идентификатором сортируются по отметке времени. Для поддержки кэша выравнивания границ в памяти дата-инженеры Strava выбрали AWS S3 – облачное объектное хранилище от Amazon. В него легко можно записывать большие наборы данных с помощью Spark-заданий, а затраты на хранение данных меньше, чем в СУБД. С учетом сценариев использования Metroview, производительность параллельных операций чтения в хранилище данных не очень важна, по сравнению с продуктом, предназначенным для непосредственно для самих спортсменов.
Хотя выравнивания границ сохранены в S3 и могут использоваться для составления глобальной тепловой карты перемещений спортсменов-пользователей сервиса, данные не индексируются и не форматируются для получения в реальном времени. Чтобы исправить это, были разработаны Spark-задания для преобразования неиндексированных данных выравнивания в отсортированный и индексированный байтовый формат. Датасет был разделен по диапазонам идентификаторов границ, а каждый раздел отсортировали по дате и идентификатору действия.
В Spark-заданиях датасет был сперва партиционирован по диапазону по идентификатору границы, а затем применен метод sortWithinPartitions() для вторичной сортировки. Эта комбинация операций приводит к большому объему памяти для каждого ядра, поэтому возникла задача избегать чрезмерной нагрузки на ядра ЦП, чтобы уменьшить объем памяти для каждого исполнителя Spark. Для этого было решено вручную записывать файлы с отсортированными записями в mapPartitions – оператор, который принимает функциональный аргумент и работает с итератором, не требующим, чтобы все данные раздела хранились в памяти. Можно сгруппировать записи в итераторе и записать каждую группу с подключением к S3, инициализированным для каждого раздела. Таким образом, каждый раздел, представляющий записи для диапазона идентификаторов границ, записывается в набор файлов фиксированного размера в S3.
Каждую запись для создания файла представлена в виде байтов в ByteBuffer – формата, который сжимает данные более эффективно, чем сериализация объектов Java или Kyro, а также имеет высокую производительность на стороне чтения. При фиксированном количестве записей на байтовый файл результаты в итоге пишутся в S3.
Данные в S3 хранятся в индексированном, отсортированном и упакованном по байтам формате. Индекс загружается при перезапуске или запуске сервиса. Поскольку записи и файлы имеют фиксированный размер, индекс является бесплатным для сериализованного массива объектов при чтении данных на сервере. Каждый ключ в индексе соответствует диапазону идентификаторов границ, указывающих на префикс в S3 и содержащий небольшой набор файлов, упакованных в байты. Если клиент запрашивает границу, которая еще не сохранена в кэше сервера, сервер ищет диапазон ее нахождения и извлекает байтовые файлы из S3.
Поскольку сами файлы содержат байты, их можно загружать их непосредственно в массив байтов Java/Scala при загрузке. Более того, Java-метод System.arraycopy может выполняться одновременно в нескольких потоках, загружающихся в один и тот же массив байтов. Так можно выполнять многопоточное чтение нескольких файлов из расположения S3, привязав нагрузку к сети AWS. После загрузки массива байтов используется предварительно отсортированное свойство записей в массиве для поиска диапазонов границ, сокращая время поиска и обработки данных при последующих загрузках границ вблизи первого загруженной.
Изначально размер кэша ограничен памятью, выделенной сервису. Но, добавив согласованную схему хеширования с ThriftMux, можно значительно увеличить объем доступной памяти для полного распределенного кэша. Согласованное хеширование выделяет определенные подмножества кэша каждому экземпляру кластерного сервиса, позволяя произвольно масштабировать размер кэша в зависимости от количества экземпляров, назначенных сервису.
Результат перехода на этот кэш с поддержкой S3 по сравнению с Cassandra дал огромные преимущества в скорости загрузки: от 2 до 10 раз в зависимости от того, были ли данные кэшированы в серверной службе Metroview. Теперь нет необходимости использовать дорогостоящую распределенную NoSQL-СУБД Cassandra. Наконец, продукт Metroview может обновляться вместе с остальной инфраструктурой Strava. Однако, описанное решение значительно увеличило объем памяти внутреннего сервиса для размещения нового кэша, и появились затраты на его поддержку.
Узнайте больше про использование Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники