我们在使用netty的时候会使用一个参数,ChannelOption.SO_KEEPALIVE为true, 设置好了之后再Linux系统才会对keepalive生效,但是linux里边需要配置几个参数,tcp_keepalive_time, tcp_keepalive_invl, tcp_keepalive_probes,如果不配置的时候都会是默认值。

  tcp_keepalive_time 即给一个TCP连接发送心跳包最后的时间间隔某一段时间后继续发送心跳包,允许空闲的时间,然后再次发送心跳包,默认时间为7200秒,即2个小时发一次心跳包。

tcp_keepalive_invl,发送存活探测时候未收到对方回执的时候,需要间隔一段时间继续发送。默认为75秒。

  tcp_keepalive_probes,如果发了存活探测的时候没有收到对方的回执,那么需要继续发送探测的次数,此时默认值为9次,也就是未收到回执的时候需要发送9次。

  再理一次,间隔tcp_keepalive_time之后发送心跳探测,如果未收到对方回执的时候,需要间隔tcp_keepalive_invl设置的时间继续发送,一共需要发送tcp_keepalive_probes的次数。  

这个是Linux系统的配置,如果要使用Linux的此功能需要设置SO_KEEPALIVE为true,同时设置其他几个参数。系统默认的SO_KEEPALIVE为false。因为这些情况的差异,所以netty提供了自己实现心跳的机制。

  netty有心跳的实现方法 IdleStateHandler,其中有读空闲时间,写空闲时间,读写空闲时间,只要有一个满足条件会触发userEventTriggered方法。

public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds)

  定义个消息内容吧,长度为Type的长度1 + 实际内容的长度5 = 6。Length为2个字节,Type为1个类型。

  +----------+----------+----------------+
| Length |Type(byte)| Actual Content |
| 0x06 | 1 | "HELLO" |
+----------+----------+----------------+

  定义公共的inbound方法,用于进行channelRead, sendPing, sendPong, userEventTriggered 方法。

package com.hqs.heartbeat.common;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.timeout.IdleStateEvent; import java.util.concurrent.atomic.AtomicInteger; /**
* @author huangqingshi
* @Date 2019-05-11
*/
public abstract class CustomeHeartbeatHandler extends SimpleChannelInboundHandler<ByteBuf> { public static final byte PING = 1;
public static final byte PONG = 2;
public static final byte CUSTOM_MSG = 3; protected String name;
private AtomicInteger heartbeatCount = new AtomicInteger(0); public CustomeHeartbeatHandler(String name) {
this.name = name;
} @Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) throws Exception {
if(byteBuf.getByte(2) == PING) {
sendPong(channelHandlerContext);
} else if(byteBuf.getByte(2) == PONG) {
System.out.println("get pong msg from " + channelHandlerContext
.channel().remoteAddress());
} else {
handleData(channelHandlerContext, byteBuf);
}
} protected abstract void handleData(ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf); @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("channel read : " + msg);
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println(byteBuf.getByte(2));
super.channelRead(ctx, msg);
} protected void sendPong(ChannelHandlerContext channelHandlerContext) {
ByteBuf buf = channelHandlerContext.alloc().buffer(3);
buf.writeShort(3);
buf.writeByte(PONG);
channelHandlerContext.writeAndFlush(buf);
heartbeatCount.incrementAndGet();
System.out.println("send pong message to " + channelHandlerContext.channel().remoteAddress());
} protected void sendPing(ChannelHandlerContext channelHandlerContext) {
ByteBuf buf = channelHandlerContext.alloc().buffer(3);
buf.writeShort(3);
buf.writeByte(PING);
channelHandlerContext.writeAndFlush(buf);
heartbeatCount.incrementAndGet();
System.out.println("send ping message to " + channelHandlerContext.channel().remoteAddress());
} @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
IdleStateEvent e = (IdleStateEvent) evt;
switch (e.state()) {
case ALL_IDLE:
handlALLIdle(ctx);
break;
case READER_IDLE:
handlReadIdle(ctx);
break;
case WRITER_IDLE:
handlWriteIdle(ctx);
break;
default:
break;
}
}
} protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) {
System.out.println("READ_IDLE---");
} protected void handlWriteIdle(ChannelHandlerContext channelHandlerContext) {
System.out.println("WRITE_IDLE---");
} protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) {
System.out.println("ALL_IDLE---");
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel:" + ctx.channel().remoteAddress() + " is active");
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("channel:" + ctx.channel().remoteAddress() + " is inactive");
}
}

  定义Server的方法,设置读超时为10秒,采用固定长度方法进行内容分割:LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0),长度为1K 。一个主线程接收请求,四个线程处理请求。端口号设置为9999。

package com.hqs.heartbeat.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler; /**
* @author huangqingshi
* @Date 2019-05-11
*/
public class Server { public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(4); try {
ServerBootstrap bootstrapServer = new ServerBootstrap();
bootstrapServer.group(boss, worker).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast(new IdleStateHandler(10, 0, 0));
channelPipeline.addLast(new LengthFieldBasedFrameDecoder(1024, 0,2, -2, 0));
channelPipeline.addLast(new ServerHandler());
}
});
Channel channel = bootstrapServer.bind(9999).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}

  Server的handler的处理方法:

package com.hqs.heartbeat.server;

import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext; /**
* @author huangqingshi
* @Date 2019-05-11
*/
public class ServerHandler extends CustomeHeartbeatHandler { public ServerHandler() {
super("server");
} @Override
protected void handleData(ChannelHandlerContext channelHandlerContext,
ByteBuf byteBuf) {
byte[] data = new byte[byteBuf.readableBytes() - 3];
ByteBuf responseBuf = Unpooled.copiedBuffer(byteBuf);
byteBuf.skipBytes(3);
byteBuf.readBytes(data);
String content = new String(data);
System.out.println(name + " get content : " + content);
channelHandlerContext.writeAndFlush(responseBuf);
} @Override
protected void handlReadIdle(ChannelHandlerContext channelHandlerContext) {
super.handlReadIdle(channelHandlerContext);
System.out.println(" client " + channelHandlerContext.channel().remoteAddress() + " reader timeout close it --");
channelHandlerContext.close();
}
}

  定义Client类,所有超时时间为5秒,如果5秒没有读写的话则发送ping,如果失去连接之后inactive了就会重新连接,采用10秒出发一次。

package com.hqs.heartbeat.client;

import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.timeout.IdleStateHandler; import java.util.Random;
import java.util.concurrent.TimeUnit; /**
* @author huangqingshi
* @Date 2019-05-11
*/
public class Client { private NioEventLoopGroup workGroup = new NioEventLoopGroup(4);
private Channel channel;
private Bootstrap bootstrap; public static void main(String[] args) throws InterruptedException {
Client client = new Client();
client.start();
client.sendData();
} public void start() { try {
bootstrap = new Bootstrap(); bootstrap.group(workGroup).channel(NioSocketChannel.class).option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline channelPipeline = ch.pipeline()
.addLast(new IdleStateHandler(0,0,5))
.addLast(new LengthFieldBasedFrameDecoder(1024, 0, 2, -2, 0))
.addLast(new ClientHandler(Client.this));
}
});
doConnect();
} catch (Exception e) {
throw new RuntimeException(e);
}
} public void sendData() throws InterruptedException {
Random random = new Random(System.currentTimeMillis());
for(int i = 0; i < 10000; i++) {
if(channel != null && channel.isActive()) {
String content = "client msg " + i;
ByteBuf byteBuf = channel.alloc().buffer(3 + content.getBytes().length);
byteBuf.writeShort(3 + content.getBytes().length);
byteBuf.writeByte(CustomeHeartbeatHandler.CUSTOM_MSG);
byteBuf.writeBytes(content.getBytes());
channel.writeAndFlush(byteBuf);
} Thread.sleep(random.nextInt(20000)); } } public void doConnect() {
if(channel != null && channel.isActive()) {
return;
} ChannelFuture future = bootstrap
.connect("127.0.0.1", 9999);
future.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if(future.isSuccess()) {
channel = future.channel();
System.out.println("connect to server successfully");
} else {
System.out.println("Failed to connect to server, try after 10s"); future.channel().eventLoop().schedule(new Runnable() {
@Override
public void run() {
doConnect();
}
}, 10, TimeUnit.SECONDS);
}
}
});
} }

  定义clientHandler方法,读取时跳过长度+类型 2+1 三个字节,然后获取消息。连接断开之后则进行重连。

package com.hqs.heartbeat.client;

import com.hqs.heartbeat.common.CustomeHeartbeatHandler;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext; /**
* @author huangqingshi
* @Date 2019-05-11
*/
public class ClientHandler extends CustomeHeartbeatHandler { private Client client; public ClientHandler(Client client) {
super("client");
this.client = client;
} @Override
protected void handleData(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
byte[] data = new byte[byteBuf.readableBytes() - 3];
byteBuf.skipBytes(3);
byteBuf.readBytes(data);
String content = new String(data);
System.out.println(name + " get content:" + content);
} @Override
protected void handlALLIdle(ChannelHandlerContext channelHandlerContext) {
super.handlALLIdle(channelHandlerContext);
sendPing(channelHandlerContext);
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
super.channelInactive(ctx);
client.doConnect();
}
}

  好了,总体的netty心跳实现机制就这么多,希望能帮助到大家。

  github地址:https://github.com/stonehqs/heartbeat

  

最新文章

  1. 结对项目:代码复审+PSP
  2. response 下载文件
  3. LinQ的增删改查
  4. js 中的算法题,那些经常看到的
  5. thinkphp模板调用函数用法
  6. A Swift Tour(4) - Objects and Classes
  7. Python 一路走来 HTML CSS Javascript
  8. 自写的LastPos,寻找字符串里的最后一个字符,RTL里没有提供这个函数——Delphi的String下标是从1开始的
  9. NIO 入门基础
  10. Css技术入门笔记02
  11. C++语言之析构函数与构造函数
  12. 【BZOJ5337】[TJOI2018]str(动态规划,哈希)
  13. https SSL主流数字证书都有哪些格式(转载)
  14. python 过滤四字节字符 表情字符
  15. 【题解】 Codeforces Edu44 F.Isomorphic Strings (字符串Hash)
  16. Jexus 网站服务器和 ASP.NET 跨平台开发
  17. PNG文件格式详解
  18. FTP地址
  19. 如何找到Firefox的收藏夹,就像IE一样,出现在网页的侧面
  20. Yii2框架加入API Modules

热门文章

  1. SAP流水号
  2. linux中查看python的安装路径
  3. python之virtualenv 与 virtualenvwrapper 详解
  4. PHP闭包详解
  5. laravel基础课程---5、路由复习(路由作用)
  6. storm--chuanzhiboke
  7. mysql数据库---编码格式基本操作
  8. hdu 4609 3-idiots —— FFT
  9. Ubuntu 16.04 LTS 一键安装VNC
  10. CDN网络原理