Flink 时间服务和计时器 -6-5
发布于: 2020 年 10 月 17 日
时间服务和计时器
Context和OnTimerContext对象中的TimerService方法
@PublicEvolvingpublic interface TimerService {    String UNSUPPORTED_REGISTER_TIMER_MSG = "Setting timers is only supported on a keyed streams.";    String UNSUPPORTED_DELETE_TIMER_MSG = "Deleting timers is only supported on a keyed streams.";    //返回当前处理时间    long currentProcessingTime();        //返回当前水位线时间    long currentWatermark();        //针对当前键值注册一个处理时间计时器,当执行机器的处理时间,到达指定时间戳,该计时器就会触发,有点儿像定时任务    void registerProcessingTimeTimer(long var1);        //针对当前键值注册一个事件时间计时器    void registerEventTimeTimer(long var1);    //针对当前键值删除一个处理时间计时器    void deleteProcessingTimeTimer(long var1);      //针对当前键值删除一个事件时间计时器    void deleteEventTimeTimer(long var1);}
- 系统对于processElement()和onTimer两个方法的调用是同步的,可以防止并发访问和操作状态。 
- 在非键值分区流上设置计时器 
计时器只允许在按照键值分区的数据流上注册,常见用途是在某些键值不再使用后清除键值分区状态,或实现一些基于时间的自定义窗口逻辑。为了在一条非键值分区的数据流上使用计时器,可以通过KeySelector中返回一个“假冒的”常数键值来创建一条键值分区数据流。
注意该操作会使得所有数据发送到单个任务,从而强制算子以并行度为1来执行。
计时器使用
- 对于每个键值和时间戳只能注册一个计时器 
- 每个键值可以有多个计时器,但是每个时间戳就只能有一个计时器 
- 默认情况下,KeyedProcessFunction会将全部计时器的时间戳放到堆中的一个优先队列里。 
- 也可以配置放入RocksDB状态后端来存储计时器 
- 所有计时器会和它的状态一起写入检查点。 
- 如果应用需要从故障中恢复,那么所有在应用重启过程中过期的处理时间计时器会在应用恢复后立即触发。 
- 有一个例外,如果使用增量RocksDB状态后端,且将计时器存储在堆内,计时器写入检查点的过程就会是同步的。 
- 建议不要使用太多计时器,以免检查点写入时间过长。 
package run.been.flinkdemo.chapter6import run.been.flinkdemo.util.{SensorReading, SensorSource}import org.apache.flink.api.scala._import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}import org.apache.flink.api.scala.typeutils.Typesimport org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.KeyedProcessFunctionimport org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}import org.apache.flink.util.Collectorobject ProcessFunctionTimers {  def main(args: Array[String]) {    // set up the streaming execution environment    val env = StreamExecutionEnvironment.getExecutionEnvironment    // use event time for the application    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)    // ingest sensor stream    val readings: DataStream[SensorReading] = env      // SensorSource generates random temperature readings      .addSource(new SensorSource)    val warnings = readings      // key by sensor id      .keyBy(_.id)      // apply ProcessFunction to monitor temperatures      .process(new TempIncreaseAlertFunction)    warnings.print()    env.execute("Monitor sensor temperatures.")  }}/** Emits a warning if the temperature of a sensor  * monotonically increases for 1 second (in processing time).  */class TempIncreaseAlertFunction  extends KeyedProcessFunction[String, SensorReading, String] {  // hold temperature of last sensor reading  lazy val lastTemp: ValueState[Double] =    getRuntimeContext.getState(      new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])    )  // hold timestamp of currently active timer  lazy val currentTimer: ValueState[Long] =    getRuntimeContext.getState(      new ValueStateDescriptor[Long]("timer", Types.of[Long])    )  override def processElement(      r: SensorReading,      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,      out: Collector[String]): Unit = {    // get previous temperature    val prevTemp = lastTemp.value()    // update last temperature    lastTemp.update(r.temperature)    val curTimerTimestamp = currentTimer.value()    if (prevTemp == 0.0) {      // first sensor reading for this key.      // we cannot compare it with a previous value.    }    else if (r.temperature < prevTemp) {      // temperature decreased. Delete current timer.      ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)      currentTimer.clear()    }    else if (r.temperature > prevTemp && curTimerTimestamp == 0) {      // temperature increased and we have not set a timer yet.      // set timer for now + 1 second      val timerTs = ctx.timerService().currentProcessingTime() + 1000      ctx.timerService().registerProcessingTimeTimer(timerTs)      // remember current timer      currentTimer.update(timerTs)    }  }  override def onTimer(      ts: Long,      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,      out: Collector[String]): Unit = {    out.collect("Temperature of sensor '" + ctx.getCurrentKey +      "' monotonically increased for 1 second.")    // reset current timer    currentTimer.clear()  }}
划线
评论
复制
发布于: 2020 年 10 月 17 日阅读数: 64

小知识点
关注
奇迹的出现往往就在再坚持一下的时候! 2018.04.02 加入
还未添加个人简介











 
    
评论