Flink Process Functions -6-4

小知识点
Published: October 16, 2020

Process Functions

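The DataStream API provides a family of low-level transformations called process functions. Beyond basic record processing, they can access record timestamps and watermarks and register timers that fire at a specific time in the future. Their side-output feature can also emit records to multiple streams.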
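The side-output idea can be made concrete with a small plain-Scala model that does not use Flink at all. In Flink itself, a process function calls `out.collect(...)` for the main output and `ctx.output(tag, value)` with an `OutputTag` for a side output. The side stream is then read with `getSideOutput(tag)`. In the sketch below, `SideTag`, `MultiOutput`, `Reading` and the 32.0 threshold are hypothetical stand-ins chosen for illustration.

```scala
import scala.collection.mutable

// Hypothetical stand-in for Flink's OutputTag: identifies one side output by name.
final case class SideTag[X](id: String)

// Hypothetical model of a function's outputs: one main output plus named side outputs.
final class MultiOutput[O] {
  val main: mutable.ListBuffer[O] = mutable.ListBuffer.empty[O]
  private val sides = mutable.Map.empty[String, mutable.ListBuffer[Any]]

  // plays the role of out.collect(value) in a process function
  def collect(o: O): Unit = main += o

  // plays the role of ctx.output(tag, value) in a process function
  def output[X](tag: SideTag[X], value: X): Unit =
    sides.getOrElseUpdate(tag.id, mutable.ListBuffer.empty[Any]) += value

  // plays the role of getSideOutput(tag) on the resulting stream
  def sideOutput[X](tag: SideTag[X]): List[X] =
    sides.get(tag.id).map(_.toList.asInstanceOf[List[X]]).getOrElse(Nil)
}

object SideOutputModel {
  final case class Reading(id: String, temperature: Double)

  // hypothetical side output carrying low-temperature alarms
  val freezing: SideTag[String] = SideTag[String]("freezing-alarms")

  // Every reading goes to the main output; readings below 32.0 also raise a side-output alarm.
  def splitReadings(readings: Seq[Reading]): MultiOutput[Reading] = {
    val out = new MultiOutput[Reading]
    readings.foreach { r =>
      if (r.temperature < 32.0) out.output(freezing, s"Freezing alarm for ${r.id}")
      out.collect(r)
    }
    out
  }
}
```

One input stream thus yields two result streams of different types: all readings, and a `String` stream of alarms.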



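Process functions are used to build event-driven applications and to implement custom logic that the built-in windows and transformations cannot express. For example, most of the operators supported by Flink SQL are implemented with process functions.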



There are 8 different process functions: ProcessFunction, KeyedProcessFunction, CoProcessFunction, ProcessJoinFunction, BroadcastProcessFunction, KeyedBroadcastProcessFunction, ProcessWindowFunction, and ProcessAllWindowFunction.



KeyedProcessFunction explained



  • It is called once per record and emits zero, one, or more records.

  • All process functions implement the RichFunction interface, so they support methods such as open(), close(), and getRuntimeContext().
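The two points above (one call per record with any number of outputs, and the open()/close() lifecycle) can be modelled without Flink. This is a hypothetical sketch, not Flink's API. `MiniCollector` stands in for `Collector`, and `MiniProcessFunction` stands in for a process function with RichFunction-style hooks. `MiniRunner` plays the runtime, calling `open()` once, `processElement` once per record, and `close()` at the end.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for Flink's Collector: buffers every emitted record.
final class MiniCollector[O] {
  private val buf = ListBuffer.empty[O]
  def collect(o: O): Unit = buf += o
  def result: List[O] = buf.toList
}

// Hypothetical process-function shape with RichFunction-like lifecycle hooks.
abstract class MiniProcessFunction[I, O] {
  def open(): Unit = ()   // runs once before the first record
  def close(): Unit = ()  // runs once after the last record
  // Called once per input record; may call out.collect zero, one, or many times.
  def processElement(value: I, out: MiniCollector[O]): Unit
}

object MiniRunner {
  def run[I, O](f: MiniProcessFunction[I, O], input: Seq[I]): List[O] = {
    val out = new MiniCollector[O]
    f.open()
    input.foreach(v => f.processElement(v, out))
    f.close()
    out.result
  }
}

// Emits one record per word: "" yields zero records, "a" one, "b c" two.
final class Tokenizer extends MiniProcessFunction[String, String] {
  var opened = false
  var closed = false
  var calls = 0
  override def open(): Unit = opened = true
  override def close(): Unit = closed = true
  override def processElement(value: String, out: MiniCollector[String]): Unit = {
    calls += 1
    value.split("\\s+").filter(_.nonEmpty).foreach(out.collect)
  }
}
```

For example, `MiniRunner.run(new Tokenizer, Seq("", "a", "b c"))` invokes `processElement` three times and returns the three words "a", "b" and "c".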



//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler); readable parameter names,
// no empty constructors, comments added
//

package org.apache.flink.streaming.api.functions;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.streaming.api.TimeDomain;
import org.apache.flink.streaming.api.TimerService;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

@PublicEvolving
public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction {
    private static final long serialVersionUID = 1L;

    // Called once for every record in the stream; results are emitted by passing them to the Collector.
    // The Context is what sets process functions apart: it exposes the record's timestamp, the key of
    // the current record and the TimerService, and it can emit records to side outputs.
    public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;

    // Callback invoked when a registered timer fires; timestamp is the timestamp of the firing timer.
    // The Collector can emit records. OnTimerContext offers the same services as the Context passed to
    // processElement() and additionally returns the timer's time domain (processing time or event time).
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {
    }

    public abstract class OnTimerContext extends Context {
        // TimeDomain.PROCESSING_TIME or TimeDomain.EVENT_TIME
        public abstract TimeDomain timeDomain();

        public abstract K getCurrentKey();
    }

    public abstract class Context {
        // Timestamp of the current record (or of the firing timer); may be null,
        // e.g. when the program runs in processing time
        public abstract Long timestamp();

        // Current processing time / watermark, plus registering and deleting timers
        public abstract TimerService timerService();

        // Emits a record to the side output identified by outputTag
        public abstract <X> void output(OutputTag<X> outputTag, X value);

        // Key of the record (or timer) currently being processed
        public abstract K getCurrentKey();
    }
}
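Timers are the part of the Context API that is hardest to picture, so here is a plain-Scala model of a processing-time TimerService. It is hypothetical and not Flink's implementation. It mirrors two documented properties: timers are scoped to a key, and at most one timer exists per key and timestamp, so registering the same timestamp twice fires only once. `MiniTimerService` and `advanceTo` are made-up names. In Flink the key is implicit (the current key) and the clock advances on its own, while `onTimer` learns the key through `ctx.getCurrentKey`.

```scala
import scala.collection.mutable

// Hypothetical model of per-key processing-time timers (NOT Flink's TimerService).
final class MiniTimerService[K] {
  private var now: Long = 0L
  // (timestamp, key) pairs; a Set keeps at most one timer per key and timestamp.
  private val timers = mutable.Set.empty[(Long, K)]

  def currentProcessingTime(): Long = now

  // The key is passed explicitly here; in Flink it is the key of the current record.
  def registerProcessingTimeTimer(key: K, ts: Long): Unit = timers += ((ts, key))
  def deleteProcessingTimeTimer(key: K, ts: Long): Unit = timers -= ((ts, key))

  // Advance the clock and fire every timer with timestamp <= newTime, earliest first.
  // onTimer receives (timestamp, key), like onTimer(ts, ctx, out) with ctx.getCurrentKey.
  def advanceTo(newTime: Long)(onTimer: (Long, K) => Unit): Unit = {
    val due = timers.filter(_._1 <= newTime).toList.sortBy(_._1)
    timers --= due
    now = newTime
    due.foreach { case (ts, key) => onTimer(ts, key) }
  }
}
```

A deleted timer never fires, and a fired timer is removed, so advancing the clock further does not fire it again.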



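Demo: the program below keys a sensor stream by sensor id and uses a KeyedProcessFunction with processing-time timers to emit a warning when a sensor's temperature keeps rising for 1 second.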



package run.been.flinkdemo.chapter6

import run.been.flinkdemo.util.{SensorReading, SensorSource}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.typeutils.Types
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object ProcessFunctionTimers {

  def main(args: Array[String]): Unit = {

    // set up the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // use processing time for the application (the timers below are processing-time timers)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // ingest sensor stream
    val readings: DataStream[SensorReading] = env
      // SensorSource generates random temperature readings
      .addSource(new SensorSource)

    val warnings = readings
      // key by sensor id
      .keyBy(_.id)
      // apply ProcessFunction to monitor temperatures
      .process(new TempIncreaseAlertFunction)

    warnings.print()

    env.execute("Monitor sensor temperatures.")
  }
}

/** Emits a warning if the temperature of a sensor
  * monotonically increases for 1 second (in processing time).
  */
class TempIncreaseAlertFunction
    extends KeyedProcessFunction[String, SensorReading, String] {

  // hold temperature of last sensor reading
  lazy val lastTemp: ValueState[Double] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Double]("lastTemp", Types.of[Double])
    )

  // hold timestamp of currently active timer
  lazy val currentTimer: ValueState[Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timer", Types.of[Long])
    )

  override def processElement(
      r: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]): Unit = {

    // get previous temperature (an empty ValueState[Double] unboxes to 0.0 in Scala)
    val prevTemp = lastTemp.value()
    // update last temperature
    lastTemp.update(r.temperature)

    // 0 means no timer is currently registered for this key
    val curTimerTimestamp = currentTimer.value()
    if (prevTemp == 0.0) {
      // first sensor reading for this key.
      // we cannot compare it with a previous value.
    }
    else if (r.temperature < prevTemp) {
      // temperature decreased. Delete current timer.
      ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp)
      currentTimer.clear()
    }
    else if (r.temperature > prevTemp && curTimerTimestamp == 0) {
      // temperature increased and we have not set a timer yet.
      // set timer for now + 1 second
      val timerTs = ctx.timerService().currentProcessingTime() + 1000
      ctx.timerService().registerProcessingTimeTimer(timerTs)
      // remember current timer
      currentTimer.update(timerTs)
    }
  }

  override def onTimer(
      ts: Long,
      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
      out: Collector[String]): Unit = {

    out.collect("Temperature of sensor '" + ctx.getCurrentKey +
      "' monotonically increased for 1 second.")
    // reset current timer
    currentTimer.clear()
  }
}
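The following Flink-free simulation helps show when the demo emits a warning. It replays the rules of TempIncreaseAlertFunction with explicit processing timestamps instead of a wall clock. It is a hypothetical model, not the demo itself: `Event`, `run`, `endTime` and `message` are illustrative names. It also assumes that a timer due at or before an event's timestamp fires before that event is processed.

```scala
import scala.collection.mutable

// Hypothetical replay of TempIncreaseAlertFunction's per-key rules in plain Scala.
object TempIncreaseSimulation {
  final case class Event(procTime: Long, id: String, temperature: Double)

  def message(id: String): String =
    s"Temperature of sensor '$id' monotonically increased for 1 second."

  // Returns (timer timestamp, warning) pairs in firing order.
  def run(events: Seq[Event], endTime: Long): List[(Long, String)] = {
    val lastTemp = mutable.Map.empty[String, Double]   // models ValueState "lastTemp"
    val currentTimer = mutable.Map.empty[String, Long] // models ValueState "timer"
    val alerts = mutable.ListBuffer.empty[(Long, String)]

    // Models onTimer: fire every active timer due at or before time t, earliest first.
    def fireUpTo(t: Long): Unit =
      currentTimer.toList.filter(_._2 <= t).sortBy(_._2).foreach { case (id, ts) =>
        alerts += ((ts, message(id)))
        currentTimer -= id // like currentTimer.clear() in onTimer
      }

    // Models processElement, one call per event in processing-time order.
    events.sortBy(_.procTime).foreach { e =>
      fireUpTo(e.procTime)
      val prev = lastTemp.getOrElse(e.id, 0.0) // 0.0 means "no previous reading", as in the demo
      lastTemp(e.id) = e.temperature
      if (prev != 0.0) {
        if (e.temperature < prev) {
          currentTimer -= e.id // decrease: delete the timer
        } else if (e.temperature > prev && !currentTimer.contains(e.id)) {
          currentTimer(e.id) = e.procTime + 1000L // increase, no timer yet: now + 1 s
        }
      }
    }
    fireUpTo(endTime)
    alerts.toList
  }
}
```

Design note: like the demo, the model treats a previous temperature of 0.0 as "no reading yet", so a genuine reading of exactly 0.0 would be mistaken for a first reading. Keeping a separate flag, or a nullable boxed value in state, would avoid that.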


