В рамках курсов для дата-инженеров и разработчиков распределенных приложений, сегодня рассмотрим пример построения системы потоковой передачи для аналитики больших данных на базе Apache Kafka, Spark и Google BigQuery. Читайте далее про Proof of Concept для конвейера продуктовой аналитики, который обрабатывает 50 миллиардов событий каждый день, и какие важные уроки ИТ-архитектор PayPal вынес из этого опыта.
Постановка задачи: зачем нужен PoC конвейера на Apache Kafka, Spark и Google BigQuery
Команда дата-инженеров PayPal управляет конвейером поведенческой и продуктовой аналитики больших данных, ежедневно обрабатывая более 50 миллиардов событий. После изменения структуры этого data pipeline’а было решено использовать Google BigQuery в качестве системы хранилища данных, поскольку оно обеспечивает быстрые SQL-запросы и интерактивный анализ Big Data [1]. Этот RESTful веб-сервис для интерактивного широкомасштабного анализа больших наборов данных, расположенных в облачном хранилище Google Storage. Благодаря модели инфраструктура как услуга, BigQuery можно использовать вместе с MapReduce, поддерживая пакетную загрузку, а также потоковую передачу.
BigQuery предоставляет внешний доступ к масштабируемой, интерактивной системе ad-hoc запросов Dremel для анализа данных, доступных только для чтения. Сперва данные следует загрузить в Google Storage, а затем импортировать с помощью BigQuery API HTTP. BigQuery требует аутентификации для всех запросов через механизмы подобные OAuth, позволяя предоставлять доступ к данным произвольным пользователям, группам и приложениям. BigQuery позволяет создавать и удалять таблицы на основе JSON-схемы, а также импортировать данные в формате CSV или JSON с Google Storage. Для запросов используется стандартный диалект SQL, а результат возвращается в формате JSON. BigQuery можно использовать в Google Apps Script как скрипт для Google Docs или на любом языке, поддерживающем REST API или клиентские библиотеки [2].
При пакетной загрузке исходные данные, например, из файла в формате CSV или Parquet, загружаются в таблицу BigQuery за одну пакетную операцию. Обычно так выполняются типовые ETL-задания. В случае потоковой передачи данные отправляются по одной записи за раз или объединенные в микропакеты на конечную точку потоковой передачи с помощью потоковых API, предоставляемых сервисами Google. Чтобы оценить возможности измененного конвейера обработки данных в условиях пиковой нагрузки, дата-инженеры решили сперва сделать доказательство концепции (Proof of Concept, PoC) перед тем, как принять решение о переходе на потоковую передачу. В качестве основного PoC-компонента была выбрана крупная таблица из 25 миллиардов событий. Данные находятся в локальной центре обработки, который соединяется с облаком Google. Какие еще варианты рассматривали дата-инженеры PayPal, читайте в нашей новой статье.
Чтобы быстрее создать PoC, было решено использовать Spark Structured Streaming для получения событий из Kafka и потоковую запись в BigQuery. При этом использовался не готовый коннектор, поддерживающий чтение таблиц Google BigQuery в DataFrames Spark и запись DataFrames обратно в BigQuery с помощью API источника данных Spark SQL для связи с BigQuery [3]. А написан собственный коннектор, который записывает данные с помощью потоковых API в корзину Google Cloud Storage, а затем выполняет пакетную загрузку этих данных в BigQuery. Также для рассматриваемого PoC был написан Spark-приемник потоковой передачи BigQuery. С помощью настраиваемого Spark-потребителя было создано простое ETL-задание потоковой передачи, которое потребляет и записывает данные в post-преобразования BigQuery.
Для обработки 25 миллионов событий ежедневно PoC с пиковым трафиком 420 тысяч событий в секунду, что составляет примерно 1.25 Гб/сек, подключался к On-Premise облаку со скоростью соединения 20 Гб/сек.
Весь конвейер с представленным PoC работает по следующему принципу [1]:
- данные из множества разных устройств записываются в топики Apache Kafka;
- приложение Spark Structured Streaming считывает данные из топиков Kafka;
- коннектор Spark-BigQuery отправляет данные в таблицы облачного Google хранилища;
- данные из таблиц BigQuery представляются на веб-дэшбордах для наглядной BI-аналитики.
За период тестовой эксплуатации созданной системы ее разработчики вынесли полезные выводы об особенностях потоковой передачи в BigQuery, которые мы рассмотрим далее.
Пакетирование
API потоковой передачи BigQuery позволяет объединить строки в один запрос и вызвать конечную точку. Здесь есть зависимость между размером пакета и задержкой. Например, в рассматриваемом PoC для пакета из 500 записей размером 1,25 МБ задержка составила около 450-650 мс, а для пакета размером в 2 раза больше (1000 записей), задержка выросла до 1100-1500 мс. Поэтому в качестве приемлемого варианта был выбран размер пакета в 500 записей.
Core Spark - основы для разработчиков
Код курса
CORS
Ближайшая дата курса
16 декабря, 2024
Продолжительность
16 ак.часов
Стоимость обучения
48 000 руб.
Предел квоты на вставку потоковой передачи
Максимальная квота потоковой передачи составляет 1 ГБ в секунду для каждого проекта Google Cloud Platform по молчанию. При превышении этого предела возникает исключение BigQueryException с ошибкой quotaExceeded. Именно это случалось при тестировании PoC, когда входящий трафик составил около 60% реального объема.
Возникла потребность увеличить квоту на уровне проекта, создав билет в Google и внедрив логику повторной обработки запроса при каждой ошибке, чтобы избежать потери неудачной записи в итоговой таблице. Отслеживать использование квоты можно с помощью следующего BigQuery SQL-запроса, чтобы корректировать ее до того достижения предела:
SELECT
start_timestamp,
SUM(total_requests) AS total_requests,
SUM(total_rows) AS total_rows,
SUM(total_input_bytes) AS total_input_bytes,
SUM(IF(error_code IN (“QUOTA_EXCEEDED”, “RATE_LIMIT_EXCEEDED”), total_requests, 0)) AS quota_error,
SUM(IF(error_code IN (“INVALID_VALUE”, “NOT_FOUND”, “SCHEMA_INCOMPATIBLE”, “BILLING_NOT_ENABLED”, “ACCESS_DENIED”, “CUNAUTHENTICATED”), total_requests, 0)) AS user_error,
SUM(IF(error_code IN (“CONNECTION_ERROR”,“INTERNAL_ERROR”), total_requests, 0)) AS server_error, SUM(IF(error_code IS NULL, 0, total_requests)) AS total_error,
FROM `region-us`.INFORMATION_SCHEMA.STREAMING_TIMELINE_BY_PROJECT
GROUP BY start_timestamp
ORDER BY 1 DESC
Квота рассчитывается не на основе фактических данных потоковой передачи в BigQuery, а на базе полезных данных в формате JSON и метаданных по количеству байтов, полученных в конечной точке RESTful API. Так дополнительные теги из JSON также учитываются при текущем расчете квоты.
Нет повторам: включение дедубликации
При тестировании PoC было обнаружено, что во время потоковой передачи данные дублируются, и в итоговой таблице получаются повторяющиеся записи. BigQuery позволяет исключить дубли, создавая объект «RowToInsert» с включенной дедупликацией:
RowToInsert row = InsertAllRequest.RowToInsert.of(uniqueInsertId,content);
Без включения дедупликации RowToInsert row = InsertAllRequest.RowToInsert.of(content), на каждые 500 000 загруженных записей возникал 1 дубль. При включении дедупликации, дубль случался на каждые 5 миллионов загруженных записей. Таким образом, включение дедупликации не гарантирует 100%-ное удаление дубликатов, но намного снижает вероятность их возникновения. BigQuery использует уникальный идентификатор uniqueInsertId в каждой записи для выявления и удаления дублей в течение последних 10 минут.
Кэширование при потоковой передаче таблицы
BigQuery обеспечивает базовое кэширование для пакетной загрузки таблиц. При запуске повторяющиегося запроса BigQuery пытается повторно использовать кешированные результаты. Но, если данные загружаются в таблицу через потоковую передачу, BigQuery не кэширует их.
Задержка потоковой передачи в BigQuery
При тестировании описанного PoC было отмечено более высокую задержку при запросе таблицы потоковой передачи по сравнению с таблицей пакетной загрузки, даже при меньшем объеме данных. Это случается по причине сканирования буфера потоковой передачи, где изначально хранятся загруженные записи, еще не отправленные в постоянное колоночное хранилище BigQuery Capacitor, которое оптимизировано для операций с интенсивным чтением. А буфер BigQuery Streaming оптимизирован для записи большого объема данных. Это и является причиной задержки, которая может составить 15–30 секунд в зависимости от объема запрашиваемых данных. Сперва буферу потоковой передачи требовалось 10–90 минут для отправки данных в постоянное колоночное хранилище. Сегодня во 2-ой версии серверной части BigQuery это время сокращено до 2–3 минут.
Анализ данных с помощью современного Apache Spark
Код курса
SPARK
Ближайшая дата курса
16 декабря, 2024
Продолжительность
32 ак.часов
Стоимость обучения
96 000 руб.
Завтра мы продолжим разбирать кейсы использования BigQuery с Apache Spark и рассмотрим пример цифровой трансформации автомобильной компании Renault с сервисами Google Cloud Platform. А детально узнать все технические подробности эксплуатации Apache Spark и Kafka для разработки распределенных приложений и аналитики больших данных вам помогут специализированные курсы в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
- Apache Kafka для разработчиков
- Основы Apache Spark для разработчиков
- Анализ данных с Apache Spark
- Потоковая обработка в Apache Spark
Источники
- https://aride.medium.com/learnings-from-streaming-25-billion-events-to-google-bigquery-57ce81fa9898
- https://ru.wikipedia.org/wiki/BigQuery
- https://github.com/GoogleCloudDataproc/spark-bigquery-connector