前言
本系列将对夜莺平台各个模块的主要逻辑代码进行介绍,方便大家进行二次开发,本篇是系列的第三篇, index 和 tsdb 模块。
首先贴下夜莺的项目地址和架构图,正在使用夜莺的读者欢迎给夜莺加一个 star
本篇主要讲解数据存储模块 index 和 tsdb 模块,首先说明一下 transfer 到 tsdb 的数据流转
transfer 到 tsdb 数据流转
夜莺的默认存储后端是 tsdb 模块,为了可以支持海量的监控数据,tsdb 必须是可以横向水平扩展的,也就是 tsdb 可以部署多个实例,有了多个实例之后,会引入一个新问题,当监控数据来到服务端之后,应该发给哪个实例呢,这个工作交给 transfer 来负责决定,transfer 在将监控数据分发给 tsdb 的时候,使用了 一致性hash 算法,保证了同一条曲线的监控数据,每次都可以转发到相同的后端。transfer 同时也负责了监控数据查询的工作,因为知道数据发给谁,所以也知道去哪里获取数据。之所以使用 一致性 hash 还有一个原因是,在 tsdb 实例数据发生变更之后,可以保证发生迁移的监控数据尽可能的少。
下面讲解下 tsdb 模块代码的主要逻辑
tsdb 模块
首先看下 tsdb 模块的 main 函数,主要负责下面几个工作
接收监控数据写入内存
将内存中的数据定期落盘
将监控数据的索引推送到 index 模块
提供数据查询的能力
func main() {
aconf()
pconf()
start()
cfg := config.Config
loggeri.Init(cfg.Logger)
go stats.Init("n9e.tsdb")
cache.Init(cfg.Cache) //初始化用户存活监控数据的内存时序库
index.Init(cfg.Index) //初始化存放索引的功能
brpc.Init(cfg.RpcClient, index.IndexList.Get()) //连接index模块的client
cache.InitChunkSlot() //初始化用来落盘的槽位
rrdtool.Init(cfg.RRD) //启动落盘任务
migrate.Init(cfg.Migrate) //启动扩容功能
go http.Start()
go rpc.Start()
startSignal(os.Getpid())
}
复制代码
tsdb 架构
下面是 tsdb 模块的架构设计图
tsdb 模块接收到数据之后,会先将数据存放到内存当中,在将数据写入内存的时候,因为 tsdb 引入了 facebook 开源的 Gorilla 压缩算法,所以会对监控数据进行一次压缩,每条曲线有一个 chunks,每个 chunks 有多个 chunk 组成,每个 chunk 都储存了一段时间的监控数据。chunk 写满之后,会放到左边的 slot 中,然后按顺序落盘到 rrd 文件中,当查询请求过来之后,tsdb 会先从内存中拿数据,当拿到的数据不满足时间范围时,则会继续从 rrd 中读取数据,之后再将查到的数据进行合并,一并返回去。
数据写入
数据写入内存的过程主要由以下代码实现,监控数据上来之后,首先会检测对应的 chunk 是否存在,如果不存在则新建一个 chunk,如果存在,则写入到对应的 chunk 中,如果等待写入的 chunk 满了,则将 chunk push 到 ChunksSlots 中,再新建一个 chunk
func (cs *CS) Push(seriesID string, ts int64, value float64) error {
//找到当前chunk的起始时间
t0 := uint32(ts - (ts % int64(Config.SpanInSeconds)))
// 尚无chunk
if len(cs.Chunks) == 0 {
c := NewChunk(uint32(t0))
c.FirstTs = uint32(ts)
cs.Chunks = append(cs.Chunks, c)
return cs.Chunks[0].Push(uint32(ts), value)
}
// push到当前chunk
currentChunk := cs.GetChunk(cs.CurrentChunkPos)
if t0 == currentChunk.T0 {
if currentChunk.Closed {
return fmt.Errorf("push to closed chunk")
}
return currentChunk.Push(uint32(ts), value)
}
if t0 < currentChunk.T0 {
return fmt.Errorf("data @%v, timestamp old than previous chunk. currentchunk t0: %v\n", t0, currentChunk.T0)
}
// 需要新建chunk
// 先finish掉现有chunk
if !currentChunk.Closed {
currentChunk.FinishSync()
ChunksSlots.Push(seriesID, currentChunk)
}
// 超过chunks限制, pos回绕到0
cs.CurrentChunkPos++
if cs.CurrentChunkPos >= int(Config.NumOfChunks) {
cs.CurrentChunkPos = 0
}
// chunks未满, 直接append即可
if len(cs.Chunks) < int(Config.NumOfChunks) {
c := NewChunk(uint32(t0))
c.FirstTs = uint32(ts)
cs.Chunks = append(cs.Chunks, c)
return cs.Chunks[cs.CurrentChunkPos].Push(uint32(ts), value)
} else {
c := NewChunk(uint32(t0))
c.FirstTs = uint32(ts)
cs.Chunks[cs.CurrentChunkPos] = c
return cs.Chunks[cs.CurrentChunkPos].Push(uint32(ts), value)
}
return nil
}
复制代码
落盘机制
tsdb 另一个任务是定期将监控数据进行落盘,主要有 FlushFinishd2Disk() 来完成,槽位的个数由 FlushDiskStepMs 和 SpanInSeconds 来决定,每个刷新周期会将一个槽位的数据落盘的 rrd 文件中,具体 FlushDiskStepMs 的设置由磁盘 io 的能力决定。
func FlushFinishd2Disk() {
var idx int = 0
//time.Sleep(time.Second * time.Duration(cache.Config.SpanInSeconds))
ticker := time.NewTicker(time.Millisecond * time.Duration(cache.Config.FlushDiskStepMs)).C
slotNum := cache.Config.SpanInSeconds * 1000 / cache.Config.FlushDiskStepMs
for {
select {
case <-ticker:
idx = idx % slotNum
chunks := cache.ChunksSlots.Get(idx)
flushChunks := make(map[string][]*cache.Chunk, 0)
for key, cs := range chunks {
flushChunks[key] = cs
}
FlushRRD(flushChunks)
idx += 1
case <-cache.FlushDoneChan:
logger.Info("FlushFinishd2Disk recv sigout and exit...")
return
}
}
}
复制代码
数据查询
// 先从内从查询数据
iters, err := cache.Caches.Get(seriesID, startTs, endTs)
//查询起始时间在cache范围内,直接返回结果
if cachePointsSize > 0 && param.Start >= cachePoints[0].Timestamp {
resp.Values = cachePoints
stats.Counter.Set("query.hit.cache", 1)
goto _RETURN_OK
}
//从rrd文件查询数据
rrdFile = utils.RrdFileName(rrdtool.Config.Storage, seriesID, dsType, step)
rrdDatas, err = rrdtool.Fetch(rrdFile, seriesID, param.ConsolFunc, startTs-int64(step), endTs, step)
//合并之后,如果数据点太多,对数据进行降采样
if rsize > MaxRRAPointCnt {
sampleRate = int(rsize/MaxRRAPointCnt) + 1
sampleSize = int(rsize / sampleRate)
sampleStep = sampleRate * realStep
}
复制代码
索引更新
tsdb 会定期地新增索引和全量索引同步给 index 模块,为了保证 index 模块的高可用,index 会部署多个实例,那 tsdb 如何感知到有哪些 index 实例呢,index 会定期上报自己的心跳,tsdb 在上报索引的时候,会查询存活的 index 实例列表,然后将索引上报给所有存活的 index 实例。
index 模块
在从 transfer 查询监控数据的时候,需要提供完整的索引信息,这样 transfer 才可以知道从哪个 tsdb 后端数据,而索引信息的查询能力则由 index 模块来提供。下面是 index 模块的内存模型,非常简单,就是一个多级的 map 结构,所有的索引数据都存在 index 的内存中,为了防止 index 重启之后,缺失索引数据,index 会定期将索引数据进行落盘,不过 index 重启之后,不会直接使用落盘的数据,会先从其他 index 模块查询数据构建索引,这个才可以保证索引数据是最新的。只有在从其他 index 查不到数据的情况下,才会使用落盘数据重建索引。
index 其实就是一个多级的 map,下面是 index 的数据模型和代码中对于的数据结构
数据模型
数据结构
type EndpointIndexMap struct {
sync.RWMutex
M map[string]*MetricIndexMap `json:"endpoint_index"`
}
type MetricIndexMap struct {
sync.RWMutex
Reported bool
Data map[string]*MetricIndex
}
type MetricIndex struct {
sync.RWMutex
Metric string `json:"metric"`
Step int `json:"step"`
DsType string `json:"dstype"`
TagkvMap *TagkvIndex `json:"tags"`
CounterMap *CounterTsMap `json:"counters"`
Ts int64 `json:"ts"`
}
type TagkvIndex struct {
sync.RWMutex
Tagkv map[string]map[string]int64 `json:"tagkv"` // map[tagk]map[tagv]ts
}
复制代码
到此监控系统的介绍就结束了,下篇将为大家带来用户资源中心(RDB)模块的解读
作者简介
秦叶宁 企业级开源运维平台 Nightingale 主程,Urlooker 作者,现负责滴滴私有云运维产品方向的工作,如有运维平台的搭建需求,欢迎与我联系:)
评论