NIO使用Reactor模式遇到的问题
2024-08-21 21:19:39
关于Reactor模式,不再多做介绍,推荐Doug Lea大神的教程:Java 可扩展的IO
本来在Reactor的构造方法中完成一系列操作是没有问题的:
public class Reactor implements Runnable {
private final Selector selector;
public Reactor() throws IOException {
selector = Selector.open();
String host = "127.0.0.1";
int port = 12345;
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(host, port));
serverChannel.configureBlocking(false);
serverChannel.register(selector, SelectionKey.OP_ACCEPT, new Acceptor(serverChannel, selector));
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
dispatch(it.next());
}
selected.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey next) {
Runnable r = (Runnable) next.attachment();
if (r != null) {
r.run();
}
}
public void registerChannel(SelectableChannel channel, int ops) throws IOException {
if (channel instanceof ServerSocketChannel) {
ServerSocketChannel socketChannel = (ServerSocketChannel) channel;
channel.register(selector, ops, new Acceptor(socketChannel, selector));
}
}
}
然而有些参数需要在外层操作,我想这样弄:
public Reactor() throws IOException {
selector = Selector.open();
}
在主线程中启动server [reactorManager.start()来启动Reactor线程]
private void startReactorManager() throws IOException {
reactorManager = new ReactorManager();
reactorManager.start();
}
private void startNIOServer(ServerConfig serverConfig) throws IOException {
String host = serverConfig.getHost();
int port = serverConfig.getTcpPort();
ServerSocketChannel serverChannel = ServerSocketChannel.open();
serverChannel.socket().bind(new InetSocketAddress(host, port));
serverChannel.configureBlocking(false);
reactorManager.registerChannel(serverChannel, SelectionKey.OP_ACCEPT);
}
也就是先启动一个线程来Selector.open();然后在主线程中注册通道和事件。
结果一直无法监听到客户端的连接,跟踪才发现服务端的注册方法阻塞了,原因是锁的问题,具体还不清楚。
想一想,既然在构造函数中可以注册,放到main线程中却不行,那么是否我们可以在注册时检查,如果this是当前Reactor线程,就直接注册,这跟在构造函数中没有区别。
如果不是,就放到队列中,当然你这时放了,selector.select()一直阻塞着,你也无法取出来注册,那么我们可以利用selector.wakeup()唤醒它。
新的方案如下:
public class Reactor extends Thread {
private final Selector selector;
private LinkedBlockingQueue<Object[]> register = new LinkedBlockingQueue<>() ;//channel、ops、attach
private final AtomicBoolean wakeup = new AtomicBoolean() ;
public Reactor() throws IOException {
selector = Selector.open();
}
@Override
public void run() {
while (!Thread.interrupted()) {
try {
wakeup.set(false);
processRegister();
selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
Iterator<SelectionKey> it = selected.iterator();
while (it.hasNext()) {
dispatch(it.next());
}
selected.clear();
} catch (IOException e) {
e.printStackTrace();
}
}
}
private void processRegister() {
Object[] object;
while ((object = this.register.poll()) != null) {
try {
SelectableChannel channel = (SelectableChannel) object[0];
if (!channel.isOpen())
continue;
int ops = ((Integer) object[1]).intValue();
Object attachment = object[2];
channel.register(this.selector, ops, attachment);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void dispatch(SelectionKey next) {
Runnable r = (Runnable) next.attachment();
if (r != null) {
r.run();
}
}
public void registerChannel(SelectableChannel channel, int ops) throws IOException {
ServerSocketChannel serverChannel = null;
if (channel instanceof ServerSocketChannel) {
serverChannel = (ServerSocketChannel) channel;
}
Object attachment = new Acceptor(serverChannel, selector);
if (this == Thread.currentThread()) {
serverChannel.register(selector, ops, attachment);
} else {
this.register.offer(new Object[]{ channel, ops, attachment });
if (wakeup.compareAndSet(false, true)) {
this.selector.wakeup();
}
}
}
}
这样就解决了问题,在此记录下!
最新文章
- 二叉树的创建和遍历(C版和java版)
- ASP.NET Razor - C# 变量
- handler的理解笔记
- svg中改变class调用的线条颜色
- extern &ldquo;C&rdquo;原理,用法以及使用场景-2016.01.05
- Microsoft Office Excel 不能访问文件
- .net core 使用DES加密字符串
- Java学习之旅基础知识篇:面向对象之封装、继承及多态
- Chris Richardson微服务翻译:构建微服务之使用API网关
- Oracle的dual
- Errors running builder &#39;DeploymentBuilder&#39; on project &#39;工程名&#39;
- H5、React Native、Native应用对比分析
- 【Java编程】Eclipse快捷键
- node.js与ThreadLocal
- shell编写自动化安装dhcp服务
- HP Jack介绍
- docker删除镜像文件时,出现image is referenced in multiple repositories如何解决
- 64位的Sql Server使用OPENROWSET导入xlsx格式的excel数据的时候报错(转载)
- MySQL 删除数据
- 把MacBook里的电影导入到iPad