Чем хороши JSON-файлы и как с ними работать в Apache Spark и Hive: проблемы обработки вложенных структур данных и способы их решения на практических примерах. Как автоматизировать переименование некорректных названий полей во вложенных структурах данных JSON-файлов на любом количестве таблиц со множеством полей, чтобы создать таблицу в Hive Metastore и обеспечить пользователям доступ к данным с помощью обычных SQL-запросов.
Про JSON-файлы и их проблемы в Apache Hive и Spark
Полу-структурированный формат JSON широко используется в области Data Science: он понятен для человека и отлично совместим с Apache Spark и Hive. Spark SQL может автоматически определять схему набора данных JSON и загружать ее как Dataset[Row]. Это преобразование можно выполнить с помощью функции read.json()для Dataset[String] или файла JSON. Напомним, JSON-текст в закодированном виде представляет собой упорядоченный набор значений или набор пар ключ/значение. Ключом может быть только строка, при этом хотя регистрозависимость не регулируется обычно имена с буквами в разных регистрах считаются разными. А в качестве значений в JSON могут выступать следующие типы данных:
- Запись или объект (Object) – неупорядоченное множество пар ключ/значение, отделенных друг от друга запятыми. Каждая запись заключается в {}. Ключ описывается строкой, между ним и значением стоит двоеточие.
- одномерный массив (Array) – упорядоченное множество однотипных или разнотипных значений, разделенных запятыми. Массив заключается в []. Значения разделяются запятыми.
- целое или вещественное число (Number);
- логические переменные (Boolean) – true (истина), false (ложь);
- пустое значение null;
- строка (String) – упорядоченное множество из символов юникода в «». Символы могут быть указаны с использованием escape-последовательностей, начинающихся с \, \», \\, \/, \t, \n, \r, \f и \b, или записаны шестнадцатеричным кодом в кодировке Unicode в виде \uFFFF.
Стоит помнить, что JSON-файл не всегда является типичным для этого формата. Каждая строка должна содержать отдельный автономный действительный объект JSON. Будучи полу-структурированным, формат JSON позволяет создавать таблицы со структурами данных, которые упрощают доступ к ним. Например, можно создать таблицу в мета-хранилище Spark, которая указывает на определенное местоположение (LOCATION), где хранится множество файлов JSON одинаковой структуры, но с разными данными:
json_path:
— file1.json
— file2.json
— file3.json
Можно даже создать таблицу с местоположением, указывающим на путь со структурой разделов. Spark автоматически обнаружит разделы в пути и создаст партиционированную таблицу, добавив разделы в качестве последних полей в CREATE TABLE для Hive Metastore. Поэтому создание таблицы для инкапсуляции доступа к нескольким файлам JSON значительно упростит доступ к данным и позволит пользователям запрашивать данные с помощью обычных SQL-запросов. Однако, файлы JSON могут иметь очень сложные вложенные структуры, при попытке определить которые часто случаются ошибки. В частности, невозможно создать таблицу с вложенным столбцом, имя которого содержит недопустимые символы (‘,’, ‘:’, ‘;’) в хранилище метаданных Hive. Как это выглядит на практике и каким образом можно обойти эту проблему, рассмотрим далее.
Код курса
HIVE
Ближайшая дата курса
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Обработка вложенных структур: проблемы и решения
Предположим, файл JSON содержит имена полей с символами, недопустимыми в хранилище метаданных Apache Hive. Узнать об этой ошибке получается не при чтении данных из файлов JSON и загрузке их в датафрейм, а только при попытке создать таблицу в Hive Metastore или при сохранении датафрейма в формате Delta/Parquet. Простым переименованием столбца эту проблему не решить, поскольку файл JSON представляет собой вложенную структуру. В частности, схема данных в PySpark — это тип структура (StructType), который содержит список из полей типа структура (StructField), каждое из которых может содержать примитивный тип данных или другую структуру. А с массивами (ArrayType) все может стать еще сложнее.
Вот пример SQL-запроса создания таблицы (CREATE TABLE), который вернет такую ошибку, когда имена некоторых полей с недопустимыми символами, причем они вложены в другие поля:
CREATE TABLE `default`.`test_table` ( `acknowledgment` BOOLEAN, `host` STRING, `device` STRING, `closed_time` BIGINT, `created_time` BIGINT, `description` STRING, `properties` STRUCT< `person_name`: STRING, `number`: BIGINT, `cause`: STRING, `references`: STRING, `from_id`: STRING, `incident:x-345-`: BIGINT, `incident_severe:x-35-`: BIGINT, `techniques`:ARRAY<STRUCT< `name:anidated`: STRING, `tactic`: STRING, `technique`: STRING>>, `priority`: STRING, `valid`: BOOLEAN, `to_id`: STRING, `usernames`: STRING>, `dst` STRING, `src` STRING, `date_uploaded` DATE) USING JSON LOCATION '<PATH_TO_TABLE>'
Ошибка случилась из-за двоеточия в полях `incident:x-345-`,`incident_severe:x-35-` и `name:anidated`. Если заменить названия этих полей на `incident_x-345-`, `incident_severe_x-35-` и `name_anidated` соответственно, все будет корректно и получится создать таблицу с вложенным столбцом в хранилище метаданных Hive. Предупредить подобную ситуацию с недопустимыми символами в названиях полей можно следующими способами:
- создать структуру таблицы вручную, удалив специальные символы. Этот вариант подходит для небольших JSON-файлов с количеством полей не более 100. Кроме того, он не масштабируется.
- непосредственно переименовать некорректные названия полей в файле JSON. Это можно автоматизировать, выполнив регулярное выражение для удаления специальных символов при создании JSON-файлов. Регулярное выражение будет сложным, поскольку нужно удалять недопустимые символы только в именах полей, а не во всем файле JSON. Кроме того, придется добавлять этап обработки при создании самих файлов JSON.
- непосредственно переименовать полученную схему данных перед сохранением или созданием таблицы в хранилище метаданных Hive. Этот вариант масштабируется на любое количество таблиц с любым количеством полей, не добавляет дополнительный шаг обработки и не влияет на формат исходных данных.
Автоматизировать 3-ий вариант с переименованием схемы данных поможет следующий PySpark-скрипт:
import pyspark.sql.types as sql_types path_table = "<PATH_TO_DATA>" table_name = "<TABLE_NAME>" def recur_rename(schema: StructType, old_char, new_char): schema_new = [] for struct_field in schema: if type(struct_field.dataType)==sql_types.StructType: schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.StructType(recur_rename(struct_field.dataType, old_char, new_char)), struct_field.nullable, struct_field.metadata)) elif type(struct_field.dataType)==sql_types.ArrayType: if type(struct_field.dataType.elementType)==sql_types.StructType: schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), sql_types.ArrayType(sql_types.StructType(recur_rename(struct_field.dataType.elementType, old_char, new_char)),True), struct_field.nullable, struct_field.metadata)) # Recursive call to loop over all Array elements else: schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType.elementType, struct_field.nullable, struct_field.metadata)) # If ArrayType only has one field, it is no sense to use an Array so Array is exploded else: schema_new.append(sql_types.StructField(struct_field.name.replace(old_char, new_char), struct_field.dataType, struct_field.nullable, struct_field.metadata)) return schema_new def rename_columns(schema: StructType, old_char, new_char): return sql_types.StructType(recur_rename(schema, old_char, new_char)) df = spark.read.format("json").load(path_table) # Read data whose schema has to be changed. newSchema = rename_columns(df.schema, ":", "_") # Replace special characters in schema (More special characters not allowed in Spark/Hive meastore: ':', ',', ';') df2= spark.read.format("json").schema(newSchema).load(path_table) # Read data with new schema. df2.write.saveAsTable(table_name) # // Save dataframe as table (aux table creation. It will be created as a internal table in Parquet/Delta format) to generate SQL Create Table. create_table = spark.sql("SHOW CREATE TABLE {0}".format(table_name)).take(1)[0][0] spark.sql("DROP TABLE {0}".format(table_name)) # Remove aux table spark.sql(create_table.replace("USING delta","USING JSON LOCATION '{0}'".format(path_table))) # Create new table in Spark/Hive metastore with
Код курса
NOSQL
Ближайшая дата курса
Продолжительность
ак.часов
Стоимость обучения
0 руб.
Читайте нашу новую статью про функции парсинга JSON-файлов в Apache Spark. А освоить все тонкости работы с Apache Hive и Spark для эффективной аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Hadoop SQL администратор Hive
- Интеграция Hadoop и NoSQL
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
Источники