Python+requests+unittest+excel实现接口自动化测试框架

  • 2019 年 10 月 6 日
  • 筆記

一、框架结构:

工程目录

代码:基于python2编写

二、Case文件设计

三、基础包 base

3.1 封装get/post请求(runmethod.py)

import requests
import json


class RunMethod:
    """Send GET/POST requests with ``requests`` and return the JSON reply."""

    def post_main(self, url, data, header=None):
        """POST ``data`` to ``url`` and return the decoded JSON body.

        ``header`` is an optional dict of HTTP headers; passing ``None`` is
        the same as sending no extra headers.
        """
        res = requests.post(url=url, data=data, headers=header)
        return res.json()

    def get_main(self, url, data=None, header=None):
        """GET ``url`` and return the decoded JSON body.

        ``data`` is sent as the query string (``params=``).  The previous
        code passed it as ``data=``, which puts it in the request body where
        GET handlers normally do not read it.  TLS certificate verification
        stays disabled, exactly as before.
        """
        res = requests.get(url=url, params=data, headers=header, verify=False)
        return res.json()

    def run_main(self, method, url, data=None, header=None):
        """Dispatch on ``method`` and return the response as a JSON string.

        Any spelling of "post" (``'Post'``, ``'POST'``, ``' post '``) selects
        a POST request; every other value falls back to GET, as before.
        The result is pretty-printed with sorted keys so that two responses
        can be compared textually.
        """
        is_post = hasattr(method, 'lower') and method.strip().lower() == 'post'
        if is_post:
            res = self.post_main(url, data, header)
        else:
            res = self.get_main(url, data, header)
        return json.dumps(res, ensure_ascii=False, sort_keys=True, indent=2)

3.2 封装mock(mock.py)

from mock import mock  #模拟mock 封装  def mock_test(mock_method,request_data,url,method,response_data):      mock_method = mock.Mock(return_value=response_data)      res = mock_method(url,method,request_data)      return res

四、数据操作包 operation_data

4.1 获取excel单元格中的内容(get_data.py)

#coding:utf-8
from tool.operation_excel import OperationExcel
import data_config
from tool.operation_json import OperetionJson
from tool.connect_db import OperationMysql


class GetData:
    """Read the individual fields of one test case from the Excel sheet.

    Column positions come from ``data_config``; every accessor takes the
    row index of the case.
    """

    def __init__(self):
        self.opera_excel = OperationExcel()

    def _cell(self, row, column):
        """Return the raw cell value; ``column`` may be a numeric string."""
        return self.opera_excel.get_cell_value(row, int(column))

    def _cell_or_none(self, row, column):
        """Return the cell value, or ``None`` when the cell is empty."""
        value = self._cell(row, column)
        return None if value == '' else value

    # Number of rows in the sheet, i.e. the number of cases
    def get_case_lines(self):
        return self.opera_excel.get_lines()

    # Whether the case should be executed
    def get_is_run(self, row):
        """Return ``True`` only when the "run" cell is exactly ``'yes'``."""
        return self._cell(row, data_config.get_run()) == 'yes'

    # Whether the case carries a header
    def is_header(self, row):
        """Return the header cell value, or ``None`` when it is empty."""
        return self._cell_or_none(row, data_config.get_header())

    # Request method
    def get_request_method(self, row):
        return self._cell(row, data_config.get_run_way())

    # Request url
    def get_request_url(self, row):
        return self._cell(row, data_config.get_url())

    # Request data (the key stored in the sheet)
    def get_request_data(self, row):
        return self._cell_or_none(row, data_config.get_data())

    # Look the request payload up by its key
    def get_data_for_json(self, row):
        """Return the payload registered under the row's data key.

        Returns ``None`` when the data cell is empty instead of looking up
        ``None`` in the JSON data, which used to raise ``KeyError``.
        """
        data_key = self.get_request_data(row)
        if data_key is None:
            return None
        return OperetionJson().get_data(data_key)

    # Expected result
    def get_expcet_data(self, row):
        return self._cell_or_none(row, data_config.get_expect())

    # Expected result fetched through SQL
    def get_expcet_data_for_mysql(self, row):
        """Run the SQL stored in the "expect" cell and return its result.

        Returns ``None`` when the cell is empty, without opening a database
        connection.  Otherwise the string from ``search_one`` is decoded
        with ``unicode-escape`` when it offers ``decode`` (Python 2 ``str``).
        """
        sql = self.get_expcet_data(row)
        if sql is None:
            return None
        res = OperationMysql().search_one(sql)
        return res.decode('unicode-escape') if hasattr(res, 'decode') else res

    def write_result(self, row, value):
        """Write ``value`` into the result column of ``row``."""
        col = int(data_config.get_result())
        self.opera_excel.write_value(row, col, value)

    # Key used to extract the dependent data
    def get_depend_key(self, row):
        return self._cell_or_none(row, data_config.get_data_depend())

    # Whether the case depends on another case
    def is_depend(self, row):
        """Return the id of the case this row depends on, or ``None``."""
        return self._cell_or_none(row, data_config.get_case_depend())

    # Field that receives the dependent data
    def get_depend_field(self, row):
        return self._cell_or_none(row, data_config.get_field_depend())

4.2 获取excel中每个列(data_config.py)

#coding:utf-8
class global_var:
    """Column index of every field in the case sheet.

    The indexes are stored as strings; callers convert them with ``int()``.
    """
    # case_id
    Id = '0'
    request_name = '1'
    url = '2'
    run = '3'
    request_way = '4'
    header = '5'
    case_depend = '6'
    data_depend = '7'
    field_depend = '8'
    data = '9'
    expect = '10'
    result = '11'


# Column of the case id
def get_id():
    return global_var.Id


# Column of the url
def get_url():
    return global_var.url


def get_run():
    return global_var.run


def get_run_way():
    return global_var.request_way


def get_header():
    return global_var.header


def get_case_depend():
    return global_var.case_depend


def get_data_depend():
    return global_var.data_depend


def get_field_depend():
    return global_var.field_depend


def get_data():
    return global_var.data


def get_expect():
    return global_var.expect


def get_result():
    return global_var.result


def get_header_value():
    # Same column as get_header(); kept as a separate name for callers.
    return global_var.header

4.3 解决数据依赖(dependent_data.py)

#coding:utf-8
import sys
import json
# NOTE(review): developer-specific absolute path; adjust to the project root
# of the machine the framework runs on.
sys.path.append('C:/Users/lxz/Desktop/InterFace_JIA')
from tool.operation_excel import OperationExcel
from base.runmethod import RunMethod
from operation_data.get_data import GetData
from jsonpath_rw import jsonpath,parse


class DependdentData:
    """Run the case another case depends on and pull a value out of its reply."""

    def __init__(self, case_id):
        self.case_id = case_id
        self.opera_excel = OperationExcel()
        self.data = GetData()

    # Fetch the whole sheet row that belongs to case_id
    def get_case_line_data(self):
        return self.opera_excel.get_rows_data(self.case_id)

    # Execute the dependency case and return its response
    def run_dependent(self):
        """Send the request of the dependency case and return the parsed JSON.

        The request is sent without headers (the header lookup was disabled
        in the original code).
        """
        run_method = RunMethod()
        row_num = self.opera_excel.get_row_num(self.case_id)
        request_data = self.data.get_data_for_json(row_num)
        method = self.data.get_request_method(row_num)
        url = self.data.get_request_url(row_num)
        res = run_method.run_main(method, url, request_data)
        return json.loads(res)

    # Use the dependency key to extract a value from the dependency response
    def get_data_for_key(self, row):
        """Return the first value matching the row's dependency key.

        The key stored in ``row`` is parsed as a JSONPath expression and
        applied to the response of the dependency case.

        Raises ``IndexError`` (as before, now with a message) when the
        expression matches nothing.
        """
        depend_data = self.data.get_depend_key(row)
        response_data = self.run_dependent()
        matches = parse(depend_data).find(response_data)
        if not matches:
            raise IndexError(
                "dependency key %r not found in the response of case %r"
                % (depend_data, self.case_id)
            )
        return matches[0].value


if __name__ == '__main__':
    # Long literals are split with implicit string concatenation so that no
    # line-wrap whitespace ends up inside the values.
    order = {
        "data": {
            "_input_charset": "utf-8",
            "body": "京东订单-1710141907182334",
            "it_b_pay": "1d",
            "notify_url": "http://order.imooc.com/pay/notifyalipay",
            "out_trade_no": "1710141907182334",
            "partner": "2088002966755334",
            "payment_type": "1",
            "seller_id": "yangyan01@tcl.com",
            "service": "mobile.securitypay.pay",
            "sign": (
                "kZBV53KuiUf5HIrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQl"
                "pr%2BaMmdjO30JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D"
            ),
            "sign_type": "RSA",
            "string": (
                "_input_charset=utf-8&body=京东订单-1710141907182334&it_b_pay=1d&notify_url=http://order.imooc.com/pay/"
                "notifyalipay&out_trade_no=1710141907182334&partner=2088002966755334&payment_type=1&seller_id=yangyan01@"
                "tcl.com&service=mobile.securitypay.pay&subject=京东订单-1710141907182334&total_fee=299&sign=kZBV53KuiUf5H"
                "IrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQlpr%2BaMmdjO30"
                "JBgjqjj4mmM%2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX%2Bo4%3D&sign_type=RSA"
            ),
            "subject": "京东订单-1710141907182334",
            "total_fee": 299
        },
        "errorCode": 1000,
        "errorDesc": "成功",
        "status": 1,
        "timestamp": 1507979239100
    }
    res = "data.out_trade_no"
    json_exe = parse(res)
    madle = json_exe.find(order)
    print([match.value for match in madle][0])

五、工具类包 tool

5.1 操作excel (operation_excel.py)

#coding:utf-8
import xlrd
from xlutils.copy import copy


class OperationExcel:
    """Read and update one sheet of the Excel case file."""

    def __init__(self, file_name=None, sheet_id=None):
        """Open ``file_name`` and select the sheet at index ``sheet_id``.

        Without ``file_name`` the default case file and sheet 0 are used.
        A missing ``sheet_id`` now means sheet 0 instead of failing on
        ``sheets()[None]``.
        """
        if file_name:
            self.file_name = file_name
            self.sheet_id = 0 if sheet_id is None else sheet_id
        else:
            self.file_name = '../dataconfig/case1.xls'
            self.sheet_id = 0
        self.data = self.get_data()

    # Load the selected sheet
    def get_data(self):
        book = xlrd.open_workbook(self.file_name)
        return book.sheets()[self.sheet_id]

    # Number of rows in the sheet
    def get_lines(self):
        return self.data.nrows

    # Content of a single cell
    def get_cell_value(self, row, col):
        return self.data.cell_value(row, col)

    # Write a value back to the file
    def write_value(self, row, col, value):
        """Write ``value`` to cell (``row``, ``col``) and save the workbook.

        The workbook is re-read from disk, copied into a writable workbook
        and saved over the same file.  The write goes to the sheet selected
        in the constructor; it used to go to sheet 0 unconditionally.
        ``self.data`` is not reloaded afterwards.
        """
        read_data = xlrd.open_workbook(self.file_name)
        write_data = copy(read_data)
        sheet_data = write_data.get_sheet(self.sheet_id)
        sheet_data.write(row, col, value)
        write_data.save(self.file_name)

    # Row content for a given case id
    def get_rows_data(self, case_id):
        row_num = self.get_row_num(case_id)
        return self.get_row_values(row_num)

    # Row number for a given case id
    def get_row_num(self, case_id):
        """Return the index of the first row whose first column contains ``case_id``.

        Returns ``None`` when no row matches.
        NOTE(review): this is a containment test (``in``), so an id that is
        a prefix of another id can match the wrong row - confirm that ids
        in the sheet are unambiguous.
        """
        for num, col_data in enumerate(self.get_cols_data()):
            if case_id in col_data:
                return num
        return None

    # Content of a row
    def get_row_values(self, row):
        return self.data.row_values(row)

    # Content of a column (first column by default)
    def get_cols_data(self, col_id=None):
        return self.data.col_values(0 if col_id is None else col_id)


if __name__ == '__main__':
    opers = OperationExcel()
    print(opers.get_cell_value(1, 2))

5.2判断字符串包含,判断字典是否相等(common_util.py)

#coding:utf-8
import json

try:
    _unicode = unicode  # Python 2 text type
except NameError:
    _unicode = None  # Python 3: str is already text

_string_types = (str,) if _unicode is None else (str, _unicode)


def _compare(left, right):
    """Return 0 when both values are equal and a non-zero int otherwise."""
    if left == right:
        return 0
    try:
        return cmp(left, right)  # Python 2 keeps its original -1 / 1 result
    except NameError:
        return 1


class CommonUtil:
    """Comparison helpers used to check responses against expectations."""

    def is_contain(self, str_one, str_two):
        '''
        Tell whether one string occurs inside another.
        str_one: the string to look for
        str_two: the string that is searched
        Returns True or False.

        A stray ``return cmp(str_one, str_two)`` used to sit in front of the
        containment test, so the test itself was never reached.
        '''
        if _unicode is not None and isinstance(str_one, _unicode):
            # Python 2 only: turn the text into its escaped byte-string form.
            str_one = str_one.encode('unicode-escape').decode('string_escape')
        return str_one in str_two

    def is_equal_dict(self, dict_one, dict_two):
        '''
        Tell whether two dicts are equal.
        Either argument may be a dict or a JSON string (``str`` or, on
        Python 2, ``unicode``); strings are parsed first.
        Returns 0 when they are equal and a non-zero int otherwise.
        '''
        if isinstance(dict_one, _string_types):
            dict_one = json.loads(dict_one)
        if isinstance(dict_two, _string_types):
            dict_two = json.loads(dict_two)
        return _compare(dict_one, dict_two)

5.3 操作header(operation_header.py)

# coding:utf-8
import requests
import json
from operation_json import OperetionJson


class OperationHeader:
    """Turn a login response into a stored cookie."""

    def __init__(self, response):
        """Keep the login response.

        ``response`` is normally the JSON text of the login reply; a dict
        that is already parsed is accepted as well.
        """
        if isinstance(response, dict):
            self.response = response
        else:
            self.response = json.loads(response)

    def get_response_url(self):
        '''
        Return the first url listed under data/url in the login response
        (the url that hands out the token).
        '''
        return self.response['data']['url'][0]

    def get_cookie(self):
        '''
        Request the token url and return the cookie jar of the reply.
        '''
        url = self.get_response_url() + "&callback=jQuery21008240514814031887_1508666806688&_=1508666806689"
        return requests.get(url).cookies

    def write_cookie(self):
        """Convert the cookie jar to a dict and store it through OperetionJson."""
        cookie = requests.utils.dict_from_cookiejar(self.get_cookie())
        op_json = OperetionJson()
        op_json.write_data(cookie)


if __name__ == '__main__':
    url = "http://www.jd.com/passport/user/login"
    data = {
        "username": "18513199586",
        "password": "111111",
        "verify": "",
        "referer": "https://www.jd.com"
    }
    res = json.dumps(requests.post(url, data).json())
    op_header = OperationHeader(res)
    op_header.write_cookie()

5.4 操作json文件(operation_json.py)

#coding:utf-8
import json


class OperetionJson:
    """Load a JSON file and look entries up by key."""

    def __init__(self, file_path=None):
        """Read ``file_path`` (default ``../dataconfig/user.json``) into memory."""
        if file_path is None:
            self.file_path = '../dataconfig/user.json'
        else:
            self.file_path = file_path
        self.data = self.read_data()

    # Read the json file
    def read_data(self):
        with open(self.file_path) as fp:
            return json.load(fp)

    # Fetch the entry stored under a key
    def get_data(self, id):
        """Return ``self.data[id]``; raises ``KeyError`` for an unknown key."""
        return self.data[id]

    # Write json
    def write_data(self, data, file_path='../dataconfig/cookie.json'):
        """Serialize ``data`` as JSON into ``file_path``.

        The default target is the cookie file the original code always
        wrote to, independent of the file this object was loaded from.
        """
        with open(file_path, 'w') as fp:
            fp.write(json.dumps(data))


if __name__ == '__main__':
    opjson = OperetionJson()
    print(opjson.get_data('shop'))

5.5 操作数据库(connect_db.py)

 #coding:utf-8
import MySQLdb.cursors
import json


class OperationMysql:
    """Small MySQL helper that returns query results as JSON text."""

    def __init__(self, host='localhost', port=3306, user='root',
                 passwd='123456', db='le_study', charset='utf8'):
        """Open a connection that yields rows as dicts.

        The defaults are the values that used to be hard-coded, so
        ``OperationMysql()`` connects exactly as before.
        NOTE(review): real credentials should not live in source code.
        """
        self.conn = MySQLdb.connect(
            host=host,
            port=port,
            user=user,
            passwd=passwd,
            db=db,
            charset=charset,
            cursorclass=MySQLdb.cursors.DictCursor
            )
        self.cur = self.conn.cursor()

    # Fetch a single row
    def search_one(self, sql):
        """Execute ``sql`` and return the first row serialized with ``json.dumps``."""
        self.cur.execute(sql)
        result = self.cur.fetchone()
        return json.dumps(result)

    def close(self):
        """Release the cursor and the connection."""
        self.cur.close()
        self.conn.close()


if __name__ == '__main__':
    op_mysql = OperationMysql()
    try:
        res = op_mysql.search_one("select * from web_user where Name='ailiailan'")
        print(res)
    finally:
        op_mysql.close()

5.6 发送报告邮件(send_email.py)


#coding:utf-8
import smtplib
from email.mime.text import MIMEText

# Module-level mail settings (the original declared these names ``global``
# inside the class body, which also made them module globals).
# NOTE(review): the password should come from configuration, not source.
email_host = "smtp.163.com"
send_user = "[email protected]"
password = "jia_668"


class SendEmail:
    """Mail the pass/fail summary of a test run."""

    def send_mail(self, user_list, sub, content):
        """Send ``content`` as a plain-text UTF-8 mail to ``user_list``.

        ``sub`` is the subject.  The SMTP session is ended with QUIT and the
        socket is closed even when login or sending fails.
        """
        user = "jiaxiaonan" + "<" + send_user + ">"
        message = MIMEText(content, _subtype='plain', _charset='utf-8')
        message['Subject'] = sub
        message['From'] = user
        # Address lists in mail headers are comma separated.
        message['To'] = ", ".join(user_list)
        server = smtplib.SMTP()
        try:
            server.connect(email_host)
            server.login(send_user, password)
            server.sendmail(user, user_list, message.as_string())
            server.quit()
        finally:
            server.close()

    def send_main(self, pass_list, fail_list):
        """Build the summary for the given passed/failed cases and mail it.

        Counts are reported as whole numbers (they used to print as floats,
        e.g. ``10.0``).  With no cases at all both rates are reported as
        ``0.00%`` instead of dividing by zero.
        """
        pass_num = len(pass_list)
        fail_num = len(fail_list)
        count_num = pass_num + fail_num
        if count_num:
            # e.g. 90.00%
            pass_result = "%.2f%%" % (float(pass_num) / count_num * 100)
            fail_result = "%.2f%%" % (float(fail_num) / count_num * 100)
        else:
            pass_result = fail_result = "0.00%"

        user_list = ['[email protected]']
        sub = "接口自动化测试报告"
        content = "此次一共运行接口个数为%s个,通过个数为%s个,失败个数为%s,通过率为%s,失败率为%s" %(count_num,pass_num,fail_num,pass_result,fail_result )
        self.send_mail(user_list, sub, content)


if __name__ == '__main__':
    sen = SendEmail()
    sen.send_main([1,2,3,4],[2,3,4,5,6,7])

六、主函数

run_test.py

#coding:utf-8
import sys
# NOTE(review): developer-specific absolute path; adjust to the project root
# of the machine the framework runs on.
sys.path.append("C:/Users/lxz/Desktop/InterFace_JIA")
from base.runmethod import RunMethod
from operation_data.get_data import GetData
from tool.common_util import CommonUtil
from operation_data.dependent_data import DependdentData
from tool.send_email import SendEmail
from tool.operation_header import OperationHeader
from tool.operation_json import OperetionJson


class RunTest:
    """Run every enabled case of the sheet and mail the summary."""

    def __init__(self):
        self.run_method = RunMethod()
        self.data = GetData()
        self.com_util = CommonUtil()
        self.send_mai = SendEmail()

    def _send_request(self, method, url, request_data, header):
        """Send one request according to the case's header mode.

        ``'write'``: send without extras, then store the cookie obtained
        from the response.  ``'yes'``: load the stored ``apsid`` cookie and
        pass it as the fourth argument of ``run_main``.  Anything else: send
        without extras.
        NOTE(review): ``run_main`` forwards that fourth argument as HTTP
        headers, so the value travels as an ``apsid`` header - confirm that
        this is what the server expects.
        """
        if header == 'write':
            res = self.run_method.run_main(method, url, request_data)
            OperationHeader(res).write_cookie()
            return res
        if header == 'yes':
            op_json = OperetionJson('../dataconfig/cookie.json')
            cookies = {
                'apsid': op_json.get_data('apsid')
            }
            return self.run_method.run_main(method, url, request_data, cookies)
        return self.run_method.run_main(method, url, request_data)

    # Entry point of a test run
    def go_on_run(self):
        """Execute all enabled cases, record each result and send the report.

        A case passes when ``is_equal_dict(expect, res)`` returns 0; then
        ``'pass'`` is written to its result cell, otherwise the response is.
        """
        pass_count = []
        fail_count = []
        rows_count = self.data.get_case_lines()
        # Row 0 is skipped (presumably the header row of the sheet).
        for i in range(1, rows_count):
            if not self.data.get_is_run(i):
                continue
            url = self.data.get_request_url(i)
            method = self.data.get_request_method(i)
            request_data = self.data.get_data_for_json(i)
            expect = self.data.get_expcet_data_for_mysql(i)
            header = self.data.is_header(i)
            depend_case = self.data.is_depend(i)
            if depend_case is not None:
                self.depend_data = DependdentData(depend_case)
                # Value taken from the response of the dependency case
                depend_response_data = self.depend_data.get_data_for_key(i)
                # Field of this request that receives the value
                depend_key = self.data.get_depend_field(i)
                if request_data is None:
                    # A case without a payload of its own still gets the field.
                    request_data = {}
                request_data[depend_key] = depend_response_data

            res = self._send_request(method, url, request_data, header)

            if self.com_util.is_equal_dict(expect, res) == 0:
                self.data.write_result(i, 'pass')
                pass_count.append(i)
            else:
                self.data.write_result(i, res)
                fail_count.append(i)
        self.send_mai.send_main(pass_count, fail_count)


if __name__ == '__main__':
    run = RunTest()
    run.go_on_run()