AirTest源码分析之运行器
from: https://blog.csdn.net/u012897401/article/details/82900562
使用:根据airtest文档说明,可以通过命令行来启动air脚本,需要传入一些参数如设备号,脚本名等,这样就可以不用通过AirTest IDE来运行了,可以集成,所以我们也可以写个脚本来控制air脚本的运行。
文档链接:https://airtest.readthedocs.io/en/latest/README_MORE.html#running-air-from-cli
这就是说你用airtest run 命令,可以指定运行某air脚本,指定设备,指定log输出地址
翻一下源码,找到runner.py,可以看得出这是一个unittest的子类AirtestCase,入口是run_script接口
1、程序的入口——run_script,传入参数parsed_args,进来以后传给全局变量args,给airtestcase里调用。
后面的三行代码,用过unittest的朋友应该都很熟悉了
def run_script(parsed_args, testcase_cls=AirtestCase):
global args # make it global deliberately to be used in AirtestCase & test scripts
args = parsed_args
suite = unittest.TestSuite()#创建一个测试套件
suite.addTest(testcase_cls())#添加一条AirtestCase类型的case,因为接口入参默认testcase_cls=AirtestCase
result = unittest.TextTestRunner(verbosity=0).run(suite)#运行它
if not result.wasSuccessful():
sys.exit(-1)#退出
2、AirtestCase类
这里定义好了setUpClass、setUp、runTest、tearDown、tearDownClass
分别做了什么呢,一个个看一下:
@classmethod
def setUpClass(cls):
cls.args = args #runScrip传进来的参数
setup_by_args(args) #设置参数,设备、log路径、脚本路径
# setup script exec scope
cls.scope = copy(globals())
cls.scope["exec_script"] = cls.exec_other_script
def setUp(self):
if self.args.log and self.args.recording: #如果参数配置了log路径且recording为Ture
for dev in G.DEVICE_LIST:
try:
dev.start_recording() #开始录制
except:
traceback.print_exc()
def tearDown(self):#停止录制
if self.args.log and self.args.recording:
for k, dev in enumerate(G.DEVICE_LIST):
try:
output = os.path.join(self.args.log, "recording_%d.mp4" % k)
dev.stop_recording(output)
except:
traceback.print_exc()
def runTest(self):#运行脚本
scriptpath = self.args.script #参数传入的脚本路径
#分割路径最后的名字,替换.air为.py,也就是传入‘d:/aaa/bbb.air’,pyfilename就为bbb.py
pyfilename = os.path.basename(scriptpath).replace(self.SCRIPTEXT, ".py")
#再组装py文件的路径,d:/aaa/bbb.air/bbb.py,看过air脚本文件就知道,这才是脚本代码,其他是图片
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
self.scope["__file__"] = pyfilepath
#读进来
with open(pyfilepath, 'r', encoding="utf8") as f:
code = f.read()
pyfilepath = pyfilepath.encode(sys.getfilesystemencoding())
#运行读进来的脚本
try:
exec(compile(code.encode("utf-8"), pyfilepath, 'exec'), self.scope)
except Exception as err:
#出错处理,日志
tb = traceback.format_exc()
log("Final Error", tb)
six.reraise(*sys.exc_info())
def exec_other_script(cls, scriptpath):#这个接口不分析了,因为已经用using代替了。
#这个接口就是在你的air脚本中如果用了exec_script就会调用这里,它会把子脚本的图片文件拷贝过来,并读取py文件运行
#参数设置
def setup_by_args(args):
# init devices
if isinstance(args.device, list):#如果传入的设备参数是一个列表,所以命令行可以设置多个设备哦
devices = args.device
elif args.device:
devices = [args.device]#不是列表就给转成列表
else:
devices = []
print("do not connect device")
# set base dir to find tpl
args.script = decode_path(args.script)#脚本路径
# set log dir日志路径
if args.log is True:
print("save log in %s/log" % args.script)
args.log = os.path.join(args.script, "log")
elif args.log:
print("save log in '%s'" % args.log)
args.log = decode_path(args.log)
else:
print("do not save log")
# guess project_root to be basedir of current .air path把air脚本的路径设置为工程根目录
project_root = os.path.dirname(args.script) if not ST.PROJECT_ROOT else None
auto_setup(args.script, devices, args.log, project_root)#这个接口很熟悉吧,在IDE里新建一个air脚本就会自动生成这句,里面就设备的初始化连接,设置工程路径,日志路径。
所以,上层的air脚本不用什么测试框架,是这个airtestCase在支撑,这种设计思路确实降低了脚本的上手门槛,跟那些用excel表格、自然语言脚本的框架有点像。
源码贴出来方便看
# -*- coding: utf-8 -*-
import unittest
import os
import sys
import six
import re
import shutil
import traceback
import warnings
from io import open
from airtest.core.api import G, auto_setup, log
from airtest.core.settings import Settings as ST
from airtest.utils.compat import decode_path
from copy import copy
class AirtestCase(unittest.TestCase):
PROJECT_ROOT = "."
SCRIPTEXT = ".air"
TPLEXT = ".png"
@classmethod
def setUpClass(cls):
cls.args = args
setup_by_args(args)
# setup script exec scope
cls.scope = copy(globals())
cls.scope["exec_script"] = cls.exec_other_script
def setUp(self):
if self.args.log and self.args.recording:
for dev in G.DEVICE_LIST:
try:
dev.start_recording()
except:
traceback.print_exc()
def tearDown(self):
if self.args.log and self.args.recording:
for k, dev in enumerate(G.DEVICE_LIST):
try:
output = os.path.join(self.args.log, "recording_%d.mp4" % k)
dev.stop_recording(output)
except:
traceback.print_exc()
def runTest(self):
scriptpath = self.args.script
pyfilename = os.path.basename(scriptpath).replace(self.SCRIPTEXT, ".py")
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
self.scope["__file__"] = pyfilepath
with open(pyfilepath, 'r', encoding="utf8") as f:
code = f.read()
pyfilepath = pyfilepath.encode(sys.getfilesystemencoding())
try:
exec(compile(code.encode("utf-8"), pyfilepath, 'exec'), self.scope)
except Exception as err:
tb = traceback.format_exc()
log("Final Error", tb)
six.reraise(*sys.exc_info())
@classmethod
def exec_other_script(cls, scriptpath):
"""run other script in test script"""
warnings.simplefilter("always")
warnings.warn("please use using() api instead.", PendingDeprecationWarning)
def _sub_dir_name(scriptname):
dirname = os.path.splitdrive(os.path.normpath(scriptname))[-1]
dirname = dirname.strip(os.path.sep).replace(os.path.sep, "_").replace(cls.SCRIPTEXT, "_sub")
return dirname
def _copy_script(src, dst):
if os.path.isdir(dst):
shutil.rmtree(dst, ignore_errors=True)
os.mkdir(dst)
for f in os.listdir(src):
srcfile = os.path.join(src, f)
if not (os.path.isfile(srcfile) and f.endswith(cls.TPLEXT)):
continue
dstfile = os.path.join(dst, f)
shutil.copy(srcfile, dstfile)
# find script in PROJECT_ROOT
scriptpath = os.path.join(ST.PROJECT_ROOT, scriptpath)
# copy submodule's images into sub_dir
sub_dir = _sub_dir_name(scriptpath)
sub_dirpath = os.path.join(cls.args.script, sub_dir)
_copy_script(scriptpath, sub_dirpath)
# read code
pyfilename = os.path.basename(scriptpath).replace(cls.SCRIPTEXT, ".py")
pyfilepath = os.path.join(scriptpath, pyfilename)
pyfilepath = os.path.abspath(pyfilepath)
with open(pyfilepath, 'r', encoding='utf8') as f:
code = f.read()
# replace tpl filepath with filepath in sub_dir
code = re.sub("[\'\"](\w+.png)[\'\"]", "\"%s/\g<1>\"" % sub_dir, code)
exec(compile(code.encode("utf8"), pyfilepath, 'exec'), cls.scope)
def setup_by_args(args):
# init devices
if isinstance(args.device, list):
devices = args.device
elif args.device:
devices = [args.device]
else:
devices = []
print("do not connect device")
# set base dir to find tpl
args.script = decode_path(args.script)
# set log dir
if args.log is True:
print("save log in %s/log" % args.script)
args.log = os.path.join(args.script, "log")
elif args.log:
print("save log in '%s'" % args.log)
args.log = decode_path(args.log)
else:
print("do not save log")
# guess project_root to be basedir of current .air path
project_root = os.path.dirname(args.script) if not ST.PROJECT_ROOT else None
auto_setup(args.script, devices, args.log, project_root)
def run_script(parsed_args, testcase_cls=AirtestCase):
global args # make it global deliberately to be used in AirtestCase & test scripts
args = parsed_args
suite = unittest.TestSuite()
suite.addTest(testcase_cls())
result = unittest.TextTestRunner(verbosity=0).run(suite)
if not result.wasSuccessful():
sys.exit(-1)
最新文章
- ThreadLocal内部机制及使用方法
- SpringMVC之控制器的单例和多例管理
- iOS学习之代码块(Block)
- java:快速文件分割及合并
- CocoaPods安装使用及上传
- Jquery对Cookie的操作
- java中的BigDecimal和String的相互转换
- BZOJ 3505
- ASP.NET程序中动态修改web.config中的设置项目(后台CS代码)
- [转] npm 模块安装机制简介
- 微信企业号 出现redirect_uri unauthorized 50001 解决办法
- Deformable Convolutional Network
- 经典文摘:饿了么的 PWA 升级实践(结合Vue.js)
- TZOJ 3820 Revenge of Fibonacci(大数+trie)
- Spring Boot 2.0正式发布,新特性解读
- CentOS 查看系统 CPU 个数、核心数、线程数
- 分享一个.NET(C#)按指定字母个数截断英文字符串的方法–提供枚举选项,可保留完整单词
- asp.net MVC中防止跨站请求攻击(CSRF)的ajax用法
- 201709012工作日记--一台电脑创建两个Github账户上传代码
- webstorm 调出project