Как распараллелить чтение данных из JDBC-источников в Apache Spark

курсы Apache Spark SQL для инженеров данных и разработчиков, разработка Spark, Apache Spark JDBC для разработчиков, Spark JDBC инженерия больших данных, обучение разработчиков Apache Spark, Школа Больших Данных Учебный Центр Коммерсант

Мы уже писали, как ускорить выполнение заданий Spark SQL по чтению данных из JDBC-источников. В продолжение этой важной темы для обучения дата-инженеров и разработчиков распределенных приложений, сегодня рассмотрим, зачем настраивать опции функции spark.read() и как это сделать наиболее эффективно.

Скорость выполнения SQL-запросов и параметры чтения данных из JDBC-источников в Apache Spark

Spark SQL включает источник данных, который может считывать данные из внешних баз данных с помощью JDBC, возвращая результаты в виде датафрейма. Источник данных JDBC легче использовать из Java или Python, поскольку он не требует от пользователя предоставления ClassTag. Для некоторых баз данных (DB2, MariaDB, MS SQL, Oracle и PostgreSQL) есть встроенные провайдеры подключения, которые совместимы с защищенным протоколом Kerberos. При их отсутствии можно воспользоваться API-интерфейсом разработчика JdbcConnectionProvider для собственной реализации процесса аутентификации.

Возвращаясь к непосредственному чтению данных из JDBC-источников с помощью заданий Apache Spark, напомним, что при этом дата-инженеру также необходимо выполнить типовые процедуры конфигурирования приложения: определить оптимальное количество разделов в случайном порядке, выделение памяти для экземпляров драйвера и исполнителя, количество ядер исполнителя  и т.д. Чтобы прочитать данные из JDBC-источника, например, объектно-реляционной СУБД PostgreSQL, и получить результат в виде датафрейма, можно использовать следующий код:

jdbcDF = spark.read \
    .format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("user", os.environ['user']) \
    .option("password", os.environ['pass']) \
    .option("query", query) \
    .load()

Core Spark - основы для разработчиков

Код курса
CORS
Ближайшая дата курса
16 декабря, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.

Однако, он будет работать очень медленно, поскольку в нем используется только один поток. Проверить это можно в пользовательском интерфейсе Spark, увидев, что во время выполнения кода выполняется только одна задача. А полученный датафрейм будет иметь всего один раздел. Но именно раздел является единицей параллелизма в Apache Spark. Spark назначает одну задачу на раздел, т.е. каждый раздел будет обрабатываться одним ядром-исполнителем. Поэтому, чтобы использовать преимущество этого фреймворка для параллельной обработки данных, следует увеличить количество потоков, работающих одновременно. Для этого можно модифицировать вышеприведенный запрос, добавив в него несколько опций:

jdbcDF = spark.read \
    .format("jdbc") \
    .option("driver", "org.postgresql.Driver") \
    .option("url", "jdbc:postgresql:dbserver") \
    .option("user", os.environ['user']) \
    .option("password", os.environ['pass']) \
    .option("dbtable", final_query) \
    .option("numPartitions", partitionsCount) \
    .option("partitionColumn", "column_name") \
    .option("lowerBound", f"{min_bound}") \
    .option("upperBound", f"{max_bound}") \
    .load()

Чтобы одновременно читать данные и разделять их в соответствии с потребностями, вместо параметра SQL-запроса следует его через параметр имени таблицы dbtable, присвоив ему псевдоним (final_query) и заключив в круглые скобки. По сути, исходный запрос теперь передается как подзапрос. Чтобы распараллелить чтение, можно указать максимальное количество разделов, задав параметр partitionsCount. Это также определяет максимальное количество одновременных подключений JDBC. Если количество разделов для записи превышает этот предел, следует уменьшить его, вызывая объединение (numPartitions) перед записью. Обычно устанавливается количество разделов в диапазоне от 100% до 400% от количества ядер ЦП.

Параметр partitionColumn сильно влияет на скорость выполнения запросов Spark SQL. Рекомендуется, чтобы этот столбец разделения был числовым или имел тип дата/отметка времени. Для наилучшей производительности значения столбцов должны быть максимально равномерно распределены и иметь большую кардинальность, т.е. уникальные данные. Дополнительное ускорение может придать индексация этого столбца. Нижняя граница и верхняя граница используются только для определения шага раздела, а не для фильтрации строк в таблице. Столбец с логическим типом данных не подходит для значения параметра partitionColumn, поскольку литералы True и False преобразуются в числовые значения 0 и 1. При этом, независимо от количества разделов, только два раздела будут заполнены и только два ядра-исполнителя будут выполнять всю работу, а остальные будут простаивать, если для них не запланировано другое задание. Это снижает производительность запросов Spark SQL.

Если в исходных данных отсутствуют столбцы с числами, датой или отметкой времени, то можно использовать CTE-выражения и оконные функции для создания числового столбца с помощью row_number(). Хотя этот прием вызывает небольшие накладные расходы, все равно окончательный SQL-запрос будет выполняться намного быстрее с использованием вновь созданного столбца, а не при попытке загрузить все данные в один раздел с помощью одного ядра исполнителя.

Если нижняя и верхняя границы столбца разделения неизвестны заранее, можно выполнить 2 запроса вместо одного:

  • сперва найти минимальное и максимальное значения столбца, который будет использоваться в качестве partitionColumn;
  • затем передавать результаты из первого запроса в качестве аргументов нижней и верхней границ.

Таким образом, чтобы повысить эффективность использования Apache Spark при чтении данных из JDBC-источников, следует не просто применять функцию spark.read(), а настраивать ее, задавая оптимальные значения опций SQL-запроса для анализируемых данных. Читайте в нашей новой статье, как ускорить выполнение запросов Spark SQL с помощью плагина Gluten с поддержкой нескольких векторизованных движков и аппаратных ускорителей.

Потоковая обработка в Apache Spark

Код курса
SPOT
Ближайшая дата курса
6 февраля, 2025
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.

Узнайте больше про Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@pintoiu.gabriel/spark-concurrent-jdbc-data-reads-5423552c93f5
  2. https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
Поиск по сайту