背景
Mimir compactor 组件负责将抓取模块 ingester 上传的多个 TSDB blocks,合并成大的 block,即从 level1-levelN 的压缩过程。当处理亿级活跃指标压缩数据时,会出现一系列挑战性的问题。
问题场景
首先,原有的 compactor 不能均衡的处理压缩任务。起初,Mimir 团队使用企业版的 compactor,它可以很好地将跨时间段(默认 2h)的数据进行压缩,但是当压缩 2h 内 level1 的原始数据的 blocks 时,却花费了很长时间,因为 10 亿活跃指标,2h 内产生的 block 是非常多的。这样带来的一个后果就是,由于 2h 内的数据没有压缩、去重,查询性能也比较差。
然后,即使 compactor 能够完成压缩,最后结果也是异常。因为当前 TSDB 格式中,有内容大小的限制。比如 index 最大可以写 64G,index 内部的功能分区 section 的大小限制为 4BYTE=2^32,最大占用 4G。关于 index 详细格式,参考链接。作者在生产环境中也遇到 symbol label 限制的问题,由于当时 section 报错没有打印,所以提交PR增加了这个错误。但是 index 的限制问题,prometheus 社区还在讨论中,因为涉及 TSDB 数据落盘和历史数据兼容的问题。
split-and-merge 压缩
为了克服上述的问题和限制,Mimir 提出一种新的压缩算法,叫做 split-and-merge 压缩。这种方法支持水平和垂直扩展,尤其是在同一时间段内的数据压缩有明显效果。总体来说,包含三个步骤:group(分组)-split(拆分)-merge(压缩)
group
首先将源 blocks 进行分组(需要注意 group 前的 blocks 是所有 Ingester 模块上传到对象存储的 2h 原始数据块),然后每个 compactor 分配一些“预压缩”任务,比如 compator-1 负责压缩 block#1 和 block#2 ,compator-2 负责压缩 block#3 和 block#4 ,这样 compator-1 和 compator-2 可以并行工作。
对于原始未压缩的 2h 数据,打上 stageSplit 状态
/home/opensource/mimir/pkg/compactor/split_merge_grouper.go
  //blockrange configuration:cfg.BlockRanges = mimir_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
func planCompaction(userID string, blocks []*metadata.Meta, ranges []int64, shardCount, splitGroups uint32) (jobs []*job) {  planCompactionByRange(userID, mainBlocks, tr, tr == ranges[0], shardCount, splitGroups)   ...    if splitJobs := planSplitting(userID, group, splitGroups); len(splitJobs) > 0 {                  // start to produce groups      splitGroup := mimir_tsdb.HashBlockID(block.ULID) % splitGroups      if jobs[splitGroup] == nil {      jobs[splitGroup] = &job{        userID:  userID,        stage:   stageSplit,  // split stage        shardID: sharding.FormatShardIDLabelValue(uint64(splitGroup), uint64(splitGroups)),        blocksGroup: blocksGroup{          rangeStart: group.rangeStart,          rangeEnd:   group.rangeEnd,        },      }    }
   复制代码
 
请注意,上面的 shardID 索引是根据 splitGroup := mimir_tsdb.HashBlockID(block.ULID) % splitGroups
可能得到如下 jobs 的 shardID 信息:
 shardID=1_of_2; // group-1shardID=2_of_2; // group-2
   复制代码
 
需要设置 shardID 分配任务,因为任务执行时,需要 shardingKey,来保证不同的 group 被不同的 compactor 执行。
 func (j *job) shardingKey() string {  return fmt.Sprintf("%s-%s-%d-%d-%s", j.userID, j.stage, j.rangeStart, j.rangeEnd, j.shardID)}
   复制代码
 
最后生成了本次 groups 的任务,
 //home/opensource/mimir/pkg/compactor/split_merge_grouper.gofunc (g *SplitAndMergeGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Job, err error) {    compactionJob := NewJob(      g.userID,      groupKey,      externalLabels,      resolution,      metadata.NoneFunc,      job.stage == stageSplit,   // mark this a split job      g.shardCount,      job.shardingKey(),    )
//such jobs are executed by compactors
   复制代码
 
此任务随后被 compactors 执行,具体被 compators 集群中的哪个节点执行该任务,可以跟踪一下 func (s *splitAndMergeShardingStrategy) ownJob(job *Job) 函数 。后面我们再单独分享一次,jobs 的分解和执行逻辑。
split
对于 compator-1,根据分组产生的预压缩任务,下载 block#1 和 block#2 到本地后,再按照配置的 split-and-merge-shards 值(这里配置为 2),根据 sereis 的 label 进行 hash 分成 2 份,产生了 block#5(shard 1_of_2)和 block#6(shard 2_of_2)。这里的 block#5 和 block#6 随后将被上传到对象存储。
 ===>split/home/opensource/mimir/pkg/compactor/bucket_compactor.gofunc (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {
  if job.UseSplitting() {    compIDs, err = c.comp.CompactWithSplitting(subDir, blocksToCompactDirs, nil, uint64(job.SplittingShards()))
   复制代码
 
然后修改了 tsdb 压缩部分,传进去 shardCount 参数,这里是为了构造 shard 数据。
 ###modify prometheus tsdb compact codes by adding #shardCount :/home/opensource/mimir/vendor/github.com/prometheus/prometheus/tsdb/compact.go
func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, shardCount uint64) (_ []ulid.ULID, err error) {
func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks ...BlockReader) (err error) {func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {      if len(outBlocks) > 1 {      obIx = s.Labels().Hash() % uint64(len(outBlocks))   // obIx = shard index,outBlocks=shard counts    }####return from prometheus tsdb compact
   复制代码
 
最主要的是上面的 obIx 部分,这个数据就是某个 series 被分配到哪个 shard 的唯一根据。outBlocks 的个数,就是 shardcounts 的配置值。
   obIx = s.Labels().Hash() % uint64(len(outBlocks))   // obIx = shard index,outBlocks=shard counts
   复制代码
 
最后将每个 split 完的 block,加上"compactor_shard_id"标签,然后被推送到持久化存储。
   //CompactorShardIDExternalLabel = "__compactor_shard_id__"      //block meta info add shard label, like "__compactor_shard_id__": "2_of_2"  if job.UseSplitting() {      newLabels[mimit_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))    }  //such blocks then will be uploaded to buckets.
   复制代码
 merge
上面 split 完的 shard:1_of_2,包含 block#5 和 block#7,然后由 compactor-3 对这两个 block 的数据进行压缩,生成 1 个 2h 的分片 block#9。同理,由 compactor-4 生成另外 1 个分片,block#10。
经过上述步骤,tenant 中的 2h 原始数据,被切分成两个 level2 的 block,随后的 level3-levelN 的压缩,会由 compactor-x 继续执行 merge 的动作,同时分别保持 1_of_2 和 2_of_2 的 label。
merge 时仍然需要先创建任务,这时候的 stage,就是 stageMerge
 ======> merge
handle "__compactor_shard_id__" labeled blocks
func planCompactionByRange(userID string, blocks []*metadata.Meta, tr int64, isSmallestRange bool, shardCount, splitGroups uint32) (jobs []*job) {...    func groupBlocksByShardID(blocks []*metadata.Meta) map[string][]*metadata.Meta {      shardID := block.Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel]      groups[shardID] = append(groups[shardID], block)      // produce merge groups by CompactorShardIDExternalLabel
      jobs = append(jobs, &job{        userID:  userID,        stage:   stageMerge,    // merge stage        shardID: shardID,        blocksGroup: blocksGroup{          rangeStart: group.rangeStart,          rangeEnd:   group.rangeEnd,          blocks:     shardBlocks,   // shardBlocks only has one label type like "1_of_2" or "2_of_2"        },      })
//compact merge blocks by shardBlocks belonging same __compactor_shard_id__.func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {  ...  // used for splitting      if job.UseSplitting() {    compIDs, err = c.comp.CompactWithSplitting(subDir, blocksToCompactDirs, nil, uint64(job.SplittingShards()))  }   // used for merging  else {      var compID ulid.ULID    compID, err = c.comp.Compact(subDir, blocksToCompactDirs, nil)  // execute normal compaction     compIDs = append(compIDs, compID)  }
   复制代码
 如何开启分片压缩
关键配置信息
 -compactor.split-groups:分组数配置-compactor.split-and-merge-shards:分组后,每个block进行分片的个数,以及最终merge后产生的分片数。-compactor.compactor-tenant-shard-size: 多租户场景下,每个租户可以使用的compactor实例个数。默认为0,不限制使用个数。-compactor.compaction-concurrency: 每个compactor实例可以同时开启的压缩线程数
   复制代码
 一个例子
我们前面章节使用 docker-compose 一键部署单体版集群介绍的 3 节点集群部署的单体模式为基础,修改 overrides.xml 文件中,demo 租户的 compator 分片配置,
 overrides:  demo:    compactor_split_and_merge_shards: 2    compactor_split_groups: 2
   复制代码
 
重新部署,经过 2h 后,compactor 开始工作,在/data-compactor/compactor-meta-demo/meta-syncer 目录下,可以看到 meta.json 的信息。压缩后的 blocks,已经上传到对象存储,如需查看具体的 block 可以去 minio 控制台查看。
 01G87DF5RCC2HT585HPR7J7TWN/meta.json:            "__compactor_shard_id__": "2_of_2"01G87DF6CAKB3PKV66R9CDJPXW/meta.json:            "__compactor_shard_id__": "2_of_2"01G87DF6QKSE4Z2MS028J7YCTV/meta.json:            "__compactor_shard_id__": "2_of_2"01G87EST3SF17E8SEEJ76VFTY2/meta.json:            "__compactor_shard_id__": "1_of_2"
   复制代码
 
某个 level2 的 meta.json 文件内容如下,可以看到"compactor_shard_id": "2_of_2"的标签表示本次 block 包含的是 series_hash % shard_count 为 2 的内容。
   "compaction": {                "level": 2,                "sources": [                        "01G87M05RRTFTMGFMZD8BENZXH"                ],                "parents": [                        {                              ...                        }                ]        },        "version": 1,        "thanos": {                "labels": {                        "__compactor_shard_id__": "2_of_2"                },                "downsample": {                        "resolution": 0                },                "source": "compactor",
   复制代码
 总结
对于 split-merge 分片压缩,大量任务可以由多台 compactor 实例提供分布式的方式,并发执行。
即,当某个租户的指标很多时,可以配置较多的 groups,split-and-merge-shards 参数:
进行 2h 内的垂直分片合并,在 merge 生成 Level1 数据时,是基于每个分片的维度独立进行的。这样可以快速完成多副本数据合并;
进行 2h 外的水平分片合并,即 Level2-LevelN 压缩时,也会保持较高的效率和性能。
可以说,水平压缩的高效,得益于垂直压缩的分片。
下图显示了 150 台 compactor 实例,压缩单租户 10 亿指标的数据,
可以看出所有 compactor 的压力比较均衡。水平和纵向压缩,可以保证查询时,不访问非压缩的 block。
评论