Возможности и ограничения Dataset API в Apache Spark

обучение Spark SQL примеры курсы обучение, анализ данных Spark SQL, Spark SQL Dataset API для разработчика примеры курсы обучение, обучение большим данным, курсы дата-инженер аналитик Big Data, Школа Больших Данных Учебный Центр Коммерсант

В Apache Spark есть 3 структуры данных, каждая из которых имеет собственный API со своими достоинствами и недостатками. Сегодня разберем плюсы и минусы Dataset API, а также рассмотрим особенности JOIN-операций в нем.

Почему Dataset API в Apache Spark работает только со Scala и Java

Напомним, структура данных Dataset впервые появилась в Apache Spark 1.6, на пару лет позже своих предшественников (RDD и Dataframe). Dataset представляет собой набор строго типизированных объектов JVM, определяемых классом в Scala или Java. Эта распределенная коллекция объектов предоставляет преимущества RDD и DataFrame, являясь строго типизированной коллекцией предметно-ориентированных объектов, которые могут быть преобразованы параллельно с использованием функциональных или реляционных операций. Каждый Dataset также имеет нетипизированное представление, называемое DataFrame, которое является датасетом строки. Все операции, доступные для датасета, можно разделить на 2 категории:

  • преобразования, которые создают новые датасеты, например, сопоставление, фильтрация, выборка и агрегирование (groupBy).
  • действия, которые запускают вычисления и возвращают результаты, такие как подсчет, показ или запись данных в файловые системы.

Датасет поддерживает концепцию отложенных вычислений (lazy evaluation), которые запускаются только при вызове действия. Внутри себя датасет имеет логический план, описывающий вычисления, необходимые для получения данных. При вызове действия оптимизатор запросов Spark под названием Catalyst оптимизирует логический план и создает физический план для эффективного выполнения параллельным и распределенным образом. Изучить логический план, а также оптимизированный физический план поможет функция EXPLAIN.

Для эффективной поддержки объектов домена кодировщик сопоставляет доменный тип T с внутренней системой типов Spark. Например, для класса Person с двумя полями: имя (name) типа string и возраст (age) типа int, кодировщик помогает Spark сгенерировать код для сериализации этого объекта в двоичную структуру формата Tungsten. Такая двоичная структура требует гораздо меньше памяти, а также оптимизирована для эффективности обработки данных, например, в колоночном формате. Понять внутреннее двоичное представление данных поможет их схема, которую Dataset автоматически обнаруживает благодаря встроенному движку Spark SQL.

Однако, при всех достоинствах структуры данных Dataset, таких как быстрая обработка, поддержка SQL-оптимизатора и мощный API, она обладает существенным недостатком, который ограничивает ее использование. Из языков программирования Dataset API поддерживает только Scala и Java, которые намного сложнее Python.

Например, чтобы создать датасет в Apache Spark, можно указать фреймворку на файлы в системах хранения с помощью функции чтения, доступной в SparkSession:

val people = spark.read.parquet("...").as[Person]  // Scala   

Dataset<Person> people = spark.read().parquet("...").as(Encoders.bean(Person.class)); // Java

Или создать новый датасет с помощью преобразований, доступных в уже существующих датасетах. Например, следующие инструкции на языках Scala и Java создадут новый датасет, применяя фильтр к существующему:

val names = people.map(_.name)  // Scala, names is a Dataset[String]

Dataset<String> names = people.map((Person p) -> p.name, Encoders.STRING)); // Java

Операции с датасетами могут быть нетипизированными с помощью функций предметно-ориентированного языка (DSL), определенных в Dataset API. Эти операции очень похожи на операции, доступные в API датафрейма на языках R или Python. К примеру, чтобы выбрать столбец из датасета, на языке Scala надо использовать метод apply() и метод col() в Java.

val ageCol = people("age")  // Scala

Column ageCol = people.col("age"); // Java

Основная причина, по которой Dataset API недоступен на языках Python и R, заключается в том, что эта структура данных изначально была разработана для использования возможностей статической типизации Scala и Java. API Dataset сильно зависит от проверки типов во время компиляции, что возможно только в языках со статической типизацией. Это позволяет лучше оптимизировать производительность во время выполнения и облегчает процесс разработки. А Python является языком с динамической типизацией, который не выполняет проверку типов во время компиляции. Вместо этого Python зависит от проверки типов во время выполнения для обнаружения ошибок типов. Это усложняет применение тех же оптимизаций, что и в Scala и Java, и может привести к замедлению выполнения кода.

Еще одна причина, по которой Dataset API работает только на Scala и Java, является применение этих языков как основных средств создания самого фреймворка Apache Spark. Он написан на Scala, и многие его функции и оптимизации предназначены для работы именно с этим языком программирования. Хотя Spark поддерживает другие языки, включая Python и R, они не настолько глубоко интегрированы с природой этого вычислительного движка, как Scala.

Впрочем, даже с учетом строгой типизации Dataset API некоторые его методы не являются типобезопасными. Какие это методы и как с этим быть, рассмотрим далее.

Тонкости внешних соединений в Spark Dataset API

При написании преобразования данных в Spark с Dataset API возникает вопрос: надо ли оптимизировать безопасность типов или производительность? К примеру, одной из наиболее распространенных операций в Spark является операция соединения. Однако, стандартный API Dataset не предоставляет типобезопасный вариант внешнего соединения (Outer Join). А метод joinWith() безопасен только для внутреннего соединения. Результатом операции left.joinWith(right,…) является датасет пар Dataset[(L,R)], который для внешнего левого соединения может создавать строки с нулевыми полями с правой стороны. Чтобы избежать пропусков, следует преобразовать правую часть, например, так:

leftDS
  .joinWith(rightDS, joinCondition, "left_outer")
  .as[L, Option[R]] // here the conversion
  .map { case (left:L, right:Option[R]) => ... } // now we can safely map

Этот паттерн можно вынести в отдельный скрытый класс, содержащий несколько полезных методов, которые расширяют функциональность соединения стандартного датасета. Неявные классы позволяют расширять функциональность существующего API без необходимости доступа к его исходному коду. Это особенность Scala при осторожном использовании может сильно помочь разработчику. В рассматриваемом примере определим несколько методов расширения для типа датасет с именем left:

implicit class DatasetTypesafeJoins[L <: Product : TypeTag](left: Dataset[L]) {
  
  type JoinResult = (L, Option[R]) // type alias for better traceability
  
  def joinLeftOuterWith[R <: Product : TypeTag]
        (right: Dataset[R], condition: Column): Dataset[JoinResult] = {
    left
      .joinWith(right, condition, "left_outer")
      .as(Encoders.product[JoinResult])
  }
  
  // ... more extension methods
}

При этом необходимо импортировать трейт TypeTag из пакета scala.reflect.runtime.universe, который требуется Encoders.product в работе кодировщика для нужного типа результата:

import scala.reflect.runtime.universe.TypeTag

Также можно определить методы joinLeftOuterWith(), joinLeftAntiWith(), crossJoinWith(), joinLeftInnerWith().

В заключение отметим, что представленное решение работает только для Scala 2.13, поскольку TypeTag недоступен в Scala 3. Впрочем, даже с учетом этого ограничения можно упростить многие преобразования и избежать некоторых исключений NullPointerException.

Освойте тонкости применения Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/towards-data-engineering/why-is-sparks-dataset-api-not-supported-in-other-languages-except-scala-and-java-80ba146c5651
  2. https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/Dataset.html
  3. https://www.codementor.io/@vsimko/how-to-make-joins-in-spark-dataset-api-more-type-safe-1zwki1jjiq
Поиск по сайту