
The PD Scheduler Module

  • July 11, 2022

Author: 薛港 (移动云). Original source: https://tidb.net/blog/9cdf0f5f


1 The scheduler management module: coordinator


Initialization


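The scheduler management module belongs to the RaftCluster module, so it is initialized when RaftCluster starts. The initialization code shows that the coordinator depends on its parent module, RaftCluster, and on HeartbeatStreams, the module used to send scheduling operations.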


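raft cluster: stores the latest region and store information in the system. From this information PD analyzes and computes the current data distribution of the cluster and which data is hot or cold. The scheduler module needs this information and then, guided by rules or by the goal of keeping the system healthy and improving the user experience, generates a series of scheduling operations.

HeartbeatStreams: the generated scheduling operations must eventually be sent to the TiKV nodes, which carry them out so that the cluster reaches the healthy state that the user or the system defines. This module mainly manages connections and sends scheduling operations.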


// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
    c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
}

// newCoordinator creates a new coordinator.
func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstream.HeartbeatStreams) *coordinator {
    ctx, cancel := context.WithCancel(ctx)
    opController := schedule.NewOperatorController(ctx, cluster, hbStreams)
    return &coordinator{
        ctx:             ctx,
        cancel:          cancel,
        cluster:         cluster,
        checkers:        schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, opController),
        regionScatterer: schedule.NewRegionScatterer(ctx, cluster),
        regionSplitter:  schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)),
        schedulers:      make(map[string]*scheduleController),
        opController:    opController,
        hbStreams:       hbStreams,
        pluginInterface: schedule.NewPluginInterface(),
    }
}


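Put simply, the scheduler management module exists to generate scheduling operations and to control the rate at which they are generated. Every kind of scheduling operation manages regions. A region corresponds to one Raft group, so managing a region means managing the members of a Raft group: adding or removing a member, moving a Raft member to another location for various reasons (load, data volume), and so on. The ownership chain of the scheduling module is coordinator -> RaftCluster -> Server.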


How the coordinator relates to other modules


type RaftCluster struct {
    coordinator *coordinator
}

type Server struct {
    cluster *cluster.RaftCluster
}


The definition of the coordinator follows. It shows that the coordinator contains a number of other modules.


// coordinator is used to manage all schedulers and checkers to decide
// if the region needs to be scheduled.
type coordinator struct {
    sync.RWMutex

    wg              sync.WaitGroup
    ctx             context.Context
    cancel          context.CancelFunc
    cluster         *RaftCluster
    checkers        *schedule.CheckerController
    regionScatterer *schedule.RegionScatterer
    regionSplitter  *schedule.RegionSplitter
    schedulers      map[string]*scheduleController
    opController    *schedule.OperatorController
    hbStreams       *hbstream.HeartbeatStreams
    pluginInterface *schedule.PluginInterface
}


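As the definition above shows, coordinator contains a set of submodules, which fall into three groups:

Modules that generate scheduling operations for different purposes: checkers, regionScatterer, regionSplitter

The module that limits scheduling speed: opController

The module that sends scheduling operations to TiKV: hbStreams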


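Analysis of the submodules that generate scheduling operations


2.1 Submodule CheckerController


This module periodically checks all data. If it finds that the replica count or the placement of some data does not match what the user expects, it generates scheduling operations, which are applied to the affected regions repeatedly until the user's expectation is met. The initialization and definition of CheckerController are as follows:


Initialization of the checker controller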


checkers: schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, opController)

// NewCheckerController create a new CheckerController.
// TODO: isSupportMerge should be removed.
func NewCheckerController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, opController *OperatorController) *CheckerController {
    regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
    return &CheckerController{
        cluster:           cluster,
        opts:              cluster.GetOpts(),
        opController:      opController,
        learnerChecker:    checker.NewLearnerChecker(cluster),
        replicaChecker:    checker.NewReplicaChecker(cluster, regionWaitingList),
        ruleChecker:       checker.NewRuleChecker(cluster, ruleManager, regionWaitingList),
        mergeChecker:      checker.NewMergeChecker(ctx, cluster),
        jointStateChecker: checker.NewJointStateChecker(cluster),
        regionWaitingList: regionWaitingList,
    }
}


Definition of the checker controller, which contains a set of checkers for different purposes


// CheckerController is used to manage all checkers.
type CheckerController struct {
    cluster           opt.Cluster
    opts              *config.PersistOptions
    opController      *OperatorController
    learnerChecker    *checker.LearnerChecker
    replicaChecker    *checker.ReplicaChecker
    ruleChecker       *checker.RuleChecker
    mergeChecker      *checker.MergeChecker
    jointStateChecker *checker.JointStateChecker
    regionWaitingList cache.Cache
}


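The definition of CheckerController shows that it contains a set of checkers for different purposes; when a checker finds a problem, it generates the corresponding scheduling operation. Take replicaChecker as an example: it checks the replica count, and if it finds a problem it generates a scheduling operation and sends it through the stream to the corresponding TiKV, so that the region eventually reaches the replica count the user expects. Because all checkers share the same basic framework, analyzing one of them is enough; the others can be understood by analogy. For reasons of space this article does not cover them all.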
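The shared framework can be illustrated with a minimal sketch. The types below (Region, Operator, Checker, Controller, replicaCountChecker) are hypothetical stand-ins and not PD's actual API. The sketch only assumes that each checker inspects one region and returns an operator when it finds a problem, and that the controller returns the first operator produced.

```go
package main

import "fmt"

// Region is a hypothetical, simplified region: an ID plus its replica count.
type Region struct {
	ID       uint64
	Replicas int
}

// Operator is a hypothetical description of one scheduling operation.
type Operator struct {
	Desc     string
	RegionID uint64
}

// Checker inspects a region and returns an operator if something needs
// fixing, or nil if the region is fine.
type Checker interface {
	Check(r Region) *Operator
}

// replicaCountChecker flags regions whose replica count differs from the
// expected one.
type replicaCountChecker struct{ expected int }

func (c replicaCountChecker) Check(r Region) *Operator {
	switch {
	case r.Replicas < c.expected:
		return &Operator{Desc: "add-replica", RegionID: r.ID}
	case r.Replicas > c.expected:
		return &Operator{Desc: "remove-replica", RegionID: r.ID}
	}
	return nil
}

// Controller runs its checkers in order and returns the first operator.
type Controller struct{ checkers []Checker }

func (c Controller) CheckRegion(r Region) *Operator {
	for _, ck := range c.checkers {
		if op := ck.Check(r); op != nil {
			return op
		}
	}
	return nil
}

func main() {
	ctl := Controller{checkers: []Checker{replicaCountChecker{expected: 3}}}
	for _, r := range []Region{{ID: 1, Replicas: 2}, {ID: 2, Replicas: 3}} {
		if op := ctl.CheckRegion(r); op != nil {
			fmt.Printf("region %d: %s\n", op.RegionID, op.Desc)
		} else {
			fmt.Printf("region %d: healthy\n", r.ID)
		}
	}
}
```

Adding another kind of check in this sketch means adding one more Checker implementation to the slice; the controller loop does not change.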


2.1.1 Module LearnerChecker


Initialization and definition


learnerChecker: checker.NewLearnerChecker(cluster)

// LearnerChecker ensures region has a learner will be promoted.
type LearnerChecker struct {
    cluster opt.Cluster
}

// NewLearnerChecker creates a learner checker.
func NewLearnerChecker(cluster opt.Cluster) *LearnerChecker {
    return &LearnerChecker{
        cluster: cluster,
    }
}


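2.1.2 How LearnerChecker generates an operator


LearnerChecker calls the Check function to analyze a given region. If the region has a peer in the learner role, and that learner meets the requirements for promotion to voter, Check creates an operator that promotes the region's learner replica to voter, so that the replica can take part in later elections.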


// Check verifies a region's role, creating an Operator if need.
func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator {
    for _, p := range region.GetLearners() {
        op, err := operator.CreatePromoteLearnerOperator("promote-learner", l.cluster, region, p)
        if err != nil {
            // Creation failed for this learner (logging elided); try the next one.
            continue
        }
        return op
    }
    return nil
}


The key function to analyze is CreatePromoteLearnerOperator:


op, err := operator.CreatePromoteLearnerOperator("promote-learner", l.cluster, region, p)

// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
    return NewBuilder(desc, cluster, region).
        PromoteLearner(peer.GetStoreId()).
        Build(0)
}


Next we focus on the Builder module, which generates the operator for a region.


The Builder is defined as follows:


// Builder is used to create operators. Usage:
//     op, err := NewBuilder(desc, cluster, region).
//                 RemovePeer(store1).
//                 AddPeer(peer1).
//                 SetLeader(store2).
//                 Build(kind)
// The generated Operator will choose the most appropriate execution order
// according to various constraints.
type Builder struct {
    // basic info
    desc          string
    cluster       opt.Cluster
    regionID      uint64
    regionEpoch   *metapb.RegionEpoch
    rules         []*placement.Rule
    expectedRoles map[uint64]placement.PeerRoleType

    // operation record
    originPeers         peersMap
    unhealthyPeers      peersMap
    originLeaderStoreID uint64
    targetPeers         peersMap
    targetLeaderStoreID uint64
    err                 error

    // skip origin check flags
    skipOriginJointStateCheck bool

    // build flags
    allowDemote       bool
    useJointConsensus bool
    lightWeight       bool
    forceTargetLeader bool

    // intermediate states
    currentPeers                         peersMap
    currentLeaderStoreID                 uint64
    toAdd, toRemove, toPromote, toDemote peersMap       // pending tasks.
    steps                                []OpStep       // generated steps.
    peerAddStep                          map[uint64]int // record at which step a peer is created.

    // comparison function
    stepPlanPreferFuncs []func(stepPlan) int // for buildStepsWithoutJointConsensus
}


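The Builder is initialized as follows


1. From the region argument, extract the stores that hold all of the region's peers, the stores whose peers are unhealthy (pending or down), and the store that holds the current leader.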


// NewBuilder creates a Builder.
func NewBuilder(desc string, cluster opt.Cluster, region *core.RegionInfo, opts ...BuilderOption) *Builder {
    b := &Builder{
        desc:        desc,
        cluster:     cluster,
        regionID:    region.GetID(),
        regionEpoch: region.GetRegionEpoch(),
    }

    // options
    for _, option := range opts {
        option(b)
    }

    // origin peers
    err := b.err
    originPeers := newPeersMap()
    unhealthyPeers := newPeersMap()

    for _, p := range region.GetPeers() {
        originPeers.Set(p)
    }

    for _, p := range region.GetPendingPeers() {
        unhealthyPeers.Set(p)
    }

    for _, p := range region.GetDownPeers() {
        unhealthyPeers.Set(p.Peer)
    }

    // origin leader
    originLeaderStoreID := region.GetLeader().GetStoreId()

    // rules and supportJointConsensus are computed in code that this excerpt omits.
    b.rules = rules
    b.originPeers = originPeers
    b.unhealthyPeers = unhealthyPeers
    b.originLeaderStoreID = originLeaderStoreID
    b.targetPeers = originPeers.Copy()
    b.allowDemote = supportJointConsensus
    return b
}


Next, look at the Builder's PromoteLearner method. Given the learner peer to promote, it updates the Builder's state: in the map of target peers, the role of that peer is changed from learner to metapb.PeerRole_Voter.


// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
    return NewBuilder(desc, cluster, region).
        PromoteLearner(peer.GetStoreId()).
        Build(0)
}

// PromoteLearner records a promote learner operation in Builder.
func (b *Builder) PromoteLearner(storeID uint64) *Builder {
    // peer is the entry for storeID in b.targetPeers; the lookup and the
    // validity checks are omitted from this excerpt.
    b.targetPeers.Set(&metapb.Peer{
        Id:      peer.GetId(),
        StoreId: peer.GetStoreId(),
        Role:    metapb.PeerRole_Voter,
    })
    return b
}


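Next, the Builder's Build(0) method is called. It generates the scheduling operation from the Builder's state. The principle is simple: the Builder records two sets of information, the original state of the region and the target state expected for the region. Comparing the two yields the scheduling operation that moves the region to the target state.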


// Build creates the Operator.
func (b *Builder) Build(kind OpKind) (*Operator, error) {
    var brief string
    if brief, b.err = b.prepareBuild(); b.err != nil {
        return nil, b.err
    }

    // The joint-consensus branch is omitted from this excerpt.
    kind, b.err = b.buildStepsWithoutJointConsensus(kind)

    return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps...), nil
}


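The comparison works as follows; we focus on the conversion from learner to voter.


b.originPeers holds the original state of the region


b.targetPeers holds the target state expected for the region


In our example, if b.originPeers shows that the original peer is a learner and b.targetPeers says the peer should become a voter, an entry is recorded in b.toPromote, indicating that the peer on that store is promoted from learner to voter.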


// Initialize intermediate states.
// TODO: simplify the code
func (b *Builder) prepareBuild() (string, error) {
    b.toPromote = newPeersMap()

    // Diff originPeers and targetPeers to initialize toAdd, toRemove, toPromote, toDemote.
    for _, o := range b.originPeers {
        n := b.targetPeers[o.GetStoreId()]
        if core.IsLearner(o) {
            if !core.IsLearner(n) {
                // learner -> voter
                b.toPromote.Set(n)
            }
        }
    }

    b.peerAddStep = make(map[uint64]int)

    return b.brief(), nil
}
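The origin-versus-target comparison can be tried in isolation with a minimal sketch. Role, Peer, peersMap and diffPromotions below are hypothetical simplifications written for illustration; they are not PD's types. The sketch assumes, as in the excerpt, that peers are indexed by store ID and that a promotion is a peer that is a learner in the origin state and a voter in the target state.

```go
package main

import (
	"fmt"
	"sort"
)

// Role is a hypothetical stand-in for metapb.PeerRole.
type Role int

const (
	Voter Role = iota
	Learner
)

// Peer is a simplified peer: its ID, the store it lives on, and its role.
type Peer struct {
	ID, StoreID uint64
	Role        Role
}

// peersMap indexes peers by store ID.
type peersMap map[uint64]Peer

// diffPromotions returns, in ascending order, the store IDs whose peer is a
// learner in origin but a voter in target.
func diffPromotions(origin, target peersMap) []uint64 {
	var stores []uint64
	for storeID, o := range origin {
		t, ok := target[storeID]
		if ok && o.Role == Learner && t.Role == Voter {
			stores = append(stores, storeID)
		}
	}
	sort.Slice(stores, func(i, j int) bool { return stores[i] < stores[j] })
	return stores
}

func main() {
	origin := peersMap{
		1: {ID: 11, StoreID: 1, Role: Voter},
		2: {ID: 12, StoreID: 2, Role: Voter},
		3: {ID: 13, StoreID: 3, Role: Learner},
	}
	// Start from a copy of origin, then flip the learner on store 3 to voter.
	target := peersMap{}
	for k, v := range origin {
		target[k] = v
	}
	p := target[3]
	p.Role = Voter
	target[3] = p

	fmt.Println(diffPromotions(origin, target)) // prints [3]
}
```

Because the target state starts as a copy of the origin state, only the peers that were explicitly changed show up in the diff.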


Each scheduling operation consists of several steps. The following function generates them.


// Some special cases, and stores that do not support using joint consensus.
func (b *Builder) buildStepsWithoutJointConsensus(kind OpKind) (OpKind, error) {
    for len(b.toPromote) > 0 {
        plan := b.peerPlan()

        if plan.promote != nil {
            b.execPromoteLearner(plan.promote)
        }
    }
    return kind, nil
}


b.peerPlan is called to produce a step plan, and the operation steps are then generated from that plan.


func (b *Builder) peerPlan() stepPlan {
    if p := b.planPromotePeer(); !p.IsEmpty() {
        return p
    }
    // Other kinds of plan are omitted from this excerpt.
    return stepPlan{}
}

func (b *Builder) planPromotePeer() stepPlan {
    for _, i := range b.toPromote.IDs() {
        peer := b.toPromote[i]
        return stepPlan{promote: peer}
    }
    return stepPlan{}
}

type stepPlan struct {
    promote *metapb.Peer
}


Concrete steps are generated from the step plan.


PromoteLearner is the promote step; ToStore is the store that holds the learner, and PeerID identifies the peer.


if plan.promote != nil {
    b.execPromoteLearner(plan.promote)
}

func (b *Builder) execPromoteLearner(peer *metapb.Peer) {
    b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId()})
}

// PromoteLearner is an OpStep that promotes a region learner peer to normal voter.
type PromoteLearner struct {
    ToStore, PeerID uint64
}
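The plan-then-execute loop can be sketched on its own as follows. The names peer, promoteStep, nextPromotion and buildPromoteSteps are hypothetical. The excerpts above do not show how an entry leaves toPromote, so the sketch assumes that a pending promotion is deleted once its step has been emitted, which is what lets the loop terminate.

```go
package main

import (
	"fmt"
	"sort"
)

// peer and promoteStep are hypothetical simplifications of metapb.Peer and
// the PromoteLearner step.
type peer struct{ ID, StoreID uint64 }

type promoteStep struct{ ToStore, PeerID uint64 }

// nextPromotion plays the role of the step plan: it picks the pending peer
// with the smallest store ID so that the result is deterministic.
func nextPromotion(toPromote map[uint64]peer) peer {
	ids := make([]uint64, 0, len(toPromote))
	for id := range toPromote {
		ids = append(ids, id)
	}
	sort.Slice(ids, func(i, j int) bool { return ids[i] < ids[j] })
	return toPromote[ids[0]]
}

// buildPromoteSteps emits one step per pending promotion and consumes the
// map it is given (keyed by store ID).
// Assumption: an entry is removed once its step has been emitted.
func buildPromoteSteps(toPromote map[uint64]peer) []promoteStep {
	var steps []promoteStep
	for len(toPromote) > 0 {
		p := nextPromotion(toPromote)
		steps = append(steps, promoteStep{ToStore: p.StoreID, PeerID: p.ID})
		delete(toPromote, p.StoreID)
	}
	return steps
}

func main() {
	pending := map[uint64]peer{
		5: {ID: 105, StoreID: 5},
		3: {ID: 103, StoreID: 3},
	}
	for i, s := range buildPromoteSteps(pending) {
		fmt.Printf("step %d: promote peer %d on store %d\n", i, s.PeerID, s.ToStore)
	}
}
```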


With the steps above in place, Build produces the operator. The code is as follows:


// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
    return NewBuilder(desc, cluster, region).
        PromoteLearner(peer.GetStoreId()).
        Build(0)
}

// Build creates the Operator.
func (b *Builder) Build(kind OpKind) (*Operator, error) {
    // Earlier statements are omitted here; brief is returned by prepareBuild.
    return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps...), nil
}

// NewOperator creates a new operator.
func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps ...OpStep) *Operator {
    // level is set in code that this excerpt omits.
    return &Operator{
        desc:            desc,
        brief:           brief,
        regionID:        regionID,
        regionEpoch:     regionEpoch,
        kind:            kind,
        steps:           steps,
        stepsTime:       make([]int64, len(steps)),
        status:          NewOpStatusTracker(),
        level:           level,
        AdditionalInfos: make(map[string]string),
    }
}


The module that limits scheduling speed: opController


Module definition


opController := schedule.NewOperatorController(ctx, cluster, hbStreams)

// NewOperatorController creates a OperatorController.
func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams *hbstream.HeartbeatStreams) *OperatorController {
    return &OperatorController{
        ctx:             ctx,
        cluster:         cluster,
        operators:       make(map[uint64]*operator.Operator),
        hbStreams:       hbStreams,
        histories:       list.New(),
        counts:          make(map[operator.OpKind]uint64),
        opRecords:       NewOperatorRecords(ctx),
        storesLimit:     make(map[uint64]map[storelimit.Type]*storelimit.StoreLimit),
        wop:             NewRandBuckets(),
        wopStatus:       NewWaitingOperatorStatus(),
        opNotifierQueue: make(operatorQueue, 0),
    }
}

// OperatorController is used to limit the speed of scheduling.
type OperatorController struct {
    sync.RWMutex
    ctx             context.Context
    cluster         opt.Cluster
    operators       map[uint64]*operator.Operator
    hbStreams       *hbstream.HeartbeatStreams
    histories       *list.List
    counts          map[operator.OpKind]uint64
    opRecords       *OperatorRecords
    storesLimit     map[uint64]map[storelimit.Type]*storelimit.StoreLimit
    wop             WaitingOperator
    wopStatus       *WaitingOperatorStatus
    opNotifierQueue operatorQueue
}


The core job of this module is to control the rate at which operators are sent to TiKV, so we look at the function through which opController sends operators to TiKV:


// Dispatch is used to dispatch the operator of a region.
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
    // Check existed operator.
    if op := oc.GetOperator(region.GetID()); op != nil {
        // Update operator status:
        // The operator status should be STARTED.
        // Check will call CheckSuccess and CheckTimeout.
        step := op.Check(region)

        switch op.Status() {
        case operator.STARTED:
            oc.SendScheduleCommand(region, step, source)
        }
    }
}


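oc.SendScheduleCommand(region, step, source) sends the current step of the region's operator to the TiKV node. In our analysis the step is PromoteLearner, that is, promoting a learner to voter. The function builds a RegionHeartbeatResponse object, which is returned to TiKV as the reply to the region heartbeat and drives TiKV to promote the region peer to voter.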


// SendScheduleCommand sends a command to the region.
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step operator.OpStep, source string) {
    var cmd *pdpb.RegionHeartbeatResponse
    switch st := step.(type) {
    case operator.PromoteLearner:
        cmd = &pdpb.RegionHeartbeatResponse{
            ChangePeer: &pdpb.ChangePeer{
                // reuse AddNode type
                ChangeType: eraftpb.ConfChangeType_AddNode,
                Peer: &metapb.Peer{
                    Id:      st.PeerID,
                    StoreId: st.ToStore,
                    Role:    metapb.PeerRole_Voter,
                },
            },
        }
    // Other step types are omitted from this excerpt.
    }

    oc.hbStreams.SendMsg(region, cmd)
}
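The step-to-command translation is a plain Go type switch, which the following minimal sketch isolates. OpStep, PromoteLearner, TransferLeader, Command and commandFor are hypothetical, flattened stand-ins defined here for illustration; the real code builds a pdpb.RegionHeartbeatResponse instead of the simple Command struct used below.

```go
package main

import "fmt"

// OpStep is a hypothetical marker interface for operator steps.
type OpStep interface{ isStep() }

// PromoteLearner and TransferLeader are simplified example steps.
type PromoteLearner struct{ ToStore, PeerID uint64 }
type TransferLeader struct{ FromStore, ToStore uint64 }

func (PromoteLearner) isStep() {}
func (TransferLeader) isStep() {}

// Command is a hypothetical, flattened stand-in for the heartbeat response.
type Command struct {
	Kind    string
	PeerID  uint64
	StoreID uint64
}

// commandFor translates one step into the command that would be sent to the
// store. The boolean is false for step types the function does not know.
func commandFor(step OpStep) (Command, bool) {
	switch st := step.(type) {
	case PromoteLearner:
		// Promotion is expressed as a peer change that (re)adds the peer as a voter.
		return Command{Kind: "change-peer/add-node", PeerID: st.PeerID, StoreID: st.ToStore}, true
	case TransferLeader:
		return Command{Kind: "transfer-leader", StoreID: st.ToStore}, true
	default:
		return Command{}, false
	}
}

func main() {
	cmd, ok := commandFor(PromoteLearner{ToStore: 3, PeerID: 103})
	fmt.Println(cmd.Kind, cmd.PeerID, cmd.StoreID, ok)
}
```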


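Part three: heartbeat stream handling between TiKV and PD


hbStreams, the module that handles heartbeat messages


Sending heartbeat replies: this function puts the message on the hbStreams channel so that it is sent asynchronously.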


oc.hbStreams.SendMsg(region, cmd)

// SendMsg sends a message to related store.
func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {
    select {
    case s.msgCh <- msg:
    case <-s.hbStreamCtx.Done():
    }
}


The hbstream module starts a background goroutine that keeps taking messages from the channel and sends each one to the corresponding TiKV.


func (s *HeartbeatStreams) run() {
    for {
        select {
        case msg := <-s.msgCh:
            storeID := msg.GetTargetPeer().GetStoreId()
            storeLabel := strconv.FormatUint(storeID, 10)
            store := s.storeInformer.GetStore(storeID)

            storeAddress := store.GetAddress()
            if stream, ok := s.streams[storeID]; ok {
                if err := stream.Send(msg); err != nil {
                    // Error handling is omitted from this excerpt.
                }
            }
        // Other cases are omitted from this excerpt.
        }
    }
}
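The producer and consumer sides of this channel can be exercised together in a minimal sketch. The types message and streams, the sent map that stands in for the per-store gRPC streams, and the delivered channel used for synchronization in the demo are all hypothetical. The sketch only assumes what the excerpts show: SendMsg enqueues or gives up when the context is cancelled, and a background goroutine drains the channel.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// message is a hypothetical, simplified heartbeat response.
type message struct {
	StoreID uint64
	Body    string
}

// streams queues messages and delivers them from a background goroutine.
type streams struct {
	ctx   context.Context
	msgCh chan message
	mu    sync.Mutex
	sent  map[uint64][]string // stands in for the per-store streams
}

func newStreams(ctx context.Context) *streams {
	return &streams{ctx: ctx, msgCh: make(chan message, 16), sent: map[uint64][]string{}}
}

// SendMsg enqueues the message, or gives up if the context is cancelled.
func (s *streams) SendMsg(m message) {
	select {
	case s.msgCh <- m:
	case <-s.ctx.Done():
	}
}

// run delivers queued messages until the context is cancelled. It signals
// on delivered after each message so that callers can wait for delivery.
func (s *streams) run(delivered chan<- struct{}) {
	for {
		select {
		case m := <-s.msgCh:
			s.mu.Lock()
			s.sent[m.StoreID] = append(s.sent[m.StoreID], m.Body)
			s.mu.Unlock()
			delivered <- struct{}{}
		case <-s.ctx.Done():
			return
		}
	}
}

// demo sends one message and returns what store 3 received.
func demo() []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel() // stops the background goroutine
	s := newStreams(ctx)
	delivered := make(chan struct{}, 1)
	go s.run(delivered)

	s.SendMsg(message{StoreID: 3, Body: "promote-learner"})
	<-delivered // wait until the background goroutine has handled it

	s.mu.Lock()
	defer s.mu.Unlock()
	return s.sent[3]
}

func main() {
	fmt.Println(demo()) // prints [promote-learner]
}
```

Decoupling the sender from the stream this way means the code that decides on a scheduling step never blocks on a slow network connection; it only blocks if the queue itself is full.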


HeartbeatStreams manages the streams to the TiKV nodes in a map. The map is updated by the gRPC handler each time a heartbeat is processed.


// RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
    server := &heartbeatServer{stream: stream}
    // The receive loop and the extraction of storeID are omitted from this excerpt.
    s.hbStreams.BindStream(storeID, server)
}

type heartbeatServer struct {
    stream pdpb.PD_RegionHeartbeatServer
    closed int32
}


heartbeatServer is a wrapper around pdpb.PD_RegionHeartbeatServer


func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
    done := make(chan error, 1)
    go func() { done <- s.stream.Send(m) }()
    select {
    case err := <-done:
        return errors.WithStack(err)
    case <-time.After(regionHeartbeatSendTimeout):
        atomic.StoreInt32(&s.closed, 1)
        return errors.WithStack(errSendRegionHeartbeatTimeout)
    }
}
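The send-with-timeout pattern used by this wrapper can be tried on its own with a minimal sketch. sendWithTimeout, errSendTimeout and demo are hypothetical names, and the durations are arbitrary values chosen for the demo. The sketch uses only the standard library and assumes nothing about gRPC: the send function is any func() error.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errSendTimeout is a hypothetical sentinel error for this sketch.
var errSendTimeout = errors.New("send timeout")

// sendWithTimeout runs send in its own goroutine and waits at most timeout
// for it. The result channel is buffered so the goroutine can still finish
// and exit after a timeout, instead of blocking forever on the send.
func sendWithTimeout(send func() error, timeout time.Duration) error {
	done := make(chan error, 1)
	go func() { done <- send() }()
	select {
	case err := <-done:
		return err
	case <-time.After(timeout):
		return errSendTimeout
	}
}

// demo returns the results of one fast send and one slow send.
func demo() (fast, slow error) {
	fast = sendWithTimeout(func() error { return nil }, 200*time.Millisecond)
	slow = sendWithTimeout(func() error {
		time.Sleep(300 * time.Millisecond) // simulates a stalled stream
		return nil
	}, 20*time.Millisecond)
	return fast, slow
}

func main() {
	fast, slow := demo()
	fmt.Println(fast, slow) // prints <nil> send timeout
}
```

The buffer of size one on the result channel is the important detail: without it, a send that completes after the timeout would leave its goroutine blocked forever.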

