Flink 向副输出发送数据 6-6
发布于: 2020 年 10 月 18 日
向副输出发送数据
大多数DataStream API的算子都只有一个输出,即只能生成某个数据类型的结果流
split算子可以将一个流拆分成多条类型相同的流
处理函数的副输出功能允许从同一个函数发出多条数据流,且它们类型可以不同
每个副输出都又一个OutputTag[X]对象标识,X是副输出结果流的类型
处理函数可以利用Context对象将记录发送至一个或多个副输出
demo
package run.been.flinkdemo.chapter6import run.been.flinkdemo.util.{SensorReading, SensorSource, SensorTimeAssigner}import org.apache.flink.api.scala._import org.apache.flink.streaming.api.TimeCharacteristicimport org.apache.flink.streaming.api.functions.ProcessFunctionimport org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}import org.apache.flink.util.Collectorobject SideOutputs { def main(args: Array[String]): Unit = { // set up the streaming execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment // checkpoint every 10 seconds env.getCheckpointConfig.setCheckpointInterval(10 * 1000) // use event time for the application env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // configure watermark interval env.getConfig.setAutoWatermarkInterval(1000L) // ingest sensor stream val readings: DataStream[SensorReading] = env // SensorSource generates random temperature readings .addSource(new SensorSource) // assign timestamps and watermarks which are required for event time .assignTimestampsAndWatermarks(new SensorTimeAssigner) val monitoredReadings: DataStream[SensorReading] = readings // monitor stream for readings with freezing temperatures .process(new FreezingMonitor) // retrieve and print the freezing alarms monitoredReadings .getSideOutput(new OutputTag[String]("freezing-alarms")) .print() // print the main output readings.print() env.execute() }}/** Emits freezing alarms to a side output for readings with a temperature below 32F. */class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] { // define a side output tag lazy val freezingAlarmOutput: OutputTag[String] = new OutputTag[String]("freezing-alarms") override def processElement( r: SensorReading, ctx: ProcessFunction[SensorReading, SensorReading]#Context, out: Collector[SensorReading]): Unit = { // emit freezing alarm if temperature is below 32F. if (r.temperature < 32.0) { ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}") } // forward all readings to the regular output out.collect(r) }}
划线
评论
复制
发布于: 2020 年 10 月 18 日 阅读数: 7
小知识点
关注
奇迹的出现往往就在再坚持一下的时候! 2018.04.02 加入
还未添加个人简介
评论