NioEventLoop的创建

NioEventLoop是netty及其重要的组成部件,它的首要职责就是为注册在它上的channels服务,发现这些channels上发生的新连接、读写等I/O事件,然后将事件转交 channel 流水线处理。使用netty时,我们首先要做的就是创建NioEventLoopGroup,这是一组NioEventLoop的集合,类似线程与线程池。通常,服务端会创建2个group,一个叫做bossGroup,一个叫做workerGroup。bossGroup负责监听绑定的端口,接受请求并创建新连接,初始化后交由workerGroup处理后续IO事件。

NioEventLoop和NioEventLoopGroup的类图

首先看看NioEventLoop和NioEventLoopGroup的类关系图





类多但不乱,可以发现三个特点:

  1. 两者都继承了ExecutorService,从而与线程池建立了联系
  2. NioEventLoop继承的都是SingleThread,NioEventLoop继承的是MultiThread
  3. NioEventLoop还继承了AbstractScheduledEventExecutor,不难猜出这是个和定时任务调度有关的线程池

NioEventLoopGroup的创建

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

我们先看看bossGroup和workerGroup的构造方法。

public NioEventLoopGroup() {
this(0);
}
public NioEventLoopGroup(int nThreads) {
this(nThreads, (Executor) null);
}
除此之外,还有多达8种构造方法,这些构造方法可以指定5种参数:
1、最大线程数量。如果指定为0,那么Netty会将线程数量设置为CPU逻辑处理器数量的2倍
2、线程工厂。要求线程工厂类必须实现java.util.concurrent.ThreadFactory接口。如果没有指定线程工厂,那么默认DefaultThreadFactory。
3、SelectorProvider。如果没有指定SelectorProvider,那么默认的SelectorProvider为SelectorProvider.provider()。
4、SelectStrategyFactory。如果没有指定则默认为DefaultSelectStrategyFactory.INSTANCE
5、RejectedExecutionHandler。拒绝策略处理类,如果这个EventLoopGroup已被关闭,那么之后提交的Runnable任务会默认调用RejectedExecutionHandler的reject方法进行处理。如果没有指定,则默认调用拒绝策略。

最终,NioEventLoopGroup会重载到父类MultiThreadEventExecutorGroup的构造方法上,这里省略了一些健壮性代码。

protected MultithreadEventExecutorGroup(int nThreads, Executor executor,EventExecutorChooserFactory chooserFactory, Object... args) {
// 步骤1
if (executor == null) {
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
} // 步骤2
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
children[i] = newChild(executor, args);
} // 步骤3
chooser = chooserFactory.newChooser(children); // 步骤4
final FutureListener<Object> terminationListener = future -> {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
};
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
} // 步骤5
Set<EventExecutor> childrenSet = new LinkedHashSet<>(children.length);
Collections.addAll(childrenSet, children);
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}

这里可以分解为5个步骤,下面一步步讲解

步骤1

第一个步骤是创建线程池executor。从workerGroup构造方法可知,默认传进来的executor为null,所以首先创建executor。newDefaultThreadFactory的作用是设置线程的前缀名和线程优先级,默认情况下,前缀名是nioEventLoopGroup-x-y这样的命名规则,而线程优先级则是5,处于中间位置。

创建完newDefaultThreadFactory后,进入到ThreadPerTaskExecutor。它直接实现了juc包的线程池顶级接口,从构造方法可以看到它只是简单的把factory赋值给自己的成员变量。而它实现的接口方法调用了threadFactory的newThread方法。从名字可以看出,它构造了一个thread,并立即启动thread。

public ThreadPerTaskExecutor(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@Override
public void execute(Runnable command) {
threadFactory.newThread(command).start();
}

那么我们回过头来看下DefaultThreadFactory的newThread方法,发现他创建了一个FastThreadLocalThread。这是netty自定义的一个线程类,顾名思义,netty认为它的性能更快。关于它的解析留待以后。这里步骤1创建线程池就完成了。总的来说他与我们通常使用的线程池不太一样,不设置线程池的线程数和任务队列,而是来一个任务启动一个线程。(问题:那任务一多岂不是直接线程爆炸?)

@Override
public Thread newThread(Runnable r) {
Thread t = newThread(FastThreadLocalRunnable.wrap(r), prefix + nextId.incrementAndGet());
return t;
}
protected Thread newThread(Runnable r, String name) {
return new FastThreadLocalThread(threadGroup, r, name);
}

步骤2

步骤2是创建workerGroup中的NioEventLoop。在示例代码中,传进来的线程数是0,显然不可能真的只创建0个nioEventLoop线程。在调用父类MultithreadEventLoopGroup构造函数时,对线程数进行了判断,若为0,则传入默认线程数,该值默认为2倍CPU核心数

protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
// 静态代码块初始化DEFAULT_EVENT_LOOP_THREADS
static {
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
}

接下来是通过newChild方法为每一个EventExecutor创建一个对应的NioEventLoop。这个方法传入了一些args到NioEventLoop中,前三个是在NioEventLoopGroup创建时传过来的参数。默认值见上文

  1. SlectorProvider.provider, 用于创建 Java NIO Selector 对象;
  2. SelectStrategyFactory, 选择策略工厂;
  3. RejectedExecutionHandlers, 拒绝执行处理器;
  4. EventLoopTaskQueueFactory,任务队列工厂,默认为null;

进入NioEventLoop的构造函数,如下:

NioEventLoop构造函数
NioEventLoop(NioEventLoopGroup parent, Executor executor, SelectorProvider selectorProvider,
SelectStrategy strategy, RejectedExecutionHandler rejectedExecutionHandler,
EventLoopTaskQueueFactory queueFactory) {
super(parent, executor, false, newTaskQueue(queueFactory), newTaskQueue(queueFactory),
rejectedExecutionHandler);
if (selectorProvider == null) {
throw new NullPointerException("selectorProvider");
}
if (strategy == null) {
throw new NullPointerException("selectStrategy");
}
provider = selectorProvider;
final SelectorTuple selectorTuple = openSelector();
selector = selectorTuple.selector;
unwrappedSelector = selectorTuple.unwrappedSelector;
selectStrategy = strategy;
}
// 父类构造函数
protected SingleThreadEventExecutor(EventExecutorGroup parent, Executor executor,
boolean addTaskWakesUp, Queue<Runnable> taskQueue,
RejectedExecutionHandler rejectedHandler) {
super(parent);
this.addTaskWakesUp = addTaskWakesUp;
this.maxPendingTasks = DEFAULT_MAX_PENDING_EXECUTOR_TASKS;
this.executor = ThreadExecutorMap.apply(executor, this);
this.taskQueue = ObjectUtil.checkNotNull(taskQueue, taskQueue");
rejectedExecutionHandler = ObjectUtil.checkNotNullrejectedHandler, "rejectedHandler");
}

首先调用一个newTaskQueue方法创建一个任务队列。这是一个mpsc即多生产者单消费者的无锁队列。之后调用父类的构造函数,在父类的构造函数中,将NioEventLoopGroup设置为自己的parent,并通过匿名内部类创建了这样一个Executor————通过ThreadPerTaskExecutor执行传进来的任务,并且在执行时将当前线程与NioEventLoop绑定。其他属性也一一设置。

在nioEventLoop构造函数中,我们发现创建了一个selector,不妨看一看netty对它的包装。

unwrappedSelector = provider.openSelector();
if (DISABLE_KEY_SET_OPTIMIZATION) {
return new SelectorTuple(unwrappedSelector);
}

首先看到netty定义了一个常量DISABLE_KEY_SET_OPTIMIZATION,如果这个常量设置为true,也即不对keyset进行优化,则直接返回未包装的selector。那么netty对selector进行了哪些优化?

final SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();

final class SelectedSelectionKeySet extends AbstractSet<SelectionKey> {

    SelectionKey[] keys;
int size; SelectedSelectionKeySet() {
keys = new SelectionKey[1024];
}
}

往下,我们看到了一个叫做selectedSelectionKeySet的类,点进去可以看到,它继承了AbstractSet,然而它的成员变量却让我们想到了ArrayList,再看看它定义的方法,除了不支持remove和contains外,活脱脱一个简化版的ArrayList,甚至也支持扩容。

没错,netty确实通过反射的方式,将selectionKey从Set替换为了ArrayList。仔细一想,却又觉得此番做法有些道理。众所周知,虽然HashSet和ArrayList随机查找的时间复杂度都是o(1),但相比数组直接通过偏移量定位,HashSet由于需要Hash运算,时间消耗上又稍稍逊色了些。再加上使用场景上,都是获取selectionKey集合然后遍历,Set去重的特性完全用不上,也无怪乎追求性能的netty想要替换它了。

步骤3

创建完workerGroup的NioEventLoop后,如何挑选一个nioEventLoop进行工作是netty接下来想要做的事。一般来说轮询是一个很容易想到的方案,为此需要创建一个类似负载均衡作用的线程选择器。当然追求性能到丧心病狂的netty是不会轻易满足的。我们看看netty在这样常见的方案里又做了哪些操作。

public EventExecutorChooser newChooser(EventExecutor[] executors) {
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
// PowerOfTwo
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
// Generic
public EventExecutor next() {
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}

可以看到,netty根据workerGroup内线程的数量采取了2种不同的线程选择器,当线程数x是2的幂次方时,可以通过&(x-1)来达到对x取模的效果,其他情况则需要直接取模。这与hashmap强制设置容量为2的幂次方有异曲同工之妙。

步骤4

步骤4就是添加一些保证健壮性而添加的监听器了,这些监听器会在EventLoop被关闭后得到通知。

步骤5

创建一个只读的NioEventLoop线程组

到此NioEventLoopGroup及其包含的NioEventLoop组就创建完成了

最新文章

  1. [Infopath]使用jquery给infopath表单的的field赋值。 how to set value to Infopath field by Jquery
  2. 移动硬盘安装win7,蓝屏,0x0000007B
  3. ICEM(1)—边界结构网格绘制
  4. code complete part1
  5. OpenCV 3.0函数库索引
  6. php防止表单重复提交
  7. shell脚本例子集锦(习题总结)
  8. volatile关键字的使用
  9. 暑假集训单切赛第一场 POJ 2309 BST(找规律的题)
  10. [转载]C#对象序列化与反序列化
  11. 零门槛!ZBLibrary仿微信朋友圈自定义View,就是这么简单!
  12. 安装J2EE的SDK报错:could not find the required version of the Java(TM)2 Runtime Environment in &#39;(null)&#39;的解决办法。
  13. APScheduler
  14. jQuery的load函数是异步的
  15. [C++]Linux之读取计算机网络数据[/proc/net/dev]
  16. Apache Flume 1.7.0 源码编译 导入Eclipse
  17. CH4901 关押罪犯
  18. vue-cli 添加less 以及sass
  19. Python实现的常用排序方法
  20. C#高级编程9-第5章 泛型

热门文章

  1. epoch,iteration与batchsize的区别
  2. 均值不等式中的一则题目$\scriptsize\text{$(a+\cfrac{1}{a})^2+(b+\cfrac{1}{b})^2\ge \cfrac{25}{2}$}$
  3. PE安装器说明by双心
  4. TODO list(咕咕咕。。。
  5. web 视频播放器clappr 相关
  6. mysql 创建分组
  7. TypeScript规则整理
  8. Django入门——《Python编程从入门到实践》
  9. gdal 根据条件选择数据
  10. ABS函数 去掉金额字段值为负数问题