写点什么

Flink CEP 监测刷屏用户

发布于: 2021 年 05 月 18 日
Flink CEP 监测刷屏用户

规则:用户如果在 10s 内,同时连续输入同样一句话超过 5 次,就认为是恶意刷屏


使用 Flink CEP 检测刷屏用户


object BarrageBehavior02 {  case class Message(userId: String, ip: String, msg: String)
def main(args: Array[String]): Unit = { //初始化运行环境 val env = StreamExecutionEnvironment.getExecutionEnvironment
//设置并行度 env.setParallelism(1)
// 模拟数据源 val loginEventStream: DataStream[Message] = env.fromCollection( List( Message("1", "192.168.0.1", "beijing"), Message("1", "192.168.0.2", "beijing"), Message("1", "192.168.0.3", "beijing"), Message("1", "192.168.0.4", "beijing"), Message("2", "192.168.10.10", "shanghai"), Message("3", "192.168.10.10", "beijing"), Message("3", "192.168.10.11", "beijing"), Message("4", "192.168.10.10", "beijing"), Message("5", "192.168.10.11", "shanghai"), Message("4", "192.168.10.12", "beijing"), Message("5", "192.168.10.13", "shanghai"), Message("5", "192.168.10.14", "shanghai"), Message("5", "192.168.10.15", "beijing"), Message("6", "192.168.10.16", "beijing"), Message("6", "192.168.10.17", "beijing"), Message("6", "192.168.10.18", "beijing"), Message("5", "192.168.10.18", "shanghai"), Message("6", "192.168.10.19", "beijing"), Message("6", "192.168.10.19", "beijing"), Message("5", "192.168.10.18", "shanghai") ) )
//定义模式 val loginbeijingPattern = Pattern.begin[Message]("start") .where(_.msg != null) //一条登录失败 .times(5).optional //将满足五次的数据配对打印 .within(Time.seconds(10))
//进行分组匹配 val loginbeijingDataPattern = CEP.pattern(loginEventStream.keyBy(_.userId), loginbeijingPattern)
//查找符合规则的数据 val loginbeijingResult: DataStream[Option[Iterable[Message]]] = loginbeijingDataPattern.select(patternSelectFun = (pattern: collection.Map[String, Iterable[Message]]) => { var loginEventList: Option[Iterable[Message]] = null loginEventList = pattern.get("start") match { case Some(value) => { if (value.toList.map(x => (x.userId, x.msg)).distinct.size == 1) { Some(value) } else { None } } } loginEventList })
//打印测试 loginbeijingResult.filter(x=>x!=None).map(x=>{ x match { case Some(value)=> value } }).print()
env.execute("BarrageBehavior02) }}
复制代码


Flink CEP

1. 模式与模式序列

  • 简单模式称为模式,将最终在数据流中进行搜索匹配的复杂模式序列称为模式序列,每个复杂模式序列是由多个简单模式组成。

  • 匹配到的一系列输入事件,这些事件通过一系列有效的模式转换,能够访问复杂模式图的所有模式。

  • 每个模式必须具有唯一的名称,我们可以使用模式名称来标识该模式匹配到的事件。

2. 单个模式

一个模式既可以是单例的,也可以是循环的。单例模式接受单个事件,循环模式可以接受多个事件

3. 模式示例:

有如下模式:a b+ c?d

其中a,b,c,d这些字母代表的是模式,+代表循环,b+就是循环模式;?代表可选,c?就是可选模式;

所以上述模式的意思就是:a后面可以跟一个或多个b,后面再可选的跟c,最后跟d

其中a、c? 、d是单例模式,b+是循环模式。

一般情况下,模式都是单例模式,可以使用量词(Quantifiers)将其转换为循环模式。

每个模式可以带有一个或多个条件,这些条件是基于事件接收进行定义的。或者说,每个模式通过一个或多个条件来匹配和接收事件。

了解完上述概念后,接下来介绍下案例中需要用到的几个 CEP API:

案例中用到的 CEP API:

  • Begin:定义一个起始模式状态

    用法:start = Pattern.<Event>begin("start");

  • Next:附加一个新的模式状态。匹配事件必须直接接续上一个匹配事件

    用法:next = start.next("next");

  • Where:定义当前模式状态的过滤条件。仅当事件通过过滤器时,它才能与状态匹配

    用法:patternState.where(_.message == "yyds");

  • Within: 定义事件序列与模式匹配的最大时间间隔。如果未完成的事件序列超过此时间,则将其丢弃

    用法:patternState.within(Time.seconds(10));

  • Times:一个给定类型的事件出现了指定次数

    用法:patternState.times(5);

发布于: 2021 年 05 月 18 日阅读数: 15
用户头像

专注于大数据技术研究 2020.11.10 加入

运营公众号:五分钟学大数据。大数据领域原创技术号,深入大数据技术

评论

发布
暂无评论
Flink CEP 监测刷屏用户