PD 关于 ID 分配的源码分析
作者: 薛港 - 移动云原文来源:https://tidb.net/blog/e2f56eac
ID 分配模块初始化
ID 模块的实例化
s.idAllocator = id.NewAllocator(s.client, s.rootPath, s.member.MemberValue())
// NewAllocator creates a new ID Allocator.
func NewAllocator(client *clientv3.Client, rootPath string, member string) Allocator {
return &allocatorImpl{client: client, rootPath: rootPath, member: member}
}
ID 分配模块的定义,
// allocatorImpl is used to allocate ID.
type allocatorImpl struct {
mu sync.Mutex
base uint64
end uint64
client *clientv3.Client
rootPath string
member string
}
ID 模块核心算法就是分配 ID,原理非常简单,内存维护一个 id 范围[base,end). etcd 会保存上次分配的 end 大小(主要防止 PD 异常导致 ID 分配不线性唯一)。 每次调用函数 Alloc()分配 ID 的时候,流程如下:
1. 如果 base 不等于 end,. 说明上次预分配还有多余,base++ 得到的值, 返回当作
分配的 ID
2. 如果 base 等于 end, 说明上次预分配的所有的 ID 已经使用完,或者是 PD 重新启动(不管上次还有多少 ID 没用,我们依然从上次的 end 重新批量分配一些 ID),无论那种情况,我们重新基于上次的 end,批量分配一批 ID:
2.1. 调用 etcd 查看上次 end 值是多少,key 等于 alloc.getAllocIDPath()
2.2. 上次的 end 要么为 0(pd 第一次启动),pd 不为 0(重新启动), 取出 end 值,这个 end 值 作为下次分配 ID 起点,并计算新的 END 值 end += allocStep,allocStep 表示每次分配的 ID 数目
2.3. 利用 ETCD 的事务机制,保存 END 值到 ETCD
// Alloc returns a new id.
func (alloc *allocatorImpl) Alloc() (uint64, error) {
if alloc.base == alloc.end {
if err := alloc.rebaseLocked(); err != nil {
return 0, err
}
}
alloc.base++
return alloc.base, nil
}
// Rebase resets the base for the allocator from the persistent window boundary,
// which also resets the end of the allocator. (base, end) is the range that can
// be allocated in memory.
func (alloc *allocatorImpl) Rebase() error {
return alloc.rebaseLocked()
}
func (alloc *allocatorImpl) rebaseLocked() error {
key := alloc.getAllocIDPath()
value, err := etcdutil.GetValue(alloc.client, key)
if value == nil {
// create the key
cmp = clientv3.Compare(clientv3.CreateRevision(key), “=”, 0)
} else {
// update the key
end, err = typeutil.BytesToUint64(value)
cmp = clientv3.Compare(clientv3.Value(key), “=”, string(value))
}
end += allocStep
value = typeutil.Uint64ToBytes(end)
txn := kv.NewSlowLogTxn(alloc.client)
leaderPath := path.Join(alloc.rootPath, “leader”)
t := txn.If(append([]clientv3.Cmp{cmp}, clientv3.Compare(clientv3.Value(leaderPath), “=”, alloc.member))…)
resp, err := t.Then(clientv3.OpPut(key, string(value))).Commit()
alloc.end = end
alloc.base = end - allocStep
return nil
}
版权声明: 本文为 InfoQ 作者【TiDB 社区干货传送门】的原创文章。
原文链接:【http://xie.infoq.cn/article/22412c274b2ed46549407129d】。文章转载请联系作者。
评论