实现功能

  • 测试数据隔离: 测试前后进行数据库备份/还原
  • 接口直接的数据依赖: 需要B接口使用A接口响应中的某个字段作为参数
  • 对接数据库: 讲数据库的查询结果可直接用于断言操作
  • 动态多断言: 可(多个)动态提取实际预期结果与指定的预期结果进行比较断言操作
  • 自定义扩展方法: 在用例中使用自定义方法(如:获取当前时间戳...)的返回值
  • 邮件发送:将allure报告压缩后已附件形式发送
  • 企业微信推送:搭配jenkins发送allure测试报告地址
  • 钉钉推送:搭配jenkins发送allure测试报告地址
  • 接口录制:录制指定包含url的接口,生成用例数据

目录结构

├─api
│ └─client.py # 请求封装
├─backup_sqls
│ └─xxx.sql # 数据库备份文件
├─config
│ └─config.yaml # 配置文件
├─data
│ └─test_data.xlsx # 用例文件
├─log
│ └─run...x.log # 日志文件
├─report
│ ├─data
│ └─html # allure报告
├─test
│ ├─conftest.py # 依赖对象初始化
│ └─test_api.py # 测试文件
├─tools # 工具包
│ ├─__init__.py # 常用方法封装
│ ├─data_clearing.py # 数据隔离
│ ├─data_process.py # 依赖数据处理
│ ├─db.py # 数据库连接对象
│ ├─hooks.py # 自定义扩展方法(可用于用例)文件
│ ├─read_file.py # 用例、配置项读取
│ ├─recording.py # 接口录制,写入用例文件
│ ├─send_dd.py # 给钉钉发送测试报告
│ ├─send_vx.py # 给企业微信发送测试报告
│ └─send_email.py # 邮件发送、报告压缩
├─项目实战接口文档.md # 配套项目相关接口文档
├─requirements.txt # 项目依赖库文件
└─run.py # 主启动文件

主要代码

from typing import Any

from requests import Session
from tools import allure_step, allure_title, logger, allure_step_no
from tools.data_process import DataProcess class Transmission:
PARAMS: str = "params"
DATA: str = "data"
JSON: str = "json" class Client(Session): def action(self, case: list, env: str = "dev") -> Any:
"""处理case数据,转换成可用数据发送请求
:param case: 读取出来的每一行用例内容,可进行解包
:param env: 环境名称 默认使用config.yaml server下的 dev 后面的基准地址
return: 响应结果, 预期结果
"""
(
_,
case_title,
header,
path,
method,
parametric_key,
file_obj,
data,
extra,
sql,
expect,
) = case
logger.debug(
f"用例进行处理前数据: \n 接口路径: {path} \n 请求参数: {data} \n 提取参数: {extra} \n 后置sql: {sql} \n 预期结果: {expect} \n "
)
# allure报告 用例标题
allure_title(case_title)
# 处理url、header、data、file、的前置方法
url = DataProcess.handle_path(path, env)
header = DataProcess.handle_header(header)
data = DataProcess.handle_data(data)
allure_step("请求数据", data)
file = DataProcess.handler_files(file_obj)
# 发送请求
response = self._request(url, method, parametric_key, header, data, file)
# 提取参数
DataProcess.handle_extra(extra, response)
return response, expect, sql def _request(
self, url, method, parametric_key, header=None, data=None, file=None
) -> dict:
"""
:param method: 请求方法
:param url: 请求url
:param parametric_key: 入参关键字, params(查询参数类型,明文传输,一般在url?参数名=参数值), data(一般用于form表单类型参数)
json(一般用于json类型请求参数)
:param data: 参数数据,默认等于None
:param file: 文件对象
:param header: 请求头
:return: 返回res对象
""" if parametric_key == Transmission.PARAMS:
extra_args = {Transmission.PARAMS: data}
elif parametric_key == Transmission.DATA:
extra_args = {Transmission.DATA: data}
elif parametric_key == Transmission.JSON:
extra_args = {Transmission.JSON: data}
else:
raise ValueError("可选关键字为params, json, data") res = self.request(
method=method, url=url, files=file, headers=header, verify=False,**extra_args
)
response = res.json()
logger.info(
f"\n最终请求地址:{res.url}\n请求方法:{method}\n请求头:{header}\n请求参数:{data}\n上传文件:{file}\n响应数据:{response}"
)
allure_step_no(f"响应耗时(s): {res.elapsed.total_seconds()}")
allure_step("响应结果", response)
return response client = Client()

client.py

server:
# 本地接口服务
test: http://127.0.0.1:8888/
# 正式环境地址
dev: ******* # 基准的请求头信息
request_headers:
Accept-Encoding: gzip, deflate
Accept-Language: zh-CN,zh;q=0.9
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.125 Safari/537.36 file_path:
test_case: data/case_data.xls
report: report/
log: log/run{time}.log email:
# 发件人邮箱
user: *******
# 发件人邮箱授权码
password: ********
# 邮箱host
host: smtp.qq.com
contents: 解压apiAutoReport.zip(接口测试报告)后,请使用已安装Live Server 插件的VsCode,打开解压目录下的index.html查看报告
# 收件人邮箱
addressees: ["*******", "********"]
title: 接口自动化测试报告(见附件)
# 附件
enclosures: report.zip dingding:
# jenkins登录地址
jenkins_url: "http://192.168.1.107:8080/"
# job名称
job_name: "job/轻萤/" weixin:
# Corpid是企业号的标识
Corpid: "********"
# Secret是管理组凭证密钥
Secret: "**********"
# 应用ID
Agentid: "1000002"
# 部门id
Partyid: '1'
# jenkins登录地址
jenkins_url: "http://192.168.1.107:8080/"
# job名称
job_name: "job/轻萤/" # 数据库校验- mysql
database:
host: *********
port: 3306
user: root
# 不用''会被解析成int类型数据
password: 'root123'
db_name: py_test
charset: utf8mb4 # 数据库所在的服务器配置
ssh_server:
port: 22
username: root
password: 'root123'
# 私有密钥文件路径
private_key_file:
# 私钥密码
privat_passowrd:
# 如果使用的docker容器部署mysql服务,需要传入mysql的容器id/name
mysql_container: mysql8
# 数据库备份文件导出的本地路径, 需要保证存在该文件夹
sql_data_file: backup_sqls/

config.yaml

用例文件: case_data.xls

import pytest

from tools.data_clearing import DataClearing
from tools.db import DB
from tools.read_file import ReadFile # @pytest.fixture(scope="session")
# def data_clearing():
# """数据清洗"""
# DataClearing.server_init()
# # 1. 备份数据库
# DataClearing.backup_mysql()
# yield
# # 2. 恢复数据库
# DataClearing.recovery_mysql()
# DataClearing.close_client()
#
#
# # 若不需要数据清洗功能,请把data_clearing去掉
# @pytest.fixture(scope="session")
# def get_db(data_clearing):
# """关于其作用域请移步查看官方文档"""
# try:
# db = DB()
# yield db
# finally:
# db.close() # 不使用数据清洗 请把 下面代码解除注释 上面的get_db函数注释
# @pytest.fixture(scope="session")
# def get_db():
# """关于其作用域请移步查看官方文档"""
# try:
# db = DB()
# yield db
# finally:
# db.close() @pytest.fixture(params=ReadFile.read_testcase())
def cases(request):
"""用例数据,测试方法参数入参该方法名 cases即可,实现同样的参数化
目前来看相较于@pytest.mark.parametrize 更简洁。
"""
return request.param

conftest.py

from .conftest import pytest

from api import client
from tools.data_process import DataProcess # https://www.cnblogs.com/shouhu/p/12392917.html
# reruns 重试次数 reruns_delay 次数之间的延时设置(单位:秒)
# 失败重跑,会影响总测试时长,如不需要 将 @pytest.mark.flaky(reruns=3, reruns_delay=5) 注释即可
# @pytest.mark.flaky(reruns=2, reruns_delay=1)
# def test_main(cases, get_db): # 使用数据库功能(包含sql查询,数据备份,数据恢复)
# # 此处的cases入参来自与 conftest.py 文件中 cases函数,与直接使用 @pytest.mark.parametrize
# # 有着差不多的效果
# # 发送请求
# response, expect, sql = client.action(cases)
# # 执行sql
# DataProcess.handle_sql(sql, get_db)
# # 断言操作
# DataProcess.assert_result(response, expect) def test_main(cases): # 不使用数据库功能
# 发送请求
response, expect, sql = client.action(cases)
# 断言操作
DataProcess.assert_result(response, expect)

test_api.py

import re
from string import Template
from typing import Any import allure from jsonpath import jsonpath
from loguru import logger from tools.hooks import * def exec_func(func: str) -> str:
"""执行函数(exec可以执行Python代码)
:params func 字符的形式调用函数
: return 返回的将是个str类型的结果
"""
# 得到一个局部的变量字典,来修正exec函数中的变量,在其他函数内部使用不到的问题
loc = locals()
exec(f"result = {func}")
return str(loc['result']) def extractor(obj: dict, expr: str = '.') -> Any:
"""
根据表达式提取字典中的value,表达式, . 提取字典所有内容, $.case 提取一级字典case, $.case.data 提取case字典下的data
:param obj :json/dict类型数据
:param expr: 表达式, . 提取字典所有内容, $.case 提取一级字典case, $.case.data 提取case字典下的data
$.0.1 提取字典中的第一个列表中的第二个的值
"""
try:
result = jsonpath(obj, expr)[0]
except Exception as e:
logger.error(f'{expr} - 提取不到内容,丢给你一个错误!{e}')
result = expr
return result def rep_expr(content: str, data: dict) -> str:
"""从请求参数的字符串中,使用正则的方法找出合适的字符串内容并进行替换
:param content: 原始的字符串内容
:param data: 提取的参数变量池
return content: 替换表达式后的字符串
"""
content = Template(content).safe_substitute(data)
for func in re.findall('\\${(.*?)}', content):
try:
content = content.replace('${%s}' % func, exec_func(func))
except Exception as e:
logger.error(e)
return content def convert_json(dict_str: str) -> dict:
"""
:param dict_str: 长得像字典的字符串
return json格式的内容
"""
try:
if 'None' in dict_str:
dict_str = dict_str.replace('None', 'null')
elif 'True' in dict_str:
dict_str = dict_str.replace('True', 'true')
elif 'False' in dict_str:
dict_str = dict_str.replace('False', 'false')
dict_str = json.loads(dict_str)
except Exception as e:
if 'null' in dict_str:
dict_str = dict_str.replace('null', 'None')
elif 'true' in dict_str:
dict_str = dict_str.replace('true', 'True')
elif 'false' in dict_str:
dict_str = dict_str.replace('false', 'False')
dict_str = eval(dict_str)
logger.error(e)
return dict_str def allure_title(title: str) -> None:
"""allure中显示的用例标题"""
allure.dynamic.title(title) def allure_step(step: str, var: str) -> None:
"""
:param step: 步骤及附件名称
:param var: 附件内容
"""
with allure.step(step):
allure.attach(
json.dumps(
var,
ensure_ascii=False,
indent=4),
step,
allure.attachment_type.JSON) def allure_step_no(step: str):
"""
无附件的操作步骤
:param step: 步骤名称
:return:
"""
with allure.step(step):
pass

_init_.py

import os
from datetime import datetime
import paramiko
from tools.read_file import ReadFile
from tools import logger class ServerTools:
def __init__(
self,
host: str,
port: int = 22,
username: str = "root",
password: str = None,
private_key_file: str = None,
privat_passowrd: str = None):
# 进行SSH连接
self.trans = paramiko.Transport((host, port))
self.host = host
if password is None:
self.trans.connect(
username=username,
pkey=paramiko.RSAKey.from_private_key_file(
private_key_file,
privat_passowrd))
else:
self.trans.connect(username=username, password=password)
# 将sshclient的对象的transport指定为以上的trans
self.ssh = paramiko.SSHClient()
logger.success("SSH客户端创建成功.")
self.ssh._transport = self.trans
# 创建SFTP客户端
self.ftp_client = paramiko.SFTPClient.from_transport(self.trans)
logger.success("SFTP客户端创建成功.") def execute_cmd(self, cmd: str):
"""
:param cmd: 服务器下对应的命令
"""
stdin, stdout, stderr = self.ssh.exec_command(cmd)
error = stderr.read().decode()
logger.info(f"输入命令: {cmd} -> 输出结果: {stdout.read().decode()}")
logger.error(f"异常信息: {error}")
return error def files_action(
self,
post: bool,
local_path: str = os.getcwd(),
remote_path: str = "/root"):
"""
:param post: 动作 为 True 就是上传, False就是下载
:param local_path: 本地的文件路径, 默认当前脚本所在的工作目录
:param remote_path: 服务器上的文件路径,默认在/root目录下
"""
if post: # 上传文件
self.ftp_client.put(
localpath=local_path,
remotepath=f"{remote_path}{os.path.split(local_path)[1]}")
logger.info(
f"文件上传成功: {local_path} -> {self.host}:{remote_path}{os.path.split(local_path)[1]}")
else: # 下载文件
file_path = local_path + os.path.split(remote_path)[1]
self.ftp_client.get(remotepath=remote_path, localpath=file_path)
logger.info(f"文件下载成功: {self.host}:{remote_path} -> {file_path}") def ssh_close(self):
"""关闭连接"""
self.trans.close()
logger.info("已关闭SSH连接...") class DataClearing:
settings = ReadFile.read_config('$.database')
server_settings = settings.get('ssh_server')
server = None # 导出的sql文件名称及后缀
file_name = f"{settings.get('db_name')}_{datetime.now().strftime('%Y-%m-%dT%H_%M_%S')}.sql" @classmethod
def server_init(cls, settings=settings, server_settings=server_settings):
cls.server = ServerTools(
host=settings.get('host'),
port=server_settings.get('port'),
username=server_settings.get('username'),
password=server_settings.get('password'),
private_key_file=server_settings.get('private_key_file'),
privat_passowrd=server_settings.get('privat_passowrd'))
# 新建backup_sql文件夹在服务器上,存放导出的sql文件
cls.server.execute_cmd("mkdir backup_sql") @classmethod
def backup_mysql(cls):
"""
备份数据库, 会分别备份在数据库所在服务器的/root/backup_sql/目录下, 与当前项目文件目录下的 backup_sqls
每次备份生成一个数据库名_当前年_月_日T_时_分_秒, 支持linux 服务器上安装的mysql服务(本人未调试),以及linux中docker部署的mysql备份
"""
if cls.server_settings.get('mysql_container') is None:
cmd = f"mysqldump -h127.0.0.1 -u{cls.settings.get('username')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > {cls.file_name}"
else:
# 将mysql服务的容器中的指定数据库导出, 参考文章
# https://www.cnblogs.com/wangsongbai/p/12666368.html
cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysqldump -h127.0.0.1 -u{cls.settings.get('user')} -p{cls.settings.get('password')} {cls.settings.get('db_name')} > /root/backup_sql/{cls.file_name}"
cls.server.execute_cmd(cmd)
cls.server.files_action(0,
f"{cls.server_settings.get('sql_data_file')}",
f"/root/backup_sql/{cls.file_name}") @classmethod
def recovery_mysql(
cls,
sql_file: str = file_name,
database: str = settings.get('db_name')):
"""
恢复数据库, 从服务器位置(/root/backup_sql/) 或者本地(../backup_sqls)上传, 传入的需要是.sql文件
:param sql_file: .sql数据库备份文件, 默认就是导出的sql文件名称, 默认文件名称是导出的sql文件
:param database: 恢复的数据库名称,默认是备份数据库(config.yaml中的db_name)
"""
result = cls.server.execute_cmd(f"ls -l /root/backup_sql/{sql_file}")
if "No such file or directory" in result:
# 本地上传
cls.server.files_action(
1, f"../backup_sqls/{sql_file}", "/root/backup_sql/")
cmd = f"docker exec -i {cls.server_settings.get('mysql_container')} mysql -u{cls.settings.get('user')} -p{cls.settings.get('password')} {database} < /root/backup_sql/{sql_file}"
cls.server.execute_cmd(cmd) @classmethod
def close_client(cls):
cls.server.ssh_close()

data_clearing.py

import re

from tools import logger, extractor, convert_json, rep_expr, allure_step, allure_step_no, get_time, get_path
from tools.db import DB
from tools.read_file import ReadFile class DataProcess:
# 存放提取参数的池子
extra_pool = {}
header = ReadFile.read_config('$.request_headers') @classmethod
def handle_path(cls, path_str: str, env: str) -> str:
"""路径参数处理
:param path_str: 带提取表达式的字符串 /${id}/state/${create_time}
:param env: 环境名称, 对应的是环境基准地址
上述内容表示,从extra_pool字典里取到key为id 对应的值,假设是500,后面${create_time} 类似, 假设其值为 1605711095 最终提取结果
return /511/state/1605711095
"""
url = ReadFile.read_config(
f'$.server.{env}') + rep_expr(path_str, cls.extra_pool)
allure_step_no(f'请求地址: {url}')
return url @classmethod
def handle_header(cls, header_str: str) -> dict:
"""处理header, 将用例中的表达式处理后 追加到基础header中
:header_str: 用例栏中的header
return header:
"""
if header_str == '':
header_str = '{}'
cls.header.update(cls.handle_data(header_str))
allure_step('请求头', cls.header)
return cls.header @classmethod
def handler_files(cls, file_obj: str) -> object:
"""file对象处理方法
:param file_obj: 上传文件使用,格式:接口中文件参数的名称:"文件路径地址"/["文件地址1", "文件地址2"]
实例- 单个文件: &file&D:
"""
if file_obj != '':
for k, v in convert_json(file_obj).items():
# 多文件上传
if isinstance(v, list):
files = []
for path in v:
files.append((k, (open(path, 'rb'))))
else:
# 单文件上传
l = re.findall(r"\${(.+?)}", str(v))
for i in l:
if i == 'path':
v = str(v).replace("${" + i + "}", get_path()+'/data/upload_data/')
files = {k: open(v, 'rb')}
allure_step('上传文件', file_obj)
return files @classmethod
def handle_data(cls, variable: str) -> dict:
"""请求数据处理
:param variable: 请求数据,传入的是可转换字典/json的字符串,其中可以包含变量表达式
return 处理之后的json/dict类型的字典数据
"""
if variable != '':
data = rep_expr(variable, cls.extra_pool)
l = re.findall(r"\${(.+?)}", str(data))
for i in l:
if i == 'timestamp':
data = str(data).replace("${" + i + "}", get_time())
variable = convert_json(data)
return variable @classmethod
def handle_sql(cls, sql: str, db: DB):
"""
处理sql,如果sql执行的结果不会空,执行sql的结果和参数池合并
:param sql: 支持单条或者多条sql,其中多条sql使用 ; 进行分割
多条sql,在用例中填写方式如下select * from user; select * from goods 每条sql语句之间需要使用 ; 来分割
单条sql,select * from user 或者 select * from user;
:param db: 数据库连接对象
:return:
"""
sql = rep_expr(sql, cls.extra_pool) for sql in sql.split(";"):
sql = sql.strip()
if sql == '':
continue
# 查后置sql
result = db.execute_sql(sql)
allure_step(f'执行sql: {sql}', result)
logger.info(f'执行sql: {sql} \n 结果: {result}')
if result is not None:
# 将查询结果添加到响应字典里面,作用在,接口响应的内容某个字段 直接和数据库某个字段比对,在预期结果中
# 使用同样的语法提取即可
cls.extra_pool.update(result) @classmethod
def handle_extra(cls, extra_str: str, response: dict):
"""
处理提取参数栏
:param extra_str: excel中 提取参数栏内容,需要是 {"参数名": "jsonpath提取式"} 可以有多个
:param response: 当前用例的响应结果字典
"""
if extra_str != '':
extra_dict = convert_json(extra_str)
for k, v in extra_dict.items():
cls.extra_pool[k] = extractor(response, v)
logger.info(f'加入依赖字典,key: {k}, 对应value: {v}') @classmethod
def assert_result(cls, response: dict, expect_str: str):
""" 预期结果实际结果断言方法
:param response: 实际响应结果
:param expect_str: 预期响应内容,从excel中读取
return None
"""
# 后置sql变量转换
allure_step("当前可用参数池", cls.extra_pool)
expect_str = rep_expr(expect_str, cls.extra_pool)
expect_dict = convert_json(expect_str)
index = 0
for k, v in expect_dict.items():
# 获取需要断言的实际结果部分
actual = extractor(response, k)
index += 1
logger.info(
f'第{index}个断言,实际结果:{actual} | 预期结果:{v} \n断言结果 {actual == v}')
allure_step(f'第{index}个断言', f'实际结果:{actual} = 预期结果:{v}')
try:
assert actual == v
except AssertionError:
raise AssertionError(
f'第{index}个断言失败 -|- 实际结果:{actual} || 预期结果: {v}')

data_process.py

import json
from datetime import datetime
from typing import Union import pymysql from tools.read_file import ReadFile class DB:
mysql = ReadFile.read_config('$.database') def __init__(self):
"""
初始化数据库连接,并指定查询的结果集以字典形式返回
"""
self.connection = pymysql.connect(
host=self.mysql['host'],
port=self.mysql['port'],
user=self.mysql['user'],
password=self.mysql['password'],
db=self.mysql['db_name'],
charset=self.mysql.get('charset', 'utf8mb4'),
cursorclass=pymysql.cursors.DictCursor
) def execute_sql(self, sql: str) -> Union[dict, None]:
"""
执行sql语句方法,查询所有结果的sql只会返回一条结果(
比如说: 使用select * from cases , 结果将只会返回第一条数据 {'id': 1, 'name': 'updatehahaha', 'path': None, 'body': None, 'expected': '{"msg": "你好"}', 'api_id': 1, 'create_at': '2021-05-17 17:23:54', 'update_at': '2021-05-17 17:23:54'} ),支持select, delete, insert, update
:param sql: sql语句
:return: select 语句 如果有结果则会返回 对应结果字典,delete,insert,update 将返回None
"""
with self.connection.cursor() as cursor:
cursor.execute(sql)
result = cursor.fetchone()
# 使用commit解决查询数据出现概率查错问题
self.connection.commit()
return self.verify(result) def verify(self, result: dict) -> Union[dict, None]:
"""验证结果能否被json.dumps序列化"""
# 尝试变成字符串,解决datetime 无法被json 序列化问题
try:
json.dumps(result)
except TypeError: # TypeError: Object of type datetime is not JSON serializable
for k, v in result.items():
if isinstance(v, datetime):
result[k] = str(v)
return result def close(self):
"""关闭数据库连接"""
self.connection.close() if __name__ == '__main__':
print(DB().execute_sql("SELECT * FROM user where name like '%test2%'"))

db.py

import json
import os
import time def get_current_highest():
"""获取当前时间戳"""
return int(time.time()) def sum_data(a, b):
"""计算函数"""
return a + b def set_token(token: str):
"""设置token,直接返回字典"""
return {"Authorization": token} def get_time():
'''获取当前时间,格式YYYYmmddHHMMSSxx'''
return time.strftime('%Y%m%d%H%M%S',time.localtime(time.time())) def get_path():
# 获取当前路径
#curpath = os.path.dirname(os.path.realpath(__file__))
#获取上级目录
curpath = os.path.abspath(os.path.dirname(os.path.dirname(__file__)))
return curpath if __name__ == '__main__':
print(get_path())

hooks.py

import yaml
import xlrd
from tools import extractor from pathlib import Path class ReadFile:
config_dict = None
config_path = f"{str(Path(__file__).parent.parent)}/config/config.yaml" @classmethod
def get_config_dict(cls) -> dict:
"""读取配置文件,并且转换成字典
return cls.config_dict
"""
if cls.config_dict is None:
# 指定编码格式解决,win下跑代码抛出错误
with open(cls.config_path, "r", encoding="utf-8") as file:
cls.config_dict = yaml.load(file.read(), Loader=yaml.FullLoader)
return cls.config_dict @classmethod
def read_config(cls, expr: str = ".") -> dict:
"""默认读取config目录下的config.yaml配置文件,根据传递的expr jsonpath表达式可任意返回任何配置项
:param expr: 提取表达式, 使用jsonpath语法,默认值提取整个读取的对象
return 根据表达式返回的值
"""
return extractor(cls.get_config_dict(), expr) @classmethod
def read_testcase(cls):
"""
读取excel格式的测试用例,返回一个生成器对象
:return 生成器
"""
book = xlrd.open_workbook(cls.read_config("$.file_path.test_case"))
# 读取第一个sheet页
table = book.sheet_by_index(0)
for norw in range(1, table.nrows):
# 每行第4列 是否运行
if table.cell_value(norw, 4) != "否": # 每行第4列等于否将不读取内容
value = table.row_values(norw)
value.pop(4)
yield value if __name__ == '__main__':
pass

read_file.py

# 获取jenkins构建信息和本次报告地址
import os
import jenkins
import json
import urllib3
from tools import get_path
from tools.read_file import ReadFile dingding = ReadFile.read_config('$.dingding') # # jenkins登录地址
# jenkins_url = "http://192.168.1.107:8080/"
jenkins_url= dingding['jenkins_url']
# 获取jenkins对象
server = jenkins.Jenkins(jenkins_url, username='root', password='******')
# job名称
# job_name = "job/轻萤/"
job_name = dingding['job_name']
# job的url地址
job_url = jenkins_url + job_name
# 获取最后一次构建
job_last_build_url = server.get_info(job_name)['lastBuild']['url']
# 报告地址
report_url = job_last_build_url + 'allure' '''
钉钉推送方法:
读取report文件中"prometheusData.txt",循环遍历获取需要的值。
使用钉钉机器人的接口,拼接后推送text
''' def DingTalkSend():
d = {}
# 获取项目上级路径
path = get_path()
# 打开prometheusData 获取需要发送的信息
f = open(path + r'/report\html\export/prometheusData.txt', 'r')
for lines in f:
for c in lines:
launch_name = lines.strip('\n').split(' ')[0]
num = lines.strip('\n').split(' ')[1]
d.update({launch_name: num})
print(d)
f.close()
retries_run = d.get('launch_retries_run') # 运行总数
print('运行总数:{}'.format(retries_run))
status_passed = d.get('launch_status_passed') # 通过数量
print('通过数量:{}'.format(status_passed))
status_failed = d.get('launch_status_failed') # 不通过数量
print('通过数量:{}'.format(status_failed)) # 钉钉推送 url = '*******' # webhook
con = {"msgtype": "text",
"text": {
"content": "轻萤脚本执行完成。"
"\n测试概述:"
"\n运行总数:" + retries_run +
"\n通过数量:" + status_passed +
"\n失败数量:" + status_failed +
"\n构建地址:\n" + job_url +
"\n报告地址:\n" + report_url }
}
urllib3.disable_warnings()
http = urllib3.PoolManager()
jd = json.dumps(con)
jd = bytes(jd, 'utf-8')
http.request('POST', url, body=jd, headers={'Content-Type': 'application/json'}) if __name__ == '__main__':
DingTalkSend()

send_dd.py

import os
import jenkins
import requests, json
import urllib3
from tools import get_path
from tools.read_file import ReadFile urllib3.disable_warnings()
weixin = ReadFile.read_config('$.weixin')
###填写参数### # Corpid是企业号的标识
Corpid = weixin['Corpid']
# Secret是管理组凭证密钥
Secret = weixin['Secret']
# 应用ID
Agentid = weixin['Agentid']
# 部门id
Partyid = weixin['Partyid']
# jenkins登录地址
jenkins_url = weixin['jenkins_url']
# 获取jenkins对象
server = jenkins.Jenkins(jenkins_url, username='root', password='*****')
# job名称
job_name = weixin['job_name']
# job的url地址
job_url = jenkins_url + job_name
# 获取最后一次构建
job_last_build_url = server.get_info(job_name)['lastBuild']['url']
# 报告地址
report_url = job_last_build_url + 'allure' def GetTokenFromServer(Corpid, Secret):
"""获取access_token"""
Url = "https://qyapi.weixin.qq.com/cgi-bin/gettoken"
Data = {
"corpid": Corpid,
"corpsecret": Secret
}
r = requests.get(url=Url, params=Data, verify=False)
print(r.json())
if r.json()['errcode'] != 0:
return False
else:
Token = r.json()['access_token']
return Token def SendMessage(Partyid):
"""发送消息""" Token = GetTokenFromServer(Corpid, Secret) d = {}
# 获取项目上级路径
path = get_path()
# 打开prometheusData 获取需要发送的信息
f = open(path + r'/report/html/export/prometheusData.txt', 'r')
for lines in f:
for c in lines:
launch_name = lines.strip('\n').split(' ')[0]
num = lines.strip('\n').split(' ')[1]
d.update({launch_name: num})
print(d)
f.close()
retries_run = d.get('launch_retries_run') # 运行总数
print('运行总数:{}'.format(retries_run))
status_passed = d.get('launch_status_passed') # 通过数量
print('通过数量:{}'.format(status_passed))
status_failed = d.get('launch_status_failed') # 不通过数量
print('通过数量:{}'.format(status_failed)) # 发送消息
Url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token
Data = {
"toparty": Partyid,
"msgtype": "text",
"agentid": Agentid,
"text": {
"content": "轻萤脚本执行完成。"
"\n测试概述:"
"\n运行总数:" + retries_run +
"\n通过数量:" + status_passed +
"\n失败数量:" + status_failed +
"\n构建地址:\n" + job_url +
"\n报告地址:\n" + report_url },
"safe": "0"
}
r = requests.post(url=Url, data=json.dumps(Data), verify=False)
# 如果发送失败,将重试三次
n = 1
while r.json()['errcode'] != 0 and n < 4:
n = n + 1
Token = GetTokenFromServer(Corpid, Secret)
if Token:
Url = "https://qyapi.weixin.qq.com/cgi-bin/message/send?access_token=%s" % Token
r = requests.post(url=Url, data=json.dumps(Data), verify=False)
print(r.json())
return r.json() if __name__ == '__main__':
Status = SendMessage(Partyid)
print(Status)

send_vx.py

import yagmail
from tools import logger
import zipfile
import os
from tools.read_file import ReadFile file_path = ReadFile.read_config('$.file_path')
email = ReadFile.read_config('$.email')
class EmailServe: @staticmethod
def zip_report(file_path: str, out_path: str):
"""
压缩指定文件夹
:param file_path: 目标文件夹路径
:param out_path: 压缩文件保存路径+xxxx.zip
:return: 无
"""
file_path = f"{file_path}html"
zip = zipfile.ZipFile(out_path, "w", zipfile.ZIP_DEFLATED)
for path, dirnames, filenames in os.walk(file_path):
# 去掉目标跟路径,只对目标文件夹下边的文件及文件夹进行压缩
fpath = path.replace(file_path, '')
fpath = fpath and fpath + os.sep or ''
for filename in filenames:
zip.write(
os.path.join(
path, filename), os.path.join(fpath, filename))
zip.close() @staticmethod
def send_email(setting: dict, file_path):
"""
入参一个字典
:param user: 发件人邮箱
:param password: 邮箱授权码
:param host: 发件人使用的邮箱服务 例如:smtp.163.com
:param contents: 内容
:param addressees: 收件人列表
:param title: 邮件标题
:param enclosures: 附件列表
:param file_path: 需要压缩的文件夹
:return:
"""
EmailServe.zip_report(
file_path=file_path,
out_path=setting['enclosures'])
yag = yagmail.SMTP(
setting['user'],
setting['password'],
setting['host'])
# 发送邮件
yag.send(
setting['addressees'],
setting['title'],
setting['contents'],
setting['enclosures'])
# 关闭服务
yag.close()
logger.info("邮件发送成功!") if __name__ == '__main__':
#EmailServe.zip_report('../report/html', 'report.zip')
EmailServe.send_email(email, file_path['report'])

send_email.py

import os
import shutil
from test.conftest import pytest
from tools import logger
from tools.read_file import ReadFile
from tools.send_email import EmailServe file_path = ReadFile.read_config('$.file_path')
email = ReadFile.read_config('$.email') def run():
if os.path.exists('report/'):
shutil.rmtree(path='report/') # 解决 issues 句柄无效
logger.remove()
logger.add(file_path['log'], enqueue=True, encoding='utf-8')
logger.info("""
_ _ _ _____ _
__ _ _ __ (_) / \\ _ _| |_ __|_ _|__ ___| |_
/ _` | '_ \\| | / _ \\| | | | __/ _ \\| |/ _ \\/ __| __|
| (_| | |_) | |/ ___ \\ |_| | || (_) | | __/\\__ \\ |_
\\__,_| .__/|_/_/ \\_\\__,_|\\__\\___/|_|\\___||___/\\__|
|_|
Starting ... ... ...
""")
pytest.main(
args=[
'test/test_api.py',
f'--alluredir={file_path["report"]}/data'])
# 自动以服务形式打开报告
# os.system(f'allure serve {file_path["report"]}/data') # 本地生成报告
os.system(
f'allure generate {file_path["report"]}/data -o {file_path["report"]}/html --clean')
logger.success('报告已生成') # 发送邮件带附件报告
# EmailServe.send_email(email, file_path['report']) # # 删除本地附件
# os.remove(email['enclosures']) if __name__ == '__main__':
run()

run.py

搜索

复制

最新文章

  1. 把生成的excel文件直接提供为下载页效果
  2. Function类型
  3. VBA Excel 对比两列数据
  4. EF架构~引入规约(Specification)模式,让程序扩展性更强
  5. Matlab2014下载和破解方法,以及Matlab很好的学习网站
  6. vsUnit单元测试
  7. DJANGO增加超级用户
  8. hadoop 2.2.0 集群部署 坑
  9. IIS 配置好了,为什么网站打开一片空白?
  10. Java学习笔记--AWT事件处理
  11. [问题解决]LaTex 进行中文文档操作
  12. NoSQL:redis缓存数据库
  13. Python进程-实现
  14. SQLite 删除表(http://www.w3cschool.cc/sqlite/sqlite-drop-table.html)
  15. Android动态换肤(一、应用内置多套皮肤)
  16. 项目中Orcale存储过程优化记录
  17. 反射在ADO.NET方面的应用
  18. Spring AOP 的实现机制
  19. 汇编语言--微机CPU的指令系统(五)(标志位操作指令)
  20. laravel创建新的提交数据

热门文章

  1. 2023-03-01 react-native 实现 复制功能 @react-native-community/clipboard 报错:TypeError: null is not an object (evaluating &#39;NativeClipboard_1.default.setString&#39;)。
  2. (0409) Pycharm 的设置--参数设置(运行.py文件带参数,例如argument) 比如: demo.py -prj xxx
  3. DP7361 是一款立体声六通道线性输出的数模转换器-兼容CS4361
  4. 国产电源芯片DP4054 软硬件兼容TP4054 规格书资料
  5. python性能测试工具locust
  6. APP学习3
  7. C# List提取类中某列保存成新list
  8. 力扣53. 最大子数组和(dp)
  9. php框架之odp-环境部署安装
  10. Winform帮助文档(C#打开chm定位到特定页面)国内最全总结写法。原文文档带翻译