Обработка вложенных структур в JSON-файлах для Hive Metastore c Apache Spark

Apache Hive курсы примеры обучение, SQL on Hadoop курсы примеры обучение, Hive Metastore JSON Spark, Apache Hive Spark, обучение Spark Hive курсы, обучение Spark SQL, примеры Spark Hive для разработчиков курсы обучение, обучение большим данным, Школа Больших Данных Учебный Центр Коммерсант

Чем хороши JSON-файлы и как с ними работать в Apache Spark и Hive: проблемы обработки вложенных структур данных и способы их решения на практических примерах. Как автоматизировать переименование некорректных названий полей во вложенных структурах данных JSON-файлов на любом количестве таблиц со множеством полей, чтобы создать таблицу в Hive Metastore и обеспечить пользователям доступ к данным с помощью обычных SQL-запросов.

Про JSON-файлы и их проблемы в Apache Hive и Spark

Полу-структурированный формат JSON широко используется в области Data Science: он понятен для человека и отлично совместим с Apache Spark и Hive. Spark SQL может автоматически определять схему набора данных JSON и загружать ее как Dataset[Row]. Это преобразование можно выполнить с помощью функции read.json()для Dataset[String] или файла JSON. Напомним, JSON-текст в закодированном виде представляет собой упорядоченный набор значений или набор пар ключ/значение. Ключом может быть только строка, при этом хотя регистрозависимость не регулируется обычно имена с буквами в разных регистрах считаются разными. А в качестве значений в JSON могут выступать следующие типы данных:

  • Запись или объект (Object) – неупорядоченное множество пар ключ/значение, отделенных друг от друга запятыми. Каждая запись заключается в {}. Ключ описывается строкой, между ним и значением стоит двоеточие.
  • одномерный массив (Array) – упорядоченное множество однотипных или разнотипных значений, разделенных запятыми. Массив заключается в []. Значения разделяются запятыми.
  • целое или вещественное число (Number);
  • логические переменные (Boolean) – true (истина), false (ложь);
  • пустое значение null;
  • строка (String) – упорядоченное множество из символов юникода в «». Символы могут быть указаны с использованием escape-последовательностей, начинающихся с \, \», \\, \/, \t, \n, \r, \f и \b, или записаны шестнадцатеричным кодом в кодировке Unicode в виде \uFFFF.

Стоит помнить, что JSON-файл не всегда является типичным для этого формата. Каждая строка должна содержать отдельный автономный действительный объект JSON. Будучи полу-структурированным, формат JSON позволяет создавать таблицы со структурами данных, которые упрощают доступ к ним. Например, можно создать таблицу в мета-хранилище Spark, которая указывает на определенное местоположение (LOCATION), где хранится множество файлов JSON одинаковой структуры, но с разными данными:

json_path:

— file1.json

file2.json

file3.json

Можно даже создать таблицу с местоположением, указывающим на путь со структурой разделов. Spark автоматически обнаружит разделы в пути и создаст партиционированную таблицу, добавив разделы в качестве последних полей в CREATE TABLE для Hive Metastore. Поэтому создание таблицы для инкапсуляции доступа к нескольким файлам JSON значительно упростит доступ к данным и позволит пользователям запрашивать данные с помощью обычных SQL-запросов. Однако, файлы JSON могут иметь очень сложные вложенные структуры, при попытке определить которые часто случаются ошибки. В частности, невозможно создать таблицу с вложенным столбцом, имя которого содержит недопустимые символы (‘,’, ‘:’, ‘;’) в хранилище метаданных Hive. Как это выглядит на практике и каким образом можно обойти эту проблему, рассмотрим далее.

Обработка вложенных структур: проблемы и решения

Предположим, файл JSON содержит имена полей с символами, недопустимыми в хранилище метаданных Apache Hive. Узнать об этой ошибке получается не при чтении данных из файлов JSON и загрузке их в датафрейм, а только при попытке создать таблицу в Hive Metastore или при сохранении датафрейма в формате Delta/Parquet. Простым переименованием столбца эту проблему не решить, поскольку файл JSON представляет собой вложенную структуру. В частности, схема данных в PySpark — это тип структура (StructType), который содержит список из полей типа структура (StructField), каждое из которых может содержать примитивный тип данных или другую структуру. А с массивами (ArrayType) все может стать еще сложнее.

Вот пример SQL-запроса создания таблицы (CREATE TABLE), который вернет такую ошибку, когда имена некоторых полей с недопустимыми символами, причем они вложены в другие поля:

CREATE TABLE `default`.`test_table` (
`acknowledgment` BOOLEAN,
`host` STRING,
`device` STRING,
`closed_time` BIGINT,
`created_time` BIGINT,
`description` STRING,
`properties` STRUCT<
`person_name`: STRING,
`number`: BIGINT,
`cause`: STRING,
`references`: STRING,
`from_id`: STRING,
`incident:x-345-`: BIGINT,
`incident_severe:x-35-`: BIGINT,
`techniques`:ARRAY<STRUCT<
`name:anidated`: STRING,
`tactic`: STRING,
`technique`: STRING>>,
`priority`: STRING,
`valid`: BOOLEAN,
`to_id`: STRING,
`usernames`: STRING>,
`dst` STRING,
`src` STRING,
`date_uploaded` DATE)
USING JSON
LOCATION '<PATH_TO_TABLE>'

Ошибка случилась из-за двоеточия в полях `incident:x-345-`,`incident_severe:x-35-` и `name:anidated`. Если заменить названия этих полей на `incident_x-345-`, `incident_severe_x-35-` и `name_anidated` соответственно, все будет корректно и получится создать таблицу с вложенным столбцом в хранилище метаданных Hive. Предупредить подобную ситуацию с недопустимыми символами в названиях полей можно следующими способами:

  • создать структуру таблицы вручную, удалив специальные символы. Этот вариант подходит для небольших JSON-файлов с количеством полей не более 100. Кроме того, он не масштабируется.
  • непосредственно переименовать некорректные названия полей в файле JSON. Это можно автоматизировать, выполнив регулярное выражение для удаления специальных символов при создании JSON-файлов. Регулярное выражение будет сложным, поскольку нужно удалять недопустимые символы только в именах полей, а не во всем файле JSON. Кроме того, придется добавлять этап обработки при создании самих файлов JSON.
  • непосредственно переименовать полученную схему данных перед сохранением или созданием таблицы в хранилище метаданных Hive. Этот вариант масштабируется на любое количество таблиц с любым количеством полей, не добавляет дополнительный шаг обработки и не влияет на формат исходных данных.

Автоматизировать 3-ий вариант с переименованием схемы данных поможет следующий PySpark-скрипт:

import pyspark.sql.types as sql_types

path_table = "<PATH_TO_DATA>"
table_name = "<TABLE_NAME>"

def recur_rename(schema: StructType, old_char, new_char):
    schema_new = []
    for struct_field in schema:
        if type(struct_field.dataType)==sql_types.StructType:
            schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.StructType(recur_rename(struct_field.dataType, old_char, new_char)), struct_field.nullable, struct_field.metadata))
        elif type(struct_field.dataType)==sql_types.ArrayType: 
            if type(struct_field.dataType.elementType)==sql_types.StructType:
                schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.ArrayType(sql_types.StructType(recur_rename(struct_field.dataType.elementType, old_char, new_char)),True), struct_field.nullable, struct_field.metadata)) # Recursive call to loop over all Array elements
            else:
                schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType.elementType, struct_field.nullable, struct_field.metadata)) # If ArrayType only has one field, it is no sense to use an Array so Array is exploded
        else:
            schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType, struct_field.nullable, struct_field.metadata))
    return schema_new

def rename_columns(schema: StructType, old_char, new_char):
    return sql_types.StructType(recur_rename(schema, old_char, new_char))

df = spark.read.format("json").load(path_table) # Read data whose schema has to be changed.
newSchema = rename_columns(df.schema, ":", "_") # Replace special characters in schema (More special characters not allowed in Spark/Hive meastore: ':', ',', ';')
df2= spark.read.format("json").schema(newSchema).load(path_table) # Read data with new schema.
df2.write.saveAsTable(table_name) # // Save dataframe as table (aux table creation. It will be created as a internal table in Parquet/Delta format) to generate SQL Create Table.
create_table = spark.sql("SHOW CREATE TABLE {0}".format(table_name)).take(1)[0][0]
spark.sql("DROP TABLE {0}".format(table_name)) # Remove aux table
spark.sql(create_table.replace("USING delta","USING JSON LOCATION '{0}'".format(path_table))) # Create new table in Spark/Hive metastore with 

Читайте нашу новую статью про функции парсинга JSON-файлов в Apache Spark. А освоить все тонкости работы с Apache Hive и Spark для эффективной аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Источники

  1. https://towardsdev.com/create-a-spark-hive-meta-store-table-using-nested-json-with-invalid-field-names-505f215eb5bf
  2. https://ru.wikipedia.org/wiki/JSON
  3. https://spark.apache.org/docs/latest/sql-data-sources-json.html
Поиск по сайту