# -*- coding: utf-8 -*-
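"""TiDB cluster inspection script.

Discovers the cluster's Prometheus endpoint via TiUP (or uses a hardcoded address),
evaluates a set of TiKV alert-style PromQL checks over the last N days, and writes
the results to an Excel report (inspection_report.xlsx), one worksheet per service.
"""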
import subprocess
import re
import datetime
import requests
import sys
import pandas as pd
days = None
def get_cluster_name():
    try:
        command = "tiup cluster list"
        result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = result.communicate()
        cluster_name_match = re.search(r'([a-zA-Z0-9_-]+)\s+tidb\s+v', output.decode('utf-8'))
        if cluster_name_match:
            return cluster_name_match.group(1)
        else:
            return None
    except Exception as e:
        print("An error occurred:", e)
        return None
def display_cluster_info(cluster_name):
    if not cluster_name:
        print("Cluster name not found.")
        return
    try:
        command = "tiup cluster display {0}".format(cluster_name)
        result = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        output, error = result.communicate()
        return output.decode('utf-8')
    except Exception as e:
        print("An error occurred:", e)
def extract_id_role(output):
    id_role_dict = {}
    lines = output.strip().split("\n")
    # Skip the header lines of `tiup cluster display`; each data row starts with "ip:port  role ...".
    for line in lines[2:]:
        parts = line.split()
        # Guard against blank lines; assumes node addresses in this cluster start with "10.".
        if len(parts) >= 2 and parts[0].startswith('10.'):
            node_id, role = parts[0], parts[1]
            id_role_dict[node_id] = role
    return id_role_dict
def get_prometheus_ip(data_dict):
    prometheus_ip = None
    for key, value in data_dict.items():
        if value == 'prometheus':
            prometheus_ip = key
            break
    return prometheus_ip
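# Each task below maps an alert name ("TiDB.<component>.<event>") to three fields:
#   'pql'     - the threshold query; any returned series counts as a violation,
#   'pql_max' - the same query without the threshold, used to report the observed
#               maximum when no violation is found ('' means a single zero row is reported),
#   'note'    - a human-readable description written into the report.
# The 24h windows are rescaled to the number of days entered by the user.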
def get_tasks():
    global days
    tasks = {
        # TiKV 1
        'TiDB.tikv.TiKV_server_is_down': {
            'pql': 'probe_success{group="tikv",instance=~".*"} == 0',
            'pql_max': '',
            'note': 'TiKV server is unavailable'
        },
        'TiDB.tikv.TiKV_node_restart': {
            'pql': 'changes(process_start_time_seconds{job="tikv",instance=~".*"}[24h])> 0',
            'pql_max': 'max(changes(process_start_time_seconds{job="tikv",instance=~".*"}[24h]))',
            'note': 'TiKV server restarted during the inspection window'
        },
        'TiDB.tikv.TiKV_GC_can_not_work': {
            'pql_max': '',
            'pql': 'sum(increase(tikv_gcworker_gc_tasks_vec{task="gc", instance=~".*"}[2d])) by (instance) < 1 and (sum(increase('
                   'tikv_gc_compaction_filter_perform{instance=~".*"}[2d])) by (instance) < 1 and sum(increase('
                   'tikv_engine_event_total{cf="write",db="kv",type="compaction",instance=~".*"}[2d])) by (instance) >= 1)',
            'note': 'TiKV GC cannot work'
        },
        # TiKV 2
        'TiDB.tikv.TiKV_raftstore_thread_cpu_seconds_total': {
            'pql_max': 'max_over_time(avg(rate(tikv_thread_cpu_seconds_total{name=~"(raftstore|rs)_.*"}[1m])) by (instance)[24h:1m])',
            'pql': 'max_over_time(avg(rate(tikv_thread_cpu_seconds_total{name=~"(raftstore|rs)_.*"}[1m])) by (instance)[24h:1m]) > 0.8',
            'note': 'TiKV raftstore thread pool CPU usage is too high'
        },
        'TiDB.tikv.TiKV_approximate_region_size': {
            'pql_max': 'max_over_time(histogram_quantile(0.99, sum(rate(tikv_raftstore_region_size_bucket{instance=~".*"}[1m])) '
                       'by (le,instance))[24h:1m])',
            'pql': 'max_over_time(histogram_quantile(0.99, sum(rate(tikv_raftstore_region_size_bucket{instance=~".*"}[1m])) '
                   'by (le,instance))[24h:1m]) > 1073741824',
            'note': 'The largest Region approximate size seen by the TiKV split checker exceeds 1 GB'
        },
        'TiDB.tikv.TiKV_async_request_write_duration_seconds': {
            'pql_max': 'max_over_time(histogram_quantile(0.99, sum(rate(tikv_storage_engine_async_request_duration_seconds_bucket'
                       '{type="write", instance=~".*"}[1m])) by (le, instance, type))[24h:1m])',
            'pql': 'max_over_time(histogram_quantile(0.99, sum(rate(tikv_storage_engine_async_request_duration_seconds_bucket'
                   '{type="write", instance=~".*"}[1m])) by (le, instance, type))[24h:1m]) > 1',
            'note': 'Raft write response time in TiKV is too long'
        },
        'TiDB.tikv.TiKV_write_stall': {
            'pql_max': 'max_over_time(delta(tikv_engine_write_stall{instance=~".*"}[10m])[24h:10m])',
            'pql': 'max_over_time(delta('
                   'tikv_engine_write_stall{instance=~".*"}[10m])[24h:10m]) > 10',
            'note': 'Write stall detected in TiKV'
        },
        # TiKV 3
        'TiDB.tikv.TiKV_server_report_failure_msg_total': {
            'pql_max': 'max_over_time(sum(rate(tikv_server_report_failure_msg_total{type="unreachable"}[10m])) BY (instance)[24h:10m])',
            'pql': 'max_over_time(sum(rate(tikv_server_report_failure_msg_total{type="unreachable"}[10m])) BY (instance)[24h:10m]) > 10',
            'note': 'Too many failure messages reported by the TiKV node'
        },
        'TiDB.tikv.TiKV_channel_full_total': {
            'pql_max': 'max_over_time(sum(rate(tikv_channel_full_total{instance=~".*"}[10m])) BY (type, instance)[24h:10m])',
            'pql': 'max_over_time(sum(rate(tikv_channel_full_total{instance=~".*"}[10m])) BY (type, instance)[24h:10m]) > 0',
            'note': 'TiKV channel is full and TiKV is too busy'
        },
        'TiDB.tikv.TiKV_raft_log_lag': {
            'pql_max': 'max_over_time(histogram_quantile(0.99, sum(rate(tikv_raftstore_log_lag_bucket{instance=~".*"}[1m])) by (le,instance))[24h:10m])',
            'pql': 'max_over_time(histogram_quantile(0.99, sum(rate(tikv_raftstore_log_lag_bucket{instance=~".*"}[1m])) by (le, '
                   'instance))[24h:10m]) > 5000',
            'note': 'Raft log lag in TiKV is too large'
        },
        'TiDB.tikv.TiKV_thread_unified_readpool_cpu_seconds': {
            'pql_max': 'max_over_time(avg(rate(tikv_thread_cpu_seconds_total{name=~"unified_read_po*", instance=~".*"}[1m])) by (instance)[24h:1m])',
            'pql': 'max_over_time(avg(rate(tikv_thread_cpu_seconds_total{name=~"unified_read_po*", instance=~".*"}[1m])) '
                   'by (instance)[24h:1m]) > 0.7',
            'note': 'Unified read thread pool CPU usage exceeds 70%'
        },
        'TiDB.tikv.TiKV_low_space': {
            'pql_max': 'sum(tikv_store_size_bytes{type="available"}) by (instance) / sum(tikv_store_size_bytes{type="capacity"}) by (instance)',
            'pql': 'sum(tikv_store_size_bytes{type="available"}) by (instance) / sum(tikv_store_size_bytes{type="capacity"}) by (instance) < 0.3',
            'note': 'TiKV available storage space is below the threshold'
        },
    }
    # Rescale the default 24h query windows to the number of days entered by the user.
    for key, value in tasks.items():
        for inner_key, inner_value in value.items():
            if isinstance(inner_value, str) and 'pql' in inner_key:
                value[inner_key] = inner_value.replace("24h:", f"{24 * days}h:").replace("[24h]", f"[{24 * days}h]")
    return tasks
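# The helpers below talk to the Prometheus HTTP API (GET /api/v1/query). For reference,
# a successful instant-query response looks roughly like this (abridged):
#   {"status": "success",
#    "data": {"resultType": "vector",
#             "result": [{"metric": {"instance": "10.0.0.1:20180", "type": "..."},
#                         "value": [<unix timestamp>, "<value as a string>"]}]}}
# check_metric() relies on data.result, metric.instance, metric.type and value[1].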
def request_prome(prometheus_address, query):
    try:
        response = requests.get('http://%s/api/v1/query' % prometheus_address, params={'query': query})
        return response
    except requests.RequestException:
        return None
def has_response(prometheus_address, query):
    response = request_prome(prometheus_address, query)
    if not response:
        return False
    try:
        if response.json()["data"]['result']:
            return True
        else:
            return False
    except Exception:
        return False
def check_prome_alive(prometheus_address):
    # dummy query is used to judge if prometheus is alive
    dummy_query = 'probe_success{}'
    return has_response(prometheus_address, dummy_query)
def find_alive_prome(prometheus_addresses):
    if check_prome_alive(prometheus_addresses):
        return prometheus_addresses
    return None
# ip:port -> ip_port
def decode_instance(instance):
    return instance.replace(':', '_')
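# check_metric() evaluates one task and returns the rows for the report. Each row is a
# comma-separated string with: timestamp, node (ip_port or "cluster"), service, event type,
# metric type, status (abnormal/normal), description and the observed value.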
def check_metric(alert_name, prometheus_address, pql, is_value, pql_max):
    record = []
    try:
        is_warning = "abnormal"
        response = request_prome(prometheus_address, pql)
        alert_name = alert_name.split('.')
        result = response.json()['data']['result']
        # No series returned by the threshold query means the check passed.
        if len(result) == 0:
            is_warning = "normal"
            if pql_max == '':
                result = [{'metric': {}, 'value': [0, '0']}]
            else:
                # Report the observed maximum instead of a violation.
                response = request_prome(prometheus_address, pql_max)
                result = response.json()['data']['result']
        for i in result:
            # Report per instance when the series carries an "instance" label.
            if 'instance' in i['metric']:
                instance = i['metric']['instance']
                node = decode_instance(instance)
            else:
                node = 'cluster'
            # Some series also carry a "type" label.
            if 'type' in i['metric']:
                metric_type = i['metric']['type']
            else:
                metric_type = 'none'
            value = i['value'][1]
            if value == 'NaN':
                value = 0
            else:
                value = round(float(value), 3)
            message = "%s,%s,%s,%s,%s,%s,%s,%s" % (
                datetime.datetime.now(), node, alert_name[1], alert_name[2], metric_type, is_warning, is_value, value)
            print(message)
            record.append(message)
    except Exception as e:
        print(alert_name[2] + "----An error occurred check_metric:", e)
    return record
def csv_report(record):
    data = pd.DataFrame([line.split(',') for line in record],
                        columns=['timestamp', 'ip_address', 'service', 'event_type', 'type', 'status', 'description',
                                 'value'])
    grouped = data.groupby("service")
    writer = pd.ExcelWriter("inspection_report.xlsx", engine="xlsxwriter")
    # One worksheet per service, with columns widened to fit their contents.
    for name, group in grouped:
        group.to_excel(writer, sheet_name=name, index=False)
        worksheet = writer.sheets[name]
        for i, col in enumerate(group.columns):
            column_len = max(group[col].astype(str).str.len().max(), len(col)) + 2
            worksheet.set_column(i, i, column_len)
    writer.close()  # ExcelWriter.save() was removed in pandas 2.0; close() works in both old and new versions.
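# run_tasks() evaluates every task against the live Prometheus, collects all report rows
# and writes them out through csv_report().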
def run_tasks(role_metrics, prometheus_address):
    record = []
    for alert in role_metrics:
        pql = role_metrics[alert]['pql']
        is_value = role_metrics[alert]['note']
        pql_max = role_metrics[alert]['pql_max']
        message = check_metric(alert, prometheus_address, pql, is_value, pql_max)
        if message:
            record.extend(message)
    csv_report(record)
def run_script(prometheus_addresses):
    active_prometheus_address = find_alive_prome(prometheus_addresses)
    # check if all prometheus are down
    if not active_prometheus_address:
        sys.exit()
    tasks = get_tasks()
    run_tasks(tasks, active_prometheus_address)
def get_user_input():
    global days
    # Keep prompting until a valid integer is entered, otherwise `days` would stay None.
    while days is None:
        try:
            days = int(input("Enter the number of days to inspect: "))
        except ValueError:
            print("Invalid input, please enter a valid number.")
if __name__ == "__main__":
    # Ask for the number of days to inspect
    get_user_input()
    prometheus_ip = '10.3.65.136:9091'
    # prometheus_ip = None
    # When prometheus_ip is None, discover the Prometheus address through TiUP instead.
    if prometheus_ip is None:
        cluster_name = get_cluster_name()
        cluster_info = display_cluster_info(cluster_name)
        id_role_dict = extract_id_role(cluster_info)
        print(id_role_dict)
        prometheus_ip = get_prometheus_ip(id_role_dict)
        print(prometheus_ip)
    run_script(prometheus_ip)
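# Typical run (assuming Python 3 with requests, pandas and xlsxwriter installed; the file
# name tikv_inspection.py below is illustrative, not prescribed by this script):
#   $ python tikv_inspection.py
#   Enter the number of days to inspect: 3
# The report is written to inspection_report.xlsx in the current working directory.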