作者: jiyf 原文来源:https://tidb.net/blog/2ab87c6a
【是否原创】是
【首发渠道】TiDB
【首发渠道链接】其他平台首发请附上对应链接
【目录】
【正文】
故障描述
测试环境中开启 cdc 备份到 s3 存储,出现下面问题:
cdc 日志片段 1:
[2021/07/29 02:38:29.845 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=340386481] ["event count"=201962]
[2021/07/29 02:38:41.904 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310]
[2021/07/29 02:38:41.976 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=2157261890] ["event count"=1280000]
复制代码
cdc 日志片段 二:
[2021/07/29 02:51:50.410 +08:00] [ERROR] [processor.go:1291] ["error on running processor"] [capture=10.95.249.200:8300] [changefeed=1622788388883] [processor=51a0e857-509d-499f-816f-e5293c765aee] [error="[CDC:ErrS3SinkStorageAPI]RequestCanceled: request context canceled\"ncaused by: context canceled"] [errorVerbose="[CDC:ErrS3SinkStorageAPI]RequestCanceled: request context canceled\"ncaused by: context canceled\"ngithub.com/pingcap/errors.AddStack\"n\"tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/errors.go:174\"ngithub.com/pingcap/errors.(*Error).GenWithStackByCause\"n\"tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/normalize.go:279\"ngithub.com/pingcap/ticdc/pkg/errors.WrapError\"n\"tgithub.com/pingcap/ticdc/pkg/errors/helper.go:28\"ngithub.com/pingcap/ticdc/cdc/sink/cdclog.(*tableBuffer).flush\"n\"tgithub.com/pingcap/ticdc/cdc/sink/cdclog/s3.go:145\"ngithub.com/pingcap/ticdc/cdc/sink/cdclog.(*logSink).startFlush.func2\"n\"tgithub.com/pingcap/ticdc/cdc/sink/cdclog/utils.go:136\"ngolang.org/x/sync/errgroup.(*Group).Go.func1\"n\"tgolang.org/x/sync@v0.0.0-20200625203802-6e8e738ad208/errgroup/errgroup.go:57\"nruntime.goexit\"n\"truntime/asm_amd64.s:1373"]
[2021/07/29 02:51:50.410 +08:00] [WARN] [processor.go:1310] ["upload processor error failed"] [changefeed=1622788388883] [error="[CDC:ErrPDEtcdAPIError]context canceled"] [errorVerbose="[CDC:ErrPDEtcdAPIError]context canceled\"ngithub.com/pingcap/errors.AddStack\"n\"tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/errors.go:174\"ngithub.com/pingcap/errors.(*Error).GenWithStackByCause\"n\"tgithub.com/pingcap/errors@v0.11.5-0.20201126102027-b0a155152ca3/normalize.go:279\"ngithub.com/pingcap/ticdc/pkg/errors.WrapError\"n\"tgithub.com/pingcap/ticdc/pkg/errors/helper.go:28\"ngithub.com/pingcap/ticdc/cdc/kv.CDCEtcdClient.PutTaskPositionOnChange\"n\"tgithub.com/pingcap/ticdc/cdc/kv/etcd.go:706\"ngithub.com/pingcap/ticdc/cdc.runProcessor.func1\"n\"tgithub.com/pingcap/ticdc/cdc/processor.go:1308\"nruntime.goexit\"n\"truntime/asm_amd64.s:1373"]
复制代码
集群当时执行的操作
这个集群目前还在做开发测试,规模已经接近十个 tikv 节点,整体吞吐量已经不小。
业务开发在 2021-07-26 11:48:20 对一个有 50G 大小的表进行主键更改:
导出表数据到文件
drop 旧表
建新表,导入旧表数据
新表创建、开始导入数据时间恰好是 2021-07-26 11:48:04
故障分析
由于集群有一定的规模,导入数据的吞吐还是挺大的。
cdc 把 tikv 的变更同步到 s3 是对产生的 event 批量进行的,每次同步 flush 操作会有两条日志对应开始和结束:
[2021/07/29 02:38:29.845 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=340386481] ["event count"=201962]
[2021/07/29 02:38:41.904 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310]
复制代码
但是在 cdc 日志片段 1 中,最后一个 start Flush 对应的 complete 一直没有找到。
[2021/07/29 02:38:41.976 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=2157261890] ["event count"=1280000]
复制代码
这条 flush 对应的 event 个数是 128 万个,数据大小 2157261890,为 2G+,所以会不会是这里同步因为数据量太大导致的问题。
在 cdc 日志片段 二 报错 “context canceled”,看报错位置为 s3.go:145,打开源码文件:
if len(rowDatas) > 0 {
...
err := hashPart.uploader.UploadPart(ctx, rowDatas)
if err != nil {
return cerror.WrapError(cerror.ErrS3SinkStorageAPI, err)
}
...
}
复制代码
代码对应的地方就是往 s3 upload 数据的位置,所以基本可以判断是批量数据太大导致。
cdc 同步原理
从 tikv 中获取的 rowChangeEvent 经过排序后会插入 processor.output 这个 channel 中, chanel 大小 1280000;
另外 processor 每隔 1 秒,会发送一个 OpTypeResolved 类型的 event 到 processor.output
processor 实时地从 processor.output 获取 event 插入到 dataCh 这个 channel 中,大小也为 1280000,若获取到的 event 为 **OpTypeResolved ** 类型,processor 将会把 dataCh 中的 event 刷新到 s3 上
processor 同时定时 500 * time.Millisecond 去检查 dataCh 大小,如果满足 5M 也会刷新
上面 3 和 4 是 cdc 到 s3 的刷新操作,第 3 步是为了把 1s 内的数据变更进行 batch,第 4 步是为了应对变更太多时候,加快刷新;每次刷新的数据放进 dataCh,默认值 1280000 个 event。
下面代码分别对应 3 和 4 中触发的 flush 操作。
case needFlushedUnits := <-l.notifyChan:
...
for _, u := range needFlushedUnits {
...
eg.Go(func() error {
log.Info("start Flush asynchronously to storage by caller",
zap.Int64("table id", uReplica.TableID()),
zap.Int64("size", uReplica.Size().Load()),
zap.Int64("event count", uReplica.Events().Load()),
)
return uReplica.flush(ectx, l)
})
}
...
case <-ticker.C:
...
for _, u := range l.units {
...
if u.shouldFlush() {
eg.Go(func() error {
log.Info("start Flush asynchronously to storage",
zap.Int64("table id", uReplica.TableID()),
zap.Int64("size", uReplica.Size().Load()),
zap.Int64("event count", uReplica.Events().Load()),
)
return uReplica.flush(ectx, l)
})
}
}
复制代码
对于第 4 步中,触发 flush 的条件,可以看出 dataCh 大于 5M 才会刷新。并没有考虑 dataCh 满,按道理 dataCh 满了那也要刷新,清空 dataCh 才对。
const maxPartFlushSize = 5 << 20 // The minimal multipart upload size is 5Mb.
func (tb *tableBuffer) shouldFlush() bool {
return tb.sendSize.Load() > maxPartFlushSize
}
复制代码
问题修复
尝试把 dataCh 改小,改为 20480,编译运行,发现问题修复了,修改后看同步情况:
[2021/07/29 21:32:01.133 +08:00] [INFO] [utils.go:131] ["start Flush asynchronously to storage"] ["table id"=3310] [size=34518876] ["event count"=20480]
[2021/07/29 21:32:02.325 +08:00] [INFO] [s3.go:155] ["[FlushRowChangedEvents] complete file"] [tableID=3310]
复制代码
可以看到每次 flush 20480 个 event,每次 flush 总大小 34518876 byte,大约 33MB,还是比较合适的。
这次故障的原因是源码中 dataCh 过大,而当时集群吞吐大、或者因为历史 tikv 变更的堆积,导致把 dataCh 塞满,一次 batch flush 时候,数据太大导致了 “context canceled” 问题。
完善修复
上面 cdc 原理中,processor.output 默认 128000,而上面修复 dataCh 为 20480,那么容易造成 processor.output 中 event 个数远远多于 dataCh。
而间隔 1s 时间 processor 触发刷新,写 OpTypeResolved 类型 event 到 processor.output, 然后从 processor.output 流入 dataCh 进行 bath 触发刷新操作。
如果 tikv 变更多在 1s 内产生大量的 event,就容易把 dataCh 塞满,而 processor 用于唤醒刷新的 OpTypeResolved 类型 event 还停留在 processor.output 中,阻塞在 dataCh 外面。
这时候只能靠第 4 步,定时的 shouldFlush 刷新了,但是 shouldFlush 函数是有漏洞的。并没有考虑 dataCh 满,如果 dataCh 又不满 5M,那就永远触发不了刷新了。
达到满 5M 条件,那么单条 event 要满足 5M / 20480 = 256byte,对于字段少、记录小的表,很容易达不到。
所以这里完善一下 shouldFlush 函数,如果 dataCh 已经填满,那么直接触发刷新到 s3。
func (tb *tableBuffer) shouldFlush() bool {
return tb.size() >= default_chan_size_20480 || tb.sendSize.Load() > maxPartFlushSize
}
复制代码
评论