59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

Flink 系列文章

一、Flink 专栏

Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。

  • 1、Flink 部署系列
    本部分介绍Flink的部署、配置相关基础内容。

  • 2、Flink基础系列
    本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。

  • 3、Flik Table API和SQL基础系列
    本部分介绍Flink Table Api和SQL的基本用法,比如Table API和SQL创建库、表用法、查询、窗口函数、catalog等等内容。

  • 4、Flik Table API和SQL提高与应用系列
    本部分是table api 和sql的应用部分,和实际的生产应用联系更为密切,以及有一定开发难度的内容。

  • 5、Flink 监控系列
    本部分和实际的运维、监控工作相关。

二、Flink 示例专栏

Flink 示例专栏是 Flink 专栏的辅助说明,一般不会介绍知识点的信息,更多的是提供一个一个可以具体使用的示例。本专栏不再分目录,通过链接即可看出介绍的内容。

两专栏的所有文章入口点击:Flink 系列文章汇总索引



本文完整的介绍了Flink 的类库CEP的内容,通过大量的示例展示如何使用CEP。

如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本文除了maven依赖外,没有其他依赖。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

一、Flink的复杂事件处理介绍

Flink CEP(Complex event processing)是在Flink上层实现的复杂事件处理库。 它可以让你在无限事件流中检测出特定的事件模型,有机会掌握数据中重要的那部分。

实时处理中的一个关键问题是检测数据流中的事件模式。复杂事件处理(CEP)解决了将连续传入的事件与模式进行匹配的问题。匹配的结果通常是从输入事件派生的复杂事件。与对存储的数据执行查询的传统DBMS不同,CEP对存储的查询执行数据。所有与查询无关的数据都可以立即丢弃。考虑到CEP查询应用于潜在的无限数据流,这种方法的优势是显而易见的。此外,输入被立即处理。一旦系统看到了匹配序列的所有事件,就会立即发出结果。这一方面有效地提高了CEP的实时分析能力。

CEP的处理范式引起了人们的极大兴趣,并在各种各样的用例中得到了应用。最值得注意的是,CEP目前用于金融应用,如股票市场趋势和信用卡欺诈检测。此外,它还用于基于RFID的跟踪和监控,例如,检测仓库中未正确检查物品的盗窃行为。CEP还可以通过指定可疑用户行为的模式来检测网络入侵。

本页讲述了Flink CEP中可用的API,我们首先讲述模式API,它可以让你指定想在数据流中检测的模式,然后讲述如何检测匹配的事件序列并进行处理。 再然后我们讲述Flink在按照事件时间处理迟到事件时的假设, 以及如何从旧版本的Flink向1.13之后的版本迁移作业。

FlinkCEP 不是二进制发布包的一部分。

1、maven依赖

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-cep</artifactId>
   <version>1.17.2</version>
</dependency>

2、入门示例

实现将输入流中balance大于23的输出。

import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestFirstDemo {
   

	@Data
	@NoArgsConstructor
	@AllArgsConstructor
	static class User {
   
		private Integer id;
		private String name;
		private Double balance;
		private Integer age;
		private String email;

		@Override
		public boolean equals(Object obj) {
   
			if (obj instanceof User) {
   
				User user = (User) obj;
				return this.id == user.id && this.name.equals(user.getName());
			} else {
   
				return false;
			}
		}

		@Override
		public int hashCode() {
   
			return super.hashCode() + Double.hashCode(id);
		}
	}

	final static List<User> userList = Arrays.asList(
			new User(1001, "alan", 20d, 18, "alan.chan.chn@163.com"),
			new User(1002, "alanchan", 22d, 20, "alan.chan.chn@163.com"),
			new User(1003, "alanchanchn", 23d, 22, "alan.chan.chn@163.com"),
			new User(1004, "alan_chan", 21d, 19, "alan.chan.chn@163.com"),
			new User(1005, "alan_chan_chn", 23d, 21, "alan.chan.chn@163.com"));

	public static void main(String[] args) throws Exception {
   
		// 设置环境
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 输入流
		DataStream<User> users = env.fromCollection(userList);

		// 设置匹配模式
		Pattern<User, ?> userPattern = Pattern.<User>begin("start").where(new SimpleCondition<User>() {
   

			@Override
			public boolean filter(User value) throws Exception {
   
				return value.getBalance() >= 23;
			}

		});

		// 将输入流匹配设置的模式,并得到匹配后的流
		DataStream<String> userResult = CEP.pattern(users, userPattern).inProcessingTime()
				.flatSelect(new PatternFlatSelectFunction<User, String>() {
   

					@Override
					public void flatSelect(Map<String, List<User>> pattern, Collector<String> out) throws Exception {
   
						out.collect(pattern.get("start").toString());
					}

				}, Types.STRING);

		// 输出
		userResult.print("user:");
		// 数据源
		// stockList

		// 控制台输出
		// user::10> [TestFirstDemo.User(id=1003, name=alanchanchn, balance=23.0,age=22, email=alan.chan.chn@163.com)]
		// user::11> [TestFirstDemo.User(id=1005, name=alan_chan_chn, balance=23.0, age=21, email=alan.chan.chn@163.com)]

		env.execute();
	}

}

DataStream中的事件,如果你想在上面进行模式匹配的话,必须实现合适的 equals()和hashCode()方法,因为FlinkCEP使用它们来比较和匹配事件。

3、编程模型

CEP编程模型分为三步,即如下:

  • 定义模式
 Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(
         Pattern.<LoginEvent>begin("first")
                 .where(new SimpleCondition<LoginEvent>() {
   

                     @Override
                     public boolean filter(LoginEvent value) throws Exception {
   
                         return value.getStatus().equals("F");
                     }

                 })

 );
  • 将模式映射到流上
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);
  • 在流上提取匹配模式后的数据
patternStream.flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {
   

            @Override
            public void flatSelect(Map<String, List<LoginEvent>> pattern, Collector<String> out)
                    throws Exception {
   
                out.collect(pattern.get("first").toString());
            }

        });

二、模式API

模式API可以让你定义想从输入流中抽取的复杂模式序列。

每个复杂的模式序列包括多个简单的模式,比如,寻找拥有相同属性事件序列的模式。从现在开始,我们把这些简单的模式称作模式, 把我们在数据流中最终寻找的复杂模式序列称作模式序列,你可以把模式序列看作是这样的模式构成的图, 这些模式基于用户指定的条件从一个转换到另外一个,比如 event.getName().equals(“end”)。 一个匹配是输入事件的一个序列,这些事件通过一系列有效的模式转换,能够访问到复杂模式图中的所有模式。

每个模式必须有一个独一无二的名字,你可以在后面使用它来识别匹配到的事件。

模式的名字不能包含字符":"

1、单个模式

一个模式可以是一个单例或者循环模式。

单例模式只接受一个事件,循环模式可以接受多个事件。

在模式匹配表达式中,模式"a b+ c? d"(或者"a",后面跟着一个或者多个"b",再往后可选择的跟着一个"c",最后跟着一个"d"), a,c?,和 d都是单例模式,b+是一个循环模式。

默认情况下,模式都是单例的,你可以通过使用量词把它们转换成循环模式。 每个模式可以有一个或者多个条件来决定它接受哪些事件。

1)、量词

在Flink CEP中,你可以通过这些方法指定循环模式:

  • pattern.oneOrMore(),指定期望一个给定事件出现一次或者多次的模式(例如前面提到的b+模式);
  • pattern.times(#ofTimes),指定期望一个给定事件出现特定次数的模式,例如出现4次a;
  • pattern.times(#fromTimes, #toTimes),指定期望一个给定事件出现次数在一个最小值和最大值中间的模式,比如出现2-4次a。

你可以使用pattern.greedy()方法让循环模式变成贪心的,但现在还不能让模式组贪心。

你可以使用pattern.optional()方法让所有的模式变成可选的,不管是否是循环模式。

对一个命名为userPattern的模式,以下量词是有效的:

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
   
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LoginEvent) {
   
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L)
            );

    static void test1() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                })
                // .times(3) // 期望出现3次
                // .times(3).optional() // 期望出现0或者3次
                // .times(2, 4) // 期望出现2、3或者4次
                // .times(2, 4).greedy() // 期望出现2、3或者4次,并且尽可能的重复次数多
                // .times(2, 4).optional() // 期望出现0、2、3或者4次
                // .times(2, 4).optional().greedy() // 期望出现0、2、3或者4次,并且尽可能的重复次数多
                // .oneOrMore() // 期望出现1到多次
                // .oneOrMore().greedy() // 期望出现1到多次,并且尽可能的重复次数多
                // .oneOrMore().optional() // 期望出现0到多次
                // .oneOrMore().optional().greedy() // 期望出现0到多次,并且尽可能的重复次数多
                // .timesOrMore(2) // 期望出现2到多次
                // .timesOrMore(2).greedy() // 期望出现2到多次,并且尽可能的重复次数多
                // .timesOrMore(2).optional() // 期望出现0、2或多次
                .timesOrMore(2).optional().greedy() // 期望出现0、2或多次,并且尽可能的重复次数多
                ;

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
   
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
   

                        return map.get("first").toString() ;
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
   
        test1();
    }
}


2)、条件

对每个模式你可以指定一个条件来决定一个进来的事件是否被接受进入这个模式,例如,它的value字段应该大于5,或者大于前面接受的事件的平均值。 指定判断事件属性的条件可以通过pattern.where()、pattern.or()或者pattern.until()方法。 这些可以是IterativeCondition或者SimpleCondition。

1、迭代条件

这是最普遍的条件类型。使用它可以指定一个基于前面已经被接受的事件的属性或者它们的一个子集的统计数据来决定是否接受时间序列的条件。

在 Flink CEP 中,提供了 IterativeCondition 抽象类。这其实是更加通用的条件表达,查看源码可以发现, .where()方法本身要求的参数类型就是 IterativeCondition;而之前 的SimpleCondition 是它的一个子类。

在 IterativeCondition 中同样需要实现一个 filter()方法,不过与 SimpleCondition 中不同的是,这个方法有两个参数:除了当前事件之外,还有一个上下文 Context。调用这个上下文的.getEventsForPattern()方法,传入一个模式名称,就可以拿到这个模式中已匹配到的所有数据了。

下面是一个迭代条件的代码,它接受"first"模式下一个事件的userid开头是"1001", 并且前面已经匹配到的事件登录状态改为S。 迭代条件非常强大,尤其是跟循环模式结合使用时。

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
   
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LoginEvent) {
   
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    static void test2() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                })
                .next("second")
                .where(new IterativeCondition<TestCEPDemo.LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value, Context<LoginEvent> ctx) throws Exception {
   
                        Iterable<LoginEvent> loginEventIterable = ctx.getEventsForPattern("first");
                        for (LoginEvent loginEvent : loginEventIterable) {
   
                            if (loginEvent.getUserId() == 1001) {
   
                                loginEvent.setStatus("S");
                            }
                        }
                        return value.getStatus().equals("F");
                    }

                });

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
   
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
   
                        return map.get("first").toString()+" \n"+map.get("second").toString();
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
   
        test2();
    }
}

调用ctx.getEventsForPattern(…)可以获得所有前面已经接受作为可能匹配的事件。调用这个操作的代价可能很小也可能很大,所以在实现你的条件时,尽量少使用它。

2、简单条件

这种类型的条件扩展了前面提到的IterativeCondition类,它决定是否接受一个事件只取决于事件自身的属性。


start.where(SimpleCondition.of(value -> value.getName().startsWith("foo")));

你可以通过pattern.subtype(subClass)方法限制接受的事件类型是初始事件的子类型。


start.subtype(SubEvent.class)
    .where(SimpleCondition.of(value -> ... /*一些判断条件*/));
    
3、组合条件

你可以把subtype条件和其他的条件结合起来使用。这适用于任何条件,你可以通过依次调用where()来组合条件。 最终的结果是每个单一条件的结果的逻辑AND。

如果想使用OR来组合条件,你可以像下面这样使用or()方法。


pattern
    .where(SimpleCondition.of(value -> ... /*一些判断条件*/))
    .or(SimpleCondition.of(value -> ... /*一些判断条件*/));
    

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
   
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LoginEvent) {
   
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    static void test3() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                })
                .or(new IterativeCondition<TestCEPDemo.LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value, Context<LoginEvent> ctx) throws Exception {
   
                        return value.getIp().equals("192.168.10.3");
                    }

                });

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
   
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
   
                        return map.get("first").toString();
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
   
        test3();
    }
}

4、停止条件

如果使用循环模式(oneOrMore()和oneOrMore().optional()),你可以指定一个停止条件,例如,接受事件的值大于5直到值的和小于50。

为了更好的理解它,看下面的例子。给定

  • 模式:“(a+ until b)” (一个或者更多的"a"直到"b")
  • 到来的事件序列:“a1” “c” “a2” “b” “a3”
  • 输出结果会是: {a1 a2} {a1} {a2} {a3}.

你可以看到{a1 a2 a3}和{a2 a3}由于停止条件没有被输出。

3)、where(condition)

为当前模式定义一个条件。为了匹配这个模式,一个事件必须满足某些条件。 多个连续的 where() 语句取与组成判断条件。


pattern.where(new IterativeCondition<Event>() {
   
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
   
        return ... // 一些判断条件
    }
});

4)、or(condition)

增加一个新的判断,和当前的判断取或。一个事件只要满足至少一个判断条件就匹配到模式。


pattern.where(new IterativeCondition<Event>() {
   
	    @Override
	    public boolean filter(Event value, Context ctx) throws Exception {
   
	        return ...; //  一些判断条件
	    }
	}).or(new IterativeCondition<Event>() {
   
	    @Override
	    public boolean filter(Event value, Context ctx) throws Exception {
   
	        return ...; // 替代条件 
	    }
});

5)、until(condition)

为循环模式指定一个停止条件。意思是满足了给定的条件的事件出现后,就不会再有事件被接受进入模式了。 只适用于和oneOrMore()同时使用。

在基于事件的条件中,它可用于清理对应模式的状态。


pattern.oneOrMore().until(new IterativeCondition<Event>() {
   
    @Override
    public boolean filter(Event value, Context ctx) throws Exception {
   
        return ...; // 替代条件 
    }
});

static void test4() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first")
                // .where(new SimpleCondition<LoginEvent>() {
   

                //     @Override
                //     public boolean filter(LoginEvent value) throws Exception {
   
                //         return value.getStatus().equals("F");
                //     }

                // })
                .oneOrMore()
                .until(new IterativeCondition<TestCEPDemo.LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value, Context<LoginEvent> ctx) throws Exception {
   
                        return value.getIp().equals("192.168.10.3");
                    }

                });

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
   
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
   
                        return map.get("first").toString();
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
}

6)、subtype(subClass)

为当前模式定义一个子类型条件。一个事件只有是这个子类型的时候才能匹配到模式。


pattern.subtype(SubEvent.class);

7)、oneOrMore()

指定模式期望匹配到的事件至少出现一次。 默认(在子事件间)使用松散的内部连续性。 推荐使用 until()或者 within()来清理状态。


pattern.oneOrMore();

8)、timesOrMore(#times)

指定模式期望匹配到的事件至少出现 #times 次。 默认(在子事件间)使用松散的内部连续性。


pattern.timesOrMore(2);

9)、times(#ofTimes)

指定模式期望匹配到的事件正好出现的次数。 默认(在子事件间)使用松散的内部连续性。

pattern.times(2);

10)、times(#fromTimes, #toTimes)

指定模式期望匹配到的事件出现次数在#fromTimes和#toTimes之间。 默认(在子事件间)使用松散的内部连续性。

pattern.times(2, 4);

11)、optional()

指定这个模式是可选的,也就是说,它可能根本不出现。这对所有之前提到的量词都适用。

pattern.oneOrMore().optional();

12)、greedy()

指定这个模式是贪心的,也就是说,它会重复尽可能多的次数。这只对量词适用,现在还不支持模式组。

pattern.oneOrMore().greedy();

2、组合模式

1)、组合模式介绍

模式序列由一个初始模式作为开头,如下所示:

Pattern<Event, ?> start = Pattern.<Event>begin("start");

接下来,你可以增加更多的模式到模式序列中并指定它们之间所需的连续条件。

FlinkCEP支持事件之间如下形式的连续策略:

  • 严格连续: 期望所有匹配的事件严格的一个接一个出现,中间没有任何不匹配的事件。
  • 松散连续: 忽略匹配的事件之间的不匹配的事件。
  • 不确定的松散连续: 更进一步的松散连续,允许忽略掉一些匹配事件的附加匹配。

可以使用下面的方法来指定模式之间的连续策略:

  • next(),指定严格连续,
  • followedBy(),指定松散连续,
  • followedByAny(),指定不确定的松散连续。

或者

  • notNext(),如果不想后面直接连着一个特定事件
  • notFollowedBy(),如果不想一个特定事件发生在两个事件之间的任何地方。

如果模式序列没有定义时间约束,则不能以 notFollowedBy() 结尾。

一个 NOT 模式前面不能是可选的模式。


// 严格连续
Pattern<Event, ?> strict = start.next("middle").where(...);

// 松散连续
Pattern<Event, ?> relaxed = start.followedBy("middle").where(...);

// 不确定的松散连续
Pattern<Event, ?> nonDetermin = start.followedByAny("middle").where(...);

// 严格连续的NOT模式
Pattern<Event, ?> strictNot = start.notNext("not").where(...);

// 松散连续的NOT模式
Pattern<Event, ?> relaxedNot = start.notFollowedBy("not").where(...);

松散连续意味着跟着的事件中,只有第一个可匹配的事件会被匹配上,而不确定的松散连接情况下,有着同样起始的多个匹配会被输出。 举例来说,模式"a b",给定事件序列"a",“c”,“b1”,“b2”,会产生如下的结果:

  • "a"和"b"之间严格连续: {} (没有匹配),"a"之后的"c"导致"a"被丢弃。
  • “a"和"b"之间松散连续: {a b1},松散连续会"跳过不匹配的事件直到匹配上的事件”。
  • "a"和"b"之间不确定的松散连续: {a b1}, {a b2},这是最常见的情况。

也可以为模式定义一个有效时间约束。

例如,你可以通过pattern.within()方法指定一个模式应该在10秒内发生。 这种时间模式支持处理时间和事件时间.

一个模式序列只能有一个时间限制。如果限制了多个时间在不同的单个模式上,会使用最小的那个时间限制。

next.within(Time.seconds(10));

注意定义过时间约束的模式允许以 notFollowedBy() 结尾。

例如,可以定义如下的模式:

Pattern.<Event>begin("start")
    .next("middle")
    .where(SimpleCondition.of(value -> value.getName().equals("a")))
    .notFollowedBy("end")
    .where(SimpleCondition.of(value -> value.getName().equals("b")))
    .within(Time.seconds(10));
    

2)、循环模式中的连续性

连续性会被运用在被接受进入模式的事件之间。 用这个例子来说明上面所说的连续性,

一个模式序列"a b+ c"(“a"后面跟着一个或者多个(不确定连续的)“b”,然后跟着一个"c”)

输入为"a",“b1”,“d1”,“b2”,“d2”,“b3”,“c”,

输出结果如下:

  • 严格连续: {a b1 c}, {a b2 c}, {a b3 c} - 没有相邻的 “b” 。
  • 松散连续: {a b1 c},{a b1 b2 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c} - "d"都被忽略了。
  • 不确定松散连续: {a b1 c},{a b1 b2 c},{a b1 b3 c},{a b1 b2 b3 c},{a b2 c},{a b2 b3 c},{a b3 c} - 注意{a b1 b3 c},这是因为"b"之间是不确定松散连续产生的。

对于循环模式(例如oneOrMore()和times())),默认是松散连续。

如果想使用严格连续,你需要使用consecutive()方法明确指定

如果想使用不确定松散连续,你可以使用allowCombinations()方法。

  • consecutive()
    与oneOrMore()和times()一起使用, 在匹配的事件之间施加严格的连续性, 也就是说,任何不匹配的事件都会终止匹配(和next()一样)。
    如果不使用它,那么就是松散连续(和followedBy()一样)。

Pattern.<Event>begin("start")
    .where(SimpleCondition.of(value -> value.getName().equals("c")))
    .followedBy("middle")
    .where(SimpleCondition.of(value -> value.getName().equals("a")))
    .oneOrMore()
    .consecutive()
    .followedBy("end1")
    .where(SimpleCondition.of(value -> value.getName().equals("b")));

// 输入:C D A1 A2 A3 D A4 B
// 会产生下面的输出:
// 如果施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B}
// 不施加严格连续性: {C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}

  • allowCombinations()
    与oneOrMore()和times()一起使用,在匹配的事件中间施加不确定松散连续性(和followedByAny()一样)。
    如果不使用,就是松散连续(和followedBy()一样)。
Pattern.<Event>begin("start")
    .where(SimpleCondition.of(value -> value.getName().equals("c")))
    .followedBy("middle")
    .where(SimpleCondition.of(value -> value.getName().equals("a")))
    .oneOrMore()
    .allowCombinations()
    .followedBy("end1")
    .where(SimpleCondition.of(value -> value.getName().equals("b")));

// 输入:C D A1 A2 A3 D A4 B
// 会产生如下的输出:
// 如果使用不确定松散连续: {C A1 B},{C A1 A2 B},{C A1 A3 B},{C A1 A4 B},{C A1 A2 A3 B},{C A1 A2 A4 B},{C A1 A3 A4 B},{C A1 A2 A3 A4 B}
// 如果不使用:{C A1 B},{C A1 A2 B},{C A1 A2 A3 B},{C A1 A2 A3 A4 B}

3、模式组

也可以定义一个模式序列作为begin,followedBy,followedByAny和next的条件。

这个模式序列在逻辑上会被当作匹配的条件, 并且返回一个GroupPattern,可以在GroupPattern上使用oneOrMore(),times(#ofTimes), times(#fromTimes, #toTimes),optional(),consecutive(),allowCombinations()。

此处官方的示例好像是不能编译过的,需要将其“?”变为具体的输出类型方可。

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.GroupPattern;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
   
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LoginEvent) {
   
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    static void test5() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, LoginEvent> loginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("first")
                        .where(new SimpleCondition<LoginEvent>() {
   

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
   
                                return value.getStatus().equals("F");
                            }

                        })

        );

        // 严格连续
        Pattern<LoginEvent, ?> strictLoginEventPattern = loginEventPattern.next(
                Pattern.<LoginEvent>begin("test").where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return false;
                    }

                })).times(3);

        // 松散连续
        Pattern<LoginEvent, ?> relaxedLoginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("r_first")
                        .where(new SimpleCondition<LoginEvent>() {
   

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
   
                                return value.getStatus().equals("F");
                            }

                        })

        ).oneOrMore();

        // 不确定松散连续
        Pattern<LoginEvent, ?> nonDeterminLoginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("n_first")
                        .where(new SimpleCondition<LoginEvent>() {
   

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
   
                                return value.getStatus().equals("F");
                            }

                        })

        ).optional();

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, nonDeterminLoginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
   
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
   
                        return map.get("n_first").toString();
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
   
        test5();
    }
}

具体方法说明详见下图
在这里插入图片描述

4、匹配后跳过策略

对于一个给定的模式,同一个事件可能会分配到多个成功的匹配上。

为了控制一个事件会分配到多少个匹配上,你需要指定跳过策略AfterMatchSkipStrategy。

有五种跳过策略,如下:

  • NO_SKIP: 每个成功的匹配都会被输出。
  • SKIP_TO_NEXT: 丢弃以相同事件开始的所有部分匹配。
  • SKIP_PAST_LAST_EVENT: 丢弃起始在这个匹配的开始和结束之间的所有部分匹配。
  • SKIP_TO_FIRST: 丢弃起始在这个匹配的开始和第一个出现的名称为PatternName事件之间的所有部分匹配。
  • SKIP_TO_LAST: 丢弃起始在这个匹配的开始和最后一个出现的名称为PatternName事件之间的所有部分匹配。

当使用SKIP_TO_FIRST和SKIP_TO_LAST策略时,需要指定一个合法的PatternName。

  • 示例:不同跳过策略的结果
    模式:b+ c
    输入流:b1 b2 b3 c
    结果如下:
    在这里插入图片描述
  • 示例:NO_SKIP和SKIP_TO_FIRST区别
    模式: (a | b | c) (b | c) c+.greedy d
    输入流:a b c1 c2 c3 d
    结果如下:
    在这里插入图片描述
  • 示例: NO_SKIP和SKIP_TO_NEXT区别
    模式:a b+
    输入流:a b1 b2 b3
    结果如下:
    在这里插入图片描述
    想指定要使用的跳过策略,只需要调用下面的方法创建AfterMatchSkipStrategy:
    在这里插入图片描述
    可以通过调用下面方法将跳过策略应用到模式上:

AfterMatchSkipStrategy skipStrategy = ...;
Pattern.begin("patternName", skipStrategy);

使用SKIP_TO_FIRST/LAST时,有两个选项可以用来处理没有事件可以映射到对应模式名上的情况。 默认情况下会使用NO_SKIP策略,另外一个选项是抛出异常。 可以使用如下的选项:


AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss();

三、检测模式

1、将模式应用到流上

将模式应用到事件流上只要调用 CEP 类的静态方法.pattern()即可,将数据流(DataStream)和模式(Pattern)作为两个参数传入就可以了。最终得到的是一个 PatternStream:

// 代码片段,完整内容可以参考本文中的其他完整示例
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 按用户id分组
DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList)
        .keyBy(loginEvent -> loginEvent.getUserId());

// 定义模式
Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin( ... );

// 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

输入流根据你的使用场景可以是keyed或者non-keyed。

在 non-keyed 流上使用模式将会使你的作业并发度被设为1。

这里的 DataStream,也可以通过 keyBy 进行按键分区得到 KeyedStream,接下来对复杂事件的检测就会针对不同的 key 单独进行了。

模式中定义的复杂事件,发生是有先后顺序的,这里“先后”的判断标准取决于具体的时间语义。

默认情况下采用事件时间语义,那么事件会以各自的时间戳进行排序;

如果是处理时间语义,那么所谓先后就是数据到达的顺序。

对于时间戳相同或是同时到达的事件,我们还可以在 CEP.pattern()中传入一个比较器作为第三个参数,用来进行更精确的排序:

// 代码片段
DataStream<Event> input = ...;
Pattern<Event, ?> pattern = ...;
EventComparator<Event> comparator = ...; // 可选的

PatternStream<Event> patternStream = CEP.pattern(loginEventDS, loginEventPattern, comparator);

得到 PatternStream 后,接下来要做的就是对匹配事件的检测处理了。

2、从模式中选取

PatternStream 的转换操作分成两种:选择提取(select)操作和处理(process)操作。与 DataStream 的转换类似,在调用API 时传入一个函数类,即选择操作传入的是一个 PatternSelectFunction,处理操作传入PatternProcessFunction。

1)、匹配事件的选择提取(PatternSelectFunction)

处理匹配事件最简单的方式,就是从 PatternStream 中直接把匹配的复杂事件提取出来,包装成想要的信息输出,这个操作就是“选择”(select)。

PatternSelectFunction:代码中基于 PatternStream 直接调用.select()方法,传入一个 PatternSelectFunction 作为参数。

  • 示例
static void test1() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                })
                .next("second")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                }).next("third")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                });

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
   
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
   
                        return map.get("first").toString() + " \n" + map.get("second").toString() + " \n"
                        + map.get("third").toString();
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

PatternSelectFunction 是 Flink CEP 提供的一个函数类接口,需要实现一个 select()方法,这个方法每当检测到一组匹配的复杂事件时都会调用一次。它会将检测到的匹配事件保存在一个 Map 里,对应的 key 就是这些事件的名称。这里的“事件名称”就对应着在模式中定义的每个个体模式的名称;而个体模式可以是循环模式,一个名称会对应多个事件,所以最终保存在 Map 里的 value 就是一个事件的列表(List)。

如果个体模式是单例的,那么 List 中只有一个元素,直接调用.get(0)就可以把它取出。

如果个体模式是循环的,List 中就有可能有多个元素了。

可以将匹配到的事件包装成 String 类型输出,代码如下:


static void test2() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                })
                .times(3) // 匹配三次
                .consecutive();

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
   
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
   
                        // list中放了一个匹配了3个事件的模式
                        return map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()
                                + " \n" + map.get("pattern").get(2).toString();
                    }
                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

2)、匹配事件的选择提取(PatternFlatSelectFunction)

要实现一个flatSelect()方法,与 select()的不同就在于没有返回值,b并且多了一个收集器(Collector)参数 out,通过调用 out.collet()方法就可以实现多次发送输出数据了。

static void test3() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("pattern")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                })
                .times(3) // 匹配三次
                .consecutive();

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {
   

                    @Override
                    public void flatSelect(Map<String, List<LoginEvent>> map, Collector<String> out)
                            throws Exception {
   
                        out.collect(// list中放了一个匹配了3个事件的模式
                                map.get("pattern").get(0).toString() + " \n" + map.get("pattern").get(1).toString()
                                        + " \n" + map.get("pattern").get(2).toString());
                    }

                })
                .print("输出信息:\n");

        // 控制台输出:

        env.execute();
    }

3)、匹配事件的通用处理(PatternProcessFunction)

在获得到一个PatternStream之后,你可以应用各种转换来发现事件序列。

推荐使用PatternProcessFunction。

PatternProcessFunction有一个processMatch的方法在每找到一个匹配的事件序列时都会被调用。 它按照Map<String, List>的格式接收一个匹配,映射的键是你的模式序列中的每个模式的名称,值是被接受的事件列表(IN是输入事件的类型)。 模式的输入事件按照时间戳进行排序。为每个模式返回一个接受的事件列表的原因是当使用循环模式(比如oneToMany()和times())时, 对一个模式会有不止一个事件被接受。

PatternProcessFunction 中必须实现一个 processMatch()方法;这个方法与之前的 flatSelect()类似,只是多了一个上下文 Context 参数。利用这个上下文可以获取当前的时间信息,比如事件的时间戳(timestamp)或者处理时间(processing time),还可以调用.output()方法将数据输出到侧输出流。在 CEP 中,侧输出流一般被用来处理超时事件。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> {
   
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        IN startEvent = match.get("start").get(0);
        IN endEvent = match.get("end").get(0);
        out.collect(OUT(startEvent, endEvent));
    }
}

  • 完整示例

package org.cep;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
   
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LoginEvent) {
   
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    static void testProcess() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("first")
                        .where(new SimpleCondition<LoginEvent>() {
   

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
   
                                return value.getStatus().equals("F");
                            }

                        })

        );

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .flatSelect(new PatternFlatSelectFunction<LoginEvent, String>() {
   

                    @Override
                    public void flatSelect(Map<String, List<LoginEvent>> pattern, Collector<String> out)
                            throws Exception {
   
                        out.collect(pattern.get("first").toString());
                    }

                })
                .print("flatSelect输出信息:\n");

        patternStream.process(new PatternProcessFunction<LoginEvent, String>() {
   

            @Override
            public void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)
                    throws Exception {
   
                out.collect(match.get("first").toString());
            }

        }).print("process输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
   
        testProcess();
    }
}

PatternProcessFunction可以访问Context对象。有了它之后,你可以访问时间属性,比如currentProcessingTime或者当前匹配的timestamp (最新分配到匹配上的事件的时间戳)。

3、处理超时的部分匹配

当一个模式上通过within加上窗口长度后,部分匹配的事件序列就可能因为超过窗口长度而被丢弃。可以使用TimedOutPartialMatchHandler接口 来处理超时的部分匹配。这个接口可以和其它的混合使用。也就是说你可以在自己的PatternProcessFunction里另外实现这个接口。 TimedOutPartialMatchHandler提供了另外的processTimedOutMatch方法,这个方法对每个超时的部分匹配都会调用。

  • 官方示例

class MyPatternProcessFunction<IN, OUT> extends PatternProcessFunction<IN, OUT> implements TimedOutPartialMatchHandler<IN> {
   
    @Override
    public void processMatch(Map<String, List<IN>> match, Context ctx, Collector<OUT> out) throws Exception;
        ...
    }

    @Override
    public void processTimedOutMatch(Map<String, List<IN>> match, Context ctx) throws Exception;
        IN startEvent = match.get("start").get(0);
        ctx.output(outputTag, T(startEvent));
    }
}

processTimedOutMatch不能访问主输出。 但你可以通过Context对象把结果输出到侧输出。

前面提到的PatternProcessFunction是在Flink 1.8之后引入的,从那之后推荐使用这个接口来处理匹配到的结果。 用户仍然可以使用像select/flatSelect这样旧格式的API,它们会在内部被转换为PatternProcessFunction。

PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> outputTag = new OutputTag<String>("side-output"){
   };

SingleOutputStreamOperator<ComplexEvent> flatResult = patternStream.flatSelect(
    outputTag,
    new PatternFlatTimeoutFunction<Event, TimeoutEvent>() {
   
        public void timeout(
                Map<String, List<Event>> pattern,
                long timeoutTimestamp,
                Collector<TimeoutEvent> out) throws Exception {
   
            out.collect(new TimeoutEvent());
        }
    },
    new PatternFlatSelectFunction<Event, ComplexEvent>() {
   
        public void flatSelect(Map<String, List<IN>> pattern, Collector<OUT> out) throws Exception {
   
            out.collect(new ComplexEvent());
        }
    }
);

DataStream<TimeoutEvent> timeoutFlatResult = flatResult.getSideOutput(outputTag);

1)、使用 PatternProcessFunction 的侧输出流

在 Flink CEP 中 , 提供了一个专门捕捉超时的部分匹配事件的接口TimedOutPartialMatchHandler。这个接口需要实现一个 processTimedOutMatch()方法,可以将超时的已检测到的部分匹配事件放在一个 Map 中,作为方法的第一个参数;方法的第二个参数则是 PatternProcessFunction 的上下文 Context。这个接口必须与 PatternProcessFunction结合使用,对处理结果的输出则需要利用侧输出流来进行。

官方推荐做法

完整示例如下:


package org.cep;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
   
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LoginEvent) {
   
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1005, "192.168.10.8", "F", 26L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    // 推荐做法
    static void testProcessTimedOut() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("first")
                        .where(new SimpleCondition<LoginEvent>() {
   

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
   
                                return value.getStatus().equals("F");
                            }

                        })

        );

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));
        DataStream<String> resultStream = patternStream.process(new AlanProcessTimedOut(outputTag));

        // 正常流输出
        resultStream.print("输出信息:\n");
        // 超时流输出,通过OutputTag
        ((SingleOutputStreamOperator<String>) resultStream).getSideOutput(outputTag).print("timeout输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
   
        testProcessTimedOut();
    }

    static class AlanProcessTimedOut extends PatternProcessFunction<LoginEvent, String>
            implements TimedOutPartialMatchHandler<LoginEvent> {
   

        private OutputTag<String> outputTag;

        public AlanProcessTimedOut(OutputTag<String> outputTag) {
   
            this.outputTag = outputTag;
        }

        // 超时匹配处理
        @Override
        public void processTimedOutMatch(Map<String, List<LoginEvent>> match, Context ctx) throws Exception {
   
            // OutputTag<LoginEvent> outputTag = new OutputTag<LoginEvent>("AlanProcessTimedOut");
            ctx.output(outputTag, match.get("first").toString());
        }

        // 正常匹配处理
        @Override
        public void processMatch(Map<String, List<LoginEvent>> match, Context ctx, Collector<String> out)
                throws Exception {
   
            out.collect(match.get("first").toString());
        }

    }
}

2)、使用 PatternTimeoutFunction的侧输出流

PatternProcessFunction通过实现TimedOutPartialMatchHandler接口扩展出了处理超时事件的能力,这是官方推荐的做法。

Flink CEP 中也保留了简化版的PatternSelectFunction,它无法直接处理超时事件,不过可以通过调用 PatternStream的.select()方法时多传入一个 PatternTimeoutFunction 参数来实现这一点。

PatternTimeoutFunction 是早期版本中用于捕获超时事件的接口。它需要实现一个 timeout()方法,同样会将部分匹配的事件放在一个 Map 中作为参数传入,此外还有一个参数是当前的时间戳。提取部分匹配事件进行处理转换后,可以将通知或报警信息输出。

在调用 PatternStream 的.select()方法时需要传入三个参数:

  • 侧输出流标签(OutputTag)
  • 超时事件处理函数 PatternTimeoutFunction
  • 匹配事件提取函数PatternSelectFunction
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternFlatSelectFunction;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.PatternTimeoutFunction;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.functions.TimedOutPartialMatchHandler;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.IterativeCondition;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 
 */
public class TestCEPDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
   
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LoginEvent) {
   
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L),
            new LoginEvent(1003, "192.168.10.8", "F", 6L),
            new LoginEvent(1005, "192.168.10.8", "F", 26L),
            new LoginEvent(1004, "192.168.10.3", "S", 4L));

    static void testProcessTimedOut2() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.begin(
                Pattern.<LoginEvent>begin("first")
                        .where(new SimpleCondition<LoginEvent>() {
   

                            @Override
                            public boolean filter(LoginEvent value) throws Exception {
   
                                return value.getStatus().equals("F");
                            }

                        })

        );

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        OutputTag<String> outputTag = new OutputTag<String>("alan_ProcessTimedOut", TypeInformation.of(String.class));

        SingleOutputStreamOperator<String> resultStream = patternStream.select(outputTag,
                new PatternTimeoutFunction<LoginEvent, String>() {
   

                    // 处理超时流
                    @Override
                    public String timeout(Map<String, List<LoginEvent>> pattern, long timeoutTimestamp)
                            throws Exception {
   
                        return pattern.get("first").toString() + "  timeoutTimestamp:" + timeoutTimestamp;
                    }
                }, new PatternSelectFunction<LoginEvent, String>() {
   

                    // 处理正常流
                    @Override
                    public String select(Map<String, List<LoginEvent>> pattern) throws Exception {
   
                        return pattern.get("first").toString();
                    }
                });

        // 正常流输出
        resultStream.print("输出信息:\n");
        // 超时流输出,通过OutputTag
        resultStream.getSideOutput(outputTag).print("timeout输出信息:\n");

        // 控制台输出:

        env.execute();
    }

    public static void main(String[] args) throws Exception {
   
        testProcessTimedOut2();
    }

}

四、CEP库中的时间

1、按照事件时间处理迟到事件

在CEP中,事件的处理顺序很重要。在使用事件时间时,为了保证事件按照正确的顺序被处理,一个事件到来后会先被放到一个缓冲区中, 在缓冲区里事件都按照时间戳从小到大排序,当水位线到达后,缓冲区中所有小于水位线的事件被处理。这意味着水位线之间的数据都按照时间戳被顺序处理。

这个库假定按照事件时间时水位线一定是正确的。
为了保证跨水位线的事件按照事件时间处理,Flink CEP库假定水位线一定是正确的,并且把时间戳小于最新水位线的事件看作是晚到的。 晚到的事件不会被处理。你也可以指定一个侧输出标志来收集比最新水位线晚到的事件,你可以这样做:


PatternStream<Event> patternStream = CEP.pattern(input, pattern);

OutputTag<String> lateDataOutputTag = new OutputTag<String>("late-data"){
   };

SingleOutputStreamOperator<ComplexEvent> result = patternStream
    .sideOutputLateData(lateDataOutputTag)
    .select(
        new PatternSelectFunction<Event, ComplexEvent>() {
   ...}
    );

DataStream<String> lateData = result.getSideOutput(lateDataOutputTag);

2、时间上下文

在PatternProcessFunction中,用户可以和IterativeCondition中 一样按照下面的方法使用实现了TimeContext的上下文:


/**
 * 支持获取事件属性比如当前处理事件或当前正处理的事件的时间。
 * 用在{@link PatternProcessFunction}和{@link org.apache.flink.cep.pattern.conditions.IterativeCondition}中
 */
@PublicEvolving
public interface TimeContext {
   

	/**
	 * 当前正处理的事件的时间戳。
	 *
	 * <p>如果是{@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime},这个值会被设置为事件进入CEP算子的时间。
	 */
	long timestamp();

	/** 返回当前的处理时间。 */
	long currentProcessingTime();
}

这个上下文让用户可以获取处理的事件(在IterativeCondition时候是进来的记录,在PatternProcessFunction是匹配的结果)的时间属性。 调用TimeContext#currentProcessingTime总是返回当前的处理时间,而且尽量去调用这个函数而不是调用其它的比如说System.currentTimeMillis()。

使用EventTime时,TimeContext#timestamp()返回的值等于分配的时间戳。 使用ProcessingTime时,这个值等于事件进入CEP算子的时间点(在PatternProcessFunction中是匹配产生的时间)。 这意味着多次调用这个方法得到的值是一致的。

五、可选的参数设置

用于配置 Flink CEP 的 SharedBuffer 缓存容量的选项。它可以加快 CEP 算子的处理速度,并限制内存中缓存的元素数量。

仅当 state.backend.type 设置为 rocksdb 时限制内存使用才有效,这会将超过缓存数量的元素传输到 rocksdb 状态存储而不是内存状态存储。当 state.backend.type 设置为 rocksdb 时,这些配置项有助于限制内存。相比之下,当 state.backend 设置为非 rocksdb 时,缓存会导致性能下降。与使用 Map 实现的旧缓存相比,状态部分将包含更多从 guava-cache 换出的元素,这将使得 copy on write 时的状态处理增加一些开销。

在这里插入图片描述

六、示例

本部分通过几个示例展示CEP的使用方式,有些是工作中实际的例子简化版,有些是为了演示CEP功能构造的例子,其运行结果均在代码的注释中。

1、maven依赖

本文示例均使用此处的依赖。

<properties>
	<encoding>UTF-8</encoding>
	<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
	<maven.compiler.source>1.8</maven.compiler.source>
	<maven.compiler.target>1.8</maven.compiler.target>
	<java.version>1.8</java.version>
	<scala.version>2.12</scala.version>
	<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java</artifactId>
		<version>${flink.version}</version>
		<!-- <scope>provided</scope> -->
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.2</version>
		<!-- <scope>provided</scope> -->
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-cep</artifactId>
		<version>1.17.2</version>
	</dependency>
</dependencies>

2、示例:输出每个用户连续三次登录失败的信息,允许数据延迟10s

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 输出每个用户连续三次登录失败的信息,允许数据延迟10s
 */
public class TestLoginFailDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LoginEvent {
   
        private Integer userId;
        private String ip;
        private String status;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LoginEvent) {
   
                LoginEvent loginEvent = (LoginEvent) obj;
                return this.userId == loginEvent.getUserId() && this.ip.equals(loginEvent.ip)
                        && this.status.equals(loginEvent.getStatus()) && this.timestamp == loginEvent.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LoginEvent> loginEventList = Arrays.asList(
            new LoginEvent(1001, "192.168.10.1", "F", 2L),
            new LoginEvent(1001, "192.168.10.2", "F", 3L),
            new LoginEvent(1002, "192.168.10.8", "F", 4L),
            new LoginEvent(1001, "192.168.10.6", "F", 5L),
            new LoginEvent(1002, "192.168.10.8", "F", 7L),
            new LoginEvent(1002, "192.168.10.8", "F", 8L),
            new LoginEvent(1002, "192.168.10.8", "S", 6L));

    public static void main(String[] args) throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LoginEvent> loginEventDS = env.fromCollection(loginEventList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LoginEvent>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((loginEvent, rs) -> loginEvent.getTimestamp()))
                .keyBy(loginEvent -> loginEvent.getUserId());

        // 定义模式,连续的三个登录失败事件
        Pattern<LoginEvent, ?> loginEventPattern = Pattern.<LoginEvent>begin("first")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                })
                .next("second")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                })
                .next("third")
                .where(new SimpleCondition<LoginEvent>() {
   

                    @Override
                    public boolean filter(LoginEvent value) throws Exception {
   
                        return value.getStatus().equals("F");
                    }

                });

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LoginEvent> patternStream = CEP.pattern(loginEventDS, loginEventPattern);

        // 将匹配到的流选择出来输出
        patternStream
                .select(new PatternSelectFunction<LoginEvent, String>() {
   
                    @Override
                    public String select(Map<String, List<LoginEvent>> map) throws Exception {
   
                        // 个体模式是单例的,List 中只有一个元素
                        LoginEvent first = map.get("first").get(0);
                        LoginEvent second = map.get("second").get(0);
                        LoginEvent third = map.get("third").get(0);

                        // return first.toString() + "\n" + second.toString() + "\n" + third.toString();
                        return map.get("first").toString() + " \n" + map.get("second").toString() + " \n"
                                + map.get("third").toString();
                    }
                })
                .print("连续三次登录失败用户信息:\n");

        // 连续三次登录失败用户信息:
        // :9> TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.1, status=F,timestamp=2)
        // TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.2, status=F, timestamp=3)
        // TestLoginFailDemo.LoginEvent(userId=1001, ip=192.168.10.6, status=F, timestamp=5)

        env.execute();
    }

}

3、示例:查找 同一个ip在10秒内访问同一个链接超过3次的ip,可以不连续

可以监控接口是否被攻击,应用应该比较广泛。

import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 查找 同一个ip在10秒内访问同一个链接超过3次的ip,可以不连续
 */
public class TestRepeatAccessDemo {
   
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class LogMessage {
   
        private String ip;
        private String url;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof LogMessage) {
   
                LogMessage logMessage = (LogMessage) obj;
                return this.ip.equals(logMessage.getIp()) && this.url.equals(logMessage.getUrl())
                        && this.timestamp == logMessage.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    final static List<LogMessage> logMessageList = Arrays.asList(
            new LogMessage("192.168.10.1", "URL1", 2L),
            new LogMessage("192.168.10.1", "URL1", 3L),
            new LogMessage("192.168.10.1", "URL2", 4L),
            new LogMessage("192.168.10.1", "URL2", 5L),
            new LogMessage("192.168.10.8", "URL1", 6L),
            new LogMessage("192.168.10.1", "URL1", 7L));

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class RiskLogList extends LogMessage {
   
        private int count;
    }

    static void test1() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 按用户id分组
        DataStream<LogMessage> logMessageDS = env.fromCollection(logMessageList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<LogMessage>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((logMessage, rs) -> logMessage.getTimestamp()))
                .keyBy(logMessage -> logMessage.getIp() + logMessage.getUrl()); // 根据ip和url分组

        // 定义模式
        Pattern<LogMessage, ?> logMessagePattern = Pattern.<LogMessage>begin("first")
                .followedBy("second")
                .times(2)
                .within(Time.seconds(10));

        // 将Pattern应用到流上,检测匹配的复杂事件,得到一个PatternStream
        PatternStream<LogMessage> patternStream = CEP.pattern(logMessageDS, logMessagePattern);

        // 将匹配到的流选择出来输出
        patternStream.process(new PatternProcessFunction<LogMessage, String>() {
   

            @Override
            public void processMatch(Map<String, List<LogMessage>> match, Context ctx, Collector<String> out)
                    throws Exception {
   
                LogMessage logMessage1 = match.get("first").get(0);
                LogMessage logMessage2 = match.get("second").get(0);
                LogMessage logMessage3 = match.get("second").get(1);

                boolean flag = logMessage1.getUrl().equals(logMessage2.getUrl())
                        && logMessage1.getUrl().equals(logMessage3.getUrl());
                if (flag) {
   
                    out.collect(logMessage1.getIp() + "  url:" + logMessage1.getUrl() + "   timestamp:"
                            + logMessage1.getTimestamp() + "  timestamp2:" + logMessage2.getTimestamp()
                            + "  timestamp3:" + logMessage3.getTimestamp());
                }
            }

        }).print("输出信息:\n");

        // 控制台输出:
        // 输出信息::1> 192.168.10.1 url:URL1 timestamp:2 timestamp2:3 timestamp3:7
        env.execute();
    }

    public static void main(String[] args) throws Exception {
   
        test1();
    }
}

4、示例:监测服务器的温度并告警

监测 机器温度一小时内三次大于设定温度进行风险记录,将风险记录中的数据上一次大于平均值进行报警。

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/*
 * @Author: alanchan
 * @LastEditors: alanchan
 * @Description: 监测 机器温度一小时内三次大于设定温度进行风险记录,将风险记录中的数据上一次大于平均值进行报警
 */
public class TestMachineMonitoring {
   
    // 机器的基本信息
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class MechineInfo {
   
        private int mechineId;
        private String mechineName;
        private int temperature;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof MechineInfo) {
   
                MechineInfo mechineInfo = (MechineInfo) obj;
                return this.mechineId == mechineInfo.getMechineId()
                        && this.mechineName.equals(mechineInfo.getMechineName())
                        && this.timestamp == mechineInfo.getTimestamp()
                        && this.temperature == mechineInfo.getTemperature();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    // 机器的三次平均温度
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class MechineRiskInfo {
   
        private int mechineId;
        private double avgTemperature;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof MechineRiskInfo) {
   
                MechineRiskInfo mechineRiskInfo = (MechineRiskInfo) obj;
                return this.mechineId == mechineRiskInfo.getMechineId()
                        && this.avgTemperature == mechineRiskInfo.getAvgTemperature()
                        && this.timestamp == mechineRiskInfo.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    // 预警通知信息
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    static class MechineAlertInfo {
   
        private int mechineId;
        private String email;
        private double avgTemperature;
        private Long timestamp;

        @Override
        public boolean equals(Object obj) {
   
            if (obj instanceof MechineAlertInfo) {
   
                MechineAlertInfo mechineAlertInfo = (MechineAlertInfo) obj;
                return this.mechineId == mechineAlertInfo.getMechineId() && this.email == mechineAlertInfo.getEmail()
                        && this.avgTemperature == mechineAlertInfo.getAvgTemperature()
                        && this.timestamp == mechineAlertInfo.getTimestamp();
            } else {
   
                return false;
            }
        }

        @Override
        public int hashCode() {
   
            return super.hashCode() + Long.hashCode(timestamp);
        }
    }

    // 初始化流数据
    static List<MechineInfo> mechineInfoList = Arrays.asList(
            new MechineInfo(1, "m1", 331, 2L),
            new MechineInfo(1, "m1", 321, 4L),
            new MechineInfo(1, "m1", 311, 5L),
            new MechineInfo(1, "m1", 361, 7L),
            new MechineInfo(1, "m1", 351, 9L),
            new MechineInfo(1, "m1", 341, 11L),
            new MechineInfo(2, "m11", 121, 3L),
            new MechineInfo(3, "m21", 101, 4L),
            new MechineInfo(4, "m31", 98, 5L),
            new MechineInfo(5, "m41", 123, 6L));

    // 风险数据集合
    // static List<MechineRiskInfo> mechineRiskInfoList = new ArrayList();
    // 预警数据集合
    // static Map<String, MechineAlertInfo> mechineAlertInfoMap = new HashMap<String, MechineAlertInfo>();
    // 预警温度
    private static final double TEMPERATURE_SETTING = 100;
    // 超时数据

    static void test1() throws Exception {
   
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 按用户id分组
        DataStream<MechineInfo> mechineInfoStream = env.fromCollection(mechineInfoList).assignTimestampsAndWatermarks(
                WatermarkStrategy.<MechineInfo>forBoundedOutOfOrderness(Duration.ofSeconds(10))
                        .withTimestampAssigner((logMessage, rs) -> logMessage.getTimestamp()))
                .keyBy(mechineInfo -> mechineInfo.getMechineId()); // 根据ip和url分组

        // 定义模式1,过滤温度大于设置温度
        Pattern<MechineInfo, ?> mechineInfoPattern = Pattern.<MechineInfo>begin("first")
                .where(new SimpleCondition<MechineInfo>() {
   

                    @Override
                    public boolean filter(MechineInfo value) throws Exception {
   
                        return value.getTemperature() >= TEMPERATURE_SETTING;
                    }

                })
                .followedBy("second")
                .where(new SimpleCondition<MechineInfo>() {
   

                    @Override
                    public boolean filter(MechineInfo value) throws Exception {
   
                        return value.getTemperature() >= TEMPERATURE_SETTING;
                    }

                })
                .times(2)
                .within(Time.minutes(10));

        PatternStream<MechineInfo> patternStream = CEP.pattern(mechineInfoStream, mechineInfoPattern);

        // 筛选,并计算 三次温度平均值
        DataStream<MechineRiskInfo> mechineRiskInfoStream = patternStream
                .process(new PatternProcessFunction<TestMachineMonitoring.MechineInfo, MechineRiskInfo>() {
   

                    @Override
                    public void processMatch(Map<String, List<MechineInfo>> match, Context ctx,
                            Collector<MechineRiskInfo> out)
                            throws Exception {
   
                        MechineInfo firstMechineInfo = match.get("first").get(0);
                        MechineInfo secondMechineInfo1 = match.get("second").get(0);
                        MechineInfo secondMechineInfo2 = match.get("second").get(1);

                        // System.out.printf("mechineInfo:id=%s,name=%s,t=%s,ts=%s",
                        //         firstMechineInfo.getMechineId(),
                        //         firstMechineInfo.getMechineName(), firstMechineInfo.getTemperature(),
                        //         firstMechineInfo.getTimestamp() + "\n");

                        // System.out.printf("secondMechineInfo1:id=%s,name=%s,t=%s,ts=%s",
                        //         secondMechineInfo1.getMechineId(),
                        //         secondMechineInfo1.getMechineName(), secondMechineInfo1.getTemperature(),
                        //         secondMechineInfo1.getTimestamp() + "\n");

                        // System.out.printf("secondMechineInfo2:id=%s,name=%s,t=%s,ts=%s",
                        //         secondMechineInfo2.getMechineId(),
                        //         secondMechineInfo2.getMechineName(), secondMechineInfo2.getTemperature(),
                        //         secondMechineInfo2.getTimestamp() + "\n");

                        out.collect(new MechineRiskInfo(
                                firstMechineInfo.getMechineId(), (firstMechineInfo.getTemperature()
                                        + secondMechineInfo1.getTemperature() + secondMechineInfo2.getTemperature())
                                        / 3,
                                ctx.timestamp()));
                    }

                }).keyBy(mechineRiskInfo -> mechineRiskInfo.getMechineId());

        mechineRiskInfoStream.print("mechineRiskInfoStream:");
        // 定义模式2,比较风险数据的前后两条,如果是上升的趋势,则报警,并设置报警联系人
        Pattern<MechineRiskInfo, ?> mechineRiskInfoPattern = Pattern.<MechineRiskInfo>begin("step1")
                .next("step2")
                .within(Time.hours(1));

        PatternStream<MechineRiskInfo> patternStream2 = CEP.pattern(mechineRiskInfoStream, mechineRiskInfoPattern);

        // 筛选 警告信息,并设置发送邮箱
        DataStream<MechineAlertInfo> mechineAlertInfoList = patternStream2
                .process(new PatternProcessFunction<TestMachineMonitoring.MechineRiskInfo, MechineAlertInfo>() {
   

                    @Override
                    public void processMatch(Map<String, List<MechineRiskInfo>> match, Context ctx,
                            Collector<MechineAlertInfo> out) throws Exception {
   
                        MechineRiskInfo mechineRiskInfo1 = match.get("step1").get(0);
                        MechineRiskInfo mechineRiskInfo2 = match.get("step2").get(0);
                        MechineAlertInfo MechineAlertInfo = null;
                        if (mechineRiskInfo1.getAvgTemperature() <= mechineRiskInfo2.getAvgTemperature()) {
   
                            MechineAlertInfo = new MechineAlertInfo(mechineRiskInfo1.getMechineId(),
                                    "alan.chan.chn@163.com",
                                    mechineRiskInfo2.getAvgTemperature(), ctx.currentProcessingTime());

                            out.collect(MechineAlertInfo);
                        }

                    }

                });
        mechineAlertInfoList.print("mechineAlertInfoList:");
        // mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, email=alan.chan.chn@163.com, avgTemperature=331.0, timestamp=1705366481553)
        // mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, email=alan.chan.chn@163.com, avgTemperature=341.0, timestamp=1705366481566)
        // mechineAlertInfoList::11> TestMachineMonitoring.MechineAlertInfo(mechineId=1, email=alan.chan.chn@163.com, avgTemperature=351.0, timestamp=1705366481567)
        env.execute();

    }

    public static void main(String[] args) throws Exception {
   
        test1();
    }
}

以上,本文完整的介绍了Flink 的类库CEP的内容,通过大量的示例展示如何使用CEP。

本专题分为以下几篇介绍:
59、Flink CEP - Flink的复杂事件处理介绍及示例(1)-入门
59、Flink CEP - Flink的复杂事件处理介绍及示例(2)- 模式API
59、Flink CEP - Flink的复杂事件处理介绍及示例(3)- 模式选取及超时处理
59、Flink CEP - Flink的复杂事件处理介绍及示例(4)- 延迟数据处理和三个实际应用示例
59、Flink CEP - Flink的复杂事件处理介绍及示例(完整版)

最近更新

  1. TCP协议是安全的吗?

    2024-01-26 11:36:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-26 11:36:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-26 11:36:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-26 11:36:01       20 阅读

热门阅读

  1. C Primer Plus(第六版)13.11 编程练习 第13题

    2024-01-26 11:36:01       31 阅读
  2. C语言——栈的实现

    2024-01-26 11:36:01       36 阅读
  3. Nginx location 使用正则匹配路径

    2024-01-26 11:36:01       36 阅读
  4. 前端学习-0125

    2024-01-26 11:36:01       33 阅读
  5. 服务器宝塔安全需要修改的设置

    2024-01-26 11:36:01       37 阅读
  6. UnityUI看向相机

    2024-01-26 11:36:01       33 阅读
  7. mysql更新charset

    2024-01-26 11:36:01       29 阅读
  8. sealos apt&&yum安装 && sealos 部署k8s

    2024-01-26 11:36:01       37 阅读
  9. GET基于报错的sql注入利用-脱库

    2024-01-26 11:36:01       36 阅读
  10. 优雅的控制协程(goroutine)的并发数量

    2024-01-26 11:36:01       33 阅读