写点什么

TiDB Binlog 源码阅读系列文章(八)Loader Package 介绍

原文来源:https://tidb.net/blog/bae66851


本文作者:赵一霖


上一篇文章中,我们介绍了 Drainer server,其中 Syncer 的实现之一 MySQLSyncer 就是对 Loader 的封装。Loader 主要用于将 binlog 数据写入到支持 MySQL 协议的数据库。由于 Drainer/Reparo 等组件都需要类似功能,因此我们将 Loader 抽离成独立的 Package,供其他应用程序调用。Loader 代码主要在 TiDB Binlog repo 的 pkg/loader 目录下。


本文将从 Loader 的接口定义开始,由上至下的介绍 Loader 将 binlog 写入下游数据库的过程。

Loader Interface

Loader 组件的接口定义


type Loader interface {    // SetSafeMode 设置 SafeMode 状态    SetSafeMode(bool)
// GetSafeMode 获取 SafeMode 状态 GetSafeMode() bool
// Input 返回一个用于输入事务的 Channel Input() chan<- *Txn
// Successes 事务执行成功后,将输出到 Successes Successes() <-chan *Txn
// Close 安全关闭 Loader Close()
// Run 运行 Loader 相关流程 Run() error}
复制代码


Loader 将处理调用者传入的 Txn Cahnnel,Txn 的结构中包含 DDL、DML 等信息,将在下一篇中详细介绍。Loader 的使用者可以将待同步的事务传入 Input,并从 Successes 中接收事务提交成功的事件通知。Example Loader 是一个比较简单的 Loader 使用范例,可供大家参考。

SafeMode

了解 Binlog 组件的读者可能知道,drainer 有一个配置项 safe-mode。开启安全模式后,Loader 会将 Insert 语句换为 Replace 语句,将 Update 语句拆分为 Delete + Replace 语句,使得下游 MySQL/TiDB 可被重复写入。


在 Loader 中,我们通过 SetSafeModeGetSafeMode 来设置和获取 SafeMode 状态。SetSafeModeGetSafeMode 的实现如下:


func (s *loaderImpl) SetSafeMode(safe bool) {    if safe {        atomic.StoreInt32(&s.safeMode, 1)    } else {        atomic.StoreInt32(&s.safeMode, 0)    }}
func (s *loaderImpl) GetSafeMode() bool { v := atomic.LoadInt32(&s.safeMode) return v != 0}
复制代码


SetSafeModeGetSafeMode 实际上都是对 safeMode 这个变量进行原子操作,接下来我们来追踪一下 safeMode 变量,safeMode(*executor).singleExec 函数中被用到:


func (e *executor) singleExec(dmls []*DML, safeMode bool) error {    tx, err := e.begin()    if err != nil {        return errors.Trace(err)    }
for _, dml := range dmls { if safeMode && dml.Tp == UpdateDMLType { sql, args := dml.deleteSQL() if _, err := tx.autoRollbackExec(sql, args...); err != nil { return errors.Trace(err) }
sql, args = dml.replaceSQL() if _, err := tx.autoRollbackExec(sql, args...); err != nil { return errors.Trace(err) } } else if safeMode && dml.Tp == InsertDMLType { sql, args := dml.replaceSQL() if _, err := tx.autoRollbackExec(sql, args...); err != nil { return errors.Trace(err) } } else { sql, args := dml.sql() if _, err := tx.autoRollbackExec(sql, args...); err != nil { return errors.Trace(err) } } }
err = tx.commit() return errors.Trace(err)}

复制代码


发布于: 刚刚阅读数: 1
用户头像

TiDB 社区官网:https://tidb.net/ 2021-12-15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
TiDB Binlog 源码阅读系列文章(八)Loader Package 介绍_数据库开发_TiDB 社区干货传送门_InfoQ写作社区