Перекосы данных в Apache Flink и что с ними делать: MapReduce Combiner и Bundle оператор

потоковая обработка данных с Apache Flink, перекосы данных Flink, Flink примеры обучение курсы, обучение большим данных, курсы по flink, обучение Apache Hadoop Flink SQL, Flink разработка приложений, курсы Apache Hadoop Flink SQL, курсы Hadoop для инженеров данных обучение примеры, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Мы уже разбирали некоторые советы оптимизации Flink-приложений, связанные с неравномерным распределением данных по вычислительным узлам. Сегодня рассмотрим, как при этом пригодится паттерн MapReduce Combiner, который часто используется в экосистеме Apache Hadoop и вместо него лучше применить Bundle оператор, доступный с версии Flink 1.15.

Проблема неравномерного распределения в Big Data вообще и в Apache Flink в частности

Неравномерное распределение данных по разделам снижает скорость их обработки: одни задачи становятся перегруженными, в то время как другие простаивают. Рассмотрим это на примере анализа данных о пользовательском поведении, включая события просмотра страниц, клики и т.д. Платформа анализирует эти данные и связывает их с профилями посетителей, отправляя этот обогащенный поток событий в топик Kafka с частотой около 3000 (с всплесками до 5000) событий в секунду. Необходимо в режиме реального времени отслеживать обновления данных по количеству активных пользователей, самым просматриваемым страницам и продолжительности пользовательских сеансов. При этом требовалось создавать метрики временных рядов с минутной детализацией и предоставить возможность просмотра/выборки сводных данных по сайтам и переменным временным окнам.

Основным вычислительным механизмом решения является управляемый кластер Flink от AWS Kinesis Data Analytics в качестве инструмента обработки потоков. Агрегированные результаты минутной детализации сохраняются в базе данных PostgreSQL. При этом во входящем потоке данных присутствует неизбежная асимметрия, которая влияет на производительность задания Apache Flink. Перекос данных относится к несбалансированному распределению набора данных относительно отдельного поля. Например, население России распределено крайне неравномерно: почти 70% жителей сосредоточено в европейской части России, которая составляет примерно 21% всей территории страны. Впрочем, большинству наборов данных присуща неизбежная асимметрия, ведь большинство людей действительно живут в больших городах. Но при параллельной обработке неравномерно распределенных данных возникает проблема, когда некоторые узлы будут перегружены вычислениями с огромными объемами данных, в то время как остальные будут фактически простаивать.

В рассматриваемом примере задание Flink для создания и хранения поминутных агрегаций, столкнулось именно с этой проблемой: поле агрегации данных сильно искажено, что снижало производительность при попытке ввода по нему. В качестве примера рассмотрим крупный интернет-магазин одежды, который представляет собой огромный маркетплейс, включая множество различных фронтов, каждый со своим собственным брендом и веб-сайтами. Все события, заказы и общий трафик данных перенаправляются в центральный топик Apache Kafka, данные откуда используются для анализа пользовательского поведения. Предположим, сообщение, отправляемое в топик Kafka, имеет следующий формат:

{
“typeOfEntity”: “EVENT”,
“typeOfEvent”: “VIEW”,
“site”: “mainstream.com”,
“page”: “/homepage”,
“guestID”: “<<UUID>>”,
“sessionID”: “<<UUID>>”,
“timestamp”: “2022–10–17 12:57:20”
}

Чтобы подсчитать количество просмотров каждого сайта в минуту в реальном времени, можно построить следующий ETL-конвейер Apache Flink:

val windowResult = env.addSource(source)
.name(“Messages”)
.map(Message.fromLine(_))
.filter(m => m.typeOfEntity == “EVENT” && m.typeOfEvent == “VIEW”)
.name(“FilteredEvents”)
.keyingBy(_.site)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AggregateViews)
.name(“ViewEventsAggregation”)

Атрибут keyingBy использует поле сайт. При запуске этого ETL-конвейера в кластере Flink, сообщения отправляются в подзадачи асимметричным образом в соответствии с количеством событий, которые есть на каждом сайте. При этом большая часть сообщений поступает с более крупных сайтов всего за несколько подзадач, а остальные простаивают. Решить проблему такого дисбаланса можно с помощью статических или динамических весов, учитывая количество ожидаемых по каждому сайту сообщений.

{
“mainstream.com”: 500,
“niche1.com”: 20,
“niche2.com”: 15,
“niche3.com”: 12
}

Зная, насколько популярен каждый сайт, можно построить карту этих весов и использовать ее для повторной балансировки агрегации. При этом целесообразно применить прием добавления так называемой «соли» через функцию случайного хэширования, что будет включать веса и соединяться их в качестве нового значения атрибута keyingBy. Это позволит разделить датасеты по самым популярным сайтам на более мелкие фрагменты и обрабатывать их равномерно. Но если сайты локализованы в разных частях мира, то время суток активно влияет на их посещаемость и создает неравномерность. Поэтому вместо статических весов следует использовать динамические. Это влечет за собой сохранение относительного количества событий, полученных за последние несколько минут с каждого сайта, а затем использование этой информации для создания нового набора весов каждые X минут. Но это решение вызывает дополнительные накладные расходы, связанные вычислением весов. Поэтому можно уменьшить количество записей, отправляемых в оконную функцию, путем предварительной агрегации и соединения нескольких записей в одну. Это известный шаблон MapReduce Combiner, который часто используется в экосистеме Apache Hadoop. Как применить его к рассматриваемому случаю во Flink-заданиях, и почему вместо него лучше использовать Bundle оператор, мы рассмотрим далее.

Что такое MapReduce Combiner и чем полезен Bundle оператор

Итак, чтобы сократить объем вычислений, нужно соединить поступающие события в более мелкие фрагменты относительно количества просмотров, а затем отправить эти пакеты на последний этап агрегирования.

Flink для дата-инженеров и разработчиков примеры курсы обучение, потоковая обработка данных Apache Flink
Как устранить перекосы данных в Apache Flink

Самый прямой и интуитивно понятный способ реализовать эту идею сводится к написанию простой функции плоского сопоставления из шаблона MapReduce Combiner в качестве предварительной агрегации. Однако, это не учитывает водяные знаки, и к тому времени, когда предварительное соединение будет завершено, некоторые записи будут за пределами их временных окон и будут считаться опоздавшими событиями. Решить эту проблему позволит оператор Bundle, который есть в Apache Flink 1.15. Работа оператора Bundle заключается в накоплении значений в одну запись до тех пор, пока не будет выполнено условие закрытия пакета. Обычно это определенное количество записей, объединенных в пакет. Затем пакет отправляется во вторую агрегацию, которая вычисляет окончательные числа и отправляет результат в приемник, например, база данных PostgreSQL.

Таким образом, следует написать функцию, реализующую Bundle оператор, и добавить новый шаг в ETL-конвейер, который предварительно агрегирует записи с помощью этого метода.

val windowResult = env.addSource(source)
.name("Messages")
.map(Message.fromLine(_))
.filter(m => m.typeOfEntity == “EVENT” && m.typeOfEvent == “VIEW”)
.name("FilteredEvents")
.transform (
"PreAggregation",
new MapBundleOperator[(String, Instant), Int, Message, Message] (
new AggregateViewsBundleFunction,
new CountBundleTrigger(100),
new KeySelector[Message,(String, Instant)] {
override def getKey(in: Message) = (in.site, in.timestamp)
}
)
)
.keyingBy(_.site)
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
.aggregate(new AggregateViews)
.name("ViewEventsAggregation")

Шаг PreAgregation использует MapBundleOperator и MapBundleFunction:

class AggregateViewsBundleFunction extends MapBundleFunction[(String, Instant), Int, Message, Message] { override def addInput(value: Int, input: Message): Int = value + 1 override def finishBundle (
  buffer: java.util.Map[(String, Instant), Int],
  out: Collector[Message]
 ): Unit = {
  val outputValues = buffer.asScala.map(Message.fromMap _)
    .toSeq.sortBy(_.timestamp.toEpochMilli)(Ordering[Long].reverse)
  outputValues.foreach(out.collect)
 }
}

В ситуациях с низкой пропускной способностью ограничение записи для пакетов будет игнорироваться, поскольку окно водяных знаков (по умолчанию 200 мс) почти всегда будет закрываться первым и принудительно закрывать пакет. Оба этих параметра (ограничение пакета и окно водяных знаков) можно точно настроить. Таким образом, применение оператора Bundle в коде Flink приложения позволяет выровнять распределение нагрузки между подзадачами и повысить производительность, а также дает возможность управлять перекосом данных во всем наборе без ущерба для скорости обработки.

Еще одним преимуществом этого способа является его гибкость, поскольку он позволяет справиться с неожиданными всплесками неравномерной пропускной способности, например, крупная распродажа на одном из сайтов большого интернет-магазина.

О другом способе ускорить SQL-запросы с оператором JOIN к неравномерно распределенным данными в Apache Flink с помощью мини-пакетных агрегаций для потоковых таблиц, читайте в нашей новой статье. А о том, как ускорить выполнение распределенной программы, используя внутренний механизм спекулятивного выполнения пакетных заданий, вы узнаете здесь.

Освойте тонкости использования Apache Flink и Spark для потоковой обработки событий в распределенных приложениях аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@sanr_71172/dealing-with-data-skew-in-flink-b7e4c82c35ef
  2. https://data-flair.training/blog/hadoop-combiner-tutorial/

 

Поиск по сайту