Почему stateful-приложения Apache Flink падают в AWS: RocksDB и IOPS облачных SSD

курсы по Flink, разработка Apache Flink, обучение разработчиков Big Data, Apache Flink курсы обучение RocksDB

Продолжая разбирать особенности разработки потоковых приложений Apache Flink, сегодня рассмотрим проблему падения пропускной способности задания из-за встроенного хранилища состояний RocksDB и ее зависимость от производительности дисков. Вас ждет настоящая детективная история о том, как важно заглядывать под капот облачных кластеров и настраивать конфигурации своих stateful-приложений потоковой аналитики больших данных с учетом ограничений PaaS-провайдера.

RocksDB и stateful-приложения Apache Flink: краткий ликбез

Из 3-х возможных backend’ов для хранения состояний stateful-приложений Apache Flink на практике чаще всего используется RocksDBStateBackend на базе встраиваемой key-value NoSQL-СУБД RocksDB. Во время выполнения Flink-задания RocksDB встраивается в процессы TaskManager и работает в собственных потоках с локальными файлами [1].

С помощью RocksDBStateBackend текущее (рабочее, In-flight) состояние приложения сначала записывается во внутреннюю память вне кучи, а затем сохраняется на локальные диски при достижении настроенного порогового значения. Текущее состояние в RocksDBStateBackend передается файлам на диске, поэтому его производительность напрямую влияет на RocksDB. Именно из-за такой зависимости для ускорения работы stateful-приложения Flink рекомендуется располагать каталог с данными о его текущем состоянии на локальном диске, а не в местах удаленного сетевого расположения в NFS или Hadoop HDFS. Для повышения пропускной способности и скорости следует выбирать локальные SSD-диски [1]. Однако, такая простая рекомендация не всегда реализуема на практике, особенно когда речь идет о крупных Big Data системах, развернутых в облачных кластерах. В этом случае нужно подстраиваться под особенности и ограничения PaaS-провайдера, что мы и рассмотрим далее на примере выполнения Flink-заданий в кластере Amazon Elastic Kubernetes Service (EKS) в рамках кейса интернета вещей (IoT, Internet of Things).

Что случилось с заданием Flink в облаке AWS: кейс компании Ververica

Подобно тому как Confluent занимается популяризацией и коммерциализаций Apache Kafka, а Databricks – Spark, немецкая компания Ververica продвигает Flink для разработки распределенных приложений и аналитики больших данных. Рассматриваемый кейс включает IoT-задачу обработки потока событий, исходящих от множества устройств. Каждое событие содержит идентификатор устройства, тип события и метку времени, когда событие было сгенерировано. Flink-задание разделяет поток на основе идентификатора устройства, сопоставляет тип события с последней меткой времени его получения и сохраняет это в состоянии приложения. Из сотни типов событий для каждого входящего события Flink-задание считывает метку времени из состояния для полученного типа и сравнивает ее с входящим timestamp. Если входящая метка времени новее, то сохраненный в состоянии timestamp обновляется.

Это задание Flink выполняется в кластере Amazon EKS – среде запуска и масштабирования приложений Kubernetes в облаке AWS и локально. Дополнительное конфигурирование EKS не проводилось, использовались настройки по умолчанию. На Flink TaskManager выделено 1,5 ядра ЦП и 4 ГБ памяти. В качестве хранилища состояний используется RocksDB, настроенную на управляемую память Flink. Параметр конфигурации state.backend.rocksdb.localdir явно не задан, поэтому каталог хранения временных данных /tmp на корневом томе базового экземпляра EC2 используется по умолчанию для рабочего (текущего) состояния RocksDB [2]. Это состояние (in-flight), над которым работает задание Flink, всегда хранится локально в памяти с возможностью переноса на диск и может быть утеряно при сбое задания, не влияя на возможность восстановления задания [1].

Сперва Flink-задание отлично работало на EKS, но через какое-то время его производительность резко снизилась. Однажды это падение произошло в 23:50 на уровне около 10 тысяч событий в секунду. Остановка задания с перезапуском из точки сохранения (savepoint) состояния не помогли: его пропускная способность все равно оставалась низкой, а подобные отказы повторялись все чаще. При этом падении пропускной способности загрузка ЦП контейнера TaskManager также уменьшилась. А потребление памяти в контейнере TaskManager достигло предела выделения задолго до падения пропускной способности, не особо изменившись в час Х, т.е. 23:50.

Чтобы определить причины замедления Flink-задания и понять, что случилось с TaskManager, DevOps-инженеры Flink включили управленческие расширения Java (JMX, Java Management Extensions) для контроля и управления приложениями, установив следующие параметры TaskManager JVM [2]:

env.java.opts.taskmanager: >-

              -Dcom.sun.management.jmxremote

              -Dcom.sun.management.jmxremote.authenticate=false

              -Dcom.sun.management.jmxremote.ssl=false

              -Dcom.sun.management.jmxremote.local.only=false

              -Dcom.sun.management.jmxremote.port=1099

              -Dcom.sun.management.jmxremote.rmi.port=1099

              -Djava.rmi.server.hostname=127.0.0.1

Подключив локально работающую VisualVM к TaskManager, специалисты просмотрели загрузку ЦП и определили, что 93% времени CPU занимал поток UpdateState, который запускает оператор UpdateState для считывания и обновления состояния в RocksDB. Внутри потока UpdateState почти все время ЦП было занято собственным методом org.rocksdb.RocksDB.get(). Это свидетельствует, что само Flink-задание стало бутылочным горлышком чтения состояния из RocksDB.

Для дальнейшей отладки были включены следующие Flink-метрики RocksDB [2]:

  • backend.rocksdb.metrics.block-cache-capacity: true
  • backend.rocksdb.metrics.block-cache-pinned-usage: true
  • backend.rocksdb.metrics.block-cache-usage: true
  • backend.rocksdb.metrics.estimate-table-readers-mem: true

По умолчанию все собственные метрики RocksDB отключены, так как они могут отрицательно сказаться на производительности Flink-задания, поэтому их следует очень осторожно применять в production. Все вышеотмеченные метрики относятся к блочному кэшу – памяти, где RocksDB кэширует данные для чтения. Этот блочный кэш быстро заполнился в первые несколько минут при запуске задания, в основном за счет записей состояния. Однако, это не объясняет внезапного падения пропускной способности около 23:50. Когда запись состояния отсутствует в блочном кеше RocksDB, ее чтение из этого хранилища состояний потребует дисковых операций ввода-вывода (IOPS). Поэтому DevOps-инженеры Ververica решили искать причину проблемы в дисках [2], что мы и рассмотрим далее.

IOPS в SSD-томах сервисов Amazon  и не только

Перейдя к проверке дисковых метрик корневого тома, специалисты Ververica нашли, что пропускная способность чтения упала в момент снижения пропускной способности задания Flink. То же самое произошло и с пропускной способностью записи. После проверки емкости дисковых IOPS-операций, было обнаружено, что по умолчанию каждый экземпляр EC2 в кластере AWS EKS, созданном по умолчанию, является экземпляром m5.large с корневым gp2-томом универсального хранилища Amazon Elastic Block Store (EBS) [2]. Обычно этот высокопроизводительный сервис блочного хранилища для Amazon EC2 обеспечивает высокую пропускную способность и интенсивность транзакций рабочих нагрузок при любом масштабе.

Официальная документация Amazon EC2 отмечает, что тома с поддержкой SSD общего назначения (gp2 и gp3) и SSD с выделенным IOPS (io1 и io2) обеспечивают стабильную производительность независимо от того, является ли операция ввода-вывода случайной или последовательной. Размер IOPS ограничен 256 КБ для томов SSD и 1024 КБ для томов HDD, поскольку SSD-технология обрабатывает небольшие или случайные операции ввода-вывода намного эффективнее, чем HDD. Когда небольшие операции ввода-вывода являются физически последовательными, Amazon EBS пытается объединить их в одну IOPS-операцию до максимального возможно размера. Аналогично, когда IOPS-операции превышают максимальный размер, Amazon EBS пытается разделить их на более мелкие операции [3].

Напомним, для SSD-дисков величина IOPS сильно зависит от алгоритмов прошивки микроконтроллера и скорости работы интерфейса памяти. При последовательном доступе к данным на большом размере блока IOPS становится максимальным. У большинства SSD на основе флэш-модулей NAND величина IOPS на запись намного меньше, чем на чтение из-за того, что при попытке повторной записи в один и тот же блок запускается сборка мусора, и запись выполняется в менее используемый блок для увеличения срока службы носителя [4].

В рассматриваемом кейсе компании Ververica корневой том размером 80 ГБ обеспечивал базовую скорость 240 IOPS-операций в секунду. Поэтому DevOps-инженеры пришли к выводу, что диск переполнился, и потому производительность Flink- задания резко упала.

RocksDB, Flink, AWS, IOPS, Amazon Web Services, Kubernetes
Анализ производительности IOPS-операций SSD-дисков кластера AWS EKS

Изначально высокие показатели IOPS обеспечивались кредитами ввода-вывода, которые AWS предоставляет каждому gp2-тому для поддержки пакетных IOPS-запросов. Когда эти первоначальные кредиты были исчерпаны, а пакетный баланс упал до 0, и случилась проблема с Flink-заданием. Определив это, специалисты Ververica решили подключить выделенный том с высокой IOPS-скоростью, например, использовать том gp3 или io1/io2, а затем установить в нем каталог для хранения состояния Backend.rocksdb.localdir в конфигурации Flink [2].

Таким образом, из всей этой детективной истории можно сделать вывод, для эффективной разработки Flink-приложений необходимо знать особенности не только этого фреймворка, но и окружения, в котором он развернут. О том, как упростить процесс разработки и отладки Flink-приложений с помощью Byteman, читайте в нашей новой статье.

Освоить все тонкости разработки дата-конвейеров и распределенных приложений потоковой аналитики больших данных с Apache Flink, а также прочими компонентами экосистемы Hadoop вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://flink.apache.org/2021/01/18/rocksdb.html
  2. https://www.ververica.com/blog/the-impact-of-disks-on-rocksdb-state-backend-in-flink-a-case-study
  3. https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ebs-io-characteristics.html
  4. https://ru.wikipedia.org/wiki/IOPS
Поиск по сайту