PooledByteBufAllocator buffer分配

buffer分配的入口:

io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(int, int)

netty实际应用时分配调用栈:

CLASS_NAME METHOD_NAME LINE_NUM
io/netty/buffer/PooledByteBufAllocator newDirectBuffer 339
io/netty/buffer/AbstractByteBufAllocator directBuffer 185
io/netty/buffer/AbstractByteBufAllocator directBuffer 176
io/netty/buffer/AbstractByteBufAllocator ioBuffer 139
io/netty/channel/DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle allocate 114
io/netty/channel/nio/AbstractNioByteChannel$NioByteUnsafe read 186
io/netty/channel/nio/NioEventLoop processSelectedKey 682
io/netty/channel/nio/NioEventLoop processSelectedKeysOptimized 628
io/netty/channel/nio/NioEventLoop processSelectedKeys 533
io/netty/channel/nio/NioEventLoop run 511
io/netty/util/concurrent/SingleThreadEventExecutor$5 run 956

测试case代码

package io.netty.buffer;

import org.junit.Assert;
public class PooledByteBufTest { public static void main(String[] args) {
final PooledByteBufAllocator allocator = new PooledByteBufAllocator(
false, // preferDirect
0, // nHeapArena
1, // nDirectArena
8192, // pageSize
11, // maxOrder
3, // tinyCacheSize
3, // smallCacheSize
3, // normalCacheSize
true // useCacheForAllThreads
); // create tiny buffer
final ByteBuf b1 = allocator.directBuffer(24);
// create small buffer
final ByteBuf b2 = allocator.directBuffer(800);
// create normal buffer
final ByteBuf b3 = allocator.directBuffer(8192 * 2); Assert.assertNotNull(b1);
Assert.assertNotNull(b2);
Assert.assertNotNull(b3); // then release buffer to deallocated memory while threadlocal cache has been disabled
// allocations counter value must equals deallocations counter value
Assert.assertTrue(b1.release());
Assert.assertTrue(b2.release());
Assert.assertTrue(b3.release());
}
}

PoolChunk

PoolChunk本身数据结构与设计思路参见PoolChunk注释:

/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
* Notation: The following terms are important to understand the code
* > page - a page is the smallest unit of memory chunk that can be allocated
* page是chunk中能分配的最小单元
* > chunk - a chunk is a collection of pages
* 一个chunk中有一组page 1对多
* > in this code chunkSize = 2^{maxOrder} * pageSize
* 代码中 chunksize大小计算如上 maxOrder 是啥?
*
* To begin we allocate a byte array of size = chunkSize
* Whenever a ByteBuf of given size needs to be created we search for the first position
* in the byte array that has enough empty space to accommodate the requested size and
* return a (long) handle that encodes this offset information, (this memory segment is then
* marked as reserved so it is always used by exactly one ByteBuf and no more)
* 首先,当需要创建给定大小的ByteBuf时,我们分配一个size=chunkSize的字节数组,
* 在字节数组中搜索第一个有足够的空空间来容纳请求的大小的位置,
* 并返回一个(长)句柄来编码该偏移量信息(然后将该内存段标记为保留,因此它总是仅由一个ByteBuf使用,不再使用)
*
* For simplicity all sizes are normalized according to PoolArena#normalizeCapacity method
* This ensures that when we request for memory segments of size >= pageSize the normalizedCapacity
* equals the next nearest power of 2
* 为了简单起见,所有大小都按照PoolArena#normalizeCapacity方法进行规范化
* 这确保当我们请求大小大于等于pageSize的内存段时,normalized容量等于下一个最接近的2的幂
*
* To search for the first offset in chunk that has at least requested size available we construct a
* complete balanced binary tree and store it in an array (just like heaps) - memoryMap
* 为了搜索块中至少有请求大小可用的第一个偏移量,我们构造了一个完整的平衡二叉树,并将其存储在一个数组(就像堆一样)-内存映射中
*
* The tree looks like this (the size of each node being mentioned in the parenthesis)
* 树看起来是这样的(括号中提到的每个节点的大小)
*
* depth=0 1 node (chunkSize)
* depth=1 2 nodes (chunkSize/2)
* ..
* ..
* depth=d 2^d nodes (chunkSize/2^d)
* ..
* depth=maxOrder 2^maxOrder nodes (chunkSize/2^{maxOrder} = pageSize) pageSize 在最下一层 最顶层是chunksize 从上往下走,每过一层除以2
*
* depth=maxOrder is the last level and the leafs consist of pages
*
* With this tree available searching in chunkArray translates like this:
* To allocate a memory segment of size chunkSize/2^k we search for the first node (from left) at height k
* which is unused 要分配大小为chunkSize/2^k的内存段,我们在高度k处搜索第一个未使用的节点(从左开始)。 嗯嗯
*
* Algorithm:
* ----------
* Encode the tree in memoryMap with the notation 用符号将树编码在内存中
* memoryMap[id] = x => in the subtree rooted at id, the first node that is free to be allocated
* is at depth x (counted from depth=0) i.e., at depths [depth_of_id, x), there is no node that is free
* 在以id为根的子树中,可自由分配的第一个节点在深度x(从深度=0开始计算),即在深度[深度id,x的深度]处,没有可自由分配的节点
*
* As we allocate & free nodes, we update values stored in memoryMap so that the property is maintained
* 当我们分配空闲节点时,我们更新存储在memoryMap中的值,以便维护属性
*
* Initialization -
* In the beginning we construct the memoryMap array by storing the depth of a node at each node
* 首先,我们通过在每个节点上存储一个节点的深度来构造memoryMap数组
* i.e., memoryMap[id] = depth_of_id
*
* Observations:
* -------------
* 1) memoryMap[id] = depth_of_id => it is free / unallocated
* 2) memoryMap[id] > depth_of_id => at least one of its child nodes is allocated, so we cannot allocate it, but
* some of its children can still be allocated based on their availability
* 3) memoryMap[id] = maxOrder + 1 => the node is fully allocated & thus none of its children can be allocated, it
* is thus marked as unusable
*
* Algorithm: [allocateNode(d) => we want to find the first node (from left) at height h that can be allocated]
* ----------
* 1) start at root (i.e., depth = 0 or id = 1)
* 2) if memoryMap[1] > d => cannot be allocated from this chunk
* 3) if left node value <= h; we can allocate from left subtree so move to left and repeat until found
* 4) else try in right subtree
*
* Algorithm: [allocateRun(size)]
* ----------
* 1) Compute d = log_2(chunkSize/size)
* 2) Return allocateNode(d)
*
* Algorithm: [allocateSubpage(size)]
* ----------
* 1) use allocateNode(maxOrder) to find an empty (i.e., unused) leaf (i.e., page)
* 2) use this handle to construct the PoolSubpage object or if it already exists just call init(normCapacity)
* note that this PoolSubpage object is added to subpagesPool in the PoolArena when we init() it
*
* Note:
* -----
* In the implementation for improving cache coherence,
* we store 2 pieces of information depth_of_id and x as two byte values in memoryMap and depthMap respectively
*
* memoryMap[id]= depth_of_id is defined above
* depthMap[id]= x indicates that the first node which is free to be allocated is at depth x (from root)
*/
final class PoolChunk<T> implements PoolChunkMetric {

io.netty.buffer.PoolArena.findSubpagePoolHead(int) 算出page header在page table中的index,小的page在前面

// trace 库地址 jdbc:h2:/Users/simon/twice-cooked-pork/trace-data/基于netty4做的resetserver的一次http请求trace/tracer.data.h2db

PoolChunk要解决的问题有:

  1. 快速查找未分配的地方并分配
  2. 尽量不要有碎片,可以理解成尽量挨着紧凑的分配

整个chunk的结构如下:

                                                    +------+   chunksize 当L=11时,是16M
L=0 | 0 |
+----------------+------+------------------+
| |
| |
| |
+---v--+ +---v--+
L=1 | 1 | | 2 |
+------+------+------+ +------+------+-------+
| | | |
| | | |
| | | |
+---v--+ +---v--+ +---v--+ +---v--+
L=2 | 3 | | 4 | | 5 | | 6 |
+--+------+-+ +-+------+--+ +--+------+--+ +-+------+--+
| | | | | | | |
| | | | | | | |
| | | | | | | |
+--v---+ +---v--+ +--v---+ +---v--+ +-v----+ +---v--+ +--v---+ +---v--+
L=3 | 7 | | 8 | | 9 | | 10 | | 11 | | 12 | | 13 | | 14 |
+------+ +------+ +------+ +------+ +------+ +------+ +------+ +------+
8K大小即page size

是一个完全二叉树,树的层高可以自定义,目前限制在14层内,默认是11层。

最底层是真正的chunk描述,最底层每个叶子是一个paage,大小为8K。那么当层数是11层时,chunk的size是16M。因为11层的话,最下面一层叶子是2的11次方,再乘以8K正好是16MB。

这棵树中每个节点还对对应其相应的大小是否被分配。什么叫其相应的大小?是这样的,每一层代表需要分配的大小的档次。暂且用档次这个词吧。最上面是16MB档次,最下面是8K档次,从最上面开始往下走一层,档次就除以2。

每次申请内存时,netty会先对其做规格化,所谓规格化就是最接近申请内存值的2de整数次幂。比如我申请900byte,那么规格化后就是1K。在规格化后,netty会在树上标志 0 1 3 7被使用了。下次要再申请8K内存时就要避开这个路径了,只能是 0 1 3 8 了,因为7那边已经不够了。其他大小同理。所以树上的节点是为了标志是否被使用过,以使得内存碎片减少尽量靠左紧凑分配。 对于单page内的内存使用浪费问题,netty又做了一层位图结构使其得以利用。对于chunk对象的查找,netty还做了缓存机制,下面有讲。

真正数据存放在 io.netty.buffer.PoolChunk.memory 这个字段中,调试时为:java.nio.DirectByteBuffer[pos=0 lim=16777216 cap=16777216]

16777216是16M

作业

仔细调试 1K 2k 3K 8K 11K 内存的多次分配与回收。

分配24byte过程

PooledUnsafeDirectByteBuf是用了对象池特性io.netty.buffer.PooledUnsafeDirectByteBuf.RECYCLER

PoolArena

PoolArena 这一层负责创建与维护PoolChunk,维护的方式是将用到的正在分配中的PoolChunk放到PoolChunkList这个列表中。

PoolChunkList是一个链是结构。

而且,PoolArena还按PoolChunk的使用量分别维护到相对应的PoolChunkList中。

// abstract class PoolArena<T> implements PoolArenaMetric {
private final PoolChunkList<T> q050;
private final PoolChunkList<T> q025;
private final PoolChunkList<T> q000;
private final PoolChunkList<T> qInit;
private final PoolChunkList<T> q075;
private final PoolChunkList<T> q100;

这些PoolChunkList也是按使用量大小有序的链式的串在一起(参见PoolArena构造方法中初始化这些list字段的代码),当使用量达到本级别时,会加入到下一级别的list中,比如达到25%了,那么就会加到50%列表中了。(参见io.netty.buffer.PoolChunkList.add(PoolChunk))

void add(PoolChunk<T> chunk) {
if (chunk.usage() >= maxUsage) {
nextList.add(chunk);
return;
}
add0(chunk);
}

PoolArena中还维护了两个PoolSubpage数组,每个数组里面的实例在PoolArena构造时初始化,刚初始化后每个PoolSubpage元素的前继与后继元素都是指向自己(PoolSubpage是支持链表式的一个结构)

在io.netty.buffer.PoolSubpage.addToPool(PoolSubpage)时 会将io.netty.buffer.PoolChunk.allocateSubpage(int)过程中新构建出来的PoolSubpage实例加到head的next节点上(即后继节点)。 具体代码如下:

    private long allocateSubpage(int normCapacity) {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity); // 这个是查找PoolArena的PoolSubpage数组
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
synchronized (head) {
int id = allocateNode(d);
if (id < 0) {
return id;
} final PoolSubpage<T>[] subpages = this.subpages;
final int pageSize = this.pageSize; freeBytes -= pageSize; int subpageIdx = subpageIdx(id);
PoolSubpage<T> subpage = subpages[subpageIdx];
if (subpage == null) {
subpage = new PoolSubpage<T>(head, this, id, runOffset(id), pageSize, normCapacity); // 此处会将新新构建出来的PoolSubpage实例加到head的next节点
subpages[subpageIdx] = subpage;
} else {
subpage.init(head, normCapacity);
}
return subpage.allocate();
}
}

PoolArenad的cache与Recycler对象池

PooledByteBuf依赖PoolThreadCache做了一层对PoolChunk的缓存,PoolThreadCache靠MemoryRegionCache实现缓存。MemoryRegionCache靠队列来实现对PoolChunk的缓存(参见下面代码1),MemoryRegionCache在buf释放时会调用其add接口将释放的PoolChunk对象和nioBuffer对象通过io.netty.buffer.PoolThreadCache.MemoryRegionCache.Entry对象包装后加入(offer)到队列(参见下面堆栈1)。在io.netty.buffer.PoolThreadCache.MemoryRegionCache.allocate(PooledByteBuf, int)时再从队列中直接poll出来,达成cache的目的。优化还没有结束,包装PoolChunk用的Entry对象是通过Recycler对象池完成分配(获取)已释放的。对象是本质上一个通过FastThreadLocal的Stack的数据结构,分配对应出栈,释放对象入栈。具体参见下面代码2。

Recycler

是一个基于ThreadLocal结合stack玩起来的一个对象池数据结构,像上述这种就是PooledUnsafeDirectByteBuf的对象pool。回收的时候压栈,要用的时候出栈。

获取对象 io.netty.util.Recycler.get()

回收对象 io.netty.util.Recycler.DefaultHandle.recycle(Object)

代码1: 队列初始化

Queue<Entry<T>> queue = PlatformDependent.newFixedMpscQueue(this.size);

堆栈1:buf释放时会调用MemoryRegionCache add接口将释放的PoolChunk对象包装后入队:

Thread [main] (Suspended (breakpoint at line 393 in PoolThreadCache$MemoryRegionCache))
PoolThreadCache$SubPageMemoryRegionCache<T>(PoolThreadCache$MemoryRegionCache<T>).add(PoolChunk<T>, ByteBuffer, long) line: 393
PoolThreadCache.add(PoolArena<?>, PoolChunk, ByteBuffer, long, int, SizeClass) line: 209
PoolArena$DirectArena(PoolArena<T>).free(PoolChunk<T>, ByteBuffer, long, int, PoolThreadCache) line: 273
PooledUnsafeDirectByteBuf(PooledByteBuf<T>).deallocate() line: 171
PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release0(int) line: 136
PooledUnsafeDirectByteBuf(AbstractReferenceCountedByteBuf).release() line: 124
PooledByteBufTest.main(String[]) line: 43

代码2:Entry对象使用对象池

private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
@SuppressWarnings("unchecked")
@Override
protected Entry newObject(Handle<Entry> handle) {
return new Entry(handle);
}
}; private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.nioBuffer = nioBuffer;
entry.handle = handle;
return entry;
} @Override
public void recycle(Object object) {
if (object != value) {
throw new IllegalArgumentException("object does not belong to handle");
} Stack<?> stack = this.stack;
if (lastRecycledId != recycleId || stack == null) {
throw new IllegalStateException("recycled already");
} stack.push(this);
}

PooledByteBufAllocator创建及其关联细节

  1. PooledByteBufAllocator validateAndCalculateChunkSize 校验树高度不能超过14,且根据pageSize(可以外部指定)和树高计算出chunksize
  2. PooledByteBufAllocator validateAndCalculatePageShifts 校验pageSize最小不能小于4K,且pageSize必须是2的整数次方((pageSize & pageSize - 1) != 0) (为什么(pageSize & pageSize - 1) != 0能判断?因为2的n次方的二进制形式一定是第一位1后面接n个0,减1后就变成第一位0后面接n个1,相与之后一定是0;如果不是2的n次方的数的二进制形式一定是第一位是1,且,这个数减去1后,第一位一定还是1,因为第一位是1且后面全接0的数一定是2的整数次方的,那么不是2的整数次方的数后面一定不全是0,所以减去1后第一位肯定还是1,所以不管后面接的这些数相与是怎样的结果,第一位两个1相与出来肯定是1,肯定不为0,所以能用这个办法判断)
  3. 创建tinySubpagePools数组并初始化里面的元素,默认数组大小32个,里面的元素是PoolSubpage,PoolSubpage还支持链式形式连接(他有前继和后继)

PoolChunk 分配与释放小于pagesize的buf

io.netty.buffer.PoolArena.free(PoolChunk, ByteBuffer, long, int, PoolThreadCache)

位图相关:

// long64位 取 高32位转成整数
private static int bitmapIdx(long handle) {
return (int) (handle >>> Integer.SIZE);
}

PoolSubpage 支持位图

一个page 8192大小 一个块(element)大小32,那么一个page可以拆成256个,每申请一次numAvail减去1。

long型位图数组中有个8个元素,8192/16/64=8, 64是long的位数,。

分配时bitmap中元素,以第一个元素为例子,按1 3 7 15 31 63 127网上涨,释放的时候按对应数据往下减,并且在释放时记录nextAvail值,便于下次申请时优先使用。

bitmap中的4个(bitmapLength)long来维护256个(maxNumElems=pageSize/elemSize)块是否使用的情况。

final class PoolSubpage<T> implements PoolSubpageMetric {

    final PoolChunk<T> chunk;
private final int memoryMapIdx;
private final int runOffset;
private final int pageSize;
private final long[] bitmap; // 位图...,默认有8个元素 个数= pagesize >>> 10 (pagesize / 16 / 64)64应该是long的位数,16是啥?一个element算256。 实际这个数组默认只用4个元素 PoolSubpage<T> prev;
PoolSubpage<T> next; boolean doNotDestroy;
int elemSize;
private int maxNumElems; // 一个page再分maxNumElems分 默认是256
private int bitmapLength; // 默认是4 256 >>> 6 = 4
private int nextAvail; // 在有buf释放时会设置这个值,以使得他们在下次分配时优先使用这个
private int numAvail; long allocate() {
if (elemSize == 0) {
return toHandle(0);
} if (numAvail == 0 || !doNotDestroy) {
return -1;
} final int bitmapIdx = getNextAvail();
int q = bitmapIdx >>> 6;
int r = bitmapIdx & 63;
assert (bitmap[q] >>> r & 1) == 0;
bitmap[q] |= 1L << r; // 按1 3 7 15 31 63 127往上涨 if (-- numAvail == 0) {
removeFromPool();
} return toHandle(bitmapIdx);
} private int findNextAvail() {
final long[] bitmap = this.bitmap;
final int bitmapLength = this.bitmapLength;
for (int i = 0; i < bitmapLength; i ++) {
long bits = bitmap[i];
if (~bits != 0) { // 这个表示这个long上是否所有的位都用完了。。
return findNextAvail0(i, bits);
}
}
return -1;
} private int findNextAvail0(int i, long bits) {
final int maxNumElems = this.maxNumElems;
final int baseVal = i << 6; for (int j = 0; j < 64; j ++) {
if ((bits & 1) == 0) { // 判断是否是偶数
int val = baseVal | j;
if (val < maxNumElems) {
return val;
} else {
break;
}
}
bits >>>= 1; // 除以2 并向靠近的2的整数次幂对齐
}
return -1;
}

free时不是每次都会真正释放,在下面会先加入到MemoryRegionCache的queue中cache起来,当queue中放不下时才真正free,代码如下:

// PoolArena.class
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
activeBytesHuge.add(-size);
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
} freeChunk(chunk, handle, sizeClass, nioBuffer);
}
}

最新文章

  1. Android设置图片内存溢出(OOM)问题——Android开发进阶之路6
  2. bzoj1051Tarjan裸题
  3. call()与apply()
  4. 【项目经验】--EasyUI DataGrid之右键菜单
  5. JQuery小结
  6. Plus One Linked List
  7. ajax views
  8. java第一课:环境、变量、数据类型
  9. 二叉树的实现 -- 数据结构与算法的javascript描述 第十章
  10. 打开新世界的第一步:学习servlet
  11. 详解MySQL表空间以及ibdata1文件过大问题
  12. 【codechef】FN/Fibonacci Number
  13. &lt;转载&gt;bellman-ford算法
  14. hdoj:2053
  15. MT【212】合作共赢
  16. sqlserver服务启动后停止,传递给数据库 &#39;master&#39; 中的日志扫描操作的日志扫描号无效
  17. FLIR ONE PRO热成像仪
  18. tomcat 的自问自答与总结
  19. binlog介绍
  20. linux下使用docker-thunder-xware进行离线下载

热门文章

  1. Python 简明教程 --- 24,Python 文件读写
  2. 题解 洛谷 P3298 【[SDOI2013]泉】
  3. Oracle修改表类型方法
  4. 基于Python爬虫采集天气网实时信息
  5. Winform开发中的困境及解决方案
  6. 细说JavaScript 导出 上万条Excel数据
  7. Java复习总结(二)Java SE 面试题
  8. 面试官:如何在Integer类型的ArrayList中同时添加String、Character、Boolean等类型的数据? | Java反射高级应用
  9. Django学习路26_转换字符串大小写 upper,lower
  10. PHP floatval()、doubleval () 函数