背景
Mimir compactor 组件负责将抓取模块 ingester 上传的多个 TSDB blocks,合并成大的 block,即从 level1-levelN 的压缩过程。当处理亿级活跃指标压缩数据时,会出现一系列挑战性的问题。
问题场景
首先,原有的 compactor 不能均衡的处理压缩任务。起初,Mimir 团队使用企业版的 compactor,它可以很好地将跨时间段(默认 2h)的数据进行压缩,但是当压缩 2h 内 level1 的原始数据的 blocks 时,却花费了很长时间,因为 10 亿活跃指标,2h 内产生的 block 是非常多的。这样带来的一个后果就是,由于 2h 内的数据没有压缩、去重,查询性能也比较差。
然后,即使 compactor 能够完成压缩,最后结果也是异常。因为当前 TSDB 格式中,有内容大小的限制。比如 index 最大可以写 64G,index 内部的功能分区 section 的大小限制为 4BYTE=2^32,最大占用 4G。关于 index 详细格式,参考链接。作者在生产环境中也遇到 symbol label 限制的问题,由于当时 section 报错没有打印,所以提交PR增加了这个错误。但是 index 的限制问题,prometheus 社区还在讨论中,因为涉及 TSDB 数据落盘和历史数据兼容的问题。
split-and-merge 压缩
为了克服上述的问题和限制,Mimir 提出一种新的压缩算法,叫做 split-and-merge 压缩。这种方法支持水平和垂直扩展,尤其是在同一时间段内的数据压缩有明显效果。总体来说,包含三个步骤:group(分组)-split(拆分)-merge(压缩)
group
首先将源 blocks 进行分组(需要注意 group 前的 blocks 是所有 Ingester 模块上传到对象存储的 2h 原始数据块),然后每个 compactor 分配一些“预压缩”任务,比如 compator-1 负责压缩 block#1 和 block#2 ,compator-2 负责压缩 block#3 和 block#4 ,这样 compator-1 和 compator-2 可以并行工作。
对于原始未压缩的 2h 数据,打上 stageSplit 状态
/home/opensource/mimir/pkg/compactor/split_merge_grouper.go
//blockrange configuration:
cfg.BlockRanges = mimir_tsdb.DurationList{2 * time.Hour, 12 * time.Hour, 24 * time.Hour}
func planCompaction(userID string, blocks []*metadata.Meta, ranges []int64, shardCount, splitGroups uint32) (jobs []*job) {
planCompactionByRange(userID, mainBlocks, tr, tr == ranges[0], shardCount, splitGroups)
...
if splitJobs := planSplitting(userID, group, splitGroups); len(splitJobs) > 0 {
// start to produce groups
splitGroup := mimir_tsdb.HashBlockID(block.ULID) % splitGroups
if jobs[splitGroup] == nil {
jobs[splitGroup] = &job{
userID: userID,
stage: stageSplit, // split stage
shardID: sharding.FormatShardIDLabelValue(uint64(splitGroup), uint64(splitGroups)),
blocksGroup: blocksGroup{
rangeStart: group.rangeStart,
rangeEnd: group.rangeEnd,
},
}
}
复制代码
请注意,上面的 shardID 索引是根据 splitGroup := mimir_tsdb.HashBlockID(block.ULID) % splitGroups
可能得到如下 jobs 的 shardID 信息:
shardID=1_of_2; // group-1
shardID=2_of_2; // group-2
复制代码
需要设置 shardID 分配任务,因为任务执行时,需要 shardingKey,来保证不同的 group 被不同的 compactor 执行。
func (j *job) shardingKey() string {
return fmt.Sprintf("%s-%s-%d-%d-%s", j.userID, j.stage, j.rangeStart, j.rangeEnd, j.shardID)
}
复制代码
最后生成了本次 groups 的任务,
//home/opensource/mimir/pkg/compactor/split_merge_grouper.go
func (g *SplitAndMergeGrouper) Groups(blocks map[ulid.ULID]*metadata.Meta) (res []*Job, err error) {
compactionJob := NewJob(
g.userID,
groupKey,
externalLabels,
resolution,
metadata.NoneFunc,
job.stage == stageSplit, // mark this a split job
g.shardCount,
job.shardingKey(),
)
//such jobs are executed by compactors
复制代码
此任务随后被 compactors 执行,具体被 compators 集群中的哪个节点执行该任务,可以跟踪一下 func (s *splitAndMergeShardingStrategy) ownJob(job *Job) 函数 。后面我们再单独分享一次,jobs 的分解和执行逻辑。
split
对于 compator-1,根据分组产生的预压缩任务,下载 block#1 和 block#2 到本地后,再按照配置的 split-and-merge-shards 值(这里配置为 2),根据 sereis 的 label 进行 hash 分成 2 份,产生了 block#5(shard 1_of_2)和 block#6(shard 2_of_2)。这里的 block#5 和 block#6 随后将被上传到对象存储。
===>split
/home/opensource/mimir/pkg/compactor/bucket_compactor.go
func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {
if job.UseSplitting() {
compIDs, err = c.comp.CompactWithSplitting(subDir, blocksToCompactDirs, nil, uint64(job.SplittingShards()))
复制代码
然后修改了 tsdb 压缩部分,传进去 shardCount 参数,这里是为了构造 shard 数据。
###modify prometheus tsdb compact codes by adding #shardCount :
/home/opensource/mimir/vendor/github.com/prometheus/prometheus/tsdb/compact.go
func (c *LeveledCompactor) compact(dest string, dirs []string, open []*Block, shardCount uint64) (_ []ulid.ULID, err error) {
func (c *LeveledCompactor) write(dest string, outBlocks []shardedBlock, blocks ...BlockReader) (err error) {
func (c *LeveledCompactor) populateBlock(blocks []BlockReader, minT, maxT int64, outBlocks []shardedBlock) (err error) {
if len(outBlocks) > 1 {
obIx = s.Labels().Hash() % uint64(len(outBlocks)) // obIx = shard index,outBlocks=shard counts
}
####return from prometheus tsdb compact
复制代码
最主要的是上面的 obIx 部分,这个数据就是某个 series 被分配到哪个 shard 的唯一根据。outBlocks 的个数,就是 shardcounts 的配置值。
obIx = s.Labels().Hash() % uint64(len(outBlocks)) // obIx = shard index,outBlocks=shard counts
复制代码
最后将每个 split 完的 block,加上"compactor_shard_id"标签,然后被推送到持久化存储。
//CompactorShardIDExternalLabel = "__compactor_shard_id__"
//block meta info add shard label, like "__compactor_shard_id__": "2_of_2"
if job.UseSplitting() {
newLabels[mimit_tsdb.CompactorShardIDExternalLabel] = sharding.FormatShardIDLabelValue(uint64(blockToUpload.shardIndex), uint64(job.SplittingShards()))
}
//such blocks then will be uploaded to buckets.
复制代码
merge
上面 split 完的 shard:1_of_2,包含 block#5 和 block#7,然后由 compactor-3 对这两个 block 的数据进行压缩,生成 1 个 2h 的分片 block#9。同理,由 compactor-4 生成另外 1 个分片,block#10。
经过上述步骤,tenant 中的 2h 原始数据,被切分成两个 level2 的 block,随后的 level3-levelN 的压缩,会由 compactor-x 继续执行 merge 的动作,同时分别保持 1_of_2 和 2_of_2 的 label。
merge 时仍然需要先创建任务,这时候的 stage,就是 stageMerge
======> merge
handle "__compactor_shard_id__" labeled blocks
func planCompactionByRange(userID string, blocks []*metadata.Meta, tr int64, isSmallestRange bool, shardCount, splitGroups uint32) (jobs []*job) {
...
func groupBlocksByShardID(blocks []*metadata.Meta) map[string][]*metadata.Meta {
shardID := block.Thanos.Labels[mimir_tsdb.CompactorShardIDExternalLabel]
groups[shardID] = append(groups[shardID], block) // produce merge groups by CompactorShardIDExternalLabel
jobs = append(jobs, &job{
userID: userID,
stage: stageMerge, // merge stage
shardID: shardID,
blocksGroup: blocksGroup{
rangeStart: group.rangeStart,
rangeEnd: group.rangeEnd,
blocks: shardBlocks, // shardBlocks only has one label type like "1_of_2" or "2_of_2"
},
})
//compact merge blocks by shardBlocks belonging same __compactor_shard_id__.
func (c *BucketCompactor) runCompactionJob(ctx context.Context, job *Job) (shouldRerun bool, compIDs []ulid.ULID, rerr error) {
...
// used for splitting
if job.UseSplitting() {
compIDs, err = c.comp.CompactWithSplitting(subDir, blocksToCompactDirs, nil, uint64(job.SplittingShards()))
}
// used for merging
else {
var compID ulid.ULID
compID, err = c.comp.Compact(subDir, blocksToCompactDirs, nil) // execute normal compaction
compIDs = append(compIDs, compID)
}
复制代码
如何开启分片压缩
关键配置信息
-compactor.split-groups:分组数配置
-compactor.split-and-merge-shards:分组后,每个block进行分片的个数,以及最终merge后产生的分片数。
-compactor.compactor-tenant-shard-size: 多租户场景下,每个租户可以使用的compactor实例个数。默认为0,不限制使用个数。
-compactor.compaction-concurrency: 每个compactor实例可以同时开启的压缩线程数
复制代码
一个例子
我们前面章节使用 docker-compose 一键部署单体版集群介绍的 3 节点集群部署的单体模式为基础,修改 overrides.xml 文件中,demo 租户的 compator 分片配置,
overrides:
demo:
compactor_split_and_merge_shards: 2
compactor_split_groups: 2
复制代码
重新部署,经过 2h 后,compactor 开始工作,在/data-compactor/compactor-meta-demo/meta-syncer 目录下,可以看到 meta.json 的信息。压缩后的 blocks,已经上传到对象存储,如需查看具体的 block 可以去 minio 控制台查看。
01G87DF5RCC2HT585HPR7J7TWN/meta.json: "__compactor_shard_id__": "2_of_2"
01G87DF6CAKB3PKV66R9CDJPXW/meta.json: "__compactor_shard_id__": "2_of_2"
01G87DF6QKSE4Z2MS028J7YCTV/meta.json: "__compactor_shard_id__": "2_of_2"
01G87EST3SF17E8SEEJ76VFTY2/meta.json: "__compactor_shard_id__": "1_of_2"
复制代码
某个 level2 的 meta.json 文件内容如下,可以看到"compactor_shard_id": "2_of_2"的标签表示本次 block 包含的是 series_hash % shard_count 为 2 的内容。
"compaction": {
"level": 2,
"sources": [
"01G87M05RRTFTMGFMZD8BENZXH"
],
"parents": [
{
...
}
]
},
"version": 1,
"thanos": {
"labels": {
"__compactor_shard_id__": "2_of_2"
},
"downsample": {
"resolution": 0
},
"source": "compactor",
复制代码
总结
对于 split-merge 分片压缩,大量任务可以由多台 compactor 实例提供分布式的方式,并发执行。
即,当某个租户的指标很多时,可以配置较多的 groups,split-and-merge-shards 参数:
进行 2h 内的垂直分片合并,在 merge 生成 Level1 数据时,是基于每个分片的维度独立进行的。这样可以快速完成多副本数据合并;
进行 2h 外的水平分片合并,即 Level2-LevelN 压缩时,也会保持较高的效率和性能。
可以说,水平压缩的高效,得益于垂直压缩的分片。
下图显示了 150 台 compactor 实例,压缩单租户 10 亿指标的数据,
可以看出所有 compactor 的压力比较均衡。水平和纵向压缩,可以保证查询时,不访问非压缩的 block。
评论