Как FLIP-304 помогает понять причину сбоя и повысить надежность Flink-приложения: обогащение типовых сообщений об ошибках пользовательскими метаданными.
Зачем нужен FLIP-304 и как это позволяет дополнять сообщения об ошибках при сбоях заданий Apache Flink
Хотя Apache Flink имеет встроенные механизмы обеспечения отказоустойчивости, такие как контрольные точки и точки сохранения, а также горячее резервирование диспетчера заданий, сбои все равно случаются. Впрочем, отказы в распределенных системах неизбежны, поэтому их не нужно бояться, а важно наладить эффективное управление ими. Обычно отчеты об отказах в распределенных системах просто фиксируют факт сбоя, оставляя без ответа важные вопросы:
- это проблема кода или проблема инфраструктуры?
- какова причина сбоя?
- как отслеживать и реагировать на подобные сбои в будущем?
Без подробных сведений отладка становится трудоемким процессом. Рассмотрим пример конвейера потоковой передачи данных в реальном времени, который обрабатывает данные о событиях пользовательского поведения. Внезапно задача завершается сбоем в одном из заданий Flink. Типовое сообщение об ошибке выглядит так:
Task failed: Slot 3, Operator: FilterTransformation Cause: NullPointerException
Не очень понятно, что именно вызвало сбой. Поэтому в релизе Flink 1.18 еще в прошлом году добавлено улучшение FLIP-304 (Flink Improvement Proposal): возможность подключать собственные средства обогащения события отказов.
Это сделано благодаря специальному подключаемому интерфейсу, который позволяет пользователям реализовывать собственные средства обогащения отказов с использованием универсальной структуры плагинов Flink. Обогатители отказов содержат пользовательскую логику и могут также выдавать предопределенные метки (пары строк ключ-значение), которые выставляются через REST-интерфейс JobMaster. Так можно типизировать причины отказов, например, Пользователь или Система, Приложение или Платформа, а также уведомлять об этом потребителям нижестоящих уровней, такие как системы уведомлений.
Теперь все сбои потоковых задач в приложении Flink проходят через класс JobMaster, отвечающий за отвечает за выполнение одного графа заданий JobGraph, и запускают ExecutionFailureHandler. Обработчик возвращает FailureHandlingResult, содержащий причину сбоя (как Throwable) и вершины для перезапуска, если сбой можно устранить.
Затем FailureHandlingResult сохраняется в виде моментального снимка и предоставляется как часть истории исключений ExceptionHistory в интерфейсах JobMaster (UI и REST), а также через сервер истории HistoryServer, если он включен. Аналогичным образом глобальные сбои, произошедшие в контексте планировщика, проходят через GlobalFailureHandler, создают FailureHandlingResults и затем сохраняются как часть моментального снимка.
Обогатитель сбоя Enricher, реализованный в рамках FLIP-304 работает следующим образом:
- срабатывает при каждом глобальном или локальном сбое, а также при отключении задания;
- получает Throwable-причину сбоя и неизменяемый контекст;
- выполняет асинхронное выполнение (отдельный IoExecutor), чтобы избежать блокировки основного потока для RPC;
- выдает метки/теги ошибок для своих уникальных, предопределенных ключей, которые определяются во время запуска.
Обогатители фактически запускаются как часть метода updateTaskExecutionState() класса JobMaster для локальных сбоев, часть disconnectTaskManager() для отключений и часть метода handleGlobalFailure() класса DefaultScheduler для глобальных сбоев. Это делает реализацию достаточно универсальной для работы со всеми планировщиками Flink и облегчает защиту от состояний гонки, например, параллельной обработки сбоев задач.
Например, следующий класс обогащает сбои, классифицируя ошибки, связанные с пользователем, и назначает им высокий приоритет.
public class CustomFailureEnricher implements FailureEnricher { private static final String typeKey = "TYPE"; @Override public Set<String> getOutputKeys() { return Stream.of(typeKey).collect(Collectors.toSet()); } @Override public CompletableFuture<Map<String, String>> enrich(Throwable cause, final Context ctx) { final Map<String, String> labels = new HashMap(); if (ExceptionUtils.findThrowable(cause, ClassCastException.class).isPresent()) { labels.put(typeKey, "USER"); } else { labels.put(typeKey, "SYSTEM"); } return CompletableFuture.completedFuture(labels); } }
Класс CustomFailureEnricher предназначен для классификации ошибок на пользовательские и системные на основе наличия конкретного типа исключения (ClassCastException). Объект hashmap labels позволяет помечать сбои метаданными, такими как пары ключ-значение.
Таким образом, улучшение Flink, реализованное в FLIP-304 позволяет создавать более сложные и надежные решения. Например, можно расширить логику обогащения сбоев с помощью пользовательского кода, чтобы закодировать собственные стратегии обработки ошибок и исключений, включая дополнительные зависимости, типы ошибок или интеграцию с внешними системами, отвечающими на запросы в определенных сценариях.
Освойте возможности Apache Flink для пакетной и потоковой аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники