性能監控平台搭建 — 集成Locust性能數據
- 2019 年 10 月 4 日
- 筆記
之前的幾篇關於性能監控平台搭建的文章,分別介紹了性能測試中的資源數據採集、存儲及展示。今天一起來看下如何完成Locust性能數據的採集。

這是之前介紹過的性能監控平台的整體架構圖,想要了解其它部分的搭建,可以查看相關文章《Telegraf安裝與簡易使用指南》、《InfluxDB安裝與簡易使用指南》、《Grafana安裝與簡易使用指南》
因為我們已經完成了資源數據的採集,以及監控數據的存儲與展示,剩下的就是採集性能數據了。規劃中我們需要支援採集JMeter和Locust工具的性能數據,今天先講解如何採集Locust的性能數據。
問題概述
如果你使用過Locust,那麼你一定知道Locust本身自帶一個WEB服務,它提供了性能測試過程中的性能數據監控,並且也提供了一個圖形的介面支援實時監控,完事了還可以下載csv格式的性能測試數據。
既然Locust已經有了性能數據的監控功能,為哈還要接入到性能監控平台呢?因為Locust里的數據沒有主動持久化,一旦刷新就沒有了;也不會自動保存歷史數據;不能對數據進行訂製化展示,不能在同一個平台中查看全部的性能數據。
為此我們要解決的就是把Locust性能工具中的性能數據實時的獲取到並存儲到Influxdb中,這樣就完美的解決了Locust性能數據集成問題,讓監控平台可以無縫的支援Locust工具。
獲取Locust性能數據介面
既然要採集性能測試數據,那麼首先要考慮的就是如何獲取性能測試數據?是修改源碼?還是開發插件?這些統統不要!因為Locust本身就已經有了性能數據監控服務,通過抓取Locust的WEB服務頁面請求,很方便的就得到了Locust的性能監控數據。比如:
curl http://localhost:8089/stats/requests
該URL會返回當前性能測試到目前為止的性能測試數據的總結資訊,所以我們需要的性能數據基本上Locust已經為我們打包好了,我們只要請求這個URL就可以實時的獲取到現在的性能數據。
定時採集性能數據
數據獲取的方式已經知道了,接下來考慮的就是在什麼時候獲取數據的問題。最簡單粗暴的方式就是寫一個定時任務去請求該URL,獲取數據後直接存儲到Influxdb即可。程式碼如下:
def get_locust_stats_by_web_api(): print("get_locust_stats") try: start_url = f'http://localhost:8089/stats/requests' print(start_url) return requests.get(start_url).json() except Exception as e: print(e)
而這樣做的弊端則是定時任務與性能測試啟停的一致性需要人為的控制,用戶友好性不夠。我們希望的是性能測試一開始它就自動開始採集性能數據,性能測試一結束它就停止採集性能數據,要做到對目前的性能測試操作盡量無侵入。
性能數據採集一致性
為了解決性能數據採集與性能測試之間的一致性問題,我們需要把程式碼集成到Locust性能測試腳本中,讓它跟腳本綁定,這樣一旦開始執行性能測試,就會觸發性能數據採集的定時任務,從根本上解決了一致性問題。
no-web模式下獲取性能數據
前面我們獲取Locust性能測試數據時,是通過/stats/requests
介面獲取到的。這個介面是基於WEB模式下,一旦我們選擇以no-web的方式啟動Locust,那麼這個介面就會失效了。
為了兼容no-web模式下也能正常採集到Locust的性能數據,可以直接把/stats/requests
介面生成性能測試數據的程式碼直接COPY過來即可,所以獲取Locust性能測試數據的方法需要改寫成這樣:
def get_locust_stats(): stats = [] for s in chain(sort_stats(runners.locust_runner.request_stats), [runners.locust_runner.stats.total]): stats.append({ "method": s.method, "name": s.name, "num_requests": s.num_requests, "num_failures": s.num_failures, "avg_response_time": s.avg_response_time, "min_response_time": s.min_response_time or 0, "max_response_time": s.max_response_time, "current_rps": s.current_rps, "median_response_time": s.median_response_time, "avg_content_length": s.avg_content_length, }) errors = [e.to_dict() for e in six.itervalues(runners.locust_runner.errors)] # Truncate the total number of stats and errors displayed since a large number of rows will cause the app # to render extremely slowly. Aggregate stats should be preserved. report = {"stats": stats[:500], "errors": errors[:500]} if stats: report["total_rps"] = stats[len(stats) - 1]["current_rps"] report["fail_ratio"] = runners.locust_runner.stats.total.fail_ratio report[ "current_response_time_percentile_95"] = runners.locust_runner.stats.total.get_current_response_time_percentile( 0.95) report[ "current_response_time_percentile_50"] = runners.locust_runner.stats.total.get_current_response_time_percentile( 0.5) is_distributed = isinstance(runners.locust_runner, MasterLocustRunner) if is_distributed: slaves = [] for slave in runners.locust_runner.clients.values(): slaves.append({"id": slave.id, "state": slave.state, "user_count": slave.user_count}) report["slaves"] = slaves report["state"] = runners.locust_runner.state report["user_count"] = runners.locust_runner.user_count return report
slave模式下不進行數據採集
同樣的Locust還有分散式模式,一旦採用該模式之後Locust性能腳本會在master和各slave節點都會執行,但是很明顯我們不希望接收到多次重複的性能採集數據,所以需要保證只有在master上的性能腳本才會進行性能數據採集。
def monitor(project_name): print("start monitoring") slave = isinstance(runners.locust_runner, SlaveLocustRunner) if slave: # 判斷是否為slave print('is slave, will not rerun') return try: rep = get_locust_stats() if rep['state'] == 'running': host = get_locust_host() save_to_db(project_name, host, rep) print(f'is_slave: {slave}, host: {host}, project_name: {project_name}') else: print('it is not running now') except Exception as e: print(e) timer = threading.Timer(interval, monitor, args=[project_name]) # 定時任務 timer.start()
封裝
當然,我們也不希望把這麼多的性能數據採集的程式碼直接寫在Locust的性能測試腳本中,即不美觀也不容易管理。所以需要把這些採集數據的程式碼統一封裝到一個獨立文件中,並對外提供一個調用入口,只要簡單引入即可。調用程式碼如下:
from locust import HttpLocust, TaskSet, task, events from locust2db import start # 引入性能採集模組 start('locust_monitoring') # 調用性能採集模組 class WebsiteTasks(TaskSet): def on_start(self): self.client.get("/login") @task def index(self): self.client.get("/") class WebsiteUser(HttpLocust): task_set = WebsiteTasks host = 'https://www.testqa.cn' min_wait = 1000 max_wait = 1000