PD TSO Allocation Source Code Analysis
Author: 薛港 - 移动云. Original source: https://tidb.net/blog/89aac039
tsoAllocatorManager Analysis
PD has a TSO allocation management module, tsoAllocatorManager, which manages all TSO allocators. PD currently supports two kinds of TSO: the global TSO allocator, which guarantees that timestamps for global transactions increase linearly, and the dc-location (local) TSO allocator, which guarantees linearly increasing timestamps for transactions within a specific DC.
Initialization and definition of the tsoAllocatorManager module:
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, s.cfg.TSOUpdatePhysicalInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
s.GetTLSConfig())
// NewAllocatorManager creates a new TSO Allocator Manager.
func NewAllocatorManager(
m *member.Member,
rootPath string,
saveInterval time.Duration,
updatePhysicalInterval time.Duration,
maxResetTSGap func() time.Duration,
sc *grpcutil.TLSConfig,
) *AllocatorManager {
allocatorManager := &AllocatorManager{
member: m,
rootPath: rootPath,
saveInterval: saveInterval,
updatePhysicalInterval: updatePhysicalInterval,
maxResetTSGap: maxResetTSGap,
securityConfig: sc,
}
allocatorManager.mu.allocatorGroups = make(map[string]*allocatorGroup)
allocatorManager.mu.clusterDCLocations = make(map[string]*DCLocationInfo)
allocatorManager.localAllocatorConn.clientConns = make(map[string]*grpc.ClientConn)
return allocatorManager
}
// AllocatorManager is used to manage the TSO Allocators a PD server holds.
// It is in charge of maintaining TSO allocators' leadership, checking election
// priority, and forwarding TSO allocation requests to correct TSO Allocators.
type AllocatorManager struct {
mu struct {
sync.RWMutex
// There are two kinds of TSO Allocators:
// 1. Global TSO Allocator, as a global single point to allocate
// TSO for global transactions, such as cross-region cases.
// 2. Local TSO Allocator, servers for DC-level transactions.
// dc-location/global (string) -> TSO Allocator
allocatorGroups map[string]*allocatorGroup
clusterDCLocations map[string]*DCLocationInfo
// The max suffix sign we have so far, it will be used to calculate
// the number of suffix bits we need in the TSO logical part.
maxSuffix int32
}
wg sync.WaitGroup
// for election use
member *member.Member
// TSO config
rootPath string
saveInterval time.Duration
updatePhysicalInterval time.Duration
maxResetTSGap func() time.Duration
securityConfig *grpcutil.TLSConfig
// for gRPC use
localAllocatorConn struct {
sync.RWMutex
clientConns map[string]*grpc.ClientConn
}
}
allocatorGroups map[string]*allocatorGroup: PD supports two kinds of TSO allocation, a global TSO allocator and per-dc-location TSO allocators. The map's meaning:
key: the dc-location; one entry is "global", the others are the individual dc-location names.
allocatorGroup: the corresponding TSO allocator, containing:
leadership: used for the TSO allocator's leader election. For the global allocator, leadership is the PD leadership itself; for a dc-location allocator, it is the dc-location-scoped election used to produce linear TSOs within that DC.
allocator: performs the TSO allocation. There are two implementations: GlobalTSOAllocator for global TSO allocation and LocalTSOAllocator for DC-level TSO allocation.
type allocatorGroup struct {
dcLocation string
// For the Global TSO Allocator, leadership is a PD leader's
// leadership, and for the Local TSO Allocator, leadership
// is a DC-level certificate to allow an allocator to generate
// TSO for local transactions in its DC.
leadership *election.Leadership
allocator Allocator
}
clusterDCLocations map[string]*DCLocationInfo:
key: the dc-location.
DCLocationInfo: records this DC's metadata, such as its suffix and the IDs of all PD servers in this DC.
// DCLocationInfo is used to record some dc-location related info,
// such as suffix sign and server IDs in this dc-location.
type DCLocationInfo struct {
// dc-location/global (string) -> Member IDs
ServerIDs []uint64
// dc-location (string) -> Suffix sign. It is collected and maintained by the PD leader.
Suffix int32
}
Why does each dc-location's metadata carry a suffix, and what is it for?
Because TSO allocation inside each DC is completely independent, the TSO values produced by different DC-level allocators may collide. Therefore each dc-location is assigned a suffix (different DCs get different values), which is placed at the tail of the TSO to guarantee that TSOs from different dc-locations differ. To keep every DC's suffix unique, all suffixes are maintained by the PD leader and persisted in etcd.
Suppose we have three DCs: dc-1, dc-2 and dc-3. The number of suffix bits is computed by GetSuffixBits (the principle is simple: take the current maximum suffix value and work out how many bits it needs). In this illustration the suffix is drawn as 8 bits. Global gets suffix 0, dc-1 gets 1, dc-2 gets 2, and dc-3 gets 3, so their TSOs (18 bits in this drawing) look as follows:
// global: xxxxxxxxxx00000000
// dc-1: xxxxxxxxxx00000001
// dc-2: xxxxxxxxxx00000010
// dc-3: xxxxxxxxxx00000011
// GetSuffixBits calculates the bits of suffix sign
// by the max number of suffix so far,
// which will be used in the TSO logical part.
func (am *AllocatorManager) GetSuffixBits() int {
am.mu.RLock()
defer am.mu.RUnlock()
return CalSuffixBits(am.mu.maxSuffix)
}
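GetSuffixBits delegates to CalSuffixBits, whose body is not quoted here. A minimal sketch, assuming exactly the behavior described above (imports elided to match the excerpt style; it needs "math"). Note that maxSuffix+1 values must be representable, since global reserves suffix 0, so with a max suffix of 3 this returns 2 bits; the 8-bit layout above is only an illustration.
// CalSuffixBits calculates how many bits are needed to encode every suffix
// sign seen so far: maxSuffix + 1 values, because the Global TSO holds 0.
func CalSuffixBits(maxSuffix int32) int {
	return int(math.Ceil(math.Log2(float64(maxSuffix + 1))))
}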
Analysis of the Concrete TSO Allocation Modules
GlobalTSOAllocator
// GlobalTSOAllocator is the global single point TSO allocator.
type GlobalTSOAllocator struct {
// leadership is used to check the current PD server's leadership
// to determine whether a TSO request could be processed.
leadership *election.Leadership
timestampOracle *timestampOracle
}
// timestampOracle is used to maintain the logic of TSO.
type timestampOracle struct {
client *clientv3.Client
rootPath string
// TODO: remove saveInterval
saveInterval time.Duration
updatePhysicalInterval time.Duration
maxResetTSGap func() time.Duration
// tso info stored in the memory
tsoMux struct {
sync.RWMutex
tso *tsoObject
}
// last timestamp window stored in etcd
lastSavedTime atomic.Value // stored as time.Time
suffix int
dcLocation string
}
Analysis of the core global TSO allocation algorithm:
GenerateTSO produces TSO values and distinguishes two cases:
1. The cluster manages no DC-level TSO allocators, so there is only the single global allocator. Allocation is then trivial: call gta.timestampOracle.getTS(gta.leadership, count, 0) directly.
2. The cluster also manages several DC-level TSO allocators. Allocation becomes more involved: the global TSO must exceed the maximum TSO across all dc-locations, so the global allocator has to talk to every DC TSO leader, possibly several times, which degrades performance noticeably.
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
}
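The body is elided above. A hedged sketch of the two cases just described; the reference gta.allocatorManager is an assumption (the struct excerpt above does not show such a field, but the allocator needs some way to see the cluster's dc-locations), and the real synchronization with the DC leaders is more involved.
func (gta *GlobalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
	if !gta.leadership.Check() {
		return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(errs.NotLeaderErr)
	}
	// Case 1: no dc-location configured in the cluster, plain global allocation.
	if len(gta.allocatorManager.GetClusterDCLocations()) == 0 { // assumed field
		return gta.timestampOracle.getTS(gta.leadership, count, 0)
	}
	// Case 2: collect the max TSO from every dc-location's allocator leader,
	// take the global maximum, write it back to all of them, and only then
	// return a global TSO that exceeds every local TSO. These extra gRPC
	// round trips are the performance cost noted above.
	// ...
	return pdpb.Timestamp{}, nil
}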
If the cluster has no dc-location, GenerateTSO simply calls gta.timestampOracle.getTS(gta.leadership, count, 0) to obtain the TSO; the logic is straightforward.
// getTS is used to get a timestamp.
func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) {
var resp pdpb.Timestamp
}
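The elided body is essentially a retry loop. A hedged sketch (package-level names such as maxRetryCount and maxLogical are assumptions modeled on PD's code; imports elided to match the excerpt style):
func (t *timestampOracle) getTS(leadership *election.Leadership, count uint32, suffixBits int) (pdpb.Timestamp, error) {
	var resp pdpb.Timestamp
	for i := 0; i < maxRetryCount; i++ {
		currentPhysical, _ := t.getTSO()
		if currentPhysical.IsZero() {
			// The in-memory TSO has not been initialized yet (e.g. right
			// after winning the election); wait for the sync and retry.
			time.Sleep(200 * time.Millisecond)
			continue
		}
		// Advance the logical clock by count; mix the suffix in for Local TSO.
		resp.Physical, resp.Logical = t.generateTSO(int64(count), suffixBits)
		if resp.Logical >= maxLogical {
			// Logical part exhausted: wait for UpdateTimestamp to move the
			// physical time forward, then retry.
			time.Sleep(t.updatePhysicalInterval)
			continue
		}
		resp.SuffixBits = uint32(suffixBits)
		// Re-check leadership before handing the timestamp out.
		if !leadership.Check() {
			return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(errs.NotLeaderErr)
		}
		return resp, nil
	}
	return resp, errs.ErrGenerateTimestamp.FastGenByArgs("maximum number of retries exceeded")
}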
// generateTSO will add the TSO's logical part with the given count and returns the new TSO result.
func (t *timestampOracle) generateTSO(count int64, suffixBits int) (physical int64, logical int64) {
t.tsoMux.Lock()
defer t.tsoMux.Unlock()
if t.tsoMux.tso == nil {
return 0, 0
}
physical = t.tsoMux.tso.physical.UnixNano() / int64(time.Millisecond)
t.tsoMux.tso.logical += count
logical = t.tsoMux.tso.logical
if suffixBits > 0 && t.suffix >= 0 {
logical = t.differentiateLogical(logical, suffixBits)
}
return physical, logical
}
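differentiateLogical is what stamps the suffix sign into the low bits, producing the layout illustrated earlier. It is essentially a one-liner; a hedged sketch:
// differentiateLogical shifts the raw logical value left by suffixBits and
// adds this allocator's suffix sign, e.g. with 2 suffix bits and suffix 2
// (dc-2): rawLogical 5 (101b) -> 10110b.
func (t *timestampOracle) differentiateLogical(rawLogical int64, suffixBits int) int64 {
	return rawLogical<<suffixBits + int64(t.suffix)
}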
LocalTSOAllocator
The TSO allocation algorithm lives in lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits()), which we analyzed above.
// LocalTSOAllocator is the DC-level local TSO allocator,
// which is only used to allocate TSO in one DC each.
// One PD server may hold multiple Local TSO Allocators.
type LocalTSOAllocator struct {
allocatorManager *AllocatorManager
// leadership is used to campaign the corresponding DC’s Local TSO Allocator.
leadership *election.Leadership
timestampOracle *timestampOracle
// for election use, notice that the leadership that member holds is
// the leadership for the PD leader. The Local TSO Allocator's leadership is
// for the election of the Local TSO Allocator leader among several PD
// servers, and the Local TSO Allocator only uses some of member's etcd and
// pdpb.Member info, so the two do not conflict.
rootPath string
allocatorLeader atomic.Value // stored as *pdpb.Member
}
// NewLocalTSOAllocator creates a new local TSO allocator.
func NewLocalTSOAllocator(
am *AllocatorManager,
leadership *election.Leadership,
dcLocation string,
) Allocator {
return &LocalTSOAllocator{
allocatorManager: am,
leadership: leadership,
timestampOracle: &timestampOracle{
client: leadership.GetClient(),
rootPath: leadership.GetLeaderKey(),
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
maxResetTSGap: am.maxResetTSGap,
dcLocation: dcLocation,
},
rootPath: leadership.GetLeaderKey(),
}
}
// GenerateTSO is used to generate a given number of TSOs.
// Make sure you have initialized the TSO allocator before calling.
func (lta *LocalTSOAllocator) GenerateTSO(count uint32) (pdpb.Timestamp, error) {
if !lta.leadership.Check() {
return pdpb.Timestamp{}, errs.ErrGenerateTimestamp.FastGenByArgs(fmt.Sprintf("requested pd %s of %s allocator", errs.NotLeaderErr, lta.timestampOracle.dcLocation))
}
return lta.timestampOracle.getTS(lta.leadership, count, lta.allocatorManager.GetSuffixBits())
}
Analysis of the Core TSO Flow
1. Create the TSO allocation management component:
s.tsoAllocatorManager = tso.NewAllocatorManager(
s.member, s.rootPath, s.cfg.TSOSaveInterval.Duration, s.cfg.TSOUpdatePhysicalInterval.Duration,
func() time.Duration { return s.persistOptions.GetMaxResetTSGap() },
s.GetTLSConfig())
2. Set up the global TSO component, i.e. populate the map entry am.mu.allocatorGroups[dcLocation]: the key is "global" and the value is an allocatorGroup object used later for global TSO allocation.
s.tsoAllocatorManager.SetUpAllocator(ctx, tso.GlobalDCLocation, s.member.GetLeadership())
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {
}
If the PD server's zone label is non-empty and dc-location TSO allocation is enabled, the dc-location TSO allocation is configured as well.
The purpose of SetLocalTSOConfig is simple: save this PD's dc-location to etcd, so that the background service threads can later query etcd and set up the dc-location TSO allocation components.
key: am.member.GetDCLocationPath(serverID), which contains this server's ID
value: dcLocation, the dc-location this PD belongs to
Analysis of ClusterDCLocationChecker:
1. Fetch all dc-locations stored in etcd, together with the server IDs belonging to each DC, and update am.mu.clusterDCLocations[dcLocation].
2. If this PD is the leader, assign each DC its suffix where needed and refresh the maximum suffix across all DCs.
3. If this PD is a follower, simply read the maximum suffix from etcd and set am.mu.maxSuffix = maxSuffix.
if zone, exist := s.cfg.Labels[config.ZoneLabel]; exist && zone != "" && s.cfg.EnableLocalTSO {
if err = s.tsoAllocatorManager.SetLocalTSOConfig(zone); err != nil {
return err
}
}
// SetLocalTSOConfig receives the zone label of this PD server and write it into etcd as dc-location
// to make the whole cluster know the DC-level topology for later Local TSO Allocator campaign.
func (am *AllocatorManager) SetLocalTSOConfig(dcLocation string) error {
serverName := am.member.Member().Name
serverID := am.member.ID()
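	// The remainder is elided in the excerpt. A hedged continuation sketch:
	// persist serverID -> dcLocation into etcd, writing the key only if it
	// does not exist yet (plain etcd clientv3 API shown here; PD wraps it).
	key := am.member.GetDCLocationPath(serverID)
	resp, err := am.member.Client().Txn(context.Background()).
		If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)).
		Then(clientv3.OpPut(key, dcLocation)).
		Commit()
	if err != nil {
		return err
	}
	if !resp.Succeeded {
		log.Warn("dc-location already set in etcd",
			zap.String("server-name", serverName))
	}
	return nil
}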
Background Service Management
The server starts a background thread that manages the TSO service:
func (s *Server) startServerLoop(ctx context.Context) {
go s.tsoAllocatorLoop()
}
// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
func (s *Server) tsoAllocatorLoop() {
s.tsoAllocatorManager.AllocatorDaemon(ctx)
log.Info("server is closed, exit allocator loop")
}
// AllocatorDaemon is used to update every allocator’s TSO and check whether we have
// any new local allocator that needs to be set up.
func (am *AllocatorManager) AllocatorDaemon(serverCtx context.Context) {
tsTicker := time.NewTicker(am.updatePhysicalInterval)
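	// The rest of the daemon is elided above. A hedged continuation sketch:
	// one ticker per periodic task, dispatching the three routines analyzed
	// below (the exact structure in PD may differ).
	defer tsTicker.Stop()
	patrolTicker := time.NewTicker(patrolStep)
	defer patrolTicker.Stop()
	checkerTicker := time.NewTicker(PriorityCheck)
	defer checkerTicker.Stop()
	for {
		select {
		case <-tsTicker.C:
			am.allocatorUpdater() // refresh every allocator's TSO
		case <-patrolTicker.C:
			am.allocatorPatroller(serverCtx) // set up new / remove stale local allocators
		case <-checkerTicker.C:
			am.ClusterDCLocationChecker() // sync dc-location info with etcd
			am.PriorityChecker()          // enforce local leader election priority
		case <-serverCtx.Done():
			return
		}
	}
}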
Analysis of am.allocatorUpdater():
This function runs once every am.updatePhysicalInterval. It is straightforward: for the global allocator and every dc-location allocator, it calls UpdateTSO(), which checks how much of the in-memory TSO window has been consumed and, when needed, persists a new TSO upper bound to etcd. For example, each save stores a timestamp 3 seconds ahead of the current time; when the in-memory time approaches the value saved in etcd, the saved time is pushed forward by another 3 s. In addition, when the logical clock is nearly exhausted, the in-memory physical time is advanced.
Detailed analysis of UpdateTSO(): there are two variants, global and DC-level.
The global handler is gta.timestampOracle.UpdateTimestamp(gta.leadership).
// Update the Local TSO Allocator leaders' TSO in memory concurrently.
func (am *AllocatorManager) allocatorUpdater() {
	// ... (body elided; see the sketch below)
}
// updateAllocator is used to update the allocator in the group.
func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
	ag.allocator.UpdateTSO()
}
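A hedged sketch of the elided allocatorUpdater body: fan out one goroutine per allocator group and wait for all of them (the real code also filters out groups without valid leadership first).
func (am *AllocatorManager) allocatorUpdater() {
	// Take a snapshot of all allocator groups (global + every dc-location).
	allocatorGroups := am.getAllocatorGroups()
	for _, ag := range allocatorGroups {
		am.wg.Add(1)
		go func(ag *allocatorGroup) {
			defer am.wg.Done()
			am.updateAllocator(ag)
		}(ag)
	}
	am.wg.Wait()
}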
Global UpdateTSO
// UpdateTSO is used to update the TSO in memory and the time window in etcd.
func (gta *GlobalTSOAllocator) UpdateTSO() error {
return gta.timestampOracle.UpdateTimestamp(gta.leadership)
}
DC-level UpdateTSO
// UpdateTSO is used to update the TSO in memory and the time window in etcd
// for all local TSO allocators this PD server holds.
func (lta *LocalTSOAllocator) UpdateTSO() error {
return lta.timestampOracle.UpdateTimestamp(lta.leadership)
}
…
1. Read the current TSO physical and logical time.
2. If the gap between now and the last physical time exceeds updateTimestampGuard, set the physical time to now.
3. If the logical time has used more than half of its maximum, advance the physical time by 1 ms.
4. If the time last saved in etcd is within updateTimestampGuard of the new physical time, push the etcd-saved time forward.
func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {
prevPhysical, prevLogical := t.getTSO()
}
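The elided body implements the four steps above. A hedged sketch (updateTimestampGuard, maxLogical, saveTimestamp and setTSOPhysical are assumptions modeled on PD's code):
func (t *timestampOracle) UpdateTimestamp(leadership *election.Leadership) error {
	// Step 1: current in-memory physical and logical time.
	prevPhysical, prevLogical := t.getTSO()
	now := time.Now()

	var next time.Time
	switch {
	case now.Sub(prevPhysical) > updateTimestampGuard:
		// Step 2: the wall clock ran ahead; jump the physical time to now.
		next = now
	case prevLogical > maxLogical/2:
		// Step 3: over half of the logical space is used; advance the
		// physical time by 1 ms so the logical clock can restart.
		next = prevPhysical.Add(time.Millisecond)
	default:
		return nil // nothing to do this round
	}

	// Step 4: if `next` is catching up with the window saved in etcd, push
	// the saved upper bound forward by saveInterval (e.g. +3 s) first, so we
	// never hand out a TSO beyond what is persisted.
	if t.lastSavedTime.Load().(time.Time).Sub(next) <= updateTimestampGuard {
		if err := t.saveTimestamp(leadership, next.Add(t.saveInterval)); err != nil {
			return err
		}
	}
	// Publish the new physical time; the logical part restarts from 0.
	t.setTSOPhysical(next)
	return nil
}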
Analysis of am.allocatorPatroller(serverCtx):
This function runs once every patrolStep. If a new DC joins, it sets up that DC's TSO allocator; if a DC leaves, it removes that DC's TSO allocator accordingly. The principle is simple and was described earlier: when a new PD starts with DC TSO allocation configured, it stores a key-value pair in etcd (the key contains the server ID, the value is the dc-location).
When a new DC is discovered, am.SetUpAllocator is called to set up the DC's TSO allocator.
So below we focus on am.SetUpAllocator (we analyzed this function before, but the earlier focus was the global setup). Note that the leadership does not reuse PD's leadership: a new leadership is created whose path is am.getAllocatorPath(dcLocation), representing the leader election scoped to this dc-location.
am.SetUpAllocator creates the allocatorGroup for this DC and then runs the leader election inside the DC via am.allocatorLeaderLoop(parentCtx, localTSOAllocator).
// Check if we have any new dc-location configured, if yes,
// then set up the corresponding local allocator.
func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {
// Collect all dc-locations
dcLocations := am.GetClusterDCLocations()
// Get all Local TSO Allocators
allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation))
// Set up the new one
for dcLocation := range dcLocations {
if slice.NoneOf(allocatorGroups, func(i int) bool {
return allocatorGroups[i].dcLocation == dcLocation
}) {
am.SetUpAllocator(serverCtx, dcLocation, election.NewLeadership(
am.member.Client(),
am.getAllocatorPath(dcLocation),
fmt.Sprintf("%s local allocator leader election", dcLocation),
))
}
}
// Clean up the unused one
for _, ag := range allocatorGroups {
if _, exist := dcLocations[ag.dcLocation]; !exist {
am.deleteAllocatorGroup(ag.dcLocation)
}
}
}
// SetUpAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
// One TSO Allocator should only be set once, and may be initialized and reset multiple times depending on the election.
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {
var allocator Allocator
}
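A hedged sketch of what SetUpAllocator does, per the analysis above (constructor names follow the excerpts; the exact signatures may differ):
func (am *AllocatorManager) SetUpAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {
	am.mu.Lock()
	defer am.mu.Unlock()
	var allocator Allocator
	if dcLocation == GlobalDCLocation {
		allocator = NewGlobalTSOAllocator(am, leadership)
	} else {
		allocator = NewLocalTSOAllocator(am, leadership, dcLocation)
	}
	// Register the group so that AllocatorDaemon starts updating it.
	am.mu.allocatorGroups[dcLocation] = &allocatorGroup{
		dcLocation: dcLocation,
		leadership: leadership,
		allocator:  allocator,
	}
	// A Local TSO Allocator additionally needs its own DC-scoped election,
	// driven by allocatorLeaderLoop (the global one reuses PD's leadership).
	if dcLocation != GlobalDCLocation {
		go am.allocatorLeaderLoop(parentCtx, allocator.(*LocalTSOAllocator))
	}
}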
allocatorLeaderLoop
1. Check whether a leader already exists; if so, watch that leader until its leadership becomes invalid.
2. Check whether a next-leader key exists. This value is quite interesting: if it exists, the PD that campaigns next must equal the next-leader value, otherwise it skips the campaign (this is generally used for DC TSO leader handover).
3. Send the DC info to the PD leader so the leader knows this DC TSO exists.
4. am.campaignAllocatorLeader triggers the election (see the sketch after the code below):
4.1 Call allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease, cmps...) to write the election info into etcd; if the write succeeds, the election is won.
4.2 Call go allocator.KeepAllocatorLeader(ctx) to keep renewing the leader key's lease so leadership stays valid.
4.3 Call allocator.Initialize(int(dcLocationInfo.Suffix)) to initialize the in-memory local TSO, computed from the value last saved in etcd and the current time.
// similar logic with leaderLoop in server/server.go
func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
}
func (am *AllocatorManager) campaignAllocatorLeader(
loopCtx context.Context,
allocator *LocalTSOAllocator,
dcLocationInfo *pdpb.GetDCLocationInfoResponse,
isNextLeader bool,
) {
}
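Both bodies are elided above. A hedged sketch of campaignAllocatorLeader covering steps 4.1-4.3 (nextLeaderKey and the one-second check interval are illustrative, not taken from the excerpt):
func (am *AllocatorManager) campaignAllocatorLeader(
	loopCtx context.Context,
	allocator *LocalTSOAllocator,
	dcLocationInfo *pdpb.GetDCLocationInfoResponse,
	isNextLeader bool,
) {
	// If we campaign as the designated "next leader", guard the campaign
	// with a compare on the next-leader key.
	var cmps []clientv3.Cmp
	if isNextLeader {
		nextLeaderKey := am.nextLeaderKey(allocator.timestampOracle.dcLocation) // illustrative helper
		cmps = append(cmps, clientv3.Compare(clientv3.Value(nextLeaderKey), "=", fmt.Sprintf("%v", am.member.ID())))
	}
	// 4.1: write the leader key into etcd; success means we won the election.
	if err := allocator.CampaignAllocatorLeader(defaultAllocatorLeaderLease, cmps...); err != nil {
		return
	}
	ctx, cancel := context.WithCancel(loopCtx)
	defer cancel()
	// 4.2: keep renewing the lease on the leader key in the background.
	go allocator.KeepAllocatorLeader(ctx)
	// 4.3: rebuild the in-memory TSO from the window last saved in etcd and
	// the current time, stamped with this DC's suffix sign.
	if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {
		return
	}
	// Serve until leadership is lost or the loop context is canceled.
	ticker := time.NewTicker(time.Second) // illustrative check interval
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if !allocator.leadership.Check() {
				return
			}
		case <-loopCtx.Done():
			return
		}
	}
}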
am.ClusterDCLocationChecker()
This function runs once every PriorityCheck interval. We analyzed it earlier; the principle is simple: synchronize the dc-location info between etcd and memory, updating and deleting entries as needed.
// ClusterDCLocationChecker collects all dc-locations of a cluster, computes some related info
// and stores them into the DCLocationInfo, then finally writes them into am.mu.clusterDCLocations.
func (am *AllocatorManager) ClusterDCLocationChecker() {
newClusterDCLocations, err := am.GetClusterDCLocationsFromEtcd()
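	// The remainder is elided above. A hedged continuation sketch following
	// the three steps just described (the persistence helper is illustrative).
	if err != nil {
		return
	}
	am.mu.Lock()
	defer am.mu.Unlock()
	// Step 1: merge the etcd view (dc-location -> server IDs) into memory.
	for dcLocation, info := range newClusterDCLocations {
		am.mu.clusterDCLocations[dcLocation] = info
	}
	if am.member.IsLeader() {
		// Step 2 (leader): give every dc-location without a suffix a fresh
		// one and persist the suffix and the new max suffix into etcd.
		for _, info := range am.mu.clusterDCLocations {
			if info.Suffix <= 0 {
				am.mu.maxSuffix++
				info.Suffix = am.mu.maxSuffix
				// persist this suffix and the new max suffix into etcd ...
			}
		}
	} else {
		// Step 3 (follower): just adopt the max suffix stored in etcd.
		if maxSuffix, err := am.getMaxSuffixFromEtcd(); err == nil { // illustrative helper
			am.mu.maxSuffix = maxSuffix
		}
	}
}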
am.PriorityChecker()
This function runs once every PriorityCheck interval.
It is simple: it checks the election priority of the local TSO allocators. If we are electing the local TSO leader for dc-1, the priority rules are (sketched after the code below):
1. PDs labeled dc-location = "dc-1" have the highest priority to be elected leader.
2. Only when all PDs with dc-location = "dc-1" are down do PDs from other DCs get elected.
For example, if a PD in dc-1 finds that dc-1's local TSO leader does not belong to dc-1, it triggers a leader switch: it creates the next-leader key in etcd, telling the election that it expects the next DC local TSO allocator leader to be itself.
func (am *AllocatorManager) PriorityChecker() {
serverID := am.member.ID()
myServerDCLocation, err := am.getServerDCLocation(serverID)
}
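A hedged sketch of the rest of PriorityChecker, implementing the two rules above (transferLocalAllocatorLeader is an illustrative helper that writes this server's ID under the next-leader key in etcd):
func (am *AllocatorManager) PriorityChecker() {
	serverID := am.member.ID()
	myServerDCLocation, err := am.getServerDCLocation(serverID)
	if err != nil {
		return
	}
	// Inspect every Local TSO Allocator (the global one is filtered out).
	for _, ag := range am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation)) {
		localAllocator, ok := ag.allocator.(*LocalTSOAllocator)
		if !ok {
			continue
		}
		leader := localAllocator.GetAllocatorLeader() // *pdpb.Member, may be nil
		if leader.GetMemberId() == 0 {
			continue // no leader elected yet
		}
		leaderDCLocation, err := am.getServerDCLocation(leader.GetMemberId())
		if err != nil {
			continue
		}
		// Rule 1 beats rule 2: if the current leader lives in another DC while
		// this server belongs to the allocator's DC, request a handover.
		if leaderDCLocation != ag.dcLocation && myServerDCLocation == ag.dcLocation {
			am.transferLocalAllocatorLeader(ag.dcLocation, serverID) // illustrative helper
		}
	}
}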
Copyright notice: this is an original article by the InfoQ author 【TiDB 社区干货传送门】.
Original link: http://xie.infoq.cn/article/a169f560624951aa0dc184daf. Please contact the author before reposting.