Flink Stream Join Internals, Explained in One Article
Published: March 30, 2021
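Overview
In short, a window join in Flink is built on top of coGroup. The join is turned into a windowed coGroup, which in turn becomes a WindowOperator. When a window fires, the buffered records of both streams are split by origin and joined pairwise in a nested loop.
Details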
Typically, a window join is written like this:

```java
DataStreamSource<Tuple2<Long, Long>> addSource = env.addSource(new WordSource());
addSource.join(addSource)
    .where(new KeySelector<Tuple2<Long, Long>, Long>() {
        @Override
        public Long getKey(Tuple2<Long, Long> value) throws Exception {
            // System.out.println("where " + value.f0);
            return value.f0;
        }
    })
    .equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
        @Override
        public Long getKey(Tuple2<Long, Long> value) throws Exception {
            System.out.println("equalTo " + value.f0);
            return value.f0;
        }
    })
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .apply(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) throws Exception {
            // System.out.println("vvvvv " + first + second);
            return new Tuple2<>(first.f0, first.f1 + second.f1);
        }
    })
    .print("join====");
```
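The join above pairs every record with every other record that has the same key and arrives in the same 5-second window. The self-join sketch below models that in plain Java without Flink. Rec is a hypothetical stand-in for Tuple2<Long, Long> plus an explicit arrival time. The model assumes non-negative timestamps and a window offset of 0. Real processing-time windows depend on when records actually arrive, so the real job is not deterministic in this way.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (no Flink) of the 5-second tumbling-window self-join above.
class WindowJoinSemantics {
    // Hypothetical stand-in for Tuple2<Long, Long> plus its arrival time.
    static final class Rec {
        final long ts; // arrival (processing) time in ms, assumed >= 0
        final long f0; // join key
        final long f1; // value
        Rec(long ts, long f0, long f1) { this.ts = ts; this.f0 = f0; this.f1 = f1; }
    }

    static final long WINDOW_MS = 5_000;

    // Inner join: every left/right pair with the same key in the same window
    // produces (f0, left.f1 + right.f1), like the JoinFunction in the example.
    static List<long[]> join(List<Rec> left, List<Rec> right) {
        List<long[]> out = new ArrayList<>();
        for (Rec l : left) {
            for (Rec r : right) {
                boolean sameWindow = l.ts / WINDOW_MS == r.ts / WINDOW_MS; // offset 0, ts >= 0
                if (sameWindow && l.f0 == r.f0) {
                    out.add(new long[] {l.f0, l.f1 + r.f1});
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> in = List.of(new Rec(1_000, 1, 10), new Rec(2_000, 1, 20), new Rec(6_000, 1, 5));
        for (long[] t : join(in, in)) {
            System.out.println("(" + t[0] + "," + t[1] + ")");
        }
    }
}
```

Because this is a self-join, each record is also joined with itself. The two records in the first window therefore produce four results, and the lone record in the second window produces one.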
Stepping into join, we reach its entry point:

```java
// Entry point of join: otherStream is stream2; the result is a JoinedStreams
public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
    return new JoinedStreams<>(this, otherStream);
}
```
Next comes where, which applies the first key selector to stream1:

```java
// Apply the keySelector to stream1
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
    requireNonNull(keySelector);
    final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
    return where(keySelector, keyType);
}
```
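where calls TypeExtractor.getKeySelectorTypes to determine KEY from the key selector. The sketch below is a simplified illustration, not Flink's TypeExtractor, which is far more involved. It shows why an anonymous class like the one in the example keeps its generic type arguments at runtime, while a plain lambda implements the raw interface and loses them to erasure. KeyFn and KeyTypeProbe are hypothetical names; KeyFn stands in for KeySelector.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Flink's KeySelector<IN, KEY>.
interface KeyFn<IN, KEY> {
    KEY getKey(IN value);
}

// Simplified illustration (not Flink's TypeExtractor) of recovering KEY by reflection.
class KeyTypeProbe {
    // Returns the KEY type argument recorded on fn's class, or null if it was erased.
    static Type keyTypeOf(KeyFn<?, ?> fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType
                    && ((ParameterizedType) t).getRawType() == KeyFn.class) {
                return ((ParameterizedType) t).getActualTypeArguments()[1];
            }
        }
        return null; // lambda classes implement the raw interface, so nothing is recorded
    }

    public static void main(String[] args) {
        KeyFn<long[], Long> anonymous = new KeyFn<long[], Long>() {
            @Override
            public Long getKey(long[] value) { return value[0]; }
        };
        KeyFn<long[], Long> lambda = value -> value[0];
        System.out.println(keyTypeOf(anonymous)); // class java.lang.Long
        System.out.println(keyTypeOf(lambda));    // null
    }
}
```

This is one reason Flink examples often use anonymous classes for key selectors and functions. With lambdas, Flink may need an explicit type hint.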
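Then the equalTo method of the Where class is called. It applies the second key selector to stream2. Together with where, this ensures that records of stream1 and stream2 with the same key end up in the same window:

```java
// Apply the keySelector to stream2, so that records of stream1 and stream2 with the
// same key (i.e. the key to join on) end up in the same window
public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
    requireNonNull(keySelector);
    final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
    return equalTo(keySelector, otherKey);
}
```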
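where and equalTo only record the two key selectors. The co-location itself happens later, when the data is keyed by those selectors, and it works because routing depends only on the extracted key, not on which stream a record came from. The sketch below makes that point in plain Java. Flink really hashes key.hashCode() (with murmur hash) into a key group and then maps that to a subtask. The floorMod routing and the KeyRouting name are hypothetical simplifications.

```java
import java.util.function.Function;

// Simplified sketch: each input uses its own key selector; the target subtask depends
// only on the extracted key. Math.floorMod(hashCode, parallelism) is a stand-in for
// Flink's key-group assignment.
class KeyRouting {
    static <T, K> int targetSubtask(T record, Function<T, K> keySelector, int parallelism) {
        K key = keySelector.apply(record);
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        Function<long[], Long> key1 = r -> r[0];                            // stream1: {key, value}
        Function<String, Long> key2 = s -> Long.parseLong(s.split(",")[0]); // stream2: "key,payload"
        int parallelism = 4;
        System.out.println(targetSubtask(new long[] {42, 7}, key1, parallelism)
                == targetSubtask("42,x", key2, parallelism)); // true
    }
}
```

Both selectors must produce the same key type with consistent equals/hashCode. That is why where and equalTo share the KEY type parameter.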
Further down, the window method of the EqualTo class is called:

```java
@PublicEvolving
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
    return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
}
```
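The window assigner in the example, TumblingProcessingTimeWindows.of(Time.seconds(5)), puts each record into exactly one window [start, start + 5 s) based on the current processing time. The sketch below captures that arithmetic, in the spirit of Flink's TimeWindow.getWindowStartWithOffset. TumblingAssign and its methods are hypothetical names. Math.floorMod is used so that negative timestamps also land in the correct window.

```java
// Sketch of tumbling-window assignment: window [start, start + size), start aligned to offset.
class TumblingAssign {
    static long windowStart(long timestamp, long offset, long size) {
        return timestamp - Math.floorMod(timestamp - offset, size);
    }

    // Exclusive end of the window the timestamp belongs to.
    static long windowEnd(long timestamp, long offset, long size) {
        return windowStart(timestamp, offset, size) + size;
    }

    public static void main(String[] args) {
        long size = 5_000; // Time.seconds(5)
        System.out.println(windowStart(12_345, 0, size)); // 10000
        System.out.println(windowEnd(12_345, 0, size));   // 15000
    }
}
```

Since every timestamp maps to a single start, two records can only be joined if they fall into the same aligned 5-second bucket. Records 1 ms apart on either side of a boundary never meet.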
Then the apply method of WithWindow is called:

```java
// The apply method
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
    TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
            function, JoinFunction.class, 0, 1, 2, TypeExtractor.NO_INDEX,
            input1.getType(), input2.getType(), "Join", false);

    return apply(function, resultType);
}

public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);

    coGroupedWindowedStream = input1.coGroup(input2)
            .where(keySelector1)
            .equalTo(keySelector2)
            .window(windowAssigner)
            .trigger(trigger)
            .evictor(evictor)
            .allowedLateness(allowedLateness);

    return coGroupedWindowedStream
            .apply(new JoinCoGroupFunction<>(function), resultType);
}
```
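Here the key method, apply, finally appears. Its implementation shows that join is built on top of coGroup. It creates a coGroupedWindowedStream with the same key selectors, window assigner, trigger, evictor and allowed lateness. It then passes our user-defined JoinFunction to it, wrapped in a JoinCoGroupFunction.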
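The idea of "join as a special case of coGroup" can be modeled for the data of one window in plain Java. A coGroup function sees all records of both inputs for one key at once. A join adapter turns that into pairwise calls of the user's function. CoGroupFn, joinAdapter and coGroup below are hypothetical names that mirror the roles of CoGroupFunction, JoinCoGroupFunction and the windowed coGroup. They are not Flink API.

```java
import java.util.*;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical model of "join implemented as coGroup" for ONE window's data.
class JoinOnCoGroup {
    // Like a CoGroupFunction: receives all records of both inputs for one key.
    interface CoGroupFn<A, B, O> {
        void coGroup(List<A> first, List<B> second, List<O> out);
    }

    // Like JoinCoGroupFunction: wraps a pairwise join function as a coGroup.
    static <A, B, O> CoGroupFn<A, B, O> joinAdapter(BiFunction<A, B, O> join) {
        return (first, second, out) -> {
            for (A a : first) {
                for (B b : second) {
                    out.add(join.apply(a, b));
                }
            }
        };
    }

    // Group both inputs by key and call fn once per key seen on either side.
    static <A, B, K, O> List<O> coGroup(List<A> in1, Function<A, K> key1,
                                        List<B> in2, Function<B, K> key2,
                                        CoGroupFn<A, B, O> fn) {
        Map<K, List<A>> g1 = new LinkedHashMap<>();
        Map<K, List<B>> g2 = new LinkedHashMap<>();
        for (A a : in1) g1.computeIfAbsent(key1.apply(a), k -> new ArrayList<>()).add(a);
        for (B b : in2) g2.computeIfAbsent(key2.apply(b), k -> new ArrayList<>()).add(b);

        Set<K> keys = new LinkedHashSet<>(g1.keySet());
        keys.addAll(g2.keySet());
        List<O> out = new ArrayList<>();
        for (K key : keys) {
            fn.coGroup(g1.getOrDefault(key, Collections.emptyList()),
                       g2.getOrDefault(key, Collections.emptyList()), out);
        }
        return out;
    }

    public static void main(String[] args) {
        Function<String, Character> byFirstChar = s -> s.charAt(0);
        System.out.println(coGroup(List.of("a1", "b1"), byFirstChar,
                List.of("a2", "a3", "c1"), byFirstChar,
                joinAdapter((String x, String y) -> x + "-" + y))); // [a1-a2, a1-a3]
    }
}
```

Keys "b" and "c" appear on only one side. coGroup still calls the function for them, but the join adapter's nested loop produces nothing for them.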
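The apply method of coGroupedWindowedStream first tags every record of input1 and input2 as a TaggedUnion. It then unions the two tagged streams, keys the union by the join key, and applies the window assigner. Finally, it wraps our function in a CoGroupWindowFunction and calls WindowedStream.apply, which ends up in this private apply method: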
```java
// Turn the window function into an operator
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function,
        TypeInformation<R> resultType, Function originalFunction) {

    final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
    KeySelector<T, K> keySel = input.getKeySelector();

    WindowOperator<K, T, Iterable<T>, R, W> operator;

    if (evictor != null) {
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> streamRecordSerializer = (TypeSerializer<StreamRecord<T>>)
                new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        // window state; its TTL is Long.MAX_VALUE
        ListStateDescriptor<StreamRecord<T>> stateDesc =
                new ListStateDescriptor<>("window-contents", streamRecordSerializer);

        operator = new EvictingWindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                stateDesc, function, trigger, evictor, allowedLateness, lateDataOutputTag);

    } else {
        // window state; its TTL is Long.MAX_VALUE
        ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        operator = new WindowOperator<>(windowAssigner,
                windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                keySel,
                input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                stateDesc, function, trigger, allowedLateness, lateDataOutputTag);
    }

    // turn the StreamOperator into a DataStream
    return input.transform(opName, resultType, operator);
}
```
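The model below shows what this operator does for a join. It is a much-simplified, single-threaded sketch in which processElement1 and processElement2 play the role of the tagging maps. Elements are buffered per (window, key) in a map that stands in for the "window-contents" ListState. A window fires once processing time reaches its maxTimestamp (end - 1), which is when Flink's ProcessingTimeTrigger fires. MiniWindowOperator, Tagged and WindowFn are hypothetical names. State backends, timers, lateness, evictors and checkpoints are not modeled. A zero window offset is assumed.

```java
import java.util.*;
import java.util.function.Function;

// Hypothetical, single-threaded model of the WindowOperator behind a window join.
class MiniWindowOperator<T1, T2, K> {

    // Like TaggedUnion: exactly one side is set (null values are not supported).
    static final class Tagged<A, B> {
        final A one;
        final B two;
        private Tagged(A one, B two) { this.one = one; this.two = two; }
        static <A, B> Tagged<A, B> one(A v) { return new Tagged<>(v, null); }
        static <A, B> Tagged<A, B> two(B v) { return new Tagged<>(null, v); }
        boolean isOne() { return one != null; }
    }

    // Receives one key's records of one window, already split by origin.
    interface WindowFn<T1, T2, K> {
        void apply(K key, long windowStart, List<T1> ones, List<T2> twos);
    }

    private final long size;
    private final Function<T1, K> key1;
    private final Function<T2, K> key2;
    private final WindowFn<T1, T2, K> fn;
    // windowStart -> key -> buffered elements; stands in for the "window-contents" state
    private final TreeMap<Long, Map<K, List<Tagged<T1, T2>>>> contents = new TreeMap<>();

    MiniWindowOperator(long size, Function<T1, K> key1, Function<T2, K> key2, WindowFn<T1, T2, K> fn) {
        this.size = size;
        this.key1 = key1;
        this.key2 = key2;
        this.fn = fn;
    }

    // Tag and buffer an element from stream1 / stream2 arriving at processing time `now`.
    void processElement1(T1 v, long now) { buffer(key1.apply(v), Tagged.one(v), now); }
    void processElement2(T2 v, long now) { buffer(key2.apply(v), Tagged.two(v), now); }

    private void buffer(K key, Tagged<T1, T2> element, long now) {
        long start = now - Math.floorMod(now, size); // tumbling window, offset 0
        contents.computeIfAbsent(start, s -> new HashMap<>())
                .computeIfAbsent(key, k -> new ArrayList<>())
                .add(element);
    }

    // Fire every window whose maxTimestamp (end - 1) has been reached, then drop its state.
    void onProcessingTime(long now) {
        while (!contents.isEmpty() && contents.firstKey() + size - 1 <= now) {
            Map.Entry<Long, Map<K, List<Tagged<T1, T2>>>> window = contents.pollFirstEntry();
            for (Map.Entry<K, List<Tagged<T1, T2>>> e : window.getValue().entrySet()) {
                // Split by tag, as CoGroupWindowFunction.apply does
                List<T1> ones = new ArrayList<>();
                List<T2> twos = new ArrayList<>();
                for (Tagged<T1, T2> t : e.getValue()) {
                    if (t.isOne()) ones.add(t.one); else twos.add(t.two);
                }
                fn.apply(e.getKey(), window.getKey(), ones, twos);
            }
        }
    }
}
```

The key point is that nothing is joined on arrival. Records only accumulate in state until the window fires, so the memory used by a window join grows with the number of records per window.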
The join has now become a WindowOperator. When a window fires, the apply method of CoGroupWindowFunction is called:

```java
@Override
// Called when the window fires, i.e. via userFunction.process
public void apply(KEY key, W window, Iterable<TaggedUnion<T1, T2>> values, Collector<T> out) throws Exception {
    // put the records of the two streams into two separate lists
    List<T1> oneValues = new ArrayList<>();
    List<T2> twoValues = new ArrayList<>();

    for (TaggedUnion<T1, T2> val : values) {
        if (val.isOne()) {
            oneValues.add(val.getOne());
        } else {
            twoValues.add(val.getTwo());
        }
    }
    wrappedFunction.coGroup(oneValues, twoValues, out);
}
```
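Here wrappedFunction is the JoinCoGroupFunction created in WithWindow.apply. So wrappedFunction.coGroup calls JoinCoGroupFunction.coGroup, which performs the actual two-stream join:

```java
@Override
// Where the join is finally executed; first and second hold the window's records
// from stream1 and stream2
public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
    for (T1 val1 : first) {
        for (T2 val2 : second) {
            // call the user-defined join method
            out.collect(wrappedFunction.join(val1, val2));
        }
    }
}
```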
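This nested loop has two direct consequences. For one key in one window it emits |first| × |second| results, and if either side is empty it emits nothing, so the window join is an inner join. The sketch below shows that loop next to a left-outer variant, illustrating how an outer join could be written with coGroup directly. CoGroupJoins, inner and leftOuter are hypothetical names; the left-outer method is an illustration, not a Flink built-in.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Sketch: the nested loop of JoinCoGroupFunction.coGroup (inner join) next to a
// hypothetical left-outer variant one could implement as a custom coGroup function.
class CoGroupJoins {
    // Same shape as JoinCoGroupFunction.coGroup: |first| * |second| results,
    // and nothing at all when either side is empty for this key and window.
    static <A, B, O> List<O> inner(Iterable<A> first, Iterable<B> second, BiFunction<A, B, O> join) {
        List<O> out = new ArrayList<>();
        for (A a : first) {
            for (B b : second) {
                out.add(join.apply(a, b));
            }
        }
        return out;
    }

    // Left outer join: every left record is emitted, with null on the right if unmatched.
    static <A, B, O> List<O> leftOuter(Iterable<A> first, Iterable<B> second, BiFunction<A, B, O> join) {
        List<O> out = new ArrayList<>();
        for (A a : first) {
            boolean matched = false;
            for (B b : second) {
                out.add(join.apply(a, b));
                matched = true;
            }
            if (!matched) {
                out.add(join.apply(a, null));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(inner(List.of(1, 2), List.of(10, 20, 30), (Integer a, Integer b) -> a + b));
        System.out.println(leftOuter(List.of(1), List.<Integer>of(), (Integer a, Integer b) -> a + ":" + b));
    }
}
```

Running main prints [11, 21, 31, 12, 22, 32] and then [1:null]. A key with many records on both sides in one window can produce a large, quadratic number of results.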
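Copyright notice: this is an original article by InfoQ author shengjk1.
Original link: http://xie.infoq.cn/article/2f50a2bd4a3c82f171cd84bd4
This article is licensed under CC-BY 4.0. When republishing, please keep the original source and this copyright notice.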