Как Apache Flink работает с файловыми системами

Flink HDFS S3 GCS файловая система, Apache Flink примеры курсы обучение для разработчика, обучение Flink, курсы Flink, Flink для разработчиков и дата-инженеров, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Какие файловые системы поддерживает Apache Flink: средства взаимодействия с файлами, хранящимися локально или в объектных хранилищах HDFS, S3 и GCS.

Особенности работы с файловыми системами в Apache Flink

Apache Flink имеет собственную абстракцию файловой системы через класс org.apache.flink.core.fs.FileSystem. Эта абстракция обеспечивает общий набор операций и минимальные гарантии для различных типов реализаций файловых систем. Набор доступных операций в классе FileSystem довольно ограничен и не поддерживает весь возможные спектр файловых систем. Например, добавление или изменение существующих файлов не поддерживается. Файловая система идентифицируется ее схемой, такой как file://, hdfs://и т. д. Схема file предназначена для работы с локальной файловой системой. Доступ к другим типам файловых систем осуществляется реализацией, которая связывает набор файловых систем, поддерживаемых Apache Hadoop:

  • hdfs — распределенная файловая система Hadoop;
  • s3, s3n, и s3a — файловая система Amazon S3;
  • gcs — облачное хранилище Google Cloud Storage.

Flink прозрачно загружает файловые системы Hadoop, если находит классы файловой системы Hadoop в пути к классам и находит допустимую конфигурацию Hadoop. По умолчанию он ищет конфигурацию Hadoop в пути к классам. В качестве альтернативы можно указать пользовательское местоположение через запись конфигурации fs.hdfs.hadoopconf.

Экземпляры класса FileSystem и их выходные потоки FsDataOutputStream используются для постоянного хранения данных результатов работы потоковых приложений, а также для обеспечения их отказоустойчивости и восстановления. Поэтому важно четко определить семантика постоянства этих потоков.

В Apache Flink, данные, записываемые в выходной поток, считаются постоянными, если выполняются два требования:

  • Требование видимости: должно быть гарантировано, что все другие процессы, машины, виртуальные машины, контейнеры и т. д., которые могут получить доступ к файлу, видят данные последовательно, когда им задан абсолютный путь к файлу. Это требование похоже на семантику close-to-open , определенную в POSIX, но ограничено самим файлом (его абсолютным путем).
  • Требование к долговечности, т.е. к устойчивости и постоянству файловой системы. Они специфичны для конкретной файловой системы. Например, локальная файловая система (LocalFileSystem) не дает никаких гарантий устойчивости при сбоях аппаратного обеспечения и операционной системы, в отличие от реплицированных распределенных файловых систем, таких как HDFS. Распределенные файловые системы обычно гарантируют устойчивость при наличии до n одновременных отказов узлов, где n — фактор репликации.

Примечательно, что обновления родительского каталога файла, такие, чтобы файл отображался в списке содержимого каталога, не обязательно должны быть полными, чтобы данные в файловом потоке считались постоянными. Это важно для файловых систем, в которых обновления содержимого каталогов согласуются только в конечном итоге (eventual consistency). Поэтому класс FSDataOutputStream должен гарантировать сохранение данных для записанных байтов после возврата вызова его метода FSDataOutputStream.close().

Для отказоустойчивых распределенных файловых систем данные считаются постоянными после того, как они были получены и подтверждены файловой системой, как правило, путем репликации на кворум компьютеров. Так реализуется требование к устойчивости. Кроме того, абсолютный путь к файлу должен быть виден всем другим машинам, которые потенциально будут иметь доступ к файлу – это реализация требования видимости. Попадают ли данные в энергонезависимое хранилище на узлах хранения, зависит от конкретных гарантий конкретной файловой системы. Обновления метаданных в родительском каталоге файла не обязательно должны достигать согласованного состояния.

Допустимо, что некоторые машины видят файл при просмотре содержимого родительского каталога, а другие нет, если доступ к файлу по его абсолютному пути возможен на всех узлах. Локальная файловая система должна поддерживать семантику POSIX close-to-open. Поскольку локальная файловая система не имеет никаких гарантий отказоустойчивости, дополнительных требований не существует. Поэтому данные могут все еще находиться в кэше ОС, если они считаются постоянными с точки зрения локальной файловой системы. Сбои, которые приводят к потере данных кэшем ОС, считаются фатальными для локального компьютера и не покрываются гарантиями локальной файловой системы, как определено Flink. Это означает, что вычисленные результаты, контрольные точки и точки сохранения, которые записываются только в локальную файловую систему, не могут быть гарантированно восстановлены после сбоя локальной машины, что делает локальные файловые системы непригодными для производственного развертывания.

Что касается перезаписи содержимого существующих файлов, то большинство файловых систем не поддерживают это и не обеспечивают непротиворечивую видимость обновленного содержимого. По этой причине возможности Flink по работе с файловой системой не поддерживают добавление к существующим файлам или поиск в выходных потоках, чтобы ранее записанные данные могли быть изменены в том же самом файле.

Хотя технически перезапись файлов возможна: файл перезаписывается путем его удаления и создания нового файла. Однако некоторые файловые системы не могут сделать это изменение синхронно видимым для всех сторон, имеющих доступ к файлу. Например, Amazon S3 гарантирует только возможную согласованность видимости замены файла: некоторые машины могут видеть старый файл, тогда как другие узлы видят новый файл. Чтобы избежать этих проблем согласованности, реализации механизмов сбоя/восстановления во Flink строго избегают повторной записи в один и тот же путь к файлу.

Реализации FileSystem в Apache Flink являются потокобезопасными: один и тот же экземпляр класса FileSystem часто совместно используется несколькими потоками в Flink и имеет возможность одновременно создавать потоки ввода/вывода и отображать метаданные файла. Реализации FSDataOutputStream не ориентированы на многопотоковое исполнение:  экземпляры потоков не передаются между операциями чтения или записи, поскольку нет никаких гарантий видимости операций между потоками.

Взаимодействие с HDFS, AWS S3  и GCS

Как уже было отмечено выше, файловая система идентифицируются ее схемой, такой как file:// для локальной файловой системы, hdfs:// для HDFS, s3://, s3n://, и s3a:// для AWS S3 и gcs:// для Google Cloud Storage.

Можно ограничить общее количество соединений, которые файловая система может одновременно открывать. Это полезно, когда файловая система не может обрабатывать большое количество одновременных операций чтения/записи или открытия соединений одновременно. Например, небольшие кластеры HDFS с небольшим количеством обработчиков RPC иногда могут быть перегружены большим заданием Flink, пытающимся создать множество подключений во время контрольной точки. Чтобы ограничить подключения определенной файловой системы, добавьте следующие записи в конфигурацию Flink. Ограничиваемая файловая система определяется ее схемой.

Ограничить количество входных или выходных соединений (потоков) можно по отдельности, задав значения конфигураций схемы файловой системы fs.<scheme>.limit.input и fs.<scheme>.limit.output, а также установить ограничение на общее количество одновременных потоков fs.<scheme>.limit.total. Если файловая система пытается открыть больше потоков, операция блокируется до тех пор, пока некоторые потоки не закроются. Если открытие потока занимает больше времени, чем fs.<scheme>.limit.timeout, открытие потока не удастся. Чтобы неактивные потоки не занимали весь пул, предотвращая открытие новых соединений, можно добавить тайм-аут неактивности, который принудительно закроет их, если они не читают/записывают какие-либо байты в течение хотя бы этого периода времени. Для этого используется конфигурация fs.<scheme>.limit.stream-timeout. Это ограничение применяется для каждого диспетчера задач или файловой системы. Поскольку создание файловых систем происходит по схеме и центру доступа, разные центры имеют независимые пулы соединений. Например, для hdfs://myhdfs:50010/ и hdfs://anotherhdfs:4399/ будут отдельные пулы.

Можно также использовать Flink для чтения и записи данных в S3 и GCS, а также в сочетании с серверами потокового состояния. Также можно использовать объекты S3 и GCS как обычные файлы, указав пути в следующем формате: s3://<your-bucket>/<endpoint> или gs://<your-bucket>/<endpoint>. Конечная точка может быть либо одним файлом, либо каталогом, например:

// Read from S3 bucket
env.readTextFile("s3://<bucket>/<endpoint>");

// Write to S3 bucket
stream.writeAsText("s3://<bucket>/<endpoint>");

// Use S3 as checkpoint storage
env.getCheckpointConfig().setCheckpointStorage("s3://<your-bucket>/<endpoint>");

Для работы с S3 в Flink можно использовать один из плагинов файловой системы flink-s3-fs-hadoop и flink-s3-fs-prestoS3. Для работы с Goggle Cloud Storage есть плагин flink-gs-fs-hadoop. Все эти реализации являются автономными и не имеют никаких зависимостей, поэтому для их использования нет необходимости добавлять Hadoop в путь к классам. Но иногда, например, для использования S3 в качестве каталога хранения ресурсов YARN, может потребоваться настройка конкретной реализации файловой системы Hadoop S3.

Оба плагина flink-s3-fs-hadoop и flink-s3-fs-presto регистрируют оболочки FileSystem по умолчанию для URI со схемой s3://. Плагин flink-s3-fs-hadoop также регистрируется для s3a://, а flink-s3-fs-presto — для s3p://. Примечательно, что это все можно использовать одновременно. Например, задание использует файловую систему, которая поддерживает только Hadoop, но использует Presto для создания контрольных точек. В этом случае надо явно использовать s3a:// в качестве схемы для приемника Hadoop и s3p:// для контрольных точек Presto.

Плагин flink-gs-fs-hadoop регистрирует оболочку FileSystem для URI со схемой gs://. Для доступа к GCS он использует Hadoop-библиотеку gcs-connector от Google, а для поддержки – библиотеку Google-Cloud-Storage от Google RecoverableWriter.

Освойте возможности Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/internals/filesystems/
  2. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/filesystems/s3/
  3. https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/filesystems/gcs/
Поиск по сайту