Flink 在窗口上应用函数 -6-9

用户头像
小知识点
关注
发布于: 3 小时前

在窗口上应用函数



  • 增量聚合函数:窗口内以状态形式存储某个值,且需要根据每个加入窗口的元素对该值进行更新。此类函数通常非常节省空间,且最终会将聚合值作为单个结果发送出去。如:ReduceFunction,AggregationFunction



  • 全量窗口函数:收集窗口内所有元素,并在执行计算时,对它们进行遍历。占用更多空间,但是支持更复杂逻辑。



RunduceFunciton

  • 输入输出状态必须一致



val minTempPerWindow: DataStream[(String, Double)] = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))



class MinTempFunction extends ReduceFunction[(String, Double)] {
override def reduce(r1: (String, Double), r2: (String, Double)) = {
(r1._1, r1._2.min(r2._2))
}
}



AggregateFunction





class AvgTempFunction
extends AggregateFunction[(String, Double), (String, Double, Int), (String, Double)] {
//初始化,创建累加器
override def createAccumulator() = {
("", 0.0, 0)
}

//向累加器中添加一个输入元素
override def add(in: (String, Double), acc: (String, Double, Int)) = {
(in._1, in._2 + acc._2, 1 + acc._3)
}

//根据累加器计算并返回结果
override def getResult(acc: (String, Double, Int)) = {
(acc._1, acc._2 / acc._3)
}


//合并2个累加器并返回结果
override def merge(acc1: (String, Double, Int), acc2: (String, Double, Int)) = {
(acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
}

case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)



用户头像

小知识点

关注

奇迹的出现往往就在再坚持一下的时候! 2018.04.02 加入

还未添加个人简介

评论

发布
暂无评论
Flink在窗口上应用函数-6-9