Мы уже рассказывали, что такое Graceful shutdown на примере Spark Streaming. Сегодня разберем реализацию этой идеи плавного завершения задач в потоковой обработке данных применяется в компании Carwow при работе с Apache Kafka и dyno-контейнерами приложений Heroku.
Потоковая обработка данных и проблема завершения потоковых заданий в контейнерах Heroku
Carwow — британская платформа покупки новых и подержанных автомобилей у франчайзинговых дилеров по модели обратного выкупа, которая сводит в сделку покупателей и продавцов, исключая прямое взаимодействие между ними. Основными компонентами этой ИТ-платформы являются контейнерные приложения Heroku и Apache Kafka как распределенная платформа потоковой передачи событий для высокопроизводительных конвейеров данных. Обычно при сбоях с Kafka в микросервисной Carwow-системе проблема была в коде потоковых приложений.
Однако, однажды дата-инженеры Carwow столкнулись с ситуацией, когда сообщения не доставлялись, и не по причине ошибок в коде Kafka продюсеров или получателей сообщений. Некоторые сервисы взаимодействуют через API, а другие используют события Kafka, чтобы любой подписчик знал о внесении изменений и инициировал другое действие из любого сервиса в системе. Например, когда дилер отвечает пользователю, сообщение сперва сохраняется, а затем в Kafka публикуется событие Dealer_sent_message. Позже другие службы будут прослушивать это событие и запускать другие действия, такие как AssignBuyingPeriod, UpdateLatestActivity, UpdateEnquiryCounts и т. д.
Для повышения производительности в Carwow используются асинхронные продюсеры Kafka, которые буферизируют события и делегируя задачу доставки сообщений, согласно заранее настроенному временному интервалу или порог. Однако, чтобы гарантировать освобождение используемых сетевых ресурсов и доставку всех буферизованных сообщений, следует отключить асинхронный продюссер, не потеряв сгенерированные данные. Как это было сделано, мы рассмотрим далее, но сначала напомним, что представляет собой платформа Heroku.
Все приложения Heroku работают в наборе легковесных контейнеров Linux, называемых dynos. Когда dyno-менеджер перезапускает dyno-менеджер, он запросит корректное завершение процессов, отправив всем им сигнал SIGTERM. Поэтому некоторые процессы в dyno-контейнере, который закрывается, могут получить несколько таких сигналов.
По умолчанию у процессов приложения есть максимум 30 секунд, чтобы завершить работу корректно. В течение этого времени они должны прекратить прием новых запросов или заданий, завершить свои текущие запросы или вернуть задания в очередь для обработки другими рабочими процессами. Если какие-либо процессы останутся после этого периода времени, диспетчер dyno принудительно завершит их с помощью SIGKILL. При выполнении контролируемых или периодических перезапусков новые dyno-контейнеры запускаются, как только в процессы в старых направлены сигналы завершения работы. Как это связано с проблемой Carwow и Apache Kafka, читайте далее.
Graceful shutdown в Apache Kafka
Код приложения Carwow вызывает метод shutdown() как обработчик anat_exit, который гарантирует выполнение при выходе из программы, если только она не уничтожается операционной системой с помощью SIGKILL:
at_exit do CarwowCore.logger.info 'Shutting down, delivering any remaining Kafka messages' async_producer.shutdown end
Каждый раз при развертывании, масштабировании или перезапуске dyno-контейнера Heroku сигнализирует процессам с помощью SIGTERM с достаточным временем для корректного завершения. Если процесс все еще выполняется через 30 секунд после начального сигнала SIGTERM, Heroku отправит сигнал SIGKILL, чтобы резко остановить выполнение. При этом нет гарантии, что Kafka вовремя очистит буферы. Таким образом, дата-инженеры Carwow обнаружили, что с 2018 года ежедневно терялось до 0,1% событий Dealer_sent_message. Это происходило из-за того, что некоторым dyno-контейнерам нужно слишком много времени для корректного завершения из-за большого количества запросов в очереди, что вынуждает Heroku отправлять SIGKILL после заданного периода и не дает достаточно времени для очистки буферов Kafka. Поэтому вместо асинхронного продюсера следует перейти на синхронный, который будет отправлять события напрямую без риска потерять их в буфере, если тот не сможет корректно закрыться.
Kafka оптимизирована для передачи сообщений пакетами, а не по отдельности, поэтому при использовании API для одного сообщения возникают значительные накладные расходы и потери производительности. Код для публикации событий Dealer_sent_message в Carwow выглядит следующим образом:
producer = CarwowCore::Kafka.sync_producer BusinessEvent::Adapters::SendToKafka.new(producer: producer).call(event) producer.deliver_messages
Реализация этой идеи с развертыванием синхронного продюсера показала ее работоспособность: события Dealer_sent_message больше не теряются. Тестирование показало, что использование синхронного производителя останавливает потерю событий Kafka. Однако, это не доказывает, что события терялись из-за SIGKILL. Синхронный продюсер решает проблему пропущенных событий, но имеет следующие недостатки:
- снижает производительность;
- сообщение не буферизуется, поэтому теряется в случае ошибки;
- доставка сообщения может завершиться ошибкой по нескольким причинам, а этот упрощенный API не обеспечивает автоматических повторных попыток.
Поэтому необходимо знать, какие запросы или задания выполняются в течение периода Graceful shutdown, сколько сообщений находится в буфере, когда процесс получает SIGTERM и пр. Чтобы получить эту информацию, нужно перехватить сигнал SIGTERM и инициировать процесс выключения с помощью надлежащих инструментов. Как только приложение получает сигнал SIGTERM, следует прекратить использование асинхронного продюсера Kafka и очистить все буферы, чтобы не потерять ни одного события в случае получения SIGKILL.
Но из-за обработки запросов в течении периода плавного завершения может потребоваться публикация события в Kafka, поэтому ее нельзя полностью закрыть. В этом случае есть несколько вариантов:
- замена асинхронного продюсера, используемого на всей платформе, синхронным сразу после получения SIGTERM;
- остановка обработки запросов, ожидающих выполнения, в течение периода Graceful shutdown, что зависит от их количества и загрузки серверов;
- измерение и запись размера буфера сразу после получения SIGTERM, чтобы накопить больше данных и принять обоснованное решение.
Узнайте больше про администрирование и использование Apache Kafka для потоковой аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков больших данных в Москве:
- Администрирование кластера Kafka
- Apache Kafka для инженеров данных
- Администрирование Arenadata Streaming Kafka