多线程的并发测试

  • 2020 年 1 月 23 日
  • 筆記

在API的测试中,需要考虑分层的思想来应用于API的测试,其实在性能测试中,也是有不同层次的性能测试的,特别是在saas化架构的模式下,这种不同层次的性能测试显得更加突出。在性能测试中,经常常见的层次分别是端到端的测试,还有一类是API的性能测试,最后是具体的一个函数或者说是方法。今天单纯的来说性能测试分层中API的测试方式,当我们使用requests发送一个请求之后,需要验证协议的状态码以及响应数据,当然还有响应时间,响应时间可以把它理解为Request时间与Response时间的之和。

从性能测试的另外一个角度,可以分为IO密集型和CPU的密集型,IO密集型主要应用于如爬虫请求这些,以及IO读写交互这些的场景,在Python中使用多线程做性能测试比较高效,如果是CPU密集型可以使用多进程的方式。就以编写的API的测试代码为案例,具体被测试的API的代码如下:

#!/usr/bin/python3  #coding:utf-8  from flask import  Flask,make_response,jsonify,abort,request  from flask_restful import  Api,Resource    app=Flask(__name__)  app.debug = True  app.config['SECRET_KEY'] = 'super-secret'  api=Api(app=app)      books=[     {'id':1,'author':'wuya','name':'Python接口自动化测试实战','done':True},     {'id':2,'author':'无涯','name':'Selenium3自动化测试实战','done':False}  ]      class Books(Resource):       def get(self):        return jsonify({'status':0,'msg':'ok','datas':books})      api.add_resource(Books,'/v1/api/books')    if __name__ == '__main__':      app.run(debug=True)

依据上面的案例代码,来测试获取所有书籍的接口,实现它的源码为:

#!/usr/bin/env python  #!coding:utf-8  import  requests    r=requests.get(     url='http://127.0.0.1:5000/v1/api/books')  print(r.status_code)  print(r.elapsed.total_seconds())

依据上面的代码可以,能够看到服务端返回的状态码以及请求的响应时间。total_seconds()方法可以理解为持续时间的总秒数。下来模拟多线程对该API的测试,实现的代码如下:

#!/usr/bin/env python  #!coding:utf-8  import  requests  from threading import  Thread    def api():     r=requests.get(        url='http://127.0.0.1:5000/v1/api/books')     return r      if __name__ == '__main__':     tasks=list()     for i in range(1,10):        t=Thread(target=api)        tasks.append(t)        t.start()       for t in tasks:t.join()

执行如上的代码后,就看到看到发送的请求,如下图所示:

那么现在需要返回每次请求的状态码以及每次请求的持续时间总秒数,但是在Thread的类里面并没有返回值,需要自定义返回值,那么完善后的代码为:

#!/usr/bin/env python  #!coding:utf-8  import  requests  from threading import  Thread    def api(code,seconds):     r=requests.get(        url='http://127.0.0.1:5000/v1/api/books')     code=r.status_code     seconds=r.elapsed.total_seconds()     return code,seconds    class MyThread(Thread):     def __init__(self,func,args=()):        super(MyThread,self).__init__()        self.func=func        self.args=args       def run(self):        self.result=self.func(*self.args)       def getResult(self):        try:           return self.result        except BaseException as e:           return e.args[0]    if __name__ == '__main__':     tasks=list()     results=list()     for i in range(1,10000):        t=MyThread(api,args=(i,i))        tasks.append(t)        t.start()       for t in tasks:        t.join()        results.append(t.getResult())     for i in results:        print(i)

增加了返回值的获取以及返回值的循环输出,执行如上的代码后,可以看到CPU的变化是从低到高然后测试结束后,由回落到最低,如下图所示(测试前资源信息):

测试过程中资源信息如下图所示: