Регулярные выражения в Apache Spark

Регулярные выражения в Apache Spark

    Каждый дата-инженер и аналитик данных активно использует регулярные выражения для поиска значений в тексте по заданному шаблону. Сегодня рассмотрим, как это сделать с функциями regexp_replace(), rlike() и regexp_extract в Apache Spark на примере небольшого PySpark-приложения.

    Как работает функция regexp_replace()

    Регулярным выражением называется последовательность символов, задающая шаблон соответствия в тексте. Например, регулярные выражения часто используются с SQL-оператором LIKE и поддерживаются почти во всех реляционных СУБД. Apache Spark тоже имеет подобную функцию. В Python-интерфейсе Apache Spark, PySpark есть несколько функций, которые выполняют сопоставление регулярных выражений: regexp_replace(), rlike() и regexp_extract. Начиная с версии 3.4.0, о которой мы писали здесь, эти функции поддерживается в Spark Connect.  Расcмотрим подробнее, чем они отличаются и как используются.

    Начнем с regexp_replace(). Как следует из названия, эта функция заменяется все подстроки, если в строке будет найдено совпадение с регулярным выражением. Функция находится в пакете pyspark.sql.functions и имеет следующий синтаксис:

    regexp_replace(string: ColumnOrName, pattern: Union[str, Column], replacement: Union[str, Column])

    Эта функция заменяет все подстроки указанного строкового значения, соответствующие регулярному выражению, другой подстрокой. Так можно заменить строковое значение столбца другой строкой. Функция regexp_replace() возвращает столбец. Если соответствий заданному регулярному выражению не найдено, функция возвращает пустую строку. Чтобы продемонстрировать, как это работает, напишем небольшое PySpark приложение, которое генерирует фейковые данные о 10 клиентах и заменяет сокращения обозначений населенных пунктов в адресе полным наименованием, например, «п.» на «поселок», а «г.» на «город».

    !pip install pyspark
    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !pip install faker
    
    #импорт модулей
    from pyspark.sql import SparkSession
    import pyspark
    import sys
    import os
    import random
    
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    
    # Импорт модуля faker
    from faker import Faker
    from faker.providers.address.ru_RU import Provider
    
    from pyspark.sql.functions import regexp_replace
    
    # Создаем объект SparkSession и устанавливаем имя приложения
    spark = SparkSession.builder.appName("MySparkApp").getOrCreate()
    
    # Import and configure random data generator Faker
    fake = Faker('ru_RU')
    fake.add_provider(Provider)
    
    # Create empty list to store generated data
    data1 = []
    
    # Generate data and add to list
    for i in range(10):
        k= random.randint(0, 1)
        name = fake.name()
        address=fake.city()
        data1.append((i, name, random.randint(18, 100), address))
    
    #Create dataframe from data list
    df = spark.createDataFrame(data1, schema=['id', 'client', 'age', 'address'])
    print("Исходный датасет")
    df.show(truncate=False)
    
    def replace_address(df):
        df = df.select('id', 'client', 'age', regexp_replace('address', 'г\.', 'город').alias('address'))
        df = df.select('id', 'client', 'age', regexp_replace('address', 'д\.', 'деревня').alias('address'))
        df = df.select('id', 'client', 'age', regexp_replace('address', 'с\.', 'село').alias('address'))
        df = df.select('id', 'client', 'age', regexp_replace('address', 'п\.', 'поселок').alias('address'))
        df = df.select('id', 'client', 'age', regexp_replace('address', 'ст\.', 'станция').alias('address'))
        df = df.select('id', 'client', 'age', regexp_replace('address', 'клх', 'колхоз').alias('address'))
        return df
    
    new_df = replace_address(df)
    print("После замены")
    new_df.show(truncate=False)

    Результаты вывода и замены с помощью regexp_replace():

    regexp_replace() PySpark, Spark SQL примеры
    Применение regexp_replace() в PySpark

    Функции rlike() и regexp_extract() в Spark SQL

    Функция rlike() возвращает Возвращает логическое значение Column на основе совпадения с регулярным выражением. логический столбец на основе совпадения строки с заданным регулярным выражением. Это функция класса org.apache.spark.sql.Column. С ее помощью можно найти записи, содержищие указанную подстроку. Например, найти всех клиентов, проживающих в населенных пунктах, названия которых содержат строку ‘бург’: Санкт-Петербург, Оренбург, Екатеринбург и пр. Для этого следует написать такой участок кода:

    print("Кто живет в бурге")
    new_df.filter(new_df.address.contains('бург')).collect()
    rlike() PySpark, Spark SQl Python примеры курсы обучение
    Результаты применения функции rlike() в PySpark

    Наконец, рассмотрим еще одну функцию, которая позволяет работать с регулярными выражениями в Apache Spark — regexp_extract(). — извлечение определенной группы, соответствующей регулярному выражению Java, из указанного столбца строки. Если регулярное выражение не совпало или указанная группа не совпала, возвращается пустая строка. Функция находится в модуле pyspark.sql.functions и имеет следующий синтаксис:

    regexp_extract(str: ColumnOrName, pattern: str, idx: int). Функция принимает 3 аргумента:

    • str — столбец, по которому надо найти записи, соответствующие заданному выражению;
    • pattern — шаблон регулярного выражения для извлечения подстроки из указанного столбца;
    • idx — часть совпадения, которую нужно извлечь из группы совпадений, от 0 до 9.

    Чтобы продемонстрировать, как она работает, немного изменим код генерации случайных адресов. Код PySpark-приложения теперь будет следующим:

    from pyspark.sql.functions import regexp_extract
    import re
    
    # Создаем объект SparkSession и устанавливаем имя приложения
    spark = SparkSession.builder.appName("MySparkApp").getOrCreate()
    
    # Import and configure random data generator Faker
    fake = Faker('ru_RU')
    fake.add_provider(Provider)
    
    # Create empty list to store generated data
    data1 = []
    
    # Generate data and add to list
    for i in range(1000):
        k= random.randint(0, 1)
        name = fake.name()
        address=fake.address()
        data1.append((i, name, random.randint(18, 100), address))
    
    #Create dataframe from data list
    df = spark.createDataFrame(data1, schema=['id', 'client', 'age', 'address'])
    print("Исходный датасет")
    df.show(10, truncate=False)

    Теперь воспользуемся функцией regexp_extract() для извлечения каждого компонента адреса в отдельный столбец:

    # Извлечение города
    df = df.withColumn("город", regexp_extract(df["address"], r"([^,]+)", 1))
    
    # Извлечение улицы или переулка
    df = df.withColumn("улица", regexp_extract(df.address, r", (.+), д.", 1))
    
    # Извлечение номера дома
    df = df.withColumn("дом", regexp_extract('address', r'д\. (\d+)', 1))
    
    # Извлечение номера квартиры
    df = df.withColumn("квартира", regexp_extract("address", r"\d+(?:/\d+)?", 0))
    
    # Извлечение индекса
    df = df.withColumn("индекс", regexp_extract(df.address, r'\d+$', 0))
    
    # Show updated dataframe
    print("Датасет с компонентами адреса")
    df.show(10, truncate=False)

    Результаты вывода:

    regexp_extract() PySpark, Spark SQl Python
    Применение функции regexp_extract() в PySpark

    Функция regexp_extract() для меня оказалась самой сложной, поскольку в ней очень много параметров:

    • «r» перед строкой регулярного выражения указывает на то, что это сырая строка, и экранирование не требуется;
    • «([^,]+)» означает любой символ, кроме запятой, повторяющийся один или несколько раз;
    • «, (.+), д.» означает запятую, пробел, любой символ один или несколько раз, запятую, пробел, «д»;
    • «д\. (\d+)» означает «д.» (с символом экранирования перед точкой), пробел, одну или несколько цифр;
    • «\d+(?:/\d+)?» означает одну или несколько цифр, за которыми может следовать «/» и еще одна группа цифр (необязательно);
    • «\d+$» означает одну или несколько цифр, которые являются последними символами строки – использовалось для поиска индекса.

    Номер группы в regexp_extract() указывает на то, какую конкретную группу в регулярном выражении нужно извлечь. Группы в регулярном выражении определяются с помощью круглых скобок. Каждая группа может быть обозначена уникальным номером, начиная с 1. В регулярных выражениях группа — это подмножество символов, которое ограничено круглыми скобками (). Группы позволяют группировать символы вместе и применять к ним определенные операции или преобразования. С помощью групп можно извлекать подстроку из строки, группировать символы вместе и применять к ним операции, а также использовать обратные ссылки, которые представляют собой ссылки на предыдущие группы в регулярном выражении, чтобы сопоставлять повторяющиеся или симметричные паттерны в строке. В случае нашего примера группа с номером 1 означало первое совпадение с заданным в регулярном выражении шаблоне, а 0 – весь шаблон, что использовалось для числовых значений в индексе.

    Узнайте про все возможности Apache Spark для разработки приложений аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

    [elementor-template id=»13619″]

    Источники

    1. https://blog.devgenius.io/regular-expression-regexp-in-pyspark-e5f9b5d9617a
    2. https://sparkbyexamples.com/spark/spark-rlike-regex-matching-examples/
    3. https://linuxhint.com/pyspark-regexp-extract/