写点什么

cdc 同步到 s3 的故障

  • 2022 年 7 月 11 日
  • 本文字数:3878 字

    阅读完需:约 13 分钟

作者: jiyf 原文来源:https://tidb.net/blog/2ab87c6a


【是否原创】是


【首发渠道】TiDB


【首发渠道链接】其他平台首发请附上对应链接


【目录】


【正文】

故障描述

测试环境中开启 cdc 备份到 s3 存储,出现下面问题:


  • cdc 同步点停留在 2021-07-26 11:48:20.649 +0800 CST

  • 因为 cdc 的原因集群 gc safe point 卡在 20210726-11:48:20 +0800


cdc 日志片段 1:


[2021/07/29 02:38:29.845 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=340386481] ["event count"=201962][2021/07/29 02:38:41.904 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310][2021/07/29 02:38:41.976 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=2157261890] ["event count"=1280000]
复制代码


cdc 日志片段 二:


[2021/07/29 02:51:50.410 +08:00] [ERROR] [processor.go:1291] ["error on running processor"] [capture=10.95.249.200:8300] [changefeed=1622788388883] [processor=51a0e857-509d-499f-816f-e5293c765aee] [error="[CDC:ErrS3SinkStorageAPI]RequestCanceled: request context canceled\"ncaused by: context canceled"] [errorVerbose="[CDC:ErrS3SinkStorageAPI]RequestCanceled: request context canceled\"ncaused by: context canceled\"ngithub.com/pingcap/errors.AddStack\"n\"tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/errors.go:174\"ngithub.com/pingcap/errors.(*Error).GenWithStackByCause\"n\"tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/normalize.go:279\"ngithub.com/pingcap/ticdc/pkg/errors.WrapError\"n\"tgithub.com/pingcap/ticdc/pkg/errors/helper.go:28\"ngithub.com/pingcap/ticdc/cdc/sink/cdclog.(*tableBuffer).flush\"n\"tgithub.com/pingcap/ticdc/cdc/sink/cdclog/s3.go:145\"ngithub.com/pingcap/ticdc/cdc/sink/cdclog.(*logSink).startFlush.func2\"n\"tgithub.com/pingcap/ticdc/cdc/sink/cdclog/utils.go:136\"ngolang.org/x/sync/errgroup.(*Group).Go.func1\"n\"tgolang.org/x/sync@v0.0.0-20200625203802-6e8e738ad208/errgroup/errgroup.go:57\"nruntime.goexit\"n\"truntime/asm_amd64.s:1373"][2021/07/29 02:51:50.410 +08:00] [WARN] [processor.go:1310] ["upload processor error failed"] [changefeed=1622788388883] [error="[CDC:ErrPDEtcdAPIError]context canceled"] [errorVerbose="[CDC:ErrPDEtcdAPIError]context canceled\"ngithub.com/pingcap/errors.AddStack\"n\"tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/errors.go:174\"ngithub.com/pingcap/errors.(*Error).GenWithStackByCause\"n\"tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/normalize.go:279\"ngithub.com/pingcap/ticdc/pkg/errors.WrapError\"n\"tgithub.com/pingcap/ticdc/pkg/errors/helper.go:28\"ngithub.com/pingcap/ticdc/cdc/kv.CDCEtcdClient.PutTaskPositionOnChange\"n\"tgithub.com/pingcap/ticdc/cdc/kv/etcd.go:706\"ngithub.com/pingcap/ticdc/cdc.runProcessor.func1\"n\"tgithub.com/pingcap/ticdc/cdc/processor.go:1308\"nruntime.goexit\"n\"truntime/asm_amd64.s:1373"]
复制代码

集群当时执行的操作

这个集群目前还在做开发测试,规模已经接近十个 tikv 节点,整体吞吐量已经不小。


业务开发在 2021-07-26 11:48:20 对一个有 50G 大小的表进行主键更改:


  1. 导出表数据到文件

  2. drop 旧表

  3. 建新表,导入旧表数据

  4. 新表创建、开始导入数据时间恰好是 2021-07-26 11:48:04

故障分析

由于集群有一定的规模,导入数据的吞吐还是挺大的。


cdc 把 tikv 的变更同步到 s3 是对产生的 event 批量进行的,每次同步 flush 操作会有两条日志对应开始和结束:


[2021/07/29 02:38:29.845 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=340386481] ["event count"=201962][2021/07/29 02:38:41.904 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310]
复制代码


但是在 cdc 日志片段 1 中,最后一个 start Flush 对应的 complete 一直没有找到。


[2021/07/29 02:38:41.976 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=2157261890] ["event count"=1280000]
复制代码


这条 flush 对应的 event 个数是 128 万个,数据大小 2157261890,为 2G+,所以会不会是这里同步因为数据量太大导致的问题。


在 cdc 日志片段 二 报错 “context canceled”,看报错位置为 s3.go:145,打开源码文件:


            if len(rowDatas) > 0 {                     ...
err := hashPart.uploader.UploadPart(ctx, rowDatas) if err != nil { return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err) }
... }
复制代码


代码对应的地方就是往 s3 upload 数据的位置,所以基本可以判断是批量数据太大导致。

cdc 同步原理

  1. 从 tikv 中获取的 rowChangeEvent 经过排序后会插入 processor.output 这个 channel 中, chanel 大小 1280000;

  2. 另外 processor 每隔 1 秒,会发送一个 OpTypeResolved 类型的 event 到 processor.output

  3. processor 实时地从 processor.output 获取 event 插入到 dataCh 这个 channel 中,大小也为 1280000,若获取到的 event 为 **OpTypeResolved ** 类型,processor 将会把 dataCh 中的 event 刷新到 s3 上

  4. processor 同时定时 500 * time.Millisecond 去检查 dataCh 大小,如果满足 5M 也会刷新


上面 3 和 4 是 cdc 到 s3 的刷新操作,第 3 步是为了把 1s 内的数据变更进行 batch,第 4 步是为了应对变更太多时候,加快刷新;每次刷新的数据放进 dataCh,默认值 1280000 个 event


下面代码分别对应 3 和 4 中触发的 flush 操作。


  case needFlushedUnits := <-l.notifyChan:    ...    for _, u := range needFlushedUnits {      ...      eg.Go(func() error {        log.Info("start Flush asynchronously to storage by caller",          zap.Int64("table id", uReplica.TableID()),          zap.Int64("size", uReplica.Size().Load()),          zap.Int64("event count", uReplica.Events().Load()),        )        return uReplica.flush(ectx, l)      })    }    ...
case <-ticker.C: ... for _, u := range l.units { ... if u.shouldFlush() { eg.Go(func() error { log.Info("start Flush asynchronously to storage", zap.Int64("table id", uReplica.TableID()), zap.Int64("size", uReplica.Size().Load()), zap.Int64("event count", uReplica.Events().Load()), ) return uReplica.flush(ectx, l) }) } }
复制代码


对于第 4 步中,触发 flush 的条件,可以看出 dataCh 大于 5M 才会刷新。并没有考虑 dataCh 满,按道理 dataCh 满了那也要刷新,清空 dataCh 才对。


const  maxPartFlushSize    = 5 << 20   // The minimal multipart upload size is 5Mb.
func (tb *tableBuffer) shouldFlush() bool { return tb.sendSize.Load() > maxPartFlushSize}
复制代码

问题修复

尝试把 dataCh 改小,改为 20480,编译运行,发现问题修复了,修改后看同步情况:


[2021/07/29 21:32:01.133 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=34518876] ["event count"=20480][2021/07/29 21:32:02.325 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310]
复制代码


可以看到每次 flush 20480 个 event,每次 flush 总大小 34518876 byte,大约 33MB,还是比较合适的。


这次故障的原因是源码中 dataCh 过大,而当时集群吞吐大、或者因为历史 tikv 变更的堆积,导致把 dataCh 塞满,一次 batch flush 时候,数据太大导致了 “context canceled” 问题。

完善修复

上面 cdc 原理中,processor.output 默认 128000,而上面修复 dataCh 为 20480,那么容易造成 processor.output 中 event 个数远远多于 dataCh。


而间隔 1s 时间 processor 触发刷新,写 OpTypeResolved 类型 event 到 processor.output, 然后从 processor.output 流入 dataCh 进行 bath 触发刷新操作。


如果 tikv 变更多在 1s 内产生大量的 event,就容易把 dataCh 塞满,而 processor 用于唤醒刷新的 OpTypeResolved 类型 event 还停留在 processor.output 中,阻塞在 dataCh 外面。


这时候只能靠第 4 步,定时的 shouldFlush 刷新了,但是 shouldFlush 函数是有漏洞的。并没有考虑 dataCh 满,如果 dataCh 又不满 5M,那就永远触发不了刷新了。


达到满 5M 条件,那么单条 event 要满足 5M / 20480 = 256byte,对于字段少、记录小的表,很容易达不到。


所以这里完善一下 shouldFlush 函数,如果 dataCh 已经填满,那么直接触发刷新到 s3。


func (tb *tableBuffer) shouldFlush() bool {  return tb.size() >= default_chan_size_20480 || tb.sendSize.Load() > maxPartFlushSize}
复制代码


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
cdc 同步到 s3 的故障_迁移_TiDB 社区干货传送门_InfoQ写作社区