Обогащение ошибок при сбоях заданий Apache Flink с FLIP-304

Обогащение ошибок при сбоях заданий Apache Flink с FLIP-304

    Как FLIP-304 помогает понять причину сбоя и повысить надежность Flink-приложения: обогащение типовых сообщений об ошибках пользовательскими метаданными.

    Зачем нужен FLIP-304 и как это позволяет дополнять сообщения об ошибках при сбоях заданий Apache Flink

    Хотя Apache Flink имеет встроенные механизмы обеспечения отказоустойчивости, такие как контрольные точки и точки сохранения, а также горячее резервирование диспетчера заданий, сбои все равно случаются. Впрочем, отказы в распределенных системах неизбежны, поэтому их не нужно бояться, а важно наладить эффективное управление ими. Обычно отчеты об отказах в распределенных системах просто фиксируют факт сбоя, оставляя без ответа важные вопросы:

    • это проблема кода или проблема инфраструктуры?
    • какова причина сбоя?
    • как отслеживать и реагировать на подобные сбои в будущем?

    Без подробных сведений отладка становится трудоемким процессом. Рассмотрим пример конвейера потоковой передачи данных в реальном времени, который обрабатывает данные о событиях пользовательского поведения. Внезапно задача завершается сбоем в одном из заданий Flink. Типовое сообщение об ошибке выглядит так:

    Task failed: Slot 3, Operator: FilterTransformation Cause: NullPointerException

    Не очень понятно, что именно вызвало сбой. Поэтому в релизе Flink 1.18 еще в прошлом году добавлено улучшение FLIP-304 (Flink Improvement Proposal): возможность подключать собственные средства обогащения события отказов.

    Это сделано благодаря специальному подключаемому интерфейсу, который позволяет пользователям реализовывать собственные средства обогащения отказов с использованием универсальной структуры плагинов Flink. Обогатители отказов содержат пользовательскую логику и могут также выдавать предопределенные  метки (пары строк ключ-значение), которые выставляются через REST-интерфейс JobMaster. Так можно типизировать причины отказов, например, Пользователь или Система, Приложение или Платформа, а также уведомлять об этом потребителям нижестоящих уровней, такие как системы уведомлений.

    Теперь все сбои потоковых задач в приложении Flink проходят через класс JobMaster, отвечающий за отвечает за выполнение одного графа заданий JobGraph,  и запускают ExecutionFailureHandler. Обработчик возвращает FailureHandlingResult, содержащий причину сбоя (как Throwable) и вершины для перезапуска, если сбой можно устранить.
    Затем FailureHandlingResult сохраняется в виде моментального снимка и предоставляется как часть истории исключений ExceptionHistory в интерфейсах JobMaster (UI и REST), а также через сервер истории HistoryServer, если он включен. Аналогичным образом глобальные сбои, произошедшие в контексте планировщика, проходят через GlobalFailureHandler, создают FailureHandlingResults и затем сохраняются как часть моментального снимка.

    Обогатитель сбоя Enricher, реализованный в рамках FLIP-304 работает следующим образом:

    • срабатывает при каждом глобальном или локальном сбое, а также при отключении задания;
    • получает Throwable-причину сбоя и неизменяемый контекст;
    • выполняет асинхронное выполнение (отдельный IoExecutor), чтобы избежать блокировки основного потока для RPC;
    • выдает метки/теги ошибок для своих уникальных, предопределенных ключей, которые определяются во время запуска.

    Обогатители фактически запускаются как часть метода updateTaskExecutionState() класса JobMaster для локальных сбоев, часть disconnectTaskManager() для отключений и часть метода handleGlobalFailure() класса DefaultScheduler для глобальных сбоев. Это делает реализацию достаточно универсальной для работы со всеми планировщиками Flink и облегчает защиту от состояний гонки, например, параллельной обработки сбоев задач.

    Например, следующий класс обогащает сбои, классифицируя ошибки, связанные с пользователем, и назначает им высокий приоритет.

    public class CustomFailureEnricher implements FailureEnricher {
       private static final String typeKey = "TYPE";
    
       @Override
       public Set<String> getOutputKeys() {
            return Stream.of(typeKey).collect(Collectors.toSet());
       }
    
       @Override  
       public CompletableFuture<Map<String, String>> enrich(Throwable cause, final Context ctx) {  
            final Map<String, String> labels = new HashMap();
           if (ExceptionUtils.findThrowable(cause, ClassCastException.class).isPresent()) {
               labels.put(typeKey, "USER");
           } else {
               labels.put(typeKey, "SYSTEM");
           }
           return CompletableFuture.completedFuture(labels);
    
        }  
    }

    Класс CustomFailureEnricher предназначен для классификации ошибок на пользовательские и системные на основе наличия конкретного типа исключения (ClassCastException). Объект hashmap labels позволяет помечать сбои метаданными, такими как пары ключ-значение.

    Таким образом, улучшение Flink, реализованное в FLIP-304 позволяет создавать более сложные и надежные решения. Например, можно расширить логику обогащения сбоев с помощью пользовательского кода, чтобы закодировать собственные стратегии обработки ошибок и исключений, включая дополнительные зависимости, типы ошибок или интеграцию с внешними системами, отвечающими на запросы в определенных сценариях.

    Освойте возможности Apache Flink для пакетной и потоковой аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

                                                                                    Источники

    1. https://www.confluent.io/blog/flip-304-pluggable-failure-enrichers/
    2. https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers
    [elementor-template id="13619"]