Обогащение ошибок при сбоях заданий Apache Flink с FLIP-304

Apache Flink примеры курсы обучение, Flink для дата-инженера, Flink для разработчика и дата-инженера, примеры курсы обучение дата-инженеров, инженерия больших данных, Школа Больших Данных Учебный Центр Коммерсант

Как FLIP-304 помогает понять причину сбоя и повысить надежность Flink-приложения: обогащение типовых сообщений об ошибках пользовательскими метаданными.

Зачем нужен FLIP-304 и как это позволяет дополнять сообщения об ошибках при сбоях заданий Apache Flink

Хотя Apache Flink имеет встроенные механизмы обеспечения отказоустойчивости, такие как контрольные точки и точки сохранения, а также горячее резервирование диспетчера заданий, сбои все равно случаются. Впрочем, отказы в распределенных системах неизбежны, поэтому их не нужно бояться, а важно наладить эффективное управление ими. Обычно отчеты об отказах в распределенных системах просто фиксируют факт сбоя, оставляя без ответа важные вопросы:

  • это проблема кода или проблема инфраструктуры?
  • какова причина сбоя?
  • как отслеживать и реагировать на подобные сбои в будущем?

Без подробных сведений отладка становится трудоемким процессом. Рассмотрим пример конвейера потоковой передачи данных в реальном времени, который обрабатывает данные о событиях пользовательского поведения. Внезапно задача завершается сбоем в одном из заданий Flink. Типовое сообщение об ошибке выглядит так:

Task failed: Slot 3, Operator: FilterTransformation Cause: NullPointerException

Не очень понятно, что именно вызвало сбой. Поэтому в релизе Flink 1.18 еще в прошлом году добавлено улучшение FLIP-304 (Flink Improvement Proposal): возможность подключать собственные средства обогащения события отказов.

Это сделано благодаря специальному подключаемому интерфейсу, который позволяет пользователям реализовывать собственные средства обогащения отказов с использованием универсальной структуры плагинов Flink. Обогатители отказов содержат пользовательскую логику и могут также выдавать предопределенные  метки (пары строк ключ-значение), которые выставляются через REST-интерфейс JobMaster. Так можно типизировать причины отказов, например, Пользователь или Система, Приложение или Платформа, а также уведомлять об этом потребителям нижестоящих уровней, такие как системы уведомлений.

Теперь все сбои потоковых задач в приложении Flink проходят через класс JobMaster, отвечающий за отвечает за выполнение одного графа заданий JobGraph,  и запускают ExecutionFailureHandler. Обработчик возвращает FailureHandlingResult, содержащий причину сбоя (как Throwable) и вершины для перезапуска, если сбой можно устранить.
Затем FailureHandlingResult сохраняется в виде моментального снимка и предоставляется как часть истории исключений ExceptionHistory в интерфейсах JobMaster (UI и REST), а также через сервер истории HistoryServer, если он включен. Аналогичным образом глобальные сбои, произошедшие в контексте планировщика, проходят через GlobalFailureHandler, создают FailureHandlingResults и затем сохраняются как часть моментального снимка.

Обогатитель сбоя Enricher, реализованный в рамках FLIP-304 работает следующим образом:

  • срабатывает при каждом глобальном или локальном сбое, а также при отключении задания;
  • получает Throwable-причину сбоя и неизменяемый контекст;
  • выполняет асинхронное выполнение (отдельный IoExecutor), чтобы избежать блокировки основного потока для RPC;
  • выдает метки/теги ошибок для своих уникальных, предопределенных ключей, которые определяются во время запуска.

Обогатители фактически запускаются как часть метода updateTaskExecutionState() класса JobMaster для локальных сбоев, часть disconnectTaskManager() для отключений и часть метода handleGlobalFailure() класса DefaultScheduler для глобальных сбоев. Это делает реализацию достаточно универсальной для работы со всеми планировщиками Flink и облегчает защиту от состояний гонки, например, параллельной обработки сбоев задач.

Например, следующий класс обогащает сбои, классифицируя ошибки, связанные с пользователем, и назначает им высокий приоритет.

public class CustomFailureEnricher implements FailureEnricher {
   private static final String typeKey = "TYPE";

   @Override
   public Set<String> getOutputKeys() {
        return Stream.of(typeKey).collect(Collectors.toSet());
   }

   @Override  
   public CompletableFuture<Map<String, String>> enrich(Throwable cause, final Context ctx) {  
        final Map<String, String> labels = new HashMap();
       if (ExceptionUtils.findThrowable(cause, ClassCastException.class).isPresent()) {
           labels.put(typeKey, "USER");
       } else {
           labels.put(typeKey, "SYSTEM");
       }
       return CompletableFuture.completedFuture(labels);

    }  
}

Класс CustomFailureEnricher предназначен для классификации ошибок на пользовательские и системные на основе наличия конкретного типа исключения (ClassCastException). Объект hashmap labels позволяет помечать сбои метаданными, такими как пары ключ-значение.

Таким образом, улучшение Flink, реализованное в FLIP-304 позволяет создавать более сложные и надежные решения. Например, можно расширить логику обогащения сбоев с помощью пользовательского кода, чтобы закодировать собственные стратегии обработки ошибок и исключений, включая дополнительные зависимости, типы ошибок или интеграцию с внешними системами, отвечающими на запросы в определенных сценариях.

Освойте возможности Apache Flink для пакетной и потоковой аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

                                                                                Источники

  1. https://www.confluent.io/blog/flip-304-pluggable-failure-enrichers/
  2. https://cwiki.apache.org/confluence/display/FLINK/FLIP-304%3A+Pluggable+Failure+Enrichers
Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.