TiDB 同步数据到 Kafka 最佳实践
作者: Billmay 原文来源:https://tidb.net/blog/192ff559
本文介绍如何使用 TiCDC 创建一个将增量数据复制到 Kafka 的 Changefeed。
创建同步任务,复制增量数据 Kafka
使用以下命令来创建同步任务:
--changefeed-id
:同步任务的 ID,格式需要符合正则表达式^[a-zA-Z0-9]+(\-[a-zA-Z0-9]+)*$
。如果不指定该 ID,TiCDC 会自动生成一个 UUID(version 4 格式)作为 ID。--sink-uri
:同步任务下游的地址,详见:Sink URI 配置 Kafka。--start-ts
:指定 changefeed 的开始 TSO。TiCDC 集群将从这个 TSO 开始拉取数据。默认为当前时间。--target-ts
:指定 changefeed 的目标 TSO。TiCDC 集群拉取数据直到这个 TSO 停止。默认为空,即 TiCDC 不会自动停止。--config
:指定 changefeed 配置文件,详见:TiCDC Changefeed 配置参数。
Sink URI 配置 kafka
Sink URI 用于指定 TiCDC 目标系统的连接信息,遵循以下格式:
一个通用的配置样例如下所示:
URI 中可配置的的参数如下:
最佳实践
TiCDC 推荐用户自行创建 Kafka Topic,你至少需要设置该 Topic 每次向 Kafka broker 发送消息的最大数据量和下游 Kafka partition 的数量。在创建 changefeed 的时候,这两项设置分别对应
max-message-bytes
和partition-num
参数。如果你在创建 changefeed 时,使用了尚未存在的 Topic,那么 TiCDC 会尝试使用
partition-num
和replication-factor
参数自行创建 Topic。建议明确指定这两个参数。在大多数情况下,建议使用
canal-json
协议。
注意
当 protocol
为 open-protocol
时,TiCDC 会尽量避免产生长度超过 max-message-bytes
的消息。但如果单条数据变更记录需要超过 max-message-bytes
个字节来表示,为了避免静默失败,TiCDC 会试图输出这条消息并在日志中输出 Warning。
TiCDC 使用 Kafka 的认证与授权
使用 Kafka 的 SASL 认证时配置样例如下所示:
SASL/PLAIN
SASL/SCRAM
SCRAM-SHA-256、SCRAM-SHA-512 与 PLAIN 方式类似,只需要将
sasl-mechanism
指定为对应的认证方式即可。SASL/GSSAPI
SASL/GSSAPI user
类型认证:
sasl-gssapi-user
和 sasl-gssapi-realm
的值与 kerberos 中指定的 principle 有关。例如,principle 为 alice/for-kafka@example.com
,则 sasl-gssapi-user
和 sasl-gssapi-realm
的值应该分别指定为 alice/for-kafka
和 example.com
。
SASL/GSSAPI keytab
类型认证:
SASL/GSSAPI 认证方式详见 Configuring GSSAPI。
TLS/SSL 加密
如果 Kafka broker 启用了 TLS/SSL 加密,则需要在
--sink-uri
中增加enable-tls=true
参数值。如果需要使用自签名证书,则还需要在--sink-uri
中指定ca
、cert
跟key
几个参数。ACL 授权
TiCDC 能够正常工作所需的最小权限集合如下:
对 Topic 资源类型的
Create
和Write
权限。对 Cluster 资源类型的
DescribeConfigs
权限。
TiCDC 集成 Kafka Connect (Confluent Platform)
如要使用 Confluent 提供的 data connectors 向关系型或非关系型数据库传输数据,请选择 avro
协议,并在 schema-registry
中提供 Confluent Schema Registry 的 URL。
配置样例如下所示:
集成具体步骤详见与 Confluent Cloud 进行数据集成。
自定义 Kafka Sink 的 Topic 和 Partition 的分发规则
Matcher 匹配规则
以上一节示例配置文件中的 dispatchers 配置项为例:
对于匹配了 matcher 规则的表,按照对应的 topic 表达式指定的策略进行分发。例如表 test3.aa,按照 topic 表达式 2 分发;表 test5.aa,按照 topic 表达式 3 分发。
对于匹配了多个 matcher 规则的表,以靠前的 matcher 对应的 topic 表达式为准。例如表 test1.aa,按照 topic 表达式 1 分发。
对于没有匹配任何 matcher 的表,将对应的数据变更事件发送到 –sink-uri 中指定的默认 topic 中。例如表 test10.aa 发送到默认 topic。
对于匹配了 matcher 规则但是没有指定 topic 分发器的表,将对应的数据变更发送到 –sink-uri 中指定的默认 topic 中。例如表 test6.aa 发送到默认 topic。
Topic 分发器
Topic 分发器用 topic = “xxx” 来指定,并使用 topic 表达式来实现灵活的 topic 分发策略。topic 的总数建议小于 1000。
Topic 表达式的基本规则为 [prefix]{schema}[middle][{table}][suffix]
,详细解释如下:
prefix
:可选项,代表 Topic Name 的前缀。{schema}
:必选项,用于匹配库名。middle
:可选项,代表库表名之间的分隔符。{table}
:可选项,用于匹配表名。suffix
:可选项,代表 Topic Name 的后缀。
其中 prefix
、middle
以及 suffix
仅允许出现大小写字母(a-z
、A-Z
)、数字(0-9
)、点号(.
)、下划线(_
)和中划线(-
);{schema}
、{table}
均为小写,诸如 {Schema}
以及 {TABLE}
这样的占位符是无效的。
一些示例如下:
matcher = ['test1.table1', 'test2.table2'], topic = "hello_{schema}_{table}"
对于表
test1.table1
对应的数据变更事件,发送到名为hello_test1_table1
的 topic 中对于表
test2.table2
对应的数据变更事件,发送到名为hello_test2_table2
的 topic 中matcher = ['test3.*', 'test4.*'], topic = "hello_{schema}_world"
对于
test3
下的所有表对应的数据变更事件,发送到名为hello_test3_world
的 topic 中对于
test4
下的所有表对应的数据变更事件,发送到名为hello_test4_ world
的 topic 中matcher = ['*.*'], topic = "{schema}_{table}"
对于 TiCDC 监听的所有表,按照“库名 _ 表名”的规则分别分发到独立的 topic 中;例如对于
test.account
表,TiCDC 会将其数据变更日志分发到名为test_account
的 Topic 中。
DDL 事件的分发
库级别 DDL
诸如 create database
、drop database
这类和某一张具体的表无关的 DDL,称之为库级别 DDL。对于库级别 DDL 对应的事件,被发送到 --sink-uri
中指定的默认 topic 中。
表级别 DDL
诸如 alter table
、create table
这类和某一张具体的表相关的 DDL,称之为表级别 DDL。对于表级别 DDL 对应的事件,按照 dispatchers 的配置,被发送到相应的 topic 中。
例如,对于 matcher = ['test.*'], topic = {schema}_{table}
这样的 dispatchers 配置,DDL 事件分发情况如下:
若 DDL 事件中涉及单张表,则将 DDL 事件原样发送到相应的 topic 中。
对于 DDL 事件
drop table test.table1
,该事件会被发送到名为test_table1
的 topic 中。若 DDL 事件中涉及多张表(
rename table
/drop table
/drop view
都可能涉及多张表),则将单个 DDL 事件拆分为多个发送到相应的 topic 中。对于 DDL 事件
rename table test.table1 to test.table10, test.table2 to test.table20
,则将rename table test.table1 to test.table10
的 DDL 事件发送到名为test_table1
的 topic 中,将rename table test.table2 to test.table20
的 DDL 事件发送到名为test.table2
的 topic 中。
Partition 分发器
partition 分发器用 partition = “xxx” 来指定,支持 default、ts、index-value、table 四种 partition 分发器,分发规则如下:
default:有多个唯一索引(包括主键)时按照 table 模式分发;只有一个唯一索引(或主键)按照 index-value 模式分发;如果开启了 old value 特性,按照 table 分发
ts:以行变更的 commitTs 做 Hash 计算并进行 event 分发
index-value:以表的主键或者唯一索引的值做 Hash 计算并进行 event 分发
table:以表的 schema 名和 table 名做 Hash 计算并进行 event 分发
注意
从 v6.1 开始,为了明确配置项的含义,用来指定 partition 分发器的配置项由原来的 dispatcher
改为 partition
,partition
为 dispatcher
的别名。例如,以下两条规则完全等价:
但是 dispatcher
与 partition
不能出现在同一条规则中。例如,以下规则非法:
版权声明: 本文为 InfoQ 作者【TiDB 社区干货传送门】的原创文章。
原文链接:【http://xie.infoq.cn/article/e85179dc26873511e869579ee】。文章转载请联系作者。
评论