写点什么

tiup cluster display 执行流程代码详解

  • 2023-04-07
    北京
  • 本文字数:9753 字

    阅读完需:约 32 分钟

作者: Hacker_loCdZ5zu 原文来源:https://tidb.net/blog/54da9e58

一、前言

在运维 tidb 集群的时候,经常需要使用 tiup cluster display 去查看组件是否是运行状态,笔者之前一直认为是 tiup 可能是通过探测组件端口的方式去判断组件是否存活(类似 telnet ip port 的方式),但是笔者之前遇到过 1 个问题,当目标组件服务器的 firewalld 开启的时候,用 tiup cluster 获取该组件的状态时,发现该组件的状态为 Down, 但是该组件确实是正常运行的,用 telnet 去探测也是可以得到返回信息,这就让笔者感到好奇,tiup cluster 到底是通过什么样的方式去判断,一个组件是 Up 还是 Down 呢? 本着好奇的态度去翻阅了 tiup cluster 关于 display 部分的代码,本文较为详细的解读执行在执行 tiup cluster display 的时候, tiup 是怎么去判断 tidb 组件的状态

二、前期检查

display 的执行代码入口在 src/tiup-1.11.0/components/cluster/command/display.go 里面,在正式执行 display 的功能之前,会做一些前期检查和准备


1.exist, err := tidbSpec.Exist(dopt.ClusterName)去判断集群名称是否存在,判断集群名称是否存在的方法是通过判断对应的集群拓扑文件是否存在,例如我的集群名是tidb-test,那就去判断/home/tidb/.tiup/storage/cluster/clusters/tidb-test/meta.yaml是否存在,如果不存在,则判断集群名是不存在的2.获取集群的元数据信息,集群的元数据信息例如版本等信息赋值给变量metadatametadata, err := spec.ClusterMetadata(dopt.ClusterName)这段代码的主要逻辑,其实就是把集群拓扑文件的信息通过yaml.Unmarshal函数进行解析返回给metadata变量,例如集群拓扑yaml文件主要是分为了3个大属性(或者是叫做3个大类),user,tidb_version,topology,通过这段代码就得到集群的这些信息
3.如果指定了--version,只会打印集群的版本信息,通过 metadata.Version 变量就得到了集群的版本信息if showVersionOnly { fmt.Println(metadata.Version) return nil}3.如果指定了--dashboard参数 ,只会打印dashboard地址if showDashboardOnly { tlsCfg, err := metadata.Topology.TLSConfig(tidbSpec.Path(dopt.ClusterName, spec.TLSCertKeyDir)) if err != nil { return err } return cm.DisplayDashboardInfo(dopt.ClusterName, time.Second*time.Duration(gOpt.APITimeout), tlsCfg)}4.如果指定了--labels ,只会打印集群的label信息if showTiKVLabels { return cm.DisplayTiKVLabels(dopt, gOpt)}
5. 最后面才是执行查询集群状态信息的方法例如执行tiup cluster display tidb-testreturn cm.Display(dopt, gOpt)
复制代码

三、Display 执行流程

cm.Display 的代码在 src/tiup-1.11.0/pkg/cluster/manager/display.go 里面,而在这段代码中获取集群组件状态信息是通过 GetClusterTopology 方法去获得的 (src/tiup-1.11.0/pkg/cluster/manager/display.go)


clusterInstInfos, err := m.GetClusterTopology(dopt, opt)
复制代码


GetClusterTopology 方法的执行流程


1.获取ClusterMeta 结构体指针metadata, err := m.meta(name)通过m.meta 方法,metadata变量本质是一个*spec.ClusterMeta ,是ClusterMeta 这个结构体的对应的指针type ClusterMeta struct {  User    string `yaml:"user"`         // the user to run and manage cluster on remote  Version string `yaml:"tidb_version"` // the version of TiDB cluster  // EnableFirewall bool   `yaml:"firewall"`  OpsVer string `yaml:"last_ops_ver,omitempty"` // the version of ourself that updated the meta last time
Topology *Specification `yaml:"topology"`}
ClusterMeta 结构体所定义的字段与集群拓扑yaml文件的格式是相对应的,比如集群拓扑yaml文件主要是分为了3个大属性(或者是叫做3个大类),user,tidb_version,topology,其它的配置项都是从这3个大的属性下面去获得的user: tidbtidb_version: v6.1.2topology: global: user: tidb ssh_port: 22 ssh_type: builtin deploy_dir: /tidb-deploy data_dir: /tidb-data os: linux .........

ClusterMeta 这个结构体有很多方法,例如在src/tiup-1.11.0/pkg/cluster/spec/util.go 下面定义了这个结构体的方法func (m *ClusterMeta) GetTopology() Topology { return m.Topology}
// SetTopology implement Metadata interface.func (m *ClusterMeta) SetTopology(topo Topology) { tidbTopo, ok := topo.(*Specification) //tidbTopo, ok := topo.S if !ok { panic(fmt.Sprintln("wrong type: ", reflect.TypeOf(topo))) }
m.Topology = tidbTopo}
// GetBaseMeta implements Metadata interface.func (m *ClusterMeta) GetBaseMeta() *BaseMeta { return &BaseMeta{ Version: m.Version, User: m.User, OpsVer: &m.OpsVer, }}在m.meta 方法里面继而又调用了 m.specManager.Metadata这个方法,m.specManager.Metadata 里面的主要逻辑是调用yaml.Unmarshal 函数去解析集群的拓扑文件,得到集群的拓扑文件信息,这一点是非常重要的
2.接下来通过metadata 变量,就可以调用结构体ClusterMeta相应的方法,从而具体得到集群拓扑文件相应的信息topo := metadata.GetTopology() 通过调用ClusterMeta结构体的GetTopology方法,这个方法的返回值正好是ClusterMeta结构体里面的Topology字段,这个字段对应的就是集群拓扑文件里面的topology属性信息,当然ClusterMeta结构体里面的Topology字段是一个嵌套结构体里面嵌套了Specification 这个结构体, Specification 这个结构体 对应的就是集群拓扑文件里面的topology属性下面的配置项
同理base := metadata.GetBaseMeta()通过调用ClusterMeta结构体的GetBaseMeta方法,这个方法通过构造函数的方式,返回的是集群拓扑文件里面的user,tidb_version和last_ops_ver,omitempty 这些信息(不过笔者对last_ops_ver,omitempty 这两项配置信息不是很了解,就不去深究了)其实,代码写到了这里,我们大抵明白了整个display的逻辑,首先通过yaml.Unmarshal去解析整个集群的拓扑文件,从而得到集群拓扑文件里面的信息
statusTimeout := time.Duration(opt.APITimeout) * time.Second这里定义了获取组件状态的超时时间,默认是10s,通过 --status-timeout 进行传递在src/tiup-1.11.0/components/cluster/command/display.go里面的cmd.Flags().Uint64Var(&statusTimeout, "status-timeout", 40, "Timeout in seconds when getting node status")这行代码通过传递进来的参数对这个变量赋值
3.获取pd地址masterList := topo.BaseTopo().MasterList通过topo.BaseTopo() 这个方法通过构造函数将BaseTopo结构体进行赋值,然后通过BaseTopo结构体的MasterList字段获取到了整个集群pd 的地址,pd的地址以切片的形式返回
4.获取需要查看具体组件或者具体节点的信息filterRoles := set.NewStringSet(opt.Roles...)filterNodes := set.NewStringSet(opt.Nodes...)如果这个时候diplay 的时候有-R或者-N指定了需要判断状态的具体组件或者是节点的ip:port,那么上述代码就会将组件或者节点信息存储到filterRoles或者是filterNodes中tlsCfg, err := topo.TLSConfig(m.specManager.Path(name, spec.TLSCertKeyDir))获取tsl的信息,由于我们一般不使用tls进行通讯,这里不做这个讨论
5.查询pd组件状态下面的这段代码,通过topo.IterInstance 方法通过函数回调获得了pd组件的状态这段代码的核心逻辑如下1.通过ComponentsByStartOrder(src/tiup-1.11.0/pkg/cluster/spec/spec.go) 方法获得了将tidb目前所有的组件的信息放到了1个切片返回,这个切片的类型其实是1个Component接口类型(src/tiup-1.11.0/pkg/cluster/spec/instance.go),也就是说这个切片里面的元素可以调用这个接口所定义的函数2.构建循环,去轮询每个组件,每个组件的Instances()方法返回的也是1个接口类型的切片,最终调用接口里面的ComponentName方法去判断是否是pd组件3.查询pd组件的状态,查询pd组件的状态最终是通过每个组件的Status方法去完成的,具体的逻辑,会在文章的最后面说明当然这段代码还涉及到并发线程去查询组件的信息,并发线程数是 --concurrency 参数指定的var mu sync.Mutex topo.IterInstance(func(ins spec.Instance) { if ins.ComponentName() != spec.ComponentPD && ins.ComponentName() != spec.ComponentDMMaster { return } status := ins.Status(ctx, statusTimeout, tlsCfg, masterList...) mu.Lock() if strings.HasPrefix(status, "Up") || strings.HasPrefix(status, "Healthy") { instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort()) masterActive = append(masterActive, instAddr) } masterStatus[ins.ID()] = status fmt.Println("masterStatus", masterStatus) mu.Unlock() }, opt.Concurrency)6.获取除pd组件以外的其它组件的状态下面的这段代码,是整个display 过程中去真正查询组件状态信息的代码整体的执行逻辑如下1.如果指定-R或者-N参数去判断某个组件或者是某个实例的地址去查询状态的时候,如果这个组件或者实例的地址不存在的时候就会直接报错退出执行2.如果是pd组件,不需要再次查询pd的状态了,直接就可以得到pd的状态了,如果pd的地址是dashboard的地址,status变量会加上"|UI"字符串3.查询除pd以外组件的其它组件的状态topo.IterInstance(func(ins spec.Instance) { // apply role filter if len(filterRoles) > 0 && !filterRoles.Exist(ins.Role()) { fmt.Println("role not exists", ins.Role()) return } // apply node filter if len(filterNodes) > 0 && !filterNodes.Exist(ins.ID()) { fmt.Println("node not exists", ins.ID()) return }
dataDir := "-" insDirs := ins.UsedDirs() deployDir := insDirs[0] if len(insDirs) > 1 { dataDir = insDirs[1] }
var status, memory string switch ins.ComponentName() { case spec.ComponentPD: status = masterStatus[ins.ID()] instAddr := fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort()) if dashboardAddr == instAddr { status += "|UI" } case spec.ComponentDMMaster: status = masterStatus[ins.ID()] default: status = ins.Status(ctx, statusTimeout, tlsCfg, masterActive...) }......
查询组件的状态,是通过status = ins.Status(ctx, statusTimeout, tlsCfg, masterActive...) 这行代码完成的
复制代码

四、查询组件状态的总体执行流程

查询组件的状态的整体逻辑为:之前在上段代码中提过,去循环每个组件定义Instances()方法,得到Instance的接口,从而可以调用Instance的接口里面的所定义的Status方法去获得各个组件的运行状态Instance的接口的定义(src/tiup-1.11.0/pkg/cluster/spec/instance.go)type Instance interface {  InstanceSpec  ID() string  Ready(context.Context, ctxt.Executor, uint64, *tls.Config) error  InitConfig(ctx context.Context, e ctxt.Executor, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error  ScaleConfig(ctx context.Context, e ctxt.Executor, topo Topology, clusterName string, clusterVersion string, deployUser string, paths meta.DirPaths) error  PrepareStart(ctx context.Context, tlsCfg *tls.Config) error  ComponentName() string  InstanceName() string  ServiceName() string  ResourceControl() meta.ResourceControl  GetHost() string  GetPort() int  GetSSHPort() int  DeployDir() string  UsedPorts() []int  UsedDirs() []string  Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string  Uptime(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config) time.Duration  DataDir() string  LogDir() string  OS() string // only linux supported now  Arch() string  IsPatched() bool  SetPatched(bool)  setTLSConfig(ctx context.Context, enableTLS bool, configs map[string]interface{}, paths meta.DirPaths) (map[string]interface{}, error)}

func (i *BaseInstance) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, pdList ...string) string { return i.StatusFn(ctx, timeout, tlsCfg, pdList...)}
复制代码

五、查询 drainer 等组件状态的执行流程

在目前现有 tiup 代码中,各个组件状态的查询其实是有稍许差别的,tidb、drainer、prometheus 等组件的状态查询的代码逻辑其实和 pd、tikv 组件的代码逻辑是不一样的,接下来就以查询 drainer 组件状态为例,去观察查询 drainer 等组件状态的大体逻辑是怎么样的,drainer、tidb、prometheus 等组件的状态的查询是通过 Status 方法调用了 statusByHost 函数 (src/tiup-1.11.0/pkg/cluster/spec/drainer.go) 去确定组件的状态是 up 还是 down


func statusByHost(host string, port int, path string, timeout time.Duration, tlsCfg *tls.Config) string {  if timeout < time.Second {    timeout = statusQueryTimeout  }
client := utils.NewHTTPClient(timeout, tlsCfg) //是否对集群启用 TLS。启用之后,组件之间、客户端与组件之间都必须使用生成的 TLS 证书进行连接,默认值:false scheme := "http" if tlsCfg != nil { scheme = "https" } if path == "" { path = "/" } url := fmt.Sprintf("%s://%s:%d%s", scheme, host, port, path) instance_address := fmt.Sprintf("%s:%d", host, port) // body doesn't have any status section needed body, err := client.Get(context.TODO(), url) if err != nil || body == nil { return "Down" } return "Up"}
复制代码


我把 tiup 关于 statusByHost 函数 的执行代码单独拿了出来,放到了下面的代码中,利用这部分代码可以直接获取某个实例的运行状态为 up 还是 down,其实 statusByHost 函数 总体执行的逻辑其实是在得到相关实例的 url(api)(比如http://172.16.1.1:8249/status)后去调用 http 的相关模块去请求这个 url,然后然后通过请求的 url 的返回内容去判断实例是 up 还是 down 状态


package main
import ( "context" "crypto/tls" "fmt"
"io" "net" "net/http" "net/url" "os" "time")
type HTTPClient struct { client *http.Client header http.Header}
// NewHTTPClient returns a new HTTP client with timeout and HTTPS supportfunc NewHTTPClient(timeout time.Duration, tlsConfig *tls.Config) *HTTPClient { if timeout < time.Second { timeout = 10 * time.Second // default timeout is 10s } tr := &http.Transport{ TLSClientConfig: tlsConfig, Dial: (&net.Dialer{Timeout: 3 * time.Second}).Dial, } // prefer to use the inner http proxy httpProxy := os.Getenv("TIUP_INNER_HTTP_PROXY") if len(httpProxy) == 0 { httpProxy = os.Getenv("HTTP_PROXY") } if len(httpProxy) > 0 { if proxyURL, err := url.Parse(httpProxy); err == nil { tr.Proxy = http.ProxyURL(proxyURL) } } return &HTTPClient{ client: &http.Client{ Timeout: timeout, Transport: tr, }, }}
// GetWithStatusCode fetch a URL with GET method and returns the response, also the status code.func (c *HTTPClient) GetWithStatusCode(ctx context.Context, url string) ([]byte, int, error) { var statusCode int req, err := http.NewRequest("GET", url, nil) ////发送GET请求 ////url:请求地址 //req 返回请求url的返回内容,是1个结构体类型 if err != nil { return nil, statusCode, err }
req.Header = c.header
if ctx != nil { req = req.WithContext(ctx) } res, err := c.client.Do(req)
if err != nil { fmt.Printf("url:%v, statusCode is:%v,err is:%v \n", url, statusCode, err)
return nil, statusCode, err //如果实例状态获取异常,比如实例关闭,那么就会将错误和statusCode 返回 } defer res.Body.Close() fmt.Println("the res.StatusCode is", res.StatusCode) //the res.StatusCode is 200 data, err := checkHTTPResponse(res)
return data, res.StatusCode, err //如果实例状态正常,res.StatusCode 为200}
// checkHTTPResponse checks if an HTTP response is with normal status codesfunc checkHTTPResponse(res *http.Response) ([]byte, error) { body, err := io.ReadAll(res.Body)
if err != nil { return nil, err }
if res.StatusCode < 200 || res.StatusCode >= 400 { return body, fmt.Errorf("error requesting %s, response: %s, code %d", res.Request.URL, string(body), res.StatusCode) }
return body, nil}
func (c *HTTPClient) Get(ctx context.Context, url string) ([]byte, error) { data, _, err := c.GetWithStatusCode(ctx, url) return data, err}
func GetInstanceStatus(url string) string { client := NewHTTPClient(10, nil) //通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client body, err := client.Get(context.TODO(), url) if err != nil || body == nil { return "Down" } return "Up"}
//0 通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client//1.通过client.get 函数(传递instace的http url 地址 )查看instance 的状态//2.get 函数调用GetWithStatusCode函数 查看url 地址的状态//3.GetWithStatusCode 函数又调用checkHTTPResponse 函数,//总结 如果GetWithStatusCode 返回错误信息或者http 请求返回的内容为空,则判断实例状态为Down
func main() { url := "http://172.16.1.1:8249/status" status := GetInstanceStatus(url) //GetInstanceStatus(url) fmt.Println(status)
}
复制代码


statusByHost 函数的大概执行过程,所以去判断 1 个组件的状态是 Up 还是 Down 其实不是通过探测端口的方式,而是通过组件自身暴露出来的 url(或者叫 api),然后通过 http 去访问这个 url 的响应内容获得的,这一点之前是没有想到的。


//0 通过构造函数NewHTTPClient 将结构体HTTPClient 赋值给变量的client//1.通过client.get 函数(传递instace的http url 地址 )查看instance 的状态//2.get 函数调用GetWithStatusCode函数 查看url 地址的状态//3.GetWithStatusCode 函数又调用checkHTTPResponse 函数,//总结 如果GetWithStatusCode 返回错误或者http 请求返回的内容为空,则判断实例状态为Down
复制代码

六、查询 pd 组件状态的执行流程

查询 pd 组件状态的代码主要集中在 src/tiup-1.11.0/pkg/cluster/spec/pd.go 里面的 Status 方法


func (s *PDSpec) Status(ctx context.Context, timeout time.Duration, tlsCfg *tls.Config, _ ...string) string {  if timeout < time.Second {    timeout = statusQueryTimeout  }  addr := fmt.Sprintf("%s:%d", s.Host, s.ClientPort)  pc := api.NewPDClient(ctx, []string{addr}, timeout, tlsCfg)    // check health  err := pc.CheckHealth()  if err != nil {    return "Down"  }  // find leader node  leader, err := pc.GetLeader()  if err != nil {    return "ERR"  }  res := "Up"  if s.Name == leader.Name {    res += "|L"  }  return res}
复制代码


这段代码的主要逻辑如下


1.生成1个pd状态的url(或者叫做api),例如http://172.168.1.3:2379/pd/ping2.执行CheckHealth()方法调用http相关模块的get请求去访问这个pd状态的url,如果请求出错,判断这个pd实例的状态为down3.生成pd的leader状态的url(或者叫做api),例如http://172.168.1.3:2379/pd/api/v1/leader,执行 GetLeader()方法调用http相关模块的get请求去访问这个url,通过访问这个url会返回pd 组件的leader的地址(比如pd leader是172.168.1.4:2379,但是通过访问172.168.1.3:2379/pd/api/v1/leader 也可以将leader的地址也返回,比如可以执行linux命令curl http://172.168.1.3:2379/pd/api/v1/leader 看看返回的信息是怎样的)4.如果本次的pd地址刚好和通过访问leader url返回信息中的leader地址是同1个,则说明本次的pd地址就是leader,则返回"Up|L",否则就返回"Up"
复制代码

七、查询 tikv 组件状态的执行流程

https://docs.pingcap.com/zh/tidb/stable/tidb-scheduling#信息收集


在 tidb 所有组件中,tikv 的组件状态是最复杂的,有 up、Disconnect、Offline、Tombstone 等各种状态,那么这些状态的逻辑是怎么获得的?tikv 获取状态的逻辑主要集中在 src/tiup-1.11.0/pkg/cluster/spec/tikv.go 里面的 checkStoreStatus 和 Status 方法


func checkStoreStatus(ctx context.Context, storeAddr string, tlsCfg *tls.Config, pdList ...string) string {  if len(pdList) < 1 {    return "N/A"  }  pdapi := api.NewPDClient(ctx, pdList, statusQueryTimeout, tlsCfg)  store, err := pdapi.GetCurrentStore(storeAddr)  if err != nil {    if errors.Is(err, api.ErrNoStore) {      return "N/A"    }    return "Down"  }  return store.Store.StateName}
复制代码


总体执行逻辑为:


1.通过http的模块去访问pd的url,例如http://172.168.1.3:2379/pd/api/v1/stores?state=0&state=1&state=2,这个url会返回所有tikv的状态信息,然后把这些信息放到1个叫storesInfo的结构体中,用linux命令curl(或者浏览器访问) 可以访问http://172.168.1.3:2379/pd/api/v1/stores?state=0&state=1&state=22.去循环这个结构体里面的信息,去判断如果某个tikv实例的状态不是Tombstone状态,则终止循环,并且判断这个如果这个实例的状态值是不是Pending Offline状态,如果不是,则直接返回这个tikv实例状态值(如果tikv的状态值是Up、Disconnected、Down 走的就是这个逻辑)3.如果通过循环storesInfo结构体得到某个tikv的实例是Tombstone状态,那么并不会马上结束循环,而是继续去循环,直到找到最大的storeid,然后才返回这个这个tikv store的状态,至于为什么这么涉及,通过查看代码注释,如果pd发生切换, store ID 可能存在重复现象,在这里,笔者就只解读代码的执行逻辑,如果对这个逻辑感兴趣可以查看这个issue(https://github.com/tikv/pd/issues/3303 )和具体的代码逻辑(src/tiup-1.11.0/pkg/cluster/api/pdapi.go GetCurrentStore方法)4.如果某个tikv在集群拓扑文件有offline为true的标识,并且返回的状态是offline,那么就判断这个tikv的状态是Pending Offline
复制代码


    ssh_port: 22    port: 20161    status_port: 20181    deploy_dir: /tidb-deploy/tikv-20161    data_dir: /tidb-data/tikv-20161    log_dir: /tidb-deploy/tikv-20161/log    offline: true
复制代码

八、结论

通过对 tiup cluster display 代码的学习,对如何判断组件的状态有了一个较为清晰的了解,总之来说,tiup 是去通过 http 请求的方式去访问各个组件暴露出来的接口 url 才获得组件的运行状态 (tikv 的状态信息是通过 pd 的 api 获得的),当然如果操作系统的防火墙如果打开的话,也会导致 http 请求失败,就会判断组件的状态是 Down。


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021-12-15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
tiup cluster display 执行流程代码详解_实践案例_TiDB 社区干货传送门_InfoQ写作社区