Чтобы сделать наши курсы по Apache Spark еще более полезными, сегодня разберем 2 варианта решения типовой задачи инженерии данных. Как быстро и эффективно считать данные из множества CSV-файлов с одинаковой схемой за несколько строк кода на PySpark.
Постановка задачи: рутинная работа с CSV-файлами
Наряду с JSON-файлами, про которые мы писали в прошлый раз, дата-инженеру часто приходится работать и с другим псевдо-табличным форматом CSV (Comma Separated Value), где данные разделены запятыми или другим символьным разделителем. Предположим, в облачном хранилище Azure Data Lake Storage (ADLS) есть набор CSV-файлов с данными, которые имеют одну и ту же схему. Каждый файл содержит разное количество записей. Необходимо объединить все данные, сохранив сведения об источнике. В данном случае это означает, что надо считать все файлы в один датафрейм с указанием имени файла в качестве дополнительного столбца в целевой таблице. Результатом будем считать таблицу с количеством записей в каждом файле.
Наиболее распространенным решением является монтирование ADLS и просмотр файла за файлом, чтобы создать единый датафрейм с количеством записей в каждом файле. Для этого можно использовать функцию count() в API DataFrame, которая считает число строк в таблице.
Для реализации этой идеи средствами Apache Spark сперва следует подключиться к ADLS. К примеру, если Spark используется на платформе Databricks, это можно сделать разными способами: через токен SAS, секреты из хранилища ключей или сквозную передачу учетных данных. Ниже приведен фрагмент кода для монтирования контейнера хранилища в блоки данных:
##Mount a ADLS gen2 storage container with databricks#Don't change configs configs = { "fs.azure.account.auth.type": "CustomAccessToken", "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName") }""" One need following details from ADLS 1. Your container Name (Optionally, corresponding directory name) 2. Your Storage account Name """# Optionally, you can add <directory-name> to the source URI of your mount point. dbutils.fs.mount( source = "abfss://<your Container Name>@<Your storage account name>.dfs.core.windows.net/<Optional - Specific Directory under container>", mount_point = "/mnt/<Desired name for your mount - must be unique>", extra_configs = configs)#List mounts dbutils.fs.mounts()
Далее рассмотрим 2 варианта решения поставленной задачи по подсчету количества строк в каждом CSV-файле.
Типичное решение на Python
Сперва рассмотрим способ, который чаще всего выбирают начинающие Puthon-разработчики, которые привыкли иметь дело с небольшими объемами данных. В частности, можно просматривать из ранее смонтированного каталога файл за файлом, добавляя дополнительный столбец с именем файла к основному датафрейму. Здесь пригодится SQL-функция PySpark lit() для добавления нового столбца в датафрейм через присвоение постоянного значения. Функция возвращает тип столбца в качестве результата.
## import lit from sql functions - useful to add withcolumn a constant value from pyspark.sql.functions import lit ## Provide mount with directory where the files exists mount_path = '/mnt/<Your mount name>/<directory>' ## loop through the files for file in dbutils.fs.ls(mount_path): ## This could be better with defining a schema if 'flights1.csv' in file.name: df1 = spark.read.csv(f'{mount_path}/{file.name}', header = True, inferSchema = True) df1 = df1.withColumn('filename',lit(f"{mount_path}/{file.name}")) uniondf = df1 else: df2 = spark.read.csv(f'{mount_path}/{file.name}', header = False, inferSchema = True) df2 = df2.withColumn('filename',lit(f"{mount_path}/{file.name}")) uniondf = uniondf.union(df2) ## Register a temp view uniondf.createOrReplaceTempView("flights_data") ## run a group by command on temp view to get number of records per file - This could be done with data frame groupBy as well resultdf = spark.sql("select filename, count(*) from flights_data group by filename") resultdf.display()
Такое решение вполне жизнеспособно, но не очень масштабируемо, когда в каталог записывается много небольших файлов. Временные и вычислительные затраты на прохождение по всему каталогу файлов и создание датафреймов будут расти пропорционально количеству файлов. Поэтому имеет смысл поискать другой вариант, что мы и рассмотрим далее.
Ускоренная альтернатива в Apache Spark
Создадим датафрейм, используя функцию SQL input_file_name(), которая создает строковый столбец для имени файла текущей задачи Spark:
## Provide mount with directory where the files exists mount_path = '/mnt/<mount name>/<directory>' spark.sql(f"create table flights_data_2 using csv location '{mount_path}/*.csv' options(header 'true', inferSchema 'true', sep ',')") ## run a group by command on registered table resultdf = spark.sql("select input_file_name() as filename, count(*) from flights_data_2 group by filename") resultdf.display()
В этом решении применяется встроенная функция Big Data фреймворка, которая выбирает метаданные таблицы и базовые данные. Такой способ быстрее и отлично масштабируется на любое количество данных.
Узнайте больше про применение Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
- Машинное обучение в Apache Spark
- Графовые алгоритмы в Apache Spark
Источники