1. 前端页面样式

2. 前端代码

添加:

展示:

修改

删除

3. 后台代码

封装的DAO类数据

 # coding: utf-8

 from pdform.services.db.dbCore import GCP_LogicalVerify, GCP_LogicalVerifyRecord, GCP_ProjectSubject
from pdform import db, app
from pdform.services.util import Util
from pdform.services.sysParams import SYSParams
from pdform.services.appStaticVal import PDFormApp
from gcp.appStaticVal import GCPApp
from dataservices.config import *
from sqlalchemy import and_, or_, func, desc
from gcp.dao.project_subject_dao import GCPProjectSubjectDao
from gcp.dao.project_global_var_dao import ProjectGlobalVarDao import traceback
from xlrd import *
import xlrd
import js2py
import re


class GCPLogicalVerifyDao:
    """
    Data access for logical-verification rules (GCP_LogicalVerify) and the
    records produced when those rules are evaluated (GCP_LogicalVerifyRecord).

    Every write method commits on success and rolls back, logs the traceback
    and reports failure through its return value when an exception occurs.
    """

    # "$NAME$" placeholders that translate_script substitutes.
    _VARIANT_REGEX = re.compile(r"\$[A-Za-z0-9_/]+\$")

    # Rule attributes that callers are allowed to set through a dict.
    _EDITABLE_FIELDS = ("name", "script", "type")

    def __init__(self):
        self.projectGlobalVarDao = ProjectGlobalVarDao()

    def get_status_message(self, status):
        """
        Map a verification status code to its display text.
        :param status: one of the GCPApp.TYPE_LV_STATUS_* values
        :return: the text for "passed" or "failed"; the "unknown" text for
                 any other value
        """
        if status == GCPApp.TYPE_LV_STATUS_PASSED:
            return u"通过"
        if status == GCPApp.TYPE_LV_STATUS_FAILED:
            return u"失败"
        return u"未知"

    def get_logical_verify(self, lv_id):
        """
        Fetch a single verification rule.
        :param lv_id: primary key of the rule
        :return: the rule converted by Util.row2dict, or None when not found
        """
        logical_verify = db.query(GCP_LogicalVerify).filter(GCP_LogicalVerify.lv_id == lv_id).first()
        if not logical_verify:
            return None
        return Util.row2dict(logical_verify)

    def get_logical_verifies(self, project_id, params):
        """
        List verification rules, optionally filtered and paged.
        :param project_id: restrict to this project; None disables the filter
        :param params: request parameters; paging bounds and search text are
                       extracted from it by the SYSParams helpers
        :return: (rows converted by Util.rows2dict, total row count before paging)
        """
        is_slice, is_slice_success, query_start, query_end = SYSParams.check_fetch_query_start_and_end(params)
        if is_slice and not is_slice_success:
            return [], 0

        query = db.query(GCP_LogicalVerify)
        if project_id is not None:
            query = query.filter(GCP_LogicalVerify.project_id == project_id)

        # Substring match on the rule name.
        has_search_text, search_text = SYSParams.check_fetch_query_search_text(params)
        if has_search_text:
            query = query.filter(GCP_LogicalVerify.name.like("%" + search_text + "%"))

        query = query.distinct(GCP_LogicalVerify.lv_id).order_by(GCP_LogicalVerify.create_date)

        # Paging is applied in memory so that the total can be reported.
        all_logical_verify_array = query.all()
        total = len(all_logical_verify_array)
        if is_slice:
            logical_verify_array = all_logical_verify_array[query_start:query_end]
        else:
            logical_verify_array = all_logical_verify_array
        return Util.rows2dict(logical_verify_array), total

    def check_exist_logical_verify(self, project_id, name, type, lv_id):
        """
        Tell whether another rule already uses this name within the same
        project and type.
        :param project_id: project the rule belongs to
        :param name: rule name to look for
        :param type: rule type to look for
        :param lv_id: id of the rule being edited, which is allowed to keep
                      its own name; falsy when creating a new rule
        :return: True when the name is taken by a different rule
        """
        logical_verify_array = db.query(GCP_LogicalVerify)\
            .filter(GCP_LogicalVerify.project_id == project_id)\
            .filter(GCP_LogicalVerify.type == type)\
            .filter(GCP_LogicalVerify.name == name)\
            .all()
        if not logical_verify_array:
            return False
        if not lv_id or len(logical_verify_array) > 1:
            return True
        # Exactly one match: it is a conflict unless it is the rule itself.
        return logical_verify_array[0].lv_id != lv_id

    def _copy_editable_fields(self, logical_verify, logical_verify_dict):
        """Copy each editable field that is present and not None onto the row."""
        for field in self._EDITABLE_FIELDS:
            if SYSParams.check_key_exist_and_not_none(logical_verify_dict, field):
                setattr(logical_verify, field, logical_verify_dict[field])

    def add_logical_verify(self, project_id, logical_verify_dict):
        """
        Create a rule, or overwrite the existing rule that has the same
        project and name.
        NOTE(review): the lookup ignores "type", whereas
        check_exist_logical_verify also matches on type - confirm intended.
        :param project_id: project the rule belongs to
        :param logical_verify_dict: must contain "name"; may contain "script"
                                    and "type"
        :return: (is_success, lv_id); lv_id is None on failure
        """
        try:
            logical_verify = db.query(GCP_LogicalVerify)\
                .filter(GCP_LogicalVerify.project_id == project_id)\
                .filter(GCP_LogicalVerify.name == logical_verify_dict["name"])\
                .first()
            is_created = not logical_verify
            if is_created:
                logical_verify = GCP_LogicalVerify()
                logical_verify.lv_id = Util.getUUID()
                logical_verify.project_id = project_id
                logical_verify.create_date = Util.getDateTime()
            self._copy_editable_fields(logical_verify, logical_verify_dict)
            if is_created:
                db.add(logical_verify)
            db.commit()
            return True, logical_verify.lv_id
        except Exception:
            db.rollback()
            app.logger.error(traceback.format_exc())
            return False, None

    def update_logical_verify(self, lv_id, logical_verify_dict):
        """
        Update an existing rule.
        :param lv_id: primary key of the rule
        :param logical_verify_dict: may contain "name", "script" and "type";
                                    absent or None entries are left unchanged
        :return: True on success; False when the rule does not exist or the
                 write fails
        """
        try:
            logical_verify = db.query(GCP_LogicalVerify).filter(GCP_LogicalVerify.lv_id == lv_id).first()
            if not logical_verify:
                return False
            self._copy_editable_fields(logical_verify, logical_verify_dict)
            db.commit()
            return True
        except Exception:
            db.rollback()
            app.logger.error(traceback.format_exc())
            return False

    def delete_logical_verify(self, lv_id):
        """
        Delete a rule.
        :param lv_id: primary key of the rule
        :return: True on success; False when the rule does not exist or the
                 delete fails
        """
        try:
            logical_verify = db.query(GCP_LogicalVerify).filter(GCP_LogicalVerify.lv_id == lv_id).first()
            if not logical_verify:
                return False
            db.delete(logical_verify)
            db.commit()
            return True
        except Exception:
            db.rollback()
            app.logger.error(traceback.format_exc())
            return False

    def check_fetch_worksheet(self, xls_files):
        """
        Open the first uploaded XLS file and return its first worksheet.
        Errors raised by xlrd while parsing are not caught here.
        :param xls_files: sequence of uploaded files exposing ``.stream``
        :return: the first sheet, or None when there is no file or no sheet
        """
        if not xls_files:
            return None
        xls_data = xls_files[0].stream.read()
        # Parse from memory; the remaining open_workbook options keep their
        # defaults, so nothing depends on names pulled in by star-imports.
        xls_rd = xlrd.open_workbook(file_contents=xls_data)
        if xls_rd.nsheets <= 0:
            return None
        return xls_rd.sheet_by_index(0)

    def get_cell_value(self, ws, i_row, i_col):
        """
        Read one cell, never raising.
        :param ws: worksheet exposing ``cell_value(row, col)``
        :param i_row: zero-based row index
        :param i_col: zero-based column index
        :return: the cell value; "" when the value is falsy (this includes a
                 numeric 0) or when reading fails (the traceback is logged)
        """
        try:
            cell_value = ws.cell_value(i_row, i_col)
            return cell_value if cell_value else ""
        except Exception:
            app.logger.error(traceback.format_exc())
            return ""

    def import_xls_files(self, project_id, xls_files):
        """
        Import rules from the first worksheet of an uploaded XLS file.

        The first row is skipped. Column 0 is the name, column 1 the script
        and column 2, when present, the type.
        :param project_id: project that receives the rules
        :param xls_files: sequence of uploaded files
        :return: (is_success, success_lines_count, failed_lines_array) where
                 failed_lines_array holds 1-based sheet line numbers;
                 is_success is False only when no worksheet could be read
        """
        ws = self.check_fetch_worksheet(xls_files)
        if not ws:
            return False, 0, []

        # Name and script columns are mandatory; the type column is optional
        # and is only read when the sheet actually has it.
        has_required_columns = ws.ncols >= 2
        has_type_column = ws.ncols > 2

        failed_lines_array = []
        success_lines_count = 0
        for i_row in range(1, ws.nrows):
            line_number = i_row + 1
            if not has_required_columns:
                failed_lines_array.append(line_number)
                continue
            # The name must not be empty.
            name = self.get_cell_value(ws, i_row, 0)
            if not name:
                failed_lines_array.append(line_number)
                continue
            logical_verify_dict = {
                "name": name,
                "type": self.get_cell_value(ws, i_row, 2) if has_type_column else "",
                "script": self.get_cell_value(ws, i_row, 1)
            }
            is_success, _ = self.add_logical_verify(project_id, logical_verify_dict)
            if not is_success:
                failed_lines_array.append(line_number)
                continue
            success_lines_count += 1
        return True, success_lines_count, failed_lines_array

    def get_logical_verify_records(self, project_id, params):
        """
        List verification records joined with their project subject.
        :param project_id: restrict to this project; None disables the filter
        :param params: request parameters; may carry "dm_id", paging bounds
                       and search text
        :return: (list of dicts merging record and subject columns plus
                 "verify_status_message", total row count before paging)
        """
        is_slice, is_slice_success, query_start, query_end = SYSParams.check_fetch_query_start_and_end(params)
        if is_slice and not is_slice_success:
            return [], 0

        query = db.query(GCP_LogicalVerifyRecord, GCP_ProjectSubject)\
            .join(GCP_ProjectSubject, and_(GCP_ProjectSubject.project_id == GCP_LogicalVerifyRecord.project_id,
                                           GCP_ProjectSubject.dm_id == GCP_LogicalVerifyRecord.dm_id))
        if project_id is not None:
            query = query.filter(GCP_LogicalVerifyRecord.project_id == project_id)
        # Optional restriction to one subject.
        if SYSParams.check_key_exist_and_not_none(params, "dm_id"):
            query = query.filter(GCP_LogicalVerifyRecord.dm_id == params["dm_id"])
        # Substring match on the record name.
        has_search_text, search_text = SYSParams.check_fetch_query_search_text(params)
        if has_search_text:
            query = query.filter(GCP_LogicalVerifyRecord.name.like("%" + search_text + "%"))
        query = query.distinct(GCP_LogicalVerifyRecord.lvr_id).order_by(GCP_LogicalVerifyRecord.verify_date)

        all_lvr_row_array = query.all()
        total = len(all_lvr_row_array)
        if is_slice:
            lvr_row_array = all_lvr_row_array[query_start:query_end]
        else:
            lvr_row_array = all_lvr_row_array

        lvr_dict_array = []
        for lvr_row in lvr_row_array:
            lvr_dict = Util.row2dict(lvr_row[0])
            # Subject columns overwrite record columns of the same name.
            lvr_dict.update(Util.row2dict(lvr_row[1]))
            lvr_dict["verify_status_message"] = self.get_status_message(lvr_dict["verify_status"])
            lvr_dict_array.append(lvr_dict)
        return lvr_dict_array, total

    def clear_all_logical_verify_records(self, project_id):
        """
        Delete every verification record of a project.
        :param project_id: project whose records are removed
        :return: True on success, False when the delete fails
        """
        try:
            db.query(GCP_LogicalVerifyRecord).filter(GCP_LogicalVerifyRecord.project_id == project_id).delete()
            db.commit()
            return True
        except Exception:
            db.rollback()
            app.logger.error(traceback.format_exc())
            return False

    def add_logical_verify_record(self, project_id, project_subject_dict, logical_verify_dict, logical_script_name, script_ret):
        """
        Store the outcome of evaluating one rule for one subject.
        :param project_id: project the record belongs to
        :param project_subject_dict: subject dict; "dm_id" is read
        :param logical_verify_dict: rule dict; "lv_id" and "script" are read
        :param logical_script_name: name stored on the record
        :param script_ret: truthy stores the passed status, falsy the failed one
        :return: True on success, False when the write fails
        """
        try:
            logical_verify_record = GCP_LogicalVerifyRecord()
            logical_verify_record.lvr_id = Util.getUUID()
            logical_verify_record.lv_id = logical_verify_dict["lv_id"]
            logical_verify_record.project_id = project_id
            logical_verify_record.dm_id = project_subject_dict["dm_id"]
            logical_verify_record.name = logical_script_name
            logical_verify_record.script = logical_verify_dict["script"]
            logical_verify_record.verify_date = Util.getDateTime()
            logical_verify_record.verify_status = \
                GCPApp.TYPE_LV_STATUS_PASSED if script_ret else GCPApp.TYPE_LV_STATUS_FAILED
            db.add(logical_verify_record)
            db.commit()
            return True
        except Exception:
            db.rollback()
            app.logger.error(traceback.format_exc())
            return False

    def translate_script(self, script, global_var_dict):
        """
        Substitute "$NAME$" placeholders with values from global_var_dict.

        A truthy value is inserted wrapped in double quotes; a falsy value is
        inserted as an empty string. Values are inserted literally: they are
        neither treated as regex replacement templates nor re-scanned for
        further placeholders.
        :param script: text containing zero or more placeholders
        :param global_var_dict: mapping of placeholder name to value
        :return: (is_success, new_script); when a placeholder name is missing
                 from the mapping, is_success is False and new_script holds
                 the text with only the earlier placeholders substituted
        """
        pieces = []
        cursor = 0
        for match in self._VARIANT_REGEX.finditer(script):
            variant_name = match.group(0)[1:-1]
            if variant_name not in global_var_dict:
                return False, "".join(pieces) + script[cursor:]
            variant_value = global_var_dict[variant_name]
            pieces.append(script[cursor:match.start()])
            pieces.append('"%s"' % variant_value if variant_value else '')
            cursor = match.end()
        if not pieces:
            # Nothing to substitute: hand back the input unchanged.
            return True, script
        pieces.append(script[cursor:])
        return True, "".join(pieces)

    def _translate_name(self, name, global_var_dict):
        """Return the name with placeholders substituted, or the raw name if translation fails."""
        is_success, translated_name = self.translate_script(name, global_var_dict)
        return translated_name if is_success else name

    def _script_passes(self, script, global_var_dict):
        """
        Decide whether a rule script passes: an empty script passes, otherwise
        the translated script must evaluate to a truthy value.
        """
        if not script:
            return True
        is_success, new_script = self.translate_script(script, global_var_dict)
        if not (is_success and new_script):
            return False
        try:
            # NOTE(review): this executes stored JavaScript; make sure only
            # trusted users can author rule scripts.
            return bool(js2py.eval_js(new_script))
        except Exception:
            app.logger.error(traceback.format_exc())
            return False

    def verify_subject(self, project_id, project_subject_dict, logical_verify_dict_array):
        """
        Evaluate every rule for one subject and store one record per rule.
        :param project_id: project being verified
        :param project_subject_dict: subject dict; "dm_id" is read
        :param logical_verify_dict_array: rule dicts as returned by
                                          get_logical_verifies
        :return: (is_success, error_code, success_count, failed_count);
                 is_success is always True and error_code always 0
        """
        # Global variables supplied by the system for this subject.
        _, global_var_dict = \
            self.projectGlobalVarDao.get_global_vars(project_id, project_subject_dict["dm_id"], None)

        success_count = 0
        failed_count = 0
        for logical_verify_dict in logical_verify_dict_array:
            logical_verify_name = self._translate_name(logical_verify_dict["name"], global_var_dict)
            is_passed = self._script_passes(logical_verify_dict["script"], global_var_dict)
            self.add_logical_verify_record(
                project_id, project_subject_dict, logical_verify_dict, logical_verify_name, is_passed)
            if is_passed:
                success_count += 1
            else:
                failed_count += 1
        return True, 0, success_count, failed_count

    def verify_all(self, project_id, project_subject_dict_array):
        """
        Evaluate every rule of the project for every given subject.
        :param project_id: project being verified
        :param project_subject_dict_array: subject dicts to verify
        :return: (is_success, error_code, success_count, failed_count); the
                 counts are 0 when there are no subjects or no rules
        """
        if not project_subject_dict_array:
            return True, 0, 0, 0
        logical_verify_dict_array, _ = self.get_logical_verifies(project_id, {})
        if not logical_verify_dict_array:
            return True, 0, 0, 0

        success_count = 0
        failed_count = 0
        for project_subject_dict in project_subject_dict_array:
            is_success, error_code, subject_success_count, subject_failed_count = \
                self.verify_subject(project_id, project_subject_dict, logical_verify_dict_array)
            if not is_success:
                return False, error_code, 0, 0
            success_count += subject_success_count
            failed_count += subject_failed_count
        return True, 0, success_count, failed_count

    def get_screening_period_logical_verifies(self, project_id):
        """
        List the rules of a project whose type equals 0.
        NOTE(review): type 0 presumably denotes the screening period, judging
        by the method name - confirm against the type definitions.
        :param project_id: project whose rules are listed
        :return: rows converted by Util.rows2dict, or [] when there are none
        """
        logical_verify_list = db.query(GCP_LogicalVerify)\
            .filter(GCP_LogicalVerify.project_id == project_id)\
            .filter(GCP_LogicalVerify.type == 0)\
            .all()
        if not logical_verify_list:
            return []
        return Util.rows2dict(logical_verify_list)

    def screening_period_verify_subject(self, project_id, dm_id):
        """
        Evaluate the type-0 rules for one subject without storing records.
        :param project_id: project being verified
        :param dm_id: subject identifier
        :return: list with the translated names of the rules that passed
                 (rules with an empty script count as passed)
        """
        logical_verify_dict_array = self.get_screening_period_logical_verifies(project_id)
        # Global variables supplied by the system for this subject.
        _, global_var_dict = \
            self.projectGlobalVarDao.get_global_vars(project_id, dm_id, None)

        logical_verify_result = []
        for logical_verify_dict in logical_verify_dict_array:
            script_name = self._translate_name(logical_verify_dict["name"], global_var_dict)
            if self._script_passes(logical_verify_dict["script"], global_var_dict):
                logical_verify_result.append(script_name)
        return logical_verify_result


# DAO for logical-verification records
class GCPLogicalVerifyRecordDao:
    """
    Read access to logical-verification records (GCP_LogicalVerifyRecord).
    """

    def __init__(self):
        self.projectGlobalVarDao = ProjectGlobalVarDao()

    def get_logical_verifies(self, project_id, params):
        """
        List verification records, optionally filtered and paged.
        :param project_id: restrict to this project; None disables the filter
        :param params: request parameters; paging bounds and search text are
                       extracted from it by the SYSParams helpers
        :return: (rows converted by Util.rows2dict, total row count before paging)
        """
        is_slice, is_slice_success, query_start, query_end = SYSParams.check_fetch_query_start_and_end(params)
        if is_slice and not is_slice_success:
            return [], 0

        query = db.query(GCP_LogicalVerifyRecord)
        if project_id is not None:
            query = query.filter(GCP_LogicalVerifyRecord.project_id == project_id)

        # Substring match on the record name.
        has_search_text, search_text = SYSParams.check_fetch_query_search_text(params)
        if has_search_text:
            query = query.filter(GCP_LogicalVerifyRecord.name.like("%" + search_text + "%"))

        all_logical_verify_array = query.all()
        total = len(all_logical_verify_array)
        if is_slice:
            logical_verify_array = all_logical_verify_array[query_start:query_end]
        else:
            logical_verify_array = all_logical_verify_array
        return Util.rows2dict(logical_verify_array), total

获取单条数据:

# coding: utf-8

from pdform.api_request import PDRequest
from pdform.services.appStaticVal import PDFormApp
from gcp.dao.logical_verify_dao import GCPLogicalVerifyDao
from gcp.dao.project_subject_dao import GCPProjectSubjectDao


class PDLogicalVerify(PDRequest):
    """
    Request handler for logical-verification rules.
    Mapped to the URI: /api/v1/logical_verify
    """
    API_NAME = "logical_verify"

    def __init__(self):
        """
        Register "lv_id" with the base class, create the DAO and declare the
        request arguments this resource accepts.
        """
        super(PDLogicalVerify, self).__init__("lv_id")
        # DAO that performs all database work for this resource.
        self.logicalVerifyDao = GCPLogicalVerifyDao()
        self.parser.add_argument("name")
        self.parser.add_argument("type")
        self.parser.add_argument("script")
        self.parser.add_argument("page_index", type=int)
        self.parser.add_argument("page_size", type=int)
        self.parser.add_argument("search_text")
        self.parser.add_argument("command")
        self.parser.add_argument("dm_id")

    def get_request(self, args, lv_id):
        """
        Return a single verification rule.
        :param args: parsed request arguments (not used here)
        :param lv_id: primary key of the rule
        :return: (response body, 200); the body is the DB_NONE error when the
                 rule does not exist
        """
        logical_verify_dict = self.logicalVerifyDao.get_logical_verify(lv_id)
        # Check for a missing rule before touching its fields: the DAO returns
        # None when nothing matches, and subscripting None would raise.
        if not logical_verify_dict:
            return PDFormApp.API_Error(PDFormApp.INFO_ERROR_DB_NONE), 200
        # Demonstration output of the fetched fields.
        print("",logical_verify_dict["lv_id"])
        print(logical_verify_dict["project_id"])
        print(logical_verify_dict["name"])
        print(logical_verify_dict["type"])
        print(logical_verify_dict["script"])
        print(logical_verify_dict["create_date"])
        return {"status": PDFormApp.STATUE_SUCCESS, "message": PDFormApp.STATUE_SUCCESS,
                "logical_verify": logical_verify_dict}, 200

# Printed output:

('222', u'7e356c00822211e98840f4b7e2e95ca4')

11d262c0806a11e984fff4b7e2e95ca4
胜多负少
0
大幅度
2019-05-29 23:00:26

查看所有:

  def list_request(self, args):
"""
获取所有核查信息
:return:
"""
project_id = args["project_id"] if args["project_id"] else ""
logical_verify_dict_array, total = self.logicalVerifyDao.get_logical_verifies(project_id, args) for i in logical_verify_dict_array:
print("lv_id",i["lv_id"])
print("projectid",i["project_id"])
print("name",i["name"])
print('type',i["type"])
print("========")
return {"status": PDFormApp.STATUE_SUCCESS, "message": PDFormApp.STATUE_SUCCESS,
"logical_verify_list": logical_verify_dict_array, "total": total}, 200 打印结果:

('lv_id', u'6451ae1e807e11e98f55f4b7e2e95ca4')
('projectid', u'11d262c0806a11e984fff4b7e2e95ca4')
('name', u'1221')
('type', 0)
========
('lv_id', u'e54f6d4081f111e9b978f4b7e2e95ca4')
('projectid', u'11d262c0806a11e984fff4b7e2e95ca4')
('name', u'333')
('type', 1)
========
('lv_id', u'ec6f9d7081f111e980c8f4b7e2e95ca4')
('projectid', u'11d262c0806a11e984fff4b7e2e95ca4')
('name', u'3332')
('type', 0)
========


添加:

 def add_request(self, args):
     """
     Create a verification rule, or run the verification when the
     "command" argument equals "verify".
     :param args: parsed request arguments; "project_id", "command",
                  "dm_id", "name" and "type" are read here
     :return: (response body, 200); the body is an API error on failure
     """
     # A falsy project id is normalised to the empty string.
     project_id = args["project_id"] or ""

     if args["command"] != "verify":
         # Plain creation of a rule: a name is required and must be unused.
         if not args["name"]:
             return PDFormApp.API_Error(PDFormApp.INFO_WARNING_LESS_PARAMETERS), 200
         name_taken = self.logicalVerifyDao.check_exist_logical_verify(
             project_id, args["name"], args["type"], None)
         if name_taken:
             return PDFormApp.API_Error(PDFormApp.INFO_WARNING_LOGICAL_VERIFY_ALREADY_EXIST), 200
         is_success, lv_id = self.logicalVerifyDao.add_logical_verify(project_id, args)
         if not is_success:
             return PDFormApp.API_Error(PDFormApp.INFO_ERROR_DB_WRITE), 200
         return {"status": PDFormApp.STATUE_SUCCESS, "message": PDFormApp.INFO_RUN_OK, "lv_id": lv_id}, 200

     # Verification run: one subject when "dm_id" is given, otherwise the
     # subjects selected by the request arguments.
     dm_id = args["dm_id"]
     if dm_id:
         # NOTE(review): get_subject is called on the class while
         # getProjectSubjects below is called on an instance - confirm that
         # get_subject is a static or class method.
         project_subject_dict = GCPProjectSubjectDao.get_subject(project_id, dm_id)
         project_subject_dict_array = [project_subject_dict] if project_subject_dict else None
     else:
         project_subject_dict_array, total = GCPProjectSubjectDao().getProjectSubjects(args)
     if not project_subject_dict_array:
         return PDFormApp.API_Error(PDFormApp.INFO_ERROR_DB_NONE), 200

     outcome = self.logicalVerifyDao.verify_all(project_id, project_subject_dict_array)
     is_success, error_code, success_count, failed_count = outcome
     if not is_success:
         return PDFormApp.API_Error(error_code), 200
     return {"status": PDFormApp.STATUE_SUCCESS, "message": PDFormApp.INFO_RUN_OK,
             "success_lvr_count": success_count, "failed_lvr_count": failed_count}, 200

最新文章

  1. 20145224&20145238 《信息安全系统设计基础》 第四次实验
  2. SAP中查询用户操作日志的事务码
  3. Image Segmentation的定义
  4. 【转】关于URL编码/javascript/js url 编码/url的三个js编码函数
  5. JsRender系列demo(9)自定义函数
  6. iOS应用拨打电话
  7. 浅谈js中的正则表达式
  8. htmlUtil 网页爬取工具
  9. maven与eclipse连接的配置
  10. vue父子组件的传值总结
  11. Oracle判断周末
  12. GTX使用(更新中)
  13. Hibernate 再接触 一对多单向关联
  14. IDEA2018 license
  15. Cesium教程系列汇总【转】
  16. ES6学习之-let 和const命令
  17. C#转义字符[转]
  18. 除去Scala的糖衣(13) -- Default Parameter Value
  19. java代码-------Runnable的用法
  20. 浅谈Django的中间件与Python的装饰器

热门文章

  1. easyUI相关文件的引入
  2. Gcc如何知道文件类型。
  3. 使用Cookie记住登录用户
  4. linux Nginx 的安装
  5. nmbd - 向客户端提供构造在IP之上的NetBIOS名字服务的NetBIOS名字服务器
  6. 14.Linux-CentOS系统proc文件系统丢失
  7. 116-基于5VLX110T FPGA FMC接口功能验证6U CPCI平台 光纤PCIe卡
  8. Codeforces 962 /2错误 相间位置排列 堆模拟 X轴距离最小值 前向星点双连通分量求只存在在一个简单环中的边
  9. 洛谷 P4665 [BalticOI 2015]Network
  10. html2canvas 把h5网页保存为图片 区域保存