IdleStateHandler是Netty为我们提供的检测连接有效性的处理器,一共有读空闲,写空闲,读/写空闲三种监测机制。

将其添加到我们的ChannelPipline中,便可以用来检测空闲。

先通过一段代码来学习下IdleStateHandler的用法:

ConnectStateHandler:(负责监测通道的各种状态并处理空闲事件IdleStateEvent)

 package com.insaneXs.netty.idlestate;

 import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent; public class ConnectStateHandler extends ChannelInboundHandlerAdapter { public ConnectStateHandler() {
super();
} @Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel Register");
super.channelRegistered(ctx);
} @Override
public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel Unregister");
super.channelUnregistered(ctx);
} @Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel Active");
super.channelActive(ctx);
} @Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel Inactive");
super.channelInactive(ctx);
} @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("Channel Read");
super.channelRead(ctx, msg);
} @Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
System.out.println("Channel Read Complete");
super.channelReadComplete(ctx);
} @Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if(evt instanceof IdleStateEvent){
if(((IdleStateEvent)evt).state().equals(IdleState.READER_IDLE)){
System.out.println("Read Idle");
ctx.close();
}
}else{
super.userEventTriggered(ctx, evt);
}
} @Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
super.channelWritabilityChanged(ctx);
} @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}
}

服务器代码:

 package com.insaneXs.netty.idlestate;

 import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.timeout.IdleStateHandler; public class NettyServer { public static void main(String[] args){
EventLoopGroup boss = new NioEventLoopGroup(1);
EventLoopGroup work = new NioEventLoopGroup(4); try {
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(boss,work)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(30, 0, 0));
pipeline.addLast(new ConnectStateHandler());
}
}); bootstrap.bind(8322).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

测试客户端代码:

 package com.insaneXs.netty.common;

 import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel; public class CommonNettyClient { public static void main(String[] args){
EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress("localhost", 8322)
.handler(new ChannelInboundHandlerAdapter()); bootstrap.connect();
}
}

测试结果:

从上面的输出中我们可以看到Channel的状态变化:

1.连接建立时会从register -> active,

2.当读空闲时,我们产生了一个空闲事件,当ConnectStateHandler捕获这个事件后,会主动断开连接。

3.断开时则是从inactive -> unregister。

接下来我们学习下IdleStateHandler源码:

 /*
* Copyright 2012 The Netty Project
*
* The Netty Project licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package io.netty.handler.timeout; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.Channel.Unsafe;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundBuffer;
import io.netty.channel.ChannelPromise; import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit; /
*
* @see ReadTimeoutHandler
* @see WriteTimeoutHandler
*/
public class IdleStateHandler extends ChannelDuplexHandler {
private static final long MIN_TIMEOUT_NANOS = TimeUnit.MILLISECONDS.toNanos(1); // Not create a new ChannelFutureListener per write operation to reduce GC pressure.
private final ChannelFutureListener writeListener = new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
lastWriteTime = ticksInNanos();
firstWriterIdleEvent = firstAllIdleEvent = true;
}
}; //是否观察出站情况;默认false
private final boolean observeOutput; /*******三种超时情况管理*********/
//读超时时间
private final long readerIdleTimeNanos;
//写超时时间
private final long writerIdleTimeNanos;
//读或写超时时间
private final long allIdleTimeNanos; //读空闲定时任务,验证是否读超时
private ScheduledFuture<?> readerIdleTimeout;
//最后一次读时间
private long lastReadTime;
//是否第一次读超时
private boolean firstReaderIdleEvent = true; //写空闲定时任务,验证是否写超时
private ScheduledFuture<?> writerIdleTimeout;
//最后一次写超时
private long lastWriteTime;
//是否第一次写超时
private boolean firstWriterIdleEvent = true; //读或写超时定时任务
private ScheduledFuture<?> allIdleTimeout;
//是否读或写超时
private boolean firstAllIdleEvent = true; //处理器状态: 0-无状态, 1-初始化, 2-销毁
private byte state; // 0 - none, 1 - initialized, 2 - destroyed
//读状态标志
private boolean reading; /****用于观察输出情况*****/
private long lastChangeCheckTimeStamp;
private int lastMessageHashCode;
private long lastPendingWriteBytes; //设置超时时间;默认单位为秒
public IdleStateHandler(
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) { this(readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds,
TimeUnit.SECONDS);
} //设置三种情况超时时间与时间单位
public IdleStateHandler(
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
this(false, readerIdleTime, writerIdleTime, allIdleTime, unit);
} //设置是否观察输出情况,三种情况超时时间以及时间单位
public IdleStateHandler(boolean observeOutput,
long readerIdleTime, long writerIdleTime, long allIdleTime,
TimeUnit unit) {
if (unit == null) {
throw new NullPointerException("unit");
} this.observeOutput = observeOutput; if (readerIdleTime <= 0) {
readerIdleTimeNanos = 0;
} else {
readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);
}
if (writerIdleTime <= 0) {
writerIdleTimeNanos = 0;
} else {
writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);
}
if (allIdleTime <= 0) {
allIdleTimeNanos = 0;
} else {
allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);
}
} /************将时间转换成毫秒级***************/
public long getReaderIdleTimeInMillis() {
return TimeUnit.NANOSECONDS.toMillis(readerIdleTimeNanos);
} public long getWriterIdleTimeInMillis() {
return TimeUnit.NANOSECONDS.toMillis(writerIdleTimeNanos);
} public long getAllIdleTimeInMillis() {
return TimeUnit.NANOSECONDS.toMillis(allIdleTimeNanos);
} //当处理器被添加时,视情况决定是否初始化
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isActive() && ctx.channel().isRegistered()) {
//判断channel是否已经激活和注册,避免
initialize(ctx);
} else {
// channelActive() event has not been fired yet. this.channelActive() will be invoked
// and initialization will occur there.
}
} //移除时,调用destroy销毁
@Override
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
destroy();
} //当通道被注册时,视情况决定是否初始化
@Override
public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
// Initialize early if channel is active already.
if (ctx.channel().isActive()) {
initialize(ctx);
}
super.channelRegistered(ctx);
} //当通道激活时,初始化
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// This method will be invoked only if this handler was added
// before channelActive() event is fired. If a user adds this handler
// after the channelActive() event, initialize() will be called by beforeAdd().
initialize(ctx);
super.channelActive(ctx);
} //通道不活跃时,调用destroy销毁
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
destroy();
super.channelInactive(ctx);
} //读事件
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//开启了读空闲监测 或 读写空闲检测
if (readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
//修改为正在读,并重置首次读写事件的标志位为true
reading = true;
firstReaderIdleEvent = firstAllIdleEvent = true;
}
ctx.fireChannelRead(msg);
} //读完成
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//如果开启了 读/读写标志 且 正在读
if ((readerIdleTimeNanos > 0 || allIdleTimeNanos > 0) && reading) {
//修改最后一次读取时间,重置正在读标识
lastReadTime = ticksInNanos();
reading = false;
}
ctx.fireChannelReadComplete();
} @Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
//如果开启了 写/读写标志
if (writerIdleTimeNanos > 0 || allIdleTimeNanos > 0) {
ctx.write(msg, promise.unvoid()).addListener(writeListener);
} else {
ctx.write(msg, promise);
}
} //初始化
private void initialize(ChannelHandlerContext ctx) {
//判断状态避免重复初始化
switch (state) {
case 1:
case 2:
return;
} state = 1;
//观察输出情况
initOutputChanged(ctx); //初始化最后一次读写时间
lastReadTime = lastWriteTime = ticksInNanos(); //根据超时时间,判断是否开启超时监测
if (readerIdleTimeNanos > 0) {
readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),
readerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (writerIdleTimeNanos > 0) {
writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),
writerIdleTimeNanos, TimeUnit.NANOSECONDS);
}
if (allIdleTimeNanos > 0) {
allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),
allIdleTimeNanos, TimeUnit.NANOSECONDS);
}
} /**
* This method is visible for testing!
*/
long ticksInNanos() {
return System.nanoTime();
} /**
* This method is visible for testing!
*/
ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {
return ctx.executor().schedule(task, delay, unit);
} //销毁
private void destroy() {
//更改状态 取消定时器
state = 2; if (readerIdleTimeout != null) {
readerIdleTimeout.cancel(false);
readerIdleTimeout = null;
}
if (writerIdleTimeout != null) {
writerIdleTimeout.cancel(false);
writerIdleTimeout = null;
}
if (allIdleTimeout != null) {
allIdleTimeout.cancel(false);
allIdleTimeout = null;
}
} /**
* Is called when an {@link IdleStateEvent} should be fired. This implementation calls
* {@link ChannelHandlerContext#fireUserEventTriggered(Object)}.
*/
protected void channelIdle(ChannelHandlerContext ctx, IdleStateEvent evt) throws Exception {
ctx.fireUserEventTriggered(evt);
} /**
* Returns a {@link IdleStateEvent}.
*/
protected IdleStateEvent newIdleStateEvent(IdleState state, boolean first) {
switch (state) {
case ALL_IDLE:
return first ? IdleStateEvent.FIRST_ALL_IDLE_STATE_EVENT : IdleStateEvent.ALL_IDLE_STATE_EVENT;
case READER_IDLE:
return first ? IdleStateEvent.FIRST_READER_IDLE_STATE_EVENT : IdleStateEvent.READER_IDLE_STATE_EVENT;
case WRITER_IDLE:
return first ? IdleStateEvent.FIRST_WRITER_IDLE_STATE_EVENT : IdleStateEvent.WRITER_IDLE_STATE_EVENT;
default:
throw new IllegalArgumentException("Unhandled: state=" + state + ", first=" + first);
}
} /**
* @see #hasOutputChanged(ChannelHandlerContext, boolean)
*/
private void initOutputChanged(ChannelHandlerContext ctx) {
if (observeOutput) {
Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer();
//初始化消息内容的HashCode和byte
if (buf != null) {
lastMessageHashCode = System.identityHashCode(buf.current());
lastPendingWriteBytes = buf.totalPendingWriteBytes();
}
}
} //判断输出是否有变化
private boolean hasOutputChanged(ChannelHandlerContext ctx, boolean first) {
if (observeOutput) { // We can take this shortcut if the ChannelPromises that got passed into write()
// appear to complete. It indicates "change" on message level and we simply assume
// that there's change happening on byte level. If the user doesn't observe channel
// writability events then they'll eventually OOME and there's clearly a different
// problem and idleness is least of their concerns.
if (lastChangeCheckTimeStamp != lastWriteTime) {
lastChangeCheckTimeStamp = lastWriteTime; // But this applies only if it's the non-first call.
if (!first) {
return true;
}
} Channel channel = ctx.channel();
Unsafe unsafe = channel.unsafe();
ChannelOutboundBuffer buf = unsafe.outboundBuffer(); if (buf != null) {
int messageHashCode = System.identityHashCode(buf.current());
long pendingWriteBytes = buf.totalPendingWriteBytes(); if (messageHashCode != lastMessageHashCode || pendingWriteBytes != lastPendingWriteBytes) {
lastMessageHashCode = messageHashCode;
lastPendingWriteBytes = pendingWriteBytes; if (!first) {
return true;
}
}
}
} return false;
} //监测任务的父类
private abstract static class AbstractIdleTask implements Runnable { private final ChannelHandlerContext ctx; AbstractIdleTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
} @Override
public void run() {
if (!ctx.channel().isOpen()) {
return;
} run(ctx);
} protected abstract void run(ChannelHandlerContext ctx);
} //读监测定时任务
private final class ReaderIdleTimeoutTask extends AbstractIdleTask { ReaderIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
} @Override
protected void run(ChannelHandlerContext ctx) {
long nextDelay = readerIdleTimeNanos;
if (!reading) {//不在读的过程中
//计算是否超时
nextDelay -= ticksInNanos() - lastReadTime;
} if (nextDelay <= 0) {//已经超时
//下次空闲监测
readerIdleTimeout = schedule(ctx, this, readerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstReaderIdleEvent;
firstReaderIdleEvent = false; try {
//出发读超时事件
IdleStateEvent event = newIdleStateEvent(IdleState.READER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
readerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
} //写监测定时任务
private final class WriterIdleTimeoutTask extends AbstractIdleTask { WriterIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
} @Override
protected void run(ChannelHandlerContext ctx) { long lastWriteTime = IdleStateHandler.this.lastWriteTime;
long nextDelay = writerIdleTimeNanos - (ticksInNanos() - lastWriteTime);
if (nextDelay <= 0) {//写超时
//下一次超时检查时间
writerIdleTimeout = schedule(ctx, this, writerIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstWriterIdleEvent;
firstWriterIdleEvent = false; try {
//输入内容是否发生变化:观察输出模式下且输出内容发生变化则不认为写超时
if (hasOutputChanged(ctx, first)) {
return;
}
//传递写超时事件
IdleStateEvent event = newIdleStateEvent(IdleState.WRITER_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
writerIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
} //读写监测定时任务
private final class AllIdleTimeoutTask extends AbstractIdleTask { AllIdleTimeoutTask(ChannelHandlerContext ctx) {
super(ctx);
} @Override
protected void run(ChannelHandlerContext ctx) { long nextDelay = allIdleTimeNanos;
if (!reading) {
nextDelay -= ticksInNanos() - Math.max(lastReadTime, lastWriteTime);
}
if (nextDelay <= 0) {
// Both reader and writer are idle - set a new timeout and
// notify the callback.
allIdleTimeout = schedule(ctx, this, allIdleTimeNanos, TimeUnit.NANOSECONDS); boolean first = firstAllIdleEvent;
firstAllIdleEvent = false; try {
if (hasOutputChanged(ctx, first)) {
return;
} IdleStateEvent event = newIdleStateEvent(IdleState.ALL_IDLE, first);
channelIdle(ctx, event);
} catch (Throwable t) {
ctx.fireExceptionCaught(t);
}
} else {
// Either read or write occurred before the timeout - set a new
// timeout with shorter delay.
allIdleTimeout = schedule(ctx, this, nextDelay, TimeUnit.NANOSECONDS);
}
}
}
}

代码的关键部分都已经给出了注释。

这里主要关注几个问题,带着这些问题去思考代码设计的目的:

1.代码中firstXXXIdleEvent的标志位的作用是什么?

2.ObserveOutput标志位的作用是什么?

3.reading的标志的作用是什么?

4.lastReadTime和lastWriteTime会在什么时候被重置?

首先来回答第一个问题,为什么会有firstXXXIdleEvent的标识。

这是因为一次读写事件中,可能会产生多次的空闲超时事件。比如当我设置写空闲时间为5秒,而在某些情况下,我整个写过程需要30秒。那么这一次写过程就会产生多个写空闲事件。firstXXXIdleEvent标志位就是用来表明此次空闲事件是否为第一次空闲事件。

知道了第一个问题后,我们在看ObserveOutput标志的作用。ObserveOutput标志用来解决上述的慢输出的问题。如果设置为true,那么即使写过程中发生了写空闲事件,但是只要hasOutputChanged方法判断此时仍然在向外写(写缓存发生变化),那么就不会为此次超时产生写超时事件。

那么reading标志的作用是什么?reading的标志是在read方法开始时,被设置为true,在readComplete方法中又被设置为false。也就是reading标志表示正在发生读的过程。

最后的问题,lastReadTime和lastWriteTime在什么时候被重置?lastReadTime和lastWriteTime会在相应的定时器中被用来和空闲时间作比较,以此来检测在这段空闲时间中,是否发生过完整的读或写过程。因此,它们在handler初始化时被初始化值。lastReadTime会在readComplete时更新值。而lastWriteTime则是在WriterListener中设置(写过程完成后的回掉中)。

最新文章

  1. SQL注入处理
  2. php基础_字符串
  3. MQTT V3.1--我的理解
  4. 54-locate 简明笔记
  5. 常用的Linux操作
  6. 【C++面试】常考题复习
  7. word2010 ctrl v not work
  8. 关于sql语句in的使用注意规则( 转)
  9. 常用jQuery选择器总结【转】
  10. IOS开发-ObjC-NSDictionary
  11. 实例化vue之前赋值html元素导致事件失效
  12. 模拟IC芯片设计开发的流程
  13. Tips_关闭按钮的简单实现 &amp;&amp; Felx实现水平垂直居中
  14. JAVA配置文件/反射操作
  15. Nginx模块开发与架构解析(nginx安装、配置说明)
  16. jQuery源码学习扒一扒jQuery对象初使化
  17. 安卓程序代写 网上程序代写[原]Android开发技巧--ListView
  18. 动态链接库编程:非MFC DLL
  19. Java跨语言调用,使用JNA访问Java外部接口
  20. elasticsearch 多列 聚合(sql group by)

热门文章

  1. 整数回文数判断 Python
  2. Oauth2.0详解,Oauth2.0协议原理
  3. Level Up - ICPC Southeastern Europe Contest 2019(简单DP)
  4. Javascript 入门 必备知识点
  5. 在IDEA导入maven项目
  6. intelij idea 和 eclipse 使用上的区别
  7. python3(三十五)file read write
  8. Atlas运行时资源不足报错 -bash: fork: retry: 资源暂时不可用 Out of system resources
  9. ConcurrentHashMap和 CopyOnWriteArrayList提供线程安全性和可伸缩性 以及 同步的集合类 Hashtable 和 Vector Collections.synchronizedMap 和 Collections.synchronizedList 区别缺点
  10. 2020年iOS进阶面试题总结(一)