写点什么

Flink 处理函数实战之三:KeyedProcessFunction 类

  • 2022 年 4 月 17 日
  • 本文字数:2320 字

    阅读完需:约 8 分钟

  1. 最后是整个逻辑功能的主体:ProcessTime.java,这里面有自定义的 KeyedProcessFunction 子类,还有程序入口的 main 方法,代码在下面列出来之后,还会对关键部分做介绍:


package com.bolingcavalry.keyedprocessfunction;


import com.bolingcavalry.Splitter;


import org.apache.flink.api.common.state.ValueState;


import org.apache.flink.api.common.state.ValueStateDescriptor;


import org.apache.flink.api.java.tuple.Tuple;


import org.apache.flink.api.java.tuple.Tuple2;


import org.apache.flink.configuration.Configuration;


import org.apache.flink.streaming.api.TimeCharacteristic;


import org.apache.flink.streaming.api.datastream.DataStream;


import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;


import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;


import org.apache.flink.streaming.api.functions.KeyedProcessFunction;


import org.apache.flink.streaming.api.watermark.Watermark;


import org.apache.flink.util.Collector;


import java.text.SimpleDateFormat;


import java.util.Date;


/**


  • @author will

  • @email zq2599@gmail.com

  • @date 2020-05-17 13:43

  • @description 体验 KeyedProcessFunction 类(时间类型 Java 开源项目【ali1024.coding.net/public/P7/Java/git】 是处理时间)


*/


public class ProcessTime {


/**


  • KeyedProcessFunction 的子类,作用是将每个单词最新出现时间记录到 backend,并创建定时器,

  • 定时器触发的时候,检查这个单词距离上次出现是否已经达到 10 秒,如果是,就发射给下游算子


*/


static class CountWithTimeoutFunction extends KeyedProcessFunction<Tuple, Tuple2<String, Integer>, Tuple2<String, Long>> {


// 自定义状态


private ValueState<CountWithTimestamp> state;


@Override


public void open(Configuration parameters) throws Exception {


// 初始 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》开源 化状态,name 是 myState


state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));


}


@Override


public void processElement(


Tuple2<String, Integer> value,


Context ctx,


Collector<Tuple2<String, Long>> out) throws Exception {


// 取得当前是哪个单词


Tuple currentKey = ctx.getCurrentKey();


// 从 backend 取得当前单词的 myState 状态


CountWithTimestamp current = state.value();


// 如果 myState 还从未没有赋值过,就在此初始化


if (current == null) {


current = new CountWithTimestamp();


current.key = value.f0;


}


// 单词数量加一


current.count++;


// 取当前元素的时间戳,作为该单词最后一次出现的时间


current.lastModified = ctx.timestamp();


// 重新保存到 backend,包括该单词出现的次数,以及最后一次出现的时间


state.update(current);


// 为当前单词创建定时器,十秒后后触发


long timer = current.lastModified + 10000;


ctx.timerService().registerProcessingTimeTimer(timer);


// 打印所有信息,用于核对数据正确性


System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n\n",


currentKey.getField(0),


current.count,


current.lastModified,


time(current.lastModified),


timer,


time(timer)));


}


/**


  • 定时器触发后执行的方法

  • @param timestamp 这个时间戳代表的是该定时器的触发时间

  • @param ctx

  • @param out

  • @throws Exception


*/


@Override


public void onTimer(


long timestamp,


OnTimerContext ctx,


Collector<Tuple2<String, Long>> out) throws Exception {


// 取得当前单词


Tuple currentKey = ctx.getCurrentKey();


// 取得该单词的 myState 状态


CountWithTimestamp result = state.value();


// 当前元素是否已经连续 10 秒未出现的标志


boolean isTimeout = false;


// timestamp 是定时器触发时间,如果等于最后一次更新时间+10 秒,就表示这十秒内已经收到过该单词了,


// 这种连续十秒没有出现的元素,被发送到下游算子


if (timestamp == result.lastModified + 10000) {


// 发送


out.collect(new Tuple2<String, Long>(result.key, result.count));


isTimeout = true;


}


// 打印数据,用于核对是否符合预期


System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",


currentKey.getField(0),


result.count,


result.lastModified,


time(result.lastModified),


timestamp,


time(timestamp),


String.valueOf(isTimeout)));


}


}


public static void main(String[] args) throws Exception {


final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


// 并行度 1


env.setParallelism(1);


// 处理时间


env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);


// 监听本地 9999 端口,读取字符串


DataStream<String> socketDataStream = env.socketTextStream("localhost", 9999);


// 所有输入的单词,如果超过 10 秒没有再次出现,都可以通过 CountWithTimeoutFunction 得到


DataStream<Tuple2<String, Long>> timeOutWord = socketDataStream


// 对收到的字符串用空格做分割,得到多个单词


.flatMap(new Splitter())

分享

这次面试我也做了一些总结,确实还有很多要学的东西。相关面试题也做了整理,可以分享给大家,了解一下面试真题,想进大厂的或者想跳槽的小伙伴不妨好好利用时间来学习。学习的脚步一定不能停止!



Spring Cloud 实战



Spring Boot 实战



面试题整理(性能优化+微服务+并发编程+开源框架+分布式)

用户头像

还未添加个人签名 2022.04.13 加入

还未添加个人简介

评论

发布
暂无评论
Flink处理函数实战之三:KeyedProcessFunction类_Java_爱好编程进阶_InfoQ写作平台