数据通信

概述:

netty的ReadTimeOut实现方案3

服务端:

public class Server {

    public static void main(String[] args) throws Exception{

        EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
//设置日志
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ReadTimeoutHandler(5));
sc.pipeline().addLast(new ServerHandler());
}
}); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully(); }
}

主要是加入sc.pipeline().addLast(new ReadTimeoutHandler(5));

客户端:

public class Client {

    private static class SingletonHolder {
static final Client instance = new Client();
} public static Client getInstance() {
return SingletonHolder.instance;
} private EventLoopGroup group;
private Bootstrap b;
private ChannelFuture cf; private Client() {
group = new NioEventLoopGroup();
b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
// 超时handler(当服务器端与客户端在指定时间以上没有任何进行通信,则会关闭响应的通道,主要为减小服务端资源占用)
sc.pipeline().addLast(new ReadTimeoutHandler(5));
sc.pipeline().addLast(new ClientHandler());
}
});
} public void connect() {
try {
this.cf = b.connect("127.0.0.1", 8765).sync();
System.out.println("远程服务器已经连接, 可以进行数据交换..");
} catch (Exception e) {
e.printStackTrace();
}
} public ChannelFuture getChannelFuture() { if (this.cf == null) {
this.connect();
}
if (!this.cf.channel().isActive()) {
this.connect();
} return this.cf;
} public static void main(String[] args) throws Exception {
final Client c = Client.getInstance();
// c.connect(); ChannelFuture cf = c.getChannelFuture();
for (int i = 1; i <= 3; i++) {
Request request = new Request();
request.setId("" + i);
request.setName("pro" + i);
request.setRequestMessage("数据信息" + i);
cf.channel().writeAndFlush(request);
TimeUnit.SECONDS.sleep(4);
} cf.channel().closeFuture().sync(); new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("进入子线程...");
ChannelFuture cf = c.getChannelFuture();
System.out.println(cf.channel().isActive());
System.out.println(cf.channel().isOpen()); // 再次发送数据
Request request = new Request();
request.setId("" + 4);
request.setName("pro" + 4);
request.setRequestMessage("数据信息" + 4);
cf.channel().writeAndFlush(request);
cf.channel().closeFuture().sync();
System.out.println("子线程结束.");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start(); System.out.println("断开连接,主线程结束.."); } }

主要看getChannelFuture这个方法,this.cf == null是第一次连接的时候用到的,!this.cf.channel().isActive() 是连接超时后重新发起连接用到的。

其他的代码:

public class ClientHandler extends ChannelHandlerAdapter {

    @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
Response resp = (Response) msg;
System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());
} finally {
ReferenceCountUtil.release(msg);
}
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
} } public class ServerHandler extends ChannelHandlerAdapter { @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Request request = (Request) msg;
System.out
.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
Response response = new Response();
response.setId(request.getId());
response.setName("response" + request.getId());
response.setResponseMessage("响应内容" + request.getId());
ctx.writeAndFlush(response);// .addListener(ChannelFutureListener.CLOSE);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
} } public final class MarshallingCodeCFactory { /**
* 创建Jboss Marshalling解码器MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
//首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
//创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
//根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
return decoder;
} /**
* 创建Jboss Marshalling编码器MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
//构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
} public class Request implements Serializable { private static final long serialVersionUID = 1L; private String id;
private String name;
private String requestMessage; public String getId() {
return id;
} public void setId(String id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public String getRequestMessage() {
return requestMessage;
} public void setRequestMessage(String requestMessage) {
this.requestMessage = requestMessage;
} } public class Response implements Serializable { private static final long serialVersionUID = 1L; private String id;
private String name;
private String responseMessage; public String getId() {
return id;
} public void setId(String id) {
this.id = id;
} public String getName() {
return name;
} public void setName(String name) {
this.name = name;
} public String getResponseMessage() {
return responseMessage;
} public void setResponseMessage(String responseMessage) {
this.responseMessage = responseMessage;
} }

心跳检测

概述:

代码示例:

public class Server {

    public static void main(String[] args) throws Exception {

        EventLoopGroup pGroup = new NioEventLoopGroup();
EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap();
b.group(pGroup, cGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)
// 设置日志
.handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ServerHeartBeatHandler());
}
}); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync();
pGroup.shutdownGracefully();
cGroup.shutdownGracefully(); }
} public class ServerHeartBeatHandler extends ChannelHandlerAdapter { /** key:ip value:auth */
private static HashMap<String, String> AUTH_IP_MAP = new HashMap<String, String>();
private static final String SUCCESS_KEY = "auth_success_key"; static {
AUTH_IP_MAP.put("192.168.1.200", "1234");
} private boolean auth(ChannelHandlerContext ctx, Object msg) {
// System.out.println(msg);
String[] ret = ((String) msg).split(",");
String auth = AUTH_IP_MAP.get(ret[0]);
if (auth != null && auth.equals(ret[1])) {
ctx.writeAndFlush(SUCCESS_KEY);
return true;
} else {
ctx.writeAndFlush("auth failure !").addListener(ChannelFutureListener.CLOSE);
return false;
}
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof String) {
auth(ctx, msg);
} else if (msg instanceof RequestInfo) { RequestInfo info = (RequestInfo) msg;
System.out.println("--------------------------------------------");
System.out.println("当前主机ip为: " + info.getIp());
System.out.println("当前主机cpu情况: ");
HashMap<String, Object> cpu = info.getCpuPercMap();
System.out.println("总使用率: " + cpu.get("combined"));
System.out.println("用户使用率: " + cpu.get("user"));
System.out.println("系统使用率: " + cpu.get("sys"));
System.out.println("等待率: " + cpu.get("wait"));
System.out.println("空闲率: " + cpu.get("idle")); System.out.println("当前主机memory情况: ");
HashMap<String, Object> memory = info.getMemoryMap();
System.out.println("内存总量: " + memory.get("total"));
System.out.println("当前内存使用量: " + memory.get("used"));
System.out.println("当前内存剩余量: " + memory.get("free"));
System.out.println("--------------------------------------------"); ctx.writeAndFlush("info received!");
} else {
ctx.writeAndFlush("connect failure!").addListener(ChannelFutureListener.CLOSE);
}
} } public class Client { public static void main(String[] args) throws Exception { EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class).handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast(new ClienHeartBeattHandler());
}
}); ChannelFuture cf = b.connect("127.0.0.1", 8765).sync(); cf.channel().closeFuture().sync();
group.shutdownGracefully();
}
} public class ClienHeartBeattHandler extends ChannelHandlerAdapter { private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ScheduledFuture<?> heartBeat;
// 主动向服务器发送认证信息
private InetAddress addr; private static final String SUCCESS_KEY = "auth_success_key"; @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
addr = InetAddress.getLocalHost();
String ip = addr.getHostAddress();
String key = "1234";
// 证书
String auth = ip + "," + key;
ctx.writeAndFlush(auth);
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if (msg instanceof String) {
String ret = (String) msg;
if (SUCCESS_KEY.equals(ret)) {
// 握手成功,主动发送心跳消息
this.heartBeat = this.scheduler.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 2,
TimeUnit.SECONDS);
System.out.println(msg);
} else {
System.out.println(msg);
}
}
} finally {
ReferenceCountUtil.release(msg);
}
} private class HeartBeatTask implements Runnable {
private final ChannelHandlerContext ctx; public HeartBeatTask(final ChannelHandlerContext ctx) {
this.ctx = ctx;
} @Override
public void run() {
try {
RequestInfo info = new RequestInfo();
// ip
info.setIp(addr.getHostAddress());
Sigar sigar = new Sigar();
// cpu prec
CpuPerc cpuPerc = sigar.getCpuPerc();
HashMap<String, Object> cpuPercMap = new HashMap<String, Object>();
cpuPercMap.put("combined", cpuPerc.getCombined());
cpuPercMap.put("user", cpuPerc.getUser());
cpuPercMap.put("sys", cpuPerc.getSys());
cpuPercMap.put("wait", cpuPerc.getWait());
cpuPercMap.put("idle", cpuPerc.getIdle());
// memory
Mem mem = sigar.getMem();
HashMap<String, Object> memoryMap = new HashMap<String, Object>();
memoryMap.put("total", mem.getTotal() / 1024L);
memoryMap.put("used", mem.getUsed() / 1024L);
memoryMap.put("free", mem.getFree() / 1024L);
info.setCpuPercMap(cpuPercMap);
info.setMemoryMap(memoryMap);
ctx.writeAndFlush(info); } catch (Exception e) {
e.printStackTrace();
}
} public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
} }
} public final class MarshallingCodeCFactory { /**
* 创建Jboss Marshalling解码器MarshallingDecoder
*
* @return MarshallingDecoder
*/
public static MarshallingDecoder buildMarshallingDecoder() {
// 首先通过Marshalling工具类的精通方法获取Marshalling实例对象 参数serial标识创建的是java序列化工厂对象。
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
// 创建了MarshallingConfiguration对象,配置了版本号为5
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
// 根据marshallerFactory和configuration创建provider
UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
// 构建Netty的MarshallingDecoder对象,俩个参数分别为provider和单个消息序列化后的最大长度
MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024 * 1024 * 1);
return decoder;
} /**
* 创建Jboss Marshalling编码器MarshallingEncoder
*
* @return MarshallingEncoder
*/
public static MarshallingEncoder buildMarshallingEncoder() {
final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfiguration configuration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
// 构建Netty的MarshallingEncoder对象,MarshallingEncoder用于实现序列化接口的POJO对象序列化为二进制数组
MarshallingEncoder encoder = new MarshallingEncoder(provider);
return encoder;
}
} public class RequestInfo implements Serializable { private String ip;
private HashMap<String, Object> cpuPercMap;
private HashMap<String, Object> memoryMap;
// .. other field public String getIp() {
return ip;
} public void setIp(String ip) {
this.ip = ip;
} public HashMap<String, Object> getCpuPercMap() {
return cpuPercMap;
} public void setCpuPercMap(HashMap<String, Object> cpuPercMap) {
this.cpuPercMap = cpuPercMap;
} public HashMap<String, Object> getMemoryMap() {
return memoryMap;
} public void setMemoryMap(HashMap<String, Object> memoryMap) {
this.memoryMap = memoryMap;
} }

当client刚刚连接的时候,会发送认证信息到server端认证,认证通过后再定时发送心跳包。

最新文章

  1. JavaScript模板引擎artTemplate.js——如何引入模板引擎?
  2. Freemarker中日期时间格式出错
  3. 【转】javascript变量声明 及作用域
  4. web攻防
  5. Erlang垃圾回收机制的二三事
  6. 一:【nopcommerce系列】Nop整体架构的简单介绍,在看nop代码之前,你需要懂哪些东西
  7. js方式找出数组中重复数最多的那个数,并返回该数以及重复次数
  8. SSM框架学习之高并发秒杀业务--笔记3-- Service层
  9. Android带多选功能的PhotoPicker
  10. 访问svc 文件,编译器错误消息: CS0016,未能写入输出文件
  11. poj 1364 King(差分约束)
  12. jQuery—一些常见方法(3)【width(),innerWidth(),outerWidth()】
  13. c++文件流
  14. Contest - 第10届“新秀杯”ACM程序设计大赛网络资格赛 赛后信息(晋级名单&#183;正式版)
  15. Android——编译odex保护
  16. C# 运算符 if
  17. Sass使用教程
  18. LPC1768定时器普通定时
  19. weiphp的学习
  20. Android-重新包装Toast,自定义背景

热门文章

  1. (二)Python脚本开头两行的:#!/usr/bin/python和# -*- coding: utf-8 -*-的作用
  2. Vue.Draggable实现拖拽效果(快速使用)
  3. hdu 5050 java程序求大数最大公约数
  4. 最小公倍数LCM
  5. resin4开启jmx
  6. 工作流框架Activiti
  7. Ubuntu 16.04修复PDF默认使用ImageMagick打开无法设置其它默认的问题(默认打开程序设置)
  8. Ubuntu 16.04安装设备管理器Hardinfo和lshw设备信息命令
  9. RESTFUL 和SOA初探
  10. 【python】range的用法