  • 2023-03-01
普罗米修斯提供了多种语言的 client, 用户可以使用 client 很方便的构建自己的 exporter 服务, 后续只需要修改普罗米修斯的配置文件, 就可以把 exporter 加入到普罗米修斯中来。

python client 的使用

首先 需要用 pip install prometheus_client安装客户端

import timefrom prometheus_client.core import GaugeMetricFamily, REGISTRY, CounterMetricFamilyfrom prometheus_client import start_http_server

class CustomCollector(object): def __init__(self): pass
def collect(self): g = GaugeMetricFamily("MemoryUsage", 'Help text', labels=['instance']) g.add_metric(["instance01.us.west.local"], 20) yield g
c = CounterMetricFamily("HttpRequests", 'Help text', labels=['app']) c.add_metric(["example"], 2000) yield c

if __name__ == '__main__': start_http_server(8000) REGISTRY.register(CustomCollector())

上面是在 python 中开发一个 exporter 最简单的方式。 我们可以使用prometheus_client 内置的GaugeMetricFamily 和 CounterMetricFamily 来构建自己的监控指标。

java client 的使用


<!-- The client --><dependency>    <groupId>io.prometheus</groupId>    <artifactId>simpleclient</artifactId>    <version>0.6.0</version></dependency>
<dependency> <groupId>io.prometheus</groupId> <artifactId>simpleclient_httpserver</artifactId> <version>0.6.0</version></dependency>

package exporter;
import io.prometheus.client.Counter;import io.prometheus.client.exporter.HTTPServer;
import java.io.IOException;import java.util.Random;import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;
public class CustomExporter { # 注册一个counter类型的监控指标并带有一个名字叫method的label static final Counter requests = Counter.build() .name("my_library_requests_total").help("Total requests.") .labelNames("method").register();
public static void processGetRequest() { requests.labels("get").inc(); // 在这编写监控逻辑 }
public static void main(String[] args) throws IOException { // 启动一个线程池,每隔10s种触发一次调用监控逻辑 ScheduledExecutorService service = Executors.newScheduledThreadPool(1); service.scheduleWithFixedDelay(() -> { // 模拟一个监控动作, 调用counter类型的自增方法。 当然也可以调用上面的processGetRequest方法 double value = requests.labels("get").get(); System.out.println(value); requests.labels("get").inc(); }, 0, 10, TimeUnit.SECONDS);
// 利用普罗米修斯提供的httpserver 启动服务 HTTPServer server = new HTTPServer(1234);


go client 的使用

我们实际用 go client 来开发一个监控在 k8s 集群中监控每一个容器的 socket 状态的 exporter。 首先我们需要通过 go mod 文件拉引入依赖。PS:代码的逻辑解释在注释中。

module prophet-container-exporter
go 1.13
require ( github.com/pkg/errors v0.9.1 github.com/prometheus/client_golang v1.5.1 github.com/sirupsen/logrus v1.4.2 k8s.io/api v0.0.0-20190620084959-7cf5895f2711 k8s.io/apimachinery v0.0.0-20190612205821-1799e75a0719 k8s.io/client-go v0.0.0-20190620085101-78d2af792bab)

注意: 除了普罗米修斯的 client 之外, 还需要引入 k8s 的 client-go 用来实际的去监控容器的状态。


func init() {   log.SetOutput(os.Stdout)   log.Info("init the kubeconfig")   // 初始化k8s的client有两种方式。 如果当前是在pod中运行的话, client自己会找到容器对应目录下的service account信息与k8s的apiserver通信鉴权。 如果程序在集群外部运行, 那么需要给client提供一个kubeconfig文件   if isExist, err := PathExists("kubeconfig"); err != nil {      panic(err)   } else {      if isExist {         log.Info("now out of k8s cluster")         kubeConfig, err = clientcmd.BuildConfigFromFlags("", "kubeconfig")      } else {         log.Info("now In k8s cluster")         kubeConfig, err = rest.InClusterConfig()      }      if err != nil {         log.Error("cannot init the kubeconfig")         panic(err.Error())      }   }   var err error   k8s, err = kubernetes.NewForConfig(kubeConfig)   if err != nil {      log.Error("cannot init the k8s client")      panic(err.Error())   }   log.Info("init the k8sclient done, now begin to monitor the k8s")

// 在开始监控之前首先遍历当前k8s集群中所有的pod,为每个容器建立一个guage类型的监控指标并记录在一个map中,方便后面程序根据这些容器执行具体的监控逻辑。 register := func() { namespaceList, err := k8s.CoreV1().Namespaces().List(metav1.ListOptions{}) if err != nil { log.Error(err) os.Exit(1) }
for _, n := range namespaceList.Items { namespace := n.Name podList, err := k8s.CoreV1().Pods(namespace).List(metav1.ListOptions{}) if err != nil { panic(errors.Wrapf(err, "cannot list k8s with namespace %s", namespace)) } // 遍历所有pod for _, pod := range podList.Items { # 只监控Running状态的Pod if pod.Status.Phase != "Running" { continue }
// 遍历pod下的容器, 每个容器的指标有3个label。1. 名称空间 2. pod名称 3. 容器名称 for _, container := range pod.Status.ContainerStatuses { sdGauge := prometheus.NewGauge(prometheus.GaugeOpts{ Name: "namespace_container_Socket_Close_Wait", Help: "num of socket with CLOSE-WAIT status in container", ConstLabels: map[string]string{ "namespace": namespace, "pod": pod.Name, "container": container.Name, }, }) if _, ok := sdMetrics[pod.Name+","+namespace]; !ok { prometheus.MustRegister(sdGauge) sdMetrics[pod.Name+","+namespace] = sdGauge } } } }
// 上来就注册一次 register()
// 周期性注册namespace下所有pod中app容器的指标, 因为随时会有新的pod启动 go func() { for { time.Sleep(time.Minute * 10) register() } }()}


func main() {
// 周期性的获取最新的监控数据。 遍历所有容器, 向容器发送`cat /proc/net/tcp | awk '{print $4}'|grep 08|wc -l` 命令,该命令可以查询容器中socket的状态,计算一共有多少个处于CLOSE-WAIT的socket。而我们的监控目的就是通过这个指标判断是否存在socket泄露. go func() { for { for podInfo, guage := range sdMetrics { tmp := strings.Split(podInfo, ",") podName := tmp[0] namespace := tmp[1]
log.WithFields(log.Fields{ "namespace": namespace, "pod": podName, }) pod, _ := k8s.CoreV1().Pods(namespace).Get(podName, metav1.GetOptions{})
for _, container := range pod.Status.ContainerStatuses { commands := []string{"sh", "-c", "cat /proc/net/tcp | awk '{print $4}'|grep 08|wc -l"} output, err := promethues.Exec(k8s, kubeConfig, commands, namespace, podName, container.Name) if err != nil { log.Error(err.Error()) continue } closeWait, err := strconv.ParseFloat(strings.Replace(output, "\n", "", -1), 32)
if err != nil { fmt.Fprintf(os.Stdout, "err %s\n", errors.Wrap(err, "cannot trans string to float")) continue } guage.Set(closeWait) log.Infof("successfully collect %s's socket status", podName) } } time.Sleep(time.Minute * 10) } }()
http.Handle("/metrics", promhttp.Handler()) log.Fatal(http.ListenAndServe("", nil))

