写点什么

写一篇最近用 DM 的总结

  • 2024-07-26
    北京
  • 本文字数:3255 字

    阅读完需:约 11 分钟

作者: TiDBer_jYQINSnf 原文来源:https://tidb.net/blog/3465c343


最近有个任务要把上游的分库分表合并后同步到下游的 TiDB,鉴于我司自己个儿的同步工具只能单线程的同步一张表,效率比较低,有时候高峰期会因为同步工具的瓶颈导致延迟。当然也不是我司的同步工具特别菜,是有点水土不服,毕竟之前那个同步工具上下游都是 MySQL, MySQL 的延迟是肯定比 TiDB 低的,TiDB 的优势就是连接数可以搞很多,每次查询的延迟稍微大一些。所以这个工具就不那么好使了。


决定用 DM 试试。


用 DM 有个问题:


我司数据库权限管控比较严格,不能申请 LOCK TABLE 权限,这样的话 DM 的 extra-args: “–consistency ” 只能设置为 none


用这个选项倒是能先跑起来,但是心里还是没底,到底这样同步数据能不能做到一致呢?会不会在 dump 开始到 sync 期间的修改丢掉?找了一通资料也没找到会不会,好在 P 社的代码都是开源的,那拿出代码看看吧。


下面就跟着这个 extra-args: “–consistency none” 选项走一走,看看到底是怎么处理的。


DM 的代码在 tiflow 里面:


https://github.com/pingcap/tiflow


代码结构也很清晰:


从 DM 的配置文件也大概能看出来,分 4 个部分,mydumper、loader、syncer、checker



是不是真这样对应的我也没完全读所有代码,我关心的一致性的问题,主要在于备份那一块,我就是去 dumpling 里面去找找。


dumpling 有这么几个函数,主要关注 process



忽略里面的 failpoint 后,代码看起来很清晰。


if dumpling, err = export.NewDumper(newCtx, m.dumpConfig); err == nil {    m.mu.Lock()    m.core = dumpling    m.mu.Unlock()    err = dumpling.Dump()    dumpling.Close()  } else {    m.logger.Warn("error occurred during NewDumper", zap.Error(err))}
复制代码


func (d *Dumper) Dump() (dumpErr error) {  // 这里判断return consistency != ConsistencyTypeSnapshot || serverType != version.ServerTypeTiDB,  // 我的情况下: consistency=none,并且源也不是tidb,这里都是返回的true  repeatableRead := needRepeatableRead(conf.ServerInfo.ServerType, conf.Consistency)       conCtrl, err = NewConsistencyController(tctx, conf, pool)  if err = conCtrl.Setup(tctx); err != nil {    return errors.Trace(err)  }
// 这里执行:SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;START TRANSACTION /*!40108 WITH CONSISTENT SNAPSHOT */ metaConn, err := createConnWithConsistency(tctx, pool, repeatableRead) m.recordStartTime(time.Now()) // 注意看这段注释:对于 consistency 是 none 的情况,binlog 的 pos 可能早于dump的数据,所以要开启 safe-mode 确保数据安全。 // for consistency lock, we can write snapshot info after all tables are locked. // the binlog pos may changed because there is still possible write between we lock tables and write master status. // but for the locked tables doing replication that starts from metadata is safe. // for consistency flush, record snapshot after whole tables are locked. The recorded meta info is exactly the locked snapshot. // for consistency snapshot, we should use the snapshot that we get/set at first in metadata. TiDB will assure the snapshot of TSO. // for consistency none, the binlog pos in metadata might be earlier than dumped data. We need to enable safe-mode to assure data safety. // 这里面用 show master status 记录了 binlog的位置 err = m.recordGlobalMetaData(metaConn, conf.ServerInfo.ServerType, false)

// for other consistencies, we should get table list after consistency is set up and GlobalMetaData is cached if conf.Consistency != ConsistencyTypeLock { if err = prepareTableListToDump(tctx, conf, metaConn); err != nil { return err } }
atomic.StoreInt64(&d.totalTables, int64(calculateTableCount(conf.Tables)))
// 这里启动 goroutine,处理接受的命令 writers, tearDownWriters, err := d.startWriters(writerCtx, wg, taskOut, rebuildConn)
// 这里再记录一次 binlog的位置,因为这里链接已经设置了RR,并且连接已经开启了事务。从这个位置往后就可以安全的退出safe-mode了 if conf.PosAfterConnect { // record again, to provide a location to exit safe mode for DM err = m.recordGlobalMetaData(metaConn, conf.ServerInfo.ServerType, true) if err != nil { tctx.L().Info("get global metadata (after connection pool established) failed", log.ShortError(err)) } }
tableDataStartTime := time.Now()
baseConn := newBaseConn(metaConn, true, rebuildMetaConn)
if conf.SQL == "" { // 这里开始 dump 数据,里面构造sql语句,发送到taskIn这个chan里面,然后由上面启动的goroutine执行。 // 包括show databases,select * from xxx,根据一系列条件构造sql,dump数据。 if err = d.dumpDatabases(writerCtx, baseConn, taskIn); err != nil && !errors.ErrorEqual(err, context.Canceled) { return err } } else { d.dumpSQL(writerCtx, baseConn, taskIn) } close(taskIn) _ = baseConn.DBConn.Close() if err := wg.Wait(); err != nil { summary.CollectFailureUnit("dump table data", err) return errors.Trace(err) }
m.recordFinishTime(time.Now()) return nil}
复制代码


总的思路就是:对连接设置 RR, 然后开启事务,先记录 binlog 的位置,再 dump 数据,为了安全,会先开启一段时间的安全模式。


安全模式的意思是:


安全模式 (safe mode) 是 DM 在进行增量同步时候的一种运行模式,在安全模式中,DM 增量同步组件在同步 binlog event 时,将把所有 INSERT 和 UPDATE 操作强制进行改写后再在下游执行。

安全模式的目的是在增量同步过程中,同一条 binlog event 能够在下游被重复同步且保证幂等性,从而确保增量同步能够“安全”进行。

DM 从 checkpoint 恢复数据同步任务后,可能重复执行某些 binlog 事件而导致下述问题:

  1. 在进行增量同步过程中,执行 DML 的操作和写 checkpoint 的操作并不是同步的;写 checkpoint 的操作和写下游数据的操作也并不能保证原子性。因此,当 DM 异常退出时,checkpoint 可能只记录到退出时刻之前的一个恢复点

  2. 当 DM 重启同步任务,并从 checkpoint 重新开始增量数据同步时,checkpoint 之后的部分数据可能已经在异常退出前被处理过了,从而导致部分 SQL 语句重复执行

  3. 如果重复执行 INSERT 操作,会导致主键或唯一索引冲突,引发同步中断;如果重复执行 UPDATE 操作,会导致不能根据筛选条件找到之前对应的更新记录。

在安全模式下,通过改写 SQL 语句,DM 可以解决上述问题。

安全模式通过 SQL 语句改写来保证 binlog event 的幂等性。具体来说,在安全模式下:

  • INSERT 语句会被改写成 REPLACE 语句。

  • UPDATE 语句会被分析,得到该语句涉及的行的主键或唯一索引的值,然后改写成 DELETE + REPLACE 语句 :先根据主键或唯一索引的定位删除对应的行,然后使用 REPLACE 语句插入一条最新值的行记录。

REPLACE 操作是 MySQL 特有的数据插入语法。使用 REPLACE 语法插入数据时,如果新插入的数据和现有数据存在主键或唯一约束冲突,MySQL 会删除所有冲突的记录,然后再执行插入记录操作,相当于“强制插入”的操作。具体请参考 MySQL 官方文档的 REPLACE 语句相关介绍


也就是说即使设置了 extra-args: “–consistency none” ,DM dump 的数据也是连续的,可靠的,放心用就行。


另外吐槽:专栏这里贴了代码再加注释真实很难用,还是先在别的记事本里改好再贴过来比较方便。


发布于: 刚刚阅读数: 3
用户头像

TiDB 社区官网:https://tidb.net/ 2021-12-15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
写一篇最近用DM的总结_迁移_TiDB 社区干货传送门_InfoQ写作社区