// NewClient creates a PD client.

func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {

return NewClientWithContext(context.Background(), pdAddrs, security, opts…)


// newBaseClient returns a new baseClient.

func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts …ClientOption) (*baseClient, error) {

ctx1, cancel := context.WithCancel(ctx)

c := &baseClient{

urls: urls,

checkLeaderCh: make(chan struct{}, 1),

checkTSODispatcherCh: make(chan struct{}, 1),


  c.initRetry(c.initClusterID);  c.initRetry(c.updateMember);    c.wg.Add(1)  go c.memberLoop()
return c, nil}

client 包含子模块 basic client ,这个子模块目的很明确,基于用户的请求,尽量发送到 pd leader 和 tso dc-location 目的地。因为部分请求,只能 PD leader 或者 dc-location leader 能够处理,如果不是 leader 收到请求,还是要转发到对应的 leader。而 basic client 就是为了维护两类 leader, 以及建立连接.

baseClient 子模块分析, 主要包含以下信息:

对应 pd leader 对象

对应 follower 对象

dc-locatopn 对应的 leader 对象以及 grpc

一些 channel,checkLeaderCh,checkTSODispatcherCh 。用于两类 leader 的管理

// baseClient is a basic client for all other complex client.

type baseClient struct {

urls []string

clusterID uint64

// PD leader URL

leader atomic.Value // Store as string

// PD follower URLs

followers atomic.Value // Store as []string

// dc-location -> TSO allocator leader gRPC connection

clientConns sync.Map // Store as map[string]*grpc.ClientConn

// dc-location -> TSO allocator leader URL

allocators sync.Map // Store as map[string]string

checkLeaderCh chan struct{}

checkTSODispatcherCh chan struct{}


下面我们详细分析 basic client 的创建流程,三个核心步骤

调用 c.updateMember

func (c *baseClient) updateMember() error {

for _, u := range c.urls {

members, err := c.getMembers(ctx, u)

更新 c.clientConns 以及 c.allocators,用于匹配 dc-location 到 grpc 连接


        更新pd所有成员的url    c.updateURLs(members.GetMembers())                更新pd follower的urls    c.updateFollowers(members.GetMembers(), members.GetLeader())        更新c.leader,指定pd的leader.以及指定dc global对应的leader    c.switchLeader(members.GetLeader().GetClientUrls());         给c.checkTSODispatcherCh发送空消息,触发check tso    c.scheduleCheckTSODispatcher()    return nil  }  }

启动后台服务线程 go c.memberLoop()

这个后台服务,每隔 1 分钟触发一次 updateMemer 用于更新 pd 成员角色。或者通过 channel 触发成员角色更新

func (c *baseClient) memberLoop() {

for {

select {

case <-c.checkLeaderCh:

case <-time.After(time.Minute):

case <-ctx.Done():



if err := c.updateMember(); err != nil {

log.Error(”[pd] failed updateMember”, errs.ZapError(err))




Client 流程分析:

核心流程分析, 当用户创建 pd 客户端的时候,触发以下步骤,

1. 创建 client 对象,包含子模块 basic client 对象的创建

2. 调用 c.updateTSODispatcher(),

3. 启动一系列后台管理线程

// NewClient creates a PD client.

func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {

return NewClientWithContext(context.Background(), pdAddrs, security, opts…)


// NewClientWithContext creates a PD client with context.func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
c := &client{ baseClient: base, checkTSDeadlineCh: make(chan struct{}), }
c.updateTSODispatcher() c.wg.Add(3) go c.tsLoop() go c.tsCancelLoop() go c.leaderCheckLoop()
return c, nil}

调度每个 dc-location,用于请求批量处理发往这个 dc-location 的 tso 请要求

在 client,每个 dc 对应一个 TSODispatcher,dispatcher 用于批量发送请求到相应的 dc-location leader

func (c *client) updateTSODispatcher() {

// Set up the new TSO dispatcher

c.allocators.Range(func(dcLocationKey, _ interface{}) bool {

dcLocation := dcLocationKey.(string)

if !c.checkTSODispatcher(dcLocation) {

go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)


return true



启动后台服务 handleDispatcher,这个后台服务调用 processTSORequests 用于批量从 PD 请求一定数目的 TSO

go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)

func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {

for {
select { case first := <-tsoDispatcher: pendingPlus1 := len(tsoDispatcher) + 1 requests[0] = first for i := 1; i < pendingPlus1; i++ { requests[i] = <-tsoDispatcher } done := make(chan struct{}) dl := deadline{ timer: time.After(c.timeout), done: done, cancel: cancel, } tsDeadlineCh, ok := c.tsDeadline.Load(dc)
select { case tsDeadlineCh.(chan deadline) <- dl: case <-dispatcherCtx.Done(): return } err = c.processTSORequests(stream, dc, requests[:pendingPlus1], opts) } }


client 后台服务

tsLoop 流程分析:定期检察或者通过 checkTSODispatcherCh 触发是否有新加入的 dc-location,如果有的话,调用 updateTSODispatcher, 用于批量处理发往这个 dc-location 的 tso 请求

func (c *client) tsLoop() {

  ticker := time.NewTicker(tsLoopDCCheckInterval)
for { c.updateTSODispatcher() select { case <-ticker.C: case <-c.checkTSODispatcherCh: case <-loopCtx.Done(): return } }}

tsCancelLoop 后台服务线程主要用于 tso 请求超时检察,原理比较简单,当我们处理每次 TSO 请求时,会生成一个对象发送给对应 dc-location 对应超时处理的 channel. 这个对象包含超时检察以及超时处理函数。 然后在 tsCancelLoop 服务里,我们会 watch 每个 dc-location,watch 函数会进入一个 loop. 等待 channel 收到对象。如果发现对象,进入超时处理,要么超时,取消操作,要么处理完成,接受下一个对象

func (c *client) tsCancelLoop() {

  ticker := time.NewTicker(tsLoopDCCheckInterval)
for { // Watch every dc-location's tsDeadlineCh c.allocators.Range(func(dcLocation, _ interface{}) bool { c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string)) return true }) select { case <-c.checkTSDeadlineCh: continue case <-ticker.C: continue case <-tsCancelLoopCtx.Done(): return } }}

func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) {

if _, exist := c.tsDeadline.Load(dcLocation); !exist {

tsDeadlineCh := make(chan deadline, 1)

c.tsDeadline.Store(dcLocation, tsDeadlineCh)

go func(dc string, tsDeadlineCh <-chan deadline) {

for {

select {

case d := <-tsDeadlineCh:

select {

case <-d.timer:

log.Error(“tso request is canceled due to timeout”, zap.String(“dc-location”, dc), errs.ZapError(errs.ErrClientGetTSOTimeout))


case <-d.done:

case <-ctx.Done():



case <-ctx.Done():




}(dcLocation, tsDeadlineCh)



后台 leader 检察线程,如果发现 leader 变化,更新相关信息

func (c *client) leaderCheckLoop() {

  ticker := time.NewTicker(LeaderHealthCheckInterval)  defer ticker.Stop()
for { select { case <-c.ctx.Done(): return case <-ticker.C: c.checkLeaderHealth(leaderCheckLoopCtx) } }}

