class AvgTempFunction
extends AggregateFunction[(String, Double), (String, Double, Int), (String, Double)] {
//初始化,创建累加器
override def createAccumulator() = {
("", 0.0, 0)
}
//向累加器中添加一个输入元素
override def add(in: (String, Double), acc: (String, Double, Int)) = {
(in._1, in._2 + acc._2, 1 + acc._3)
}
//根据累加器计算并返回结果
override def getResult(acc: (String, Double, Int)) = {
(acc._1, acc._2 / acc._3)
}
//合并2个累加器并返回结果
override def merge(acc1: (String, Double, Int), acc2: (String, Double, Int)) = {
(acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
}
case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)
评论