前面经过channel初始化、注册,所需要的数据结构(epoll_event)基本上准备好了,serverSocket也处于监听状态,可以接收来自客户端的请求了。NioServerSocketChannel注册在了NioEventLoop#selector,在注册过程中启动了NioEventLoop,run方法会循环执行,每次循环都会执行select和执行所有的task。如果select有事件,则会处理收到的事件。

private void processSelectedKeys() {
if (selectedKeys != null) {
// 是否使用优化过的selectionKey
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}

前面在NioEventLoop初始化的时候说过关于selectionKey优化的问题,这里不再赘述。两种方式主要是遍历selectionKey的方式不同,具体处理事件的调用是一样的。这里以processSelectedKeysOptimized为例。

accept

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// channel是NioServerSocketChannel
// unsafe是NioMessageUnsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略中间代码...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 调用NioMessageUnsafe.read
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
} public void read() {
// 省略中间代码...
// 由于是ServerSocket,只负责accept,如果有IO事件说明就是有新的客户端连接,所以这里就是创建NioSocketChannel
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
} allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
} int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// 注册刚刚创建的NioSocketChannel
pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
// 省略中间代码...
}
} protected int doReadMessages(List<Object> buf) throws Exception {
// 调用java.nio.channels.ServerSocketChannel#accept来创建SocketChannel
SocketChannel ch = SocketUtils.accept(javaChannel()); try {
if (ch != null) {
// 创建NioSocketChannel
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
// 省略中间代码...
} return 0;
}

上面创建了NioSocketChannel之后,接下来注册所有客户端连接的NioSocketChannel,调用的是DefaultChannelPipeline#fireChannelRead方法,接下来是执行pipeline中的handler,在初始化的时候添加了LoggingHandler (如果启动的时候配置了的话),那么目前pipeline中的handler有

  • io.netty.channel.DefaultChannelPipeline$HeadContext:pipeline创建的时候默认的第一个handler
  • io.netty.handler.logging.LoggingHandler:启动的时候用户配置的handler
  • io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor
  • io.netty.channel.DefaultChannelPipeline$TailContext:pipeline创建的时候默认的最后一个handler

下面看下ServerBootstrap$ServerBootstrapAcceptor是什么时候添加到handler的

// io.netty.bootstrap.ServerBootstrap#init
// 这个方法是NioServerSocketChannel初始化的时候调用的
void init(Channel channel) throws Exception {
// 省略中间代码...
p.addLast(new ChannelInitializer<Channel>() {
@Override
public void initChannel(final Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
} ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 在pipeline中添加ServerBootstrapAcceptor
pipeline.addLast(new ServerBootstrapAcceptor(
ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
}

之所以说ServerBootstrapAcceptor,是因为NioSocketChannel的register过程是这个handler的channelRead方法开始的

public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); setChannelOptions(child, childOptions, logger); // 配置NioSocketChannel
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
} try {
// 这里childGroup就是一开始我们配置的workerGroup
// 所以调用的是io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}

接下来的注册过程和NioServerSocketChannel的注册过程是类似的,创建socket,创建SelectionKeyImpl等。只不过NioSocketChannel不监听accept事件。

read

上面在接收到来自客户端的连接请求后,将NioSocketChannel注册到selector上,这个selector也是在NioEventLoop里面的,后面和这个客户端的通信都会通过这个channel进行,如果客户端发送来数据,也是selector收到读事件通知,然后调用processSelectedKey来处理read事件。

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// channel是NioSocketChannel
// unsafe是NioSocketChannelUnsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
// 省略中间代码...
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// 调用NioByteUnsafe.read
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
} public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
// PooledByteBufAllocator,默认的内存申请管理器
final ByteBufAllocator allocator = config.getAllocator();
// AdaptiveRecvByteBufAllocator$HandleImpl
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config); ByteBuf byteBuf = null;
boolean close = false;
try {
do {
// 申请内存
byteBuf = allocHandle.allocate(allocator);
// 读取数据
allocHandle.lastBytesRead(doReadBytes(byteBuf));
if (allocHandle.lastBytesRead() <= 0) {
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;
if (close) {
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
} allocHandle.incMessagesRead(1);
readPending = false;
// 执行pipeline中的handler
pipeline.fireChannelRead(byteBuf);
byteBuf = null;
} while (allocHandle.continueReading());
// 省略中间代码
}
}

一般来说NioSocketChannel中的handler包括

  • io.netty.channel.DefaultChannelPipeline$HeadContext
  • org.lep.test.netty.protocol.custom.codec.NettyMessageDecoder:自定义的解码器
  • org.lep.test.netty.protocol.custom.codec.NettyMessageEncoder:自定义的编码器
  • org.lep.test.netty.protocol.custom.server.LoginAuthRespHandler:自定义的handler
  • org.lep.test.netty.protocol.custom.server.HeartBeatRespHandler:自定义的handler
  • io.netty.channel.DefaultChannelPipeline$TailContext

netty提供了一些基本的编解码功能,自己可以根据实际需要扩展使用,然后自定义自己的逻辑处理handler。

上面还涉及到内存的分配部分留在下一节介绍。

总结

read事件处理过程:

  1. 接收到read事件
  2. 分配内存,初始化buffer
  3. 调用channel.read将数据读取到buffer中
  4. 执行pipeline中的handler,包括了编解码的handler,自定义的handler来处理数据

最新文章

  1. Nginx限制访问速率和最大并发连接数模块--limit (防止DDOS攻击)
  2. my_strcpy()
  3. SQLServer更新语句要注意
  4. UE4 编译后 不能正常使用Open Level 打开关卡解决方案:Open Level Blueprint Node not workin
  5. C++之函数重载
  6. ORCLE INNODB 博客与 innodb_lru_scan_depth
  7. SQL性能优化工具TKPROF
  8. AndroidManifest.xml文件综合详解(转)
  9. QQ邮箱中转站文件即将过期时如何转存到微云
  10. java学习笔记 --- 数组
  11. Spark Streaming笔记——技术点汇总
  12. 通过SQL创建一个有主键自动递增有默认值不为空有注释的表
  13. 搭建简易的c语言与python语言CGI和Apache服务器的开发环境
  14. 在OS X系统中php访问sftp时需要ssh2扩展的安装
  15. Oracle—通过操作系统进程查看数据库sql语句
  16. HTML 浏览器抓包
  17. php &amp;符的写法
  18. 15. 3Sum C++
  19. Others - On Duty
  20. cpu高 load 高 内存高 io 高怎么排查

热门文章

  1. Qt5.7 实现Https 认证全过程解析(亲自动手版)
  2. 登录测试用例sql语句注入
  3. ionic常见问题及解决方案
  4. 【手记】小心在where中使用NEWID()的大坑
  5. Debian9桌面设置
  6. 微信公众号开发 包括服务器配置、java web项目搭建、tomcat手动发布web项目、微信开发所需的url和token验证 2017.12.2
  7. jquery选择器 看这个链接吧!2017.6.2
  8. python实现邮件的发送
  9. Android 打造任意层级树形控件 考验你的数据结构和设计
  10. Java 学习笔记 (八) Java 变量