Python調用Prometheus監控數據並計算
- 2021 年 12 月 29 日
- 筆記
- linux伺服器運維
Prometheus是什麼
Prometheus是一套開源監控系統和告警為一體,由go語言(golang)開發,是監控+報警+時間序列數
據庫的組合。適合監控docker容器。因為kubernetes(k8s)的流行帶動其發展。
Prometheus的主要特點
- 多維度數據模型,由指標名稱和鍵/值對標識的時間序列數據。
- 作為一個時間序列資料庫,其採集的數據會以文件的形式存儲在本地中。
- 靈活的查詢語言,PromQL(Prometheus Query Language)函數式查詢語言。
- 不依賴分散式存儲,單個伺服器節點是自治的。
- 以HTTP方式,通過pull模型拉取時間序列數據。
- 也可以通過中間網關支援push模型。
- 通過服務發現或者靜態配置,來發現目標服務對象。
- 支援多種多樣的圖表和介面展示。
Prometheus原理架構圖
Prometheus基礎概念
什麼是時間序列數據
時間序列數據(TimeSeries Data) : 按照時間順序記錄系統、設備狀態變化的數據被稱為時序數據。
應用的場景很多,如:
- 無人駕駛運行中記錄的經度,緯度,速度,方向,旁邊物體距離等。
- 某一個地區的各車輛的行駛軌跡數據。
- 傳統證券行業實時交易數據。
- 實時運維監控數據等。
時間序列數據特點:
- 性能好、存儲成本低
什麼是targets(目標)
Prometheus 是一個監控平台,它通過抓取監控目標(targets)上的指標 HTTP 端點來從這些目標收集指標。
安裝完Prometheus Server端之後,第一個targets就是它本身。
具體可以參考官方文檔
什麼是metrics(指標)
Prometheus存在多種不同的監控指標(Metrics),在不同的場景下應該要選擇不同的Metrics。
Prometheus的merics類型有四種,分別為Counter、Gauge、Summary、Histogram。
- Counter:只增不減的計數器
- Gauge:可增可減的儀錶盤
- Histogram:分析數據分布情況
- Summary:使用較少
簡單了解即可,暫不需要深入理解。
通過瀏覽器訪問//被監控端IP:9100(被監控埠)/metrics
就可以查到node_exporter在被監控端收集的監控資訊
什麼是PromQL(函數式查詢語言)
Prometheus內置了一個強大的數據查詢語言PromQL。 通過PromQL可以實現對監控數據的查詢、聚合。
同時PromQL也被應用於數據可視化(如Grafana)以及告警當中。
通過PromQL可以輕鬆回答以下問題:
- 在過去一段時間中95%應用延遲時間的分布範圍?
- 預測在4小時後,磁碟空間佔用大致會是什麼情況?
- CPU佔用率前5位的服務有哪些?(過濾)
具體查詢細節可以參考官方。
如何監控遠程Linux主機
安裝Prometheus組件其實很簡單,下載包–解壓–後台啟動運行即可,不做具體演示。
在遠程linux主機(被監控端)上安裝node_exporter組件,可看下載地址
下載解壓後,裡面就一個啟動命令node_exporter
,直接啟動即可。
nohup /usr/local/node_exporter/node_exporter >/dev/null 2>&1 &
lsof -i:9100
nohup:如果直接啟動node_exporter的話,終端關閉進程也會隨之關閉,這個命令幫你解決問題。
Prometheus HTTP API
Prometheus 所有穩定的 HTTP API 都在 /api/v1
路徑下。當我們有數據查詢需求時,可以通過查詢 API 請求監控數據,提交數據可以使用 remote write 協議或者 Pushgateway 的方式。
支援的 API
API | 說明 | 需要認證 | 方法 |
---|---|---|---|
/api/v1/query | 查詢介面 | 是 | GET/POST |
/api/v1/query_range | 範圍查詢 | 是 | GET/POST |
/api/v1/series | series 查詢 | 是 | GET/POST |
/api/v1/labels | labels 查詢 | 是 | GET/POST |
/api/v1/label/<label_name>/values | label value 查詢 | 是 | GET |
/api/v1/prom/write | remote write 數據提交 | 是 | remote write |
Pushgateway | pushgateway 數據提交 | 是 | SDK |
認證方法
默認開啟認證,因此所有的介面都需要認證,且所有的認證方式都支援 Bearer Token和 Basic Auth。
調用介面的時候,我們需要攜帶Basic Auth請求頭的認證,否則會出現401。
Bearer Token
Bearer Token 隨著實例產生而生成,可以通過控制台進行查詢。了解 Bearer Token 更多資訊,請參見 Bearer Authentication。
Basic Auth
Basic Auth 兼容原生 Prometheus Query 的認證方式,用戶名為用戶的 APPID,密碼為 bearer token(實例產生時生成),可以通過控制台進行查詢。了解 Basic Auth 更多資訊,請參見 Basic Authentication。
數據返回格式
所有 API 的響應數據格式都為 JSON。每一次成功的請求會返回 2xx
狀態碼。
無效的請求會返回一個包含錯誤對象的 JSON 格式數據,同時也將包含一個如下表格的狀態碼:
狀態碼 | 含義 |
---|---|
401 | 認證失敗 |
400 | 當參數缺失或錯誤時返回無效的請求狀態碼 |
422 | 當一個無效的表達式無法被指定時 (RFC4918) |
503 | 當查詢不可用或者被取消時返回服務不可用狀態碼 |
無效請求響應返回模板如下:
{
"status": "success" | "error",
"data": <data>,
// 當 status 狀態為 error 時,下面的數據將被返回
"errorType": "<string>",
"error": "<string>",
// 當執行請求時有警告資訊時,該欄位將被填充返回
"warnings": ["<string>"]
}
數據寫入
運維過程不需要對數據進行寫入,所以暫時不深入理解。
有興趣的同學可以看看官方文檔
監控數據查詢
當我們有數據查詢需求時,可以通過查詢 API 請求監控數據。
- 查詢 API 介面
GET /api/v1/query
POST /api/v1/query
查詢參數:
query=
time= <rfc3339 | unix_timestamp>: 時間戳, 可選。
timeout= -query.timeout
參數指定。
- 簡單的查詢
查詢當前狀態為up的監控主機:
curl -u "appid:token" '//IP:PORT/api/v1/query?query=up'
- 範圍查詢
GET /api/v1/query_range
POST /api/v1/query_range
根據時間範圍查詢需要的數據,這也是我們用得最多的場景,
這時我們需要用到 /api/v1/query_range
介面,示例如下:
$ curl '//localhost:9090/api/v1/query_range?query=up&start=2015-07-01T20:10:30.781Z&end=2015-07-01T20:11:00.781Z&step=15s'
{
"status" : "success",
"data" : {
"resultType" : "matrix",
"result" : [
{
"metric" : {
"__name__" : "up",
"job" : "prometheus",
"instance" : "localhost:9090"
},
"values" : [
[ 1435781430.781, "1" ],
[ 1435781445.781, "1" ],
[ 1435781460.781, "1" ]
]
},
{
"metric" : {
"__name__" : "up",
"job" : "node",
"instance" : "localhost:9091"
},
"values" : [
[ 1435781430.781, "0" ],
[ 1435781445.781, "0" ],
[ 1435781460.781, "1" ]
]
}
]
}
}
什麼是Grafana
Grafana是一個開源的度量分析和可視化工具,可以通過將採集的數據分析、查詢,
然後進行可視化的展示,並能實現報警。
網址: //grafana.com/
使用Grafana連接Prometheus
連接不再做具體演示,操作思路如下:
- 在Grafana伺服器上安裝,下載地址://grafana.com/grafana/download
- 瀏覽器
//grafana伺服器IP:3000
登錄,默認帳號密碼都是admin,就可以登陸了。 - 把Prometheus伺服器收集的數據做為一個數據源添加到Grafana,得到Prometheus數據。
- 然後為添加好的數據源做圖形顯示,最後在dashboard就可以查看到。
操作流程不難,就不講解重點,後面正式開始上查詢腳本。
工作使用場景
工作中需要通過CPU、記憶體生成資源利用率報表,可以通過Prometheus的API寫一個Python腳本。
可通過API獲取數據,然後再進行數據排序、過濾、運算、聚合,最後寫入Mysql資料庫。
CPU峰值計算
- 取最近一周CPU數值,再排序取最高的值。
def get_cpu_peak(self):
"""
CPU取最近一周所有數值,再排序取最高的值,TOP1
:return: {'IP' : value}
"""
# 拼接URL
pre_url = self.server_ip + '/api/v1/query_range?query='
expr = '100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) ' \
'&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
url = pre_url + expr
# print(url)
result = {}
# 請求URL後將Json數據轉為字典對象
res = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
# print(data)
# 循環取出字典里每個IP的values,排序取最高值,最後存入result字典
for da in res.get('data').get('result'):
values = da.get('values')
cpu_values = [float(v[1]) for v in values] # 取出數值並存入列表
# 取出IP並消除埠號
ip = da.get('metric').get('instance')
ip = ip[:ip.index(':')] if ':' in ip else ip
# if ip == '10.124.58.181':
# print (ip)
# cpu_peak = round(sorted(cpu_values, reverse=True)[0], 2)
cpu_peak = sorted(cpu_values, reverse=True)[0]
# 取出IP和最高值之後,寫入字典
result[ip] = cpu_peak
# print(result)
return result
CPU均值計算
- 取最近一周CPU每一天的TOP20除以20得到當時忙時平均值,
再將7天平均值的和除以n,得到時間範圍內忙時平均值。
def get_cpu_average(self):
"""
CPU忙時平均值:取最近一周CPU數據,每一天的TOP20除以20得到忙時平均值;
再將一周得到的忙時平均值相加,再除以7,得到時間範圍內一周的忙時平均值。
:return:
"""
cpu_average = {}
for t in range(len(self.time_list)):
if t + 1 < len(self.time_list):
start_time = self.time_list[t]
end_time = self.time_list[t + 1]
# print(start_time, end_time)
# 拼接URL
pre_url = server_ip + '/api/v1/query_range?query='
expr = '100 - (avg by(instance) (irate(node_cpu_seconds_total{mode="idle"}[5m])) * 100) ' \
'&start=%s&end=%s&step=300' % (start_time, end_time - 1)
url = pre_url + expr
# print(url)
# 請求介面數據
data = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
for da in data.get('data').get('result'): # 循環拿到result數據
values = da.get('values')
cpu_load = [float(v[1]) for v in values] # 循環拿到values裡面的所有值
ip = da.get('metric').get('instance') # 拿到instance裡面的ip
ip = ip[:ip.index(':')] if ':' in ip else ip # 去除個別後面帶的埠號
# avg_cup_load = sum(sorted(cpu_load, reverse=True)[:20]) / 20
# 取top20% 再除以20%,得出top20%的平均值
# avg_cup_load = round(sum(sorted(cpu_load, reverse=True)[:round(len(cpu_load) * 0.2)]) / round(len(cpu_load) * 0.2), 2)
# 倒序後取前面20%除以個數,得到前20%的平均值
avg_cup_load = sum(sorted(cpu_load, reverse=True)[:round(len(cpu_load) * 0.2)]) / round(len(cpu_load) * 0.2)
# print(avg_cup_load)
# 將計算後的數據以ip為key寫入字典
if cpu_average.get(ip):
cpu_average[ip].append(avg_cup_load)
else:
cpu_average[ip] = [avg_cup_load]
# 每日top20的平均值累加,共7天的再除以7
for k, v in cpu_average.items():
# cpu_average[k] = round(sum(v) / 7, 2)
cpu_average[k] = sum(v)
# print(cpu_average)
return cpu_average
記憶體峰值計算
- 取7天記憶體數值,排序後取最高峰值TOP1
def get_mem_peak(self):
"""
記憶體單台峰值:取7天記憶體最高峰值TOP1
:return: 7天記憶體使用率最高峰值
"""
pre_url = self.server_ip + '/api/v1/query_range?query='
# expr = '(node_memory_MemTotal_bytes - (node_memory_MemFree_bytes+node_memory_Buffers_bytes+node_memory_Cached_bytes )) / node_memory_MemTotal_bytes * 100&start=%s&end=%s&step=300' % (start_time, end_time)
# 字元太長會導致報錯,所以這裡進行拆分欄位計算
expr_MenTotal = 'node_memory_MemTotal_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
expr_MemFree = 'node_memory_MemFree_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
expr_Buffers = 'node_memory_Buffers_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
expr_Cached = 'node_memory_Cached_bytes&start=%s&end=%s&step=300' % (self.time_list[0], self.time_list[-1] - 1)
result = {}
# 循環分別取出總記憶體、可用記憶體、Buffer塊、快取塊四個欄位
for ur in expr_MenTotal, expr_MemFree, expr_Buffers, expr_Cached:
url = pre_url + ur
data = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
ip_dict = {}
# 循環單個欄位所有值
for da in data.get('data').get('result'):
ip = da.get('metric').get('instance')
ip = ip[:ip.index(':')] if ':' in ip else ip
# if ip != '10.124.53.12':
# continue
if ip_dict.get(ip): # 過濾重複的ip,重複ip會導致計算多次
# print("重複ip:%s" % (ip))
continue
values = da.get('values')
# 將列表裡的值轉為字典方便計算
values_dict = {}
for v in values:
values_dict[str(v[0])] = v[1]
# 標記ip存在
ip_dict[ip] = True
# 建立列表追加字典
if result.get(ip):
result[ip].append(values_dict)
else:
result[ip] = [values_dict]
# print(result)
# 對取出的四個值進行計算,得出峰值
for ip, values in result.items():
values_list = []
for k, v in values[0].items():
try:
values_MenTotal = float(v)
values_MemFree = float(values[1].get(k, 0))
values_Buffers = float(values[2].get(k, 0)) if values[2] else 0
values_Cached = float(values[3].get(k, 0)) if values[3] else 0
# 如果是0,不參與計算
if values_MemFree==0.0 or values_Buffers==0.0 or values_Cached==0.0:
continue
# values_list.append(round((values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100, 2))
# 合併後計算,得出列表
values_list.append((values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100)
# 對得出結果進行排序
result[ip] = sorted(values_list, reverse=True)[0]
except Exception as e:
# print(values[0])
logging.exception(e)
# print(result)
return result
記憶體均值計算
- 先取出7天的日期,根據多條鏈接循環取出每天數據,排序value取top20除以20,最終7天數據再除以7
def get_mem_average(self):
"""
記憶體忙時平均值:先取出7天的日期,根據多條鏈接循環取出每天數據,排序value取top20除以20,最終7天數據再除以7
:return:
"""
avg_mem_util = {}
for t in range(len(self.time_list)):
if t + 1 < len(self.time_list):
start_time = self.time_list[t]
end_time = self.time_list[t + 1]
# 根據多條鏈接循環取出每天數據
pre_url = self.server_ip + '/api/v1/query_range?query='
# expr = '(node_memory_MemTotal_bytes - (node_memory_MemFree_bytes+node_memory_Buffers_bytes+node_memory_Cached_bytes )) / node_memory_MemTotal_bytes * 100&start=%s&end=%s&step=300' % (start_time, end_time)
expr_MenTotal = 'node_memory_MemTotal_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
expr_MemFree = 'node_memory_MemFree_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
expr_Buffers = 'node_memory_Buffers_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
expr_Cached = 'node_memory_Cached_bytes&start=%s&end=%s&step=600' % (start_time, end_time - 1)
result = {}
# 循環取出四個欄位
for ur in expr_MenTotal, expr_MemFree, expr_Buffers, expr_Cached:
url = pre_url + ur
data = json.loads(requests.post(url=url, headers=self.headers).content.decode('utf8', 'ignore'))
ip_dict = {}
# 循環單個欄位所有值
for da in data.get('data').get('result'):
ip = da.get('metric').get('instance')
ip = ip[:ip.index(':')] if ':' in ip else ip
if ip_dict.get(ip):
# print("重複ip:%s" % (ip))
continue
values = da.get('values')
# 將列表裡的值轉為字典方便計算
values_dict = {}
for v in values:
values_dict[str(v[0])] = v[1]
# 標記ip存在
ip_dict[ip] = True
# 建立列表追加字典
if result.get(ip):
result[ip].append(values_dict)
else:
result[ip] = [values_dict]
# print(result)
for ip, values in result.items():
values_list = []
for k, v in values[0].items():
try:
values_MenTotal = float(v)
values_MemFree = float(values[1].get(k, 0)) if values[1] else 0
values_Buffers = float(values[2].get(k, 0)) if values[2] else 0
values_Cached = float(values[3].get(k, 0)) if values[3] else 0
if values_MemFree == 0.0 or values_Buffers == 0.0 or values_Cached == 0.0:
continue
value_calc = (values_MenTotal - (values_MemFree + values_Buffers + values_Cached)) / values_MenTotal * 100
if value_calc != float(0):
values_list.append(value_calc)
except Exception as e:
print(values[0])
# logging.exception(e)
continue
# 排序value取top20除以20
# avg_mem = round(sum(sorted(values_list, reverse=True)[:round(len(values_list) * 0.2)]) / round(len(values_list) * 0.2), 2)
try:
avg_mem = sum(sorted(values_list, reverse=True)[:round(len(values_list) * 0.2)]) / round(len(values_list) * 0.2)
except Exception as e:
avg_mem = 0
logging.exception(e)
if avg_mem_util.get(ip):
avg_mem_util[ip].append(avg_mem)
else:
avg_mem_util[ip] = [avg_mem]
# 最終7天數據再除以7
for k, v in avg_mem_util.items():
# avg_mem_util[k] = round(sum(v) / 7, 2)
avg_mem_util[k] = sum(v)
return avg_mem_util
導出excel
- 將採集到的數據導出excel
def export_excel(self, export):
"""
將採集到的數據導出excel
:param export: 數據集合
:return:
"""
try:
# 將字典列錶轉換為DataFrame
pf = pd.DataFrame(list(export))
# 指定欄位順序
order = ['ip', 'cpu_peak', 'cpu_average', 'mem_peak', 'mem_average', 'collector']
pf = pf[order]
# 將列名替換為中文
columns_map = {
'ip': 'ip',
'cpu_peak': 'CPU峰值利用率',
'cpu_average': 'CPU忙時平均峰值利用率',
'mem_peak': '記憶體峰值利用率',
'mem_average': '記憶體忙時平均峰值利用率',
'collector': '來源地址'
}
pf.rename(columns=columns_map, inplace=True)
# 指定生成的Excel表格名稱
writer_name = self.Host + '.xlsx'
writer_name.replace(':18600', '')
# print(writer_name)
file_path = pd.ExcelWriter(writer_name.replace(':18600', ''))
# 替換空單元格
pf.fillna(' ', inplace=True)
# 輸出
pf.to_excel(file_path, encoding='utf-8', index=False)
# 保存表格
file_path.save()
except Exception as e:
print(e)
logging.exception(e)
因為機房需要保留數據方便展示,後面改造成採集直接入庫mysql。
—- 鋼鐵知識庫 [email protected] 2021.12.29
寫在最後
以上簡單介紹了Prometheus架構、基礎概念、API使用,以及Python調用Prometheus的API部分示例,完整程式碼也已經上傳,需要自取或聯繫即可。
下載鏈接:
//download.csdn.net/download/u011463397/72150839
參考鏈接:
Prometheus操作指南://github.com/yunlzheng/prometheus-book
官方查詢API://prometheus.io/docs/prometheus/latest/querying/api/