Как ускорить выполнение заданий Apache Flink с помощью спекулятивного выполнения

потоковая обработка данных с Apache Flink, перекосы данных Flink, Flink примеры обучение курсы, обучение большим данных, курсы по flink, обучение Apache Hadoop Flink SQL, Flink разработка приложений, курсы Apache Hadoop Flink SQL, курсы Hadoop для инженеров данных обучение примеры, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Что такое спекулятивное выполнение заданий в Apache Flink, какой планировщик его поддерживает, какие конфигурации нужно настроить для его эффективного использования и зачем при этом переопределять поведение генератора разделений потокового источника данных.

Что такое спекулятивное выполнение заданий Apache Flink

Распределенная природа Apache Flink приводит к тому, что приложения, созданные с помощью этого фреймворка, часто сталкиваются с проблемой перекоса данных, когда они неравномерно распределены по узлам кластера. Это приводит к торможению распределенной программы из-за чрезмерной загрузки отдельных узлов и роста накладных расходов на передачу данных по сети. Подробнее об этом мы писали здесь и здесь.

Смягчить эту проблему можно, включив спекулятивное выполнение заданий – встроенный во фреймворк механизм устранения торможения, вызванного проблемными узлами. Впрочем, спекулятивное выполнение заданий актуально не только в случае перекоса данных. У проблемного узла также могут возникнуть проблемы с оборудованием, сбои сети или операций дискового ввода-вывода, высокая загрузка ЦП. Все это приводит к тому, что задачи на таком узле будут выполняться намного медленнее задач на других узлах, влияя на общее время выполнения пакетного задания. В таких случаях спекулятивное выполнение запускает новые попытки выполнения медленной задачи на узлах, которые не определены как проблемные. Новые попытки обрабатывают те же входные данные и производят те же данные, что и старая (тормозная) попытка, которая не останавливается и продолжает выполняться.

Однако, как только успешно завершится первая из попыток (любая новая или старая), ее выходные данные будут видны и использованы нижестоящими задачами, а оставшиеся попытки будут отменены. Для обнаружения медленных задач Flink использует соответствующий детектор. Узлы, в которых находятся медленные задачи, идентифицируются как проблемные узлы и блокируются с помощью механизма черного списка. Планировщик создаст новые попытки для медленных задач и развернет их на незаблокированных узлах.

Flink не поддерживает спекулятивное выполнение заданий DataSet, поскольку этот API станет устаревшим в ближайшем будущем. Сегодня рекомендуемым низкоуровневым API для разработки пакетных заданий Flink считается DataStream. Включить спекулятивное выполнение можно с помощью конфигурации execution.batch.speculative.enabled, установленной в значение true. В настоящее время спекулятивное выполнение заданий поддерживает только адаптивный планировщик пакетных заданий (Adaptive Batch Scheduler). Именно его используют пакетные задания Flink по умолчанию, если другое явно не задано. Подробнее о том, как планируются и исполняются задания Flink, мы рассказываем здесь.

Напомним, Adaptive Batch Scheduler может автоматически корректировать план выполнения и поддерживает автоматическое определение параллелизма операторов для пакетных заданий. Если для оператора не задан параллелизм, планировщик выберет для него параллелизм в соответствии с размером используемых наборов данных. Это освобождает разработчика пакетных заданий от настройки параллелизма Кроме того, автоматически настраиваемые параллелизмы обычно лучше соответствуют потребляемым наборам данных, объем которых часто меняется. Операторам из пакетных заданий SQL можно назначать различные параллелизмы, которые настраиваются автоматически.

Как это использовать: настройка конфигураций и параметров

Поскольку Adaptive Batch Scheduler является планировщиком по умолчанию для пакетных заданий Flink, для его использования никакой дополнительной настройки в конфигурации jobmanager.scheduler не требуется. Однако, следует оставить значение execution.batch-shuffle-mode незаданным или явно установить его в значение по умолчанию ALL_EXCHANGES_BLOCKING или ALL_EXCHANGES_HYBRID_FULL из ALL_EXCHANGES_HYBRID_SELECTIVE.

Чтобы спекулятивное выполнение работало лучше для разных заданий, можно настроить следующие параметры конфигурации планировщика: execution.batch.speculative.max-concurrent-executions и execution.batch.speculative.block-slow-node-duration. А также параметры конфигурации детектора медленных задач:

  • slow-task-detector.check-interval;
  • slow-task-detector.execution-time.baseline-lower-bound;
  • slow-task-detector.execution-time.baseline-multiplier;
  • slow-task-detector.execution-time.baseline-ratio.

Детектор медленных задач обнаруживает их на основе времени выполнения, периодически подсчитывая все завершенные выполнения. Если процент завершенных выполнений достигнет настроенного отношения slow-task-detector.execution-time.baseline-ratio, базовый уровень будет определен как медиана времени выполнения, умноженная на настроенный мультипликатор slow-task-detector.execution-time.baseline-multiplier. Затем запущенная задача, время выполнения которой превышает базовый уровень, будет маркирована как медленная. Стоит отметить, что время выполнения зависит от объема входных данных вершины выполнения. Поэтому выполнения с большими различиями в объеме данных, но близкой вычислительной мощностью не будут помечены как медленные задачи, когда происходит перекос данных. Это помогает избежать ненужных спекулятивных попыток.

Если узел является источником или используется режим гибридного перемешивания, оптимизация времени выполнения, взвешенная с объемом входных данных, не вступит в силу, поскольку объем входных данных нельзя оценить. Когда задание использует пользовательский источник данных (SourceEvent), необходимо изменить SplitEnumerator этого источника, чтобы реализовать интерфейс SupportsHandleExecutionAttemptSourceEvent:

public interface SupportsHandleExecutionAttemptSourceEvent { 
void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent); 
}

SplitEnumerator генерирует разделения и назначает их считывателям потокового источника данных, работая как отдельный экземпляр в диспетчере заданий для сохранения незавершенных операций разделения и их сбалансированное назначение читателям. Подробнее об этом мы писали здесь и здесь.

Переопределение SourceEvent означает, что SplitEnumerator должен знать о попытке отправки события. Иначе возникнут исключения, когда диспетчер заданий получит исходное событие от задач, что приведет к сбоям задания. Никаких дополнительных изменений не требуется для других источников для работы со спекулятивным выполнением, включая источники SourceFunction, источники InputFormat и новые источники. Все исходные коннекторы, предлагаемые Apache Flink, могут работать со спекулятивным выполнением.

Для обеспечения совместимости спекулятивное выполнение отключено по умолчанию для приемника, если он не реализует интерфейс SupportsConcurrentExecutionAttempts:

public interface SupportsConcurrentExecutionAttempts {
}

SupportsConcurrentExecutionAttempts работает для приемника потоковых данных, включая Sink, SinkFunction и OutputFormat. Если какой-либо оператор в задаче не поддерживает спекулятивное выполнение, вся задача будет помечена как не поддерживающая спекулятивное выполнение. Это означает, что если приемник не поддерживает спекулятивное выполнение, задача, содержащая оператор приемника, не может быть выполнена спекулятивно. В реализации приемника Flink отключает спекулятивное выполнение для Committer, включая операторы, расширенные с помощью WithPreCommitTopology и WithPostCommitTopology, чтобы избежать неожиданных проблем с одновременной фиксацией.

После включения спекулятивного выполнения медленных задач веб-интерфейс Apache Flink будет отображать спекулятивные попытки на вкладке подзадач (SubTasks) на странице задания. Также будут показаны заблокированные диспетчеры задач в кластере Overview Task Managers. Оценить эффективность спекулятивного исполнения поможет подробный просмотр системных метрик этих задач, доступный в веб-интерфейсе или лог-файлах.

Про режимы развертывания заданий Apache Flink читайте в нашей новой статье.

Узнайте больше про использование Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/speculative_execution/
  2. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/elastic_scaling/
Поиск по сайту