
sync_diff_inspector 表结构比较功能探索

  • 2024-07-26
作者: yangzhj 原文来源:https://tidb.net/blog/91ad0e37



上述链接是 TiDB 官方校验工具 sync_diff_inspector 的解释,其中功能的第一条就是:对比表结构和数据。

本人最近经历的一个迁移项目在使用该工具过程中,对迁移上下游 库表进行校验时,发现某张表上游无主键,下游有主键,其他结构信息相同时,sync_diff_inspector 工具显示校验无差异。基于上述情况,本人准备翻阅工具的代码,对结构校验的功能进行探索。


01 获取代码

首先该项目采用的是 sync_diff_inspector 6.5.7 版本,因此需要从 github 获取对应版本的源码包:https://github.com/pingcap/tidb-tools/releases/tag/v6.5.7 。然后使用 go ide 工具打开分析。

02 代码分析

对于 go 代码的分析,一般从 main 函数作为程序运行入口。我们 从 main 函数代码开始,逐步分析结构校验的代码逻辑:

第一步:main.go 文件 ,程序主入口

package main
import (//略)
func main() { //初始化和配置文件检查加载相关代码,略 if !checkSyncState(ctx, cfg) { //执行sync_diff 校验函数 log.Warn("check failed!!!") os.Exit(1) } log.Info("check pass!!!")}
func checkSyncState(ctx context.Context, cfg *config.Config) bool {//sync_diff 校验函数定义 beginTime := time.Now() defer func() { log.Info("check data finished", zap.Duration("cost", time.Since(beginTime))) }()
d, err := NewDiff(ctx, cfg) //创建一个校验示例 if err != nil { fmt.Printf("There is something error when initialize diff, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName)) log.Fatal("failed to initialize diff process", zap.Error(err)) return false } defer d.Close()
if !cfg.CheckDataOnly { //判断是否只校验数据,如果只校验数据为否,则会执行下面 检查结构 的代码。 err = d.StructEqual(ctx)//执行结构校验函数 if err != nil { fmt.Printf("There is something error when compare structure of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName)) log.Fatal("failed to check structure difference", zap.Error(err)) return false } } else { log.Info("Check table data only, skip struct check") } if !cfg.CheckStructOnly { err = d.Equal(ctx) if err != nil { fmt.Printf("There is something error when compare data of table, please check log info in %s\n", filepath.Join(cfg.Task.OutputDir, config.LogFileName)) log.Fatal("failed to check data difference", zap.Error(err)) return false } } else { log.Info("Check table struct only, skip data check") } return d.PrintSummary(ctx)}


第二步:diff.go 文件,校验准备工作,循环校验每张表,获取结构校验所需的表信息。

func (df *Diff) StructEqual(ctx context.Context) error {//结构校验函数定义  tables := df.downstream.GetTables()  tableIndex := 0  if df.startRange != nil {    tableIndex = df.startRange.ChunkRange.Index.TableIndex  }  for ; tableIndex < len(tables); tableIndex++ {    isEqual, isSkip, isAllTableExist := false, true, tables[tableIndex].TableLack    if common.AllTableExist(isAllTableExist) {      var err error      isEqual, isSkip, err = df.compareStruct(ctx, tableIndex) //循环调用比较结构的函数      if err != nil {        return errors.Trace(err)      }    }    progress.RegisterTable(dbutil.TableName(tables[tableIndex].Schema, tables[tableIndex].Table), !isEqual, isSkip, isAllTableExist)    df.report.SetTableStructCheckResult(tables[tableIndex].Schema, tables[tableIndex].Table, isEqual, isSkip, isAllTableExist)  }  return nil}
func (df *Diff) compareStruct(ctx context.Context, tableIndex int) (isEqual bool, isSkip bool, err error) {//比较结构函数的定义 sourceTableInfos, err := df.upstream.GetSourceStructInfo(ctx, tableIndex) if err != nil { return false, true, errors.Trace(err) } table := df.downstream.GetTables()[tableIndex] isEqual, isSkip = utils.CompareStruct(sourceTableInfos, table.Info) //调用 utils包的结构校验函数,参数是上下游的表信息 table.IgnoreDataCheck = isSkip return isEqual, isSkip, nil}

第三步骤:util.go 文件 做结构校验逻辑的函数。

// CompareStruct compare tables' columns and indices from upstream and downstream.// There are 2 return values:////  isEqual  : result of comparing tables' columns and indices//  isPanic  : the differences of tables' struct can not be ignored. Need to skip data comparing.func CompareStruct(upstreamTableInfos []*model.TableInfo, downstreamTableInfo *model.TableInfo) (isEqual bool, isPanic bool) {  // compare columns  for _, upstreamTableInfo := range upstreamTableInfos {    if len(upstreamTableInfo.Columns) != len(downstreamTableInfo.Columns) {      // the numbers of each columns are different, don't compare data      log.Error("column num not equal",//校验列数量不一致        zap.String("upstream table", upstreamTableInfo.Name.O),        zap.Int("column num", len(upstreamTableInfo.Columns)),        zap.String("downstream table", downstreamTableInfo.Name.O),        zap.Int("column num", len(downstreamTableInfo.Columns)),      )      return false, true    }
for i, column := range upstreamTableInfo.Columns { if column.Name.O != downstreamTableInfo.Columns[i].Name.O { // names are different, panic! log.Error("column name not equal",//校验列名称不一致 zap.String("upstream table", upstreamTableInfo.Name.O), zap.String("column name", column.Name.O), zap.String("downstream table", downstreamTableInfo.Name.O), zap.String("column name", downstreamTableInfo.Columns[i].Name.O), ) return false, true }
if !isCompatible(column.GetType(), downstreamTableInfo.Columns[i].GetType()) { // column types are different, panic! log.Error("column type not compatible",//校验列类型不一致 zap.String("upstream table", upstreamTableInfo.Name.O), zap.String("column name", column.Name.O), zap.Uint8("column type", column.GetType()), zap.String("downstream table", downstreamTableInfo.Name.O), zap.String("column name", downstreamTableInfo.Columns[i].Name.O), zap.Uint8("column type", downstreamTableInfo.Columns[i].GetType()), ) return false, true }
if !sameProperties(column, downstreamTableInfo.Columns[i]) { //调用列属性校验函数 // column properties are different, panic! log.Error("column properties not compatible",//校验列属性不一致 zap.String("upstream table", upstreamTableInfo.Name.O), zap.String("column name", column.Name.O), zap.Uint8("column type", column.GetType()), zap.String("downstream table", downstreamTableInfo.Name.O), zap.String("column name", downstreamTableInfo.Columns[i].Name.O), zap.Uint8("column type", downstreamTableInfo.Columns[i].GetType()), ) return false, true } } }
// compare indices 校验索引的逻辑 deleteIndicesSet := make(map[string]struct{}) unilateralIndicesSet := make(map[string]struct{}) downstreamIndicesMap := make(map[string]*struct { index *model.IndexInfo cnt int }) for _, index := range downstreamTableInfo.Indices { downstreamIndicesMap[index.Name.O] = &struct { index *model.IndexInfo cnt int }{index, 0} } for _, upstreamTableInfo := range upstreamTableInfos {
NextIndex: for _, upstreamIndex := range upstreamTableInfo.Indices { if _, ok := deleteIndicesSet[upstreamIndex.Name.O]; ok { continue NextIndex }
indexU, ok := downstreamIndicesMap[upstreamIndex.Name.O] if ok { if len(indexU.index.Columns) != len(upstreamIndex.Columns) { // different index, should be removed deleteIndicesSet[upstreamIndex.Name.O] = struct{}{} continue NextIndex }
for i, indexColumn := range upstreamIndex.Columns { if indexColumn.Offset != indexU.index.Columns[i].Offset || indexColumn.Name.O != indexU.index.Columns[i].Name.O { // different index, should be removed deleteIndicesSet[upstreamIndex.Name.O] = struct{}{} continue NextIndex } } indexU.cnt = indexU.cnt + 1 } else { unilateralIndicesSet[upstreamIndex.Name.O] = struct{}{} } } }
existBilateralIndex := false for _, indexU := range downstreamIndicesMap { if _, ok := deleteIndicesSet[indexU.index.Name.O]; ok { continue } if indexU.cnt < len(upstreamTableInfos) { // Some upstreamInfos don't have this index. unilateralIndicesSet[indexU.index.Name.O] = struct{}{} } else { // there is an index the whole tables have, // so unilateral indices can be deleted. existBilateralIndex = true } }
// delete indices // If there exist bilateral index, unilateral indices can be deleted. if existBilateralIndex { for indexName := range unilateralIndicesSet { deleteIndicesSet[indexName] = struct{}{} } } else { log.Warn("no index exists in both upstream and downstream", zap.String("table", downstreamTableInfo.Name.O)) } if len(deleteIndicesSet) > 0 { newDownstreamIndices := make([]*model.IndexInfo, 0, len(downstreamTableInfo.Indices)) for _, index := range downstreamTableInfo.Indices { if _, ok := deleteIndicesSet[index.Name.O]; !ok { newDownstreamIndices = append(newDownstreamIndices, index) } else { log.Debug("delete downstream index", zap.String("name", downstreamTableInfo.Name.O), zap.String("index", index.Name.O)) } } downstreamTableInfo.Indices = newDownstreamIndices
for _, upstreamTableInfo := range upstreamTableInfos { newUpstreamIndices := make([]*model.IndexInfo, 0, len(upstreamTableInfo.Indices)) for _, index := range upstreamTableInfo.Indices { if _, ok := deleteIndicesSet[index.Name.O]; !ok { newUpstreamIndices = append(newUpstreamIndices, index) } else { log.Debug("delete upstream index", zap.String("name", upstreamTableInfo.Name.O), zap.String("index", index.Name.O)) } } upstreamTableInfo.Indices = newUpstreamIndices }
return len(deleteIndicesSet) == 0, false //当两边索引不一致时,返回否}
func sameProperties(c1, c2 *model.ColumnInfo) bool {//列属性校验函数定义, switch c1.GetType() { case mysql.TypeVarString, mysql.TypeString, mysql.TypeVarchar: if c1.FieldType.GetCharset() != c2.FieldType.GetCharset() { log.Warn("Ignoring character set differences", zap.String("column name", c1.Name.O), zap.String("charset source", c1.FieldType.GetCharset()), zap.String("charset target", c2.FieldType.GetCharset()), ) } if c1.FieldType.GetCollate() != c2.FieldType.GetCollate() { log.Warn("Ignoring collation differences", zap.String("column name", c1.Name.O), zap.String("collation source", c1.FieldType.GetCollate()), zap.String("collation target", c2.FieldType.GetCollate()), ) } return c1.FieldType.GetFlen() == c2.FieldType.GetFlen() //判断上下游列的长度定义是否相同 default: return true }}

03 功能总结

根据对上述结构校验代码的分析,我们可以总结出 v6.5.7 sync_diff_inspector 工具可以校验 的表结构项目包括如下几项:

  1. 列的数量

  2. 列的名称

  3. 列的类型

  4. 列的长度定义

  5. 索引的差异

