Сегодня мы продолжим говорить про Apache Spark Structured Streaming и его применение для обновления данных в таблицах Delta Lake. А также на практических примерах разберем, как выполняются основные операции работы с данными средствами Spark Structured Streaming API.
Таблицы в Delta Lake
Delta Lake – это уровня хранилища данных с открытым исходным кодом, который обеспечивает надежность озера данных, поддерживает ACID-транзакции и масштабируемую обработку метаданных, объединяя потоковые и пакетные операции. Delta Lake работает на базе существующего озера данных на HDFS, Amazon S3 или Azure Data Lake Storage, будучи полностью совместимо со всеми API Apache Spark. Подробнее о том, что такое Delta Lake, мы рассказывали в этой статье.
Delta Lake состоит из трех основных компонентов:
- уровень хранения, где хранятся фактические данные и файлы;
- дельта-таблицы, которые создаются поверх данных, хранящихся на уровне хранилища;
- движок, который повышает производительность Delta Lake для рабочих нагрузок SQL и датафреймов благодаря улучшенному оптимизатору запросов, уровню кэширования и собственному векторизованному механизму выполнения. Именно этот движок является коммерческим продуктом компании Databricks, хотя само Delta Lake является проектом с открытым исходным кодом.
Также в Delta Lake есть журнал, который хранится в папке с именем _delta_log и содержит данные о событиях создания дельта-таблиц для файлов, хранящихся в озера данных. Этот лог помогает отображать правильные обновленные данные для пользователей или, в случае путешествия во времени, среди многих других применений.
Операции с дельта-таблицами средствами Apache Spark
Рассмотрим, как выполняются типовые операции с дельта-таблицами. В частности, создание:
CREATE TABLE IF NOT EXISTS db_name.table_name( Column1 string, Column2 string) USING delta LOCATION ‘/path/to/data/folder/’
Чтение данных в Spark датафрейм из дельта-таблиц выполняется следующим образом:
df = spark.table('db_name.table_name')
или
df = spark.read.format('delta').load('/path/to/data')
Для записи датафрейма в дельта-таблицы используем методы saveAsTable() или save(), которые есть в API Spark:
df.write.format("delta").saveAsTable("db_name.table_name")
или
df.write.format("delta").mode("overwrite").save("/path/to/data")
Чтобы выполнить операцию upsert в дельта-таблицах, используем команду MERGE:
MERGE INTO db_name.table_name as target USING temp_table as source ON target.column1 = source.column1 WHEN MATCHED THEN UPDATE SET target.column1 = source.column1, target.column2 = source.column2 WHEN NOT MATCHED THEN INSERT (column1, column2) VALUES (column1, column2)
Чтобы получить историю транзакций, выполненных в дельта-таблице, есть инструкция DESCRIBE:
DESCRIBE HISTORY table_name
Для так называемых путешествий во времени, чтобы увидеть старые версии дельта-таблицы, воспользуемся SQL-запросом на выборку данных:
SELECT * FROM table_name TIMESTAMP AS OF timestampSELECT * FROM table_name VERSION AS OF version
Можно повысить скорость запросов на чтение из таблицы, объединив небольшие файлы в более крупные с помощью команды OPTIMIZE:
OPTIMIZE table_name
Для удаления файлов данных, которые больше не находятся в последнем состоянии журнала транзакций для таблицы и старше порога хранения используется команда VACUUM:
VACUUM table_name
Далее рассмотрим, какие ошибки могут возникнуть при потоковом обновлении данных в Delta Lake.
Ошибки и решения
Как мы писали в прошлой статье, при обновлении данных в Delta Lake, можно столкнуться с дублированием или пропуском данных. В частности, при использовании следующего кода Spark Scala для добавления в дельта-таблицу при выполнении операции во время процесса, выполняемого каждый час:
val query = data .writeStream .option("checkpointLocation", "my/path/to") .foreachBatch { (dataBatch: Dataset[Model], _: Long) => dataBatch.write.format("delta").mode("Append").save("my/path/to") }) .trigger(trigger) .start()
Возникает следующая ошибка:
java.lang.UnsupportedOperation: Detected a data update (for example part-XXXXX-XXXXXXXX-XXXX-XXXX-XXXXXXXXXXX-XXXX.snappy.parquet) in the source table at version XXX. This is currently not supported. If you'd like to ignore updates, set the option 'ignoreChanges' to 'true' If you would like the data update to be reflected, please restart this query with a fresh checkpoint directory.
Исходя из формулировки ошибки, решением может стать установка поля ignoreChanges в значение true. Однако, нельзя игнорировать данные, которые требуется обновить: нижестоящие потребители этих данных будут обрабатывать дубликаты вместо корректных данных. Кроме того, такая ошибка показывает, что в коде существует проблема, но вместо устранения ее причины выбирается легкий обходной путь добавления поврежденных данных. Поэтому более целесообразно решение, которое позволяет обновлять данные, даже если это не устранит саму проблему сбоя обновлений, поскольку вышестоящие данные могут изменяться и обрабатываться по-разному.
В качестве такого инструмента можно использовать API слияния (MERGE), о котором мы писали здесь. Это работает даже при изменении временных меток и пользовательских изменениях. Однако, данные для записи должны иметь соответствующий ключ для выполнения обновлений. Если Spark Structured Streaming и Delta Lake сложно поддерживать, а сами данные не велики, вместо потоковой передачи можно просто сохранить датафрейм в нужном каталоге озера данных:
data.write.mode("overwrite").format("delta").save("path/to")
В этом случае любые поступающие изменения полностью заменят существующие данные. Какая функция позволяет дата-инженеру быстро узнать обо всех изменениях в дельта-таблицах, вы узнаете здесь. А как сделать подобный сервис самообслуживаемым, читайте в нашей новой статье.
Узнайте больше про использование Apache Spark в современных дата-архитектурах аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники