Сегодня рассмотрим, с какими нетиповыми ошибками может столкнуться дата-инженер при работе с Apache Flink, а также как решить эти проблемы. Где и что править, когда сервер BLOB-объектов завис из-за слишком большого количества подключений, почему не хватает памяти при развертывании Flink-приложений в кластере Kubernetes и как ускорить инициализацию заданий.
Особенности работы с BLOB-объектами
Apache Flink — отличная платформа распределенной обработки данных для создания stateful-приложений, выполняющих вычисления как в потоковом, так и в пакетном режимах. Однако, при работе с Flink, как и с другим любым фреймворком, иногда случаются нетривиальные ситуации, выход из которых находится не сразу. Например, при запуске Flink на виртуальных машинах, даже в небольшом масштабе (1 JobManager и 3 TaskManager) задания в кластере не могут переключиться на новые диспетчеры задач из-за невозможности вызвать сервер BLOB-объектов. Сервер BLOB-объектов является компонентом JobManager и обеспечивает создание структуры каталогов для хранения BLOB-объектов или их временного кэширования.
При этом сервер BLOB-объектов продолжает работать и прослушивает порт, который отображается в логе, а с помощью инструментов netstat и ss можно увидеть открытые подключения к этому порту. В конфигурации Flink за соединение с BLOB-объектом отвечает параметр blob.fetch.num-concurrent, который определяет максимальное количество одновременных запросов на получение BLOB-объектов, обслуживаемых диспетчером заданий. По умолчанию это значение по умолчанию равно 50.
Проблема заключается в том, что сервер BLOB-объектов находится в состоянии ожидания time_wait из-за слишком большого количества подключений. И, когда TaskManager пытается подключиться к серверу BLOB-объектов из-за перезапуска задания или отработки отказа, новые подключения к серверу BLOB-объектов не могут быть установлены. Проблему решает перезапуск JobManager с более высоким значением параметра конфигурации blob.fetch.num-concurrent. Таким образом, необходимо постоянно следить за этой метрикой, чтобы вовремя скорректировать эту конфигурацию, поскольку в реальных проектах высокая доступность является обязательной для JobManager.
Тонкости работы Flink-приложений на Kubernetes
При масштабировании кластера Flink на виртуальных машинах можно столкнуться с некоторыми ограничениями. Например, при развертывании на Kubernetes нужно настроить корректные конфигурации для подов, где задания Flink, что является достаточно сложной задачей. Недостаток памяти или ошибка Out Of Memory (OOM) часто возникает в приложениях на Kubernetes, когда объем обрабатываемых ими данных растет. В частности, в кластере Flink, где выполняются пакетные задания, может случиться OOM метапространства JVM в JobManager. Если эти пакетные задания отправляются в кластер ежедневно, но классы не будут выгружаться, OOM произойдет в метапространстве JVM. Если эти пакетные задания не являются критичными с точки зрения бизнеса, самый простой путь решения такой проблемы сводится к периодическому перезапуску кластера Flink. Поэтому дата-инженеру очень важно отслеживать метрики JVM, такие как размер кучи, метапространство, количество потоков, загруженные классы и пр. Подробнее о том, какие метрики JVM надо отслеживать, мы рассматривали здесь.
Еще одной причиной OOM-ошибки при развертывании Flink-приложений в кластере Kubernetes может быть несогласованность настроек памяти. Например, если кластер Flink развертывается с Helm-диаграммы, нужно корректно настроить в файле values.yaml ограничения памяти пода Kubernetes, а также taskmanager.numberOfTaskSlots и taskmanager.process.size для Apache Flink. Если вместо значений по умолчанию для всех этих конфигураций подставить свои, к примеру, уменьшив taskmanager.numberOfTaskSlots и лимиты памяти подов Kubernetes, но не обновить соответственно taskmanager.process.size, то это значение превысит пределы памяти подов Kubernetes. Хотя Flink-задания будут выполняться, используемый размер кучи при этом растет, а потому некоторые поды TaskManager будут завершены из-за OOM.
Наконец, OOM-ошибка может возникнуть и из-за проблем в заданиях Flink. Например, если key-value NoSQL-СУБД RocksDB не применяется в качестве бэкенда Flink-приложений, большая часть памяти используется для кучи JVM. При этом часто запускается полная сборка мусора (GC), что дополнительно снижает производительность потокового приложения. Проблема может скрываться в том, что Flink не удаляет заблаговременно окна с истекшим сроком действия, если по логике работы приложения удаление вызывается событием, наступающим после истечения срока действия окна. В результате в памяти накапливается много окон с истекшим сроком действия, что приводит к OOM. Избежать этого можно, уменьшив данные, хранящиеся в окнах.
Слишком долгая инициализация задания
При росте количества заданий в кластере Flink для запуска каждого из них в TaskManager требуется все больше времени. В итоге задание может находиться в состоянии инициализации в течение 5 минут после его отправки. После перехода в состояние выполнения, задание может зависнуть в ожидании выделения слотов задач, фактически не начиная выполняться. В итоге проблемное задание блокирует все последующие. Причина этой проблемы может быть в разрыве соединений между ResourceManager, Dispatcher и TaskManager: даже при наличии доступных слотов для задач, ResourceManager не может запросить и выделить их. Перезапуск диспетчера задач устраняет эту проблему, нормализуя процесс распределения заданий, и позволяя выполняться всем заданиям.
Однако, медленный процесс инициализации задания в кластере Apache Flink на Kubernetes может быть связан с настройками этой платформы контейнерной виртуализации в ConfigMap – объекте API, где сохраняются метаданные, неконфиденциальные данные и информация JobMaster в парах ключ-значение. Поды могут использовать ConfigMaps как переменные среды, аргументы командной строки или как файлы конфигурации в томе. ConfigMap позволяет отделить конфигурацию среды от образов контейнеров, чтобы упростить переносимость приложений. Если ConfigMap отмененных заданий не очищается, в Kubernetes остается много этих неиспользуемых API-объектов. Некоторые компоненты (ResourceManager, JobManager, Dispatcher, RestEndpoint) и задания Flink имеют отдельные службы выбора лидера и ConfigMap. Ограничение одновременных запросов в конфигурации kubernetes.max.concurrent.requests определяет, сколько ConfigMaps может просматривать Flink-клиент Kubernetes одновременно.
Узнайте больше про использование возможностей Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники