继续基于我们之前的例子(参见netty5自定义私有协议实例),这次我们加上连接校验和心跳机制:

  

  校验通过之后,客户端的心跳消息和业务消息分别由两个不同的事件触发发送,彼此互不干扰。针对以上流程,我们需要增加4个handler:客户端控制请求handler、客户端心跳handler、服务端校验handler、服务端心跳处理handler。当然,引导类也得添加上面对应的handler。上代码:

  新增客户端首次连接handler:

package com.wlf.netty.nettyclient.handler;

import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; @Slf4j
public class ControlClientHandler extends ChannelHandlerAdapter { @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(buildControlReq());
} /**
* 在处理过程中引发异常时被调用
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("[Client] conrol request error: {}", cause.getMessage());
ctx.fireExceptionCaught(cause);
} /**
* 构造请求消息体
*
* @return
*/
private NettyMessage buildControlReq() {
NettyMessage nettyMessage = new NettyMessage();
Header header = new Header();
byte[] data = buildControlData();
header.setDelimiter(0xABEF0101);
header.setLength(data.length);
header.setType((byte) 0);
header.setReserved((byte) 0);
nettyMessage.setHeader(header); // 设置数据包
nettyMessage.setData(data);
return nettyMessage;
} /**
* 构造控制请求消息体
*
* @return
*/
private byte[] buildControlData() {
byte[] result = new byte[2]; result[0] = (byte) 1; result[1] = (byte) 16;
return result;
}
}

  服务端校验handler:

package com.wlf.netty.nettyserver.handler;

import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; @Slf4j
public class ControlServerHandler extends ChannelHandlerAdapter { // 白名单列表
private String[] whiteList = new String[]{"127.0.0.1"}; private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>(); @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage nettyMessage = (NettyMessage) msg; // 如果是控制数据格式请求消息,说明是客户端首次请求,校验白名单,否则进入下一个处理流程
if (nettyMessage.getHeader() != null &&
nettyMessage.getHeader().getType() == (byte) 0) {
String nodeIndex = ctx.channel().remoteAddress().toString();
NettyMessage controlResponse = null; if (nodeCheck.containsKey(nodeIndex)) {
log.warn("request ip : {} has requested.", nodeIndex);
controlResponse = buildResponse(false);
} else {
InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
String ip = address.getAddress().getHostAddress();
boolean isOK = false; for (String whiteIp : whiteList) {
if (ip.equals(whiteIp)) {
isOK = true;
break;
}
} if (isOK) {
nodeCheck.put(nodeIndex, true);
// 白名单校验通过,校验是否支持PCM格式
byte[] data = nettyMessage.getData();
ByteBuf buf = Unpooled.buffer(2);
buf.writeBytes(data);
byte sample = buf.readByte(); if (sample != (byte) 1) {
log.error("sample : {} is not 1", sample);
controlResponse = buildResponse(false);
} else {
controlResponse = buildResponse(true);
}
} else {
log.error("ip : {} is not in whiteList : {}.", ip, whiteList);
controlResponse = buildResponse(false);
} }
log.info("[server] The control response is : {}, data : {}", controlResponse, controlResponse.getData());
ctx.writeAndFlush(controlResponse);
} else {
ctx.fireChannelRead(msg);
}
} /**
* 在处理过程中引发异常时被调用
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("[server] control response error: {}", cause.getMessage());
// 删除缓存
nodeCheck.remove(ctx.channel().remoteAddress().toString());
ctx.fireExceptionCaught(cause);
} /**
* 构造响应消息体
*
* @param isOk
* @return
*/
private NettyMessage buildResponse(boolean isOk) {
NettyMessage nettyMessage = new NettyMessage();
Header header = new Header();
byte[] data = buildData(isOk);
header.setDelimiter(0xABEF0101);
header.setLength(data.length);
header.setType((byte) 0);
header.setReserved((byte) 0);
nettyMessage.setHeader(header);
nettyMessage.setData(data);
return nettyMessage;
} /**
* 构建控制数据格式响应消息体
*
* @param isOk
* @return
*/
private byte[] buildData(boolean isOk) {
ByteBuf result = null;
if (isOk) {
result = Unpooled.buffer(8);
// 生成sid
result.writeInt(buildSid()); // 心跳发送间隔,5000毫秒秒
result.writeInt(5000);
} else {
result = Unpooled.buffer(1);
result.writeByte((byte) -1);
} return result.array();
} private int buildSid() {
int max = 100, min = 1;
long randomNum = System.currentTimeMillis();
return (int) (randomNum % (max - min) + min);
}
}

  心跳客户端handler:

package com.wlf.netty.nettyclient.handler;

import com.wlf.netty.nettyapi.javabean.Header;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.concurrent.ScheduledFuture;
import lombok.extern.slf4j.Slf4j; import java.util.concurrent.TimeUnit; @Slf4j
public class HeartBeatClientHandler extends ChannelHandlerAdapter {
private volatile int interval = 5000;
private volatile ScheduledFuture<?> heartBeat; @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage nettyMessage = (NettyMessage) msg; // 接收控制数据响应消息成功,发送心跳给服务端
if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
byte[] data = nettyMessage.getData();
ByteBuf buf = Unpooled.buffer(8);
buf.writeBytes(data);
int sid = buf.readInt();
interval = buf.readInt();
log.info("[client] control response is OK, header : {}. sid : {}, interval : {}", nettyMessage.getHeader(), sid, interval); // 每interval(默认5000)豪秒发送一次心跳请求到服务端
heartBeat = ctx.executor().scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
NettyMessage heartBeat = buildHeartBeat(sid);
log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
ctx.writeAndFlush(heartBeat);
}
},
0, interval, TimeUnit.MILLISECONDS); // 消息继续向后传
ctx.fireChannelRead(msg);
} else {
       ctx.fireChannedRead(msg);
     }
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("[Client] heart request error: {}", cause.getMessage());
if (heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
} /**
* 构造心跳请求消息体
*
* @return
*/
private NettyMessage buildHeartBeat(int sid) {
NettyMessage message = new NettyMessage();
Header header = new Header();
byte[] data = buildData(sid);
header.setDelimiter(0xABEF0101);
header.setLength(data.length);
header.setType((byte) 3);
header.setReserved((byte) 0);
message.setHeader(header); // 设置数据包
message.setData(data);
return message;
} /**
* 构建心跳响应消息体
*
* @param sid
* @return
*/
private byte[] buildData(int sid) {
ByteBuf result = Unpooled.buffer(4);
result.writeInt(sid);
return result.array();
} }

  服务端心跳handler:

package com.wlf.netty.nettyserver.handler;

import com.wlf.netty.nettyapi.constant.MessageType;
import com.wlf.netty.nettyapi.javabean.NettyMessage;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j; @Slf4j
public class HeartBeatServerHandler extends ChannelHandlerAdapter { @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage nettyMessage = (NettyMessage) msg; // 接收到心跳请求,打印心跳消息,否则进入下一处理流程
if (nettyMessage.getHeader() != null &&
nettyMessage.getHeader().getType() == (byte) 3) {
log.info("[server] Receive client heart beat message : ----> {}", nettyMessage);
} else {
ctx.fireChannelRead(msg);
}
} /**
* 在处理过程中引发异常时被调用
*
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
log.error("[server] heart response error: {}", cause.getMessage()); ctx.fireExceptionCaught(cause);
} }

  客户端发送业务消息NettyClientHandler修改,发送触发从channelActive事件改为channelRead事件:

    @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
NettyMessage nettyMessage = (NettyMessage) msg; // 接收控制数据响应消息成功,每5秒发送pcm数据
if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
ctx.writeAndFlush(buildClientRequest());
} // @Override
// public void channelActive(ChannelHandlerContext ctx) throws Exception {
// ctx.writeAndFlush(buildClientRequest());
// }

  客户端引导类NettyClient修改,新增handler:

    /**
     * Connects to the given server, installs the client pipeline (codec,
     * control, heartbeat and business handlers, in that order) and blocks until
     * the channel is closed. The event loop group is shut down afterwards.
     *
     * @param port remote port
     * @param host remote host
     * @throws Exception if connecting or waiting for the close fails
     */
    public void connect(int port, String host) throws Exception {
        NioEventLoopGroup workGroup = new NioEventLoopGroup();
        try {
            // Handlers are added in the order in which inbound messages traverse them.
            ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            .addLast(new NettyMessageDecoder())
                            .addLast(new NettyMessageEncoder())
                            .addLast(new ControlClientHandler())
                            .addLast(new HeartBeatClientHandler())
                            .addLast(new NettyClientHandler());
                }
            };

            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(workGroup);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.option(ChannelOption.TCP_NODELAY, true);
            bootstrap.handler(initializer);

            // Wait for the connection, then wait until the channel is closed.
            bootstrap.connect(host, port).sync().channel().closeFuture().sync();
        } finally {
            workGroup.shutdownGracefully();
        }
    }

  服务端引导类修改:

    /**
     * Binds the server to the given port, installs the per-connection pipeline
     * (codec, control, heartbeat and business handlers, in that order) and
     * blocks until the server channel is closed. Both event loop groups are
     * shut down afterwards.
     *
     * @param port local port to listen on
     * @throws Exception if binding or waiting for the close fails
     */
    public void bind(int port) throws Exception {
        try {
            // Handlers are added in the order in which inbound messages traverse them.
            ChannelInitializer<SocketChannel> childInitializer = new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel channel) throws Exception {
                    channel.pipeline()
                            .addLast(new NettyMessageDecoder())
                            .addLast(new NettyMessageEncoder())
                            .addLast(new ControlServerHandler())
                            .addLast(new HeartBeatServerHandler())
                            .addLast(new NettyServerHandler());
                }
            };

            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workGroup);
            serverBootstrap.channel(NioServerSocketChannel.class);
            serverBootstrap.option(ChannelOption.SO_BACKLOG, 100);
            serverBootstrap.handler(new LoggingHandler(LogLevel.INFO));
            serverBootstrap.childHandler(childInitializer);

            // Bind the port, then wait until the server channel is closed.
            serverBootstrap.bind(port).sync().channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workGroup.shutdownGracefully();
        }
    }

  我们跑起来看看,先跑服务端再跑客户端:

  服务端输出:

22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.ControlServerHandler - [server] The control response is : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=0, reserved=0}, data=[B@65aac359}, data : [0, 0, 0, 18, 0, 0, 19, -120]
22:55:33.741 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - [server] server receive client message : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[B@75f76fa3}
22:55:33.752 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - data length: 8
22:55:33.752 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - startTime: 1572105096532
22:55:38.747 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@77e50de}
22:55:43.746 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@75305725}
22:55:48.769 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@29515fc1}
22:55:53.754 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@1dfc1da8}
22:55:58.739 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@68f15669}
22:56:03.738 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@1f8fad2d}
22:56:08.742 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@3c83464a}

  客户端输出:

22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] control response is OK, header : Header{delimiter=-1410399999, length=8, type=0, reserved=0}. sid : 18, interval : 5000
22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@b1db7b2}
22:55:38.747 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@4958706c}
22:55:43.746 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@1fb0fa50}
22:55:48.769 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@4aeea171}
22:55:53.754 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@2c282fb9}
22:55:58.739 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@2679e140}
22:56:03.738 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@3e03eda5}
22:56:08.742 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@1d9363d}

  从输出可以看到心跳是正常的,业务消息也发送了,但有一点要注意,就是业务消息只有一条,而且在心跳之前发送给了服务端。如果业务消息也是阻塞性的,那么就会出现问题,详见netty5心跳与阻塞性业务消息分发实例

最新文章

  1. c# Linq查询
  2. Fragment中的onKeyDown事件让Activity处理--处理特殊按键比如移动终端扫描
  3. ElasticSearch之二——集群
  4. Swift开发第四篇——柯里化
  5. Linux LDAP Server--->Clients配置
  6. Golang学习 - unicode/utf8 包
  7. [UML]UML之开篇
  8. ASP.NET注意事项
  9. 转载:JAVA 正则表达式 (超详细)
  10. 封装对Cookie和Session设置或取值的类
  11. aix Mysql安装 Oracle官方教程
  12. opencv2对于读书笔记——二值化——thresholded功能
  13. float 的不确定性
  14. cmd实现cab文件的压缩与解压
  15. linux服务器运维管理学习
  16. sql server 查询时会锁表吗?
  17. 继续封装个 Volley 组件
  18. String类源码解析
  19. centos6.5换yum源
  20. HBase使用压缩存储(snappy)

热门文章

  1. The server time zone value '中国标准时间' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use
  2. 写一个python小程序
  3. 浮点数转成字符串函数gcvt()
  4. Centos7 minimal 安装npm
  5. Kafka kSQL sql查询
  6. AGC 030 B - Tree Burning 结论+枚举
  7. CF696B Puzzles 概率期望
  8. EL获取域中的数据
  9. PHP实现系统编程(一) --- 网络Socket及IO多路复用【网摘】
  10. Python学习之--用户输入以及运算