Author: Xue Gang (China Mobile Cloud). Original post: https://tidb.net/blog/a6d4157a
PD involves three kinds of leader election.
1. etcd leader election
Start etcd by calling embed.StartEtcd(s.etcdCfg):
etcd, err := embed.StartEtcd(s.etcdCfg)
Wait for the etcd election to finish by blocking on the channel returned by etcd.Server.ReadyNotify(); a notification on this channel means the etcd cluster has elected a leader and is ready to serve requests:
select {
// Wait etcd until it is ready to use
case <-etcd.Server.ReadyNotify():
case <-newCtx.Done():
    return errs.ErrCancelStartEtcd.FastGenByArgs()
}
A background goroutine then periodically (every s.cfg.LeaderPriorityCheckInterval) compares the leader priority of the current PD member with that of the current etcd leader; if the current PD member has a higher priority, it asks etcd to transfer leadership to this member:
func (s *Server) etcdLeaderLoop() {
    ctx, cancel := context.WithCancel(s.serverLoopCtx)
    defer cancel()
    for {
        select {
        case <-time.After(s.cfg.LeaderPriorityCheckInterval.Duration):
            s.member.CheckPriority(ctx)
        case <-ctx.Done():
            log.Info("server is closed, exit etcd leader loop")
            return
        }
    }
}
// CheckPriority checks whether the etcd leader should be moved according to the priority
// (error handling elided here for brevity).
func (m *Member) CheckPriority(ctx context.Context) {
    etcdLeader := m.GetEtcdLeader()
    myPriority, _ := m.GetMemberLeaderPriority(m.ID())
    leaderPriority, _ := m.GetMemberLeaderPriority(etcdLeader)
    if myPriority > leaderPriority {
        // Transfer the etcd leadership to the current member.
        _ = m.MoveEtcdLeader(ctx, etcdLeader, m.ID())
    }
}
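The priorities being compared are just per-member values stored in etcd, and the transfer itself goes through the embedded etcd server. A minimal sketch of what MoveEtcdLeader amounts to (the 5-second timeout and the import path are assumptions; PD's real implementation adds its own error wrapping):

package example

import (
    "context"
    "time"

    "go.etcd.io/etcd/embed" // the path differs across etcd versions, e.g. go.etcd.io/etcd/server/v3/embed
)

// moveEtcdLeader asks the embedded etcd server to hand raft leadership over to transferee.
// Sketch only: "etcd" is the *embed.Etcd handle returned by embed.StartEtcd.
func moveEtcdLeader(ctx context.Context, etcd *embed.Etcd, oldLeader, transferee uint64) error {
    moveCtx, cancel := context.WithTimeout(ctx, 5*time.Second) // timeout is an assumption
    defer cancel()
    return etcd.Server.MoveLeader(moveCtx, oldLeader, transferee)
}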
2. PD leader election
Initialize the member field of the PD server; this object is what drives PD leader election:
func (s *Server) startServer(ctx context.Context) error {
    // ...
    s.member = member.NewMember(etcd, client, etcdServerID)
    // ...
    s.member.MemberInfo(s.cfg, s.Name(), s.rootPath)
    s.member.SetMemberDeployPath(s.member.ID())
    s.member.SetMemberBinaryVersion(s.member.ID(), versioninfo.PDReleaseVersion)
    s.member.SetMemberGitHash(s.member.ID(), versioninfo.PDGitHash)
    // ...
}
The background goroutine s.leaderLoop() is started to handle PD leader election:
1. Check whether a PD leader already exists. If one does, this PD does not campaign; it just watches the current leader until the leader key expires or is deleted (see the watch sketch after this list).
2. If the leader has expired, or there is currently no PD leader, call s.campaignLeader() to start campaigning.
2.1 Call s.member.CampaignLeader to campaign. The idea is simple: use an etcd transaction, and if this member manages to write a specific key-value pair, it has won the election (see the sketch after campaignLeader below).
2.2 Start a background goroutine that keeps renewing the lease on the PD leader key, so the leadership stays valid.
2.3 Many components depend on the PD leader, so once the election succeeds, several other components are set up: the TSO component, the ID allocator, and the configuration is reloaded from etcd.
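Step 1's "watch the leader until its key disappears" is an ordinary etcd watch. A minimal sketch of the pattern, assuming a clientv3 client and a hypothetical leaderPath key (PD's WatchLeader wraps this with revision handling and retry logic):

package example

import (
    "context"

    clientv3 "go.etcd.io/etcd/clientv3" // "go.etcd.io/etcd/client/v3" on newer etcd releases
)

// watchLeader blocks until the leader key is deleted (the leader lease expired or the leader resigned).
// Sketch only: leaderPath and rev come from the previous leader check.
func watchLeader(ctx context.Context, cli *clientv3.Client, leaderPath string, rev int64) {
    for resp := range cli.Watch(ctx, leaderPath, clientv3.WithRev(rev)) {
        for _, ev := range resp.Events {
            if ev.Type == clientv3.EventTypeDelete {
                // The leader key is gone; return so the caller can try to campaign.
                return
            }
        }
    }
}

PD's actual leaderLoop is shown below: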
func (s *Server) leaderLoop() {
    defer logutil.LogPanic()
    defer s.serverLoopWg.Done()
    for {
        leader, rev, checkAgain := s.member.CheckLeader()
        if checkAgain {
            continue
        }
        if leader != nil {
            // A PD leader already exists: reload the config and watch it.
            if err := s.reloadConfigFromKV(); err != nil {
                log.Error("reload config failed", errs.ZapError(err))
                continue
            }
            syncer := s.cluster.GetRegionSyncer()
            log.Info("start to watch pd leader", zap.Stringer("pd-leader", leader))
            // WatchLeader will keep looping and never return unless the PD leader has changed.
            s.member.WatchLeader(s.serverLoopCtx, leader, rev)
            syncer.StopSyncWithLeader()
            log.Info("pd leader has changed, try to re-campaign a pd leader")
        }
        // To make sure the etcd leader and PD leader are on the same server.
        etcdLeader := s.member.GetEtcdLeader()
        if etcdLeader != s.member.ID() {
            time.Sleep(200 * time.Millisecond)
            continue
        }
        s.campaignLeader()
    }
}
func (s *Server) campaignLeader() {
    log.Info("start to campaign pd leader", zap.String("campaign-pd-leader-name", s.Name()))
    if err := s.member.CampaignLeader(s.cfg.LeaderLease); err != nil {
        // failed to write the leader key; give up this round
        return
    }
    // context controlling the leadership keep-alive (simplified here)
    ctx, cancel := context.WithCancel(s.serverLoopCtx)
    defer cancel()
    // maintain the PD leader
    go s.member.KeepLeader(ctx)
    log.Info("campaign pd leader ok", zap.String("campaign-pd-leader-name", s.Name()))
    allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation)
    if err != nil {
        log.Error("failed to get the global TSO allocator", errs.ZapError(err))
        return
    }
    log.Info("initializing the global TSO allocator")
    if err := allocator.Initialize(0); err != nil {
        log.Error("failed to initialize the global TSO allocator", errs.ZapError(err))
        return
    }
    defer s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation)
    // Check the cluster dc-location after the PD leader is elected
    go s.tsoAllocatorManager.ClusterDCLocationChecker()
    if err := s.reloadConfigFromKV(); err != nil {
        log.Error("failed to reload configuration", errs.ZapError(err))
        return
    }
    // Try to create raft cluster.
    if err := s.createRaftCluster(); err != nil {
        log.Error("failed to create raft cluster", errs.ZapError(err))
        return
    }
    defer s.stopRaftCluster()
    if err := s.persistOptions.LoadTTLFromEtcd(s.ctx, s.client); err != nil {
        return
    }
    if err := s.idAllocator.Rebase(); err != nil {
        return
    }
    s.member.EnableLeader()
}
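Behind s.member.CampaignLeader and KeepLeader is the standard etcd lease-plus-transaction pattern: grant a lease, write the leader key only if it does not exist yet, then keep the lease alive. A minimal sketch of that pattern (leaderPath, value and ttlSec are placeholders; PD's real Leadership type adds retries, lease bookkeeping and metrics):

package example

import (
    "context"

    clientv3 "go.etcd.io/etcd/clientv3" // "go.etcd.io/etcd/client/v3" on newer etcd releases
)

// campaign tries to become leader by writing leaderPath with a leased value.
// It succeeds only if the key does not exist yet (CreateRevision == 0).
func campaign(ctx context.Context, cli *clientv3.Client, leaderPath, value string, ttlSec int64) (clientv3.LeaseID, bool, error) {
    lease, err := cli.Grant(ctx, ttlSec)
    if err != nil {
        return 0, false, err
    }
    resp, err := cli.Txn(ctx).
        If(clientv3.Compare(clientv3.CreateRevision(leaderPath), "=", 0)).
        Then(clientv3.OpPut(leaderPath, value, clientv3.WithLease(lease.ID))).
        Commit()
    if err != nil {
        return 0, false, err
    }
    // resp.Succeeded == false means someone else already holds the leader key.
    return lease.ID, resp.Succeeded, nil
}

// keepLeader keeps the lease (and therefore the leader key) alive until ctx is cancelled.
func keepLeader(ctx context.Context, cli *clientv3.Client, id clientv3.LeaseID) {
    ch, err := cli.KeepAlive(ctx, id)
    if err != nil {
        return
    }
    for range ch { // drain keep-alive responses; the channel closes when the lease is lost
    }
}

The transaction result is exactly what decides the election: the member whose conditional put succeeds becomes PD leader, and everyone else goes back to watching the leader key.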
3. TSO allocator leader election. TSO allocators come in two kinds.
3.1 Global TSO allocator
It guarantees that TSOs increase monotonically across the whole cluster. Its leader is simply the PD leader: as the line below shows, the global allocator reuses the PD leader's leadership (s.member.GetLeadership()).
s.tsoAllocatorManager.SetUpAllocator(ctx, tso.GlobalDCLocation, s.member.GetLeadership())
3.2 DC (local) TSO allocator
It guarantees that TSOs increase monotonically within each DC, and the PD members in each DC elect one leader for that DC.
A background goroutine periodically (every patrolStep) calls am.allocatorPatroller(serverCtx).
allocatorPatroller checks whether any new dc-location has been configured. If so, it creates the TSO allocator for that DC together with a new leadership object used for leader election inside the DC. The election itself is driven by allocatorLeaderLoop and works much like the PD leader election:
// Check if we have any new dc-location configured, if yes,
// then set up the corresponding local allocator.
func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {
    // Collect all dc-locations
    dcLocations := am.GetClusterDCLocations()
    // Get all Local TSO Allocators
    allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation))
    // Set up the new one
    for dcLocation := range dcLocations {
        if slice.NoneOf(allocatorGroups, func(i int) bool {
            return allocatorGroups[i].dcLocation == dcLocation
        }) {
            am.SetUpAllocator(serverCtx, dcLocation, election.NewLeadership(
                am.member.Client(),
                am.getAllocatorPath(dcLocation),
                fmt.Sprintf("%s local allocator leader election", dcLocation),
            ))
        }
    }
}
allocatorLeaderLoop works as follows:
1. If the DC already has a DC TSO leader, watch that leader until it becomes invalid.
2. If etcd stores a next-leader key, a leader transfer has been requested earlier; if the current PD member is not that next leader, it skips campaigning in this round (the sketch after this list shows how the next-leader key is read).
3. Call campaignAllocatorLeader to campaign. The process is similar to the PD leader election: write the leader key-value through an etcd transaction, and a successful write means the election is won.
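The next-leader check in step 2 is just one etcd read plus a uint64 parse. A minimal sketch (the nextLeaderPath key is an assumption; PD derives it from the allocator's root path):

package example

import (
    "context"
    "strconv"

    clientv3 "go.etcd.io/etcd/clientv3" // "go.etcd.io/etcd/client/v3" on newer etcd releases
)

// getNextLeaderID returns the member ID stored under the next-leader key,
// or 0 if no leader transfer is pending.
func getNextLeaderID(ctx context.Context, cli *clientv3.Client, nextLeaderPath string) (uint64, error) {
    resp, err := cli.Get(ctx, nextLeaderPath)
    if err != nil {
        return 0, err
    }
    if len(resp.Kvs) == 0 {
        return 0, nil
    }
    return strconv.ParseUint(string(resp.Kvs[0].Value), 10, 64)
}

PD's actual allocatorLeaderLoop is shown below: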
func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
    defer log.Info("server is closed, return local tso allocator leader loop",
        zap.String("dc-location", allocator.GetDCLocation()),
        zap.String("local-tso-allocator-name", am.member.Member().Name))
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }
        // Check whether the Local TSO Allocator has the leader already
        allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
        if checkAgain {
            continue
        }
        if allocatorLeader != nil {
            log.Info("start to watch allocator leader",
                zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
                zap.String("local-tso-allocator-name", am.member.Member().Name))
            // WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
            allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
            log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
                zap.String("dc-location", allocator.GetDCLocation()))
        }
        // Check the next-leader key
        nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
        if err != nil {
            log.Error("get next leader from etcd failed",
                zap.String("dc-location", allocator.GetDCLocation()),
                errs.ZapError(err))
            time.Sleep(200 * time.Millisecond)
            continue
        }
        isNextLeader := false
        if nextLeader != 0 {
            if nextLeader != am.member.ID() {
                log.Info("skip campaigning of the local tso allocator leader and check later",
                    zap.String("server-name", am.member.Member().Name),
                    zap.Uint64("server-id", am.member.ID()),
                    zap.Uint64("next-leader-id", nextLeader))
                time.Sleep(200 * time.Millisecond)
                continue
            }
            isNextLeader = true
        }
        // Make sure the leader is aware of this new dc-location in order to make the
        // Global TSO synchronization can cover up this dc-location.
        ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
        if err != nil {
            log.Error("get dc-location info from pd leader failed",
                zap.String("dc-location", allocator.GetDCLocation()),
                errs.ZapError(err))
            // PD leader hasn't been elected out, wait for the campaign
            if !longSleep(ctx, time.Second) {
                return
            }
            continue
        }
        if !ok || dcLocationInfo.Suffix <= 0 {
            log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
                zap.String("dc-location", allocator.GetDCLocation()),
                zap.Any("dc-location-info", dcLocationInfo),
                zap.String("wait-duration", checkStep.String()))
            // Because the checkStep is long, we use select here to check whether the ctx is done
            // to prevent the leak of goroutine.
            if !longSleep(ctx, checkStep) {
                return
            }
            continue
        }
        am.campaignAllocatorLeader(ctx, allocator, dcLocationInfo, isNextLeader)
    }
}