Подсчет записей в CSV-файлах средствами Apache Spark

Apache Spark для аналитиков данных и дата-инженеров примеры курсы обучение, курсы примеры обучение Spark SQL PySpark, обучение Spark курсы, примеры Spark обработка CSV, примеры Spark для разработчиков курсы обучение, обучение большим данным, Школа Больших Данных Учебный Центр Коммерсант

Чтобы сделать наши курсы по Apache Spark еще более полезными, сегодня разберем 2 варианта решения типовой задачи инженерии данных. Как быстро и эффективно считать данные из множества CSV-файлов с одинаковой схемой за несколько строк кода на PySpark.

Постановка задачи: рутинная работа с CSV-файлами

Наряду с JSON-файлами, про которые мы писали в прошлый раз, дата-инженеру часто приходится работать и с другим псевдо-табличным форматом CSV (Comma Separated Value), где данные разделены запятыми или другим символьным разделителем. Предположим, в облачном хранилище Azure Data Lake Storage (ADLS) есть набор CSV-файлов с данными, которые имеют одну и ту же схему. Каждый файл содержит разное количество записей. Необходимо объединить все данные, сохранив сведения об источнике. В данном случае это означает, что надо считать все файлы в один датафрейм с указанием имени файла в качестве дополнительного столбца в целевой таблице. Результатом будем считать таблицу с количеством записей в каждом файле.

PySpark пример, обучение Apache Spark на практических примерах, курсы дата-инженеров Spark SQL
Как посчитать количество записей во множестве CSV-файлов: прикладная дата-инженерия

Наиболее распространенным решением является монтирование ADLS и просмотр файла за файлом, чтобы создать единый датафрейм с количеством записей в каждом файле. Для этого можно использовать функцию count() в API DataFrame, которая считает число строк в таблице.

Для реализации этой идеи средствами Apache Spark сперва следует подключиться к ADLS. К примеру, если Spark используется на платформе Databricks, это можно сделать разными способами: через токен SAS, секреты из хранилища ключей или сквозную передачу учетных данных. Ниже приведен фрагмент кода для монтирования контейнера хранилища в блоки данных:

##Mount a ADLS gen2 storage container with databricks#Don't change configs
configs = {
  "fs.azure.account.auth.type": "CustomAccessToken",
  "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
}"""
One need following details from ADLS
1. Your container Name (Optionally, corresponding directory name)
2. Your Storage account Name
"""# Optionally, you can add <directory-name> to the source URI of your mount point.
dbutils.fs.mount(
  source = "abfss://<your Container Name>@<Your storage account name>.dfs.core.windows.net/<Optional - Specific Directory under container>",
  mount_point = "/mnt/<Desired name for your mount - must be unique>",
  extra_configs = configs)#List mounts
dbutils.fs.mounts()

Далее рассмотрим 2 варианта решения поставленной задачи по подсчету количества строк в каждом CSV-файле.

Типичное решение на Python

Сперва рассмотрим способ, который чаще всего выбирают начинающие Puthon-разработчики, которые привыкли иметь дело с небольшими объемами данных. В частности, можно просматривать из ранее смонтированного каталога файл за файлом, добавляя дополнительный столбец с именем файла к основному датафрейму. Здесь пригодится SQL-функция PySpark lit() для добавления нового столбца в датафрейм через присвоение постоянного значения. Функция возвращает тип столбца в качестве результата.

    ## import lit from sql functions - useful to add withcolumn a constant value
from pyspark.sql.functions import lit
    ## Provide mount with directory where the files exists
    mount_path = '/mnt/<Your mount name>/<directory>'
    ## loop through the files 
    for file in dbutils.fs.ls(mount_path):
        ## This could be better with defining a schema
        if 'flights1.csv' in file.name:
            df1 = spark.read.csv(f'{mount_path}/{file.name}', header = True, inferSchema = True)
            df1 = df1.withColumn('filename',lit(f"{mount_path}/{file.name}"))
            uniondf = df1
        else:
            df2 = spark.read.csv(f'{mount_path}/{file.name}', header = False, inferSchema = True)
            df2 = df2.withColumn('filename',lit(f"{mount_path}/{file.name}"))
            uniondf = uniondf.union(df2)
    ## Register a temp view
    uniondf.createOrReplaceTempView("flights_data")
    ## run a group by command on temp view to get number of records per file - This could be done with data frame groupBy as well
    resultdf  = spark.sql("select filename, count(*) from flights_data group by filename")
    resultdf.display()

Такое решение вполне жизнеспособно, но не очень масштабируемо, когда в каталог записывается много небольших файлов. Временные и вычислительные затраты на прохождение по всему каталогу файлов и создание датафреймов будут расти пропорционально количеству файлов.  Поэтому имеет смысл поискать другой вариант, что мы и рассмотрим далее.

Ускоренная альтернатива в Apache Spark

Создадим датафрейм, используя функцию SQL input_file_name(), которая создает строковый столбец для имени файла текущей задачи Spark:

## Provide mount with directory where the files exists 
mount_path = '/mnt/<mount name>/<directory>'
spark.sql(f"create table flights_data_2 using csv location '{mount_path}/*.csv' options(header 'true', inferSchema 'true', sep ',')")
## run a group by command on registered table
resultdf  = spark.sql("select input_file_name() as filename, count(*) from flights_data_2 group by filename")
resultdf.display()

В этом решении применяется встроенная функция Big Data фреймворка, которая выбирает метаданные таблицы и базовые данные. Такой способ быстрее и отлично масштабируется на любое количество данных.

Узнайте больше про применение Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://insightsndata.com/interesting-spark-sql-function-fc223c603657
  2. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.input_file_name.html
Поиск по сайту