写点什么

开源交流丨批流一体数据集成框架 ChunJun 数据传输模块详解分享

作者:数栈DTinsight
  • 2022 年 8 月 24 日
    浙江
  • 本文字数:1993 字

    阅读完需:约 7 分钟

课件获取:关注公众号“ChunJun”,后台私信 “课件” 获得直播课件


视频回放:点击这里


ChunJun 开源项目地址:githubgitee 喜欢我们的项目给我们点个__ STAR!STAR!!STAR!!!(重要的事情说三遍)__


技术交流钉钉 qun:30537511


本期我们带大家回顾一下六六同学的直播分享《ChunJun 数据传输模块介绍》。

一、ChunJun 数据类型转换

1、类型转换解决的问题

大家一听到「ChunJun 数据类型转换」这个概念,可能会联想到上下游之间进行数据交互时会涉及到的隐式转换。如果上游和下游数据类型一致,则不需要对数据进行任何干预,直接进行下发即可。


但是大多数情况下会涉及到两个问题,一是上游的数据源类型和下游的数据源类型不一致。比如 MySql 的 varchar 类型要写到 HdfsOrc 文件里的 string 类型的话,在上游的表示是 varchar,在下游的表示是 string,但实际上中间段 java 的类型都是 string。


另外一种情况则是,上下游之间不止数据源类型不一样,数据类型也不一样,除了要做类型的映射之外,还需要对数据本身进行改动。比如,MySql 的 date 类型要写到下游 timestamp 类型,我们需要进行的操作是把 date 中的毫秒级的时间戳拿出来,转换成 timestamp 的类型,再往下游去写。


这样就引出了一个问题,如何建立所有数据源类型之间的映射/转换关系?下面将为大家解答这个问题。


2、类型映射概览

• client 端:在 Factory 类中通过 RawConverter 类建立映射关系


• source 端:将数据封装成 AbstractBaseColumn


• sink 端:通过 AbstractBaseColumn 中的转换方法将数据转换成对应类型



ChunJun 目前支持的数据类型映射关系图如下:


3、类型映射详解

以 Timestamp 为例,如果要写入到 Long 类型的话,根据上文展示的 ChunJun 数据类型映射关系图,最终映射到 TimestampColumn 中,具体流程如下图:



上面这个例子描述的是一个单独的字段,正常情况下,会处理多个字段,这时的类型映射详解情况如下图:



as 方法就是数据类型转换的方法。使用这个机制之后,在下游可以只关心需要的数据类型,增加开发效率。

二、ChunJun 数据传输过程

了解完 ChunJun 数据类型转换后,我们来为大家分享 ChunJun 的数据传输过程。

1、上下游数据传输方式

在 ChunJun 中进行同步作业,有两种情况,一是算子链打开的情况,上游的 Source 和下游的 Sink 会被合并成一个 task,有同一个线程去做调度;二是把算子链进行关闭,Source 和 Sink 各自形成一个 task,也有各自的线程去进行调度。


在算子链打开的情况下,上下游数据传输方式可分为两种,对象重用和拷贝。


● 对象重用


· 上下游数据传输使用方法调用的形式,将上游产生的数据的对象引用直接交给下游


· 上下游算子需要形成算子链,作业开启对象重用


· env.getConfig().enableObjectReuse();


● 拷贝


· 上游传输给下游的数据,需要经过一次深拷贝


· 上下游算子需要形成算子链


算子链的好处是可以减少序列化的操作,那么为什么我们还要引入序列化呢?因为 ChunJun 的特殊性。ChunJun 同步作业的话,只有上下游两个算子,且都对接了正式的数据源,读写的时候会导致线程堵塞。因此上限由网络 io 决定,如果断开算子链,cpu 会在一端线程阻塞的时候切换到另外一端。在序列化的性能较高时,线程上下文切换带来的性能下降完全可以被弥补。


经过测试,序列化的性能比对象重用和拷贝高 30%左右。


● 序列化


· 上下游数据传输依赖于网络传输。上游数据进行序列化成 byte 数组后进行网络传输,下游收到数据后需要进行反序列化


· 上下游之间不形成算子链



知道要做序列化后,会产生一些思考,带着这些疑问,接着往下看。


• 序列化和反序列化在什么时候发生?


• Flink 支持哪些序列化?


• 序列化是怎么做的?


• 怎么找到适合的序列化方式?


• 如何实现自定义的序列化?

2、序列化传输过程

下图是 ChunJun 在进行序列化操作时的数据传输链路图:


3、DataOutView

4、TypeInformation 介绍

5、kryo 序列化 &BaseSerializer

同样是序列化一个 int 对象,对 kryo 来说,首先需要知道它的类型,然后从高位到低位依次去写入。


DataOutputView 则是直接调用一个 writeInt 的方法,写一句关键代码即可:


UNSAFE.putInt(


this.buffer,


BASE_OFFSET + this.position, v);


三、ChunJun 序列化实现

1、ColumnRowData 序列化过程

ColumnRowData 序列化过程采取标志位+实际数据的方式,具体流程如下图:



相对于 kryo 的序列化来说:


· 实现了更密集的存储


· 兼容 null 值


· 减少了不必要的数据传输

2、BinaryRowData 结构


因为数据区一格只占 8 个字节,且每个 index 只能占到一位,所以肯定存在一些没法存储在 8 字节范围之内的数据,可变长度部分就是用来存放数据区无法存放的数据。

3、BinaryRowData-setNull 操作

看到上文的 null 值判断区,有些同学可能会好奇这是什么,又是怎么进行操作的。下图将对一个下标为 11 的数据去做 setnull 操作,进行简单介绍:


4、BinaryRowData 数据存储方式


袋鼠云开源框架钉钉技术交流群(30537511),欢迎对大数据开源项目有兴趣的同学加入交流最新技术信息,开源项目库地址:https://github.com/DTStack

发布于: 刚刚阅读数: 4
用户头像

还未添加个人签名 2021.05.06 加入

还未添加个人简介

评论

发布
暂无评论
开源交流丨批流一体数据集成框架ChunJun数据传输模块详解分享_数栈DTinsight_InfoQ写作社区