写点什么

cortex ingester 基于 hash ring 进行 token 管理

作者:jupiter
  • 2023-03-17
    广东
  • 本文字数:2036 字

    阅读完需:约 7 分钟

token 选取流程

3 个 ingester 时的 token

比如有 3 个 ingester(下文简称 in),每个 ingester 产生如下 512 个不同的 tokens

4222218899 4244974091 4247636007 4260587700 4270043296 4278792571
复制代码

distributor 对 ingester 产生的所有 token 进行排序

0(in1),5(in2),9(in3),20(in2),30(in3),100(in1)
复制代码

有一个 series 经过 hash 得到 12,找到第一个大于 12 的 token 在 9-20 的区间,所以这个 series 被写到 in2。假设现在新增一个 ingester 节点,in4,他也产生 512 个 token,重新加入上述排序队列,上述 token 队列可能变成

0(in1),5(in2),7(in4),9(in3),14(in4),20(in2),30(in3),50(in4),100(in1)
复制代码

上述的 series 发现落在 9-14 区间,所以后续的 series 数据,都被写到 in4。

4 个 ingester 时的 token

ring 工作流程

初始化 KV store

type KV struct {  // Protects access to memberlist and broadcasts fields.  initWG     sync.WaitGroup  memberlist *memberlist.Memberlist  broadcasts *memberlist.TransmitLimitedQueue
// KV Store. storeMu sync.Mutex store map[string]valueDesc
// Codec registry codecs map[string]codec.Codec
// Key watchers watchersMu sync.Mutex watchers map[string][]chan string prefixWatchers map[string][]chan string }
复制代码


// Client implements kv.Client interface, by using memberlist.KVtype Client struct {  kv    *KV // reference to singleton memberlist-based KV  codec codec.Codec}
复制代码


初始化一个基于 memberlist 的 KV 存储,其他类型比如 etcd/consul 也有相应 Client 的 interface。

1)store 表示实际传递的消息体,如果需要流转多种类型消息体,可以用 map 进行映射。

2)codecs 定义好 memberlist 中转发数据的解码工具 codec。

// NewDesc returns an empty ring.Descfunc NewDesc() *Desc {  return &Desc{    Ingesters: map[string]InstanceDesc{},  }}
复制代码


这里表示,ingester 的 ring,进行消息传递时,对应的 pb 结构体是 InstanceDesc。3)watchers 动态监听 key 的数据集。如下代码实例一个 memberlist 子节点:

case "memberlist":  kv, err := cfg.MemberlistKV()  if err != nil {    return nil, err  }  client, err = memberlist.NewClient(kv, codec)  if err != nil {    return nil, err  }
复制代码

定义 ring 结构体

type Lifecycler struct {  KVStore         kv.Client
// These values are initialised at startup, and never change ID string Addr string RingName string RingKey string Zone string
state InstanceState tokens Tokens
复制代码

初始化一个基于 memberlist 的 Lifecycer Ring 结构体,

  1. KVStore 即是前文的 memberlist.NewClient,

  2. tokens 记录当前 Ring 中的内容,由 KVStore 经过 codecs 解析后获取。

ringDesc = in.(*Desc)myTokens, takenTokens := ringDesc.TokensFor(i.ID)
复制代码

ingester 节点加入 ring

1)ingester 注册时,初始化 KV store 和 Ring 子节点,

2)首次加入 memlister 时,携带空的 token,为了加快 ring 子节点间快速探测及加入

ringDesc.AddIngester(i.ID, i.Addr, i.Zone, []uint32{}, i.GetState(), registeredAt)
复制代码

3)当 tsdb 文件加载完成后,再生成 token,重新刷新 ring 的信息

myTokens, takenTokens := ringDesc.TokensFor(i.ID)ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
复制代码


func (d *Desc) AddIngester(id, addr, zone string, tokens []uint32, state InstanceState, registeredAt time.Time) InstanceDesc {...ingester := InstanceDesc{  Addr:                addr,  Timestamp:           time.Now().Unix(),  RegisteredTimestamp: registeredTimestamp,  State:               state,  Tokens:              tokens,  Zone:                zone,  }  d.Ingesters[id] = ingester
复制代码

disrtibutor 获取 ingester 的 token 信息

disrtibutor 启动时,也会初始化一个 memberlist 的 client,使用 WatchKey 来探测 Ingester 的 token 变化。

r.KVClient.WatchKey(ctx, r.key, func(value interface{}) bool {  if value == nil {    level.Info(r.logger).Log("msg", "ring doesn't exist in KV store yet")    return true  }  r.updateRingState(value.(*Desc))  return true})
复制代码


当有 series 进入 Distributor 时,会使用最新的 token 集合,来判断将此 token 推到哪一个 Ingester 进行后续处理。

func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest)  ...  ring.DoBatch    ...  r.Get(key, op, bufDescs[:0], bufHosts[:0], bufZones[:0])
复制代码


func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, bufZones []string) (ReplicationSet, error) {  start      = searchToken(r.ringTokens, key)
复制代码

更多文章请关注我们公众号【Grafana爱好者】

发布于: 刚刚阅读数: 3
用户头像

jupiter

关注

还未添加个人签名 2022-01-14 加入

还未添加个人简介

评论

发布
暂无评论
cortex ingester 基于 hash ring 进行 token 管理_Prometheus_jupiter_InfoQ写作社区