Знакомство с aiokafka: асинхронный Python-клиент для Apache Kafka

обучение Kafka, курсы по Apache Kafka, тренинг Kafka, Apache Kafka курсы для разработчиков больших данных в Москве, обучение разработчиков Big Data Kafka Python, Apache Kafka и Python, примеры Kafka Python, Школа Больших Данных Учебный центр Коммерсант

Мы уже писали о Python-клиентах Apache Kafka, которые позволяют разрабатывать приложения потоковой передачи события, используя популярный Python вместо сложных языков Java и Scala. Сегодня познакомимся с еще одной Python-библиотекой, которая представляет асинхронный клиент для Kafka. Что такое aiokafka и чем это отличается от kafka-python: краткий обзор для обучения инженеров данных и разработчиков Big Data.

Зачем нужен еще один Python-клиент для Apache Kafka

Чтобы писать на Python программный код для работы с распределенной платформой потоковой передачи событий Apache Kafka, можно воспользоваться PyKafka (API-интерфейс Python от Parse.ly), клиентом Python для Kafka от корпорации Confluent или open-source библиотекой Kafka-Python. Все эти инструменты мы рассматривали здесь. В 2016 году также вышла еще одна библиотека с открытым исходным кодом – aiokafka. Возникает вопрос, зачем нужен еще один аналог с такими же функциональными возможностями? Ответим на это, сравнив aiokafka с kafka-python.

Проект kafka-python пытается полностью имитировать клиентский API-интерфейс Java. Он не слишком быстрый, но все же имеет неплохую пропускную способность, активно развивается и оперативно реагирует на изменения в Java-клиенте. Библиотека kafka-python предназначена для использования в многопоточной среде. А из-за имитации клиента Java, он превращается в потоковую JVM-подобную среду, где не так много способов асинхронного выполнения действий, т.к. обычно потоки Java очень мощные и могут использовать несколько ядер.

Поэтому kafka-python не просто адаптировать для асинхронного использования, даже при том, что библиотека выполняет асинхронный ввод-вывод с использованием селекторов. У этого API в целом синхронное, т.е. блокирующее поведение, включая блокировку использования сокетов, синхронизацию потоков и пр. В частности, bootstrap блокируется в самом конструкторе, есть блокирующий итератор для потребления и блокирующая отправка запросов продюсера, если буфер заполнен.

Поэтому для асинхронных вызовов, которые позволяют отправителю вызова продолжать работу, не дожидаясь ответа, необходим неблокирующий API-интерфейс. Именно эту идею и воплощает библиотека aiokafka, что мы рассмотрим далее.

Как устроена aiokafka

Будучи схожей с kafka-python по некоторым функциональным возможностям, особенно в части API продюсера, aiokafka существенно отличается в дизайне API потребителя. Например, в kafka-python есть блокирующий вызов KafkaConsumer.poll(),который выполняет выборку сообщений и опрос сокетов с помощью epoll, kqueue или другого доступного API пользовательской ОС, а также обеспечивает жизнеспособность группы потребителей и отвечает за автофиксацию смещений. В aiokafka у потребителя нет метода poll() – класс AIOKafkaConsumer предоставляет интерфейс под именем getmany(), выполняя перебалансировку группы потребителей в фоновом режиме.

Напомним, в исходном Java-клиенте Kafka до версии 0.10.1 отправка heartbeat-сигналов выполнялась только при вызове poll(). Это приводило к отсутствию у потребителя прямого способа остановить обработку и присоединиться к группе в случае исключения из-за неполучения heartbeat-сигнала. Цикл завершал обработку всего пакета, что приходилось обходить через вызовы pause() и poll(0) для вычислительных тактов. Эти проблемы были устранены, когда Java-клиент и kafka-python изменили поведение на фоновую отправку потока.

А библиотека aiokafka делегирует отправку heartbeat-сигналов фоновой задаче, которая отправляет их координатору все время работы цикла событий. Это поведение очень похоже на Java-клиент, за исключением отсутствия тактовых импульсов для длинных методов, привязанных к CPU.

Кроме этого, aiokafka также выполняет групповую перебалансировку в той же фоновой задаче. Поэтому время обработки между вызовами getmany() фактически не влияет на ребалансировку. Нет необходимости настраивать конфигурацию max.poll.interval.ms для времени ожидания перебалансировки и времени ожидания обработки потребителя. В aiokafka эти параметры не связаны между собой, но зато добавлены конфигурации rebalance_timeout_ms и max_poll_interval_ms. Они позволяют разработчику контролировать моменты начала и окончания перебалансировки с помощью ConsumerRebalanceListener. Достаточно установить rebalance_timeout_ms на максимальное время, которое приложение может провести в ожидании обратного вызова. Если обратный вызов ожидает обработки последнего результата getmany(), рекомендуется установить эту конфигурацию равной значению max_poll_interval_ms, как и в Java-клиенте.

В Java-клиенте Kafka и проекте kafka-python предварительная выборка очень проста, поскольку она выполняется в вызове poll(), если недостаточно данных для удовлетворения другого опроса. В интерфейсе итератора при обработке почти всех данных это будет выглядеть так:

def poll():
    max_records = self.config['max_poll_records']
    records = consumer.fethed_records(max_records)
    if not consumer.has_enough_records(max_records)
        consumer.send_fetches()  # prefetch another batch
    return records

Такой простой способ отлично работает, когда надо автоматизировать задачу ввода-вывода с обработкой записей. Но в случае семантического разделения, т.е. специфической обработке каждого раздела, возможна задержка, привязанная ко времени обработки данных во всех топиках. Поэтому aiokafka пытается выполнять предварительную выборку для каждого раздела. Например, обработав все данные, ожидающие раздела, в интерфейсе итератора, aiokafka сразу же попытается предварительно загрузить новые данные. Хотя аналогичный интерфейс можно построить поверх метода pause() в kafka-python, это займет много усилий и потребует большое количество программного кода.

Но использование метода getmany() в aiokafka без указания разделов приведет к тому же поведению предварительной выборки, что и использование вызова poll().

Чтобы использовать aiokafka, надо сперва установить эту библиотеку, а затем импортировать из нее нужный интерйфес. Например, следующий код показывает создание объекта продюсера, который методом send_and_wait() отправляет сообщение в буферную память, затем получает пакеты сообщений и отправляет их на соответствующие узлы в кластере. После публикации сообщений продюсер останавливается.

pip install aiokafkafrom aiokafka import AIOKafkaProducer
import asyncio async def produce_msg():
    print("Started Producing")
    producer = AIOKafkaProducer(bootstrap_servers='localhost:9092')
    await producer.start()
    try:
        while True:
            msg = input("enter_msg> ")
            await producer.send_and_wait("topic_one", bytes(msg, 'utf-8'))
    except KeyboardInterrupt as kint:
        print("Stop producing")
    finally:
        await producer.stop() asyncio.run(produce_msg())
 print("Done producing")

Узнайте больше про администрирование и эксплуатацию Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/@LogeshSakthivel/event-stream-to-kafka-in-python-part-i-58db78553d2a
  2. https://github.com/aio-libs/aiokafka
  3. https://aiokafka.readthedocs.io/en/stable/kafka-python_difference.html
Поиск по сайту