写点什么

面试官: Flink 双流 JOIN 了解吗? 简单说说其实现原理

  • 2022 年 1 月 14 日
  • 本文字数:6949 字

    阅读完需:约 23 分钟

摘要:今天和大家聊聊 Flink 双流 Join 问题。这是一个高频面试点,也是工作中常遇到的一种真实场景。

 

本文分享自华为云社区《万字直通面试:Flink双流JOIN》,作者:大数据兵工厂 。

 

如何保证 Flink 双流 Join 准确性和及时性、除了窗口 join 还存在哪些实现方式、究竟如何回答才能完全打动面试官呢。。你将在本文中找到答案。


1、引子

1.1 数据库 SQL 中的 JOIN

 

我们先来看看数据库 SQL 中的 JOIN 操作。如下所示的订单查询 SQL,通过将订单表的 id 和订单详情表 order_id 关联,获取所有订单下的商品信息。

 

select    a.id as '订单id',   a.order_date as '下单时间',   a.order_amount as '订单金额',   b.order_detail_id as '订单详情id',   b.goods_name as '商品名称',   b.goods_price as '商品价格',   b.order_id as '订单id'from    dwd_order_info_pfd aright join    dwd_order_detail_pfd bon a.id = b.order_id
复制代码

 

这是一段很简单的 SQL 代码,就不详细展开叙述了。此处主要引出 SQL 中的 JOIN 类型,这里用到的是 right join , 即右连接。

  • left join: 保留左表全部数据和右表关联数据,右表非关联数据置 NULL

  • right join: 保留右表全部数据和左表关联数据,左表非关联数据置 NULL

  • inner join: 保留左表关联数据和右边关联数据

  • cross join: 保留左表和右表数据笛卡尔积

 

基于关联键值逐行关联匹配,过滤表数据并生成最终结果,提供给下游数据分析使用。

 

就此打住,关于数据库 SQL 中的 JOIN 原理不再多赘述,感兴趣的话大家可自行研究,下面我们将目光转移到大数据领域看看吧。

1.2 离线场景下的 JOIN

 

假设存在这样一个场景:

已知 Mysql 数据库中订单表和订单明细表,且满足一对多的关系,统计 T-1 天所有订单的商品分布详情。

 

聪明的大家肯定已经给出了答案,没错~就是上面的 SQL:

 

select a.*, b.*from    dwd_order_info_pfd aright join    dwd_order_detail_pfd bon a.id = b.order_id
复制代码

 

现在修改下条件:已知订单表和订单明细表均为亿级别数据,求相同场景下的分析结果。

 

咋办?此时关系型数据库貌似不大合适了~开始放大招:使用大数据计算引擎来解决。

 

考虑到 T-1 统计场景对时效性要求很低,可以使用 Hive SQL 来处理,底层跑 Mapreduce 任务。如果想提高运行速度,换成 Flink 或 Spark 计算引擎,使用内存计算。

 

 

至于查询 SQL 和上面一样,并将其封装成一个定时调度任务, 等系统调度运行。如果结果不正确的话,由于数据源和数据静态不变,大不了重跑,看起来感觉皆大欢喜~

 

可是好景不长,产品冤家此时又给了你一个无法拒绝的需求:我要实时统计!!

2、实时场景下的 JOIN

 

还是上面的场景,此时数据源换成了实时订单流和实时订单明细流,比如 Kafka 的两个 topic,要求实时统计每分钟内所有订单下的商品分布详情。

 

 

现在情况貌似变得复杂了起来,简单分析下:

1.    数据源。实时数据流,和静态流不同,数据是实时流入的且动态变化,需要计算程序支持实时处理机制。

2.    关联性。前面提到静态数据执行多次 join 操作,左表和右表能关联的数据是很恒定的;而实时数据流(左右表)如果进入时机不一致,原本可以关联的数据会关联不上或者发生错误。

3.    延迟性。实时统计,提供分钟甚至秒级别响应结果。

由于流数据 join 的特殊性,在满足实时处理机制、低延迟、强关联性的前提下,看来需要制定完善的数据方案,才能实现真正的流数据 JOIN。

2.1 方案思路

 

我们知道订单数据和订单明细数据是一对多的关系,即一条订单数据对应着多条商品明细数据,毕竟买一件商品也是那么多邮费,不如打包团购。。而一条明细数据仅对应一条订单数据。

 

这样,双流 join 策略可以考虑如下思路:

  • 当数据流为订单数据时。无条件保留,无论当前是否关联到明细数据,均留作后续 join 使用。

  • 当数据流为明细数据时。在关联到其订单数据后,就可以 say goodbye 了,否则暂时保留等待下一次与订单数据的邂逅。

 

完成所有处于同一时段内的订单数据和订单明细数据 join, 清空存储状态

 

实际生产场景中,需要考虑更多的复杂情况,包括 JOIN 过程的数据丢失等异常情况的处理,此处仅示意。

 

好了,看起来我们已经有了一个马马虎虎的实时流 JOIN 方案雏形。

 

貌似可以准备动手大干一场了~ 别着急,有人已经帮我们偷偷的实现了:Apache Flink

3、Flink 的双流 JOIN

 

Apache Flink 是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink 被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。——来自 Flink 官网定义

 

 

这里我们只需要知道 Flink 是一个实时计算引擎就行了,主要关注其如何实现双流 JOIN。

3.1 内部运行机制

 

  • 内存计算:Flink 任务优先在内存中计算,内存不够时保存到访问高效的磁盘,提供秒级延迟响应。

  • 状态强一致性:Flink 使用一致性快照保存状态,并定期检查本地状态、持久存储来保证状态一致性。

  • 分布式执行: Flink 应用程序可以划分为无数个并行任务在集群中执行,几乎无限量使用 CPU、主内存、磁盘和网络 IO。

  • 内置高级编程模型:Flink 编程模型抽象为 SQL、Table、DataStream|DataSet API、Process 四层,并封装成丰富功能的算子,其中就包含 JOIN 类型的算子。

 

 

仔细看看,我们前面章节讨论的实时流 JOIN 方案的前提是否都满足了呢?

1.    实时处理机制: Flink 天生即实时计算引擎

2.    低延迟: Flink 内存计算秒级延迟

3.    强关联性: Flink 状态一致性和 join 类算子

 

不由感叹, 这个 Flink 果然强啊~

保持好奇心,我们去瞅瞅 Flink 双流 join 的真正奥义!!

3.2 JOIN 实现机制

 

Flink 双流 JOIN 主要分为两大类。一类是基于原生 State 的 Connect 算子操作,另一类是基于窗口的 JOIN 操作。其中基于窗口的 JOIN 可细分为 window join 和 interval join 两种。

 

  • 实现原理:底层原理依赖 Flink 的 State 状态存储,通过将数据存储到 State 中进行关联 join, 最终输出结果。

 

 

恍然大悟, Flink 原来是通过 State 状态来缓存等待 join 的实时流。

 

这里给大家抛出一个问题:

用 redis 存储可不可以,state 存储相比 redis 存储的区别?

 

回到正题,这几种方式到底是如何实现双流 JOIN 的?我们接着往下看。

注意: 后面内容将多以文字 + 代码的形式呈现,避免枯燥,我放了一堆原创示意图~

4、基于 Window Join 的双流 JOIN 实现机制

 

顾名思义,此类方式利用 Flink 的窗口机制实现双流 join。通俗理解,将两条实时流中元素分配到同一个时间窗口中完成 Join。

 

底层原理: 两条实时流数据缓存在 Window State 中,当窗口触发计算时,执行 join 操作。

 

4.1 join 算子

 

先看看 Window join 实现方式之一的 join 算子。这里涉及到 Flink 中的窗口(window)概念,因此 Window Joinan 按照窗口类型区分的话某种程度来说可以细分出 3 种:

  • Tumbling Window Join (滚动窗口)

 

  • Sliding Window Join (滑动窗口)

 

  • Session Widnow Join(会话窗口) 

 

两条流数据按照关联主键在(滚动、滑动、会话)窗口内进行 inner join, 底层基于 State 存储,并支持处理时间和事件时间两种时间特征,看下源码:

 

 

源码核心总结:windows 窗口 + state 存储 + 双层 for 循环执行 join()

 

现在让我们把时间轴往回拉一点点,在实时场景 JOIN 那里我们收到了这样的需求:统计每分钟内所有订单下的商品明细分布。

 

OK, 使用 join 算子小试牛刀一下。我们定义 60 秒的滚动窗口,将订单流和订单明细流通过 order_id 关联,得到如下的程序:

 

val env = ...// kafka 订单流val orderStream = ... // kafka 订单明细流val orderDetailStream = ...    orderStream.join(orderDetailStream)    .where(r => r._1)  //订单id    .equalTo(r => r._2) //订单id    .window(TumblingProcessTimeWindows.of(          Time.seconds(60)))    .apply {(r1, r2) => r1 + " : " + r2}    .print()
复制代码

 

整个代码其实很简单,概要总结下:

  • 定义两条输入实时流 A、B

  • A 流调用 join(b 流)算子

  • 关联关系定义: where 为 A 流关联键,equalTo 为 B 流关联键,都是订单 id

  • 定义 window 窗口(60s 间隔)

  • apply 方法定义逻辑输出

 

这样只要程序稳定运行,就能够持续不断的计算每分钟内订单分布详情,貌似解决问题了奥~

 

还是别高兴太早,别忘了此时的 join 类型是 inner join。复习一下知识: inner join 指的是仅保留两条流关联上的数据。

 

这样双流中没关联上的数据岂不是都丢掉了?别担心,Flink 还提供了另一个 window join 操作: coGroup 算子。

4.2 coGroup 算子

 

coGroup 算子也是基于 window 窗口机制,不过 coGroup 算子比 Join 算子更加灵活,可以按照用户指定的逻辑匹配左流或右流数据并输出。

 

换句话说,我们通过自己指定双流的输出来达到 left join 和 right join 的目的。

 

现在来看看在相同场景下 coGroup 算子是如何实现 left join:

 

#这里看看java算子的写法orderDetailStream  .coGroup(orderStream)  .where(r -> r.getOrderId())  .equalTo(r -> r.getOrderId())  .window(TumblingProcessingTimeWindows.of(Time.seconds(60)))  .apply(new CoGroupFunction<OrderDetail, Order, Tuple2<String, Long>>() {    @Override    public void coGroup(Iterable<OrderDetail> orderDetailRecords, Iterable<Order> orderRecords, Collector<Tuple2<String, Long>> collector)  {      for (OrderDetail orderDetaill : orderDetailRecords) {        boolean flag = false;        for (Order orderRecord : orderRecords) {          // 右流中有对应的记录          collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), orderDetailRecords.getGoods_price()));          flag = true;        }        if (!flag) {          // 右流中没有对应的记录          collector.collect(new Tuple2<>(orderDetailRecords.getGoods_name(), null));        }      }    }  })  .print();
复制代码

 

这里需要说明几点:

  • join 算子替换为 coGroup 算子

  • 两条流依然需要在一个 window 中且定义好关联条件

  • apply 方法中自定义判断,此处对右值进行判断:如果有值则进行连接输出,否则右边置为 NULL。

 

可以这么说,现在我们已经彻底搞定了窗口双流 JOIN。

 

只要你给我提供具体的窗口大小,我就能通过 join 或 coGroup 算子鼓捣出各种花样 join,而且使用起来特别简单。

 

但是假如此时我们亲爱的产品又提出了一个小小问题:

大促高峰期,商品数据某时段会写入不及时,时间可能比订单早也可能比订单晚,还是要计算每分钟内的订单商品分布详情,没问题吧~

 

当然有问题:两条流如果步调不一致,还用窗口来控制能 join 的上才怪了~ 很容易等不到 join 流窗口就自动关闭了。

 

还好,我知道 Flink 提供了 Interval join 机制。

5、基于 Interval Join 的双流 JOIN 实现机制

 

Interval Join 根据右流相对左流偏移的时间区间(interval)作为关联窗口,在偏移区间窗口中完成 join 操作。

 

有点不好理解,我画个图看下:

​stream2.time ∈ (stream1.time +low, stream1.time +high)

满足数据流 stream2 在数据流 stream1 的 interval(low, high)偏移区间内关联 join。interval 越大,关联上的数据就越多,超出 interval 的数据不再关联。

  • 实现原理:interval join 也是利用 Flink 的 state 存储数据,不过此时存在 state 失效机制 ttl,触发数据清理操作。

 

这里再引出一个问题:

state 的 ttl 机制需要怎么设置?不合理的 ttl 设置会不会撑爆内存?

我会在后面的文章中深入讲解下 State 的 ttl 机制,欢迎大家一起探讨~

 

下面简单看下 interval join 的代码实现过程:

 

val env = ...// kafka 订单流val orderStream = ... // kafka 订单明细流val orderDetailStream = ...    orderStream.keyBy(_.1)    // 调用intervalJoin关联    .intervalJoin(orderDetailStream._2)    // 设定时间上限和下限    .between(Time.milliseconds(-30), Time.milliseconds(30))      .process(new ProcessWindowFunction())    class ProcessWindowFunction extends ProcessJoinFunction...{   override def processElement(...) {      collector.collect((r1, r2) => r1 + " : " + r2)   }}
复制代码

 

订单流在流入程序后,等候(low,high)时间间隔内的订单明细流数据进行 join, 否则继续处理下一个流。

 

从代码中我们发现,interval join 需要在两个 KeyedStream 之上操作,即 keyBy(),并在 between()方法中指定偏移区间的上下界。

 

需要注意的是 interval join 实现的也是 inner join,且目前只支持事件时间。

6、基于 Connect 的双流 JOIN 实现机制

 

前面在使用 Window join 或者 Interval Join 来实现双流 join 的时候,我发现了其中的共性:

无论哪种实现方式,Flink 内部都将 join 过程透明化,在算子中封装了所有的实现细节。

 

这是什么?是编程语言中的抽象概念~ 隐藏底层细节,对外暴露统一 API, 大幅简化程序编码。

 

可是这样会引来一个问题:如果程序报错或者数据异常,如何快速进行调优排查,直接看源码吗?不大现实。。

 

这里介绍基于 Connect 算子实现的双流 JOIN 方法,我们可自己控制双流 JOIN 处理逻辑,同时保持过程时效性和准确性。

6.1 Connect 算子原理

 

对两个 DataStream 执行 connect 操作,将其转化为 ConnectedStreams, 生成的 Streams 可以调用不同方法在两个实时流上执行,且双流之间可以共享状态。

 

 

图上我们可以看到,两个数据流被 connect 之后,只是被放在了同一个流中,内部依然保持各自的数据和形式,两个流相互独立。

[DataStream1, DataStream2] -> ConnectedStreams[1,2]

这样,我们可以在 Connect 算子底层的 ConnectedStreams 中编写代码,自行实现双流 JOIN 的逻辑处理。

6.2 技术实现

 

1.调用 connect 算子,根据 orderid 进行分组,并使用 process 算子分别对两条流进行处理。

 

orderStream.connect(orderDetailStream)  .keyBy("orderId", "orderId")  .process(new orderProcessFunc());
复制代码

 

2.process 方法内部进行状态编程, 初始化订单、订单明细和定时器的 ValueState 状态。

 

private ValueState<OrderEvent> orderState;private ValueState<TxEvent> orderDetailState;private ValueState<Long> timeState;
// 初始化状态ValueorderState = getRuntimeContext().getState( new ValueStateDescriptor<Order> ("order-state",Order.class));····
复制代码

 

3.为每个进入的数据流保存 state 状态并创建定时器。在时间窗口内另一个流达到时进行 join 并输出,完成后删除定时器。

 

@Overridepublic void processElement1(Order value, Context ctx, Collector<Tuple2<Order, OrderDetail>> out){  if (orderDetailState.value() == null){    //明细数据未到,先把订单数据放入状态     orderState.update(value);    //建立定时器,60秒后触发     Long ts = (value.getEventTime()+10)*1000L;     ctx.timerService().registerEventTimeTimer(       ts);     timeState.update(ts);  }else{    //明细数据已到,直接输出到主流     out.collect(new Tuple2<>(value,orderDetailS       tate.value()));    //删除定时器     ctx.timerService().deleteEventTimeTimer      (timeState.value());     //清空状态,注意清空的是支付状态      orderDetailState.clear();      timeState.clear();  }}...@Overridepublic void processElement2(){  ...}
复制代码

 

4.未及时达到的数据流触发定时器输出到侧输出流,左流先到而右流未到,则输出左流,反之输出右连流。

 

@Overridepublic void onTimer(long timestamp, OnTimerContext ctx, Collector<Tuple2<Order, OrderDetail>> out) {  // 实现左连接   if (orderState.value() != null){       ctx.output(new OutputTag<String>("left-jo        in") {},        orderState.value().getTxId());   // 实现右连接   }else{      ctx.output(new OutputTag<String>("left-jo        in") {},        orderDetailState.value().getTxId());   }   orderState.clear();   orderDetailState.clear();   timeState.clear();}
复制代码

 

总体思想:基于数据时间实现订单数据及订单明细数据的关联,超时或者缺失则由侧输出流输出。

在 connect 中针对订单流和订单明细流,先创建定时器并保存 state 状态,处于窗口内就进行 join, 否则进入侧输出流。

7、双流 JOIN 的优化与总结

 

1.    为什么我的双流 join 时间到了却不触发,一直没有输出

检查一下 watermark 的设置是否合理,数据时间是否远远大于 watermark 和窗口时间,导致窗口数据经常为空

 

2.    state 数据保存多久,会内存爆炸吗

state 自带有 ttl 机制,可以设置 ttl 过期策略,触发 Flink 清理过期 state 数据。建议程序中的 state 数据结构用完后手动 clear 掉。

 

3.    我的双流 join 倾斜怎么办

join 倾斜三板斧: 过滤异常 key、拆分表减少数据、打散 key 分布。当然可以的话我建议加内存!加内存!加内存!!

 

4.    想实现多流 join 怎么办

目前无法一次实现,可以考虑先 union 然后再二次处理;或者先进行 connnect 操作再进行 join 操作,仅建议~

 

5.    join 过程延迟、没关联上的数据会丢失吗

这个一般来说不会,join 过程可以使用侧输出流存储延迟流;如果出现节点网络等异常,Flink checkpoint 也可以保证数据不丢失。

某日

 

面试官: Flink 双流 join 了解吗? 简单说说其实现原理。

某君: Flink 双流 JOIN 是。。。

 

点击关注,第一时间了解华为云新鲜技术~

发布于: 刚刚阅读数: 2
用户头像

提供全面深入的云计算技术干货 2020.07.14 加入

华为云开发者社区,提供全面深入的云计算前景分析、丰富的技术干货、程序样例,分享华为云前沿资讯动态,方便开发者快速成长与发展,欢迎提问、互动,多方位了解云计算! 传送门:https://bbs.huaweicloud.com/

评论

发布
暂无评论
面试官: Flink双流JOIN了解吗? 简单说说其实现原理