Анализ данных в рамках пользовательский сеансов (сессий) – довольно востребованный кейс в Apache Spark, который не так просто реализовать из-за особенностей потоковой и пакетной обработки, а также эксплуатационных расходов. Сегодня рассмотрим, как работают сеансовые окна Spark Structured Streaming и каковы ограничения этого фреймворка.
Что такое сеансовые окна: краткий ликбез по аналитике больших данных
Оконные функции в Apache Spark мы уже рассматривали здесь. Напомним, при потоковой передаче в рамках сеанса группируются события, относящиеся к одному конкретному объекту (пользователю, устройству и пр.). Такую группировку можно рассматривать как окно, в котором накапливается вся активность этого объекта, за которым следует период бездействия или явное событие, закрывающее окно. Например, пользователь выполнил следующие действия:
- («click», «2021-10-12 20:00»);
- («link hover», «2021-10-32 20:03»);
- («click», «2021-10-32 20:06»);
- («click», «2021-10-12 20:10»);
Сессия заканчивается после закрытия браузера (действие «browser_close») или 10 минут бездействия. Сессия заканчивается в 20:20 и имеет следующий формат:
{«user_id»: …, «actions»: [«click», «link hover», «click», «click «],» duration_sec «: 600,» end_reason «:» inactivity «}.
Сеансовое окно (session window) разбивает поток на части конечного размера, к которым можно применять вычисления. Окна сеанса фиксируют период активности данных, который заканчивается перерывом в бездействии. В отличие от переворачивающихся (tumbling) и скользящих (sliding) окон, сеансовые окна не перекрываются и не имеют фиксированного времени начала и окончания. Промежуток бездействия используется для закрытия текущего сеанса, а следующие события назначаются новому сеансу. Сеансовые окна особенно полезны для анализа данных при исследовании действий пользователей за конкретный период времени, в течение которого они выполняли определенные действия. Однако, эта лаконичная идея оказалась не слишком проста в реализации [1]. Как это делается в Apache Spark, мы рассмотрим далее.
Анализ данных с помощью современного Apache Spark
Код курса
SPARK
Ближайшая дата курса
16 декабря, 2024
Продолжительность
32 ак.часов
Стоимость обучения
96 000 руб.
Сеансовая аналитика в пакетной и потоковой обработке Apache Spark
Для решения проблемы сеанса в потоковой передаче есть разные шаблоны, но наиболее распространенным являлась обработка с отслеживанием состояния. В зависимости от последнего сеанса идет накопление событий в каком-то состоянии с генерацией результатов при обнаружении завершающих действий. Состояние может иметь разный формат и храниться во внешнем хранилище данных с быстрым доступом: кэш в памяти, key-value хранилище с небольшой задержкой, основная память приложения с резервной копией в контрольной точке.
Например, чтобы отслеживать сеансы из потоков событий потребуется сохранять произвольные типы данных в качестве состояния и выполнять произвольные операции с состоянием, используя события потока данных в каждом триггере. Начиная с Apache Spark 2.2, это можно сделать с помощью операций mapGroupsWithState и flatMapGroupsWithState, которые позволяют применять определяемый пользователем код к сгруппированным наборам данных для обновления состояния, определенного пользователем. При этом разработчику стоит помнить о некоторых особенностях потоковой обработки в Apache Spark: функция состояния должна быть реализована с учетом семантики режима вывода. В частности, в режиме обновления (Update) функция состояния не должна выдавать строки, которые старше текущего водяного знака плюс допустимая задержка поздней записи. А в режиме добавления (Append) функция состояния может выдавать такие строки. Это ограничение глобального водяного знака, которое потенциально может вызвать проблемы сеансовой обработки.
Как и для любых произвольных операций с сохранением состояния в Apache Spark, изменение схемы пользовательского состояния и типа тайм-аута не допускается. Изменения в определяемой пользователем функции отображения состояний разрешены, но зависят от определяемой пользователем логики. Если необходимо поддерживать изменения схемы состояния, рекомендуется явно кодировать и декодировать сложные структуры данных состояния в байты, используя схему, которая поддерживает миграцию. Например, сохраняя состояние как байты в кодировке AVRO, можно изменять эту схему между перезапусками запроса, поскольку двоичное состояние всегда будет успешно восстановлено. Наконец, начиная с Apache Spark 2.4 не следует использовать операции mapGroupsWithState и flatMapGroupsWithState в режиме обновления перед соединением через JOIN [2].
Потоковая обработка в Apache Spark
Код курса
SPOT
Ближайшая дата курса
6 февраля, 2025
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
Если нет жестких требований к обработке данных в режиме реального времени или около этого, т.е. допускается более высокая задержка, то для решения проблем сеанса можно использовать пакетный подход. Идея состоит в том, чтобы хранить все действия пользователя в распределенной и масштабируемой файловой системе, такой как AWS S3, Google Cloud Storage или Hadoop HDFS, чтобы генерировать сеансы через равные промежутки времени. В идеале это делается с помощью оркестратора типа Apache AirFlow. Ключевым аспектом здесь является обеспечение последовательности выполнения. Это означает, что генерация очередного события может начаться только в том случае, если генерация предыдущего завершилась корректно. Для повышения производительности файлы следует разбивать по времени событий, чтобы не фильтровать события, произошедшие за последние 24 часа для последней эпохи дня.
Возможно, в новой версии Apache Spark сеансовая аналитика в потоковой передаче станет проще благодаря функции session_window, которая использует метку времени и период бездействия в качестве параметров [3]: def session_window(timeColumn: Column, gapDuration: String)
Например, имеем следующие входные данные для одного пользователя:
- {«time»: «2021-10-03 19:39:34», «user_id»: «a»}
- {«time»: «2021-10-03 19:39:41», «user_id»: «a»}
- {«time»: «2021-10-03 19:39:42», «user_id»: «a»}
- {«time»: «2021-10-03 19:39:49», «user_id»: «a»}
- {«time»: «2021-10-03 19:40:03», «user_id»: «a»}
Первое событие – это запись {«time»: «2021-10-03 19:39:34», «user_id»: «a»}, а другие записи означают последующие события с периодом бездействия менее 10 секунд, кроме последнего.
Группировка этих данных в виде датафрейма сеансового окна длительностью 10 секунд будет выглядеть так:
val sessionDF = df.groupBy(session_window(‘time, «10 seconds»), ‘user_id)
.count
В результате получим 4 события, которые попадают в первое окно сеанса, потому что период бездействия меньше 10 секунд. Пятое событие попадет в следующее сеансовое окно для этого пользователя, поскольку интервал активности превысил 10 секунд.
Узнайте больше практических примеров использования Apache Spark для разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники