PD 客户端源码分析
作者: 薛港 - 移动云原文来源:https://tidb.net/blog/9dcfabe5
客户端对象创建以及初始化
// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts…)
}
// newBaseClient returns a new baseClient.
func newBaseClient(ctx context.Context, urls []string, security SecurityOption, opts …ClientOption) (*baseClient, error) {
ctx1, cancel := context.WithCancel(ctx)
c := &baseClient{
urls: urls,
checkLeaderCh: make(chan struct{}, 1),
checkTSODispatcherCh: make(chan struct{}, 1),
}
client 包含子模块 basic client ,这个子模块目的很明确,基于用户的请求,尽量发送到 pd leader 和 tso dc-location 目的地。因为部分请求,只能 PD leader 或者 dc-location leader 能够处理,如果不是 leader 收到请求,还是要转发到对应的 leader。而 basic client 就是为了维护两类 leader, 以及建立连接.
baseClient 子模块分析, 主要包含以下信息:
对应 pd leader 对象
对应 follower 对象
dc-locatopn 对应的 leader 对象以及 grpc
一些 channel,checkLeaderCh,checkTSODispatcherCh 。用于两类 leader 的管理
// baseClient is a basic client for all other complex client.
type baseClient struct {
urls []string
clusterID uint64
// PD leader URL
leader atomic.Value // Store as string
// PD follower URLs
followers atomic.Value // Store as []string
// dc-location -> TSO allocator leader gRPC connection
clientConns sync.Map // Store as map[string]*grpc.ClientConn
// dc-location -> TSO allocator leader URL
allocators sync.Map // Store as map[string]string
checkLeaderCh chan struct{}
checkTSODispatcherCh chan struct{}
}
下面我们详细分析 basic client 的创建流程,三个核心步骤
调用 c.updateMember
func (c *baseClient) updateMember() error {
for _, u := range c.urls {
members, err := c.getMembers(ctx, u)
更新 c.clientConns 以及 c.allocators,用于匹配 dc-location 到 grpc 连接
c.switchTSOAllocatorLeader(members.GetTsoAllocatorLeaders());
启动后台服务线程 go c.memberLoop()
这个后台服务,每隔 1 分钟触发一次 updateMemer 用于更新 pd 成员角色。或者通过 channel 触发成员角色更新
func (c *baseClient) memberLoop() {
for {
select {
case <-c.checkLeaderCh:
case <-time.After(time.Minute):
case <-ctx.Done():
return
}
if err := c.updateMember(); err != nil {
log.Error(”[pd] failed updateMember”, errs.ZapError(err))
}
}
}
Client 流程分析:
核心流程分析, 当用户创建 pd 客户端的时候,触发以下步骤,
1. 创建 client 对象,包含子模块 basic client 对象的创建
2. 调用 c.updateTSODispatcher(),
3. 启动一系列后台管理线程
// NewClient creates a PD client.
func NewClient(pdAddrs []string, security SecurityOption, opts …ClientOption) (Client, error) {
return NewClientWithContext(context.Background(), pdAddrs, security, opts…)
}
调度每个 dc-location,用于请求批量处理发往这个 dc-location 的 tso 请要求
在 client,每个 dc 对应一个 TSODispatcher,dispatcher 用于批量发送请求到相应的 dc-location leader
func (c *client) updateTSODispatcher() {
// Set up the new TSO dispatcher
c.allocators.Range(func(dcLocationKey, _ interface{}) bool {
dcLocation := dcLocationKey.(string)
if !c.checkTSODispatcher(dcLocation) {
go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)
}
return true
})
启动后台服务 handleDispatcher,这个后台服务调用 processTSORequests 用于批量从 PD 请求一定数目的 TSO
go c.handleDispatcher(dispatcherCtx, dcLocation, tsoRequestCh)
func (c *client) handleDispatcher(dispatcherCtx context.Context, dc string, tsoDispatcher chan *tsoRequest) {
}
client 后台服务
tsLoop 流程分析:定期检察或者通过 checkTSODispatcherCh 触发是否有新加入的 dc-location,如果有的话,调用 updateTSODispatcher, 用于批量处理发往这个 dc-location 的 tso 请求
func (c *client) tsLoop() {
tsCancelLoop 后台服务线程主要用于 tso 请求超时检察,原理比较简单,当我们处理每次 TSO 请求时,会生成一个对象发送给对应 dc-location 对应超时处理的 channel. 这个对象包含超时检察以及超时处理函数。 然后在 tsCancelLoop 服务里,我们会 watch 每个 dc-location,watch 函数会进入一个 loop. 等待 channel 收到对象。如果发现对象,进入超时处理,要么超时,取消操作,要么处理完成,接受下一个对象
func (c *client) tsCancelLoop() {
func (c *client) watchTSDeadline(ctx context.Context, dcLocation string) {
if _, exist := c.tsDeadline.Load(dcLocation); !exist {
tsDeadlineCh := make(chan deadline, 1)
c.tsDeadline.Store(dcLocation, tsDeadlineCh)
go func(dc string, tsDeadlineCh <-chan deadline) {
for {
select {
case d := <-tsDeadlineCh:
select {
case <-d.timer:
log.Error(“tso request is canceled due to timeout”, zap.String(“dc-location”, dc), errs.ZapError(errs.ErrClientGetTSOTimeout))
d.cancel()
case <-d.done:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}(dcLocation, tsDeadlineCh)
}
}
后台 leader 检察线程,如果发现 leader 变化,更新相关信息
func (c *client) leaderCheckLoop() {
版权声明: 本文为 InfoQ 作者【TiDB 社区干货传送门】的原创文章。
原文链接:【http://xie.infoq.cn/article/0ffab878e39ace476f89dddb1】。文章转载请联系作者。
评论