Flask 實現遠程日誌實時監控
- 2019 年 11 月 30 日
- 筆記
去除業務相關邏輯
- 示例程式碼倉庫在 https://github.com/frostming/flask-webconsole-example
前言
在自動化運維繫統中,常常需要監控日誌,這些日誌是不斷更新的。本文提供了一種實時日誌監控的 Python 實現。主要實現以下功能:
- 抓取遠程機器的終端輸出到伺服器上。
- 將伺服器的日誌更新實時顯示到客戶端網頁上。
文中示例基於 Python 以及 Flask。
主要依賴:
- Flask
- Redis 及其 Python 客戶端
- paramiko
分析
總體來說要完成實時監控日誌的功能需要分為兩個方面:
- 實時讀取遠程輸出
- 將輸出實時顯示到頁面上
獲取遠程輸出
那麼下面要解決的問題是如何從遠程機器上獲取終端輸出並添加到日誌隊列中。在 Python 中,SSH 連接相關的庫是 paramiko,於是我自然就想用下面的方法:
Python
client = paramiko.SSHClient() client.load_system_host_keys() client.connect(host) stdin, stdout, stderr = client.exec_command(command) for line in stdout: print(line)
這樣是挺好的,但是很多時候日誌輸出時雜糅了標準輸出與錯誤輸出的,我希望能有一種方法,檢測到有新輸出則顯示輸出,有新錯誤則顯示錯誤,就像Terminal裡面那樣。所幸我們可以利用更低一級的channel對象來實現:
Python
def do_run_command(host, username, password, command, key): client = paramiko.SSHClient() hostname, port = host.split(':') client.load_system_host_keys() try: client.connect(hostname, port, username, password) stdin, stdout, stderr = client.exec_command(command) channel = stdout.channel pending = err_pending = None while not channel.closed or channel.recv_ready() or channel.recv_stderr_ready(): readq, _, _ = select.select([channel], [], [], 1) for c in readq: if c.recv_ready(): chunk = c.recv(len(c.in_buffer)) if pending is not None: chunk = pending + chunk lines = chunk.splitlines() if lines and lines[-1] and lines[-1][-1] == chunk[-1]: pending = lines.pop() else: pending = None [push_log(line.decode(), key) for line in lines] if c.recv_stderr_ready(): chunk = c.recv_stderr(len(c.in_stderr_buffer)) if err_pending is not None: chunk = err_pending + chunk lines = chunk.splitlines() if lines and lines[-1] and lines[-1][-1] == chunk[-1]: err_pending = lines.pop() else: err_pending = None [push_log(line.decode(), key) for line in lines] finally: client.close()
這裡使用了 select 來控制 IO,另外需要說明的是循環條件:當所有輸出都讀取完畢時channel.closed
為True
,而exit_status_ready()
是當進程運行結束時就為真了,此時輸出不一定都讀完了。pending
和chunk
是用來整行讀取的。
日誌實時更新
下面我們需要實現一種網頁顯示,當用戶訪問時,顯示當前日誌,若日誌有更新,只要網頁還打開,無需刷新,日誌就是實時更新到網頁上。另外,還需要考慮到有多個客戶端連接的情況,日誌應該是同步更新的。
對於一般的 HTTP 連接,客戶端一次請求完畢後立即得到響應,若不重新請求就無法得到新的響應,伺服器是被動的。要實現這種客戶端的子更新,大致有三種方法:AJAX, SSE 和 Websocket。
- AJAX 就是客戶端自動定時發請求,定時間隔事先指定,不是真正的實時。
- SSE 其實是一種長連接,只能實現伺服器向客戶端主動發送消息。
- Websocket 是伺服器與客戶端之間的全雙工通道,需要後端的軟體支援。
權衡以上三者,SSE 是能滿足我的要求的代價最小的選擇。它的原理是客戶端建立一個事件監聽器,監聽指定 URL 的消息,在伺服器端,這個 URL 返回的響應必須是一個流類型。只要將響應體設為一個生成器,並設置頭部為mimetype='text/event-stream'
就行了。在Flask
上,已經有封裝好的擴展Flask-SSE
,直接安裝使用就行了。Flask-SSE
是通過 Redis 的 Pubsub 實現的消息隊列。然而,只有在連接建立以後發送的數據才能收到。只並建立事件監聽接受新的日誌即可。程式碼如下:
<script> (function () { document.getElementById("post-form").onsubmit = function(e) { e.preventDefault(); var parentNode = document.getElementById('log-container'); parentNode.innerHTML = ""; var data = new FormData(document.getElementById('post-form')); fetch('/run', { method: 'POST', body: data }).then(resp => resp.json()).then(data => { var source = new EventSource('/stream?channel=' + data.uid); source.addEventListener('message', function(event) { var res = JSON.parse(event.data); var pre = document.createElement('pre'); pre.innerText = res.message; parentNode.appendChild(pre); }); }); } })(); </script>
相應地,添加日誌時就要同時發送消息到Pubsub:
Python
def push_log(message, channel): sse.publish({'message': message}, 'message', channel=channel)
幾個注意事項
- 若遠程腳本使用python運行時,需要帶上
-u
選項,否則print
的輸出不會立即吐出,而是有緩衝。 - redis 的pubsub 只會收到連接建立之後的消息,可能會造成消息丟失。可以在pubsub之外,另外持久化一份消息到redis中,顯示時,消息則由「redis中取出的消息」+ 「監聽收到的新消息」組成。
參考鏈接:
- http://flask-sse.readthedocs.io/en/latest/quickstart.html
- http://stackoverflow.com/a/32758464