token 选取流程
3 个 ingester 时的 token
比如有 3 个 ingester(下文简称 in),每个 ingester 产生如下 512 个不同的 tokens
4222218899 4244974091 4247636007 4260587700 4270043296 4278792571
复制代码
distributor 对 ingester 产生的所有 token 进行排序
0(in1),5(in2),9(in3),20(in2),30(in3),100(in1)
复制代码
有一个 series 经过 hash 得到 12,找到第一个大于 12 的 token 在 9-20 的区间,所以这个 series 被写到 in2。假设现在新增一个 ingester 节点,in4,他也产生 512 个 token,重新加入上述排序队列,上述 token 队列可能变成
0(in1),5(in2),7(in4),9(in3),14(in4),20(in2),30(in3),50(in4),100(in1)
复制代码
上述的 series 发现落在 9-14 区间,所以后续的 series 数据,都被写到 in4。
4 个 ingester 时的 token
ring 工作流程
初始化 KV store
type KV struct { // Protects access to memberlist and broadcasts fields. initWG sync.WaitGroup memberlist *memberlist.Memberlist broadcasts *memberlist.TransmitLimitedQueue
// KV Store. storeMu sync.Mutex store map[string]valueDesc
// Codec registry codecs map[string]codec.Codec
// Key watchers watchersMu sync.Mutex watchers map[string][]chan string prefixWatchers map[string][]chan string }
复制代码
// Client implements kv.Client interface, by using memberlist.KVtype Client struct { kv *KV // reference to singleton memberlist-based KV codec codec.Codec}
复制代码
初始化一个基于 memberlist 的 KV 存储,其他类型比如 etcd/consul 也有相应 Client 的 interface。
1)store 表示实际传递的消息体,如果需要流转多种类型消息体,可以用 map 进行映射。
2)codecs 定义好 memberlist 中转发数据的解码工具 codec。
// NewDesc returns an empty ring.Descfunc NewDesc() *Desc { return &Desc{ Ingesters: map[string]InstanceDesc{}, }}
复制代码
这里表示,ingester 的 ring,进行消息传递时,对应的 pb 结构体是 InstanceDesc。3)watchers 动态监听 key 的数据集。如下代码实例一个 memberlist 子节点:
case "memberlist": kv, err := cfg.MemberlistKV() if err != nil { return nil, err } client, err = memberlist.NewClient(kv, codec) if err != nil { return nil, err }
复制代码
定义 ring 结构体
type Lifecycler struct { KVStore kv.Client
// These values are initialised at startup, and never change ID string Addr string RingName string RingKey string Zone string
state InstanceState tokens Tokens
复制代码
初始化一个基于 memberlist 的 Lifecycer Ring 结构体,
KVStore 即是前文的 memberlist.NewClient,
tokens 记录当前 Ring 中的内容,由 KVStore 经过 codecs 解析后获取。
ringDesc = in.(*Desc)myTokens, takenTokens := ringDesc.TokensFor(i.ID)
复制代码
ingester 节点加入 ring
1)ingester 注册时,初始化 KV store 和 Ring 子节点,
2)首次加入 memlister 时,携带空的 token,为了加快 ring 子节点间快速探测及加入
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), registeredAt)
复制代码
3)当 tsdb 文件加载完成后,再生成 token,重新刷新 ring 的信息
myTokens, takenTokens := ringDesc.TokensFor(i.ID)ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
复制代码
func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time) InstanceDesc {...ingester := InstanceDesc{ Addr: addr, Timestamp: time.Now().Unix(), RegisteredTimestamp: registeredTimestamp, State: state, Tokens: tokens, Zone: zone, } d.Ingesters[id] = ingester
复制代码
disrtibutor 获取 ingester 的 token 信息
disrtibutor 启动时,也会初始化一个 memberlist 的 client,使用 WatchKey 来探测 Ingester 的 token 变化。
r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool { if value == nil { level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet") return true } r.updateRingState(value.(*Desc)) return true})
复制代码
当有 series 进入 Distributor 时,会使用最新的 token 集合,来判断将此 token 推到哪一个 Ingester 进行后续处理。
func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) ... ring.DoBatch ... r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
复制代码
func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) { start = searchToken(r.ringTokens, key)
复制代码
更多文章请关注我们公众号【Grafana爱好者】
评论