原文来源:https://tidb.net/blog/ff481792
背景
上周末(2019 年 10 月 20 日)参加了 TUG 华南区在 Shopee 举办的第三期线下技术沙龙活动,以“不同业务场景下的数据库技术选型思路”来展开分享和探讨的。
其中刘春辉和洪超老师分享了 Shopee 的数据库技术选型思路,在分享中,大家对全局唯一 ID 还挺有疑惑的,那我们今天就来看看 TiDB 中全局唯一 ID 是怎么实现的吧。
文章最后附上活动信息
PD
Placement Driver (简称 PD) 是整个集群的管理模块,其主要工作有三个:一是存储集群的元信息(某个 Key 存储在哪个 TiKV 节点);二是对 TiKV 集群进行调度和负载均衡(如数据的迁移、Raft group leader 的迁移等);三是分配全局唯一且递增的事务 ID。
PD 的命名,来源于 Google Spanner - Spanner:Google’s Globally-Distributed Database [译文]。 Spanner 论文
TiDB 架构图(重点看 PD 跟其他的组件的关系):
分布式 ID
单调递增的 id 能干的事可多了,可以用来实现数据库的 MVCC,进而实现 ACID 事务,检测冲突什么的。在分布式系统中尤其重要,这个领域其实说白了就是不停在和不确定的 wall clock 作斗争… 如何用更弱的约束达到更强的一致性,我觉得单调递增的唯一 id 生成器是一个利器
作者:Ed Huang链接:https://www.zhihu.com/question/52823076/answer/132331104来源:知乎著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
复制代码
业界一般的技术方案
单机数据库 auto_increment;
单点批量 ID 生成服务;
idgo 是一个利用 MySQL 批量生成 ID 的 ID 生成器
Redis INCR INCRBY;
uuid/guid;
取当前毫秒数;
[Snowflake]()
利用 zookeeper 生成唯一 ID
MongoDB 的 ObjectId
扩展技术方案
百度 UidGenerator
UidGenerator 是 Java 实现的, 基于 Snowflake 算法的唯一 ID 生成器。UidGenerator 以组件形式工作在应用项目中, 支持自定义 workerId 位数和初始化策略, 从而适用于 docker 等虚拟化环境下实例自动重启、漂移等场景。 在实现上, UidGenerator 通过借用未来时间来解决 sequence 天然存在的并发限制 ; 采用 RingBuffer 来缓存已生成的 UID, 并行化 UID 的生产和消费, 同时对 CacheLine 补齐,避免了由 RingBuffer 带来的硬件级「伪共享」问题. 最终单机 QPS 可达 *600 万 *。
美团 Leaf
Leaf 提供两种生成的 ID 的方式(号段模式和 snowflake 模式),你可以同时开启两种方式,也可以指定开启某种方式(默认两种方式为关闭状态)。 在 4C8G VM 基础上,通过公司 RPC 方式调用,QPS 压测结果近 5w/s,TP999 1ms。
PD 分配 ID 的场景
我们接下来就来看看他们的源码实现吧。
生成 Cluster ID
pd 在 startServer() 的时候调用 initClusterID() 可以初始化 Cluster ID。
首先会从 etcd 中读取,取不到,则重新生成。
生成 Cluster ID 的算法:
// Generate a random cluster ID.ts := uint64(time.Now().Unix())clusterID := (ts << 32) + uint64(rand.Uint32())value := typeutil.Uint64ToBytes(clusterID)
// Multiple PDs may try to init the cluster ID at the same time.// Only one PD can commit this transaction, then other PDs can get// the committed cluster ID.resp, err := c.Txn(ctx). If(clientv3.Compare(clientv3.CreateRevision(key), "=", 0)). Then(clientv3.OpPut(key, string(value))). Else(clientv3.OpGet(key)). Commit()
// Txn commits ok, return the generated cluster ID.if resp.Succeeded { return clusterID, nil}
复制代码
解析分配唯一 ID 的过程
1. pd service 的定义
首先,我们可以通过 kvproto/proto/pdpb.proto 来查看 pd 所定义的服务有哪些。
service PD { ... rpc AllocID(AllocIDRequest) returns (AllocIDResponse) {} ...}
message AllocIDRequest { RequestHeader header = 1;}
message AllocIDResponse { ResponseHeader header = 1;
uint64 id = 2;}
message RequestHeader { // cluster_id is the ID of the cluster which be sent to. uint64 cluster_id = 1;}
message ResponseHeader { // cluster_id is the ID of the cluster which sent the response. uint64 cluster_id = 1; Error error = 2;}enum ErrorType { OK = 0; UNKNOWN = 1; NOT_BOOTSTRAPPED = 2; STORE_TOMBSTONE = 3; ALREADY_BOOTSTRAPPED = 4; INCOMPATIBLE_VERSION = 5; REGION_NOT_FOUND = 6;}
message Error { ErrorType type = 1; string message = 2;}
复制代码
如果你对 protobuf 还不太了解的话,可以点击前往,咱们这里就不做详细阐述了。
2. AllocID service 的实现。
怎么查找到 AllocID 的实现呢?
检索:AllocID,然后结合我们的 pdpb.proto service 定义,我们就可以对照出结果。
实现代码在:grpc_service.go#L141。
// AllocID implements gRPC PDServer.func (s *Server) AllocID(ctx context.Context, request *pdpb.AllocIDRequest) (*pdpb.AllocIDResponse, error) { // 1. 校验请求数据,这里是在 pdpb.proto 中封装成 RequestHeader if err := s.validateRequest(request.GetHeader()); err != nil { return nil, err }
// 2. 我们用一个 idAllocator 来分配 ID // We can use an allocator for all types ID allocation. id, err := s.idAllocator.Alloc() if err != nil { return nil, status.Errorf(codes.Unknown, err.Error()) }
// 3. 返回请求时的 header 和生成的 ID return &pdpb.AllocIDResponse{ Header: s.header(), Id: id, }, nil}
复制代码
3. 初始化 idAllocator
// Server is the pd server.type Server struct {
// Server services. // for id allocator, we can use one allocator for // store, region and peer, because we just need // a unique ID. idAllocator *id.AllocatorImpl}
func (s *Server) startServer() error {}
复制代码
idAllocator 是在 server.Run 的时候,调用 startServer 时初始化的。
func (s *Server) startServer() error { s.idAllocator = id.NewAllocatorImpl(s.client, s.rootPath, s.member.MemberValue())}
复制代码
4. 源码阅读 AllocatorImpl
我们根据上一步,可以看到在 pd/server/id/id.go 声明了一个 interface:
// Allocator is the allocator to generate unique ID.type Allocator interface { Alloc() (uint64, error)}// 步长 1000const allocStep = uint64(1000)
// AllocatorImpl 是对 Allocator 的实现,用于分配 ID// AllocatorImpl is used to allocate ID.type AllocatorImpl struct { mu sync.Mutex base uint64 end uint64
// etcd client client *clientv3.Client rootPath string member string}
// NewAllocatorImpl creates a new IDAllocator.func NewAllocatorImpl(client *clientv3.Client, rootPath string, member string) *AllocatorImpl { return &AllocatorImpl{client: client, rootPath: rootPath, member: member}}
复制代码
4. Alloc
基本逻辑是:
在 generate() 时会从 etcd 中载入之前持久化的已经发过的 id 作为起点。然后执行一次持久化,将起始 id + allocStep 保存下来。 [id, id + allocStep) 的区间就是缓存。客户端请求时,下发的 id 都是从这个缓存中取的。所以,对于高并发的应用,配置一个大的缓存区间可以获取更高的性能。比如将 allocStep 设为 5000,平均发出 5000 个号才需要持久化一次。
如果出现 pd 服务中断的话,重启启动时会从 etcd 中重新载入配置。(etcd 为高可用)
Alloc ID 的代码,加上注释 66 行。
// Alloc returns a new id.func (alloc *AllocatorImpl) Alloc() (uint64, error) { // 给分配增加锁,使用 defer 在函数结束时进行释放 alloc.mu.Lock() defer alloc.mu.Unlock()
// 第一次的时候 base 和 end 都为 0,所以会执行 generate() // 否则直接返回 alloc.base++ if alloc.base == alloc.end { end, err := alloc.generate() if err != nil { return 0, err }
alloc.end = end alloc.base = alloc.end - allocStep }
alloc.base++
return alloc.base, nil}
func (alloc *AllocatorImpl) generate() (uint64, error) { // 获取要给 XXPath 分配 ID 的 key key := alloc.getAllocIDPath() // 从 etcd 中读取 key 所对应的值 value, err := etcdutil.GetValue(alloc.client, key) if err != nil { return 0, err }
var ( cmp clientv3.Cmp end uint64 )
if value == nil { // create the key cmp = clientv3.Compare(clientv3.CreateRevision(key), "=", 0) } else { // update the key end, err = typeutil.BytesToUint64(value) if err != nil { return 0, err }
cmp = clientv3.Compare(clientv3.Value(key), "=", string(value)) } // 如果以前不存在,则 end 被赋值为 1000(分配的步长),否则,就是原有的值+步长 end += allocStep // 将 uint64 转为 bytes value = typeutil.Uint64ToBytes(end) // 从 etcd 获取一个事务,然后将值提交到 etcd 中 txn := kv.NewSlowLogTxn(alloc.client) leaderPath := path.Join(alloc.rootPath, "leader") t := txn.If(append([]clientv3.Cmp{cmp}, clientv3.Compare(clientv3.Value(leaderPath), "=", alloc.member))...) resp, err := t.Then(clientv3.OpPut(key, string(value))).Commit() if err != nil { return 0, err } if !resp.Succeeded { return 0, errors.New("generate id failed, we may not leader") }
log.Info("idAllocator allocates a new id", zap.Uint64("alloc-id", end)) idGauge.WithLabelValues("idalloc").Set(float64(end)) return end, nil}
func (alloc *AllocatorImpl) getAllocIDPath() string { return path.Join(alloc.rootPath, "alloc_id")}
复制代码
etcd 中事务是原子执行的,只支持 if … then … else … 这种表达,能实现一些有意思的场景。
其他的一些 Alloc 调用
pd/server/cluster.go#AllocPeer()
pd/server/cluster_worker.go#handleAskSplit()
pd/server/grpc_service.go#AllocID()
pd/table/namespace_classifier.go#CreateNamespace()
主要的调用逻辑代码:
newRegionID, err := c.s.idAllocator.Alloc()
...
peerIDs := make([]uint64, len(request.Region.Peers))for i := 0; i < len(peerIDs); i++ { if peerIDs[i], err = c.s.idAllocator.Alloc(); err != nil { return nil, err }}
复制代码
相关统计
pd 在启动时,调用 metricutil.Push(&cfg.Metric) 即可开启 prometheus 的上报客户端,默认情况下:每 15 秒上报一次。
// prometheusPushClient pushs metrics to Prometheus Pushgateway.func prometheusPushClient(job, addr string, interval time.Duration) { for { err := push.FromGatherer( job, push.HostnameGroupingKey(), addr, prometheus.DefaultGatherer, ) if err != nil { log.Error("could not push metrics to Prometheus Pushgateway", zap.Error(err)) }
time.Sleep(interval) }}
// Push metircs in background.func Push(cfg *MetricConfig) { if cfg.PushInterval.Duration == zeroDuration || len(cfg.PushAddress) == 0 { log.Info("disable Prometheus push client") return }
log.Info("start Prometheus push client")
interval := cfg.PushInterval.Duration go prometheusPushClient(cfg.PushJob, cfg.PushAddress, interval)}
复制代码
每成功重新 generate() 一次的时候,就会上报一次 prometheus。
idGauge.WithLabelValues("idalloc").Set(float64(end))
高可用容灾
可以参考美团《Leaf 高可用容灾》
可以参见有赞《如何做一个靠谱的发号器》
参考资料
TiDB 整体架构
Placement Driver 功能介绍
介绍 PD Google Slides
TiDB 中的 TSO
常见分布式全局唯一 ID 生成策略及算法的对比
阿里 P8 架构师谈:分布式系统全局唯一 ID 简介、特点、5 种生成方式
全局唯一 ID 在分布式系统中用来做什么用?
etcd v3 客户端用法
如何做一个靠谱的发号器
高并发分布式系统唯一 ID 生成
微信序列号生成器架构设计及演变
etcd 性能表现(官方)
附 TUG 华南区 Shopee 深圳第三期线下活动信息
活动行
不同业务场景下的数据库技术选型思路, 活动时间, 预约报名, 活动地址, 活动详情, 活动嘉宾, 主办方等
活动的详细安排
后续 https://asktug.com 也会将活动实录整理好后放出来,有兴趣的小伙伴们可以关注。
阅读原文:茶歇驿站 TiDB 中的全局唯一 ID
评论