写点什么

大数据 -130 - Flink CEP 详解 - 捕获超时事件提取全解析:从原理到完整实战代码教程 恶意登录案例实现

作者:武子康
  • 2025-10-20
    山东
  • 本文字数:5714 字

    阅读完需:约 19 分钟

大数据-130 - Flink CEP 详解 - 捕获超时事件提取全解析:从原理到完整实战代码教程 恶意登录案例实现

点一下关注吧!!!非常感谢!!持续更新!!!

🚀 AI 篇持续更新中!(长期更新)

AI 炼丹日志-31- 千呼万唤始出来 GPT-5 发布!“快的模型 + 深度思考模型 + 实时路由”,持续打造实用 AI 工具指南!📐🤖

💻 Java 篇正式开启!(300 篇)

目前 2025 年 10 月 20 日更新到:Java-153 深入浅出 MongoDB 全面的适用场景分析与选型指南 场景应用指南 MyBatis 已完结,Spring 已完结,Nginx 已完结,Tomcat 已完结,分布式服务正在更新!深入浅出助你打牢基础!

📊 大数据板块已完成多项干货更新(300 篇):

包括 Hadoop、Hive、Kafka、Flink、ClickHouse、Elasticsearch 等二十余项核心组件,覆盖离线+实时数仓全栈!大数据-278 Spark MLib - 基础介绍 机器学习算法 梯度提升树 GBDT 案例 详解


章节内容

上节我们完成了如下的内容:


  • Flink CEP 核心组件

  • CEP 的应用场景

  • CEP 的优势


超时事件提取

当一个模式通过 within 关键字定义了检测窗口时间时,部分事件序列可能因为超过窗口长度而被丢弃,为了能够处理这些超时的部分匹配,select 和 flatSelectAPI 调用允许制定超时处理程序。

FlinkCEP 开发流程详解

1. 数据源转换

  • 从数据源(Kafka、文件、Socket 等)获取原始数据流

  • 将数据转换为 DataStream:


  DataStream<Event> inputStream = env.addSource(source)      .map(record -> new Event(record));
复制代码


  • 常见数据源处理:

  • Kafka 数据源:使用 FlinkKafkaConsumer

  • 文件数据源:使用 readTextFile/readFile 方法

2. 模式定义与模式流创建

  • 定义事件模式(Pattern):


  Pattern<Event, ?> pattern = Pattern.<Event>begin("start")      .where(new SimpleCondition<Event>() {          @Override          public boolean filter(Event event) {              return event.getType().equals("login");          }      })      .next("middle")      .within(Time.seconds(10));
复制代码


  • 将 DataStream 与 Pattern 组合:


  PatternStream<Event> patternStream = CEP.pattern(inputStream.keyBy(Event::getUserId), pattern);
复制代码

3. 模式流处理

  • 使用 Select/Process 算子处理匹配事件:


  DataStream<Alert> alerts = patternStream.process(      new PatternProcessFunction<Event, Alert>() {          @Override          public void processMatch(              Map<String, List<Event>> match,              Context ctx,              Collector<Alert> out) {              out.collect(new Alert(match));          }      });
复制代码


  • 处理方式选择:

  • select():简单提取匹配数据

  • process():复杂处理匹配数据

  • flatSelect():一对多处理

4. 结果输出

  • 结果数据流处理:


  alerts.map(alert -> alert.toString())
复制代码


  • 输出到目标库:


  alerts.addSink(new SinkFunction<Alert>() {      @Override      public void invoke(Alert value, Context context) {          // 写入数据库/消息队列等      }  });
复制代码


  • 常见输出目标:

  • Kafka:使用 FlinkKafkaProducer

  • 数据库:使用 JDBCSink

  • 文件系统:使用 StreamingFileSink

应用场景示例

  1. 风险控制:检测连续登录失败

  2. 运维监控:识别异常日志序列

  3. 物联网:设备状态异常检测

  4. 金融交易:可疑交易模式识别


SELECT 方法:


SingleOutputStreamOperator<PayEvent> result =    patternStream.select(orderTimeoutOutput, new PatternTimeoutFunction<PayEvent, PayEvent>() {    @Override    public PayEvent timeout(Map<String, List<PayEvent>> map, long l) throws Exception {        return map.get("begin").get(0);    }}, new PatternSelectFunction<PayEvent, PayEvent>() {    @Override    public PayEvent select(Map<String, List<PayEvent>> map) throws Exception {        return map.get("pay").get(0);    }});
复制代码


对检测到的序列模式序列应用选择函数,对于每个模式序列,调用提供的 PatternSelectFunction,模式选择函数只能产生一个结果元素。对超时的部分模式序列应用超时函数,对于每个部分模式序列,调用提供的 PatternTimeoutFunction,模式超时函数只能产生一个结果元素。你可以在使用相同 OutputTag 进行 Select 操作 SingleOutputStreamOperator 上获得 SingleOutputStreamOperator 生成的超时数据流。

非确定有限自动机

FlinkCEP 在运行时会将用户的逻辑转换为这样一个 NFA Graph(NFA 对象)所以有限状态机的工作过程,就是从开始状态,根据不同的输入,自动进行转换的过程。



上图中的状态机的功能,是检测二进制数是否含有偶数个 0。从图上可以看出,输入只有 1 和 0 两种。从 S1 状态开始,只有输入 0 才会转换到 S2 状态,同样 S2 状态下只有输入 0 才会转换到 S1。所以,二进制输入完毕,如果满足最终状态,也就是最后停在 S1 状态,那么输入的二进制数就含有偶数个 0。

CEP 开发流程

FlinkCEP 开发流程:


  • DataSource 中数据转换为 DataStream、Watermark、keyby

  • 定义 Pattern,并将 DataStream 和 Pattern 组合转换为 PatternStream

  • PatternStream 经过 select、process 等算子转换为 DataStream

  • 再次转换为 DataStream 经过处理后,Sink 到目标库

添加依赖

<dependency>    <groupId>org.apache.flink</groupId>    <artifactId>flink-cep_2.12</artifactId>    <version>${flink.version}</version></dependency>
复制代码

案例 1:恶意登录检测

找出 5 秒内,连续登录失败的账号以下是数据:


new CepLoginBean(1L, "fail", 1597905234000L),new CepLoginBean(1L, "success", 1597905235000L),new CepLoginBean(2L, "fail", 1597905236000L),new CepLoginBean(2L, "fail", 1597905237000L),new CepLoginBean(2L, "fail", 1597905238000L),new CepLoginBean(3L, "fail", 1597905239000L),new CepLoginBean(3L, "success", 1597905240000L)
复制代码

整体思路

  • 获取到数据

  • 在数据源上做 Watermark

  • 在 Watermark 上根据 ID 分组 keyBy

  • 做出模式 Pattern

  • 在数据流上进行模式匹配

  • 提取匹配成功的数据

编写代码

package icu.wzk;/** * Flink CEP 登录失败检测示例 *  * 功能说明: * 该示例模拟了用户登录事件流,利用 Flink CEP 检测“同一用户在 5 秒内连续两次登录失败”的情况。 *  * 核心步骤: * 1. 构造模拟数据流 * 2. 设置事件时间(EventTime)与水位线(Watermark) * 3. 定义 CEP 模式(Pattern) * 4. 应用模式并处理匹配结果 */public class FlinkCepLoginTest {
public static void main(String[] args) throws Exception {
// 1️⃣ 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置时间语义为事件时间(EventTime) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 并行度设置为 1,方便观察控制台输出 env.setParallelism(1);
// 2️⃣ 模拟输入数据流(用户登录事件) DataStreamSource<CepLoginBean> data = env.fromElements( new CepLoginBean(1L, "fail", 1597905234000L), new CepLoginBean(1L, "success", 1597905235000L), new CepLoginBean(2L, "fail", 1597905236000L), new CepLoginBean(2L, "fail", 1597905237000L), new CepLoginBean(2L, "fail", 1597905238000L), new CepLoginBean(3L, "fail", 1597905239000L), new CepLoginBean(3L, "success", 1597905240000L) );
// 3️⃣ 为数据流分配时间戳与水位线 SingleOutputStreamOperator<CepLoginBean> watermarks = data .assignTimestampsAndWatermarks( new WatermarkStrategy<CepLoginBean>() {
@Override public WatermarkGenerator<CepLoginBean> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) { return new WatermarkGenerator<CepLoginBean>() {
// 当前已观察到的最大事件时间戳 long maxTimestamp = Long.MAX_VALUE;
// 最大允许乱序时间(即事件允许延迟 500ms 到达) long maxOutOfOrderness = 500L;
@Override public void onEvent(CepLoginBean event, long eventTimestamp, WatermarkOutput output) { // 更新当前最大时间戳 maxTimestamp = Math.max(maxTimestamp, event.getTimestamp()); }
@Override public void onPeriodicEmit(WatermarkOutput output) { // 周期性发送水位线:当前最大时间戳 - 乱序时间 output.emitWatermark(new Watermark(maxTimestamp - maxOutOfOrderness)); } }; } }.withTimestampAssigner((element, recordTimestamp) -> element.getTimestamp()) );
// 4️⃣ 按用户 ID 分组(KeyBy) KeyedStream<CepLoginBean, Long> keyed = watermarks .keyBy(new KeySelector<CepLoginBean, Long>() { @Override public Long getKey(CepLoginBean value) { return value.getUserId(); } });
// 5️⃣ 定义 CEP 模式:连续两次失败且间隔不超过 5 秒 Pattern<CepLoginBean, CepLoginBean> pattern = Pattern .<CepLoginBean>begin("start") .where(new IterativeCondition<CepLoginBean>() { @Override public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) { // 第一个事件:登录失败 return cepLoginBean.getOperation().equals("fail"); } }) .next("next") .where(new IterativeCondition<CepLoginBean>() { @Override public boolean filter(CepLoginBean cepLoginBean, Context<CepLoginBean> context) { // 第二个事件:同样是登录失败 return cepLoginBean.getOperation().equals("fail"); } }) // 限定两次失败必须发生在 5 秒内 .within(Time.seconds(5));
// 6️⃣ 将模式应用到 keyed 流 PatternStream<CepLoginBean> patternStream = CEP.pattern(keyed, pattern);
// 7️⃣ 处理匹配到的事件(输出报警或进一步操作) SingleOutputStreamOperator<CepLoginBean> process = patternStream .process(new PatternProcessFunction<CepLoginBean, CepLoginBean>() { @Override public void processMatch(Map<String, List<CepLoginBean>> map, Context context, Collector<CepLoginBean> collector) { System.out.println("map: " + map); // 获取第一个“fail”事件并输出 List<CepLoginBean> start = map.get("start"); collector.collect(start.get(0)); } });
// 8️⃣ 打印匹配结果 process.print();
// 9️⃣ 启动作业 env.execute("FlinkCepLoginTest"); }}
/** * 自定义登录事件数据类 * 包含用户ID、操作类型、时间戳等字段 */class CepLoginBean {
private Long userId; // 用户ID private String operation; // 操作类型(如 "fail", "success") private Long timestamp; // 事件时间(毫秒)
public CepLoginBean(Long userId, String operation, Long timestamp) { this.userId = userId; this.operation = operation; this.timestamp = timestamp; }
public Long getUserId() { return userId; }
public void setUserId(Long userId) { this.userId = userId; }
public String getOperation() { return operation; }
public void setOperation(String operation) { this.operation = operation; }
public Long getTimestamp() { return timestamp; }
public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
@Override public String toString() { return "CepLoginBean{" + "userId=" + userId + ", operation='" + operation + '\'' + ", timestamp=" + timestamp + '}'; }}
复制代码

运行结果

可以看到程序输出:


map: {start=[CepLoginBean{userId=2, operation='fail', timestamp=1597905236000}], next=[CepLoginBean{userId=2, operation='fail', timestamp=1597905237000}]}...
Process finished with exit code 0
复制代码


运行截图如下所示:



发布于: 刚刚阅读数: 6
用户头像

武子康

关注

永远好奇 无限进步 2019-04-14 加入

Hi, I'm Zikang,好奇心驱动的探索者 | INTJ / INFJ 我热爱探索一切值得深究的事物。对技术、成长、效率、认知、人生有着持续的好奇心和行动力。 坚信「飞轮效应」,相信每一次微小的积累,终将带来深远的改变。

评论

发布
暂无评论
大数据-130 - Flink CEP 详解 - 捕获超时事件提取全解析:从原理到完整实战代码教程 恶意登录案例实现_Java_武子康_InfoQ写作社区