Как расширить возможности Apache Flink с помощью дополнительных плагинов: подключение внешних ресурсов и обогащение отказов пользовательскими метками. Разбираемся с продвинутыми настройками для эффективной эксплуатации фреймворка.
Внешние ресурсы Apache Flink
Помимо процессора и памяти, многим рабочим нагрузкам также требуются другие ресурсы, например, графические процессоры для глубокого обучения. Для поддержки внешних ресурсов Flink предоставляет соответствующую структуру. Фреймворк поддерживает запрос различных типов ресурсов из базовых систем управления ресурсами, например, Kubernetes, и предоставляет операторам информацию, необходимую для использования этих ресурсов. Apache Flink поддерживает различные типы ресурсов: встроенные плагины для графического процессора, или пользовательские плагины для любых типов ресурсов.
В целом структура внешних ресурсов выполняет две функции:
- устанавливает соответствующие поля запросов ресурсов для запроса ресурсов из базовой системы в соответствии с конфигурацией;
- предоставляет операторам информацию, необходимую для использования ресурсов.
При развертывании в системах управления ресурсами, таких как Kubernetes или YARN, структура внешних ресурсов гарантирует, что выделенный под или контейнер будет содержать желаемые внешние ресурсы. В частности, Kubernetes поддерживает графический процессор, FPGA и пр. через механизм плагина устройства, начиная с версии 1.10. А YARN поддерживает эти ресурсы с версий 2.10 и 3.1. Информация о внешних ресурсах, содержащая основные свойства, необходимые для их использования, генерируется настроенными драйверами внешних ресурсов. Запуская приложение в автономном режиме, надо убедиться, что внешние ресурсы доступны.
Flink предоставляет собственный плагин для ресурсов графического процессора. Он использует скрипт обнаружения для обнаружения индексов устройств GPU, доступ к которым можно получить из информации о ресурсе через свойство индекса. Этот скрипт можно использовать для обнаружения графических процессоров NVIDIA или разработать свой собственный сценарий. В настоящее время для всех операторов метод getExternalResourceInfos() класса RuntimeContext возвращает один и тот же набор информации о ресурсах. Поэтому один и тот же набор устройств GPU всегда доступен всем операторам, работающим в одном диспетчере задач, без изоляции на уровне оператора.
Чтобы использовать внешние ресурсы, необходимо добавить их имена в конфигурационный файл, и задать следующие свойства:
- <resource_name>.amount – количество внешнего ресурса, которое следует запросить из внешней системы;
- external-resource.<resource_name>.yarn.config-key – ключ конфигурации в Это необязательный параметр. Если он настроен, внешняя платформа ресурсов добавит этот ключ в профиль ресурсов запросов контейнера для диспетчера ресурсов YARN
- external-resource.<resource_name>.kubernetes.config-key – ключ конфигурации в Это необязательный параметр. Если он настроен, внешняя платформа ресурсов добавитпараметры resources.limits.<config-key> и resources.requests.<config-key> в основную спецификацию контейнера TaskManager и установит для него значение external-resource.<resource_name>.amount.
- external—resource.<resource_name>.driver—factory.class – имя класса фабрики для внешнего ресурса, идентифицируемого <resource_name>. Если настроено, фабрика будет использоваться для создания экземпляров драйверов в структуре внешних ресурсов. Если этот параметр не настроен, запрошенный ресурс будет существовать до тех TaskManager пор, пока настроены соответствующие параметры. Однако, в этом случае оператор не получит никакой информации о ресурсе RuntimeContext.
- external—resource.<resource_name>.param.<param>) – необязательные параметры драйвера,шаблон именования пользовательских параметров конфигурации для внешнего ресурса, указанного <resource_name>. Только конфигурации, соответствующие этому шаблону, будут переданы в фабрику драйверов этого внешнего ресурса.
Пример конфигурации Apache Flink, в которой указаны два внешних ресурса:
external-resources: gpu;fpga # Define two external resources, "gpu" and "fpga". external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory # Define the driver factory class of gpu resource. external-resource.gpu.amount: 2 # Define the amount of gpu resource per TaskManager. external-resource.gpu.param.discovery-script.args: --enable-coordination # Define the custom param discovery-script.args which will be passed into the gpu driver. external-resource.fpga.driver-factory.class: org.apache.flink.externalresource.fpga.FPGADriverFactory # Define the driver factory class of fpga resource. external-resource.fpga.amount: 1 # Define the amount of fpga resource per TaskManager. external-resource.fpga.yarn.config-key: yarn.io/fpga # Define the corresponding config key of fpga in Yarn.
Обработка отказов: обогащение ошибок пользовательскими метками
Flink предоставляет пользователям подключаемый интерфейс для регистрации своей собственной логики и дополнения ошибок дополнительными метками метаданных в виде строковых пар ключ-значение. Это позволяет разработчикам реализовывать свои собственные плагины обогащения сбоев для классификации сбоев заданий, предоставления пользовательских показателей или выполнения вызовов внешних систем уведомлений. Для этого используется класс FailEnricher. Его экземпляры запускаются каждый раз, когда JobManager сообщает об исключении во время выполнения. Каждый экземпляр FailEnricher может асинхронно возвращать метки, связанные со сбоем, которые затем отображаются через REST API JobManager, например, метка type:System говорит о том, что сбой классифицируется как системная ошибка.
Чтобы реализовать собственный плагин FailEnricher, необходимо добавить свой FailEnricher, реализовав одноименный интерфейс, например, так:
package org.apache.flink.test.plugin.jar.failure; public class CustomEnricher implements FailureEnricher { private final Set<String> outputKeys; public CustomEnricher() { this.outputKeys = Collections.singleton("labelKey"); } @Override public Set<String> getOutputKeys() { return outputKeys; } @Override public CompletableFuture<Map<String, String>> processFailure( Throwable cause, Context context) { return CompletableFuture.completedFuture(Collections.singletonMap("labelKey", "labelValue")); } }
Каждый метод FailEnricher должен иметь набор выходных ключей , которые могут быть связаны со значениями. Этот набор ключей должен быть уникальным, иначе все пользовательские обработчики отказов с перекрывающимися ключами будут игнорироваться.
Затем надо добавить свою собственную фабрику FailEnricherFactory, реализовав одноименный интерфейс, например, так:
package org.apache.flink.test.plugin.jar.failure; public class TestFailureEnricherFactory implements FailureEnricherFactory { @Override public FailureEnricher createFailureEnricher(Configuration conf) { return new CustomEnricher(); } }
Далее следует добавить запись о сервисе, создав файл META-INF/services/org.apache.flink.core.failure.FailureEnricherFactory, содержащий имя класса FailEnricherFactory. Наконец, надо упаковать все эти файлы, классы и внешне зависимости в JAR-архив и поместить его в каталог с произвольным именем в дистрибутиве Flink.
JobManager загружает плагины FailEnricher при запуске. Чтобы они были загружены, все имена классов должны быть определены как часть конфигурации jobmanager.failure-enrichers. Если эта конфигурация пуста, обогатители запускаться НЕ будут.
Таким образом, добавление своих собственных дополнений ошибок с помощью плагинов FailEnricher ускорит процесс отладки большого приложения Apache Flink со сложной логикой обработки данных.
Освойте применение Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники