1. Java NIO服务端创建



import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.util.Iterator;
import java.util.Set;

/**
 * Minimal Java NIO server: one selector multiplexes the listening socket and
 * every accepted client. Each chunk of data read from a client is printed and
 * answered with a canned response.
 *
 * User: mihasya
 * Date: Jul 25, 2010
 * Time: 9:09:03 AM
 */
public class JServer {
    /** TCP port the server listens on. */
    private static final int PORT = 8400;

    /**
     * Opens the listening channel, registers it with a new selector and hands
     * the selector to the listener thread.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ServerSocketChannel sch = null;
        Selector sel = null;
        try {
            // setup the socket we're listening for connections on.
            InetSocketAddress addr = new InetSocketAddress(PORT);
            sch = ServerSocketChannel.open();
            // register() throws IllegalBlockingModeException on a blocking
            // channel, so switch to non-blocking mode first
            sch.configureBlocking(false);
            sch.socket().bind(addr);
            // setup our selector and register the main socket on it
            sel = Selector.open();
            sch.register(sel, SelectionKey.OP_ACCEPT);
        } catch (IOException e) {
            System.out.println("Couldn't setup server socket");
            System.out.println(e.getMessage());
            // nothing useful can run without a working selector; release
            // whatever was opened and stop instead of passing null along
            closeQuietly(sel);
            closeQuietly(sch);
            return;
        }
        // fire up the listener thread, pass it our selector
        ListenerThread listener = new ListenerThread(sel);
        listener.start();
    }

    /** Closes {@code c} if it is non-null; a failure to close is deliberately ignored. */
    private static void closeQuietly(java.io.Closeable c) {
        if (c == null) {
            return;
        }
        try {
            c.close();
        } catch (IOException ignored) {
            // best effort cleanup on an already failing path
        }
    }
}

/*
 * the thread is completely unnecessary, it could all just happen
 * in main()
 */
class ListenerThread extends Thread {
    private static final Charset UTF_8 = Charset.forName("UTF-8");
    /** our canned response for now */
    private static final byte[] RESPONSE = "got it\n".getBytes(UTF_8);
    private static final int READ_BUFFER_SIZE = 200;

    Selector sel = null;

    ListenerThread(Selector sel) {
        this.sel = sel;
    }

    /** Runs the select loop until the selector is closed or fails. */
    @Override
    public void run() {
        while (this.sel.isOpen()) {
            try {
                // block until at least one registered channel is ready
                this.sel.select();
                Set<SelectionKey> keys = this.sel.selectedKeys();
                Iterator<SelectionKey> i = keys.iterator();
                while (i.hasNext()) {
                    SelectionKey key = i.next();
                    // the selector never clears the selected set itself; a key
                    // left in it would be processed again on every pass
                    i.remove();
                    handleKey(key);
                }
            } catch (IOException e) {
                System.out.println("Error in poll loop");
                System.out.println(e.getMessage());
                // select() itself failed; retrying would just spin
                return;
            }
        }
    }

    /** Dispatches one ready key; an I/O failure only drops the affected client. */
    private void handleKey(SelectionKey key) {
        try {
            if (!key.isValid()) {
                return;
            }
            if (key.isAcceptable()) {
                accept(key);
            } else if (key.isReadable()) {
                read(key);
            } else if (key.isWritable()) {
                write(key);
            }
        } catch (IOException e) {
            System.out.println("Error in poll loop");
            System.out.println(e.getMessage());
            if (key.channel() instanceof SocketChannel) {
                // drop the broken client, keep the listening socket alive
                key.cancel();
                try {
                    key.channel().close();
                } catch (IOException ignored) {
                    // channel is being discarded anyway
                }
            }
        }
    }

    /**
     * A new client has hit the port our main socket is listening on: accept
     * the connection and add the client socket to the selector for reading.
     */
    private void accept(SelectionKey key) throws IOException {
        System.out.println("Accepting connection!");
        // this is the ServerSocketChannel registered in main()
        ServerSocketChannel sch = (ServerSocketChannel) key.channel();
        SocketChannel ch = sch.accept();
        if (ch == null) {
            // non-blocking accept: the pending connection went away
            return;
        }
        ch.configureBlocking(false);
        ch.register(this.sel, SelectionKey.OP_READ);
    }

    /**
     * One of the client sockets has received a command: read and print it,
     * then switch the key to write interest so the response goes out on the
     * next go-around.
     */
    private void read(SelectionKey key) throws IOException {
        System.out.println("Accepting command!");
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer buf = ByteBuffer.allocate(READ_BUFFER_SIZE);
        int count = ch.read(buf);
        if (count < 0) {
            // peer closed the connection; closing the channel cancels its key
            ch.close();
            return;
        }
        if (count == 0) {
            return;
        }
        buf.flip();
        // Charset.decode replaces malformed input instead of throwing
        CharBuffer cbuf = UTF_8.decode(buf);
        System.out.print(cbuf.toString());
        // each client gets its own buffer so a partial write can be resumed
        key.attach(ByteBuffer.wrap(RESPONSE));
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * The client socket is ready for the response to a previously read
     * command. Once it is fully written, go back to read interest since we
     * may get another command from this client.
     */
    private void write(SelectionKey key) throws IOException {
        System.out.println("Sending response!");
        SocketChannel ch = (SocketChannel) key.channel();
        ByteBuffer pending = (ByteBuffer) key.attachment();
        if (pending == null) {
            pending = ByteBuffer.wrap(RESPONSE);
            key.attach(pending);
        }
        ch.write(pending);
        if (!pending.hasRemaining()) {
            key.attach(null);
            key.interestOps(SelectionKey.OP_READ);
        }
        // otherwise stay in OP_WRITE and finish on the next readiness event
    }
}

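从上面的代码可以看出 Java NIO 服务端的通用步骤:(1)打开 ServerSocketChannel,设置为非阻塞模式并绑定监听端口;(2)打开 Selector(多路复用器);(3)将 ServerSocketChannel 注册到 Selector 上,监听 OP_ACCEPT 事件;(4)循环调用 select(),遍历就绪的 SelectionKey,分别处理连接接入、读、写事件。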




2. Netty服务端创建

2.1 打开ServerSocketChannel

            ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)


* Create a new instance
public NioServerSocketChannel() {


    private static ServerSocketChannel newSocket(SelectorProvider provider) {
try {
* Use the {@link SelectorProvider} to open {@link SocketChannel} and so remove condition in
* {@link SelectorProvider#provider()} which is called by each ServerSocketChannel.open() otherwise.
* See <a href="See https://github.com/netty/netty/issues/2308">#2308</a>.
return provider.openServerSocketChannel();
} catch (IOException e) {
throw new ChannelException(
"Failed to open a server socket.", e);

其中的 provider.openServerSocketChannel() 就是对 Java NIO 的直接调用。设置非阻塞模式的逻辑位于父类 AbstractNioChannel 的构造方法中:

/**
 * Create a new instance
 *
 * @param parent         the parent {@link Channel} by which this instance was created. May be {@code null}
 * @param ch             the underlying {@link SelectableChannel} on which it operates
 * @param readInterestOp the ops to set to receive data from the {@link SelectableChannel}
 * @throws ChannelException if the channel cannot be switched to non-blocking mode
 */
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        // a channel has to be non-blocking before it can be used with a selector
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            // do not leak the half-initialized channel
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

2.2 打开多路复用器过程


protected void run() {
for (;;) {
boolean oldWakenUp = wakenUp.getAndSet(false);
try {
if (hasTasks()) {
} else {
select(oldWakenUp); // 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required). if (wakenUp.get()) {
} cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
} else {
final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
} if (isShuttingDown()) {
if (confirmShutdown()) {
} catch (Throwable t) {
logger.warn("Unexpected exception in the selector loop.", t); // Prevent possible consecutive immediate failures that lead to
// excessive CPU consumption.
try {
} catch (InterruptedException e) {
// Ignore.

2.2.1 绑定处理的key

    private void processSelectedKeys() {
if (selectedKeys != null) {
} else {


 private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
// check if the set is empty and if so just return to not create garbage by
// creating a new Iterator every time even if there is nothing to process.
// See https://github.com/netty/netty/issues/597
if (selectedKeys.isEmpty()) {
} Iterator<SelectionKey> i = selectedKeys.iterator();
for (;;) {
final SelectionKey k = i.next();
final Object a = k.attachment();
i.remove(); if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
} if (!i.hasNext()) {
} if (needsToSelectAgain) {
selectedKeys = selector.selectedKeys(); // Create the iterator again to avoid ConcurrentModificationException
if (selectedKeys.isEmpty()) {
} else {
i = selectedKeys.iterator();

2.3 绑定端口,接收请求:

            // Bind and start to accept incoming connections.
ChannelFuture f = b.bind(PORT).sync();


private ChannelFuture doBind(final SocketAddress localAddress) {
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
} if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.executor = channel.eventLoop();
doBind0(regFuture, channel, localAddress, promise);
return promise;


    private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) { // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new OneTimeTask() {
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {


 private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) {
// close the channel if the key is not valid anymore
} try {
int readyOps = k.readyOps();
// Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops); unsafe.finishConnect();
} catch (CancelledKeyException ignored) {





