
Understanding Flink Stream Join Internals in One Article

shengjk1
Published: March 30, 2021

Overview

Detailed Walkthrough

In most cases, we write a windowed stream join like the following:


DataStreamSource<Tuple2<Long, Long>> addSource = env.addSource(new WordSource());

addSource.join(addSource).where(new KeySelector<Tuple2<Long, Long>, Long>() {
    @Override
    public Long getKey(Tuple2<Long, Long> value) throws Exception {
//        System.out.println("where " + value.f0);
        return value.f0;
    }
}).equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
    @Override
    public Long getKey(Tuple2<Long, Long> value) throws Exception {
        System.out.println("equalTo " + value.f0);
        return value.f0;
    }
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .apply(new JoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
        @Override
        public Tuple2<Long, Long> join(Tuple2<Long, Long> first, Tuple2<Long, Long> second) throws Exception {
//            System.out.println("vvvvv " + first + second);
            return new Tuple2<>(first.f0, first.f1 + second.f1);
        }
    })
    .print("join====");
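
The WordSource used above is not shown in the post; it only needs to emit Tuple2<Long, Long> records. A minimal sketch of such a source (the class name comes from the example, everything else here is an assumption, not the author's implementation) could look like this:

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

// Hypothetical source for the example: periodically emits (key, value) pairs
public class WordSource implements SourceFunction<Tuple2<Long, Long>> {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Tuple2<Long, Long>> ctx) throws Exception {
        long i = 0;
        while (running) {
            // key cycles through 0..9, value is a running counter
            ctx.collect(new Tuple2<>(i % 10, i));
            i++;
            Thread.sleep(100);
        }
    }

    @Override
    public void cancel() {
        running = false;
    }
}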

Stepping into join, we arrive at its entry method:


// Entry point of join: otherStream is stream2; creates a JoinedStreams
public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
    return new JoinedStreams<>(this, otherStream);
}

Next, where applies a KeySelector to stream1:


// Apply the KeySelector to stream1
public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
    requireNonNull(keySelector);
    final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
    return where(keySelector, keyType);
}

It then calls the Where class's equalTo method, which guarantees that elements of stream1 and stream2 with the same key end up in the same window:


// Apply the KeySelector to stream2, so that elements of stream1 and stream2
// with the same key (the key to be joined on) land in the same window
public EqualTo equalTo(KeySelector<T2, KEY> keySelector) {
    requireNonNull(keySelector);
    final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
    return equalTo(keySelector, otherKey);
}

Going one step further, the EqualTo class's window method is called:


@PublicEvolving
public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
    return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
}
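
Note the element type of the WindowAssigner: TaggedUnion<T1, T2>. Internally, the two input streams are each tagged with their origin and unioned into a single stream of TaggedUnion elements, which is then keyed and windowed like an ordinary stream. This is why the window function we will see later receives an Iterable<TaggedUnion<T1, T2>>.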

Next, WithWindow's apply method is invoked:


// Apply the user-defined JoinFunction
public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function) {
    TypeInformation<T> resultType = TypeExtractor.getBinaryOperatorReturnType(
        function,
        JoinFunction.class,
        0,
        1,
        2,
        TypeExtractor.NO_INDEX,
        input1.getType(),
        input2.getType(),
        "Join",
        false);

    return apply(function, resultType);
}

public <T> DataStream<T> apply(JoinFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    // clean the closure
    function = input1.getExecutionEnvironment().clean(function);

    coGroupedWindowedStream = input1.coGroup(input2)
        .where(keySelector1)
        .equalTo(keySelector2)
        .window(windowAssigner)
        .trigger(trigger)
        .evictor(evictor)
        .allowedLateness(allowedLateness);

    return coGroupedWindowedStream
        .apply(new JoinCoGroupFunction<>(function), resultType);
}

At this point the key method, apply, has appeared. From its implementation we can see that join is built on top of coGroup: the two inputs become a coGroupedWindowedStream, and function is the JoinFunction we defined, wrapped in a JoinCoGroupFunction.
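
To make that concrete, here is roughly what the join from the opening example boils down to when written directly against the coGroup API. This is only an illustrative sketch reusing addSource from the first snippet; the real rewrite happens inside WithWindow.apply as shown above.

addSource.coGroup(addSource)
    .where(new KeySelector<Tuple2<Long, Long>, Long>() {
        @Override
        public Long getKey(Tuple2<Long, Long> value) {
            return value.f0;
        }
    })
    .equalTo(new KeySelector<Tuple2<Long, Long>, Long>() {
        @Override
        public Long getKey(Tuple2<Long, Long> value) {
            return value.f0;
        }
    })
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .apply(new CoGroupFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>>() {
        @Override
        public void coGroup(Iterable<Tuple2<Long, Long>> first,
                            Iterable<Tuple2<Long, Long>> second,
                            Collector<Tuple2<Long, Long>> out) {
            // Pair every element of one side with every element of the other,
            // mirroring what JoinCoGroupFunction does for a JoinFunction
            for (Tuple2<Long, Long> f : first) {
                for (Tuple2<Long, Long> s : second) {
                    out.collect(new Tuple2<>(f.f0, f.f1 + s.f1));
                }
            }
        }
    })
    .print("coGroup====");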


The coGroupedWindowedStream's apply method ultimately calls WindowedStream's apply method:


// Translate the windowed stream into an operator
private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

    final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
    KeySelector<T, K> keySel = input.getKeySelector();

    WindowOperator<K, T, Iterable<T>, R, W> operator;

    if (evictor != null) {
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> streamRecordSerializer =
            (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        // Window state has no TTL (effectively Long.MAX_VALUE)
        ListStateDescriptor<StreamRecord<T>> stateDesc =
            new ListStateDescriptor<>("window-contents", streamRecordSerializer);

        operator = new EvictingWindowOperator<>(windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
            stateDesc,
            function,
            trigger,
            evictor,
            allowedLateness,
            lateDataOutputTag);

    } else {
        // Window state has no TTL (effectively Long.MAX_VALUE)
        ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
            input.getType().createSerializer(getExecutionEnvironment().getConfig()));

        operator = new WindowOperator<>(windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
            stateDesc,
            function,
            trigger,
            allowedLateness,
            lateDataOutputTag);
    }

    // Turn the StreamOperator back into a DataStream
    return input.transform(opName, resultType, operator);
}
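
Note that the elements of both streams are buffered in the operator's "window-contents" ListState until the trigger fires; as the comment above indicates, that state has no separate TTL and is only cleared when the window itself is cleaned up, so long windows or high event rates translate directly into large state.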

This is translated into a WindowOperator. When the window fires, CoGroupWindowFunction's apply method is invoked:


@Override
// Called when the window fires, i.e. from userFunction.process
public void apply(KEY key,
        W window,
        Iterable<TaggedUnion<T1, T2>> values,
        Collector<T> out) throws Exception {
    // Split the elements of the two streams into two lists
    List<T1> oneValues = new ArrayList<>();
    List<T2> twoValues = new ArrayList<>();

    for (TaggedUnion<T1, T2> val : values) {
        if (val.isOne()) {
            oneValues.add(val.getOne());
        } else {
            twoValues.add(val.getTwo());
        }
    }
    wrappedFunction.coGroup(oneValues, twoValues, out);
}

wrappedFunction.coGroup in turn calls JoinCoGroupFunction.coGroup, which is where the two-stream join is finally carried out:

@Override
// Where the join is finally executed; first and second are the window contents of the two streams
public void coGroup(Iterable<T1> first, Iterable<T2> second, Collector<T> out) throws Exception {
    for (T1 val1 : first) {
        for (T2 val2 : second) {
            // Invoke the user-defined join method
            out.collect(wrappedFunction.join(val1, val2));
        }
    }
}
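
For example, if for some key a window holds (1, 10) and (1, 20) from the first input and (1, 100) from the second, the nested loop calls the user's join for every pair, and with the JoinFunction from the opening example it emits (1, 110) and (1, 120). If either side of the window is empty, nothing is emitted: the windowed join behaves as an inner join per key and window.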

