/**
* A {@link TwoInputStreamOperator} for executing {@link KeyedBroadcastProcessFunction
* KeyedBroadcastProcessFunctions}.
*
* @param <KS> The key type of the input keyed stream.
* @param <IN1> The input type of the keyed (non-broadcast) side.
* @param <IN2> The input type of the broadcast side.
* @param <OUT> The output type of the operator.
*/
@Internal
public class CoBroadcastWithKeyedOperator<KS, IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT>, Triggerable<KS, VoidNamespace> {
private static final long serialVersionUID = 5926499536290284870L;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
private transient TimestampedCollector<OUT> collector;
private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
private transient ReadWriteContextImpl rwContext;
private transient ReadOnlyContextImpl rContext;
private transient OnTimerContextImpl onTimerContext;
public CoBroadcastWithKeyedOperator(
final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors) {
super(function);
this.broadcastStateDescriptors = Preconditions.checkNotNull(broadcastStateDescriptors);
}
@Override
public void open() throws Exception {
super.open();
InternalTimerService<VoidNamespace> internalTimerService =
getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
TimerService timerService = new SimpleTimerService(internalTimerService);
collector = new TimestampedCollector<>(output);
this.broadcastStates = new HashMap<>(broadcastStateDescriptors.size());
for (MapStateDescriptor<?, ?> descriptor : broadcastStateDescriptors) {
broadcastStates.put(
descriptor,
// 初始化状态实现实例
getOperatorStateBackend().getBroadcastState(descriptor));
}
rwContext =
new ReadWriteContextImpl(
getExecutionConfig(),
getKeyedStateBackend(),
userFunction,
broadcastStates,
timerService);
rContext =
new ReadOnlyContextImpl(
getExecutionConfig(), userFunction, broadcastStates, timerService);
onTimerContext =
new OnTimerContextImpl(
getExecutionConfig(), userFunction, broadcastStates, timerService);
}
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
collector.setTimestamp(element);
rContext.setElement(element);
userFunction.processElement(element.getValue(), rContext, collector);
rContext.setElement(null);
}
@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
collector.setTimestamp(element);
rwContext.setElement(element);
userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
rwContext.setElement(null);
}
private class ReadWriteContextImpl
extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.Context {
private final ExecutionConfig config;
private final KeyedStateBackend<KS> keyedStateBackend;
private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
private final TimerService timerService;
private StreamRecord<IN2> element;
ReadWriteContextImpl(
final ExecutionConfig executionConfig,
final KeyedStateBackend<KS> keyedStateBackend,
final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
final TimerService timerService) {
function.super();
this.config = Preconditions.checkNotNull(executionConfig);
this.keyedStateBackend = Preconditions.checkNotNull(keyedStateBackend);
this.states = Preconditions.checkNotNull(broadcastStates);
this.timerService = Preconditions.checkNotNull(timerService);
}
void setElement(StreamRecord<IN2> e) {
this.element = e;
}
@Override
public Long timestamp() {
checkState(element != null);
return element.getTimestamp();
}
@Override
public <K, V> BroadcastState<K, V> getBroadcastState(
MapStateDescriptor<K, V> stateDescriptor) {
Preconditions.checkNotNull(stateDescriptor);
stateDescriptor.initializeSerializerUnlessSet(config);
BroadcastState<K, V> state = (BroadcastState<K, V>) states.get(stateDescriptor);
if (state == null) {
throw new IllegalArgumentException(
"The requested state does not exist. "
+ "Check for typos in your state descriptor, or specify the state descriptor "
+ "in the datastream.broadcast(...) call if you forgot to register it.");
}
return state;
}
@Override
public <X> void output(OutputTag<X> outputTag, X value) {
checkArgument(outputTag != null, "OutputTag must not be null.");
output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}
@Override
public long currentProcessingTime() {
return timerService.currentProcessingTime();
}
@Override
public long currentWatermark() {
return timerService.currentWatermark();
}
@Override
public <VS, S extends State> void applyToKeyedState(
final StateDescriptor<S, VS> stateDescriptor,
final KeyedStateFunction<KS, S> function)
throws Exception {
keyedStateBackend.applyToAllKeys(
VoidNamespace.INSTANCE,
VoidNamespaceSerializer.INSTANCE,
Preconditions.checkNotNull(stateDescriptor),
Preconditions.checkNotNull(function));
}
}
private class ReadOnlyContextImpl extends ReadOnlyContext {
private final ExecutionConfig config;
private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
private final TimerService timerService;
private StreamRecord<IN1> element;
ReadOnlyContextImpl(
final ExecutionConfig executionConfig,
final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
final TimerService timerService) {
function.super();
this.config = Preconditions.checkNotNull(executionConfig);
this.states = Preconditions.checkNotNull(broadcastStates);
this.timerService = Preconditions.checkNotNull(timerService);
}
void setElement(StreamRecord<IN1> e) {
this.element = e;
}
@Override
public Long timestamp() {
checkState(element != null);
return element.hasTimestamp() ? element.getTimestamp() : null;
}
@Override
public TimerService timerService() {
return timerService;
}
@Override
public long currentProcessingTime() {
return timerService.currentProcessingTime();
}
@Override
public long currentWatermark() {
return timerService.currentWatermark();
}
@Override
public <X> void output(OutputTag<X> outputTag, X value) {
checkArgument(outputTag != null, "OutputTag must not be null.");
output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
}
@Override
public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
MapStateDescriptor<K, V> stateDescriptor) {
Preconditions.checkNotNull(stateDescriptor);
stateDescriptor.initializeSerializerUnlessSet(config);
ReadOnlyBroadcastState<K, V> state =
(ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
if (state == null) {
throw new IllegalArgumentException(
"The requested state does not exist. "
+ "Check for typos in your state descriptor, or specify the state descriptor "
+ "in the datastream.broadcast(...) call if you forgot to register it.");
}
return state;
}
@Override
@SuppressWarnings("unchecked")
public KS getCurrentKey() {
return (KS) CoBroadcastWithKeyedOperator.this.getCurrentKey();
}
}
private class OnTimerContextImpl
extends KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT>.OnTimerContext {
private final ExecutionConfig config;
private final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> states;
private final TimerService timerService;
private TimeDomain timeDomain;
private InternalTimer<KS, VoidNamespace> timer;
OnTimerContextImpl(
final ExecutionConfig executionConfig,
final KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> function,
final Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates,
final TimerService timerService) {
function.super();
this.config = Preconditions.checkNotNull(executionConfig);
this.states = Preconditions.checkNotNull(broadcastStates);
this.timerService = Preconditions.checkNotNull(timerService);
}
@Override
public Long timestamp() {
checkState(timer != null);
return timer.getTimestamp();
}
@Override
public TimeDomain timeDomain() {
checkState(timeDomain != null);
return timeDomain;
}
@Override
public KS getCurrentKey() {
return timer.getKey();
}
@Override
public TimerService timerService() {
return timerService;
}
@Override
public long currentProcessingTime() {
return timerService.currentProcessingTime();
}
@Override
public long currentWatermark() {
return timerService.currentWatermark();
}
@Override
public <X> void output(OutputTag<X> outputTag, X value) {
checkArgument(outputTag != null, "OutputTag must not be null.");
output.collect(outputTag, new StreamRecord<>(value, timer.getTimestamp()));
}
@Override
public <K, V> ReadOnlyBroadcastState<K, V> getBroadcastState(
MapStateDescriptor<K, V> stateDescriptor) {
Preconditions.checkNotNull(stateDescriptor);
stateDescriptor.initializeSerializerUnlessSet(config);
ReadOnlyBroadcastState<K, V> state =
(ReadOnlyBroadcastState<K, V>) states.get(stateDescriptor);
if (state == null) {
throw new IllegalArgumentException(
"The requested state does not exist. "
+ "Check for typos in your state descriptor, or specify the state descriptor "
+ "in the datastream.broadcast(...) call if you forgot to register it.");
}
return state;
}
}
}
评论