写点什么

Mimir 源码分析(二):效率爆棚的分片压缩

  • 2022 年 8 月 28 日
    上海
  • 本文字数:4833 字

    阅读完需:约 16 分钟

Mimir 源码分析(二):效率爆棚的分片压缩

背景


Mimir compactor 组件负责将抓取模块 ingester 上传的多个 TSDB blocks,合并成大的 block,即从 level1-levelN 的压缩过程。当处理亿级活跃指标压缩数据时,会出现一系列挑战性的问题。

问题场景


首先,原有的 compactor 不能均衡的处理压缩任务。起初,Mimir 团队使用企业版的 compactor,它可以很好地将跨时间段(默认 2h)的数据进行压缩,但是当压缩 2h 内 level1 的原始数据的 blocks 时,却花费了很长时间,因为 10 亿活跃指标,2h 内产生的 block 是非常多的。这样带来的一个后果就是,由于 2h 内的数据没有压缩、去重,查询性能也比较差。


然后,即使 compactor 能够完成压缩,最后结果也是异常。因为当前 TSDB 格式中,有内容大小的限制。比如 index 最大可以写 64G,index 内部的功能分区 section 的大小限制为 4BYTE=2^32,最大占用 4G。关于 index 详细格式,参考链接。作者在生产环境中也遇到 symbol label 限制的问题,由于当时 section 报错没有打印,所以提交PR增加了这个错误。但是 index 的限制问题,prometheus 社区还在讨论中,因为涉及 TSDB 数据落盘和历史数据兼容的问题。

split-and-merge 压缩


为了克服上述的问题和限制,Mimir 提出一种新的压缩算法,叫做 split-and-merge 压缩。这种方法支持水平和垂直扩展,尤其是在同一时间段内的数据压缩有明显效果。总体来说,包含三个步骤:group(分组)-split(拆分)-merge(压缩)


group


首先将源 blocks 进行分组(需要注意 group 前的 blocks 是所有 Ingester 模块上传到对象存储的 2h 原始数据块),然后每个 compactor 分配一些“预压缩”任务,比如 compator-1 负责压缩 block#1 和 block#2 ,compator-2 负责压缩 block#3 和 block#4 ,这样 compator-1 和 compator-2 可以并行工作。


对于原始未压缩的 2h 数据,打上 stageSplit 状态


/home/opensource/mimir/pkg/compactor/split_merge_grouper.go


 //blockrange configuration:cfg.BlockRanges = mimir_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}

func planCompaction(userID string, blocks []*metadata.Meta, ranges []int64, shardCount, splitGroups uint32) (jobs []*job) { planCompactionByRange(userID, mainBlocks, tr, tr == ranges[0], shardCount, splitGroups) ... if splitJobs := planSplitting(userID, group, splitGroups); len(splitJobs) > 0 { // start to produce groups splitGroup := mimir_tsdb.HashBlockID(block.ULID) % splitGroups if jobs[splitGroup] == nil { jobs[splitGroup] = &job{ userID: userID, stage: stageSplit, // split stage shardID: sharding.FormatShardIDLabelValue(uint64(splitGroup), uint64(splitGroups)), blocksGroup: blocksGroup{ rangeStart: group.rangeStart, rangeEnd: group.rangeEnd, }, } }
复制代码


请注意,上面的 shardID 索引是根据 splitGroup := mimir_tsdb.HashBlockID(block.ULID) % splitGroups


可能得到如下 jobs 的 shardID 信息:


shardID=1_of_2; // group-1shardID=2_of_2; // group-2
复制代码


需要设置 shardID 分配任务,因为任务执行时,需要 shardingKey,来保证不同的 group 被不同的 compactor 执行。


func (j *job) shardingKey() string {  return fmt.Sprintf("%s-%s-%d-%d-%s", j.userID, j.stage, j.rangeStart, j.rangeEnd, j.shardID)}
复制代码


最后生成了本次 groups 的任务,


//home/opensource/mimir/pkg/compactor/split_merge_grouper.gofunc (g *SplitAndMergeGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Job, err error) {    compactionJob := NewJob(      g.userID,      groupKey,      externalLabels,      resolution,      metadata.NoneFunc,      job.stage == stageSplit,   // mark this a split job      g.shardCount,      job.shardingKey(),    )
//such jobs are executed by compactors
复制代码


此任务随后被 compactors 执行,具体被 compators 集群中的哪个节点执行该任务,可以跟踪一下 func (s *splitAndMergeShardingStrategy) ownJob(job *Job) 函数 。后面我们再单独分享一次,jobs 的分解和执行逻辑。

split


对于 compator-1,根据分组产生的预压缩任务,下载 block#1 和 block#2 到本地后,再按照配置的 split-and-merge-shards 值(这里配置为 2),根据 sereis 的 label 进行 hash 分成 2 份,产生了 block#5(shard 1_of_2)和 block#6(shard 2_of_2)。这里的 block#5 和 block#6 随后将被上传到对象存储。


===>split/home/opensource/mimir/pkg/compactor/bucket_compactor.gofunc (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {
if job.UseSplitting() { compIDs, err = c.comp.CompactWithSplitting(subDir, blocksToCompactDirs, nil, uint64(job.SplittingShards()))
复制代码


然后修改了 tsdb 压缩部分,传进去 shardCount 参数,这里是为了构造 shard 数据。


###modify prometheus tsdb compact codes by adding #shardCount :/home/opensource/mimir/vendor/github.com/prometheus/prometheus/tsdb/compact.go
func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, shardCount uint64) (_ []ulid.ULID, err error) {
func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks ...BlockReader) (err error) {func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) { if len(outBlocks) > 1 { obIx = s.Labels().Hash() % uint64(len(outBlocks)) // obIx = shard index,outBlocks=shard counts }####return from prometheus tsdb compact
复制代码


最主要的是上面的 obIx 部分,这个数据就是某个 series 被分配到哪个 shard 的唯一根据。outBlocks 的个数,就是 shardcounts 的配置值。


  obIx = s.Labels().Hash() % uint64(len(outBlocks))   // obIx = shard index,outBlocks=shard counts
复制代码


最后将每个 split 完的 block,加上"compactor_shard_id"标签,然后被推送到持久化存储。


  //CompactorShardIDExternalLabel = "__compactor_shard_id__"      //block meta info add shard label, like "__compactor_shard_id__": "2_of_2"  if job.UseSplitting() {      newLabels[mimit_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))    }  //such blocks then will be uploaded to buckets.
复制代码

merge


上面 split 完的 shard:1_of_2,包含 block#5 和 block#7,然后由 compactor-3 对这两个 block 的数据进行压缩,生成 1 个 2h 的分片 block#9。同理,由 compactor-4 生成另外 1 个分片,block#10。


经过上述步骤,tenant 中的 2h 原始数据,被切分成两个 level2 的 block,随后的 level3-levelN 的压缩,会由 compactor-x 继续执行 merge 的动作,同时分别保持 1_of_2 和 2_of_2 的 label。


merge 时仍然需要先创建任务,这时候的 stage,就是 stageMerge


======> merge
handle "__compactor_shard_id__" labeled blocks
func planCompactionByRange(userID string, blocks []*metadata.Meta, tr int64, isSmallestRange bool, shardCount, splitGroups uint32) (jobs []*job) {... func groupBlocksByShardID(blocks []*metadata.Meta) map[string][]*metadata.Meta { shardID := block.Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel] groups[shardID] = append(groups[shardID], block) // produce merge groups by CompactorShardIDExternalLabel
jobs = append(jobs, &job{ userID: userID, stage: stageMerge, // merge stage shardID: shardID, blocksGroup: blocksGroup{ rangeStart: group.rangeStart, rangeEnd: group.rangeEnd, blocks: shardBlocks, // shardBlocks only has one label type like "1_of_2" or "2_of_2" }, })
//compact merge blocks by shardBlocks belonging same __compactor_shard_id__.func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shouldRerun bool, compIDs []ulid.ULID, rerr error) { ... // used for splitting if job.UseSplitting() { compIDs, err = c.comp.CompactWithSplitting(subDir, blocksToCompactDirs, nil, uint64(job.SplittingShards())) } // used for merging else { var compID ulid.ULID compID, err = c.comp.Compact(subDir, blocksToCompactDirs, nil) // execute normal compaction compIDs = append(compIDs, compID) }
复制代码

如何开启分片压缩


关键配置信息

-compactor.split-groups:分组数配置-compactor.split-and-merge-shards:分组后,每个block进行分片的个数,以及最终merge后产生的分片数。-compactor.compactor-tenant-shard-size: 多租户场景下,每个租户可以使用的compactor实例个数。默认为0,不限制使用个数。-compactor.compaction-concurrency: 每个compactor实例可以同时开启的压缩线程数
复制代码

一个例子


我们前面章节使用 docker-compose 一键部署单体版集群介绍的 3 节点集群部署的单体模式为基础,修改 overrides.xml 文件中,demo 租户的 compator 分片配置,


overrides:  demo:    compactor_split_and_merge_shards: 2    compactor_split_groups: 2
复制代码


重新部署,经过 2h 后,compactor 开始工作,在/data-compactor/compactor-meta-demo/meta-syncer 目录下,可以看到 meta.json 的信息。压缩后的 blocks,已经上传到对象存储,如需查看具体的 block 可以去 minio 控制台查看。


01G87DF5RCC2HT585HPR7J7TWN/meta.json:            "__compactor_shard_id__": "2_of_2"01G87DF6CAKB3PKV66R9CDJPXW/meta.json:            "__compactor_shard_id__": "2_of_2"01G87DF6QKSE4Z2MS028J7YCTV/meta.json:            "__compactor_shard_id__": "2_of_2"01G87EST3SF17E8SEEJ76VFTY2/meta.json:            "__compactor_shard_id__": "1_of_2"
复制代码


某个 level2 的 meta.json 文件内容如下,可以看到"compactor_shard_id": "2_of_2"的标签表示本次 block 包含的是 series_hash % shard_count 为 2 的内容。


  "compaction": {                "level": 2,                "sources": [                        "01G87M05RRTFTMGFMZD8BENZXH"                ],                "parents": [                        {                              ...                        }                ]        },        "version": 1,        "thanos": {                "labels": {                        "__compactor_shard_id__": "2_of_2"                },                "downsample": {                        "resolution": 0                },                "source": "compactor",
复制代码

总结


对于 split-merge 分片压缩,大量任务可以由多台 compactor 实例提供分布式的方式,并发执行。


即,当某个租户的指标很多时,可以配置较多的 groups,split-and-merge-shards 参数:


进行 2h 内的垂直分片合并,在 merge 生成 Level1 数据时,是基于每个分片的维度独立进行的。这样可以快速完成多副本数据合并;


进行 2h 外的水平分片合并,即 Level2-LevelN 压缩时,也会保持较高的效率和性能。


可以说,水平压缩的高效,得益于垂直压缩的分片。


下图显示了 150 台 compactor 实例,压缩单租户 10 亿指标的数据,



可以看出所有 compactor 的压力比较均衡。水平和纵向压缩,可以保证查询时,不访问非压缩的 block。

发布于: 刚刚阅读数: 6
用户头像

学习是从了解到使用再到输出的过程。 2018.04.27 加入

GrafanaFans 是由南京多位 Grafana 爱好者一起发起的 Grafana 开源产品学习小组,致力于 LGTM(Loki、Grafana、Tempo、Mimir)技术栈在国内的普及和应用,欢迎关注开源项目 https://github.com/grafanafans/club。

评论

发布
暂无评论
Mimir 源码分析(二):效率爆棚的分片压缩_Mimir_Grafana 爱好者_InfoQ写作社区