Из Kafka во Flink: пишем Python-приложение

PyFlink Google Colab, Apache Flink Python Colab Kafka, обучение Flink Kafka, Upstash Kafka, курсы Kafka Flink, Kafka serverless, Apache Kafka и Flink для разработчиков, обучение большим данным, Школа Больших Данных Учебный центр Коммерсант

Сегодня рассмотрим, как написать и запустить в Google Colab свое Python-приложение считывания данных из топика Kafka с помощью коннектора FlinkKafkaConsumer из библиотеки pyflink.datastream.connectors  и почему заставить его работать оказалось не так просто.

Использование FlinkKafkaConsumer для доступа к Kafka из Flink приложения

Недавно я показывала, как написать PyFlink-скрипт считывания данных из топика Apache Kafka, развернутого в облачной бессерверной платформе Upstash. Подробнее о том, как устроен Python API Apache Flink, читайте здесь. Однако, в тот раз вместо FlinkKafkaConsumer из библиотеки pyflink.datastream.connectors  я использовала класс Consumer библиотеки kafka-python, поскольку не загрузила JAR-файл коннектора Flink-Kafka. Загрузить этот пакет поможет инструкция

!wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

При работе в интерактивной среде Google Colab файл коннектора загружается в текущую рабочую директорию, которая по умолчанию находится в папке /content/. Помимо этого, для работы с коннектором Flink к Kafka также необходимо установить целый ряд библиотек и импортировать некоторые их модули:

####################################ячейка в Google Colab №1 - установка и импорт библиотек###########
#установка библиотек
!pip install kafka-python 
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!pip install apache-flink

#импорт модулей 
from kafka import KafkaConsumer
from pyflink import datastream
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.common.typeinfo import Types
from pyflink.datastream.formats.json import JsonRowDeserializationSchema

!wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka/1.17.1/flink-sql-connector-kafka-1.17.1.jar

Чтобы запустить Flink-приложение необходимо сперва создать StreamExecutionEnvironment – контекст, в котором выполняется программа потоковой передачи. Причем в Colab мне пришлось явно задавать JAR-пакет коннектора, ранее установленного с помощью команды !wget:

env = StreamExecutionEnvironment.get_execution_environment()
env.add_jars("file:////content/flink-sql-connector-kafka-1.17.1.jar")

После этого можно создать объект класса FlinkKafkaConsumer, передав в параметрах название топика, учетные данные для подключения к серверу Kafka и схему десериализации сообщений:

kafka_consumer = FlinkKafkaConsumer(
    topic,
    deserialization_schema=json_format,
    properties=kafka_properties
)

Схема десериализации определена с помощью строк

row_type_info = Types.ROW_NAMED(['id', 'name', 'subject', 'content'],[Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
json_schema  = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

Flink поддерживает чтение/запись записей JSON с помощью класса JsonRowDeserializationSchema, который находится в пакете pyflink.datastream.formats.json – именно поэтому ранее понадобился импорт этого модуля.

Весь код PyFlink-приложения выглядит так:

####################################ячейка в Google Colab №2 - код PyFlink-приложения#####################
#Создание словаря с настройками для подключения к серверу Kafka
kafka_properties = {
"bootstrap.servers": kafkaserver, # адрес сервера Kafka
"sasl.mechanism": "SCRAM-SHA-256", # механизм аутентификации
"security.protocol": "SASL_SSL", # протокол безопасности
"sasl.jaas.config": f'org.apache.kafka.common.security.scram.ScramLoginModule required username="{user_name}" password="{user_pass}";' # параметры аутентификации
}

#Получение среды выполнения PyFlink
env = StreamExecutionEnvironment.get_execution_environment()

#Добавление в среду выполнения JAR-файла с коннектором Flink для Kafka
env.add_jars("file:////content/flink-sql-connector-kafka-1.17.1.jar")

#Создание объекта схемы данных для десериализации JSON-строк
row_type_info = Types.ROW_NAMED(['id', 'name', 'subject', 'content'], [Types.STRING(), Types.STRING(), Types.STRING(), Types.STRING()])
json_schema = JsonRowDeserializationSchema.builder().type_info(row_type_info).build()

#Создание источника данных, который будет считывать сообщения из Kafka
source = FlinkKafkaConsumer(
topic, # название топика
deserialization_schema=json_schema, # схема десериализации
properties=kafka_properties # настройки для подключения к серверу Kafka
)

#Начало чтения сообщений с самого начала топика
source.set_start_from_earliest()

#Добавление источника данных в среду выполнения
data_stream = env.add_source(source)
print('source_type ', data_stream.get_name()) #Вывод типа источника данных
print('data_types ', data_stream.get_type()) # Вывод типа элементов источника данных

data_stream \
    .map(lambda x: x, output_type=row_type_info) \ #Операция map, которая просто выводит элементы на консоль
    .print()

#Запуск выполнения программы
env.execute("Kafka to Upstash")

Однако, при выполнении этого кода у меня возникли ошибки, отладить которые мне пока не удалось.

PyFlink Colab Kafka Python Flink Streaming
Запуск PyFlink-приложения в Google Colab

Скорей всего, эти ошибки связаны с некорректной установкой зависимостей между различными модулями и классами фреймворка Flink, Python и внешнего сервера Kafka. Также, необходимо полностью использовать возможности потоковой обработки в Apache Flink, включая точки сохранения и водяные знаки, что отсутствует в моем коде. А интерактивная среда Google Colab добавляет сложности, не позволяя посмотреть логи, которые лежат в скрытой папке .config. Как побороть эти трудности, я покажу в другой раз. А в заключение напомню, что все эти и другие подробности использования Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.
Поиск по сайту