class AvgTempFunction
  extends AggregateFunction[(String, Double), (String, Double, Int), (String, Double)] {
//初始化,创建累加器
  override def createAccumulator() = {
    ("", 0.0, 0)
  }
//向累加器中添加一个输入元素
  override def add(in: (String, Double), acc: (String, Double, Int)) = {
    (in._1, in._2 + acc._2, 1 + acc._3)
  }
//根据累加器计算并返回结果
  override def getResult(acc: (String, Double, Int)) = {
    (acc._1, acc._2 / acc._3)
  }
//合并2个累加器并返回结果
  override def merge(acc1: (String, Double, Int), acc2: (String, Double, Int)) = {
    (acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
  }
}
case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)
评论