
What upstream SQL looks like in Kafka when synced via drainer

August 26, 2022, Beijing

Author: qhd2004  Source: https://tidb.net/blog/680955d5


Environment: the upstream is a TiDB v5.4.1 cluster, the downstream is a Kafka 2.12-2.4.1 cluster (Scala 2.12, Kafka 2.4.1), and drainer is used to replicate data.


This article takes a first look at how upstream DDL and DML statements appear downstream, and whether they can affect replication. The test process is as follows:


Drainer configuration


- host: 10.103.236.178
  ssh_port: 22
  port: 8239
  deploy_dir: /data/tidb-deploy/drainer-8239
  data_dir: /data/tidb-data/drainer-8239
  log_dir: log
  config:
    syncer.db-type: kafka
    syncer.ignore-schemas: INFORMATION_SCHEMA,PERFORMANCE_SCHEMA,mysql,db_name
    syncer.to.kafka-addrs: 10.xxx.xxx.10:9092,10.xxx.xxx.11:9092,10.xxx.xxx.59:9092
    syncer.to.kafka-max-messages: 1024
    syncer.to.kafka-version: 2.4.1
    syncer.to.topic-name: syk-test-binlog-to-kafka
  arch: amd64
  os: linux


Note: you can also set replicate-do-db to replicate only specific databases. In my tests, however, this parameter did not work: with it set, the drainer service failed to start at all.
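For reference, a replicate-do-db filter would sit in the same config block as above. The exact key form below is an assumption based on how the other syncer options are flattened in this topology file, and, as noted, drainer failed to start with it in my tests:

```yaml
  config:
    syncer.db-type: kafka
    # Hypothetical alternative to ignore-schemas: replicate only the
    # listed databases. Did not work in the author's environment —
    # drainer would not start with this option set.
    syncer.replicate-do-db: ["db_name"]
```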

1. Executing CREATE upstream

create table moe_test
(
    id int(3) auto_increment not null primary key,
    name char(10) not null,
    address varchar(50) default 'beijing',
    year date
);


Drainer log


[2022/08/19 11:17:06.836 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105937] [binlog="tp:Commit start_ts:435389471196708868 commit_ts:435389471196708871 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"create table moe_test\\n(\\n    id int(3) auto_increment not null primary key,\\n    name char(10) not null,\\n    address varchar(50) default 'beijing',\\n    year date\\n)\" ddl_job_id:105937 ddl_schema_state:5 "]
[2022/08/19 11:17:06.863 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105937, Type:create table, State:synced, SchemaState:public, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 11:17:05.256 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 11:17:06.973 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="create table moe_test\n(\n    id int(3) auto_increment not null primary key,\n    name char(10) not null,\n    address varchar(50) default 'beijing',\n    year date\n)"] ["commit ts"=435389471196708871] [shouldSkip=false]
[2022/08/19 11:17:06.999 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.xxx.xxx.59:9092] from broker %!s(MISSING)\n"]
[2022/08/19 11:17:07.023 +08:00] [INFO] [sarama.go:122] ["[sarama] Connected to broker at [10.xxx.xxx.59:9092] (unregistered)\n"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[2 %!d(string=10.xxx.xxx.10:9092)] at %!s(MISSING)"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[3 %!d(string=10.xxx.xxx.11:9092)] at %!s(MISSING)"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [sarama.go:122] ["[sarama] client/brokers registered new broker #[1 %!d(string=10.xxx.xxx.59:9092)] at %!s(MISSING)"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [client.go:902] ["[sarama] client/metadata found some partitions to be leaderless"]
[2022/08/19 11:17:07.096 +08:00] [INFO] [client.go:870] ["[sarama] client/metadata retrying after 500ms... (10000 attempts remaining)\n"]
[2022/08/19 11:17:07.598 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.xxx.xxx.59:9092] from broker %!s(MISSING)\n"]
[2022/08/19 11:17:07.612 +08:00] [INFO] [async_producer.go:744] ["[sarama] producer/broker/2 starting up\n"]
[2022/08/19 11:17:07.613 +08:00] [INFO] [async_producer.go:760] ["[sarama] producer/broker/2 state change to [open] on syk-test-binlog-to-kafka/0\n"]
[2022/08/19 11:17:07.626 +08:00] [INFO] [sarama.go:122] ["[sarama] Connected to broker at [10.xxx.xxx.10:9092 %!s(int32=2)] (registered as #%!d(MISSING))\n"]


Query results in Kafka



Summary: the CREATE statement, the Kafka broker information, and the topic all show up in the drainer log. Inspecting the Kafka topic, the message content is the CREATE SQL statement itself. A CREATE statement therefore does not produce a large binlog and will not trigger kafka server: Message was too large.
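The original post showed the Kafka contents as screenshots. One common way to inspect the topic is the console consumer shipped with Kafka; this is a sketch in which the broker address and the Kafka installation path are assumptions:

```shell
# Run from the Kafka installation directory on any host that can reach the brokers.
bin/kafka-console-consumer.sh \
  --bootstrap-server 10.xxx.xxx.10:9092 \
  --topic syk-test-binlog-to-kafka \
  --from-beginning
```

Note that drainer writes messages in TiDB's binlog protobuf format, so row data may not render as readable text in the console output.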

2. Executing INSERT upstream

insert into moe_test (name,address,year) values('allen','大连一中','1976-10-10');
insert into moe_test (name,address,year) values('jack','大连二中','1975-12-23');
insert into moe_test (name,address,year) values('jordan','芝加哥公牛','1984-03-23');


Drainer log


[2022/08/19 12:25:57.918 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390554362609666] [version=130819]


Query results in Kafka



Summary: for INSERT, the Kafka message carries both the table schema and the row data. A large batch of INSERT statements can therefore produce a large binlog, which may trigger kafka server: Message was too large, and may also trigger the error below:


[2022/08/18 18:16:30.214 +08:00] [INFO] [pump.go:166] ["receive big size binlog"] [size="624 MB"]
[2022/08/18 18:16:30.577 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373417798303755] [version=202807]
[2022/08/18 18:17:51.315 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373417811410954] [version=202807]
[2022/08/18 18:17:54.373 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373440801177602] [version=202807]
[2022/08/18 18:17:57.460 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373441535442957] [version=202807]
[2022/08/18 18:33:27.895 +08:00] [ERROR] [syncer.go:533] ["Failed to close syncer"] [error="fail to push msg to kafka after 30s, check if kafka is up and working"] [errorVerbose="fail to push msg to kafka after 30s, check if kafka is up and working\ngithub.com/pingcap/tidb-binlog/drainer/sync.(*KafkaSyncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/sync/kafka.go:236\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1371"]
[2022/08/18 18:33:27.895 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650948] [version=202816]
[2022/08/18 18:33:31.311 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435373669469650950] [version=202816]
[2022/08/18 18:33:37.896 +08:00] [INFO] [server.go:465] ["begin to close drainer server"]
[2022/08/18 18:33:37.896 +08:00] [ERROR] [util.go:69] ["Recovered from panic"] [err="\"Waiting too long for `Syncer.run` to quit.\""] ["real stack"="github.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1.1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/util.go:71\nruntime.gopanic\n\t/usr/local/go/src/runtime/panic.go:965\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/syncer.go:539\ngithub.com/pingcap/tidb-binlog/drainer.(*Syncer).Start\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/syncer.go:151\ngithub.com/pingcap/tidb-binlog/drainer.(*Server).Start.func4\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/server.go:290\ngithub.com/pingcap/tidb-binlog/drainer.(*taskGroup).start.func1\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/util.go:79"] [name=syncer]
[2022/08/18 18:33:37.896 +08:00] [INFO] [util.go:76] [Exit] [name=syncer]
[2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:430] ["has already update status"] [id=10.xxx.xxx.59:8249]
[2022/08/18 18:33:37.927 +08:00] [INFO] [server.go:469] ["commit status done"]
[2022/08/18 18:33:37.927 +08:00] [INFO] [collector.go:136] ["publishBinlogs quit"]
[2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=heartbeat]
[2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250]
[2022/08/18 18:33:37.927 +08:00] [INFO] [pump.go:77] ["pump is closing"] [id=10.xxx.xxx.xxx:8250]
[2022/08/18 18:33:37.927 +08:00] [INFO] [util.go:76] [Exit] [name=collect]
[2022/08/18 18:33:37.927 +08:00] [INFO] [main.go:73] ["drainer exit"]
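The usual upstream-side mitigation is to split a large write into several smaller transactions. A minimal sketch of batching rows into multi-row INSERT statements; the batch size, helper names, and value rendering are illustrative assumptions, not values from the post (real code should use parameterized queries rather than string interpolation):

```python
def chunked(rows, batch_size):
    """Yield successive batches of at most batch_size rows."""
    for i in range(0, len(rows), batch_size):
        yield rows[i:i + batch_size]

def build_inserts(rows, batch_size=1000):
    """Render one multi-row INSERT per batch so that no single
    transaction produces an oversized binlog entry."""
    stmts = []
    for batch in chunked(rows, batch_size):
        values = ",".join(
            "('%s','%s','%s')" % (name, addr, year)
            for name, addr, year in batch
        )
        stmts.append(
            "insert into moe_test (name,address,year) values" + values + ";"
        )
    return stmts
```

Executing each returned statement as its own transaction keeps every binlog entry bounded by the batch size instead of the total row count.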

3. Executing DELETE upstream

delete from moe_test where id=1;


Drainer log


[2022/08/19 12:30:42.362 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.99.110.11:9092] from broker %!s(MISSING)\n"]
[2022/08/19 12:30:45.497 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390629322424322] [version=130819]


Query results in Kafka



Summary: for DELETE, the situation is likely the same as for INSERT above.

4. Executing UPDATE upstream

update moe_test set address='xxxxxx' where id=2;


Drainer log


[2022/08/19 12:35:46.485 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390708280459266] [version=130819]


Query results in Kafka



Summary: for UPDATE, Kafka carries both the pre-update and post-update values; otherwise the possible issues are the same as for INSERT.

5. Executing a column operation upstream

alter table moe_test drop column year;


Drainer log


[2022/08/19 12:38:02.341 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105938] [binlog="tp:Commit start_ts:435390744390795265 commit_ts:435390744403902465 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"alter table moe_test drop column year\" ddl_job_id:105938 ddl_schema_state:1 "]
[2022/08/19 12:38:03.347 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 12:38:03.347 +08:00] [INFO] [schema.go:289] ["Got DeleteOnly Job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:delete only, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 12:38:03.347 +08:00] [INFO] [syncer.go:454] ["Syncer skips DeleteOnly DDL"] [job="ID:105938, Type:drop column, State:synced, SchemaState:delete only, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"] [ts=435390744403902465]
[2022/08/19 12:38:07.549 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105938] [binlog="tp:Commit start_ts:435390744430116865 commit_ts:435390744430116866 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTable:10\\3775936\\000\\000\\000\\000\\373\" ddl_query:\"alter table moe_test drop column year\" ddl_job_id:105938 "]
[2022/08/19 12:38:07.551 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 12:38:07.552 +08:00] [INFO] [schema.go:501] ["Finished dropping column"] [job="ID:105938, Type:drop column, State:synced, SchemaState:queueing, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 12:38:02.056 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 12:38:07.563 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="alter table moe_test drop column year"] ["commit ts"=435390744430116866] [shouldSkip=false]
[2022/08/19 12:38:07.577 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390744430116866] [version=130823]
[2022/08/19 12:38:12.697 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435390746579697665] [version=130823]


Query results in Kafka



Summary: both the log and the Kafka content carry the ALTER statement itself; it does not produce a large binlog and will not trigger kafka server: Message was too large.

6. Executing TRUNCATE upstream

truncate table moe_test;


Drainer log


[2022/08/19 14:05:31.893 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105940] [binlog="tp:Commit start_ts:435392119781720073 commit_ts:435392119781720075 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTID:1059\\37736\\000\\000\\000\\000\\000\\000\\371\" ddl_query:\"truncate table moe_test\" ddl_job_id:105940 ddl_schema_state:5 "]
[2022/08/19 14:05:31.897 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105940, Type:truncate table, State:synced, SchemaState:public, SchemaID:5840, TableID:105936, RowCount:0, ArgLen:0, start time: 2022-08-19 14:05:28.806 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="truncate table moe_test"] ["commit ts"=435392119781720075] [shouldSkip=false]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:1011] ["[sarama] producer/broker/2 state change to [closing] because kafka: broker not connected\n"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:611] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [retrying-1]\n"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:621] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 abandoning broker 2\n"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:750] ["[sarama] producer/broker/2 input chan closed\n"]
[2022/08/19 14:05:31.926 +08:00] [INFO] [async_producer.go:843] ["[sarama] producer/broker/2 shut down\n"]
[2022/08/19 14:05:32.427 +08:00] [INFO] [sarama.go:122] ["[sarama] client/metadata fetching metadata for [[syk-test-binlog-to-kafka] 10.99.110.10:9092] from broker %!s(MISSING)\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:744] ["[sarama] producer/broker/2 starting up\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:760] ["[sarama] producer/broker/2 state change to [open] on syk-test-binlog-to-kafka/0\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:594] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 selected broker 2\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:627] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [flushing-1]\n"]
[2022/08/19 14:05:32.441 +08:00] [INFO] [async_producer.go:649] ["[sarama] producer/leader/syk-test-binlog-to-kafka/0 state change to [normal]\n"]
[2022/08/19 14:05:32.454 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392119781720075] [version=130824]


Query results in Kafka



Summary: both the log and the Kafka content carry the TRUNCATE statement itself; it does not produce a large binlog and will not trigger kafka server: Message was too large.

7. Executing DROP upstream

drop table syk_test;


Drainer log


[2022/08/19 14:10:03.681 +08:00] [INFO] [collector.go:285] ["start query job"] [id=105941] [binlog="tp:Commit start_ts:435392191333924865 commit_ts:435392191347032067 prewrite_key:\"mDB:5840\\000\\376\\000\\000\\000\\000\\000\\000\\000hTID:4603\\3777\\000\\000\\000\\000\\000\\000\\000\\370\" ddl_query:\"drop table syk_test\" ddl_job_id:105941 "]
[2022/08/19 14:10:03.685 +08:00] [INFO] [collector.go:307] ["get ddl job"] [job="ID:105941, Type:drop table, State:synced, SchemaState:queueing, SchemaID:5840, TableID:46037, RowCount:0, ArgLen:0, start time: 2022-08-19 14:10:01.606 +0800 CST, Err:<nil>, ErrCount:0, SnapshotVersion:0"]
[2022/08/19 14:10:03.685 +08:00] [INFO] [syncer.go:518] ["add ddl item to syncer, you can add this commit ts to `ignore-txn-commit-ts` to skip this ddl if needed"] [sql="drop table syk_test"] ["commit ts"=435392191347032067] [shouldSkip=false]
[2022/08/19 14:10:03.700 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392191347032067] [version=130827]
[2022/08/19 14:10:09.786 +08:00] [INFO] [syncer.go:278] ["write save point"] [ts=435392192985169921] [version=130827]


Query results in Kafka



Summary: both the log and the Kafka content carry the DROP statement itself; it does not produce a large binlog and will not trigger kafka server: Message was too large.


Conclusions from the tests above:


1. Only DML can produce a large binlog.


2. A large transaction can trigger kafka server: Message was too large. You can adjust the downstream Kafka parameters (see https://docs.pingcap.com/zh/tidb/v4.0/handle-tidb-binlog-errors#drainer-%E5%90%8C%E6%AD%A5%E6%95%B0%E6%8D%AE%E5%88%B0-kafka-%E6%97%B6%E6%8A%A5%E9%94%99-kafka-server-message-was-too-large-server-rejected-it-to-avoid-allocation-error), but it is better to look at the upstream workload first and consider splitting the large transaction into several smaller ones.
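The Kafka-side limits involved are the broker's maximum message size and the replica/consumer fetch sizes. A sketch of the relevant settings, following the linked PingCAP troubleshooting doc; the 1 GiB value is illustrative, not a recommendation:

```properties
# server.properties on each Kafka broker
message.max.bytes=1073741824        # largest single message the broker will accept
replica.fetch.max.bytes=1073741824  # must be >= message.max.bytes so replicas can copy it
fetch.message.max.bytes=1073741824  # consumer-side fetch limit for large messages
```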


3. A large binlog triggers the error below (in our repeated tests it appeared when the binlog was between 500 MB and 1 GB).


[2022/08/18 18:33:27.895 +08:00] [ERROR] [syncer.go:533] ["Failed to close syncer"] [error="fail to push msg to kafka after 30s, check if kafka is up and working"] [errorVerbose="fail to push msg to kafka after 30s, check if kafka is up and working\ngithub.com/pingcap/tidb-binlog/drainer/sync.(*KafkaSyncer).run\n\t/home/jenkins/agent/workspace/build-common/go/src/github.com/pingcap/tidb-binlog/drainer/sync/kafka.go:236\nruntime.goexit\n\t/usr/local/go/src/runtime/asm_amd64.s:1371"]
[2022/08/18 18:33:37.896 +08:00] [ERROR] [util.go:69] ["Recovered from panic"] [err="\"Waiting too long for `Syncer.run` to quit.\""] [name=syncer]
[2022/08/18 18:33:37.927 +08:00] [INFO] [main.go:73] ["drainer exit"]

(The full log around this failure is shown in the INSERT test above.)



