大数据 Flink 培训:维表 Join/ 双流 Join 的方法
一、背景
事实表通常存储在 kafka 中,维表通常存储在外部设备中(比如 MySQL,HBase)。对于每条流式数据,可以关联一个外部维表数据源,为实时计算提供数据关联查询。维表可能是会不断变化的,在维表 JOIN 时,需指明这条记录关联维表快照的时刻。需要注意是,目前 Flink SQL 的维表 JOIN 仅支持对当前时刻维表快照的关联(处理时间语义),而不支持事实表 rowtime 所对应的的维表快照。
二、维表 Join
预加载维表
将维表全量预加载到内存里去做关联,具体的实现方式就是我们定义一个类,去实现 RichFlatMapFunction,然后在 open 函数中读取维度数据库,再将数据全量的加载到内存,然后在 probe 流上使用算子 ,运行时与内存维度数据做关联。
这个方案的优点就是实现起来比较简单,缺点也比较明显,因为我们要把每个维度数据都加载到内存里面,所以它只支持少量的维度数据。同时如果我们要去更新维表的话,还需要重启作业,所以它在维度数据的更新方面代价是有点高的,而且会造成一段时间的延迟。对于预加载维表来说,它适用的场景就是小维表,变更频率诉求不是很高,且对于变更的及时性的要求也比较低的这种场景。
改进:open()新建一个线程定时加载维表,这样就不需要人工的去重启 Job 来让维度数据做更新,可以实现一个周期性的维度数据的更新_大数据培训。
distributed cache
通过 Distributed cash 的机制去分发本地的维度文件到 Task Manager 后再加载到内存做关联。实现方式可以分为三步:
通过 env.registerCached 注册文件。
实现 RichFunction,在 open 函数里面通过 RuntimeContext 来获取 Cache 文件。
解析和使用这部分文件数据。
因为数据要加载到内存中,所以支持的数据量比较小。而且如果维度数据需要更新,也是需要重启作业的。
那么它适用的场景就是维度数据是文件形式的、数据量比较小、并且更新的频率也比较低的一些场景,比如说我们读一个静态的码表、配置文件等等。
热存储关联
把维度数据导入到像 Redis、Tair、HBase 这样的一些热存储中,然后通过异步 IO 去查询,并且叠加使用 Cache 机制,还可以加一些淘汰的机制,最后将维度数据缓存在内存里,来减轻整体对热存储的访问压力。
如上图展示的这样的一个流程。在 Cache 这块的话,比较推荐谷歌的 Guava Cache,它封装了一些关于 Cache 的一些异步的交互,还有 Cache 淘汰的一些机制,用起来是比较方便的。
异步 IO 可以并行发出多个请求,整个吞吐是比较高的,延迟会相对低很多。如果使用异步 IO 的话,它对于外部存储的吞吐量上升以后,会使得外部存储有比较大的压力,有时也会成为我们整个数据处理上延迟的瓶颈。所以引入 Cache 机制是希望通过 Cache 来去减少我们对外部存储的访问量。
这个方案的优点就是维度数据不用全量加载到内存中,不受限于内存大小。但是需要依赖热存储资源,再加上 cache 过期时间,所以最后结果会有一定的延迟。适用于维度数据量比较大,能接受维度更新有一定延迟的情况。
广播维表
利用 Broadcast State 将维度数据流广播到下游 Task 做 Join。
将维度数据发送到 Kafka 作为广播原始流 S1
定义状态描述符 MapStateDescriptor。调用 S1.broadcast(),获得 broadCastStream S2
调用非广播流 S3.connect(S2),得到 BroadcastConnectedStream S4
在 KeyedBroadcastProcessFunction/BroadcastProcessFunction 实现关联处理逻辑,并作为参数调用 S4.process()
广播维表维度的变更可以及时的更新到结果,但是数据还是需要保存在内存中,因为它是存在 State 里的,所以支持维表数据量仍然不是很大。适用的场景就是我们需要时时的去感知维度的变更,且维度数据又可以转化为实时流。
Temporal table function join
首先说明一下什么是 Temporal table?它其实是一个概念:就是能够返回持续变化表的某一时刻数据内容的视图,持续变化表也就是 Changingtable,可以是一个实时的 Changelog 的数据,也可以是放在外部存储上的一个物化的维表。
它的实现是通过 UDTF 去做 probe 流和 Temporal table 的 join,称之 Temporal table function join。这种 Join 的方式,它适用的场景是维度数据为 Changelog 流的形式,而且我们有需要按时间版本去关联的诉求。
在 Changelog 流上面去定义 TemporalTableFunction,这里面有两个关键的参数是必要的。第 1 个参数就是能够帮我们去识别版本信息的一个 Time attribute,第 2 个参数是需要去做关联的组件。
在 tableEnv 里面去注册 TemporalTableFunction 的名字。
维表 Join 方案对比
三、双流 Join
批处理有两种方式处理两个表的 Join,一种是基于排序的 Sort-Merge Join,更一种是转化为 Hash Table 加载到内存里做 Hash Join。
在双流 Join 的场景中,Join 的对象是两个流,数据是不断进入的,所以我们 Join 的结果也是需要持续更新的。基本思路是将一个无线的数据流,尽可能拆分成有限数据集去做 Join。
Regular Join
这种 Join 方式需要去保留两个流的状态,持续性地保留并且不会去做清除。两边的数据对于对方的流都是所有可见的,所以数据就需要持续性的存在 State 里面,那么 State 又不能存的过大,因此这个场景的只适合有界数据流。
Interval Join
加入了一个时间窗口的限定,要求在两个流做 Join 的时候,其中一个流必须落在另一个流的时间戳的一定时间范围内,并且它们的 Join key 相同才能够完成 Join。加入了时间窗口的限定,就使得我们可以对超出时间范围的数据做一个清理,这样的话就不需要去保留全量的 State。
Interval Join 是同时支持 processing time 和 even time 去定义时间的。如果使用的是 processing time,Flink 内部会使用系统时间去划分窗口,并且去做相关的 state 清理。如果使用 even time 就会利用 Watermark 的机制去划分窗口,并且做 State 清理。
Window join
将两个流中有相同 key 和处在相同 window 里的元素去做 Join。它的执行的逻辑比较像 Inner Join,必须同时满足 Join key 相同,而且在同一个 Window 里元素才能够在最终结果中输出。
文章来源于大数据技术与架构
评论