Какие источники исходных данных поддерживает Apache Spark для пакетной и потоковой обработки, обеспечивая отказоустойчивые вычисления в большом масштабе средствами SQL и Structured Streaming.
Источники данных Apache Spark SQL и структурированной потоковой передачи
Будучи фреймворком для создания распределенных приложений обработки больших объемов данных, Apache Spark может подключаться к разным источникам этих данных, в зависимости от используемого API. Например, Spark SQL поддерживает работу с различными источниками данных через интерфейс DataFrame. DataFrame позволяет работать с реляционными преобразованиями, а также создавать временные представления, чтобы выполнять SQL-запросы к данным в источнике.
Список источников данных, которые поддерживает Spark SQL, довольно широк:
- файлы в форматах Parquet, ORC, AVRO, JSON, CSV, TEXT, Protobuf, а также бинарные файлы;
- таблицы NoSQL-СУБД Apache Hive, включая подключение к постоянному хранилищу метаданных, поддержку серверов и пользовательских функций Hive;
- другие базы данных, доступ к которым возможен через JDBC. Результаты возвращаются в виде DataFrame, и их можно легко обработать в Spark SQL или объединить с другими источниками данных. Источник данных JDBC проще, чем JdbcRDD, т.к. его можно использовать из Java или Python, поскольку он не требует от пользователя предоставления ClassTag. Чтобы использовать этот источник данных, нужно включить JDBC-драйвер конкретной базы данных в путь к классам Spark.
Однако, Spark SQL – это API для пакетной обработки данных. Если нужно реализовать потоковые вычисления, в Apache Spark используется модуль Structured Streaming, который поддерживает следующие источники входных данных:
- файловые источники, когда файлы, записанные в каталоге, читаются как поток и обрабатываются в порядке времени их изменения. Отменить порядок обработки можно, установив параметр latestFirst в значение true. Аналогично Spark SQL, Structured Streaming также поддерживает форматы TEXT, CSV, JSON, ORC, Parquet, и методы интерфейса DataStreamReader. Файлы должны быть помещены в заданный каталог атомарно, что в большинстве файловых систем может быть достигнуто с помощью операций перемещения.
- Kafka, когда данные потребляются из топиков Apache Kafka версии 0.10.0 и выше;
- Сокет, который обычно используется для тестирования, считывая текстовые данные в кодировке UTF-8 из соединения сокета. Сокет прослушивающего сервера находится у драйвера Spark. Этот источник данных рекомендуется использовать только для тестирования, поскольку он не обеспечивает сквозных гарантий отказоустойчивости. Для использования этого источника данных необходимо только указать сокет, т.е. хост и порт для подключения.
- Скоростной источник (rate source), который тоже нужен для тестирования. Он генерирует данные с указанным количеством строк в секунду, где каждая выходная строка содержит отметку времени отправки сообщения timestamp и значение value типа long, содержащее количество сообщений, начиная с 0 в первой строке. Этот источник данных является отказоустойчивым.
- Микропакетный скоростной источник (Rate Per Micro-Batch source) также используется для тестирования. Он генерирует данные в указанном количестве строк на микропакет, где каждая выходная строка содержит отметку времени отправки сообщения timestamp и значение value типа long, содержащее количество сообщений, начиная с 0 в первой строке. В отличие от предыдущего источника данных, Rate Per Micro-Batch source предоставляет согласованный набор входных строк для каждого микропакета независимо от выполнения запроса, позволяя тонко настроить его конфигурацию: триггер, задержку и пр. Этот источник данных также является отказоустойчивым, т.е. гарантирует, что данные могут быть воспроизведены с использованием смещений контрольных точек после сбоя.
Поскольку 3 из вышеперечисленных источников данных используются для разработки, тестирования и отладки, далее рассмотрим файловые и потоковые источники данных для Apache Spark Structured Streaming.
Файловые и потоковые источники данных для Structured Streaming
Например, файловые источники являются отказоустойчивыми и настраиваются с помощью следующих параметров:
- path – путь к входному каталогу, общий для всех форматов файлов. Можно задать общие пути, но не несколько путей, разделенных запятыми.
- maxFilesPerTrigger – максимальное количество новых файлов, которые будут учитываться в каждом триггере;
- latestFirst, по умолчанию равный false. Этот параметр указывает, следует ли сначала обрабатывать последние новые файлы, что полезно при наличии большого количества невыполненных файлов;
- fileNameOnly, по умолчанию равный false. Этот параметр указывает, надо проверять ли новые файлы на основе только их имени, а не полного пути к ним;
- maxFileAge – максимальный возраст файла, который можно найти в этом каталоге, прежде чем он будет проигнорирован. Для первого пакета все файлы будут считаться действительными. Если latestFirst установлен в значение true и задан maxFilesPerTrigger, то maxFileAge будет игнорироваться, поскольку валидные старые файлы, которые должны быть обработаны, могут быть проигнорированы. Максимальный возраст указывается относительно отметки времени последнего файла, а не отметки времени текущей системы. По умолчанию параметр maxFileAge равен 1 неделя.
- cleanSource — опция очистки завершенных файлов после обработки, может быть установлен в архивировать, удалить или отключен. Если значение не указано, по умолчанию этот параметр отключен. Задав архив, необходимо указать каталог для сохранения архивированных файлов sourceArchiveDir, который не должен совпадать с исходным шаблоном по глубине (количество каталогов в корневом каталоге). Глубина равна минимальной глубине на обоих путях, чтобы архивные файлы никогда не использовались в качестве новых источников данных. Архивирование путем перемещения файлов, как и их удаление увеличивает накладные расходы на обработку данных в каждом микропакете, даже если это происходит в отдельном потоке. Поэтому необходимо понять стоимость каждой операции в файловой системе, прежде чем включать эту функцию. Однако, иногда включение этой опции может снизить затраты на составление списка файловых источников. Количество потоков, используемых в очистке файлов, можно настроить с помощью конфигурации sql.streaming.fileSource.cleaner.numThreads, по умолчанию равной 1. При включении этой опции не следует использовать исходный путь из нескольких источников или запросов. Аналогично, нужно убедиться, что исходный путь не совпадает ни с одним файлом в выходном каталоге приемника файлового потока. Если не удалять или не перемещать файлы, потоковый запрос может не выполниться. Иногда Spark может не очистить некоторые файловые источники, например, если слишком много файлов поставлено в очередь на очистку.
Что касается Apache Kafka как источника данных для Spark Structured Streaming, то она может использоваться как для потоковой, так и для пакетной обработки. Например, следующий участок кода на PySpark показывает создание датафрейма через подключение к Kafka как к источнику данных, включая поддержку заголовков сообщений:
df = spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ .option("subscribe", "topic1") \ .option("includeHeaders", "true") \ .load() df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "headers")
Поскольку инициализация потребителей Kafka обычно занимает много времени, Spark объединяет потребителей Kafka с исполнителями, используя пул Apache Commons, обеспечивая кэширования потребителей с помощью ключа, содержащего название топика, его раздел и идентификатор группы потребителей. Размер пула ограничен значением конфигурации spark.kafka.consumer.cache.capacity, работая как мягкое ограничение, чтобы не блокировать задачи Spark. Поток вытеснения бездействующего периодически удаляет потребителей, которые не используются дольше заданного времени ожидания. Если этот порог достигнут, удаляется запись, которая в данный момент не используется. Иначе пул будет постоянно расти и вырастет до максимального количества одновременных задач, которое может выполняться в исполнителе, то есть до количества слотов задач.
Если задача по какой-либо причине завершается сбоем, новая задача выполняется с использованием вновь созданного потребителя Kafka по соображениям безопасности. При этом аннулируются все потребителей в пуле, имеющих одинаковый ключ кэширования, чтобы удалить потребителя, который использовался при неудачном выполнении. Потребители, которые используют любые другие задачи, не будут закрыты, но также будут признаны недействительными, когда они будут возвращены в пул. Наряду с потребителями, Spark объединяет записи, полученные из Kafka, отдельно, чтобы позволить потребителям Kafka сохранять состояние с точки зрения Spark и максимизировать эффективность объединения. Он использует тот же ключ кэширования, что и пул потребителей Kafka, но не использует пул Apache Commons.
Узнайте больше про возможности Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Потоковая обработка в Apache Spark
- Анализ данных с Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
- Архитектура данных с Apache Spark
Источники