Flink 处理函数 -6-4
发布于: 2020 年 10 月 16 日
处理函数
DataStream API 提供了一组底层转换——处理函数。除了基本功能外,处理函数还可以访问记录的时间戳和水位线,并支持注册在将来某个时间触发的计时器;此外,其副输出功能可以将记录发送到多个输出流。
处理函数被用于构建事件驱动型应用,或实现一些内置窗口及转换无法实现的自定义逻辑,如:大多数Flink SQL所支持的算子都是利用处理函数实现。
8种不同处理函数:ProcessFunction、KeyedProcessFunction、CoProcessFunction、ProcessJoinFunction、BroadcastProcessFunction、KeyedBroadcastProcessFunction、ProcessWindowFunction、ProcessAllWindowFunction
KeyedProcessFunction讲解
KeyedProcessFunction 的 processElement() 方法对流中的每条记录调用一次,并可以返回零条、一条或多条记录。
所有处理函数都实现了RichFunction接口,因此支持open()、close()等方法。
//// Source code recreated from a .class file by IntelliJ IDEA// (powered by Fernflower decompiler)//package org.apache.flink.streaming.api.functions;import org.apache.flink.annotation.PublicEvolving;import org.apache.flink.api.common.functions.AbstractRichFunction;import org.apache.flink.streaming.api.TimeDomain;import org.apache.flink.streaming.api.TimerService;import org.apache.flink.util.Collector;import org.apache.flink.util.OutputTag;@PublicEvolvingpublic abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction { private static final long serialVersionUID = 1L; public KeyedProcessFunction() { }//流中每条记录都调用一次,可以将记录转给Collector传递出去。//Context对象是处理函数与众不同的精华,可以获取时间戳,当前记录键值,TimerService,还支持发送副输出。 public abstract void processElement(I var1, KeyedProcessFunction<K, I, O>.Context var2, Collector<O> var3) throws Exception;//是一个回调函数,它会在注册的计时器出发时被调用,timestamp给出了所触发计时器的时间戳,Collector可以用来发出记录,OnTimeerContext能够提供和processElement()方法中的context对象相同的服务,此外它还会返回触发器的时间域(处理时间,还是事件时间) public void onTimer(long timestamp, KeyedProcessFunction<K, I, O>.OnTimerContext ctx, Collector<O> out) throws Exception { } public abstract class OnTimerContext extends KeyedProcessFunction<K, I, O>.Context { public OnTimerContext() { super(); } public abstract TimeDomain timeDomain(); public abstract K getCurrentKey(); } public abstract class Context { public Context() { } public abstract Long timestamp(); public abstract TimerService timerService(); public abstract <X> void output(OutputTag<X> var1, X var2); public abstract K getCurrentKey(); }}
demo
package run.been.flinkdemo.chapter6

import run.been.flinkdemo.util.{SensorReading, SensorSource}
import org.apache.flink.api.scala._
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

/** Demo job: keys a stream of sensor readings by sensor id and prints the
  * warnings produced by [[TempIncreaseAlertFunction]].
  */
object ProcessFunctionTimers {

  /** Builds and runs the streaming job.
    *
    * @param args command-line arguments (not used)
    */
  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // use processing time for the application
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // ingest sensor stream
    val readings: DataStream[SensorReading] = env
      // SensorSource generates random temperature readings
      .addSource(new SensorSource)

    val warnings = readings
      // key by sensor id
      .keyBy(_.id)
      // apply ProcessFunction to monitor temperatures
      .process(new TempIncreaseAlertFunction)

    warnings.print()

    env.execute("Monitor sensor temperatures.")
  }
}

/** Emits a warning if the temperature of a sensor
  * monotonically increases for 1 second (in processing time).
  */
class TempIncreaseAlertFunction extends KeyedProcessFunction[String, SensorReading, String] {

  // hold temperature of last sensor reading
  lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
    new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
  )

  // hold timestamp of currently active timer
  lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[Long]("timer", Types.of[Long])
  )

  /** Compares the reading with the previous temperature of the same key and
    * registers or deletes the processing-time timer accordingly.
    *
    * @param r   the incoming sensor reading
    * @param ctx context giving access to the timer service
    * @param out collector for output records (not used here; warnings are emitted in `onTimer`)
    */
  override def processElement(
      r: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]): Unit = {

    // get previous temperature
    val prevTemp = lastTemp.value()
    // update last temperature
    lastTemp.update(r.temperature)

    val curTimerTimestamp = currentTimer.value()

    if (prevTemp == 0.0) {
      // first sensor reading for this key.
      // we cannot compare it with a previous value.
      // NOTE(review): 0.0 doubles as the "no previous reading" marker, so a
      // genuine reading of exactly 0.0 would presumably be treated as a first
      // reading as well - confirm whether that matters for the data source.
    } else if (r.temperature < prevTemp) {
      // temperature decreased. Delete current timer.
      ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
      currentTimer.clear()
    } else if (r.temperature > prevTemp && curTimerTimestamp == 0) {
      // temperature increased and we have not set a timer yet.
      // set timer for now + 1 second
      val timerTs = ctx.timerService().currentProcessingTime() + 1000
      ctx.timerService().registerProcessingTimeTimer(timerTs)
      // remember current timer
      currentTimer.update(timerTs)
    }
    // an unchanged temperature leaves any registered timer untouched
  }

  /** Emits the warning for the current key when the timer fires and clears
    * the stored timer timestamp.
    *
    * @param ts  timestamp of the timer that fired
    * @param ctx timer context; used to read the current key
    * @param out collector the warning message is written to
    */
  override def onTimer(
      ts: Long,
      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
      out: Collector[String]): Unit = {

    out.collect(
      s"Temperature of sensor '${ctx.getCurrentKey}' monotonically increased for 1 second.")
    // reset current timer
    currentTimer.clear()
  }
}
划线
评论
复制
发布于: 2020 年 10 月 16 日 阅读数: 8
小知识点
关注
奇迹的出现往往就在再坚持一下的时候! 2018.04.02 加入
还未添加个人简介
评论