
PD TSO Allocation: Source Code Analysis

  • July 11, 2022

Author: 薛港 - 移动云. Original source: https://tidb.net/blog/89aac039


Analysis of tsoAllocatorManager


TSO has an allocation management module, tsoAllocatorManager, which manages all TSO allocators. PD currently supports two kinds of TSO: the global TSO allocator, which guarantees that timestamps of global transactions grow linearly, and the dc-location TSO allocator, which guarantees linear growth of timestamps for transactions within a specific DC.


Initialization and definition of the tsoAllocatorManager module:


s.tsoAllocatorManager = tso.NewAllocatorManager(


s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, s.cfg.TSOUpdatePhysicalInterval.Duration,


func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },


s.GetTLSConfig())


// NewAllocatorManager creates a new TSO Allocator Manager.


func NewAllocatorManager(


m *member.Member,


rootPath string,


saveInterval time.Duration,


updatePhysicalInterval time.Duration,


maxResetTSGap func() time.Duration,


sc *grpcutil.TLSConfig,


) *AllocatorManager {


allocatorManager := &AllocatorManager{


member: m,


rootPath: rootPath,


saveInterval: saveInterval,


updatePhysicalInterval: updatePhysicalInterval,


maxResetTSGap: maxResetTSGap,


securityConfig: sc,


}


allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup)


allocatorManager.mu.clusterDCLocations = make(map[string]*DCLocationInfo)


allocatorManager.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)


return allocatorManager


}


// AllocatorManager is used to manage the TSO Allocators a PD server holds.


// It is in charge of maintaining TSO allocators’ leadership, checking election


// priority, and forwarding TSO allocation requests to correct TSO Allocators.


type AllocatorManager struct {


mu struct {


sync.RWMutex


// There are two kinds of TSO Allocators:


// 1. Global TSO Allocator, as a global single point to allocate


// TSO for global transactions, such as cross-region cases.


// 2. Local TSO Allocator, servers for DC-level transactions.


// dc-location/global (string) -> TSO Allocator


allocatorGroups map[string]*allocatorGroup


clusterDCLocations map[string]*DCLocationInfo


// The max suffix sign we have so far, it will be used to calculate


// the number of suffix bits we need in the TSO logical part.


maxSuffix int32


}


wg sync.WaitGroup


// for election use


member *member.Member


// TSO config


rootPath string


saveInterval time.Duration


updatePhysicalInterval time.Duration


maxResetTSGap func() time.Duration


securityConfig *grpcutil.TLSConfig


// for gRPC use


localAllocatorConn struct {


sync.RWMutex


clientConns map[string]*grpc.ClientConn


}


}


allocatorGroups map[string]*allocatorGroup: PD currently supports two kinds of TSO allocation, the global TSO allocator and the DC-location-level TSO allocator. The meaning of allocatorGroups:

key: the dc-location; one entry is "global", the other entries are the individual dc-locations.

allocatorGroup: the concrete TSO allocator for that key, consisting of:

leadership: used for the TSO allocator's leader election. For the global allocator, leadership is simply the PD leadership; for a dc-location TSO allocator, leadership is the dc-location-level election, used to generate linear TSOs within that DC.

allocator: performs the actual TSO allocation. There are two implementations: GlobalTSOAllocator for global TSO allocation and LocalTSOAllocator for DC-level TSO allocation.


type allocatorGroup struct {


dcLocation string


// For the Global TSO Allocator, leadership is a PD leader’s


// leadership, and for the Local TSO Allocator, leadership


// is a DC-level certificate to allow an allocator to generate


// TSO for local transactions in its DC.


leadership *election.Leadership


allocator Allocator


}


clusterDCLocations map[string]*DCLocationInfo:


Key: the dc-location.

DCLocationInfo: records information about this DC, such as its suffix and the IDs of all servers in this DC.


// DCLocationInfo is used to record some dc-location related info,


// such as suffix sign and server IDs in this dc-location.


type DCLocationInfo struct {


// dc-location/global (string) -> Member IDs


ServerIDs []uint64


// dc-location (string) -> Suffix sign. It is collected and maintained by the PD leader.


Suffix int32


}


Why does each dc-location's metadata carry a suffix, and what is it for?

TSO allocation inside each DC is fully independent, so the TSO values produced by different DC-level allocators could collide. To prevent this, every dc-location is assigned a suffix (different DCs get different values) which is placed at the tail of the TSO, guaranteeing that TSOs from different dc-locations are never equal. To keep the suffixes unique, the suffix of every DC is maintained by the PD leader and persisted in etcd.

An example:


Suppose there are three DCs: dc-1, dc-2 and dc-3. The number of suffix bits is computed by the GetSuffixBits function (the idea is simple: take the current maximum suffix value and work out how many bits are needed to represent it). In this example the suffix is rendered with 8 bits. The global allocator gets suffix 0, dc-1 gets suffix 1, dc-2 gets suffix 2, and dc-3 gets suffix 3, so their TSO logical parts (18 bits in total) end as follows:


// global: xxxxxxxxxx00000000


// dc-1: xxxxxxxxxx00000001


// dc-2: xxxxxxxxxx00000010


// dc-3: xxxxxxxxxx00000011


// GetSuffixBits calculates the bits of suffix sign


// by the max number of suffix so far,


// which will be used in the TSO logical part.


func (am *AllocatorManager) GetSuffixBits() int {


am.mu.RLock()


defer am.mu.RUnlock()


return CalSuffixBits(am.mu.maxSuffix)


}
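GetSuffixBits delegates to CalSuffixBits, which is not quoted in this article. A minimal sketch of how the bit width can be derived from the maximum suffix seen so far (it only needs the standard math package; treat the exact formula as an assumption based on the behavior described above):

// calSuffixBits returns how many bits are needed to hold every suffix seen so far.
// maxSuffix+1 accounts for the Global TSO Allocator, which occupies suffix 0.
func calSuffixBits(maxSuffix int32) int {
    return int(math.Ceil(math.Log2(float64(maxSuffix + 1))))
}

With maxSuffix = 3 (global plus dc-1..dc-3) this yields 2 bits; the example above simply renders the suffix inside an 8-bit-wide tail for readability.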


Analysis of the Concrete TSO Allocation Modules


GlobalTSOAllocator


// GlobalTSOAllocator is the global single point TSO allocator.


type GlobalTSOAllocator struct {


// leadership is used to check the current PD server’s leadership


// to determine whether a TSO request could be processed.


leadership *election.Leadership


timestampOracle *timestampOracle


}


// timestampOracle is used to maintain the logic of TSO.


type timestampOracle struct {


client *clientv3.Client


rootPath string


// TODO: remove saveInterval


saveInterval time.Duration


updatePhysicalInterval time.Duration


maxResetTSGap func() time.Duration


// tso info stored in the memory


tsoMux struct {


sync.RWMutex


tso *tsoObject


}


// last timestamp window stored in etcd


lastSavedTime atomic.Value // stored as time.Time


suffix int


dcLocation string


}


Analysis of the core global TSO allocation algorithm:

This function generates TSO values and distinguishes two cases:

1. The cluster manages no DC TSO allocator and has only one global TSO allocator. TSO allocation is then trivial: just call gta.timestampOracle.getTS(gta.leadership, count, 0).

2. The cluster also manages several DC-level TSO allocators. TSO allocation becomes more involved, because the result must be based on the maximum TSO across all dc-locations, so the global allocator has to talk to all DC TSO leaders more than once, which degrades performance noticeably.


// GenerateTSO is used to generate a given number of TSOs.


// Make sure you have initialized the TSO allocator before calling.


func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {


    // Check whether the current leader is still valid.
    if !gta.leadership.Check() {
        return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of cluster", errs.NotLeaderErr))
    }
    // If DC-level TSO allocators exist, collect all dc-location info: how many DCs there are,
    // each DC's suffix, and the server IDs belonging to each DC.
    // To check if we have any dc-location configured in the cluster
    dcLocationMap := gta.allocatorManager.GetClusterDCLocations()
    // If no DC-level TSO allocator is configured, the simple path is taken:
    // gta.timestampOracle.getTS(gta.leadership, count, 0) returns the TSO directly.
    // No dc-locations configured in the cluster, use the normal TSO generation way.
    if len(dcLocationMap) == 0 {
        return gta.timestampOracle.getTS(gta.leadership, count, 0)
    }
    // With DC TSO allocators configured, using the Global TSO Allocator costs noticeably more,
    // because maxTS has to be exchanged with every local TSO allocator leader. The process is:
    // 1. Ask all Local TSO Allocator leaders for the current global maximum TSO (SyncMaxTS).
    // 2. If the logical part plus count would exceed the logical upper bound, add 1 to the
    //    physical part and set the logical part to count.
    // 3. Call SyncMaxTS again to push the new maximum TSO to all local TSO allocators.
    // Have dc-locations configured in the cluster, use the Global TSO generation way.
    // Send maxTS to all Local TSO Allocator leaders to prewrite
    maxTSO := &pdpb.Timestamp{}
    // 1. Collect the MaxTS with all Local TSO Allocator leaders first
    if err := gta.SyncMaxTS(ctx, dcLocationMap, maxTSO); err != nil {
        return pdpb.Timestamp{}, err
    }
    // 2. Add the count and make sure its logical part won't overflow after being differentiated.
    suffixBits := gta.allocatorManager.GetSuffixBits()
    gta.preprocessLogical(maxTSO, count, suffixBits)
    // 3. Sync the MaxTS with all Local TSO Allocator leaders then
    if err := gta.SyncMaxTS(ctx, dcLocationMap, maxTSO); err != nil {
        return pdpb.Timestamp{}, err
    }
    // 4. Persist MaxTS into memory, and etcd if needed
    var (
        currentGlobalTSO pdpb.Timestamp
        err              error
    )
    if currentGlobalTSO, err = gta.getCurrentTSO(); err != nil {
        return pdpb.Timestamp{}, err
    }
    if tsoutil.CompareTimestamp(&currentGlobalTSO, maxTSO) < 0 {
        // Update the Global TSO in memory
        if err := gta.timestampOracle.resetUserTimestamp(gta.leadership, tsoutil.GenerateTS(maxTSO), true); err != nil {
            log.Warn("update the global tso in memory failed", errs.ZapError(err))
        }
    }
    // 5. Differentiate the logical part to make the TSO unique globally by giving it
    //    a unique suffix in the whole cluster
    maxTSO.Logical = gta.timestampOracle.differentiateLogical(maxTSO.Logical, suffixBits)
    return *maxTSO, nil


}
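Step 2 above calls gta.preprocessLogical, which is not quoted here. The behavior described earlier (if adding count would overflow the logical ceiling once the suffix is appended, bump the physical part and restart the logical part from count) could be sketched roughly as follows; this is an illustration under those assumptions, not the actual PD implementation:

// preprocessLogicalSketch adds count to maxTSO's logical part while reserving room
// for the suffixBits low-order bits that differentiateLogical will append later.
// If the logical part would overflow, the physical part (a millisecond timestamp)
// is advanced by one and the logical part restarts from count.
func preprocessLogicalSketch(maxTSO *pdpb.Timestamp, count uint32, suffixBits int) {
    const maxLogical = int64(1) << 18 // the TSO logical part is 18 bits wide
    if (maxTSO.Logical+int64(count))<<suffixBits >= maxLogical {
        maxTSO.Physical++
        maxTSO.Logical = int64(count)
    } else {
        maxTSO.Logical += int64(count)
    }
}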


If no dc-location is configured, the global allocator simply calls gta.timestampOracle.getTS(gta.leadership, count, 0) to get the TSO value. The logic is straightforward:


// getTS is used to get a timestamp.


func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) {


var resp pdpb.Timestamp


    for i := 0; i < maxRetryCount; i++ {
        currentPhysical, currentLogical := t.getTSO()
        // Get a new TSO result with the given count
        resp.Physical, resp.Logical = t.generateTSO(int64(count), suffixBits)
        if resp.GetPhysical() == 0 {
            return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("timestamp in memory has been reset")
        }
        // In case lease expired after the first check.
        if !leadership.Check() {
            return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs("not the pd or local tso allocator leader")
        }
        resp.SuffixBits = uint32(suffixBits)
        return resp, nil
    }
    return resp, errs.ErrGenerateTimestamp.FastGenByArgs("maximum number of retries exceeded")


}


// generateTSO will add the TSO’s logical part with the given count and returns the new TSO result.


func (t *timestampOracle) generateTSO(count int64, suffixBits int) (physical int64, logical int64) {


t.tsoMux.Lock()


defer t.tsoMux.Unlock()


if t.tsoMux.tso == nil {


return 0, 0


}


physical = t.tsoMux.tso.physical.UnixNano() / int64(time.Millisecond)


t.tsoMux.tso.logical += count


logical = t.tsoMux.tso.logical


if suffixBits > 0 && t.suffix >= 0 {


logical = t.differentiateLogical(logical, suffixBits)


}


return physical, logical


}
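generateTSO calls t.differentiateLogical when suffix bits are in use. Based on the suffix scheme described earlier (the dc-location suffix occupies the lowest suffixBits bits of the logical part), a minimal sketch looks like this; take it as an illustration of the idea rather than a verbatim copy of the PD source:

// differentiateLogical shifts the raw logical value left by suffixBits and puts the
// allocator's own suffix into the freed low-order bits, so logical parts produced by
// different dc-locations (and the global allocator, suffix 0) can never collide.
func (t *timestampOracle) differentiateLogical(rawLogical int64, suffixBits int) int64 {
    return rawLogical<<suffixBits + int64(t.suffix)
}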


LocalTSOAllocator


The TSO allocation algorithm lives in lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits()), which was analyzed above.


// LocalTSOAllocator is the DC-level local TSO allocator,


// which is only used to allocate TSO in one DC each.


// One PD server may hold multiple Local TSO Allocators.


type LocalTSOAllocator struct {


allocatorManager *AllocatorManager


// leadership is used to campaign the corresponding DC’s Local TSO Allocator.


leadership *election.Leadership


timestampOracle *timestampOracle


// for election use, notice that the leadership that member holds is


// the leadership for PD leader. Local TSO Allocator’s leadership is for the


// election of Local TSO Allocator leader among several PD servers and


// Local TSO Allocator only use member’s some etcd and pbpd.Member info.


// So it’s not conflicted.


rootPath string


allocatorLeader atomic.Value // stored as *pdpb.Member


}


// NewLocalTSOAllocator creates a new local TSO allocator.


func NewLocalTSOAllocator(


am *AllocatorManager,


leadership *election.Leadership,


dcLocation string,


) Allocator {


return &LocalTSOAllocator{


allocatorManager: am,


leadership: leadership,


timestampOracle: &timestampOracle{


client: leadership.GetClient(),


rootPath: leadership.GetLeaderKey(),


saveInterval: am.saveInterval,


updatePhysicalInterval: am.updatePhysicalInterval,


maxResetTSGap: am.maxResetTSGap,


dcLocation: dcLocation,


},


rootPath: leadership.GetLeaderKey(),


}


}


// GenerateTSO is used to generate a given number of TSOs.


// Make sure you have initialized the TSO allocator before calling.


func (lta *LocalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {


if !lta.leadership.Check() {


return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation))


}


return lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits())


}


Analysis of the Core TSO Workflow

1. Create the TSO allocation management component:


s.tsoAllocatorManager = tso.NewAllocatorManager(


s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, s.cfg.TSOUpdatePhysicalInterval.Duration,


func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },


s.GetTLSConfig())


2. Set up the global TSO component, i.e. populate one entry of am.mu.allocatorGroups[dcLocation]: the key is "global" and the value is an allocatorGroup object used later for global TSO allocation.


s.tsoAllocatorManager.SetUpAllocator(ctx, tso.GlobalDCLocation, s.member.GetLeadership())


func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {


    // Create the global TSO allocator; here leadership is the PD leader's leadership.
    allocator = NewGlobalTSOAllocator(am, leadership)
    // Create a new allocatorGroup
    ctx, cancel := context.WithCancel(parentCtx)
    am.mu.allocatorGroups[dcLocation] = &allocatorGroup{
        dcLocation: dcLocation,
        ctx:        ctx,
        cancel:     cancel,
        leadership: leadership,
        allocator:  allocator,
    }


}


If the PD's zone label is non-empty and local (dc-location) TSO allocation is enabled, the dc-location TSO allocation is configured.

The purpose of SetLocalTSOConfig is simple: it writes this PD's dc-location into etcd so that the background service goroutines can query etcd later and set up the dc-location TSO allocators.

key: am.member.GetDCLocationPath(serverID), which contains this server's ID

value: dcLocation, the dc-location this PD belongs to


Analysis of the ClusterDCLocationChecker function:

1. Fetch all dc-locations stored in etcd, along with the server IDs belonging to each DC, and update am.mu.clusterDCLocations[dcLocation].

2. If this PD is the leader, assign/update the suffix of each dc-location and update the maximum suffix among all DCs.

3. If this PD is a follower, it only reads the maximum suffix from etcd and then sets am.mu.maxSuffix = maxSuffix.


if zone, exist := s.cfg.Labels[config.ZoneLabel]; exist && zone != "" && s.cfg.EnableLocalTSO {


if err = s.tsoAllocatorManager.SetLocalTSOConfig(zone); err != nil {


return err


}


}


// SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location


// to make the whole cluster know the DC-level topology for later Local TSO Allocator campaign.


func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {


serverName := am.member.Member().Name


serverID := am.member.ID()


    // The key-value pair in etcd will be like: serverID -> dcLocation
    dcLocationKey := am.member.GetDCLocationPath(serverID)
    resp, err := kv.
        NewSlowLogTxn(am.member.Client()).
        Then(clientv3.OpPut(dcLocationKey, dcLocation)).
        Commit()
    go am.ClusterDCLocationChecker()
    return nil
}


Background Service Management


The Server starts the TSO background service management goroutine:


func (s *Server) startServerLoop(ctx context.Context) {


go s.tsoAllocatorLoop()


}


// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.


func (s *Server) tsoAllocatorLoop() {


s.tsoAllocatorManager.AllocatorDaemon(ctx)


log.Info("server is closed, exit allocator loop")


}


// AllocatorDaemon is used to update every allocator’s TSO and check whether we have


// any new local allocator that needs to be set up.


func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {


tsTicker := time.NewTicker(am.updatePhysicalInterval)


    patrolTicker := time.NewTicker(patrolStep)
    checkerTicker := time.NewTicker(PriorityCheck)

    for {
        select {
        case <-tsTicker.C:
            am.allocatorUpdater()
        case <-patrolTicker.C:
            am.allocatorPatroller(serverCtx)
        case <-checkerTicker.C:
            // ClusterDCLocationChecker and PriorityChecker are time consuming and low frequent to run,
            // we should run them concurrently to speed up the progress.
            go am.ClusterDCLocationChecker()
            go am.PriorityChecker()
        case <-serverCtx.Done():
            return
        }
    }
}


Analysis of am.allocatorUpdater():

This function is called every am.updatePhysicalInterval. It is simple: for the global allocator and every dc-location TSO allocator it calls UpdateTSO(), which checks how the in-memory TSO is being consumed and, when necessary, persists a new TSO value to etcd. For example, each save stores a TSO value 3 seconds ahead of the current time; when the current time gets close to the value saved in etcd, the stored time is advanced (by another 3 seconds). Additionally, if the logical time is about to run out, the in-memory physical time is increased.


Detailed analysis of UpdateTSO(). There are two variants, global and DC-level:

The global variant calls gta.timestampOracle.UpdateTimestamp(gta.leadership).

The DC-level variant calls lta.timestampOracle.UpdateTimestamp(lta.leadership).


// Update the Local TSO Allocator leaders TSO in memory concurrently.


func (am *AllocatorManager) allocatorUpdater() {


    // Update each allocator concurrently
    for _, ag := range allocatorGroups {
        am.wg.Add(1)
        go am.updateAllocator(ag)
    }
    am.wg.Wait()
}


// updateAllocator is used to update the allocator in the group.


func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {


ag.allocator.UpdateTSO();


}


Global UpdateTSO


// UpdateTSO is used to update the TSO in memory and the time window in etcd.


func (gta *GlobalTSOAllocator) UpdateTSO() error {


return gta.timestampOracle.UpdateTimestamp(gta.leadership)


}


DC-level UpdateTSO


// UpdateTSO is used to update the TSO in memory and the time window in etcd


// for all local TSO allocators this PD server hold.


func (lta *LocalTSOAllocator) UpdateTSO() error {


return lta.timestampOracle.UpdateTimestamp(lta.leadership)


}



1. Get the current TSO physical and logical time.

2. If the difference between the current time and the previous physical time is greater than updateTimestampGuard, set the new physical time to now.

3. Otherwise, if the logical time has exceeded half of its maximum, advance the physical time by 1 ms.

4. If the time last saved in etcd is within updateTimestampGuard of the new physical time, persist a new time window (the new physical time plus saveInterval) into etcd.


func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {


prevPhysical, prevLogical := t.getTSO()


now := time.Now()

jetLag := typeutil.SubTimeByWallClock(now, prevPhysical)

    var next time.Time
    // If the system time is greater, it will be synchronized with the system time.
    if jetLag > updateTimestampGuard {
        next = now
    } else if prevLogical > maxLogical/2 {
        // The reason choosing maxLogical/2 here is that it's big enough for common cases.
        // Because there is enough timestamp can be allocated before next update.
        log.Warn("the logical time may be not enough", zap.Int64("prev-logical", prevLogical))
        next = prevPhysical.Add(time.Millisecond)
    } else {
        // It will still use the previous physical time to alloc the timestamp.
        tsoCounter.WithLabelValues("skip_save", t.dcLocation).Inc()
        return nil
    }

    // It is not safe to increase the physical time to `next`.
    // The time window needs to be updated and saved to etcd.
    if typeutil.SubTimeByWallClock(t.lastSavedTime.Load().(time.Time), next) <= updateTimestampGuard {
        save := next.Add(t.saveInterval)
        if err := t.saveTimestamp(leadership, save); err != nil {
            tsoCounter.WithLabelValues("err_save_update_ts", t.dcLocation).Inc()
            return err
        }
    }
    // save into memory
    t.setTSOPhysical(next)

    return nil


}
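UpdateTimestamp persists the new time window through t.saveTimestamp, which is not quoted in this article. A hedged sketch of what such a save has to do is shown below; the etcd key layout and the plain Put (the real code guards the write with the allocator's leadership) are assumptions made for brevity:

// saveTimestampSketch persists the upper bound of the usable time window into etcd
// and remembers it in memory. Allocation never hands out a physical time beyond the
// last saved value, so after a restart or leader switch the next allocator can safely
// continue from the persisted window.
func (t *timestampOracle) saveTimestampSketch(ctx context.Context, ts time.Time) error {
    key := path.Join(t.rootPath, "timestamp") // assumed key layout
    value := typeutil.Uint64ToBytes(uint64(ts.UnixNano()))
    if _, err := t.client.Put(ctx, key, string(value)); err != nil {
        return err
    }
    t.lastSavedTime.Store(ts)
    return nil
}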


Analysis of am.allocatorPatroller(serverCtx):

This function is called every patrolStep interval.

If a new DC joins, it sets up that DC's TSO allocator; if a DC leaves, the corresponding DC TSO allocator is deleted. The principle is simple: as described above, when a new PD starts with DC TSO allocation configured, it writes a key-value pair into etcd (the key is the server ID, the value is the dc-location).

When a new DC is discovered, am.SetUpAllocator is called to set up the DC TSO allocator.

So the focus below is on am.SetUpAllocator (we analyzed this function before, but there the focus was the global setup). Note that leadership does not reuse the PD leadership; a new leadership is created whose path is am.getAllocatorPath(dcLocation), representing leader election within this dc-location.

am.SetUpAllocator creates the allocatorGroup for this DC and then starts the DC-level leader election:


am.allocatorLeaderLoop(parentCtx, localTSOAllocator)


// Check if we have any new dc-location configured, if yes,


// then set up the corresponding local allocator.


func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {


// Collect all dc-locations


dcLocations := am.GetClusterDCLocations()


// Get all Local TSO Allocators


allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation))


// Set up the new one


for dcLocation := range dcLocations {


if slice.NoneOf(allocatorGroups, func(i int) bool {


return allocatorGroups[i].dcLocation == dcLocation


}) {


am.SetUpAllocator(serverCtx, dcLocation, election.NewLeadership(


am.member.Client(),


am.getAllocatorPath(dcLocation),


fmt.Sprintf("%s local allocator leader election", dcLocation),


))


}


}


// Clean up the unused one


for _, ag := range allocatorGroups {


if _, exist := dcLocations[ag.dcLocation]; !exist {


am.deleteAllocatorGroup(ag.dcLocation)


}


}


}


// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.


// One TSO Allocator should only be set once, and may be initialized and reset multiple times depending on the election.


func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {


var allocator Allocator


allocator = NewLocalTSOAllocator(am, leadership, dcLocation)
    // Create a new allocatorGroup
    ctx, cancel := context.WithCancel(parentCtx)
    am.mu.allocatorGroups[dcLocation] = &allocatorGroup{
        dcLocation: dcLocation,
        ctx:        ctx,
        cancel:     cancel,
        leadership: leadership,
        allocator:  allocator,
    }
    // Start election of the Local TSO Allocator here
    localTSOAllocator, _ := allocator.(*LocalTSOAllocator)
    go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)


}


allocatorLeaderLoop


1. Check whether a leader already exists; if so, watch that leader until its leadership becomes invalid.

2. Check whether a next-leader key exists. This value is interesting: if the key exists, only the PD whose ID equals the key's value may proceed with the campaign, otherwise it backs off (this is generally used for DC TSO leader transfer).

3. Report the dc-location info to the PD leader so that the leader knows about this DC TSO allocator.

4. am.campaignAllocatorLeader triggers the election:

4.1 Call allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease, cmps...) to write the election info to etcd; a successful write means the campaign succeeded.

4.2 Call go allocator.KeepAllocatorLeader(ctx) to keep renewing the leader key's lease so the leadership stays valid.

4.3 Call allocator.Initialize(int(dcLocationInfo.Suffix)) to initialize the local TSO value in memory, computed from the value last saved in etcd and the current time.


// similar logic with leaderLoop in server/server.go


func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {


    for {
        // Check whether the Local TSO Allocator has the leader already
        allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
        if checkAgain {
            continue
        }
        if allocatorLeader != nil {
            log.Info("start to watch allocator leader",
                zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
                zap.String("local-tso-allocator-name", am.member.Member().Name))
            // WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
            allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
            log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
                zap.String("dc-location", allocator.GetDCLocation()))
        }

        // Check the next-leader key
        nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
        if err != nil {
            log.Error("get next leader from etcd failed",
                zap.String("dc-location", allocator.GetDCLocation()), errs.ZapError(err))
            time.Sleep(200 * time.Millisecond)
            continue
        }
        isNextLeader := false
        if nextLeader != 0 {
            if nextLeader != am.member.ID() {
                log.Info("skip campaigning of the local tso allocator leader and check later",
                    zap.String("server-name", am.member.Member().Name),
                    zap.Uint64("server-id", am.member.ID()),
                    zap.Uint64("next-leader-id", nextLeader))
                time.Sleep(200 * time.Millisecond)
                continue
            }
            isNextLeader = true
        }

        // Make sure the leader is aware of this new dc-location in order to make the
        // Global TSO synchronization can cover up this dc-location.
        ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
        if !ok || dcLocationInfo.Suffix <= 0 {
            log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
                zap.String("dc-location", allocator.GetDCLocation()),
                zap.Any("dc-location-info", dcLocationInfo),
                zap.String("wait-duration", checkStep.String()))
            // Because the checkStep is long, we use select here to check whether the ctx is done
            // to prevent the leak of goroutine.
            if !longSleep(ctx, checkStep) {
                return
            }
            continue
        }

        am.campaignAllocatorLeader(ctx, allocator, dcLocationInfo, isNextLeader)
    }


}


func (am *AllocatorManager) campaignAllocatorLeader(


loopCtx context.Context,


allocator *LocalTSOAllocator,


dcLocationInfo *pdpb.GetDCLocationInfoResponse,


isNextLeader bool,


) {


    cmps := make([]clientv3.Cmp, 0)
    nextLeaderKey := am.nextLeaderKey(allocator.GetDCLocation())
    if !isNextLeader {
        cmps = append(cmps, clientv3.Compare(clientv3.CreateRevision(nextLeaderKey), "=", 0))
    } else {
        nextLeaderValue := fmt.Sprintf("%v", am.member.ID())
        cmps = append(cmps, clientv3.Compare(clientv3.Value(nextLeaderKey), "=", nextLeaderValue))
    }

    if err := allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease, cmps...); err != nil {
        if err.Error() == errs.ErrEtcdTxnConflict.Error() {
        } else {
        }
        return
    }

    // Maintain the Local TSO Allocator leader
    go allocator.KeepAllocatorLeader(ctx)
    if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {
        return
    }
    if dcLocationInfo.GetMaxTs().GetPhysical() != 0 {
        if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil {
            log.Error("failed to write the max local TSO after member changed",
                zap.String("dc-location", allocator.GetDCLocation()),
                zap.Any("dc-location-info", dcLocationInfo), errs.ZapError(err))
            return
        }
    }
    am.compareAndSetMaxSuffix(dcLocationInfo.Suffix)
    allocator.EnableAllocatorLeader()


}


am.ClusterDCLocationChecker()


This function is called every PriorityCheck interval. It was analyzed earlier and the principle is simple: it synchronizes the dc-location information between etcd and memory, updating and deleting entries as needed.


// ClusterDCLocationChecker collects all dc-locations of a cluster, computes some related info


// and stores them into the DCLocationInfo, then finally writes them into am.mu.clusterDCLocations.


func (am *AllocatorManager) ClusterDCLocationChecker() {


newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd()


    // May be used to rollback the updating after
    newDCLocations := make([]string, 0)
    // Update the new dc-locations
    for dcLocation, serverIDs := range newClusterDCLocations {
        if _, ok := am.mu.clusterDCLocations[dcLocation]; !ok {
            am.mu.clusterDCLocations[dcLocation] = &DCLocationInfo{
                ServerIDs: serverIDs,
                Suffix:    -1,
            }
            newDCLocations = append(newDCLocations, dcLocation)
        }
    }
    // Only leader can write the TSO suffix to etcd in order to make it consistent in the cluster
    if am.member.IsLeader() {
        for dcLocation, info := range am.mu.clusterDCLocations {
            if info.Suffix > 0 {
                continue
            }
            suffix, err := am.getOrCreateLocalTSOSuffix(dcLocation)
            if err != nil {
                continue
            }
            if suffix > am.mu.maxSuffix {
                am.mu.maxSuffix = suffix
            }
            am.mu.clusterDCLocations[dcLocation].Suffix = suffix
        }
    } else {
        // Follower should check and update the am.mu.maxSuffix
        maxSuffix, err := am.getMaxLocalTSOSuffix()
        am.mu.maxSuffix = maxSuffix
    }
    am.mu.Unlock()
}


am.PriorityChecker()


This function is called every PriorityCheck interval.

It is simple: it checks the election priority of the local TSO allocators. If we want to elect the local TSO leader of dc-1, the priority rules are:

1. A PD with the label dc-location="dc-1" has the highest priority to be elected leader.

2. Only when all PDs with dc-location="dc-1" are down can PDs from other DCs be elected leader.

For example, if a PD in dc-1 discovers that the current dc-1 allocator leader does not belong to dc-1, it triggers a leader transfer by creating the next-leader key in etcd, telling the election process that it expects itself to become the next leader of this DC's local TSO allocator (see the sketch after the function below).


func (am *AllocatorManager) PriorityChecker() {


serverID := am.member.ID()


myServerDCLocation, err := am.getServerDCLocation(serverID)


    // Check all Local TSO Allocator followers to see if their priorities is higher than the leaders
    // Filter out allocators with leadership and initialized
    allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation), FilterAvailableLeadership())
    for _, allocatorGroup := range allocatorGroups {
        localTSOAllocator, _ := allocatorGroup.allocator.(*LocalTSOAllocator)
        leaderServerID := localTSOAllocator.GetAllocatorLeader().GetMemberId()
        leaderServerDCLocation, err := am.getServerDCLocation(leaderServerID)
        if err != nil {
            continue
        }
        // For example, an allocator leader for dc-1 is elected by a server of dc-2, then the server of dc-1 will
        // find this allocator's dc-location isn't the same with server of dc-2 but is same with itself.
        if allocatorGroup.dcLocation != leaderServerDCLocation && allocatorGroup.dcLocation == myServerDCLocation {
            err = am.transferLocalAllocator(allocatorGroup.dcLocation, am.member.ID())
        }
    }
    // Check next leader and resign
    // Filter out allocators with leadership
    allocatorGroups = am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation), FilterUnavailableLeadership())
    for _, allocatorGroup := range allocatorGroups {
        nextLeader, err := am.getNextLeaderID(allocatorGroup.dcLocation)
        if err != nil {
            continue
        }
        // nextLeader is not empty and isn't same with the server ID, resign the leader
        if nextLeader != 0 && nextLeader != serverID {
            log.Info("next leader key found, resign current leader", zap.Uint64("nextLeaderID", nextLeader))
            am.ResetAllocatorGroup(allocatorGroup.dcLocation)
        }
    }


}
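The next-leader handover used by PriorityChecker and allocatorLeaderLoop boils down to one etcd key per dc-location: a higher-priority PD publishes its own member ID under that key (roughly what transferLocalAllocator does), the current leader resigns when it sees a different ID there, and only the PD whose ID matches the key is allowed to campaign next. A hedged sketch of the write side; the key path and transaction details are assumptions, not the exact PD code:

// nextLeaderKeySketch builds the etcd key that stores the expected next allocator
// leader of a dc-location (assumed layout; the real path is derived from rootPath).
func (am *AllocatorManager) nextLeaderKeySketch(dcLocation string) string {
    return path.Join(am.rootPath, "lta", dcLocation, "next-leader")
}

// transferLocalAllocatorSketch asks the cluster to hand the dc-location's local TSO
// leadership to the given server by publishing its ID as the expected next leader.
// The write only succeeds if the key does not exist yet, so concurrent transfers
// cannot overwrite each other.
func (am *AllocatorManager) transferLocalAllocatorSketch(dcLocation string, serverID uint64) error {
    nextLeaderKey := am.nextLeaderKeySketch(dcLocation)
    resp, err := kv.NewSlowLogTxn(am.member.Client()).
        If(clientv3.Compare(clientv3.CreateRevision(nextLeaderKey), "=", 0)).
        Then(clientv3.OpPut(nextLeaderKey, fmt.Sprintf("%v", serverID))).
        Commit()
    if err != nil {
        return err
    }
    if !resp.Succeeded {
        return errors.New("next-leader key already set by another member")
    }
    return nil
}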

