Сегодня рассмотрим, как оптимизировать потребление памяти в приложениях Apache Flink, разобрав основные принципы работы и конфигурации настройки памяти этого вычислительного фреймворка. А также перечислим типовые ошибки, с которыми дата-инженер может столкнуться при разработке и эксплуатации Flink-приложений
Компоненты памяти в Apache Flink
Apache Flink обеспечивает эффективные рабочие нагрузки поверх JVM, строго контролируя использование памяти различными компонентами. Чтобы обеспечить максимальную производственную ценность для наших пользователей, Flink позволяет выполнять как высокоуровневую, так и точную настройку распределения памяти в кластерах.
Общая память процессов Flink JVM состоит из памяти, потребляемой Flink-приложением (общая память Flink), и JVM для запуска процесса. Общее потребление памяти Flink включает использование кучи JVM и памяти вне кучи (прямой или собственной).
Общая память состоит из следующих компонентов:
- куча JVM, т.е. куча фреймворка (объем памяти, зарезервированной для исполнителей задач, которая не будет выделяться слотам задач) и куча задач (объем памяти для исполнителей задач, зарезервированный для задач);
- управляемая память вне кучи, которая управляется диспетчером памяти, зарезервирована для сортировки, хеш-таблиц, кэширования и серверной части состояния RocksDB;
- память фремворка вне кучи и задачи вне кучи, т.е. прямая и собственная память JVM, зарезервированная для задач;
- сетевая память, которая может использовать кэш в прямой памяти при обмене данными между задачами (перетасовке, т.е. shuffle-операциях). Подробнее про сетевые буферы читайте в нашей новой статье.
- метапространство и служебные данные JVM, такие как стек потоков, кэш кода, освобождаемое пространство сборщика мусора и пр.
Самый простой способ управлять памятью Flink — это настроить любой из следующих параметров:
- общий размер памяти Flink через параметр taskmanager.memory.flink.size для менеджера задач (TaskManager) или опцию jobmanager.memory.flink.size для менеджера заданий (JobManager). Этот способ лучше подходит для автономных развертываний, когда нужно объявить, сколько памяти предоставляется самому Flink.
- общая память процесса через параметр memory.process.size для менеджера задач (TaskManager) или опцию jobmanager.memory.process.size для менеджера заданий (JobManager). При настройке общей памяти процесса, следует указать, сколько общей памяти должно быть назначено процессу Flink JVM. Для контейнерных развертываний это соответствует размеру запрошенного контейнера Kubernetes или Yarn.
- настроить необходимые внутренние компоненты общей памяти Flink, характерные для конкретного процесса Flink.
При выборе одного из способов остальные компоненты памяти будут настроены автоматически, исходя из значений по умолчанию или дополнительно настроенных параметров. Настройка памяти Flink необходима для всех случаев, кроме локального выполнения, иначе запуск Flink-приложения завершится ошибкой. Однако, не рекомендуется смешивать эти способы, явно настраивая как общую память процесса, так и общую память Flink. Это может привести к сбоям развертывания из-за потенциальных конфликтов конфигурации памяти.
Задания Flink без сложных операций, связанные с состоянием, такие как соединения таблиц, таймеры и т.д., не требуют большого количества управляемой памяти. Можно настроить требуемую долю (по умолчанию 0,4) общей памяти Flink в качестве управляемой памяти для выделения заданиям в зависимости от их сложности.
Справедливости ради стоит отметить, что на практике не всегда получается использовать весь объем памяти кучи в зависимости от алгоритма сборки мусора: некоторые из них выделяют для себя определенный объем кучи памяти, который нельзя изменить. Естественное непрямое использование памяти в пользовательском коде также может учитываться как часть памяти вне кучи. Лимит прямой памяти JVM добавляется для процесса JobManager только в том случае, если установлена соответствующая опция jobmanager.memory.enable-jvm-direct-memory-limit.
Если явно не настроить память компонента, Flink будет рассчитывать ее сам на основе общей памяти. Расчетное значение ограничено соответствующими опциями min/max. Например, если установлены только следующие параметры памяти:
- общая память процесса = 1000 МБ,
- минимальные накладные расходы JVM = 128 МБ;
- максимальные накладные расходы JVM = 256 МБ;
- доля накладных расходов JVM = 0,1,
тогда накладные расходы JVM будут 128 МБ, потому что размер, полученный из расчета, составляет 100 МБ = (1000 МБ x 0,1), и он меньше минимального. Расчет игнорируется, если определены размеры общей памяти и других ее компонентов. В этом случае накладные расходы JVM — это остальная часть общей памяти. Производное значение по-прежнему должно находиться в пределах диапазона между минимальным и максимальным значениями, иначе конфигурация не будет выполнена. К примеру, если установлены только следующие параметры памяти:
- общая память процесса = 1000 МБ,
- куча задач = 100 МБ;
- минимальные накладные расходы JVM = 64 МБ;
- максимальные накладные расходы JVM = 256 МБ;
- доля накладных расходов JVM = 0,1,
тогда все остальные компоненты общей памяти процесса будут иметь значения по умолчанию, включая долю управляемой памяти или память вне кучи в диспетчере заданий. А накладные расходы JVM будут не 100 МБ, а оставшаяся часть общей памяти процесса в диапазоне 64–256 МБ.
Далее рассмотрим типовые ошибки, с которыми дата-инженер может столкнуться при разработке и эксплуатации Flink-приложений.
ТОП-6 проблем с памятью и способы борьбы с ними
Основными исключениями, вызванными проблемами с памятью в Apache Flink, являются следующие:
- исключение IllegalConfigurationException, созданное TaskExecutorProcessUtils или Обычно это указывает на наличие недопустимого значения конфигурации, например, отрицательного размера памяти или расчета доли более 1. Также оно может случиться, если возник конфликт конфигурации.
- ошибка OutOfMemoryError: Java heap space указывает, что куча JVM слишком мала. Можно увеличить размер кучи JVM, увеличив общий объем памяти, или же напрямую увеличить память кучи задач для TaskManager или память кучи JVM для JobManager. Еще справиться с этой ошибкой помогает увеличение памяти кучи фреймворка для диспетчеров задач, но этот параметр стоит менять только в том случае, если самому фреймворку действительно нужно больше памяти.
- ошибка OutOfMemoryError: Direct buffer memory указывает на то, что лимит прямой памяти JVM слишком мал или есть прямая утечка памяти. Надо проверить, использует ли пользовательский код или другие внешние зависимости прямую память JVM и правильно ли она учитывается. Можно увеличить ее предел, настроив прямую память вне кучи.
- ошибка OutOfMemoryError: Metaspace указывает на то, задано недостаточное ограничение метапространства JVM. Его следует увеличить, изменив параметр метапространства JVM для TaskManagers или JobManagers.
- исключение IOException: Insufficient number of network buffers актуально только для менеджеров задач и указывает на то, что размер сконфигурированной сетевой памяти недостаточно велик. Можно сетевую память, настроив параметры memory.network.min, taskmanager.memory.network.max, taskmanager.memory.network.fraction.
- ошибка Container Memory Exceeded, которая возникает, если контейнер Flink пытается выделить память сверх запрошенного размера (Yarn или Kubernetes). Обычно это указывает на то, что Flink не зарезервировал достаточно собственной памяти. Можно обнаружить такую ситуацию с помощью внешней системы мониторинга, например, Grafana или Prometheus, либо из сообщений об ошибках, когда контейнер уничтожается средой развертывания. Столкнувшись с этой проблемой в процессе JobManager, можно включить ограничение прямой памяти JVM, установив параметр memory.enable-jvm-direct-memory-limit, чтобы исключить возможную утечку прямой памяти JVM. Если в качестве бэкэнда хранения состояний stateful-приложений Apache Flink использует база данных RocksDB (RocksDBStateBackend), о чем мы писали здесь, и управление памятью отключено, можно увеличить управляемую память TaskManager. Если управление памятью включено, и память не кучи увеличивается во время точки сохранения или полных контрольных точек, что случается из-за распределителя памяти glibc, можно добавить переменную среды MALLOC_ARENA_MAX=1 для менеджеров задач. Еще одной альтернативой является увеличение накладных расходов JVM.
Освойте тонкости настройки и использования Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники