写点什么

PD 三类选主流程梳理

  • 2022 年 7 月 11 日
  • 本文字数:4569 字

    阅读完需:约 15 分钟

作者:薛港 - 移动云;原文来源:https://tidb.net/blog/a6d4157a



PD 涉及三类选主


1.ETCD 选主


启动 etcd, 调用 embed.StartEtcd(s.etcdCfg),


// Start the embedded etcd server with the configuration held in s.etcdCfg.
etcd, err := embed.StartEtcd(s.etcdCfg)


等待 etcd 选主完成, 通过等待 channel (etcd.Server.ReadyNotify()), 这个 channel 收到通知表明 etcd cluster 完成选主,可以对外提供服务


// Block until the embedded etcd server signals that it is ready to serve
// requests; give up if the startup context is cancelled first.
select {
case <-newCtx.Done():
	return errs.ErrCancelStartEtcd.FastGenByArgs()
case <-etcd.Server.ReadyNotify():
	// etcd is ready to use; continue with startup.
}


启动后台 goroutine(etcdLeaderLoop),定期(时间间隔为 s.cfg.LeaderPriorityCheckInterval)检查当前 PD 与 etcd leader 的优先级;如果发现当前 PD 的优先级更高,就调用 etcd 的 transfer leader,将 etcd leader 切换到当前 PD。


func (s *Server) etcdLeaderLoop() {


  for {    select {    case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration):      s.member.CheckPriority(ctx)    case <-ctx.Done():      log.Info("server is closed, exit etcd leader loop")      return    }  }}
复制代码


// CheckPriority checks whether the etcd leader should be moved according to the priority.


func (m *Member) CheckPriority(ctx context.Context) {


etcdLeader := m.GetEtcdLeader()


  myPriority, err := m.GetMemberLeaderPriority(m.ID())
leaderPriority, err := m.GetMemberLeaderPriority(etcdLeader)
if myPriority > leaderPriority { err := m.MoveEtcdLeader(ctx, etcdLeader, m.ID())
}}
复制代码


2.PD leader 选主


初始化 pd server 中 member 成员, 这个对象用于 pd leader 选主


func (s *Server) startServer(ctx context.Context) error {


s.member = member.NewMember(etcd, client, etcdServerID)


}


func (s *Server) startServer(ctx context.Context) error {


s.member.MemberInfo(s.cfg, s.Name(), s.rootPath)s.member.SetMemberDeployPath(s.member.ID())s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
复制代码


}


启动后台服务线程 s.leaderLoop(),用于 pd 的选主


1. 检查当前是否已有 PD leader。如果已经存在 leader,这个 PD 不用参与选主,只需 watch 当前的 leader,直到 leader 过期被删除(即 leader 发生变化)。


2. 如果 leader 过期,或者当前没有 PD leader,并且当前 PD 同时是 etcd leader(保证 etcd leader 与 PD leader 在同一个 server 上),则调用 s.campaignLeader() 启动选主。


2.1 调用 s.member.CampaignLeader 开始选主。原理很简单:利用 etcd 的事务操作,如果能够成功写入特定的 leader key-value,就表示选主成功。


2.2 启动后台 goroutine(s.member.KeepLeader),不停地续约 PD leader,保证 leader 一直有效。


2.3 因为很多组件依赖 pd 的主,所以当 PD 选主成功以后,会启动很多其它组件的设置工作(tso 组件,id 分配组件,重新加载配置参数)


func (s *Server) leaderLoop() {


defer logutil.LogPanic()


defer s.serverLoopWg.Done()


for {  leader, rev, checkAgain := s.member.CheckLeader()  if checkAgain {    continue  }  if leader != nil {    err := s.reloadConfigFromKV()
log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader)) // WatchLeader will keep looping and never return unless the PD leader has changed. s.member.WatchLeader(s.serverLoopCtx, leader, rev) syncer.StopSyncWithLeader() log.Info("pd leader has changed, try to re-campaign a pd leader") }
// To make sure the etcd leader and PD leader are on the same server. etcdLeader := s.member.GetEtcdLeader() if etcdLeader != s.member.ID() { time.Sleep(200 * time.Millisecond) continue } s.campaignLeader()}
复制代码


}


func (s *Server) campaignLeader() {


log.Info(“start to campaign pd leader”, zap.String(“campaign-pd-leader-name”, s.Name()))


if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {


}
// maintain the PD leadergo s.member.KeepLeader(ctx)log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))
alllocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)if err != nil { log.Error("failed to get the global TSO allocator", errs.ZapError(err)) return}log.Info("initializing the global TSO allocator")if err := alllocator.Initialize(0); err != nil { log.Error("failed to initialize the global TSO allocator", errs.ZapError(err)) return}defer s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)// Check the cluster dc-location after the PD leader is electedgo s.tsoAllocatorManager.ClusterDCLocationChecker()
if err := s.reloadConfigFromKV(); err != nil { log.Error("failed to reload configuration", errs.ZapError(err)) return}


// Try to create raft cluster.if err := s.createRaftCluster(); err != nil { log.Error("failed to create raft cluster", errs.ZapError(err)) return}defer s.stopRaftCluster()if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
return}if err := s.idAllocator.Rebase(); err != nil { return}s.member.EnableLeader()
复制代码


}


3.TSO 分配器选主。TSO 分配器分为两类:


3.1 global tso 分配器,


用于保证 TSO 在全局范围内单调递增。它的 leader 就是 PD leader:从下面的代码可以看出,global TSO 分配器直接复用了 PD 的 leadership(s.member.GetLeadership())。


// The global TSO allocator is set up with the PD member's own leadership
// (s.member.GetLeadership()), so the global TSO leader is the PD leader.
s.tsoAllocatorManager.SetUpAllocator(ctx, tso.GlobalDCLocation, s.member.GetLeadership())


3.2 dc tso 分配器 ,


用于保证每个 DC 内分配的 TSO 单调递增,每个 DC 内的 PD 会各自选出一个主。


启动后台 goroutine,定期(时间间隔为 patrolStep)调用 am.allocatorPatroller(serverCtx)。


allocatorPatroller 函数检查是否有新的 DC,如果有的话,创建该 DC 对应的 TSO 分配器,并创建新的 leadership 用于 DC 内的 leader 选主。选主调用函数 allocatorLeaderLoop,过程和 PD 选主类似。


// Check if we have any new dc-location configured, if yes,


// then set up the corresponding local allocator.


func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {


// Collect all dc-locations


dcLocations := am.GetClusterDCLocations()


// Get all Local TSO Allocators


allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation))


// Set up the new one


for dcLocation := range dcLocations {


if slice.NoneOf(allocatorGroups, func(i int) bool {


return allocatorGroups[i].dcLocation == dcLocation


}) {


am.SetUpAllocator(serverCtx, dcLocation, election.NewLeadership(


am.member.Client(),


am.getAllocatorPath(dcLocation),


fmt.Sprintf(“%s local allocator leader election”, dcLocation),


))


}


}


}


allocatorLeaderLoop 分析:


1. 如果发现当前 dc 已经有 dc tso leader, 那么 watch 这个 leader,直到 leader 无效


2. 如果发现 etcd 中保存有 next-leader,表明之前有 transfer leader 的请求;如果当前 PD 不是 next-leader,那么本次不参与该 DC 的 local TSO allocator 选主,稍后再检查。


3. 确认 PD leader 已经感知到这个 dc-location(否则等待下一轮)后,调用 campaignAllocatorLeader 进行选主。过程和 PD leader 选主类似,也是利用 etcd 的事务机制写入 leader key-value,写入成功即表明选主完成。


func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {


defer log.Info(“server is closed, return local tso allocator leader loop”,


zap.String(“dc-location”, allocator.GetDCLocation()),


zap.String(“local-tso-allocator-name”, am.member.Member().Name))


for {


select {


case <-ctx.Done():


return


default:


}


  // Check whether the Local TSO Allocator has the leader already  allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()  if checkAgain {    continue  }  if allocatorLeader != nil {    log.Info("start to watch allocator leader",      zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),      zap.String("local-tso-allocator-name", am.member.Member().Name))    // WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.    allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)    log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",      zap.String("dc-location", allocator.GetDCLocation()))  }
// Check the next-leader key nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation()) if err != nil { log.Error("get next leader from etcd failed", zap.String("dc-location", allocator.GetDCLocation()), errs.ZapError(err)) time.Sleep(200 * time.Millisecond) continue } isNextLeader := false if nextLeader != 0 { if nextLeader != am.member.ID() { log.Info("skip campaigning of the local tso allocator leader and check later", zap.String("server-name", am.member.Member().Name), zap.Uint64("server-id", am.member.ID()), zap.Uint64("next-leader-id", nextLeader)) time.Sleep(200 * time.Millisecond) continue } isNextLeader = true }
// Make sure the leader is aware of this new dc-location in order to make the // Global TSO synchronization can cover up this dc-location. ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation()) if err != nil { log.Error("get dc-location info from pd leader failed", zap.String("dc-location", allocator.GetDCLocation()), errs.ZapError(err)) // PD leader hasn't been elected out, wait for the campaign if !longSleep(ctx, time.Second) { return } continue } if !ok || dcLocationInfo.Suffix <= 0 { log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round", zap.String("dc-location", allocator.GetDCLocation()), zap.Any("dc-location-info", dcLocationInfo), zap.String("wait-duration", checkStep.String())) // Because the checkStep is long, we use select here to check whether the ctx is done // to prevent the leak of goroutine. if !longSleep(ctx, checkStep) { return } continue }
am.campaignAllocatorLeader(ctx, allocator, dcLocationInfo, isNextLeader)}
复制代码


}


发布于: 刚刚阅读数: 2
用户头像

TiDB 社区官网:https://tidb.net/ 2021.12.15 加入

TiDB 社区干货传送门是由 TiDB 社区中布道师组委会自发组织的 TiDB 社区优质内容对外宣布的栏目,旨在加深 TiDBer 之间的交流和学习。一起构建有爱、互助、共创共建的 TiDB 社区 https://tidb.net/

评论

发布
暂无评论
PD 三类选主流程梳理_TiDB 底层架构_TiDB 社区干货传送门_InfoQ写作社区