写点什么

夜莺二次开发指南 - 监控系统(1)

用户头像
秦叶宁
关注
发布于: 2020 年 12 月 24 日
夜莺二次开发指南-监控系统(1)

前言



本系列将对夜莺平台各个模块的主要逻辑代码进行介绍,方便大家进行二次开发,本篇是系列的第一篇,agent 和 transfer 模块代码解读。



首先贴下夜莺的项目地址和架构图,正在使用夜莺的读者欢迎给夜莺加一个star







本节主要讲解监控系统部分的源代码主逻辑,本篇主要讲解agent和transfer模块

Agent 解读

首先看一下agent的main函数,可以看出来agent负责了三个主要的工作,监控指标的采集、主机设备信息的收集、任务命令的执行

func main() {
...
if config.Config.Enable.Mon {
monStart()
}

if config.Config.Enable.Job {
jobStart()
}

if config.Config.Enable.Report {
reportStart()
}

...
}


因为本节只讲监控,所以只介绍监控指标的采集能力实现,也就是monStart()部分,其他的放到其他章节来介绍,从monStart()可以看出来,监控指标的采集主要分五部分



  • 系统指标采集

  • 端口采集

  • 进程采集

  • 插件采集

  • 日志采集



下面逐个对这五个采集的实现进行介绍



func monStart() {
sys.Init(config.Config.Sys)
stra.Init()

//系统指标采集
funcs.BuildMappers()
funcs.Collect()

//插件采集
plugins.Detect()

//进程采集
procs.Detect()

//端口采集
ports.Detect()

//初始化缓存,用作保存COUNTER类型数据
cache.Init()

//日志采集
go worker.UpdateConfigsLoop()
go worker.PusherStart()
go worker.Zeroize()
}


系统指标采集

首先看一下系统指标采集,如果想要增加系统指标采集项,可以从这里入手,每一个 FuncsAndInterval 对象,都包含了一组采集函数和采集周期,我们可以自己实现一个采集系统指标函数,然后赋值给 FuncsAndInterval,再加入到 []FuncsAndInterval{} 中,即可扩充agent的基础指标采集能力



type FuncsAndInterval struct {
Fs []func() []*dataobj.MetricValu
Interval int
}


端口采集

夜莺agent要对机器上的那些端口进行探测采集是可以在web页面进行配置的,主要调度逻辑如下,一个for循环,定期获取服务的采集配置,然后和本地的策略进行对比,没有的增加,多余的删除。agent对获取到的采集策略在内存了一个备份,服务端不能正常工作,导致采集任务无法进行



func Detect() {
detect()
go loopDetect()
}

func loopDetect() {
for {
time.Sleep(time.Second * 10)
detect()
}
}

func detect() {
ps := stra.GetPortCollects()
DelNoPortCollect(ps)
AddNewPortCollect(ps)
}

...
//端口采集主逻辑
func PortCollect(p *models.PortCollect) {
value := 0
if isListening(p.Port) {
value = 1
}

item := core.GaugeValue("proc.port.listen", value, p.Tags)
item.Step = int64(p.Step)
item.Timestamp = time.Now().Unix()
item.Endpoint = config.Endpoint
core.Push([]*dataobj.MetricValue{item})
}


进程采集

进程采集的调度逻辑和端口采集类似,不在赘述,目前夜莺只采集了进程的个数、cpu、内存使用率,如果想增加进程的其他相关指标,可以修改下面的函数



func ProcCollect(p *models.ProcCollect) {
ps, err := process.Processes()
if err != nil {
logger.Error(err)
return
}
var memUsedTotal uint64 = 0
var memUtilTotal = 0.0
var cpuUtilTotal = 0.0
var items []*dataobj.MetricValue
cnt := 0
for _, procs := range ps {
if isProc(procs, p.CollectMethod, p.Target) {
cnt++
procCache, exists := cache.ProcsCache.Get(procs.Pid)
if !exists {
cache.ProcsCache.Set(procs.Pid, procs)
procCache = procs
}
mem, err := procCache.MemoryInfo()
if err != nil {
logger.Error(err)
continue
}
memUsedTotal += mem.RSS
memUtil, err := procCache.MemoryPercent()
if err != nil {
logger.Error(err)
continue
}
memUtilTotal += float64(memUtil)
cpuUtil, err := procCache.Percent(0)
if err != nil {
logger.Error(err)
continue
}
cpuUtilTotal += cpuUtil
}

}

...
}


插件采集

插件采集的调度逻辑和端口采集也类似,目前夜莺的插件是需要提前部署到执行插件的目标机器上的,如果想实现插件自动分发的逻辑的话,可以修改 ListPlugins() 的逻辑,从服务端提供一个下载插件的地址,然后agent从远端下载插件。



下面代码块是执行插件的实现,目前插件支持在web端配置命令行参数、环境变量、和stdin,这里可以做一个功能升级,如果待执行的插件是一个脚步程序的话,可以改造monapi模块和下面的逻辑,支持在页面上直接编写脚步,然后agent直接执行,可以不需要再进行插件下发



func PluginRun(plugin *Plugin) {
//...
params := strings.Split(plugin.Params, " ")
cmd := exec.Command(fpath, params...)
cmd.Dir = filepath.Dir(fpath)
var stdout bytes.Buffer

cmd.Stdout = &stdout
var stderr bytes.Buffer
cmd.Stderr = &stderr

if plugin.Stdin != "" {
cmd.Stdin = bytes.NewReader([]byte(plugin.Stdin))
}

if plugin.Env != "" {
envs := make(map[string]string)
err := json.Unmarshal([]byte(plugin.Env), &envs)
if err != nil {
logger.Errorf("plugin:%+v %v", plugin, err)
return
}
for k, v := range envs {
cmd.Env = append(cmd.Env, fmt.Sprintf("%s=%s", k, v))
}
}

err := cmd.Start()
//...
}


日志采集

日志采集的调度逻辑和端口进程采集略有不同,因为代码是从滴滴开源的 falcon-log-agent 中集成过来的, 日志采集的实现思路是实时读取滚动的日志,使用正则表达式对日志进行匹配,然后把命中匹配的日志转换为时序数据,主要通过 worker 对象的 producer方法来实现,目前日志采集还遗留一个优化项,将最后匹配到的日志,放到监控数据的 Extra 字段中,这样如果出发的告警事件,最后一条错误日志可以很方便的可以看到,加快问题的定位。

采集策略下发逻辑



最后再补充下采集策略的下发逻辑,agent要执行哪些采集策略,是从monapi获取的,用户在web界面添加了采集策略是和服务树节点相关的,并没有机器之间关联,所有monapi会对采集策略进行一次处理,根据服务树节点找到下面的机器,然后将采集策略和机器关联起来,放到一个map中,等待agent来获取,如果想知道某台机器被分配了哪些采集策略的话,可以通过下面接口查询到



curl http://monapi-ip/api/mon/collects/endpoint_ip


如果想知道某台机器的agent实际获取了哪些采集策略的话,可以通过agent提供的http接口查询到

curl http://127.0.0.1:2080/api/collector/stra


agent处理采集监控指标之后,还可以接受监控数据的上报,提供了监控数据上报的http接口,agent获取的监控数据之后,下一步是把数据上报给服务端,即是下面我们要讲解的 transfer 模块。

transfer 解读

transfer 主要负责的工作有两个

  • 监控数据接收

  • 监控数据查询



transfer接收到监控数据之后,会对监控数据进行有效性校验,函数是 CheckValidity() 如果想对监控数据进行改造的话,可以通过修改 CheckValidity() 函数来完成。



transfer接收到数据之后,会把数据放到自己的内存队列中,然后由另个一个任务实时的从队列消费数据再发给不同的后端



// send to judge
backend.Push2JudgeQueue(items)

if aggr.AggrConfig.Enabled {
go aggr.SendToAggr(items)
}

// send to push endpoints
pushEndpoints, err := backend.GetPushEndpoints()
if err != nil {
logger.Errorf("could not find pushendpoint")
return err
} else {
for _, pushendpoint := range pushEndpoints {
pushendpoint.Push2Queue(items)
}
}
//...


夜莺的默认后端是tsdb模块,transfer在将监控数据分发给tsdb的时候,使用了一致性hash 算法,保证了同一条曲线的监控数据,每次都可以转发到相同的后端,同时也负责了监控数据查询的工作,因为知道数据发给谁,所以也知道去哪里获取数据。



transfer 目前可以对接多种后端存储,如果已支持的存储不满足大家的需求的,可以根据定义好的 DataSource interface ,开发支持自己的存储模块,只要实现以下方法即可



type DataSource interface {
PushEndpoint

// query data for judge
QueryData(inputs []dataobj.QueryData) []*dataobj.TsdbQueryResponse
// query data for ui
QueryDataForUI(input dataobj.QueryDataForUI) []*dataobj.TsdbQueryResponse

// query metrics & tags
QueryMetrics(recv dataobj.EndpointsRecv) *dataobj.MetricResp
QueryTagPairs(recv dataobj.EndpointMetricRecv) []dataobj.IndexTagkvResp
QueryIndexByClude(recv []dataobj.CludeRecv) []dataobj.XcludeResp
QueryIndexByFullTags(recv []dataobj.IndexByFullTagsRecv) []dataobj.IndexByFullTagsResp

// tsdb instance
GetInstance(metric, endpoint string, tags map[string]string) []string
}


本篇讲解到此结束,下篇将为大家带来告警链路模块的代码解读

作者简介

秦叶宁 企业级开源运维平台 Nightingale 主程,Urlooker 作者,现负责滴滴私有云运维产品方向的工作,如有运维平台的搭建需求,欢迎与我联系:)





发布于: 2020 年 12 月 24 日阅读数: 37
用户头像

秦叶宁

关注

还未添加个人签名 2018.03.16 加入

还未添加个人简介

评论

发布
暂无评论
夜莺二次开发指南-监控系统(1)