一、框架结构:

 工程目录

二、Case文件设计

三、基础包 base

3.1 封装get/post请求(runmethon.py)

 import requests
import json
class RunMethod:
def post_main(self,url,data,header=None):
res = None
if header !=None:
res = requests.post(url=url,data=data,headers=header)
else:
res = requests.post(url=url,data=data)
return res.json() def get_main(self,url,data=None,header=None):
res = None
if header !=None:
res = requests.get(url=url,data=data,headers=header,verify=False)
else:
res = requests.get(url=url,data=data,verify=False)
return res.json() def run_main(self,method,url,data=None,header=None):
res = None
if method == 'Post':
res = self.post_main(url,data,header)
else:
res = self.get_main(url,data,header)
return json.dumps(res,ensure_ascii=False,sort_keys=True,indent=2)

3.2 封装mock(mock.py)

from mock import mock
#模拟mock 封装
def mock_test(mock_method,request_data,url,method,response_data):
mock_method = mock.Mock(return_value=response_data)
res = mock_method(url,method,request_data)
return res

四、数据操作包 operation_data

4.1 获取excel单元格中的内容(get_data.py)

# coding:utf-8
from tool.operation_excel import OperationExcel
import data_config
from tool.operation_json import OperetionJson
from tool.connect_db import OperationMysql class GetData:
def __init__(self):
self.opera_excel = OperationExcel() # 去获取excel行数,就是case的个数
def get_case_lines(self):
return self.opera_excel.get_lines() # 获取是否执行
def get_is_run(self, row):
flag = None
col = int(data_config.get_run())
run_model = self.opera_excel.get_cell_value(row, col)
if run_model == 'yes':
flag = True
else:
flag = False
return flag # 是否携带header def is_header(self, row):
col = int(data_config.get_header())
header = self.opera_excel.get_cell_value(row, col)
if header != '':
return header
else:
return None # 获取请求方式 def get_request_method(self, row):
col = int(data_config.get_run_way())
request_method = self.opera_excel.get_cell_value(row, col)
return request_method # 获取url def get_request_url(self, row):
col = int(data_config.get_url())
url = self.opera_excel.get_cell_value(row, col)
return url # 获取请求数据
def get_request_data(self, row):
col = int(data_config.get_data())
data = self.opera_excel.get_cell_value(row, col)
if data == '':
return None
return data # 通过获取关键字拿到data数据
def get_data_for_json(self, row):
opera_json = OperetionJson()
request_data = opera_json.get_data(self.get_request_data(row))
return request_data # 获取预期结果 def get_expcet_data(self, row):
col = int(data_config.get_expect())
expect = self.opera_excel.get_cell_value(row, col)
if expect == '':
return None
return expect # 通过sql获取预期结果
def get_expcet_data_for_mysql(self, row):
op_mysql = OperationMysql()
sql = self.get_expcet_data(row)
res = op_mysql.search_one(sql)
return res.decode('unicode-escape') def write_result(self, row, value):
col = int(data_config.get_result())
self.opera_excel.write_value(row, col, value) # 获取依赖数据的key
def get_depend_key(self, row):
col = int(data_config.get_data_depend())
depent_key = self.opera_excel.get_cell_value(row, col)
if depent_key == "":
return None
else:
return depent_key # 判断是否有case依赖 def is_depend(self, row):
col = int(data_config.get_case_depend())
depend_case_id = self.opera_excel.get_cell_value(row, col)
if depend_case_id == "":
return None
else:
return depend_case_id # 获取数据依赖字段 def get_depend_field(self, row):
col = int(data_config.get_field_depend())
data = self.opera_excel.get_cell_value(row, col)
if data == "":
return None
else:
return data

4.2 获取excel中每个列(data_config.py)

# coding:utf-8
class global_var:
# case_id
Id = ''
request_name = ''
url = ''
run = ''
request_way = ''
header = ''
case_depend = ''
data_depend = ''
field_depend = ''
data = ''
expect = ''
result = ''
# 获取caseid
def get_id():
return global_var.Id # 获取url
def get_url():
return global_var.url def get_run():
return global_var.run def get_run_way():
return global_var.request_way def get_header():
return global_var.header def get_case_depend():
return global_var.case_depend def get_data_depend():
return global_var.data_depend def get_field_depend():
return global_var.field_depend def get_data():
return global_var.data def get_expect():
return global_var.expect def get_result():
return global_var.result def get_header_value():
return global_var.header

4.3 解决数据依赖(dependent.py )

# coding:utf-8
import sys
import json sys.path.append('C:/Users/lxz/Desktop/InterFace_JIA')
from tool.operation_excel import OperationExcel
from base.runmethod import RunMethod
from operation_data.get_data import GetData
from jsonpath_rw import jsonpath, parse class DependdentData:
def __init__(self, case_id):
self.case_id = case_id
self.opera_excel = OperationExcel()
self.data = GetData() # 通过case_id去获取该case_id的整行数据
def get_case_line_data(self):
rows_data = self.opera_excel.get_rows_data(self.case_id)
return rows_data # 执行依赖测试,获取结果
def run_dependent(self):
run_method = RunMethod()
row_num = self.opera_excel.get_row_num(self.case_id)
request_data = self.data.get_data_for_json(row_num)
# header = self.data.is_header(row_num)
method = self.data.get_request_method(row_num)
url = self.data.get_request_url(row_num)
res = run_method.run_main(method, url, request_data)
return json.loads(res) # 根据依赖的key去获取执行依赖测试case的响应,然后返回
def get_data_for_key(self, row):
depend_data = self.data.get_depend_key(row)
response_data = self.run_dependent()
json_exe = parse(depend_data)
madle = json_exe.find(response_data)
return [math.value for math in madle][0] if __name__ == '__main__':
order = {
"data": {
"_input_charset": "utf-8",
"body": "京东订单-1710141907182334",
"it_b_pay": "1d",
"notify_url": "http://order.imooc.com/pay/notifyalipay",
"out_trade_no": "",
"partner": "",
"payment_type": "",
"seller_id": "yangyan01@tcl.com",
"service": "mobile.securitypay.pay",
"sign": "kZBV53KuiUf5HIrVLBCcBpWDg%2FnzO%2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI%2BkFZq1jOr8C554KsZ7Gk7orC9dDbQlpr % 2BaMmdjO30JBgjqjj4mmM % 2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX % 2Bo4 % 3D",
"sign_type": "RSA",
"string": "_input_charset=utf-8&body=京东订单-1710141907182334&it_b_pay=1d&notify_url=http://order.imooc.com/pay/notifyalipay & out_trade_no = 1710141907182334 & partner = 2088002966755334 & payment_type = 1 & seller_id = yangyan01 @tcl.com & service = mobile.securitypay.pay & subject = 京东订单 - 1710141907182334 & total_fee = 299 & sign = kZBV53KuiUf5HIrVLBCcBpWDg % 2FnzO % 2BtyEnBqgVYwwBtDU66Xk8VQUTbVOqDjrNymCupkVhlI % 2BkFZq1jOr8C554KsZ7Gk7orC9dDbQlpr % 2BaMmdjO30JBgjqjj4mmM % 2Flphy9Xwr0Xrv46uSkDKdlQqLDdGAOP7YwOM2dSLyUQX % 2Bo4 % 3D &sign_type = RSA",
"subject": "京东订单-1710141907182334",
"total_fee": 299
},
"errorCode": 1000,
"errorDesc": "成功",
"status": 1,
"timestamp": 1507979239100
}
res = "data.out_trade_no"
json_exe = parse(res)
madle = json_exe.find(order)
print[math.value for math in madle][0]

五、工具类包 tool

5.1 操作excel (operation_excel.py)

#coding:utf-8
import xlrd
from xlutils.copy import copy
class OperationExcel:
def __init__(self,file_name=None,sheet_id=None):
if file_name:
self.file_name = file_name
self.sheet_id = sheet_id
else:
self.file_name = '../dataconfig/case1.xls'
self.sheet_id = 0
self.data = self.get_data() #获取sheets的内容
def get_data(self):
data = xlrd.open_workbook(self.file_name)
tables = data.sheets()[self.sheet_id]
return tables #获取单元格的行数
def get_lines(self):
tables = self.data
return tables.nrows #获取某一个单元格的内容
def get_cell_value(self,row,col):
return self.data.cell_value(row,col) #写入数据
def write_value(self,row,col,value):
'''
写入excel数据
row,col,value
'''
read_data = xlrd.open_workbook(self.file_name)
write_data = copy(read_data)
sheet_data = write_data.get_sheet(0)
sheet_data.write(row,col,value)
write_data.save(self.file_name) #根据对应的caseid 找到对应行的内容
def get_rows_data(self,case_id):
row_num = self.get_row_num(case_id)
rows_data = self.get_row_values(row_num)
return rows_data #根据对应的caseid找到对应的行号
def get_row_num(self,case_id):
num = 0
clols_data = self.get_cols_data()
for col_data in clols_data:
if case_id in col_data:
return num
num = num+1 #根据行号,找到该行的内容
def get_row_values(self,row):
tables = self.data
row_data = tables.row_values(row)
return row_data #获取某一列的内容
def get_cols_data(self,col_id=None):
if col_id != None:
cols = self.data.col_values(col_id)
else:
cols = self.data.col_values(0)
return cols if __name__ == '__main__':
opers = OperationExcel()
print(opers.get_cell_value(1,2))

5.2判断字符串包含,判断字典是否相等(common_util.py)

# coding:utf-8
import json class CommonUtil:
def is_contain(self, str_one, str_two):
'''
判断一个字符串是否再另外一个字符串中
str_one:查找的字符串
str_two:被查找的字符串
'''
flag = None
if isinstance(str_one, unicode):
str_one = str_one.encode('unicode-escape').decode('string_escape')
return cmp(str_one, str_two)
if str_one in str_two:
flag = True
else:
flag = False
return flag def is_equal_dict(self, dict_one, dict_two):
'''
判断两个字典是否相等
'''
if isinstance(dict_one, str):
dict_one = json.loads(dict_one)
if isinstance(dict_two, str):
dict_two = json.loads(dict_two)
return cmp(dict_one, dict_two)

5.3 操作header(operation_herder.py)

# coding:utf-8
import requests
import json
from operation_json import OperetionJson class OperationHeader:
def __init__(self, response):
self.response = json.loads(response) def get_response_url(self):
'''
获取登录返回的token的url
'''
url = self.response['data']['url'][0]
return url def get_cookie(self):
'''
获取cookie的jar文件
'''
url = self.get_response_url() + "&callback=jQuery21008240514814031887_1508666806688&_=1508666806689"
cookie = requests.get(url).cookies
return cookie def write_cookie(self):
cookie = requests.utils.dict_from_cookiejar(self.get_cookie())
op_json = OperetionJson()
op_json.write_data(cookie) if __name__ == '__main__':
url = "http://www.jd.com/passport/user/login"
data = {
"username": "",
"password": "",
"verify": "",
"referer": "https://www.jd.com"
}
res = json.dumps(requests.post(url, data).json())
op_header = OperationHeader(res)
op_header.write_cookie()

5.4 操作json文件(operation_json.py)

# coding:utf-8
import json class OperetionJson:
def __init__(self, file_path=None):
if file_path == None:
self.file_path = '../dataconfig/user.json'
else:
self.file_path = file_path
self.data = self.read_data() # 读取json文件
def read_data(self):
with open(self.file_path) as fp:
data = json.load(fp)
return data # 根据关键字获取数据
def get_data(self, id):
print
type(self.data)
return self.data[id] # 写json
def write_data(self, data):
with open('../dataconfig/cookie.json', 'w') as fp:
fp.write(json.dumps(data)) if __name__ == '__main__':
opjson = OperetionJson()
print(opjson.get_data('shop'))

5.5 操作数据库(connect_db.py)

# coding:utf-8
import MySQLdb.cursors
import json class OperationMysql:
def __init__(self):
self.conn = MySQLdb.connect(
host='localhost',
port=3306,
user='root',
passwd='',
db='le_study',
charset='utf8',
cursorclass=MySQLdb.cursors.DictCursor
)
self.cur = self.conn.cursor() # 查询一条数据
def search_one(self, sql):
self.cur.execute(sql)
result = self.cur.fetchone()
result = json.dumps(result)
return result if __name__ == '__main__':
op_mysql = OperationMysql()
res = op_mysql.search_one("select * from web_user where Name='ailiailan'")
print(res)

5.6 发送报告邮件(send_email.py)

# coding:utf-8
import smtplib
from email.mime.text import MIMEText class SendEmail:
global send_user
global email_host
global password
email_host = "smtp.163.com"
send_user = "jiaxiaonan666@163.com"
password = "jia_668" def send_mail(self, user_list, sub, content):
user = "jiaxiaonan" + "<" + send_user + ">"
message = MIMEText(content, _subtype='plain', _charset='utf-8')
message['Subject'] = sub
message['From'] = user
message['To'] = ";".join(user_list)
server = smtplib.SMTP()
server.connect(email_host)
server.login(send_user, password)
server.sendmail(user, user_list, message.as_string())
server.close() def send_main(self, pass_list, fail_list):
pass_num = float(len(pass_list))
fail_num = float(len(fail_list))
count_num = pass_num + fail_num
# 90%
pass_result = "%.2f%%" % (pass_num / count_num * 100)
fail_result = "%.2f%%" % (fail_num / count_num * 100) user_list = ['609037724@qq.com']
sub = "接口自动化测试报告"
content = "此次一共运行接口个数为%s个,通过个数为%s个,失败个数为%s,通过率为%s,失败率为%s" % (
count_num, pass_num, fail_num, pass_result, fail_result)
self.send_mail(user_list, sub, content) if __name__ == '__main__':
sen = SendEmail()
sen.send_main([1, 2, 3, 4], [2, 3, 4, 5, 6, 7])

六、主函数

run_test.py

# coding:utf-8
import sys sys.path.append("C:/Users/lxz/Desktop/InterFace_JIA")
from base.runmethod import RunMethod
from operation_data.get_data import GetData
from tool.common_util import CommonUtil
from operation_data.dependent_data import DependdentData
from tool.send_email import SendEmail
from tool.operation_header import OperationHeader
from tool.operation_json import OperetionJson class RunTest:
def __init__(self):
self.run_method = RunMethod()
self.data = GetData()
self.com_util = CommonUtil()
self.send_mai = SendEmail() # 程序执行的
def go_on_run(self):
res = None
pass_count = []
fail_count = []
# 10 0,1,2,3
rows_count = self.data.get_case_lines()
for i in range(1, rows_count):
is_run = self.data.get_is_run(i)
if is_run:
url = self.data.get_request_url(i)
method = self.data.get_request_method(i)
request_data = self.data.get_data_for_json(i)
expect = self.data.get_expcet_data_for_mysql(i)
header = self.data.is_header(i)
depend_case = self.data.is_depend(i)
if depend_case != None:
self.depend_data = DependdentData(depend_case)
# 获取的依赖响应数据
depend_response_data = self.depend_data.get_data_for_key(i)
# 获取依赖的key
depend_key = self.data.get_depend_field(i)
request_data[depend_key] = depend_response_data
if header == 'write':
res = self.run_method.run_main(method, url, request_data)
op_header = OperationHeader(res)
op_header.write_cookie() elif header == 'yes':
op_json = OperetionJson('../dataconfig/cookie.json')
cookie = op_json.get_data('apsid')
cookies = {
'apsid': cookie
}
res = self.run_method.run_main(method, url, request_data, cookies)
else:
res = self.run_method.run_main(method, url, request_data) if self.com_util.is_equal_dict(expect, res) == 0:
self.data.write_result(i, 'pass')
pass_count.append(i)
else:
self.data.write_result(i, res)
fail_count.append(i)
self.send_mai.send_main(pass_count, fail_count) # 将执行判断封装
# def get_cookie_run(self,header): if __name__ == '__main__':
run = RunTest()
run.go_on_run()

 

 

最新文章

  1. SQL Server 2012 清理日志 截断日志的方法
  2. appium如何获取conten-desc内容文本
  3. Tomcat catalina.out日志使用log4j按天分割
  4. 为简单而努力:Android封装类详解
  5. SQLiteDatabase浅谈
  6. php中的curl】php中curl的详细解说
  7. python操作postgresql数据库
  8. iOS Dev (20) 用 AVAudioPlayer 播放一个本地音频文件
  9. C#中静态与非静态方法比较【转】
  10. InstallShield集成安装MSDE2000最小版本(三) fishout特许授权发布
  11. Matlab内置函数
  12. java四种xml解析区别
  13. Java爬虫——模拟登录知乎
  14. ABP官方文档翻译 2.7 对象到对象的映射
  15. centos下 redmind2.6安装
  16. 基于Jenkins,docker实现自动化部署(持续交互)
  17. [USACO18JAN]Stamp Painting
  18. 管理MariaDB
  19. 魅族MX四核手机转让,二手淘宝上+hi-pda论坛结合使用成功已出
  20. js计时器方法的使用

热门文章

  1. 阿里云重磅推出物联网设备身份认证Link ID&#178;
  2. Java中JNI的使用详解第六篇:C/C++中的引用类型和Id的缓存
  3. storm集群的安装
  4. 大转盘抽奖css3+js(简单书写)
  5. NX二次开发-UFUN编辑图层类别名字UF_LAYER_edit_category_name
  6. How to SSH Into Your iPhone
  7. hdu4126_hdu4756_求最小生成树的最佳替换边_Kruskal and Prim
  8. vmware压缩磁盘空间的方法, linux怎么卸载vmware
  9. Python 爬虫-抓取中小企业股份转让系统公司公告的链接并下载
  10. 火狐不支持webp格式的图片