今天研究的是,心跳和重连,虽然这次是大神写的代码,但是万变不离其宗,我们先回顾一下Netty应用心跳和重连的整个过程:

1)客户端连接服务端

2)在客户端的的ChannelPipeline中加入一个比较特殊的IdleStateHandler,设置一下客户端的写空闲时间,例如5s

3)当客户端的所有ChannelHandler中4s内没有write事件,则会触发userEventTriggered方法(上文介绍过)

4)我们在客户端的userEventTriggered中对应的触发事件下发送一个心跳包给服务端,检测服务端是否还存活,防止服务端已经宕机,客户端还不知道

5)同样,服务端要对心跳包做出响应,其实给客户端最好的回复就是“不回复”,这样可以服务端的压力,假如有10w个空闲Idle的连接,那么服务端光发送心跳回复,则也是费事的事情,那么怎么才能告诉客户端它还活着呢,其实很简单,因为5s服务端都会收到来自客户端的心跳信息,那么如果10秒内收不到,服务端可以认为客户端挂了,可以close链路

6)加入服务端因为什么因素导致宕机的话,就会关闭所有的链路链接,所以作为客户端要做的事情就是短线重连

以上描述的就是整个心跳和重连的整个过程,虽然很简单,上一篇blog也写了一个Demo,简单地做了一下上述功能

要写工业级的Netty心跳重连的代码,需要解决一下几个问题:

1)ChannelPipeline中的ChannelHandlers的维护,首次连接和重连都需要对ChannelHandlers进行管理

2)重连对象的管理,也就是bootstrap对象的管理

3)重连机制编写

完整的代码:https://github.com/BazingaLyn/netty-study/tree/master/src/main/java/com/lyncc/netty/idle

下面我们就看大神是如何解决这些问题的,首先先定义一个接口ChannelHandlerHolder,用来保管ChannelPipeline中的Handlers的

  1. package com.lyncc.netty.idle;
  2. import io.netty.channel.ChannelHandler;
  3. /**
  4. *
  5. * 客户端的ChannelHandler集合,由子类实现,这样做的好处:
  6. * 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers
  7. * 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便
  8. * 地获取所有的handlers
  9. */
  10. public interface ChannelHandlerHolder {
  11. ChannelHandler[] handlers();
  12. }

我们再来编写我们熟悉的服务端的ServerBootstrap的编写:

HeartBeatServer.java

  1. package com.lyncc.netty.idle;
  2. import io.netty.bootstrap.ServerBootstrap;
  3. import io.netty.channel.ChannelFuture;
  4. import io.netty.channel.ChannelInitializer;
  5. import io.netty.channel.ChannelOption;
  6. import io.netty.channel.EventLoopGroup;
  7. import io.netty.channel.nio.NioEventLoopGroup;
  8. import io.netty.channel.socket.SocketChannel;
  9. import io.netty.channel.socket.nio.NioServerSocketChannel;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import io.netty.handler.logging.LogLevel;
  13. import io.netty.handler.logging.LoggingHandler;
  14. import io.netty.handler.timeout.IdleStateHandler;
  15. import java.net.InetSocketAddress;
  16. import java.util.concurrent.TimeUnit;
  17. public class HeartBeatServer {
  18. private final AcceptorIdleStateTrigger idleStateTrigger = new AcceptorIdleStateTrigger();
  19. private int port;
  20. public HeartBeatServer(int port) {
  21. this.port = port;
  22. }
  23. public void start() {
  24. EventLoopGroup bossGroup = new NioEventLoopGroup(1);
  25. EventLoopGroup workerGroup = new NioEventLoopGroup();
  26. try {
  27. ServerBootstrap sbs = new ServerBootstrap().group(bossGroup, workerGroup)
  28. .channel(NioServerSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO))
  29. .localAddress(new InetSocketAddress(port)).childHandler(new ChannelInitializer<SocketChannel>() {
  30. protected void initChannel(SocketChannel ch) throws Exception {
  31. ch.pipeline().addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS));
  32. ch.pipeline().addLast(idleStateTrigger);
  33. ch.pipeline().addLast("decoder", new StringDecoder());
  34. ch.pipeline().addLast("encoder", new StringEncoder());
  35. ch.pipeline().addLast(new HeartBeatServerHandler());
  36. };
  37. }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
  38. // 绑定端口,开始接收进来的连接
  39. ChannelFuture future = sbs.bind(port).sync();
  40. System.out.println("Server start listen at " + port);
  41. future.channel().closeFuture().sync();
  42. } catch (Exception e) {
  43. bossGroup.shutdownGracefully();
  44. workerGroup.shutdownGracefully();
  45. }
  46. }
  47. public static void main(String[] args) throws Exception {
  48. int port;
  49. if (args.length > 0) {
  50. port = Integer.parseInt(args[0]);
  51. } else {
  52. port = 8080;
  53. }
  54. new HeartBeatServer(port).start();
  55. }
  56. }

单独写一个AcceptorIdleStateTrigger,其实也是继承ChannelInboundHandlerAdapter,重写userEventTriggered方法,因为客户端是write,那么服务端自然是read,设置的状态就是IdleState.READER_IDLE,源码如下:

  1. package com.lyncc.netty.idle;
  2. import io.netty.channel.ChannelHandler;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.handler.timeout.IdleState;
  6. import io.netty.handler.timeout.IdleStateEvent;
  7. @ChannelHandler.Sharable
  8. public class AcceptorIdleStateTrigger extends ChannelInboundHandlerAdapter {
  9. @Override
  10. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  11. if (evt instanceof IdleStateEvent) {
  12. IdleState state = ((IdleStateEvent) evt).state();
  13. if (state == IdleState.READER_IDLE) {
  14. throw new Exception("idle exception");
  15. }
  16. } else {
  17. super.userEventTriggered(ctx, evt);
  18. }
  19. }
  20. }

HeartBeatServerHandler就是一个很简单的自定义的Handler,不是重点:

  1. package com.lyncc.netty.idle;
  2. import io.netty.channel.ChannelHandlerContext;
  3. import io.netty.channel.ChannelInboundHandlerAdapter;
  4. public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter {
  5. @Override
  6. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  7. System.out.println("server channelRead..");
  8. System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
  9. }
  10. @Override
  11. public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
  12. cause.printStackTrace();
  13. ctx.close();
  14. }
  15. }

接下来就是重点,我们需要写一个类,这个类可以去观察链路是否断了,如果断了,进行循环的断线重连操作,ConnectionWatchdog,顾名思义,链路检测狗,我们先看完整代码:

  1. package com.lyncc.netty.idle;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelFutureListener;
  6. import io.netty.channel.ChannelHandler.Sharable;
  7. import io.netty.channel.ChannelHandlerContext;
  8. import io.netty.channel.ChannelInboundHandlerAdapter;
  9. import io.netty.channel.ChannelInitializer;
  10. import io.netty.util.Timeout;
  11. import io.netty.util.Timer;
  12. import io.netty.util.TimerTask;
  13. import java.util.concurrent.TimeUnit;
  14. /**
  15. *
  16. * 重连检测狗,当发现当前的链路不稳定关闭之后,进行12次重连
  17. */
  18. @Sharable
  19. public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask ,ChannelHandlerHolder{
  20. private final Bootstrap bootstrap;
  21. private final Timer timer;
  22. private final int port;
  23. private final String host;
  24. private volatile boolean reconnect = true;
  25. private int attempts;
  26. public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port,String host, boolean reconnect) {
  27. this.bootstrap = bootstrap;
  28. this.timer = timer;
  29. this.port = port;
  30. this.host = host;
  31. this.reconnect = reconnect;
  32. }
  33. /**
  34. * channel链路每次active的时候,将其连接的次数重新☞ 0
  35. */
  36. @Override
  37. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  38. System.out.println("当前链路已经激活了,重连尝试次数重新置为0");
  39. attempts = 0;
  40. ctx.fireChannelActive();
  41. }
  42. @Override
  43. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  44. System.out.println("链接关闭");
  45. if(reconnect){
  46. System.out.println("链接关闭,将进行重连");
  47. if (attempts < 12) {
  48. attempts++;
  49. //重连的间隔时间会越来越长
  50. int timeout = 2 << attempts;
  51. timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS);
  52. }
  53. }
  54. ctx.fireChannelInactive();
  55. }
  56. public void run(Timeout timeout) throws Exception {
  57. ChannelFuture future;
  58. //bootstrap已经初始化好了,只需要将handler填入就可以了
  59. synchronized (bootstrap) {
  60. bootstrap.handler(new ChannelInitializer<Channel>() {
  61. @Override
  62. protected void initChannel(Channel ch) throws Exception {
  63. ch.pipeline().addLast(handlers());
  64. }
  65. });
  66. future = bootstrap.connect(host,port);
  67. }
  68. //future对象
  69. future.addListener(new ChannelFutureListener() {
  70. public void operationComplete(ChannelFuture f) throws Exception {
  71. boolean succeed = f.isSuccess();
  72. //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连
  73. if (!succeed) {
  74. System.out.println("重连失败");
  75. f.channel().pipeline().fireChannelInactive();
  76. }else{
  77. System.out.println("重连成功");
  78. }
  79. }
  80. });
  81. }
  82. }

稍微分析一下:

1)继承了ChannelInboundHandlerAdapter,说明它也是Handler,也对,作为一个检测对象,肯定会放在链路中,否则怎么检测

2)实现了2个接口,TimeTask,ChannelHandlerHolder

①TimeTask,我们就要写run方法,这应该是一个定时任务,这个定时任务做的事情应该是重连的工作

②ChannelHandlerHolder的接口,这个接口我们刚才说过是维护的所有的Handlers,因为在重连的时候需要获取Handlers

3)bootstrap对象,重连的时候依旧需要这个对象

4)当链路断开的时候会触发channelInactive这个方法,也就说触发重连的导火索是从这边开始的

好了,我们这边再写次核心的HeartBeatsClient的代码:

  1. package com.lyncc.netty.idle;
  2. import io.netty.bootstrap.Bootstrap;
  3. import io.netty.channel.Channel;
  4. import io.netty.channel.ChannelFuture;
  5. import io.netty.channel.ChannelHandler;
  6. import io.netty.channel.ChannelInitializer;
  7. import io.netty.channel.EventLoopGroup;
  8. import io.netty.channel.nio.NioEventLoopGroup;
  9. import io.netty.channel.socket.nio.NioSocketChannel;
  10. import io.netty.handler.codec.string.StringDecoder;
  11. import io.netty.handler.codec.string.StringEncoder;
  12. import io.netty.handler.logging.LogLevel;
  13. import io.netty.handler.logging.LoggingHandler;
  14. import io.netty.handler.timeout.IdleStateHandler;
  15. import io.netty.util.HashedWheelTimer;
  16. import java.util.concurrent.TimeUnit;
  17. public class HeartBeatsClient {
  18. protected final HashedWheelTimer timer = new HashedWheelTimer();
  19. private Bootstrap boot;
  20. private final ConnectorIdleStateTrigger idleStateTrigger = new ConnectorIdleStateTrigger();
  21. public void connect(int port, String host) throws Exception {
  22. EventLoopGroup group = new NioEventLoopGroup();
  23. boot = new Bootstrap();
  24. boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO));
  25. final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port,host, true) {
  26. public ChannelHandler[] handlers() {
  27. return new ChannelHandler[] {
  28. this,
  29. new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS),
  30. idleStateTrigger,
  31. new StringDecoder(),
  32. new StringEncoder(),
  33. new HeartBeatClientHandler()
  34. };
  35. }
  36. };
  37. ChannelFuture future;
  38. //进行连接
  39. try {
  40. synchronized (boot) {
  41. boot.handler(new ChannelInitializer<Channel>() {
  42. //初始化channel
  43. @Override
  44. protected void initChannel(Channel ch) throws Exception {
  45. ch.pipeline().addLast(watchdog.handlers());
  46. }
  47. });
  48. future = boot.connect(host,port);
  49. }
  50. // 以下代码在synchronized同步块外面是安全的
  51. future.sync();
  52. } catch (Throwable t) {
  53. throw new Exception("connects to  fails", t);
  54. }
  55. }
  56. /**
  57. * @param args
  58. * @throws Exception
  59. */
  60. public static void main(String[] args) throws Exception {
  61. int port = 8080;
  62. if (args != null && args.length > 0) {
  63. try {
  64. port = Integer.valueOf(args[0]);
  65. } catch (NumberFormatException e) {
  66. // 采用默认值
  67. }
  68. }
  69. new HeartBeatsClient().connect(port, "127.0.0.1");
  70. }
  71. }

也稍微说明一下:

1)创建了ConnectionWatchdog对象,自然要实现handlers方法

2)初始化好bootstrap对象

3)4秒内没有写操作,进行心跳触发,也就是IdleStateHandler这个方法

最后ConnectorIdleStateTrigger这个类

  1. package com.lyncc.netty.idle;
  2. import io.netty.buffer.ByteBuf;
  3. import io.netty.buffer.Unpooled;
  4. import io.netty.channel.ChannelHandler.Sharable;
  5. import io.netty.channel.ChannelHandlerContext;
  6. import io.netty.channel.ChannelInboundHandlerAdapter;
  7. import io.netty.handler.timeout.IdleState;
  8. import io.netty.handler.timeout.IdleStateEvent;
  9. import io.netty.util.CharsetUtil;
  10. @Sharable
  11. public class ConnectorIdleStateTrigger extends ChannelInboundHandlerAdapter {
  12. private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat",
  13. CharsetUtil.UTF_8));
  14. @Override
  15. public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
  16. if (evt instanceof IdleStateEvent) {
  17. IdleState state = ((IdleStateEvent) evt).state();
  18. if (state == IdleState.WRITER_IDLE) {
  19. // write heartbeat to server
  20. ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate());
  21. }
  22. } else {
  23. super.userEventTriggered(ctx, evt);
  24. }
  25. }
  26. }

HeartBeatClientHandler.java(不是重点)

  1. package com.lyncc.netty.idle;
  2. import io.netty.channel.ChannelHandler.Sharable;
  3. import io.netty.channel.ChannelHandlerContext;
  4. import io.netty.channel.ChannelInboundHandlerAdapter;
  5. import io.netty.util.ReferenceCountUtil;
  6. import java.util.Date;
  7. @Sharable
  8. public class HeartBeatClientHandler extends ChannelInboundHandlerAdapter {
  9. @Override
  10. public void channelActive(ChannelHandlerContext ctx) throws Exception {
  11. System.out.println("激活时间是:"+new Date());
  12. System.out.println("HeartBeatClientHandler channelActive");
  13. ctx.fireChannelActive();
  14. }
  15. @Override
  16. public void channelInactive(ChannelHandlerContext ctx) throws Exception {
  17. System.out.println("停止时间是:"+new Date());
  18. System.out.println("HeartBeatClientHandler channelInactive");
  19. }
  20. @Override
  21. public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
  22. String message = (String) msg;
  23. System.out.println(message);
  24. if (message.equals("Heartbeat")) {
  25. ctx.write("has read message from server");
  26. ctx.flush();
  27. }
  28. ReferenceCountUtil.release(msg);
  29. }
  30. }

好了,到此为止,所有的代码都贴完了,我们做一个简单的测试,按照常理,如果不出任何状况的话,客户端4秒发送心跳,服务端5秒才验证是不会断连的,所以我们在启动之后,关闭服务端,然后再次重启服务端

首先启动服务端,控制台如下:

启动客户端,控制台如下:

客户端启动之后,服务端的控制台:

关闭服务端后,客户端控制台:

重启启动服务端:

重连成功~

最新文章

  1. C#设计模式学习笔记-单例模式
  2. mui,css3 querySelector,appendChild,style.display,insertBefore
  3. maven配置阿里云镜像
  4. jquery左右滑动效果的实现
  5. 数据结构和算法 &ndash; 10.集合
  6. C++中嵌入Lua脚本环境搭建
  7. dede织梦后台如何修改?如何增加删除菜单?(
  8. [vb.net]XML File Parsing in VB.NET
  9. 解决java.io.IOException: Cannot run program &quot;cygpath&quot;: CreateProcess error=2, 系统找不到指定的文件 的错误
  10. 保存mysql用户的登录信息到~.my.cnf文件;用于方便登录操作。
  11. COB對PCB設計的要求
  12. 运动检测(前景检测)之(二)混合高斯模型GMM
  13. golang高性能RPC:Apache Thrift安装使用完全攻略
  14. 查看局域网指定IP的电脑名
  15. php memcached 扩展
  16. npm是什么
  17. Caffe 碎碎念
  18. Java项目性能持续优化中……
  19. hdu5289 2015多校联合第一场1002 Assignment
  20. window 更新 nodejs

热门文章

  1. python的基础
  2. Redis学习总结之一——Redis初入
  3. 使用阿里云加速docker镜像的安装
  4. spring加载jar包中多个配置文件(转载)
  5. HDU2825 Wireless Password 【AC自动机】【状压DP】
  6. VC中键盘键的对应关系
  7. python3 的字符串格式判断
  8. 接口测试基础——第7篇 Python中_、__、__func__之间的区别
  9. 在 CentOS 7.2 上安装 ODOO 10 (2018-10-09 持续更新)
  10. Vim使用YouCompleteMe达到类似IDE的代码提示、补全,以及其他实用设置