Сегодня разберем тему, особенно полезную для обучения разработчиков распределенных приложений и дата-инженеров масштабных платформ аналитики больших данных на Apache Flink: обнаружение сложных цепочек связанных событий в потоковой обработке. Как создать свой шаблон поиска сложных событий с библиотекой FlinkCEP.
Комплексная обработка событий или зачем вам CEP
Современный data-driven бизнес хочет принимать на основе данных в режиме реального времени, чтобы быстро реагировать на поведение своих клиентов и меняющиеся тенденции внешнего рынка. Это нужно для систем аналитики больших данных и машинного обучения, например, для выявления мошеннических операций и предупреждения аномальных сбоев в технологических процессах. Также такая возможность полезна для платформ интернета-вещей и других бизнес-приложений. Однако, на лету обнаруживать шаблоны в потоках событий, что заблаговременно принять меры – не самая тривиальная задача. Впрочем, ее успешно решают специализированные системы комплексной обработки событий (Complex Event Processing, CEP), сопоставляя поток входящих событий с заранее определенным шаблоном. CEP в реальном времени ищет релевантные данные по сохраненному запросу, в отличие от традиционных СУБД, отбрасывая неподходящие компоненты потока.
CEP позволяет найти объект по шаблонизированному запросу в бесконечном потоке входных данных. При этом объектом может быть все что угодно, что имеет определенное поведение в виде последовательности событий. А шаблоном будет набор действий, которые может выполнять этот объект. Как только CEP обнаружит данные, удовлетворяющие заданному шаблону, система идентифицирует искомый объект в режиме реального времени. Проиллюстрируем эту идею на практическом примере пользовательского поведения на сайте. Для формирования персональной рекомендации посетителю, очень заинтересованному в покупке, надо идентифицировать такого посетителя, который выполняет следующие действия:
- посмотрел каталог продукции;
- выбрал конкретный товар;
- открыл отзывы других покупателей;
- заглянул в раздел «О компании».
Таким образом, из всего множества посетителей сайта следует выделить только тех, чье поведение содержит эту цепочку событий. Некоторые технологии Big Data поддерживают такую возможность работы с потоковыми данными, и Apache Flink является одним из них. В этом фреймворке, который идеально подходит для рабочих нагрузок CEP благодаря своей истинной потоковой природе и малой задержке, а также высокой пропускной способности обработки данных в реальном времени есть соответствующая библиотека.
Впервые API библиотеки CEP в Apache Flink был представлен в 2016 году. Библиотека FlinkCEP не является частью бинарного дистрибутива фреймворка, поэтому надо добавить зависимость FlinkCEP в файл pom.xml Flink-проекта. Например, на Java это будет выглядеть так:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-cep</artifactId> <version>1.15.0</version> </dependency>
Далее можно приступить к написанию CEP-программы с использованием Pattern API, что мы и рассмотрим далее.
Как работает CEP-библиотека
События в потоке данных, к которым нужно применить сопоставление с образцом, должны реализовывать методы equals() и hashCode(), поскольку FlinkCEP использует их для сравнения и сопоставления событий. API библиотеки позволяет определять шаблоны сложных событий, каждый из которых состоит из нескольких стадий или состояний. Шаблон имеет уникальное имя и должен начинаться с начального состояния, а затем переходить из одного состояния в другое, согласно условиям, которые задал пользователь. Каждое состояние должно иметь уникальное имя, чтобы позже идентифицировать совпадающие события.
Каждая последовательность сложных паттернов состоит из нескольких простых шаблонов, то есть паттернов, ищущих отдельные события с одинаковыми свойствами. FlinkCEP представляет такую последовательность паттернов в виде графа, где переходы от одного паттерна к другому происходят на основе заданных пользователем условий, например, event.getName().equals(«end»). Совпадение — это последовательность входных событий, которая входит во все шаблоны сложного графа через последовательность допустимых переходов.
Рассмотрим некоторые из наиболее часто используемых операций шаблона CEP:
- Begin — определяет начальное состояние шаблона и записывается следующим образом:
[php]Pattern<Event, ?> start = Pattern.<Event>begin("start");[/php]
- Next — добавляет новое состояние шаблона, и соответствующее событие должно сменить предыдущий шаблон соответствия:
[php]Pattern<Event, ?> next = start.next("далее");[/php]
- FollowedBy — добавляет новое состояние шаблона, но здесь могут происходить другие события между двумя совпадающими событиями:
[php]Pattern<Event, ?> followBy = start.followedBy("следующий");[/php]
- Where — определяет условие фильтра для текущего состояния шаблона, и если событие проходит фильтр, оно может соответствовать состоянию:
[php]patternState.where(new FilterFunction <Event>() { @Override public boolean filter(Event value) throws Exception { return … // some condition } });[/php]
- Or — добавляет новое условие фильтра, которое объединяется по ИЛИ с существующим условием фильтра. Только если событие проходит условие фильтра, оно может соответствовать состоянию.
- Within — определяет максимальный интервал времени, в течение которого последовательность событий соответствует записи шаблона, которую он отбрасывает. Мы можем записать это как:
[php]patternState.within(Time.seconds(10));[/php]
Зная операции FlinkCEP, можно создать поток шаблонов для запуска потока событий:
[php]DataStream <Event> input = … Pattern <Event, ?> pattern = … PatternStream <Event> patternStream = CEP.pattern(input, pattern);[/php]
В следующем примере есть начало шаблона, середина (имя = » error «) и конец (имя = » critical «) в ключевом потоке данных событий. События обозначаются их идентификаторами, и допустимый шаблон должен произойти в течение 10 секунд. Вся обработка выполняется с временем события. Код программы FlinkCEP будет выглядеть следующим образом:
StreamExecutionEnvironment env = ...; DataStream<Event> input = ...; DataStream<Event> partitionedInput = input.keyBy(new KeySelector<Event, Integer>() { @Override public Integer getKey(Event value) throws Exception { return value.getId(); } }); Pattern<Event, ?> pattern = Pattern.<Event>begin("start") .next("middle").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("error"); } }).followedBy("end").where(new SimpleCondition<Event>() { @Override public boolean filter(Event value) throws Exception { return value.getName().equals("critical"); } }).within(Time.seconds(10)); PatternStream<Event> patternStream = CEP.pattern(partitionedInput, pattern); DataStream<Alert> alerts = patternStream.select(new PatternSelectFunction<Event, Alert>() { @Override public Alert select(Map<String, List<Event>> pattern) throws Exception { return createAlert(pattern); } });
Таким образом, CEP-библиотека Apache Flink — это отличный API для решения сложных задач с минимальным объемом кода, который позволяет достичь высокой пропускной способности и обработки данных с низкой задержкой. Результаты можно получать в режиме реального времени, как только доступен входной поток событий, которые можно агрегировать во времени, фильтровать по тайм-ауту и выполнять другие вычисления. Это полезно при генерации предупреждений и уведомлений во множестве бизнес-приложений. Читайте в нашей новой статье про HTTP-коннектор для Apache Flink, который позволяет выполнять SQL-запросы в этом фреймворке, соединяя потоки с данными из внешнего RESTful-сервиса, доступ к которым реализуется по HTTP-запросу GET. А здесь мы рассказываем про JDBC-коннектор от GetIndata, который позволяет найти шаблоны в измененных данных, реализуя CDC-идеи.
Узнайте больше про использование возможностей Apache Flink для потоковой обработки событий в распределенных приложениях аналитики больших данных на специализированных курсах в нашем лицензированном учебном центре обучения и повышения квалификации для разработчиков, менеджеров, архитекторов, инженеров, администраторов, Data Scientist’ов и аналитиков Big Data в Москве:
Источники