Canal 如何实现数据库库事务的一致性
在 Canal 中关于事务 Event 的环形缓存区实现类为 EventTransactionBuffer。
[](()1.1 类图
EventTransactionBuffer 的类图如下:
根据类图我们可以到其存储结构还是比较简单的。
int bufferSize 环形缓存区的长度,默认为 1024,该长度必须为 2 的幂次方,因为对位运算非常友好。
int indexMask 环形缓存区下标掩码,其值为 bufferSize - 1 ,sequence * indexMask 能快速定位序号 sequence 所在环形缓存区中的具体下标。
CannalEntry.Entry[] entries 环形缓存区数据数组,即缓存区实际存储数据的内存区域,为数组结构,长度为 bufferSize。
AtomicLong putSequence 当前写入的序号,每调用 add 方法添加一条数据,该值增加一,可超过缓存区的实际长度。
AtomicLong flushSequence 当前已处理的数据序号,flushSequence <= putSequence,(putSequence - flushSequence)表示未处理的数据,即缓存区累积的有效数据。
TransactionFlushCallback flushCallbackflush 回调函数,这个和环形缓存区本身关系不大,这个与 Canal 特定业务的,环形缓存区中收集到一个完整的事务变更日志列表后,将这部分内容传入业务回调方法,并重新利用这些缓存空间。
环形缓存区的重大要义就是循环利用。
[](()1.2 环形缓存区存储实现
接下来我们通过其 add 方法来看一下环形缓存区的,在研究环形缓存区之前,将结合 8 个元素的环形缓存区进行讲解。
EventTransactionBuffer 的 add 方法代码如下:
![在这里插入图片描述](https://img-blog.csdnimg.cn/2 《一线大厂 Java 面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》开源 0200712151726412.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3ByZXN0aWdlZGluZw==,size_16,color_FFFFFF,t_70#pic_center)
首先根据 binlog 事件类型来决定是否调用 flush 方法,这个就是实现将一个事务的事务一起提交到消费端,回到环形缓存区的具体实现,我们重点关注 put 方法 与 flush 方法的实现。
EventTransactionBuffer#put
其实现的核心步骤:
检测当前环形缓存区是否已满,如果未满,则向缓存区中添加一条数据。添加数据的具体逻辑:
获取下一个写入的序号 next,等于当前已写入的序号 + 1,即 putSequence + 1。
通过 next & indexMask 取得放入 CannalEntry.Entry[] entries 中的下标,与 next % bufferSize 效果等同。
如果已满,则首先将缓存区中的数据刷新,即将未处理的数据 Java 开源项目【ali1024.coding.net/public/P7/Java/git】 全部抽取,提交到数据消费方,然后释放缓存区,继续添加数据。
最后
作为过来人,小编是整理了很多进阶架构视频资料、面试文档以及 PDF 的学习资料,针对上面一套系统大纲小编也有对应的相关进阶架构视频资料
评论