Еще 3 рекомендации для потоковых конвейеров Apache Flink

настройка Flink конвейера, настройка Flink приложений, конвейер приложений Apache Flink, Apache Flink примеры курсы обучение для разработчика, обучение Flink, курсы Flink, Flink для разработчиков и дата-инженеров, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Продолжая недавний разговор про настройку конвейеров из Flink-приложений, сегодня рассмотрим, почему важна локальность данных, как избежать узких мест в приемниках потоковых данных и чем хорош HybridSource для объединения гетерогенных источников.

Обеспечьте локальность данных

Хотя распределенные системы обладают большим потенциалом по сравнению с локальными, позволяя обрабатывать больше данных, вычисления не происходят мгновенно. Одной из причин задержки является необходимость передавать данные по сети между узлами для операций соединения или свертки. Поэтому локальность данных является одним из факторов, который может существенно повысить скорость обработки информации.

Apache Flink предоставляет файловый приемник File Sink, способный записывать файлы в файловую систему или хранилище объектов, такое как HDFS, S3 или Google Cloud Storage. Помимо довольно простой настройки файлового приемника, нужно также обеспечить его эффективную работу. File Sink хранит список разделов или разделов в памяти, и каждый из них определяется распределителем BucketAssigner. Например, пользовательский BucketAssigner может использовать поле отметки времени в предоставленной записи для создания сегмента, который привязан к дате, например, date=2023-11-08. Это популярный формат разделов, используемый Apache Hive. Можно настроить File Sink, добавив его в существующий приемник потоковых записей:

val records: DataStream[Record] = … 
val fileSink: SinkFunction[Record] = …
records.addSink(fileSink)

Но при использовании этого решения в реальной среде на большом объеме исторических данных, т.е. выполнения backfilling-сценария, возникают проблемы с памятью. Приложение быстро использует всю доступную кучу Java и выходит из строя даже после увеличения памяти. Это происходит из-за того, что каждый диспетчер задач использует записи, которые могут оказаться в любом сегменте, а потому нужно хранить в памяти весь большой список сегментов. Из-за этого очень увеличивается время сборки и сброса файлов по каждому диспетчеру задач. Чтобы избежать сбоя из-за этой ситуации, можно вводить в файловый приемник записи по строке раздела, используя разделение по ключу:

val records: DataStream[Record] = …
val fileSink: SinkFunction[Record] = …
records.keyBy(_.partition).addSink(fileSink)

Такой прием позволяет направлять записи с одним и тем же разделом или сегментом в один и тот же диспетчер задач. В результате этого каждый диспетчер задач удерживает меньше данных в памяти и быстрее очищает файлы, избегая проблем с памятью. Такой подход хорошо работает, если выбранный раздел имеет хорошее распределение. Но если за некоторые даты, по которым было сделано разделение, данных значительно больше, чем за другие, что часто случается в backfilling-сценариях, возникнет перекос данных. А это также приводит к проблемам с памятью.

В этом случае надо улучшить распределение, добавив часы к ключу раздела, чтобы избавиться от перекоса и обеспечить локальность данных. Эту простую технику логической перетасовки данных для достижения хорошего параллелизма можно применить не только к файловому приемнику Apache Flink, но и к другим приемникам, а также операторам потоковой обработки.

Избегайте узких мест в приемнике

Конвейеры данных обычно имеют один или несколько приемников данных, таких как реляционная БД или NoSQL-хранилище, Apache Kafka и пр., которые иногда могут стать узкими местами в приложении Flink. Например, если целевой экземпляр базы данных имеет высокую загрузку ЦП, приложение Flink может не справиться с трафиком записи. При этом не выдается никаких исключений, но пропускная способность всей потоковой системы падает, а в пользовательском интерфейсе Flink появится отображение обратного давления. Когда приемники являются узким местом, этот механизм предупреждения сбоя потребителя из-за чрезмерной скорости публикации, с которой он не справляется. Обратное давление будет распространяться на все вышестоящие зависимости в конвейере. Поэтому важно убедиться в том, что приемники не станут узким местом.

В тех случаях, когда задержкой можно немного пожертвовать, полезно бороться с узкими местами путем первой пакетной записи в приемник в пользу более высокой пропускной способности. Запрос на пакетную запись — это процесс сбора нескольких событий в виде пакета и одновременной отправки их в приемник, а не отправки одного события за раз. Пакетная запись обеспечивает лучшее сжатие, снижение накладных расходов на передачу данных по сети и уменьшение нагрузки на процессор.

Также следует проверить и исправить искажения данных, которые также замедляют скорость вычислений из-за неравномерного распределения по узлам. Flink использует потоки с ключами, что предполагает разделение потока событий по определенному ключу и обработку разных разделов на разных узлах. Например, оператор KeyBy часто используется для повторного ввода ключа DataStream для выполнения агрегации или соединения. Он очень прост в использовании, но может вызвать множество проблем, если выбранный ключ не распределен должным образом. Например, для крупной ecommerce-платформы, обеспечивающей онлайн-торговлю для множества разных магазинов, идентификатор магазина в качестве ключа будет не лучшим выбором, поскольку у разных магазинов очень разный трафик. Из-за разной нагрузки некоторые диспетчеры задач Flink будут заняты обработкой данных, а другие будут простаивать. Это может легко привести к исключениям нехватки памяти и другим сбоям. Идентификаторы с низкой кардинальностью (< 100) также проблематичны, поскольку их сложно правильно распределить между диспетчерами задач.

Обойти этот побочный эффект можно с помощью группировки ключей распределения. Следует выбрать максимальное число, равное параллелизму оператора и случайно генерировать значение от 0 до максимального числа, а затем добавить его к своему ключу перед оператором keyBy. Так логика обработки распределится лучше, включая максимальное количество дополнительных сегментов на ключ. Но затем придется выбрать оптимальный способ объединить результаты. Например, если после обработки всех сегментов объем данных значительно уменьшился, можно использовать ключ по потоку с помощью исходного неидеального ключа, не создавая проблемного искажения данных. Другой подход может заключаться в объединении результатов во время запроса, если система запросов поддерживает это.

Используйте  HybridSource для объединения гетерогенных источников

В конвейерах обработки данных часто бывает объединение несколько разнородных источников в один с некоторым упорядочиванием. Например, много Flink-приложений читают данные из Kafka и пишут в нее. Чтобы снизить затраты на хранение данных, целесообразно настроить соответствующую политику очистки для каждого топика Kafka. Данные могут удаляться из топика Kafka по истечении определенного времени или архивироваться. При архивировании все данные топика Kafka обычно копируются в облачное хранилище объектов для долгосрочного хранения.

Если Flink-приложению надо прочитать данные из архивированного топика и из актуального, можно создать два источника, но это создает сложности с порядком обработки событий. Вместо этого лучше использовать гибридный источник (HybridSource), чтобы представить архивные данные и данные реального времени как один логический источник, который сначала считывает из архивов облачного хранилища по теме, а затем автоматически переключается на топик Kafka в реальном времени.

Разработчик приложения видит только одну логику DataStream, не беспокоясь о базовом механизме. Кроме того, HybridSource для чтения данных из облачного объектного хранилища позволяет использовать большее количество входных разделов для увеличения пропускной способности чтения, поскольку наборы данных объектного хранилища обычно разбиваются на тысячи разделов за раз, чтобы вместить огромные объемы исторических данных. Используя достаточное количество диспетчеров задач, Flink-приложение сможет быстро просматривать исторические данные, значительно сокращая время backfilling-сценариев по сравнению с чтением того же объема данных прямо из топика Kafka с небольшим числом разделов.

Пример Scala-кода для создания объекта DataStream с использованием HybridSourcePowered в классе KafkaBackfillSource выглядит так:

val stream: DataStream[KafkaEvent] =
  KafkaBackfillSource.asDataStream[KafkaEvent](
    "KafkaEventSource",
    "topic-name",
    ...
)

В этом фрагменте кода абстрагируется источник KafkaBackfillSource, позволяя разработчику оперировать архивными и актуальными данными Kafka как единым объектом DataStream. Таким образом, HybridSource позволяет Flink-приложению читать несколько разнородных источников данных в упорядоченном формате.

Узнайте больше про использование Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://shopify.engineering/optimizing-apache-flink-applications-tips
  2. https://shopify.engineering/optimizing-apache-flink-tips-part-two
Поиск по сайту