Какие типы данных поддерживает Apache Flink, как сериализация влияет на скорость обработки, зачем выбирать специализированные типы данных вместо общих структур и возможно ли изменение схемы данных без перезапуска приложения.
Типы данных в Apache Flink
В Apache Flink сериализация играет ключевую роль в процессе обработки данных, обеспечивая преобразование объектов в байтовый поток для передачи по сети или хранения в памяти. Эффективная сериализация необходима для достижения высокой производительности и низкой задержки в распределенных вычислительных средах, характерных для Flink. Каждый фреймворк поддерживает определенные типы данных и обрабатывает их с помощью своих внутренних механизмов сериализации и десериализации. В Apache Flink это реализуется с помощью собственных дескрипторов типов, их извлечения и фреймворка сериализации. Вообще во Flask есть следующие категории типов данных:
- Кортежи Java и Case-классы Scala. Кортежи Java (Tuple) — это составные типы, которые содержат фиксированное количество полей с различными типами, от 1 до 25 (Tuple1 — Tuple25). Каждое поле кортежа может быть произвольным типом Flink, включая дополнительные кортежи. Так можно получить сложную вложенную структуру данных. К полям кортежа можно получить доступ напрямую, используя имя поля, например, f4, или универсальный getter-метод tuple.getField(int position), указав индекс поля, которые начинаются с 0. Case-классы Scala (и кортежи, которые являются их частным случаем) — это составные типы, которые содержат фиксированное количество полей с различными типами. Поля кортежа адресуются по их именам со смещением 1, например, _1 для первого поля. Доступ к полям Case-класса осуществляется по их имени.
- Java POJO — специальный тип данных, который могут иметь публичные классы Java и Scala, если они имеют открытый конструктор без аргументов (по умолчанию) и все их поля публичные или доступны через функции getter и setter. Тип поля должен поддерживаться зарегистрированным сериализатором. POJO обычно представлены с помощью PojoTypeInfo и сериализованы с помощью сериализатора Pojo или Kryo. Исключением является случай, когда POJO на самом деле являются типами Avro (Avro Specific Records) или Avro Reflect Types. В этом случае POJO представляются с помощью AvroTypeInfo и сериализуются с помощью сериализатора Avro. Типы POJO проще в использовании, чем общие типы, и более эффективно обрабатываются фреймворком Flink.
- Примитивные типы — Flink поддерживает все примитивные типы Java и Scala, целочисленные, вещественные, символьные и логические.
- Общие типы классов Java и Scala, включая пользовательские, кроме тех, поля которых не могут быть сериализованы, например, указатели файлов, потоки ввода-вывода или другие собственные ресурсы. Классы, которые следуют соглашениям Java Beans, в целом работают хорошо. Все классы, которые не идентифицированы как типы POJO, Flink обрабатывает как общие типы, рассматривая их как черные ящики без возможности получить доступ к их содержимому, например, для эффективной сортировки. Для сериализации и десериализации используется фреймворк Kryo.
- Значения, которые описывают свою сериализацию и десериализацию вручную, реализуя интерфейс apache.flink.types.Value с методами read и write. Рекомендуется использование тип даннх value, когда сериализация общего типа будет неэффективной, например, разреженный вектор элементов как массив. Зная, что массив в основном состоит из nuul-значений, можно использовать специальную кодировку для ненулевых элементов, тогда как сериализация общего типа просто запишет все элементы массива. Интерфейс org.apache.flink.types.CopyableValue поддерживает логику ручного внутреннего клонирования аналогичным образом. Flink включает типы значений, которые соответствуют базовым типам данных: ByteValue, ShortValue, IntValue, LongValue, FloatValue, DoubleValue, StringValue, CharValue, BooleanValue. Эти типы значений действуют как изменяемые варианты базовых типов данных: их значение может быть изменено, что позволяет программу повторно использовать объекты и снимать нагрузку со сборщика мусора.
- Объекты, записываемые в Hadoop, реализующие интерфейс apache.hadoop.Writable. Логика сериализации, определенная в их методах write() и readFields() будет использоваться для сериализации.
- Специальные типы, например, Either, Option, и Try в Scala и Either в Java, который значение двух возможных типов, что полезно для обработки ошибок или операторов, которым необходимо выводить два разных типа записей.
Если Flink не может определить оптимальный сериализатор для пользовательского типа данных, он использует Kryo — универсальный и гибкий фреймворк для сериализации. Однако, такая унификация может быть менее эффективна по сравнению с специализированными сериализаторами.
Влияние сериализации на скорость потоковой обработки
Примечательно, что компилятор Java отбрасывает большую часть информации об обобщенном типе после компиляции. Это означает, что во время выполнения экземпляр объекта больше не знает свой обобщенный тип. Например, экземпляры DataStream<String> и DataStream<Long> выглядят одинаково для JVM. Однако, Flink требует информацию о типе в момент подготовки программы к выполнению, когда вызывается основной метод программы. Flink пытается восстановить как можно больше информации о типе с помощью отражения, используя несколько бит, которые сохраняет Java: сигнатуры функций и информацию о подклассах. Эта логика также содержит некоторые простые выводы типов для случаев, когда возвращаемый тип функции зависит от ее входного типа. Так Flink пытается реконструировать информацию о типе, которая была отброшена различными способами, и явно сохранить ее в наборах данных и операторах. Например, можно узнать тип данных, используя метод DataStream.getType(), который возвращает экземпляр TypeInformation, что является внутренним способом представления типов Flink. Класс TypeInformation является базовым классом для всех дескрипторов типов: он раскрывает некоторые основные свойства типа и может генерировать сериализаторы и компараторы для типов.
Вывод типа данных имеет свои ограничения и в некоторых случаях требует дополнительных инструкций. Например, методы, которые создают наборы данных из коллекций StreamExecutionEnvironment.fromCollection(), или универсальные функции, такие как MapFunction<I, O>. Для этого придется реализовать интерфейс ResultTypeQueryable.
Правильный выбор и определение типов данных в Flink непосредственно влияет на эффективность обработки данных. Flink пытается вывести много информации о типах данных, которые используются во время распределенных вычислений, чтобы повысить эффективность сериализации и размещения данных в памяти. Это также избавляет разработчика от необходимости беспокоиться о фреймворках сериализации и регистрации типов. В общем случае информация о типах данных необходима, когда программа вызывает DataStream, и перед любым вызовом execute(), print(), count() или collect().
Чтобы повысить эффективность обработки данных на уровне наиболее подходящего типа, можно опираться на следующие рекомендации:
- предоставить Flink больше информации о структуре данных через TypeInformation, чтобы система автоматически выбирала наиболее сериализаторы;
- использовать специализированные типы данных, например, POJO или Tuple вместо общих структур, чтобы сократить накладные расходы на сериализацию;
- вместо универсальных сериализаторов (Kryo) использовать специализированные.
Таким образом, понимание механизмов сериализации и типов данных в Apache Flink позволяет повысить эффективность высокопроизводительной обработки больших объемов потоковых данных. Кроме того, Flink поддерживает эволюцию схемы данных состояния stateful-приложения благодаря встроенной поддержке форматов Avro и Apache Parquet, которые обеспечивают эволюцию схемы, позволяя добавлять, удалять или изменять поля, сохраняя обратную совместимость.
Также Flink интегрируется с внешними реестрами схем, например, реестр схемы Kafka (Confluent Schema Registry). Благодаря этой интеграции Flink-приложение может извлекать и регистрировать схемы данных из реестра, чтобы динамически обрабатывать изменения и обеспечивать совместимость между продюсерами и потребителями данных. Еще у Flink есть функцию Dynamic Table Schema, которая позволяет динамически определять и изменять схему таблицы, добавляя удаляя или изменяя ее столбцы без перезапуска приложения. Наконец, можно реализовать пользовательские десериализаторы и сериализаторы для обработки определенных форматов данных и изменений схемы, чтобы обрабатывать сложные преобразования данных и эволюцию их структур.
Освойте возможности Apache Flink для пакетной и потоковой аналитики больших данных и машинного обучения на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники
- https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/types_serialization/
- https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/dev/datastream/fault-tolerance/serialization/schema_evolution/
- https://forum.huawei.com/enterprise/intl/en/thread/Does-Apache-Flink-provide-mechanisms-for-handling-schema-evolution-and-compatibility-when-the-data-format-changes-in-application-development/702948320344948736?blogId=702948320344948736