TiCDC 源码阅读(五)TiCDC 对 DDL 的处理和 Filter 解析
作者: TiDB 社区小助手原文来源:https://tidb.net/blog/3e9df8d4
导读
本文是 TiCDC 源码解读的第五篇,本文将会介绍 TiCDC 对 DDL 的处理方式和 Filter 功能的实现(基于 TiCDC v6.5.0 版本代码) ,文章将会围绕以下 4 个问题展开。
为什么 TiCDC 只用 Owner 节点来同步 DDL?
DDL 事件会对同步任务的进度有什么影响?
TiCDC 是怎么在内部维护表的 Schema 信息的?
TiCDC 的 Filter 功能是怎么实现的?
希望在回答完这几个问题之后,大家能够对 TiCDC DDL 同步机制有所了解,并且能够对 Filter 模块的有比较深入的认识。
1: 同步框架回顾

第一期《TiCDC 架构概览》的文章中,我们认识到了 TiCDC 的 DML 同步流和 DDL 同步流是分开的。
从上面的架构图中可以看到, DML 的同步是由 Processor 进行的,数据流从上游的 TiKV 流入经过 Processor 内的 TablePipeline ,最后被同步到下游。而 DDL 同步则是由 Owner 进行的,OwnerDDLPuller 拉取上游发生的 DDL 事件,然后在内部经过一系列的处理之后,通过 DDLSink 同步到下游。
在深入认识 DDL 的处理细节之前,需要先结合以上的架构图,对下面几个实体有所了解:
OwnerSchemaStorage:由 Owner 持有,维护了当前所有表最新的 schema 信息,这些表的 schema 信息主要会被 scheduler 所使用,用来感知同步任务的表数量的变化;此外,还会被 owner 用来解析 ddlPuller 拉取到的 DDL 事件。
ProcessorSchemaStorage:由 Processor 持有,维护了当前所有表的多个版本的 schema 信息,这些信息会被 Mounter 用来解析行变更信息。
BarrierTs:由 Owner 向 Processor 发送的控制信息,它会让 Processor 把同步进度阻塞到 BarrierTs 所指定的值。TiCDC 内部有几种不同类型的 BarrierTs,为了简化叙述,本文中提到的 BarrierTs 仅表示 DDL 事件产生的 DDLBarrierTs。
OwnerDDLPuller:由 Owner 持有,负责拉取和过滤上游 TiDB 集群的 DDL 事件,并把它们缓存在一个队列中,等待 Owner 处理;此外,它还会维护一个 ResolvedTs,该值为上游 TiKV 发送过来的最新的 ResolvedTs,在没有 DDL 事件到来的时候,Owner 将会使用它来推进 DDLBirrierTs。
ProcessorDDLPuller:由 Processor 持有,负责拉取和过滤上游 TiDB 集群的 DDL 事件,然后把它们发送给 Processor 去更新 ProcessorSchemaStorage。
DDLSink:由 Owner 持有,负责执行 DDL 到下游。
2: 为什么 TiCDC 选择由 Owner 节点来同步 DDL ?
TiCDC 之所以选择由 Owner 来同步 DDL,是因为需要确保在同步一条 DDL 的时候:
所有早于该条 DDL 的行变更事件都已同步到下游。
所有晚于该条 DDL 的行变更都需要在该条 DDL 成功同步到下游之后才会继续同步。
否则,就有可能出现上下游数据不一致的情况。
要达到以上两个目的,就需要保证我们执行一条 DDL 之前,所有 Processor 上面的同步流都准确地停在 DDL 的 commitTs 这个时间点,等到 DDL 被成功同步到下游之后再恢复同步。
在当前 TiCDC 的同步模型中,Owner 是负责发号施令的角色,它拥有整个同步任务所有表的状态信息,能够对 Processor 下达停止同步的控制指令,也能够得知同步流是否停止在恰当的时刻上。所以,为了简化 DDL 的处理逻辑,我们选择仅由 Owner 节点来进行 DDL 的同步。
3 : DDL 事件会对同步任务的进度产生什么影响?
如上文所述,在同步一条 DDL 之前,我们需要先让同步任务的数据流准确地停止在该 DDL 的 commitTs 这个时刻。这是通过 Owner 计算并向 Processor 发送 BarrierTs 实现的,接下来将会结合下面的顺序图来详细讲解该流程。
-1680493738478.png)
OwnerDDLPuller 在 Changefeed 启动之后,就会持续监听上游 TiDB 集群发生的 DDL 事件,当接收到一条 DDL 事件之后,它会先对该条 DDL 事件进行过滤。如果该条 DDL 和 Changefeed 所需同步的表无关,则会被直接忽略,否则将会被加入到 OwnerDDLPuller 的 pendingDDL 队列中。
Owner 在每轮 tick 里面都会去检查 pendingDDL 队列中是否有待执行的 DDL 事件。如果有待执行的 DDL 事件,则会取队头的 DDL 的 CommitTs 为 DDLBarrierTs。然后,通过 Etcd 向 Processor 广播这个 DDLBarrierTs,Processor 收到之后,会把该值设置为所有 TablePipeline 同步数据流的上界,当同步流前进到这个时间点之后,就需要停下来等待。
Owner 在每轮 tick 内部都会检查当前 Changefeed 的 CheckpointTs 是否已经前进到 DDLBarrierTs 的值,若前进到该值,则说明该条 DDL 之前的所有 DML 事件都已经被成功同步到下游了。此时,Owner 会把 DDL 事件应用到 OwnerSchemaStorage 上,使得元数据和上游保持一致;然后,Owner 会调用 DDLSink 把该条 DDL 同步到下游。等到 DDL 成功执行之后,Owner 就会推进 DDLBarrierTs 为下一条未执行的 DDL 的 CommitTs,如果不存在未执行的 DDL 事件,那么 DDLBarrierTs 会被推进到 OwnerDDLPuller 维护的 ResolvedTs 值。这样,Processor 的同步流就能够继续向前推进。
在当前的实现中,Owner 对 DDL 事件的处理逻辑主要存在 handlerBarrier() 这个函数中,核心逻辑如下:
在 Processor 侧的主要逻辑则存在于 pushResolvedTs2Table() 这个函数中,核心逻辑如下:
通过上述的讲解,可以较为自然的得出这个结论:任意一张表的 DDL 事件会阻塞所有表的 DML 同步进度,因此上游执行耗时较长的 DDL 或者短时间内执行大量 DDL,都容易引起同步任务的延迟上升。
4 : TiCDC 是怎么在内部维护表的 Schema 信息的?
TiCDC 对表的 Schema 信息的维护是 Changefeed 级别的,每个 Changefeed 在 Owner 节点上都会拥有一份 Schema 信息,在每个 Processor 节点上也都会有一份 Schema 信息。
Schema 在 Changefeed 创建的时候被初始化,TiCDC 使用 Changefeed 的 startTs 从上游 TiKV 获取了一份 snapshot,并把 snapshot 里面所有的数据库和表信息都存储在 Schema 里。在 Changefeed 的运行过程中,TiCDC 会持续维护和更新 Schema 信息,每次有新的 DDL 事件到来的时候,TiCDC 都会把 DDL 事件应用到这个 Schema 上面,以保证 Schema 和上游 TiDB 中的 Schema 是一致的。
-1680493856355.png)
Owner 节点上的 Schema 中只保存了每张表最新的那份信息,原因在于 Owner 节点只负责 DDL 的同步,并且 TiCDC 保证了 DDL 的同步是线性有序的,它在解析下一条 DDL 的时候,只需要上一条 DDL 执行结束的时候的 Schema 信息就可以确保解析的正确性。除此之外,Scheduler 还会调用 Schema 提供的 AllPhysicalTables() 方法来感知当前是否有表的增减,触发调度任务。
-1680493892431.png)
而 Processor 节点上的 SchemaStorage 中则保存了每张表最近几个版本的信息,因为 Processor 需要负责 DML 的同步,而 DML 的同步进度是有可能落后于 Processor 节点上 DDLPuller 拉取 DDL 事件的速度的。所以,为了能够正确地解析 DML 事件,我们需要在 SchemaStorage 中维护 CheckpointTs 之后版本的表信息,而 CheckpointTs 之前的信息则可以清理掉。TiCDC 保证 CheckpointTs 之前的变更事件都已经被同步到下游,也就肯定不会再需要解析 CheckpointTs 的 DML 事件了。
需要注意的是,在上游短时间发生大量的 DDL 时,SchemaStorage 需要频繁地进行更新,并且会短时间内产生多个版本的 Schema 信息,有可能造成 TiCDC 内存使用量大幅上升。
5 : TiCDC 的 Filter 功能是怎么实现的?
目前 TiCDC 的 Filter 的基本功能如下:
同步或者忽略用户指定的库或者表
过滤 TiCDC 不支持同步的 DDL 事件
过滤用户指定忽略的 DDL 事件
过滤用户指定忽略的行变更事件
Filter 主要会被 DDLPuller、Mounter、SchemaStorage 这三者调用,用来实现以上提到的几个功能。若需要了解如何配置 Filter,可以参考文档:Changefeed 日志过滤器。接下来,我们将会从源码的角度来了解 Filter 的相关实现。
上面即是 Filter 的接口定义,大家可以从接口中的方法名和注释就了解到 Filter 具有什么样的功能。实现 Filter 接口的结构体定义如下:
下面详细介绍一下组成 filter 的几个结构体:
tableFilter 是表库过滤器,根据用户指定的规则来同步或者过滤对应的表和库,它是通过表名和库名在 changefeed 初始化的阶段进行过滤的。如果用户在 Filter 规则中配置了只同步某张表,那么 changefeed 就只会拉取该表的变更事件。
dmlExprFilter 是 sql 表达式过滤器,实现了通过用户指定的 SQL 表达式来过滤对应 DML 事件的功能。该行为是在 Mounter 中进行的,它会根据用户提供的 sql 表达式来对每一行变更进行计算,过滤掉符合计算结果的行变更事件。
sqlEventFilter 是事件类型过滤器,它根据用户指定的事件类型来过滤符合条件的 DDL 或者 DML 事件。该行为也是在 Mounter 中进行的。
ignoreTxnStartTs 则是根据指定的 startTs 来过滤事件,一般不推荐用户使用。
以上几个结构体内部的实现逻辑都较为简单,整个 Filter 接口的方法就是由这几个结构体提供的方法组合而成的,感兴趣的读者可以自行点进源码链接进行阅读。比较值得注意的一点是 TiCDC 对 DDL 事件的同步支持,目前 TiCDC 对 DDL 同步采用的是白名单模式,仅支持同步白名单内的 DDL 事件。因此,当接收到非白名单事件的 DDL 时,TiCDC 会直接丢弃。
以上就是本文章的全部内容,希望读者看完之后能够对 TiCDC 有更深入的认识。
版权声明: 本文为 InfoQ 作者【TiDB 社区干货传送门】的原创文章。
原文链接:【http://xie.infoq.cn/article/01c36891d2aa31ea52a27ff0f】。文章转载请联系作者。
评论