Python用telnet设置,抓UDP抓采样点并显示
====main.bat====
@echo off
rem Capture UDP traffic with tshark for a fixed time, then hand the result to
rem Parse.py and DataPlot.py.
rem
rem Handy tshark invocations kept for reference:
rem "d:\Program\WiresharkPortable64\App\Wireshark\tshark.exe" --list-interfaces
rem "d:\Program\WiresharkPortable64\App\Wireshark\tshark.exe" -f udp -i 3 -a duration:1 -w data.pcapng

rem Base name: the capture is written to %FILE_NAME%.pcapng and %FILE_NAME%
rem is the argument passed to the two Python scripts.
rem set FILE_NAME=dat
set FILE_NAME=data
set TSHARK="d:\Program\WiresharkPortable64\App\Wireshark\tshark.exe"
rem Value passed to tshark -i (see "tshark --list-interfaces").
set ETH_PORT=3
rem Value passed to tshark "-a duration:", i.e. capture time in seconds.
set UDP_SECOND=1

rem Work from the directory that contains this script.
cd /d "%~dp0"
rem python tel.py status

rem Remove the previous capture; the guard keeps a first run free of
rem "Could Not Find" noise.
if exist "%FILE_NAME%.pcapng" del "%FILE_NAME%.pcapng"
rem %TSHARK% --list-interfaces
%TSHARK% -f udp -i %ETH_PORT% -a duration:%UDP_SECOND% -w "%FILE_NAME%.pcapng"
rem Parse.exe %FILE_NAME%
rem Remove the previous converted data file before regenerating it.
if exist "%FILE_NAME%" del "%FILE_NAME%"
python Parse.py %FILE_NAME%
python DataPlot.py %FILE_NAME%
====ShowSmv.py====
import os
import tkinter
from tkinter import *
import tel
import Parse
import DataPlot
# Main window: a non-resizable form with three settings and a log area.
window = tkinter.Tk()
window.title('Show IO24x SMV')
window.geometry()
# window.geometry('800x400')
window.resizable(width=False, height=False)

# Row 0: path of the tshark executable.
l1 = tkinter.Label(window, text='Tshark.exe:')
e1 = tkinter.Entry(window)
e1.insert(0, r'd:\Program\WiresharkPortable64\App\Wireshark\tshark.exe')
l1.grid(row=0, column=0, sticky=tkinter.E, ipadx=4)
e1.grid(row=0, column=1, columnspan=3, sticky=tkinter.W + tkinter.E, ipadx=4, padx=4)

# Row 1: interface number handed to tshark.
l2 = tkinter.Label(window, text='Eth num:')
e2 = tkinter.Entry(window)
e2.insert(0, '2')
l2.grid(row=1, column=0, sticky=tkinter.E, ipadx=4)
e2.grid(row=1, column=1, sticky=tkinter.W, padx=4)

# Row 2: capture duration in seconds.
l3 = tkinter.Label(window, text='UDP seconds:')
e3 = tkinter.Entry(window)
e3.insert(0, '1')
l3.grid(row=2, column=0, sticky=tkinter.E, ipadx=4)
e3.grid(row=2, column=1, sticky=tkinter.W, padx=4)

# Row 4: log text box. A Text widget is either 'disabled' or 'normal';
# 'normal' is the default and keeps it writable.
log = tkinter.Text(window, state='normal', width=64, height=16)
log.grid(row=4, column=0, rowspan=1, columnspan=4, sticky=tkinter.W, padx=4)
def start_udp():
    """Run tshark twice and copy its standard output into the log widget.

    The first run lists the capture interfaces; the second captures UDP
    traffic on the interface given in ``e2`` for the number of seconds given
    in ``e3`` and writes it to ``data.pcapng`` in the current directory.
    The call blocks until tshark has finished.

    The commands are passed to the OS as argument lists instead of one
    concatenated shell string, so a tshark path containing spaces works and
    nothing typed into the entry fields is interpreted by a shell.
    """
    import subprocess  # local import keeps the file's import block untouched

    # Tolerate a path pasted with surrounding blanks or double quotes.
    tshark = e1.get().strip().strip('"')
    eth_port = e2.get().strip()
    udp_second = e3.get().strip()

    commands = (
        [tshark, '--list-interfaces'],
        [tshark, '-f', 'udp', '-i', eth_port,
         '-a', 'duration:' + udp_second, '-w', 'data.pcapng'],
    )
    for argv in commands:
        log.insert('insert', subprocess.list2cmdline(argv) + '\n')
        try:
            # Only stdout is captured; stderr is left attached to the
            # console, as it was with os.popen.
            finished = subprocess.run(
                argv, stdout=subprocess.PIPE, universal_newlines=True)
        except OSError as err:
            # Typically a wrong tshark path; report it in the log instead of
            # failing silently, and skip the remaining command.
            log.insert('insert', 'Failed to start tshark: {}\n'.format(err))
            return
        log.insert('insert', finished.stdout)
def start_oct():
    """Handle the "Show OCT" button.

    Sends the settings ``EnableRCVTChannelsInUdp=0`` and
    ``EnableOCTChannelsInUdp=1`` over telnet, captures UDP traffic, converts
    ``data.pcapng`` and plots the resulting ``data`` file.
    """
    log.insert('insert', 'Show OCT\n')
    # RCVT setting first, OCT setting second - same order as before.
    for setting in ("EnableRCVTChannelsInUdp=0", "EnableOCTChannelsInUdp=1"):
        tel.tel24x(setting)
    start_udp()
    capture_name = 'data.pcapng'
    Parse.Parse(capture_name)
    plot_name = 'data'
    DataPlot.DataPlot(plot_name)
def start_rcvt():
    """Handle the "Show RCVT" button.

    Sends the settings ``EnableRCVTChannelsInUdp=1`` and
    ``EnableOCTChannelsInUdp=0`` over telnet, captures UDP traffic, converts
    ``data.pcapng`` and plots the resulting ``data`` file.
    """
    log.insert('insert', 'Show RCVT\n')
    # RCVT setting first, OCT setting second - same order as before.
    for setting in ("EnableRCVTChannelsInUdp=1", "EnableOCTChannelsInUdp=0"):
        tel.tel24x(setting)
    start_udp()
    capture_name = 'data.pcapng'
    Parse.Parse(capture_name)
    plot_name = 'data'
    DataPlot.DataPlot(plot_name)
# Action buttons on row 3, one per acquisition mode.
b1 = tkinter.Button(window, text="Show OCT", command=start_oct)
b1.grid(column=0, row=3)
b2 = tkinter.Button(window, text="Show RCVT", command=start_rcvt)
b2.grid(column=1, row=3)

# Hand control to the Tk event loop.
window.mainloop()
====DataPlot.py====
# from distutils.command.install_egg_info import to_filename
import sys
import numpy as np
# import glob as glob
from pathlib import Path
# from pathlib import PureWindowsPath
# from pathlib import PurePath
# from scapy.all import *
import dpkt as dpkt
import matplotlib.pyplot as plt
# Fixed-length sub-array element types used inside the records below.
uint32x48_type = np.dtype((np.uint32, (48,)))
int32x3_type = np.dtype((np.int32, (3,)))
floatx3_type = np.dtype((np.float32, (3,)))
floatx6_type = np.dtype((np.float32, (6,)))

# Field groups shared by the three record layouts.
_counter_fields = [('SmpCnt', np.int32)]
_oct_fields = [
    ('AfeRaw', uint32x48_type),
    ('AfeTemp', floatx6_type),
    ('SvProtMeas', floatx6_type),
]
_rcvt_fields = [
    ('RcvtRaw', int32x3_type),
    ('SvRcvt', floatx3_type),
]

# RCVT data structure
frame1_type = np.dtype(_counter_fields + _rcvt_fields)
# OCT data structure
frame2_type = np.dtype(_counter_fields + _oct_fields)
# OCT+RCVT data structure
frame3_type = np.dtype(_counter_fields + _oct_fields + _rcvt_fields)
def _rcvt_raw_to_int(raw):
    """Convert raw RCVT words to the integers plotted as 'RcvtRawToInt32'.

    Keeps bit 31 and the low 23 bits of every word; words that still have
    bit 31 set are replaced by ``0x80000000 | (~word + 1)``.

    The arithmetic is done in int64 on purpose: 0x807FFFFF does not fit in
    int32, and NumPy 2 raises OverflowError when it is combined with an
    int32 array. NumPy 1.x silently promoted to int64, so the values are
    unchanged.
    NOTE(review): this looks like a sign-magnitude to two's-complement
    conversion - confirm against the device's data format.
    """
    value = np.bitwise_and(raw.astype(np.int64), 0x807FFFFF)
    flagged = value >= 0x80000000
    # int_value = 0x80000000|(~tmp + 1)
    value[flagged] = np.bitwise_or(
        np.bitwise_not(value[flagged]) + 1, 0x80000000)
    return value


def DataPlot(filename='data'):
    """Plot the sample records stored in *filename*.

    The file starts with a 4-byte little-endian record size followed by a
    whole number of records. The record size selects the layout:
    ``frame1_type`` (RCVT), ``frame2_type`` (OCT) or ``frame3_type``
    (OCT+RCVT). One matplotlib figure is opened per plotted field and
    ``plt.show()`` is called at the end.

    Args:
        filename: Path of the data file.

    Raises:
        SystemExit: With status 1, after printing a message, when the file
            cannot be read or its size header does not match its content or
            any known record layout.
    """
    filepath = Path(filename)
    try:
        # read_bytes() closes the file again, unlike open().read().
        bufs = filepath.read_bytes()
    except OSError:
        print("Missing data file!")
        sys.exit(1)

    pkt_size = int.from_bytes(bufs[0:4], 'little')
    payload_len = len(bufs) - 4
    # Reject a missing/zero size header up front (avoids a modulo by zero)
    # as well as a payload that is not a whole number of records.
    if pkt_size == 0 or payload_len < 0 or payload_len % pkt_size != 0:
        print("Wrong data file!")
        sys.exit(1)

    # record size -> (dtype, has RCVT fields, has OCT fields, message)
    layouts = {
        frame1_type.itemsize: (frame1_type, True, False, "RCVT Samples found"),
        frame2_type.itemsize: (frame2_type, False, True, "OCT Samples found"),
        frame3_type.itemsize: (frame3_type, True, True, "RCVT+OCT Samples found"),
    }
    if pkt_size not in layouts:
        print("Wrong data file!")
        sys.exit(1)
    frame_type, has_rcvt, has_oct, message = layouts[pkt_size]
    print(payload_len // pkt_size, message)

    # Cut the file header and reshape the rest to the selected struct.
    data = np.frombuffer(bufs[4:], dtype=frame_type).copy()

    # Plot the data.
    plt.figure('SmpCnt')
    plt.plot(data['SmpCnt'])
    if has_rcvt:
        plt.figure('RcvtRawToInt32')
        plt.plot(_rcvt_raw_to_int(data['RcvtRaw']))
        plt.legend(['PhA', 'PhB', 'PhC'])
        plt.figure('SvRcvt')
        plt.plot(data['SvRcvt'])
        plt.legend(['PhA', 'PhB', 'PhC'])
    if has_oct:
        plt.figure('AfeRaw')
        plt.plot(data['AfeRaw'])
        # plt.legend(['PhA', 'PhB', 'PhC'])
        plt.figure('AfeTemp')
        plt.plot(data['AfeTemp'])
        plt.legend(['Afe1-Up', 'Afe1-Low', 'Afe2-Up', 'Afe2-Low', 'Afe3-Up', 'Afe3-Low'])
        plt.figure('SvProtMeas')
        plt.plot(data['SvProtMeas'])
        plt.legend(['ProtA', 'ProtB', 'ProtC', 'MeasA', 'MeasB', 'MeasC'])
    plt.show()
====Parse.py====
#from distutils.command.install_egg_info import to_filename
import sys
#import numpy as np
#import glob as glob
from pathlib import Path
#from pathlib import PureWindowsPath
#from pathlib import PurePath
#from scapy.all import *
import dpkt as dpkt
def _iter_udp_payloads(pcap, port):
    """Yield the UDP payload of each IPv4/UDP frame in *pcap* sent to *port*."""
    for _, buf in pcap:
        try:
            # Decode each frame once (it used to be decoded three times).
            eth = dpkt.ethernet.Ethernet(buf)
        except dpkt.dpkt.UnpackError:
            # One undecodable frame should not discard the whole capture.
            continue
        if eth.type != dpkt.ethernet.ETH_TYPE_IP:
            continue
        ip = eth.data
        # The isinstance checks skip frames whose upper layers were left
        # as raw bytes instead of failing with AttributeError.
        if not isinstance(ip, dpkt.ip.IP) or ip.p != dpkt.ip.IP_PROTO_UDP:
            continue
        udp = ip.data
        if not isinstance(udp, dpkt.udp.UDP) or udp.dport != port:
            continue
        yield udp.data


def Parse(filename='data.pcapng'):
    """Extract the port-6510 UDP payloads of a pcapng capture into a data file.

    The output file has the same path as *filename* with its suffix removed.
    It holds a 4-byte little-endian size value followed by all payloads
    concatenated in capture order. The size value is the length of one
    payload divided by the increase of the little-endian counter stored in
    the first four bytes of consecutive payloads.

    Args:
        filename: Path of the pcapng capture.

    Raises:
        SystemExit: With status 1, after printing a message, when the
            capture cannot be read, holds fewer than three matching
            payloads, or the counter does not increase.
    """
    filepath = Path(filename)
    try:
        with filepath.open("rb") as f:
            pcap = dpkt.pcapng.Reader(f)
            print("reader created")
            bufs = list(_iter_udp_payloads(pcap, 6510))
            print("packets read")
    except Exception as err:
        # Deliberately broad: any read/decode failure means "no usable
        # capture". The reason is printed so it is not lost.
        print('No valid IO24x UDP found!')
        print('Reason:', repr(err))
        sys.exit(1)
    if len(bufs) < 3:
        print('No valid IO24x UDP found!')
        sys.exit(1)

    cnt0, cnt1, cnt2 = (int.from_bytes(b[0:4], 'little') for b in bufs[:3])
    # Use the first pair of payloads unless the counter did not increase
    # between them; then fall back to the second pair.
    if cnt1 > cnt0:
        pkt_len, step = len(bufs[0]), cnt1 - cnt0
    else:
        pkt_len, step = len(bufs[1]), cnt2 - cnt1
    if step <= 0:
        # Previously a ZeroDivisionError, or a negative size that crashed
        # int.to_bytes below.
        print('No valid IO24x UDP found! (sample counter does not increase)')
        sys.exit(1)
    pkt_size = pkt_len // step
    print("packets filtered")

    buf = b''.join(bufs)
    print("packets joined")
    buf_result = int.to_bytes(pkt_size, 4, 'little') + buf
    # NOTE: when *filename* has no suffix this is the input path itself.
    to_filepath = filepath.with_suffix("")
    to_filepath.write_bytes(buf_result)
    print("data written")
====tel.py====
import telnetlib
import time
import sys
def tel24x(arg1='ver', host="172.16.60.50"):
    """Send one command line over telnet, print the reply and return it.

    No login exchange is performed; the command is written right after
    connecting.

    Args:
        arg1: Command text; it is ASCII-encoded and sent followed by a
            newline.
        host: Address of the telnet server. Defaults to the address that
            used to be hard-coded.

    Returns:
        The text that was available 0.2 s after sending the command. Bytes
        outside ASCII are replaced instead of raising UnicodeDecodeError.

    Raises:
        OSError: If the connection cannot be established.
        EOFError: If the peer closed the connection and sent no data.
    """
    command = arg1
    # The context manager closes the connection; it used to be left open
    # after every call.
    with telnetlib.Telnet(host) as tn:
        # Short pause before sending - presumably gives the device time to
        # finish its greeting; TODO confirm.
        time.sleep(0.3)
        tn.write(command.encode('ascii') + b"\n")
        # Wait for the reply, then take whatever has arrived.
        time.sleep(0.2)
        v_result = tn.read_very_eager().decode('ascii', errors='replace')
    print(v_result)
    return v_result
最新文章
- 多栏多列布局(display:flex)
- 玩转Windows服务系列——Windows服务启动超时时间
- hadoop入门(3)——hadoop2.0理论基础:安装部署方法
- linq查询xml
- EBS R12中重新enable失效用户之后,丢失职责
- html5 教程网站
- javascript自定义对象的方式
- ubuntu下的第一个脚本file.sh
- DPdao
- javaScript中eval()方法转换json对象
- 【USACO 1.1.2】贪婪的送礼者
- bcp和load table
- 2018年发表论文阅读:Convolutional Simplex Projection Network for Weakly Supervised Semantic Segmentation
- python--model进阶
- POJ1128 Frame Stacking(拓扑排序+dfs)题解
- EJB系列 - EJB基础知识
- Session里存的密码或其他信息如何获取。
- STM32总线结构和存储器
- 首页的css
- 【Groovy】Spock with Maven