写点什么

PD 客户端源码分析

  • 2022 年 7 月 11 日
  • 本文字数:3536 字

    阅读完需:约 12 分钟

作者: 薛港 - 移动云原文来源:https://tidb.net/blog/9dcfabe5


客户端对象创建以及初始化


// NewClient creates a PD client.


func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {


return NewClientWithContext(context.Background(), pdAddrs, security, opts…)


}


// newBaseClient returns a new baseClient.


func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts …ClientOption) (*baseClient, error) {


ctx1, cancel := context.WithCancel(ctx)


c := &baseClient{


urls: urls,


checkLeaderCh: make(chan struct{}, 1),


checkTSODispatcherCh: make(chan struct{}, 1),


}


  c.initRetry(c.initClusterID);  c.initRetry(c.updateMember);    c.wg.Add(1)  go c.memberLoop()
return c, nil}
复制代码


client 包含子模块 basic client ,这个子模块目的很明确,基于用户的请求,尽量发送到 pd leader 和 tso dc-location 目的地。因为部分请求,只能 PD leader 或者 dc-location leader 能够处理,如果不是 leader 收到请求,还是要转发到对应的 leader。而 basic client 就是为了维护两类 leader, 以及建立连接.


baseClient 子模块分析, 主要包含以下信息:


对应 pd leader 对象


对应 follower 对象


dc-locatopn 对应的 leader 对象以及 grpc


一些 channel,checkLeaderCh,checkTSODispatcherCh 。用于两类 leader 的管理


// baseClient is a basic client for all other complex client.


type baseClient struct {


urls []string


clusterID uint64


// PD leader URL


leader atomic.Value // Store as string


// PD follower URLs


followers atomic.Value // Store as []string


// dc-location -> TSO allocator leader gRPC connection


clientConns sync.Map // Store as map[string]*grpc.ClientConn


// dc-location -> TSO allocator leader URL


allocators sync.Map // Store as map[string]string


checkLeaderCh chan struct{}


checkTSODispatcherCh chan struct{}


}


下面我们详细分析 basic client 的创建流程,三个核心步骤


调用 c.updateMember


func (c *baseClient) updateMember() error {


for _, u := range c.urls {


members, err := c.getMembers(ctx, u)


更新 c.clientConns 以及 c.allocators,用于匹配 dc-location 到 grpc 连接


c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders());


        更新pd所有成员的url    c.updateURLs(members.GetMembers())                更新pd follower的urls    c.updateFollowers(members.GetMembers(), members.GetLeader())        更新c.leader,指定pd的leader.以及指定dc global对应的leader    c.switchLeader(members.GetLeader().GetClientUrls());         给c.checkTSODispatcherCh发送空消息,触发check tso    c.scheduleCheckTSODispatcher()    return nil  }  }
复制代码


启动后台服务线程 go c.memberLoop()


这个后台服务,每隔 1 分钟触发一次 updateMemer 用于更新 pd 成员角色。或者通过 channel 触发成员角色更新


func (c *baseClient) memberLoop() {


for {


select {


case <-c.checkLeaderCh:


case <-time.After(time.Minute):


case <-ctx.Done():


return


}


if err := c.updateMember(); err != nil {


log.Error(”[pd] failed updateMember”, errs.ZapError(err))


}


}


}


Client 流程分析:


核心流程分析, 当用户创建 pd 客户端的时候,触发以下步骤,


1. 创建 client 对象,包含子模块 basic client 对象的创建


2. 调用 c.updateTSODispatcher(),


3. 启动一系列后台管理线程


// NewClient creates a PD client.


func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {


return NewClientWithContext(context.Background(), pdAddrs, security, opts…)


}


// NewClientWithContext creates a PD client with context.func NewClientWithContext(ctx context.Context, pdAddrs []string, security SecurityOption, opts ...ClientOption) (Client, error) {
base, err := newBaseClient(ctx, addrsToUrls(pdAddrs), security, opts...)
c := &client{ baseClient: base, checkTSDeadlineCh: make(chan struct{}), }
c.updateTSODispatcher() c.wg.Add(3) go c.tsLoop() go c.tsCancelLoop() go c.leaderCheckLoop()
return c, nil}
复制代码


调度每个 dc-location,用于请求批量处理发往这个 dc-location 的 tso 请要求


在 client,每个 dc 对应一个 TSODispatcher,dispatcher 用于批量发送请求到相应的 dc-location leader


func (c *client) updateTSODispatcher() {


// Set up the new TSO dispatcher


c.allocators.Range(func(dcLocationKey, _ interface{}) bool {


dcLocation := dcLocationKey.(string)


if !c.checkTSODispatcher(dcLocation) {


go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)


}


return true


})


}
复制代码


启动后台服务 handleDispatcher,这个后台服务调用 processTSORequests 用于批量从 PD 请求一定数目的 TSO


go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)


func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {


for {
select { case first := <-tsoDispatcher: pendingPlus1 := len(tsoDispatcher) + 1 requests[0] = first for i := 1; i < pendingPlus1; i++ { requests[i] = <-tsoDispatcher } done := make(chan struct{}) dl := deadline{ timer: time.After(c.timeout), done: done, cancel: cancel, } tsDeadlineCh, ok := c.tsDeadline.Load(dc)
select { case tsDeadlineCh.(chan deadline) <- dl: case <-dispatcherCtx.Done(): return } err = c.processTSORequests(stream, dc, requests[:pendingPlus1], opts) } }
复制代码


}


client 后台服务


tsLoop 流程分析:定期检察或者通过 checkTSODispatcherCh 触发是否有新加入的 dc-location,如果有的话,调用 updateTSODispatcher, 用于批量处理发往这个 dc-location 的 tso 请求


func (c *client) tsLoop() {


  ticker := time.NewTicker(tsLoopDCCheckInterval)
for { c.updateTSODispatcher() select { case <-ticker.C: case <-c.checkTSODispatcherCh: case <-loopCtx.Done(): return } }}
复制代码


tsCancelLoop 后台服务线程主要用于 tso 请求超时检察,原理比较简单,当我们处理每次 TSO 请求时,会生成一个对象发送给对应 dc-location 对应超时处理的 channel. 这个对象包含超时检察以及超时处理函数。 然后在 tsCancelLoop 服务里,我们会 watch 每个 dc-location,watch 函数会进入一个 loop. 等待 channel 收到对象。如果发现对象,进入超时处理,要么超时,取消操作,要么处理完成,接受下一个对象


func (c *client) tsCancelLoop() {


  ticker := time.NewTicker(tsLoopDCCheckInterval)
for { // Watch every dc-location's tsDeadlineCh c.allocators.Range(func(dcLocation, _ interface{}) bool { c.watchTSDeadline(tsCancelLoopCtx, dcLocation.(string)) return true }) select { case <-c.checkTSDeadlineCh: continue case <-ticker.C: continue case <-tsCancelLoopCtx.Done(): return } }}
复制代码


func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) {


if _, exist := c.tsDeadline.Load(dcLocation); !exist {


tsDeadlineCh := make(chan deadline, 1)


c.tsDeadline.Store(dcLocation, tsDeadlineCh)


go func(dc string, tsDeadlineCh <-chan deadline) {


for {


select {


case d := <-tsDeadlineCh:


select {


case <-d.timer:


log.Error(“tso request is canceled due to timeout”, zap.String(“dc-location”, dc), errs.ZapError(errs.ErrClientGetTSOTimeout))


d.cancel()


case <-d.done:


case <-ctx.Done():


return


}


case <-ctx.Done():


return


}


}


}(dcLocation, tsDeadlineCh)


}


}


后台 leader 检察线程,如果发现 leader 变化,更新相关信息


func (c *client) leaderCheckLoop() {


  ticker := time.NewTicker(LeaderHealthCheckInterval)  defer ticker.Stop()
for { select { case <-c.ctx.Done(): return case <-ticker.C: c.checkLeaderHealth(leaderCheckLoopCtx) } }}
复制代码


发布于: 刚刚阅读数: 3
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
PD 客户端源码分析_安装 & 部署_TiDB 社区干货传送门_InfoQ写作社区