PD 调度器模块
作者: 薛港 - 移动云原文来源:https://tidb.net/blog/9cdf0f5f
1 调度器管理模块 coordinator
初始化过程分析
因为调度器管理模块从属于 RaftCluster 模块, 所以启动 RaftCluster 模块的时候,我们初始化调度器管理模块,从初始化代码,能够看出来,coordinator 模块依赖上层模块 raft cluster 以及用于发送调作的模块 HeartbeatStreams。
raft cluster:用于存储当前系统最新的 region 以及 store 信息,基于这些信息,我们分析,统计
当前 cluster 数据分布现状,以及冷热现状。调度器模块需要这些信息,然后基于一些
rule, 或者为了系统健康或者更好的用户体验的想法,生成一系列调度操作
HeartbeatStreams:生成的调度操作,最终需要发送给 tikv 节点,用于实施调度操作,从而达
到用户或者系统定义健康的期望。这个模块主要用于连接管理,发现调度操作
// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
c.coordinator = newCoordinator(c.ctx, cluster, s.GetHBStreams())
}
// newCoordinator creates a new coordinator.
func newCoordinator(ctx context.Context, cluster *RaftCluster, hbStreams *hbstream.HeartbeatStreams) *coordinator {
ctx, cancel := context.WithCancel(ctx)
opController := schedule.NewOperatorController(ctx, cluster, hbStreams)
return &coordinator{
ctx: ctx,
cancel: cancel,
cluster: cluster,
checkers: schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, opController),
regionScatterer: schedule.NewRegionScatterer(ctx, cluster),
regionSplitter: schedule.NewRegionSplitter(cluster, schedule.NewSplitRegionsHandler(cluster, opController)),
schedulers: make(map[string]*scheduleController),
opController: opController,
hbStreams: hbStreams,
pluginInterface: schedule.NewPluginInterface(),
}
}
调度器管理模块,说白了,就是为了生成一系列调度操作,以及控制调度操作生成的速度,每类调度操作对应 region 的管理,一个 region 对应一个 raft group, 所以针对 region 的管理,就是针对 raft group 成员的管理,例如增加 / 删除 member,基于不同目的 (负载,数据量) 的 raft member 位置转移等,调度模块从属关系 coordinator–>RaftCluster –>Server
调度器 coordinator 模块和其它模块的关系
type RaftCluster struct {
coordinator *coordinator
}
type Server struct {
cluster *cluster.RaftCluster
}
调度器模块定义,从调度器定义里,发现调试器模块包含一系列其它模块
// coordinator is used to manage all schedulers and checkers to decide //if the region needs to be scheduled.
type coordinator struct {
sync.RWMutex
从上面 coordinator 的定义,也能看出来 coordinator 包含一系列子模块,例如子模块分成三类:
不同目的的调度操作生成模块:checkers,regionScatterer,regionSplitter
限制调度速度的模块:opController
用于调度操作发送到 tikv 的模块:hbStreams
生成调度子模块分析
2.1 子模块 CheckerController
这个模块主要用于定期检察所有的数据,如果发现数据副本数,或者位置等不符合用户的期望,会生成一系列的调度操作,这些调度操作会不停的针对 region 实施调度操作,最终达到用户的期望。CheckerController 的初始化以及定义如下:
check 控制器的初始化过程
checkers: schedule.NewCheckerController(ctx, cluster, cluster.ruleManager, opController)
// NewCheckerController create a new CheckerController.
// TODO: isSupportMerge should be removed.
func NewCheckerController(ctx context.Context, cluster opt.Cluster, ruleManager *placement.RuleManager, opController *OperatorController) *CheckerController {
regionWaitingList := cache.NewDefaultCache(DefaultCacheSize)
return &CheckerController{
cluster: cluster,
opts: cluster.GetOpts(),
opController: opController,
learnerChecker: checker.NewLearnerChecker(cluster),
replicaChecker: checker.NewReplicaChecker(cluster, regionWaitingList),
ruleChecker: checker.NewRuleChecker(cluster, ruleManager, regionWaitingList),
mergeChecker: checker.NewMergeChecker(ctx, cluster),
jointStateChecker: checker.NewJointStateChecker(cluster),
regionWaitingList: regionWaitingList,
}
}
Check 控制器的定义,里面包含一系列不同目的的检察器
// CheckerController is used to manage all checkers.
type CheckerController struct {
cluster opt.Cluster
opts *config.PersistOptions
opController *OperatorController
learnerChecker *checker.LearnerChecker
replicaChecker *checker.ReplicaChecker
ruleChecker *checker.RuleChecker
mergeChecker *checker.MergeChecker
jointStateChecker *checker.JointStateChecker
regionWaitingList cache.Cache
}
从检察控制模块 CheckerController 定义能够看出来,这个模块包含一系列用于不同目的检察调度模块,如果每类检察发现问题,会生成对应的调度操作。例如 replicaChecker 模块,这个模块主要用于检察复制数,如果发现问题,生成调度操作,并通过 stream 发现到对应的 TIKV,最终让 region 达到用户期望副本数。由于每类检察调度模块,基本框架一致,我们只要分析一个检察控制模块,其它依次推理分析,本文由于篇幅不会全部展开.
2.1.1 模块 LearnerChecker
初始化,以及模块定义
learnerChecker: checker.NewLearnerChecker(cluster)
// LearnerChecker ensures region has a learner will be promoted.
type LearnerChecker struct {
cluster opt.Cluster
}
// NewLearnerChecker creates a learner checker.
func NewLearnerChecker(cluster opt.Cluster) *LearnerChecker {
return &LearnerChecker{
cluster: cluster,
}
}
2.1.2LearnerChecker 生成调度操作 operator 流程分析
LearnerChecker 调用 chek 函数分析特定的 region, 如果发现这个 region 存在 learner 角色,并且这个 learner 角色,满足升级到 voter 角色要求,那么就会创建一个 operator,用于升级这个 region 的 learndr 角色的副本到 voter 角色,从而让这个副本能够参与后期选举。
// Check verifies a region’s role, creating an Operator if need.
func (l *LearnerChecker) Check(region *core.RegionInfo) *operator.Operator {
for _, p := range region.GetLearners() {
op, err := operator.CreatePromoteLearnerOperator(“promote-learner”, l.cluster, region, p)
continue
}
return op
}
return nil
}
重点分析函数 CreatePromoteLearnerOperator:
op, err := operator.CreatePromoteLearnerOperator(“promote-learner”, l.cluster, region, p)
// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, cluster, region).
PromoteLearner(peer.GetStoreId()).
Build(0)
}
我们下面重点分析模块 Builder 模块,这个模块用于针对 region 生成相应 operator
build 模块定义如下
// Builder is used to create operators. Usage:
// op, err := NewBuilder(desc, cluster, region).
// RemovePeer(store1).
// AddPeer(peer1).
// SetLeader(store2).
// Build(kind)
// The generated Operator will choose the most appropriate execution order
// according to various constraints.
type Builder struct {
// basic info
desc string
cluster opt.Cluster
regionID uint64
regionEpoch *metapb.RegionEpoch
rules []*placement.Rule
expectedRoles map[uint64]placement.PeerRoleType
Build 模块初始化过程如下
1. 根据参数 region 对象,抽取出这个 region 对应的所有 peer 成员存放 store,处于 Pending 状态存入的 store,当前的 leader 所在的 store,
// NewBuilder creates a Builder.
func NewBuilder(desc string, cluster opt.Cluster, region *core.RegionInfo, opts …BuilderOption) *Builder {
b := &Builder{
desc: desc,
cluster: cluster,
regionID: region.GetID(),
regionEpoch: region.GetRegionEpoch(),
}
下一步分析 build 的 PromoteLearner 方法, 这个方法目的:根据你想要升级的 learner peer,初始化相关 build 成员 , 也就是修改 build 用于存放目标 peer 的角色 ,从之前 leaner 角色,修改成 metapb.PeerRole_Voter
// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, cluster, region).
PromoteLearner(peer.GetStoreId()).
Build(0)
}
// PromoteLearner records a promote learner operation in Builder.
func (b *Builder) PromoteLearner(storeID uint64) *Builder {
下一步高用 build 的 Build(0) 方法,这个方法,会根据 build 成员的状态,生成相应的调试操作,原理其实也很简单,build 记录两部分信息,一部分信息对应原始的 region 信息,另一部分对应 region 期望的 region 目标状态信息。然后两组信息对比,最终生成调度操作(用于修正 region 到目标状态)。
// Build creates the Operator.
func (b *Builder) Build(kind OpKind) (*Operator, error) {
var brief string
if brief, b.err = b.prepareBuild(); b.err != nil {
return nil, b.err
}
两组信息对比算法如下,我们重点关注 leaner 到 follower 角色的转换.
b.originPeers 保存的 region 原始状态信息
b.targetPeers 保存的 region 期望的目标状态信息
在我们的例子里,如果 通过 b.originPeers 发现 原始的 peer 是 leaner, b.targetPeers 对应 peer 的期望状态是 voter 角色。那么我们会保存一条记录,表示那个 store 的 peer 角色由 learner 升级到 voter 角色
// Initialize intermediate states.
// TODO: simplify the code
func (b *Builder) prepareBuild() (string, error) {
b.toPromote = newPeersMap()
// Diff originPeers
and targetPeers
to initialize toAdd
, toRemove
, toPromote
, toDemote
.
for _, o := range b.originPeers {
n := b.targetPeers[o.GetStoreId()]
if core.IsLearner(o) {
if !core.IsLearner(n) {
// learner -> voter
b.toPromote.Set(n)
}
}
}
每个调度操作,分很多 steps, 下面这个函数用于生成调度操作的 steps
// Some special cases, and stores that do not support using joint consensus.
func (b *Builder) buildStepsWithoutJointConsensus(kind OpKind) (OpKind, error) {
for len(b.toPromote) > 0 {
plan := b.peerPlan()
调用 b.peerPlan 生成对应 step paln, 最终基于 stepplan 生成操作步骤,
func (b *Builder) peerPlan() stepPlan {
if p := b.planPromotePeer(); !p.IsEmpty() {
return p
}
}
func (b *Builder) planPromotePeer() stepPlan {
for _, i := range b.toPromote.IDs() {
peer := b.toPromote[i]
return stepPlan{promote: peer}
}
}
type stepPlan struct {
promote *metapb.Peer
}
基于 stepplan 生成具体的 steps。
PromoteLearner 表示 promote 调度步骤,stote 表示 leaner 对应的 stroe,peerID 表示对应 peer
if plan.promote != nil {
b.execPromoteLearner(plan.promote)
}
func (b *Builder) execPromoteLearner(peer *metapb.Peer) {
b.steps = append(b.steps, PromoteLearner{ToStore: peer.GetStoreId(), PeerID: peer.GetId()})
}
// PromoteLearner is an OpStep that promotes a region learner peer to normal voter.
type PromoteLearner struct {
ToStore, PeerID uint64
}
基于以上步骤,调用 build 函数生成 opeartor,代码如下:
// CreatePromoteLearnerOperator creates an operator that promotes a learner.
func CreatePromoteLearnerOperator(desc string, cluster opt.Cluster, region *core.RegionInfo, peer *metapb.Peer) (*Operator, error) {
return NewBuilder(desc, cluster, region).
PromoteLearner(peer.GetStoreId()).
Build(0)
}
// Build creates the Operator.
func (b *Builder) Build(kind OpKind) (*Operator, error) {
return NewOperator(b.desc, brief, b.regionID, b.regionEpoch, kind, b.steps…), nil
}
// NewOperator creates a new operator.
func NewOperator(desc, brief string, regionID uint64, regionEpoch *metapb.RegionEpoch, kind OpKind, steps …OpStep) *Operator {
return &Operator{
desc: desc,
brief: brief,
regionID: regionID,
regionEpoch: regionEpoch,
kind: kind,
steps: steps,
stepsTime: make([]int64, len(steps)),
status: NewOpStatusTracker(),
level: level,
AdditionalInfos: make(map[string]string),
}
}
限制调度速度的模块 opController
模块定义
opController := schedule.NewOperatorController(ctx, cluster, hbStreams)
// NewOperatorController creates a OperatorController.
func NewOperatorController(ctx context.Context, cluster opt.Cluster, hbStreams *hbstream.HeartbeatStreams) *OperatorController {
return &OperatorController{
ctx: ctx,
cluster: cluster,
operators: make(map[uint64]*operator.Operator),
hbStreams: hbStreams,
histories: list.New(),
counts: make(map[operator.OpKind]uint64),
opRecords: NewOperatorRecords(ctx),
storesLimit: make(map[uint64]map[storelimit.Type]*storelimit.StoreLimit),
wop: NewRandBuckets(),
wopStatus: NewWaitingOperatorStatus(),
opNotifierQueue: make(operatorQueue, 0),
}
}
// OperatorController is used to limit the speed of scheduling.
type OperatorController struct {
sync.RWMutex
ctx context.Context
cluster opt.Cluster
operators map[uint64]*operator.Operator
hbStreams *hbstream.HeartbeatStreams
histories *list.List
counts map[operator.OpKind]uint64
opRecords *OperatorRecords
storesLimit map[uint64]map[storelimit.Type]*storelimit.StoreLimit
wop WaitingOperator
wopStatus *WaitingOperatorStatus
opNotifierQueue operatorQueue
}
这个模块的核心功能,就是控制把 opeartor 发送到 TIKV 的速度,所以我们找出 opcontrol 发送 opeartor 到 tikv 的函数:
// Dispatch is used to dispatch the operator of a region.
func (oc *OperatorController) Dispatch(region *core.RegionInfo, source string) {
// Check existed operator.
if op := oc.GetOperator(region.GetID()); op != nil {
// Update operator status:
// The operator status should be STARTED.
// Check will call CheckSuccess and CheckTimeout.
step := op.Check(region)
oc.SendScheduleCommand(region, step, source), 针对指定 region 对应 opeartor 的 step 到 tikv 节点, 前面分析我们的 step 对应 PromoteLearner, 也就是上升 leaner 到 voter。生成 heatbeatResponse 对象,这个对象会通过 region 的心跳请求回复给这个 tikv. 从而推动 tikv 升级 region peer 到 voter
// SendScheduleCommand sends a command to the region.
func (oc *OperatorController) SendScheduleCommand(region *core.RegionInfo, step operator.OpStep, source string) {
var cmd *pdpb.RegionHeartbeatResponse
switch st := step.(type) {
case operator.PromoteLearner:
cmd = &pdpb.RegionHeartbeatResponse{
ChangePeer: &pdpb.ChangePeer{
// reuse AddNode type
ChangeType: eraftpb.ConfChangeType_AddNode,
Peer: &metapb.Peer{
Id: st.PeerID,
StoreId: st.ToStore,
Role: metapb.PeerRole_Voter,
},
},
}
第三个部分,分析 tikv 和 pd 心跳流处理
用于处理心跳信息模块 hbStreams
心跳回复信息发送, 这个函数会把消息发送到 hbstreams 的 chanel 。用于消息的异常发送
oc.hbStreams.SendMsg(region, cmd)
// SendMsg sends a message to related store.
func (s *HeartbeatStreams) SendMsg(region *core.RegionInfo, msg *pdpb.RegionHeartbeatResponse) {
hbstream 模块,会启动一个后台服务线程,用于定期从 channel 模块里取出消息,处理发送到相应的 tikv
func (s *HeartbeatStreams) run() {
for {
select {
}
HeartbeatStreams 模块通过一个 map 结构管理和其它 TIKV 结点的流, 这个数据结构的数据更新,通过每次心跳 grpc 处理更新
/ RegionHeartbeat implements gRPC PDServer.
func (s *Server) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error {
server := &heartbeatServer{stream: stream}
s.hbStreams.BindStream(storeID, server)
}
type heartbeatServer struct {
stream pdpb.PD_RegionHeartbeatServer
closed int32
}
heartbeatServer 是 pdpb.PD_RegionHeartbeatServer 包装器
func (s *heartbeatServer) Send(m *pdpb.RegionHeartbeatResponse) error {
版权声明: 本文为 InfoQ 作者【TiDB 社区干货传送门】的原创文章。
原文链接:【http://xie.infoq.cn/article/51a900fdf6ae315a33f3c20ad】。文章转载请联系作者。
评论