Улучшенная обработка пакетов с Apache Kafka и Logstash в Trendyol

Logstash Apache Kafka примеры курсы обучение, пакетная обработка событий Kafka, обучение Kafka, курсы Apache Kafka, Kafka администратор кластера курсы, Apache Kafka для дата-инженеров, Apache Kafka для администраторов и инженеров данных, Школа Больших Данных Учебный центр Коммерсант

Как турецкая e-commerce компания Trendyol повысила эффективность пакетных вычислений, используя распределенную платформу потоковой обработки событий Apache Kafka вместе с серверной утилитой сбора и фильтрации данных из разных источников Logstash.

Пакетная обработка данных и конвейер на Logstash

Хотя сегодня все больше организаций переходят на потоковую обработку событий в реальном времени, пакетная парадигма работы с данными до сих активно используется. Однако, чтобы реализовать такие вычисления эффективно, дата-инженерам необходимо сперва ответить на следующие вопросы:

  • какова частота пакетной обработки, это однократный процесс или запланированный и периодически повторяющийся?
  • Каков объем обрабатываемых данных?
  • Какова скорость роста объема данных?
  • Сколько времени занимает обработка?
  • Можно ли запустить другое задание до завершения текущего и что произойдет в этой ситуации?
  • Если до завершения текущего задания запустить другое, повлияет ли это на другие процессы?

Дополнительно к этим вопросам, дата-инженеры Trendyol столкнулись с необходимостью реализовать бизнес-требования к своей системе пакетной обработки данных. Ситуация усложнялась зависимостью от 4-х различных API и необходимости сохранять некоторые состояния с их регулярной проверкой. При завершении отдельных событий в одном API другие должны знать об этом. Ежедневно на обработку поступает около 500 новых пакетов данных, каждый из которых объемом  примерно 120К. Допустимая задержка обновления данных не более 1-го часа.

Эти требования можно реализовать, используя следующие альтернативные подходы:

  • Сбор всех нужных данных и их поочередная обработка. Это подходит для небольшого объема данных с длительным временем отклика и выполнения зависимого API. Увеличить скорость обработки данных с помощью распараллеливания, т.е. путем добавления новых экземпляров системы, не получится из-за их синхронной работы.
  • Разделение данных на небольшие фрагменты поможет справиться с ростом их объема. Если объем данных растет не очень быстро, а зависимые API и DBA стабильны, этот подход работает хорошо. Хотя процесс работает синхронно, он открыт для асинхронной работы. Однако, система не масштабируемая: увеличение объема данных и/или росте времени отклика любого API влияет на весь процесс. Если задание пакетной обработки выполняется асинхронно, но неизвестно о завершении самого раннего задание при запуске, это может создать узкое место.

Чтобы обеспечить быстрый сбор и распространение данных из основного приложения, дата-инженеры Trendyol решили использовать Logstash – конвейер обработки данных, который может принимать данные из нескольких источников, фильтровать и улучшать их, а также отправлять их в несколько мест назначения. Это универсальный инструмент с открытым исходным кодом и множеством плагинов для интеграции с источниками и приемниками данных. В случае Trendyol данные изначально хранятся в объектно-реляционной базе PostgreSQL и отправляются в Apache Kafka для масштабируемой обработки. Про опыт применения Apache Kafka в компании Trendyol мы уже писали здесь. Выполняя роль конвейера сбора и маршрутизации данных, Logstash использует подключаемый модуль JDBC в качестве входа и плагин Kafka в качестве выхода. Таким образом, совместное использование Logstash и Apache Kafka позволяют сделать систему масштабируемой, сократить время обработки пакета и снизить влияние зависимостей. Как это было сделано, мы рассмотрим далее.

Совмещение c Apache Kafka

Чтобы обеспечить масштабируемую и параллельную обработку пакетных данных, дата-инженеры Trendyol решили воспользоваться преимуществами разделов в Apache Kafka. Напомним, разделы топиков в Kafka позволяют распараллелить обработку данных, задействовав одновременно несколько брокеров. Аналогичным образом был разделен процесс планирования заданий Logstash по расписанию. Причем вместо дорогостоящего долговременного сохранения состояний, задания запускаются по требованию через равные промежутки времени с помощью планировщика Kubernetes. При этом используется JDBC-плагин Logstash для приема данных. Чтобы воспользоваться им, нужно добавить соответствующую библиотеку драйверов и ее классы, а затем реализовать Kubernetes YAML.

input {
  jdbc {
    jdbc_driver_library => "{path}/postgresql-9.4.1212.jar"
    jdbc_driver_class => "org.postgresql.Driver"
    jdbc_connection_string => "${JDBC_CONNECTION_STRING}"
    jdbc_user => "${DB_USERNAME}"
    jdbc_password => "${DB_PASSWORD}"
    statement => "select id from table order by date asc "
    clean_run => true
    jdbc_page_size => "${JDBC_PAGE_SIZE}"
    jdbc_paging_enabled => true
  }
}
filter {
    mutate {
            rename => {"table_id"=> "id"}
    }
}
output {
  kafka {
      codec => json
      topic_id => "${KAFKA_TOPIC}"
      bootstrap_servers => "${KAFKA_BOOTSTRAP_SERVERS}"
      value_serializer => "org.apache.kafka.common.serialization.ByteArraySerializer"
      retries => 5
  }
}

YAML-файл для Kubernetes выглядит следующим образом:

apiVersion: "batch/v1beta1"
kind: "CronJob"
metadata:
  name: job-deployment
spec:
  concurrencyPolicy: "Forbid"
  failedJobsHistoryLimit: 1
  successfulJobsHistoryLimit: 1
  startingDeadlineSeconds: 600
  suspend: true/false
  schedule: 0 0/15 * 1/1 * ? *
  jobTemplate: 
    spec:
      template:
        spec:
          containers:
            - name: project-name
              image: docker-images
              env:
                - name: JDBC_CONNECTION_STRING
                  value: "localhost:5432/test"
                - name: JDBC_PAGE_SIZE
                  value: "1000"
                - name: KAFKA_BOOTSTRAP_SERVERS
                  value: "localhost:9090"
                - name: KAFKA_TOPIC
                  value: "test-topic"
              resources:
                limits:
                  memory: "{memory-limit}"
                  cpu: "${memory-limit}"
                requests:
                  memory: "${memory-request}"
                  cpu: "${cpu-request}"
              imagePullPolicy: IfNotPresent
          restartPolicy: Never

Для ручного запуска заданий следует изменить параметр приостановки (suspend) на значение true.

В заключение рассмотрим плагин для передачи данных из Logstash в топики Apache Kafka. Он использует Kafka Client 2.8 и поддерживает подключение к Kafka через SSL или Kerberos SASL. По умолчанию защита отключена, но при необходимости ее можно включить.

API службы аутентификации и авторизации Java (JAAS) предоставляет службы аутентификации и авторизации пользователей для Kafka. Этот параметр указывает путь к файлу JAAS. Пример файла JAAS для клиента Kafka выглядит так:

KafkaClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useTicketCache=true
  renewTicket=true
  serviceName="kafka";
  };

Задание свойств jaas_path и kerberos_config в файле конфигурации добавит их к глобальным системным свойствам JVM. Это означает, что при наличии нескольких входных данных Kafka, все они будут использовать один и тот же jaas_path и kerberos_config. Если это нежелательно, придется запускать отдельные экземпляры Logstash на разных экземплярах JVM.

Плагин Logstash к Kafka поддерживает все общие конфигурации продюсера, потребителя и топика, но единственной необходимой для подключения является идентификатор топика (topic_id). Сам кодек по умолчанию простой: Logstash будет кодировать события не только в поле сообщения, но и с отметкой времени и именем хоста. Чтобы отправить полное содержимое в формате JSON, надо установить кодек в выходной конфигурации следующим образом:

output {
      kafka {
        codec => json
        topic_id => "mytopic"
      }
    }

Возвращаясь к кейсу Trendyol отметим, что совместное использование Logstash и Apache Kafka для пакетной обработки позволило ускорить вычислительный процесс в 4 раза: с 1 часа до 15 минут, повысив удовлетворенность бизнес-пользователей. Этот масштабируемый подход упрощает миграцию данных и позволяет избежать ненужных потребителей для переноса.

Больше подробностей про администрирование и эксплуатацию Apache Kafka в системах аналитики больших данных вы узнаете на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:

Я даю свое согласие на обработку персональных данных и соглашаюсь с политикой конфиденциальности.

Источники

  1. https://medium.com/trendyol-tech/improving-batch-data-processing-using-logstash-kafka-d861520ff708
  2. https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html
Поиск по сайту