目标:对生产环境的服务质量进行量化。

解决办法:把 Pinpoint 2 里的数据转存入 MySQL 数据库,制成报表;目前支持总请求数、错误请求数、中位数、平均数、95 分位值(每分钟执行一次定时任务),其它指标可以根据需求增加。

数据库建表语句:

-- Per-application response-time statistics, one row per sampling window.
-- The collector writes with REPLACE INTO, which only replaces a row when a
-- PRIMARY/UNIQUE key collides; the unique key on (datetime, application_name)
-- is what makes a re-run for the same window overwrite instead of duplicate.
CREATE TABLE `time_analysis` (
`id` int(10) unsigned NOT NULL AUTO_INCREMENT,
`datetime` datetime DEFAULT NULL COMMENT '时间',
`application_name` varchar(32) DEFAULT NULL COMMENT '应用名',
`totalcount` int(8) DEFAULT NULL COMMENT '总请求数',
`errorcount` int(8) DEFAULT NULL COMMENT '错误请求数',
`median` int(5) DEFAULT NULL COMMENT '中位数',
`average` int(5) DEFAULT NULL COMMENT '平均数',
`distribution95` int(5) DEFAULT NULL COMMENT '95值',
PRIMARY KEY (`id`),
UNIQUE KEY `uniq_time_app` (`datetime`,`application_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='数据分析结果存署表';

-- One row per Pinpoint application, refreshed by the collector via REPLACE INTO.
-- The primary key already enforces uniqueness of application_name, so no
-- additional UNIQUE index on the same column is declared.
CREATE TABLE `application_list` (
`application_name` varchar(32) NOT NULL,
`service_type` varchar(32) DEFAULT NULL COMMENT '服务类型',
`code` int(11) DEFAULT NULL COMMENT '服务类型代码',
`agents` int(11) DEFAULT NULL COMMENT 'agent个数',
`agentlists` varchar(256) DEFAULT NULL COMMENT 'agent list',
`update_time` datetime DEFAULT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (`application_name`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='pinpoint app list';

代码:getscatterdata.py 数据分析程序

# -*- coding: utf-8 -*-

# noinspection PyInterpreter,PyInterpreter

import sys
import requests
import time
import datetime
import json
import numpy as np

sys.path.append('../Golf')
import db #db.py

# Base URL of the Pinpoint web endpoint (masked in this listing).
PPURL = "http://*****/"

# Sampling window: the 60 seconds ending at script start. The clock is read
# once so that both ends of the window derive from the same instant and the
# window is exactly one minute wide.
To_Time = datetime.datetime.now()
From_Time = To_Time - datetime.timedelta(seconds=60)
# Window bounds as epoch milliseconds (whole seconds * 1000), which is the
# form used for the `from` / `to` query parameters.
From_TimeStamp = int(time.mktime(From_Time.timetuple())) * 1000
To_TimeStamp = int(time.mktime(To_Time.timetuple())) * 1000

class PinPoint(object):
    """Copy application metadata and response-time statistics from Pinpoint to MySQL.

    The ``db`` collaborator only needs to provide ``db_execute(sql)``.
    """

    # Seconds to wait for a Pinpoint HTTP response; without a timeout a stalled
    # server would block the scheduled job indefinitely.
    REQUEST_TIMEOUT = 30

    def __init__(self, db):
        self.db = db
        super(PinPoint, self).__init__()

    @staticmethod
    def _url(path):
        """Join PPURL and ``path`` with exactly one slash between them."""
        return "{}/{}".format(PPURL.rstrip("/"), path.lstrip("/"))

    def _get_json(self, path, params=None):
        """GET ``path`` from Pinpoint and return the decoded JSON body.

        Prints a warning and returns None when the status code is not 200.
        """
        res = requests.get(self._url(path), params=params,
                           timeout=self.REQUEST_TIMEOUT)
        if res.status_code != 200:
            print("请求异常,请检查")
            return None
        return res.json()

    def get_applications(self):
        """Fetch the applications known to Pinpoint.

        :return: ``{"applicationList": [<app>, ...]}``, or None when the
            request did not return HTTP 200.
        """
        payload = self._get_json("/applications.pinpoint")
        if payload is None:
            return None
        return {"applicationList": list(payload)}

    def getAgentList(self, appname):
        """Fetch the agents registered for one application.

        :param appname: application name
        :return: ``(count, json_text)`` where ``json_text`` is the JSON-encoded
            list of the response's top-level keys, or None when the request
            did not return HTTP 200.
        """
        payload = self._get_json("/getAgentList.pinpoint",
                                 params={'application': appname})
        if payload is None:
            return None
        agent_keys = list(payload.keys())
        return len(agent_keys), json.dumps(agent_keys)

    def update_servermap(self, appname, from_time=From_TimeStamp,
                         to_time=To_TimeStamp):
        """Store response-time statistics for one application and time window.

        Despite its name, this method queries ``getScatterData.pinpoint`` and
        writes one row (total count, error count, median, average, 95th
        percentile) into ``time_analysis``.

        :param appname: application name
        :param from_time: window start, epoch milliseconds
        :param to_time: window end, epoch milliseconds
        :return: None
        """
        # Example request:
        # http://pinpoint.weixing-tech.com/getScatterData.pinpoint?application=daimler-manage-admin-pro&from=1618992044000&to=1618992104000&limit=5000&filter=&xGroupUnit=130&yGroupUnit=0&backwardDirection=true
        param = {
            'application': appname,
            'from': from_time,
            'to': to_time,
            'limit': 5000,
            'filter': '',
            'xGroupUnit': 130,
            'yGroupUnit': 0,
            'backwardDirection': 'true',
        }
        payload = self._get_json("/getScatterData.pinpoint", params=param)
        if payload is None:
            return

        # The row is stamped with the start of the window, in local time.
        update_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                    time.localtime(from_time / 1000))

        dots = payload["scatter"]["dotList"]
        # NOTE(review): presumably dot[1] is the elapsed time of a request and
        # dot[4] == 0 marks a failed one -- confirm against the Pinpoint
        # scatter data format.
        elapsed = [dot[1] for dot in dots]
        statuses = [dot[4] for dot in dots]

        if elapsed:
            totalcount = len(elapsed)
            errorcount = statuses.count(0)
            # int(...) keeps the SQL literals integral regardless of whether
            # round() on a NumPy float yields an int or a float.
            median = int(round(np.percentile(elapsed, 50)))
            average = int(round(sum(elapsed) / len(elapsed)))
            distribution95 = int(round(np.percentile(elapsed, 95)))
        else:
            totalcount = errorcount = median = average = distribution95 = 0

        # NOTE(review): values are interpolated into the SQL text; an
        # application name containing a double quote would break the statement.
        sql = """
            REPLACE into time_analysis( datetime, application_name, totalcount,errorcount, median, average, distribution95)
            VALUES ("{}", "{}", {}, {}, {}, {}, {});""".format(
                update_time, appname, totalcount, errorcount,
                median, average, distribution95)
        self.db.db_execute(sql)

    def update_app(self):
        """Refresh ``application_list`` from Pinpoint.

        Applications whose name starts with ``test`` are skipped, as are
        applications whose agent list could not be fetched.

        :return: True when the application list was processed, False when it
            could not be fetched.
        """
        appdict = self.get_applications()
        if appdict is None:
            return False
        update_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                    time.localtime(time.time()))
        for app in appdict.get("applicationList"):
            name = app['applicationName']
            if name.startswith('test'):
                continue
            agent_info = self.getAgentList(name)
            if agent_info is None:
                # The request failed (already reported); leave this app as is.
                continue
            agents, agentlists = agent_info
            # agentlists is JSON text (double-quoted strings), hence the
            # single-quoted SQL literal around it.
            sql = """
                REPLACE into application_list( application_name,
                service_type, code, agents, agentlists, update_time)
                VALUES ("{}", "{}", {}, {}, '{}', "{}");""".format(
                    name, app['serviceType'],
                    app['code'], agents, agentlists, update_time)
            self.db.db_execute(sql)
        return True

    def update_all_servermaps(self):
        """Store statistics for every application, then purge old rows.

        :return: True when the application list was processed, False when it
            could not be fetched.
        """
        appdict = self.get_applications()
        if appdict is None:
            return False
        for app in appdict.get("applicationList"):
            self.update_servermap(app['applicationName'])

        # Delete data older than 3600 days.
        # NOTE(review): this targets `application_server_map`, which is not
        # among the tables declared for this script -- confirm it exists, or
        # whether `time_analysis` was intended.
        Del_Time = datetime.datetime.now() + datetime.timedelta(days=-3600)
        sql = """delete from application_server_map where update_time <= "{}"
            """.format(Del_Time)
        self.db.db_execute(sql)
        return True

def connect_db():
    """Open the MySQL connection and cursor, and return the ready wrapper."""
    settings = {
        "host": "********",
        "user": "******",
        "passwd": "******",
        "db": "******",
    }
    handle = db.MyDB(**settings)
    handle.db_connect()
    handle.db_cursor()
    return handle

def main():
    """Run one collection cycle: refresh the app list, then the statistics."""
    # Named `conn` so the imported `db` module is not shadowed.
    conn = connect_db()
    try:
        pp = PinPoint(conn)
        pp.update_app()
        pp.update_all_servermaps()
    finally:
        # Release the connection even when an update raises.
        conn.db_close()


if __name__ == '__main__':
    main()

db.py

import mysql.connector
class MyDB(object):
    """Small wrapper holding one mysql.connector connection and one cursor."""

    def __init__(self, host, user, passwd, db):
        self.host = host
        self.user = user
        self.passwd = passwd
        self.db = db

        # Both are populated by db_connect() / db_cursor().
        self.connect = None
        self.cursor = None

    def db_connect(self):
        """Open the database connection and store it on ``self.connect``.

        :return: this wrapper, so calls can be chained
        """
        self.connect = mysql.connector.connect(
            host=self.host, user=self.user,
            passwd=self.passwd, database=self.db)
        return self

    def db_cursor(self):
        """Create a cursor, (re)connecting first when necessary.

        :return: this wrapper, so calls can be chained
        """
        # db_connect() stores the connection itself and returns the wrapper,
        # so its return value must not be assigned to self.connect.
        if self.connect is None or not self.connect.is_connected():
            self.db_connect()
        self.cursor = self.connect.cursor()
        return self

    def get_rows(self, sql, params=None):
        """Run a query and return all result rows.

        :param sql: SQL statement
        :param params: optional parameters for the statement's placeholders
        :return: whatever the cursor's ``fetchall()`` returns
        """
        if params is None:
            self.cursor.execute(sql)
        else:
            self.cursor.execute(sql, params)
        return self.cursor.fetchall()

    def db_execute(self, sql, params=None):
        """Run a statement and commit.

        :param sql: SQL statement
        :param params: optional parameters for the statement's placeholders
        """
        if params is None:
            self.cursor.execute(sql)
        else:
            self.cursor.execute(sql, params)
        self.connect.commit()

    def db_close(self):
        """Close the cursor, then the connection, and forget both handles."""
        try:
            if self.cursor is not None:
                self.cursor.close()
        finally:
            # Close the connection even if closing the cursor failed.
            self.cursor = None
            if self.connect is not None:
                self.connect.close()
            self.connect = None

最新文章

  1. 关于XML序列化与CultureInfo
  2. OWIN的理解和实践(三) –Middleware开发入门
  3. HTML5应用之文件拖拽上传
  4. Java 程序 ——感想
  5. C语言面试题
  6. 【iCore、iCore2 双核心板】EPCS 实验(SPI Flash)(基于Verilog语言)
  7. typedef struct trx_struct trx_t;
  8. (一)Bootstrap简介
  9. thinkphp 模板替换
  10. 动态规划之HDU水题
  11. IE下全局对象报 脚本错误提示“对象不支持此属性或方法”解决方案
  12. Oracle表锁住处理
  13. WPF 杂谈——自定义控件
  14. Java 操作jar包工具类以及如何快速修改Jar包里的文件内容
  15. C# 接口使用方法
  16. VUE-利用OSS BrowserJS-SDK实现阿里OSS前端上传
  17. <Android基础>(三) UI开发 Part 3 RecyclerView
  18. mac/Linux源码安装TensorFlow
  19. win10 python27pyhton36共存
  20. [转]WordPress 主题教程 #2:模板文件和模板

热门文章

  1. Vulhub 漏洞学习之:Docker
  2. 微信小程序之permission字段
  3. 基于GLUT的PyOpenGL的使用
  4. ArcGIS for Android 开发环境搭建
  5. postgresql中条件表达式 coalesce、nullif 、greatest、least
  6. day09-MyBatis缓存
  7. ASP输出生成Word 、Excel、Txt文件的方法
  8. Socket:由于系统缓冲区空间不足或队列已满,不能执行套接字上的操作
  9. 在orangepi 3 lts上使用SmartCardReader(读卡器)
  10. [后端-Python]-项目练习集