token 选取流程
3 个 ingester 时的 token
比如有 3 个 ingester(下文简称 in),每个 ingester 产生如下 512 个不同的 tokens
4222218899 4244974091 4247636007 4260587700 4270043296 4278792571
复制代码
distributor 对 ingester 产生的所有 token 进行排序
0(in1),5(in2),9(in3),20(in2),30(in3),100(in1)
复制代码
有一个 series 经过 hash 得到 12,找到第一个大于 12 的 token 在 9-20 的区间,所以这个 series 被写到 in2。假设现在新增一个 ingester 节点,in4,他也产生 512 个 token,重新加入上述排序队列,上述 token 队列可能变成
0(in1),5(in2),7(in4),9(in3),14(in4),20(in2),30(in3),50(in4),100(in1)
复制代码
上述的 series 发现落在 9-14 区间,所以后续的 series 数据,都被写到 in4。
4 个 ingester 时的 token
ring 工作流程
初始化 KV store
type KV struct {
// Protects access to memberlist and broadcasts fields.
initWG sync.WaitGroup
memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue
// KV Store.
storeMu sync.Mutex
store map[string]valueDesc
// Codec registry
codecs map[string]codec.Codec
// Key watchers
watchersMu sync.Mutex
watchers map[string][]chan string
prefixWatchers map[string][]chan string
}
复制代码
// Client implements kv.Client interface, by using memberlist.KV
type Client struct {
kv *KV // reference to singleton memberlist-based KV
codec codec.Codec
}
复制代码
初始化一个基于 memberlist 的 KV 存储,其他类型比如 etcd/consul 也有相应 Client 的 interface。
1)store 表示实际传递的消息体,如果需要流转多种类型消息体,可以用 map 进行映射。
2)codecs 定义好 memberlist 中转发数据的解码工具 codec。
// NewDesc returns an empty ring.Desc
func NewDesc() *Desc {
return &Desc{
Ingesters: map[string]InstanceDesc{},
}
}
复制代码
这里表示,ingester 的 ring,进行消息传递时,对应的 pb 结构体是 InstanceDesc。3)watchers 动态监听 key 的数据集。如下代码实例一个 memberlist 子节点:
case "memberlist":
kv, err := cfg.MemberlistKV()
if err != nil {
return nil, err
}
client, err = memberlist.NewClient(kv, codec)
if err != nil {
return nil, err
}
复制代码
定义 ring 结构体
type Lifecycler struct {
KVStore kv.Client
// These values are initialised at startup, and never change
ID string
Addr string
RingName string
RingKey string
Zone string
state InstanceState
tokens Tokens
复制代码
初始化一个基于 memberlist 的 Lifecycer Ring 结构体,
KVStore 即是前文的 memberlist.NewClient,
tokens 记录当前 Ring 中的内容,由 KVStore 经过 codecs 解析后获取。
ringDesc = in.(*Desc)
myTokens, takenTokens := ringDesc.TokensFor(i.ID)
复制代码
ingester 节点加入 ring
1)ingester 注册时,初始化 KV store 和 Ring 子节点,
2)首次加入 memlister 时,携带空的 token,为了加快 ring 子节点间快速探测及加入
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), registeredAt)
复制代码
3)当 tsdb 文件加载完成后,再生成 token,重新刷新 ring 的信息
myTokens, takenTokens := ringDesc.TokensFor(i.ID)
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
复制代码
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time) InstanceDesc {
...
ingester := InstanceDesc{
Addr: addr,
Timestamp: time.Now().Unix(),
RegisteredTimestamp: registeredTimestamp,
State: state,
Tokens: tokens,
Zone: zone,
}
d.Ingesters[id] = ingester
复制代码
disrtibutor 获取 ingester 的 token 信息
disrtibutor 启动时,也会初始化一个 memberlist 的 client,使用 WatchKey 来探测 Ingester 的 token 变化。
r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool {
if value == nil {
level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet")
return true
}
r.updateRingState(value.(*Desc))
return true
})
复制代码
当有 series 进入 Distributor 时,会使用最新的 token 集合,来判断将此 token 推到哪一个 Ingester 进行后续处理。
func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest)
...
ring.DoBatch
...
r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
复制代码
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {
start = searchToken(r.ringTokens, key)
复制代码
更多文章请关注我们公众号【Grafana爱好者】
评论