Flink CEP: Detecting Users with Consecutive Login Failures Within 10 Seconds

1. What is CEP?

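Flink CEP (Flink Complex Event Processing) is a complex event processing programming model built on top of DataStream. You can think of it as regular-expression matching over an unbounded stream: for the records flowing through an unbounded stream (called events), it lets you describe and match combinations of them.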
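To make the "regular expressions over a stream" analogy concrete, here is a small plain-Java illustration that does not use Flink at all. It encodes one user's login results as a string ('F' for failure, 'S' for success) and uses java.util.regex to find every run of three failures. RegexAnalogy and tripleFailures are hypothetical names for this sketch, and it works on a finite array rather than an unbounded stream, which is exactly the gap CEP fills.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern; // java.util.regex.Pattern, not Flink's CEP Pattern

// Hypothetical illustration of the analogy: one user's login history as a string.
class RegexAnalogy {

    /**
     * Encodes results (1 = failure, anything else = success) as 'F'/'S' and
     * returns the start offset of every run of three failures.
     */
    static List<Integer> tripleFailures(int[] loginResults) {
        StringBuilder sb = new StringBuilder();
        for (int r : loginResults) {
            sb.append(r == 1 ? 'F' : 'S');
        }
        // A zero-width lookahead lets matches overlap: "FFFF" yields offsets 0 and 1
        Matcher m = Pattern.compile("(?=FFF)").matcher(sb);
        List<Integer> starts = new ArrayList<>();
        while (m.find()) {
            starts.add(m.start());
        }
        return starts;
    }
}
```

For the results 1,1,1,1,0,1,1,1 this reports offsets 0, 1 and 5. The overlapping matches loosely resemble CEP's default behavior of emitting every possible match, but this is only an analogy, not how Flink evaluates patterns.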

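[Figure: events of different shapes in one DataStream; a Pattern made of a circle and a triangle picks out the matching events]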
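In the figure above, different shapes represent events with different attributes in one DataStream. Once a circle and a triangle are combined into a Pattern, the data in the original DataStream that fits this pattern can be filtered out quickly. For example, many websites need to block malicious logins: if a user enters a wrong password three times in a row, the account is locked. In this scenario, the login actions of all users form an unbounded DataStream, and "three consecutive failed logins" is the Pattern. The job of the CEP programming model is to find, in this unbounded stream of login actions, all the data that matches the Pattern. The DataStream APIs introduced earlier can also do this, but it takes considerably more work, because you have to keep track of the matching state yourself; the CEP programming model offers a much simpler and more flexible way to implement it.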
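For comparison, here is a rough sketch of the bookkeeping you would otherwise write by hand: a per-user counter of consecutive failures that resets on a successful login. It is plain Java with hypothetical names (ConsecutiveFailureDetector, run), processes "userId,loginRes" lines in arrival order, and deliberately ignores time windows, state backends and out-of-order data, all of which a real DataStream job would also have to handle.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free sketch of hand-rolled "three failures in a row" detection.
class ConsecutiveFailureDetector {
    private final int threshold;
    // userId -> length of the current run of consecutive failures
    private final Map<String, Integer> failures = new HashMap<>();

    ConsecutiveFailureDetector(int threshold) {
        this.threshold = threshold;
    }

    /** Feeds one login; returns true if this event ends a run of at least `threshold` failures. */
    boolean onLogin(String userId, int loginRes) {
        if (loginRes != 1) {            // 0 = success: the user's run is broken
            failures.remove(userId);
            return false;
        }
        int count = failures.getOrDefault(userId, 0) + 1;
        failures.put(userId, count);
        return count >= threshold;      // a 4th, 5th... failure in a row alerts again
    }

    /** Replays "userId,loginRes" lines and returns the userId of every alert, in order. */
    static List<String> run(List<String> lines, int threshold) {
        ConsecutiveFailureDetector detector = new ConsecutiveFailureDetector(threshold);
        List<String> alerts = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(",");
            if (detector.onLogin(fields[0].trim(), Integer.parseInt(fields[1].trim()))) {
                alerts.add(fields[0].trim());
            }
        }
        return alerts;
    }
}
```

Even this toy version needs explicit per-user state and reset rules; adding a 10-second window and event-time ordering is the part CEP takes off your hands.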

2. Implementation

2.1 Add the Maven dependencies:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.roy</groupId>
    <artifactId>FlinkDemo</artifactId>
    <version>1.0</version>

    <properties>
        <flink.version>1.12.5</flink.version>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- The main dependency for CEP is the one below -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-cep_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.8.3-10.0</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.1.0</version>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

2.2 Basic workflow

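// 1. Get the raw event stream
DataStream<Event> input = ...;
// 2. Define the pattern
Pattern<Event, ?> pattern = ...;
// 3. Apply the pattern to the input to get a PatternStream
PatternStream<Event> patternStream = CEP.pattern(input, pattern);
// 4. Turn the matches in the PatternStream into a result DataStream
DataStream<Result> resultStream = patternStream.process(
    new PatternProcessFunction<Event, Result>() {
        @Override
        public void processMatch(
                Map<String, List<Event>> pattern, // pattern name -> events matched under that name
                Context ctx,
                Collector<Result> out) throws Exception {
            // Build Result objects from the matched events and emit them with out.collect(...)
        }
    });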
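In processMatch, the map is keyed by the names given to begin(...), next(...) or followedBy(...), and each value holds the events accepted under that name; a looping stage such as times(3) contributes several events under a single name. The plain-Java helper below (a hypothetical MatchFlattener, not part of Flink) sketches one way to turn such a map back into one ordered list, assuming you pass the stage names in the order they were declared.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical helper: flattens a CEP-style match map in declared stage order.
class MatchFlattener {

    /**
     * Concatenates the per-stage event lists of one match, following stageOrder.
     * Stages absent from the map (e.g. an optional stage that matched nothing) are skipped.
     */
    static <T> List<T> flatten(Map<String, List<T>> match, List<String> stageOrder) {
        List<T> all = new ArrayList<>();
        for (String stage : stageOrder) {
            all.addAll(match.getOrDefault(stage, Collections.<T>emptyList()));
        }
        return all;
    }
}
```

Inside a real PatternProcessFunction the same call could be made on the match argument, for example with the names "one", "two", "three" used in the full code below.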

2.3 Complete code

Note: before running the code, start the nc -lk socket service described in 2.4.

package com.roy.flink.project.userlogin;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.functions.PatternProcessFunction;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.time.Duration;
import java.util.List;
import java.util.Map;

/**
 * @desc Analyze users with consecutive login failures within ten seconds, using Flink CEP for fast pattern matching
 */
public class MyUserLoginAna {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // Interval (ms) at which periodic watermark generators such as BoundedOutOfOrdernessWatermarks emit watermarks
        env.getConfig().setAutoWatermarkInterval(1000L);

        // Test with a socket source
        env.setParallelism(1);
        // 1. Get the raw event stream (replace 10.86.97.206 with your actual host)
        final DataStreamSource<String> dataStreamSource = env.socketTextStream("10.86.97.206", 7777);

        // Each line has the form userId,loginRes,loginTime (loginTime in epoch milliseconds)
        final SingleOutputStreamOperator<UserLoginRecord> userLoginRecordStream = dataStreamSource.map(new MapFunction<String, UserLoginRecord>() {
            @Override
            public UserLoginRecord map(String s) throws Exception {
                final String[] splitVal = s.split(",");
                return new UserLoginRecord(splitVal[0].trim(), Integer.parseInt(splitVal[1].trim()), Long.parseLong(splitVal[2].trim()));
            }
        }).assignTimestampsAndWatermarks(
                // For out-of-order streams: allow records to arrive up to 1 second late before the watermark passes them
                WatermarkStrategy.<UserLoginRecord>forBoundedOutOfOrderness(Duration.ofSeconds(1))
                        // Use loginTime as the event timestamp
                        .withTimestampAssigner((SerializableTimestampAssigner<UserLoginRecord>) (element, recordTimestamp) -> element.getLoginTime())
        );

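        // 2. Define the pattern
        // 2.1: three failed logins within 10 seconds (not necessarily consecutive).
        // times(3) uses relaxed contiguity by default, so successful logins in between are skipped.
//        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("start").where(new SimpleCondition<UserLoginRecord>() {
//            @Override
//            public boolean filter(UserLoginRecord userLoginRecord) throws Exception {
//                return 1 == userLoginRecord.getLoginRes();
//            }
//        }).times(3).within(Time.seconds(10));

        // 2.2: three consecutive failed logins within 10 seconds. next() means strict contiguity; use followedBy() for relaxed contiguity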
        final Pattern<UserLoginRecord, UserLoginRecord> pattern = Pattern.<UserLoginRecord>begin("one").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).next("two").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).next("three").where(new SimpleCondition<UserLoginRecord>() {
            @Override
            public boolean filter(UserLoginRecord value) throws Exception {
                return 1 == value.getLoginRes();
            }
        }).within(Time.seconds(10));

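        // 3. Get the PatternStream. Key by userId so the pattern is matched per user;
        //    without keyBy, failures from different users would be chained into one match.
        final PatternStream<UserLoginRecord> badUser = CEP.pattern(userLoginRecordStream.keyBy(UserLoginRecord::getUserId), pattern);

        final MyProcessFunction myProcessFunction = new MyProcessFunction();
        // 4. Turn the matches into a result DataStream
        final SingleOutputStreamOperator<UserLoginRecord> badUserStream = badUser.process(myProcessFunction);
        badUserStream.print("badUser");
        env.execute("UserLoginAna");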

    }// main

    public static class MyProcessFunction extends PatternProcessFunction<UserLoginRecord, UserLoginRecord> {

        @Override
        public void processMatch(Map<String, List<UserLoginRecord>> match, Context ctx, Collector<UserLoginRecord> out) throws Exception {
            // For 2.1 (three failures within 10 s, not necessarily consecutive): all three events are stored under "start"
//            final List<UserLoginRecord> records = match.get("start");
//            for (UserLoginRecord record : records) {
//                out.collect(record);
//            }

            // For 2.2 (three consecutive failures): emit the third failed login of each match
            final List<UserLoginRecord> records = match.get("three");
            for (UserLoginRecord record : records) {
                out.collect(record);
            }
        }// processMatch
    }// MyProcessFunction
}
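The two patterns above differ in contiguity. The Flink-free sketch below approximates, for a single user's time-ordered events, which matches each variant produces under CEP's default no-skip strategy: strict() mimics begin/next/next (three failures with no other event in between) and relaxed() mimics times(3) (successful logins in between are skipped). Both treat within() as "last timestamp minus first timestamp must be less than the window". ContiguitySketch and its methods are hypothetical illustrations, not Flink's NFA, and they assume all events belong to one user (in Flink that corresponds to applying the pattern to a stream keyed by userId).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free approximation of the 2.1 and 2.2 patterns for ONE user's events sorted by time.
class ContiguitySketch {

    static final class Login {
        final int res;  // 0 = success, 1 = failure (same encoding as UserLoginRecord)
        final long ts;  // event time in milliseconds

        Login(int res, long ts) {
            this.res = res;
            this.ts = ts;
        }
    }

    /** Like begin/next/next + within: n failures in a row. Returns the start index of each match. */
    static List<Integer> strict(List<Login> events, int n, long withinMs) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i + n <= events.size(); i++) {
            boolean allFailed = true;
            for (int k = i; k < i + n; k++) {
                if (events.get(k).res != 1) {
                    allFailed = false;
                    break;
                }
            }
            if (allFailed && events.get(i + n - 1).ts - events.get(i).ts < withinMs) {
                starts.add(i);
            }
        }
        return starts;
    }

    /** Like times(n) + within: each failure starts a match that takes the next n-1 failures, skipping successes. */
    static List<Integer> relaxed(List<Login> events, int n, long withinMs) {
        List<Integer> starts = new ArrayList<>();
        for (int i = 0; i < events.size(); i++) {
            if (events.get(i).res != 1) {
                continue;
            }
            int taken = 1;
            int last = i;
            for (int k = i + 1; k < events.size() && taken < n; k++) {
                if (events.get(k).res == 1) {
                    taken++;
                    last = k;
                }
            }
            if (taken == n && events.get(last).ts - events.get(i).ts < withinMs) {
                starts.add(i);
            }
        }
        return starts;
    }
}
```

For failures at 0, 1000, 3000, 4000, 5000 and 20000 ms with a success at 2000 ms, strict() finds one match (starting at 3000 ms), while relaxed() also accepts the runs starting at 0 and 1000 ms because it skips the success.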

The UserLoginRecord class is as follows:

public class UserLoginRecord {

    private String userId;
    private int loginRes;   // 0 = success, 1 = failure
    private long loginTime; // login time in epoch milliseconds, used as the event timestamp

    // Public no-arg constructor so Flink can treat this class as a POJO
    public UserLoginRecord() {
    }

    public UserLoginRecord(String userId, int loginRes, long loginTime) {
        this.userId = userId;
        this.loginRes = loginRes;
        this.loginTime = loginTime;
    }

    @Override
    public String toString() {
        return "UserLoginRecord{" +
                "userId='" + userId + '\'' +
                ", loginRes=" + loginRes +
                ", loginTime=" + loginTime +
                '}';
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public int getLoginRes() {
        return loginRes;
    }

    public void setLoginRes(int loginRes) {
        this.loginRes = loginRes;
    }

    public long getLoginTime() {
        return loginTime;
    }

    public void setLoginTime(long loginTime) {
        this.loginTime = loginTime;
    }
}
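Because loginTime is used as the event timestamp, the 10-second window is measured in event time, and the CEP operator buffers incoming records and only evaluates them once the watermark has reached their timestamp. With forBoundedOutOfOrderness(Duration.ofSeconds(1)), Flink's bounded-out-of-orderness generator emits a watermark of maxTimestamp - 1000 - 1. The plain-Java sketch below (hypothetical WatermarkSketch; it assumes a watermark after every record, whereas Flink emits one every setAutoWatermarkInterval period) shows what this means when typing records into the socket: a failure at loginTime 3000 is only processed after some later record with loginTime of at least 4001 arrives, so the match is printed one record "late".

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, Flink-free sketch of bounded-out-of-orderness watermarks over "userId,loginRes,loginTime" lines.
class WatermarkSketch {

    /** Watermark for a given maximum timestamp: maxTimestamp - outOfOrderness - 1. */
    static long watermark(long maxTimestamp, long outOfOrdernessMs) {
        return maxTimestamp - outOfOrdernessMs - 1;
    }

    /** Watermark in effect after each line, assuming one is emitted after every record. */
    static List<Long> watermarks(List<String> lines, long outOfOrdernessMs) {
        List<Long> result = new ArrayList<>();
        // Start value chosen so the initial watermark is Long.MIN_VALUE
        long max = Long.MIN_VALUE + outOfOrdernessMs + 1;
        for (String line : lines) {
            long ts = Long.parseLong(line.split(",")[2].trim());
            max = Math.max(max, ts);
            result.add(watermark(max, outOfOrdernessMs));
        }
        return result;
    }

    /** Index of the first line after which an event with timestamp ts is released (ts <= watermark), or -1. */
    static int releasedAfter(List<String> lines, long outOfOrdernessMs, long ts) {
        List<Long> wms = watermarks(lines, outOfOrdernessMs);
        for (int i = 0; i < wms.size(); i++) {
            if (ts <= wms.get(i)) {
                return i;
            }
        }
        return -1;
    }
}
```

With the lines u1,1,1000 / u1,1,2000 / u1,1,3000 / u1,1,5000 and a 1000 ms bound, the watermarks are -1, 999, 1999 and 3999, so the third failure (3000) is released only by the fourth line.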

2.4 Simulating the socket server with nc -lk

[Figure: socket server simulated with nc]

2.5 Console output in IDEA

[Figure: IDEA console output]
