Подсчет записей в CSV-файлах средствами Apache Spark

Подсчет записей в CSV-файлах средствами Apache Spark

    Чтобы сделать наши курсы по Apache Spark еще более полезными, сегодня разберем 2 варианта решения типовой задачи инженерии данных. Как быстро и эффективно считать данные из множества CSV-файлов с одинаковой схемой за несколько строк кода на PySpark.

    Постановка задачи: рутинная работа с CSV-файлами

    Наряду с JSON-файлами, про которые мы писали в прошлый раз, дата-инженеру часто приходится работать и с другим псевдо-табличным форматом CSV (Comma Separated Value), где данные разделены запятыми или другим символьным разделителем. Предположим, в облачном хранилище Azure Data Lake Storage (ADLS) есть набор CSV-файлов с данными, которые имеют одну и ту же схему. Каждый файл содержит разное количество записей. Необходимо объединить все данные, сохранив сведения об источнике. В данном случае это означает, что надо считать все файлы в один датафрейм с указанием имени файла в качестве дополнительного столбца в целевой таблице. Результатом будем считать таблицу с количеством записей в каждом файле.

    PySpark пример, обучение Apache Spark на практических примерах, курсы дата-инженеров Spark SQL
    Как посчитать количество записей во множестве CSV-файлов: прикладная дата-инженерия

    Наиболее распространенным решением является монтирование ADLS и просмотр файла за файлом, чтобы создать единый датафрейм с количеством записей в каждом файле. Для этого можно использовать функцию count() в API DataFrame, которая считает число строк в таблице.

    Для реализации этой идеи средствами Apache Spark сперва следует подключиться к ADLS. К примеру, если Spark используется на платформе Databricks, это можно сделать разными способами: через токен SAS, секреты из хранилища ключей или сквозную передачу учетных данных. Ниже приведен фрагмент кода для монтирования контейнера хранилища в блоки данных:

    ##Mount a ADLS gen2 storage container with databricks#Don't change configs
    configs = {
      "fs.azure.account.auth.type": "CustomAccessToken",
      "fs.azure.account.custom.token.provider.class": spark.conf.get("spark.databricks.passthrough.adls.gen2.tokenProviderClassName")
    }"""
    One need following details from ADLS
    1. Your container Name (Optionally, corresponding directory name)
    2. Your Storage account Name
    """# Optionally, you can add <directory-name> to the source URI of your mount point.
    dbutils.fs.mount(
      source = "abfss://<your Container Name>@<Your storage account name>.dfs.core.windows.net/<Optional - Specific Directory under container>",
      mount_point = "/mnt/<Desired name for your mount - must be unique>",
      extra_configs = configs)#List mounts
    dbutils.fs.mounts()

    Далее рассмотрим 2 варианта решения поставленной задачи по подсчету количества строк в каждом CSV-файле.

    Типичное решение на Python

    Сперва рассмотрим способ, который чаще всего выбирают начинающие Puthon-разработчики, которые привыкли иметь дело с небольшими объемами данных. В частности, можно просматривать из ранее смонтированного каталога файл за файлом, добавляя дополнительный столбец с именем файла к основному датафрейму. Здесь пригодится SQL-функция PySpark lit() для добавления нового столбца в датафрейм через присвоение постоянного значения. Функция возвращает тип столбца в качестве результата.

        ## import lit from sql functions - useful to add withcolumn a constant value
    from pyspark.sql.functions import lit
        ## Provide mount with directory where the files exists
        mount_path = '/mnt/<Your mount name>/<directory>'
        ## loop through the files 
        for file in dbutils.fs.ls(mount_path):
            ## This could be better with defining a schema
            if 'flights1.csv' in file.name:
                df1 = spark.read.csv(f'{mount_path}/{file.name}', header = True, inferSchema = True)
                df1 = df1.withColumn('filename',lit(f"{mount_path}/{file.name}"))
                uniondf = df1
            else:
                df2 = spark.read.csv(f'{mount_path}/{file.name}', header = False, inferSchema = True)
                df2 = df2.withColumn('filename',lit(f"{mount_path}/{file.name}"))
                uniondf = uniondf.union(df2)
        ## Register a temp view
        uniondf.createOrReplaceTempView("flights_data")
        ## run a group by command on temp view to get number of records per file - This could be done with data frame groupBy as well
        resultdf  = spark.sql("select filename, count(*) from flights_data group by filename")
        resultdf.display()
    

    Такое решение вполне жизнеспособно, но не очень масштабируемо, когда в каталог записывается много небольших файлов. Временные и вычислительные затраты на прохождение по всему каталогу файлов и создание датафреймов будут расти пропорционально количеству файлов.  Поэтому имеет смысл поискать другой вариант, что мы и рассмотрим далее.

    Ускоренная альтернатива в Apache Spark

    Создадим датафрейм, используя функцию SQL input_file_name(), которая создает строковый столбец для имени файла текущей задачи Spark:

    ## Provide mount with directory where the files exists 
    mount_path = '/mnt/<mount name>/<directory>'
    spark.sql(f"create table flights_data_2 using csv location '{mount_path}/*.csv' options(header 'true', inferSchema 'true', sep ',')")
    ## run a group by command on registered table
    resultdf  = spark.sql("select input_file_name() as filename, count(*) from flights_data_2 group by filename")
    resultdf.display()
    

    В этом решении применяется встроенная функция Big Data фреймворка, которая выбирает метаданные таблицы и базовые данные. Такой способ быстрее и отлично масштабируется на любое количество данных.

    Узнайте больше про применение Apache Spark для задач дата-инженерии, разработки распределенных приложений и аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    Источники

    1. https://insightsndata.com/interesting-spark-sql-function-fc223c603657
    2. https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.input_file_name/