Flink 向副输出发送数据 6-6

用户头像
小知识点
关注
发布于: 2020 年 10 月 18 日

向副输出发送数据



  • 大多数DataStream API的算子都只有一个输出,即只能生成某个数据类型的结果流

  • split算子可以将一个流拆分成多条类型相同的流

  • 处理函数的副输出功能允许从同一个函数发出多条数据流,且它们类型可以不同

  • 每个副输出都又一个OutputTag[X]对象标识,X是副输出结果流的类型

  • 处理函数可以利用Context对象将记录发送至一个或多个副输出



demo

package run.been.flinkdemo.chapter6

import run.been.flinkdemo.util.{SensorReading, SensorSource, SensorTimeAssigner}

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
import org.apache.flink.util.Collector

object SideOutputs {

def main(args: Array[String]): Unit = {

// set up the streaming execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment

// checkpoint every 10 seconds
env.getCheckpointConfig.setCheckpointInterval(10 * 1000)

// use event time for the application
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// configure watermark interval
env.getConfig.setAutoWatermarkInterval(1000L)

// ingest sensor stream
val readings: DataStream[SensorReading] = env
// SensorSource generates random temperature readings
.addSource(new SensorSource)
// assign timestamps and watermarks which are required for event time
.assignTimestampsAndWatermarks(new SensorTimeAssigner)

val monitoredReadings: DataStream[SensorReading] = readings
// monitor stream for readings with freezing temperatures
.process(new FreezingMonitor)

// retrieve and print the freezing alarms
monitoredReadings
.getSideOutput(new OutputTag[String]("freezing-alarms"))
.print()

// print the main output
readings.print()

env.execute()
}
}

/** Emits freezing alarms to a side output for readings with a temperature below 32F. */
class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {

// define a side output tag
lazy val freezingAlarmOutput: OutputTag[String] =
new OutputTag[String]("freezing-alarms")

override def processElement(
r: SensorReading,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// emit freezing alarm if temperature is below 32F.
if (r.temperature < 32.0) {
ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")
}
// forward all readings to the regular output
out.collect(r)
}
}




用户头像

小知识点

关注

奇迹的出现往往就在再坚持一下的时候! 2018.04.02 加入

还未添加个人简介

评论

发布
暂无评论
Flink向副输出发送数据6-6