这篇文章主要介绍了Netty分布式ByteBuf使用page级别的内存分配解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪

在上一节中,我们分析了命中缓存的内存分配逻辑。前提是,如果缓存中有数据,那么缓存中就没有数据。netty是如何开辟一块内存进行内存分配的?本节将带您进行分析:


netty内存分配数据结构

我们之前提到过,netty的内存分配单位是chunk,一个chunk的大小是16MB。实际上,每个块都以双向链表的形式存储在一个chunkList中,多个块列表也是通过双向链表连接的。大致结构如下:


在chunkList中,根据chunk的内存使用率将其分类到chunkList中,这样在分配内存时,会根据百分比找到对应的chunkList,并选择chunkList中的一个chunk进行内存分配。


我们看PoolArena中有关chunkList的成员变量private final PoolChunkListlt;Tgt; q050;private final PoolChunkListlt;Tgt; q025;private final PoolChunkListlt;Tgt; q000;private final PoolChunkListlt;Tgt; qInit;private final PoolChunkListlt;Tgt; q075;private final PoolChunkListlt;Tgt; q100;

这里总共定义了六个chunkList,并在构造函数中进行了初始化。

其施工方法为:

protected PoolArena(PooledByteBufAllocator parent, int pageSize, int maxOrder, int pageShifts, int chunkSize) { //代码省略 q100 = new PoolChunkListlt;Tgt;(null, 100, Integer.MAX_VALUE, chunkSize); q075 = new PoolChunkListlt;Tgt;(q100, 75, 100, chunkSize); q050 = new PoolChunkListlt;Tgt;(q075, 50, 100, chunkSize); q025 = new PoolChunkListlt;Tgt;(q050, 25, 75, chunkSize); q000 = new PoolChunkListlt;Tgt;(q025, 1, 50, chunkSize); qInit = new PoolChunkListlt;Tgt;(q000, Integer.MIN_VALUE, 25, chunkSize); //用双向链表的方式进行连接 q100.prevList(q075); q075.prevList(q050); q050.prevList(q025); q025.prevList(q000); q000.prevList(null); qInit.prevList(qInit); //代码省略}

首先用new PoolChunkList()的方式创建每个chunkList,我们用q050 = newPoolChunkListltTgt(q075,50,100,chunkSize)为例。

Q075表示当前q50的下一个节点是q075。刚才我们说ChunkList是双向链表链接的,所以这里不难理解。

参数50和100表示存储在当前chunkList中的块的内存使用在50%和100%之间,最后chunkSize设置其大小。

创建ChunkList后,设置上一个节点,以q050.prevList(q025)为例,表示当前chunkList的上一个节点是q025。

这样创建后,chunkList的节点关系如下图所示:


在netty中,chunk包含多个页面,每个页面的大小为8k。如果要分配16k的内存,可以通过在chunk中找到两个连续的页面来分配,对应关系如下:


很多场景下,给buffer分配8k内存也是一种浪费。例如,只需要分配2k的缓冲区。如果用8k,6k就浪费了。在这种情况下,netty会将页面切割成多个子页,每个子页的大小要根据分配的缓冲区大小来指定。例如,如果分配2k内存,一个页面将被分成四个子页面,每个子页面的大小为2k,如图所示:



我们看PoolSubpage的属性final PoolChunklt;Tgt; chunk;private final int memoryMapIdx;private final int runOffset;private final int pageSize; private final long[] bitmap;PoolSubpagelt;Tgt; prev;PoolSubpagelt;Tgt; next;boolean doNotDestroy; int elemSize;

Chunk表示其子页面属于哪个chunk。

位图用于记录子页的内存分配。

Prev和next,这意味着子页面根据双向链表进行链接,分别指向上一个节点和下一个节点。

ElemSize属性表示该子页根据多少内存进行划分。如果按照1k划分,可以划分为8个子页面。

在简单介绍了内存分配的数据结构之后,我们开始分析netty在页面级分配内存的过程:


我们回到PoolArena的allocate方法private void allocate(PoolThreadCache cache, PooledByteBuflt;Tgt; buf, final int reqCapacity) { //规格化 final int normCapacity = normalizeCapacity(reqCapacity); if (isTinyOrSmall(normCapacity)) { int tableIdx; PoolSubpagelt;Tgt;[] table; //判断是不是tinty boolean tiny = isTiny(normCapacity); if (tiny) { // lt; 512 //缓存分配 if (cache.allocateTiny(this, buf, reqCapacity, normCapacity)) { return; } //通过tinyIdx拿到tableIdx tableIdx = tinyIdx(normCapacity); //subpage的数组 table = tinySubpagePools; } else { if (cache.allocateSmall(this, buf, reqCapacity, normCapacity)) { return; } tableIdx = smallIdx(normCapacity); table = smallSubpagePools; } //拿到对应的节点 final PoolSubpagelt;Tgt; head = table[tableIdx]; synchronized (head) { final PoolSubpagelt;Tgt; s = head.next; //默认情况下, head的next也是自身 if (s != head) { assert s.doNotDestroy s.elemSize == normCapacity; long handle = s.allocate(); assert handle gt;= 0; s.chunk.initBufWithSubpage(buf, handle, reqCapacity); if (tiny) { allocationsTiny.increment(); } else { allocationsSmall.increment(); } return; } } allocateNormal(buf, reqCapacity, normCapacity); return; } if (normCapacity lt;= chunkSize) { //首先在缓存上进行内存分配 if (cache.allocateNormal(this, buf, reqCapacity, normCapacity)) { //分配成功, 返回 return; } //分配不成功, 做实际的内存分配 allocateNormal(buf, reqCapacity, normCapacity); } else { //大于这个值, 就不在缓存上分配 allocateHuge(buf, reqCapacity); }}

我们之前说过,如果缓存中分配不成功,会打开一个连续的内存进行缓冲区分配。这里我们先跳过isTinyOrSmall(normCapacity)之后的代码,在下一节分析。

首先,如果(normCapacity lt= chunkSize)表示小于16MB,然后先在缓存中分配。因为一开始缓存里没有值,所以会去分配Normal (BUF,Req Capacity,Norm Capacity)。在这里,它实际上是在页面级分配的,在一个或多个页面的空之间。


我们跟进allocateNormalprivate synchronized void allocateNormal(PooledByteBuflt;Tgt; buf, int reqCapacity, int normCapacity) { //首先在原来的chunk上进行内存分配(1) if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //创建chunk进行内存分配(2) PoolChunklt;Tgt; c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle gt; 0; //初始化byteBuf(3) c.initBuf(buf, handle, reqCapacity); qInit.add(c);}

这里主要拆解以下步骤。

1.在原始块中分配

2.创建用于分发的块

3.初始化ByteBuf

首先,让我们看看第一步,在原始块中进行分配:

if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return;}

如前所述,chunkList是存储不同内存使用情况的chunklist集合,每个chunk list由一个双向链表关联。这里,q050。allocate (BUF,Req Capacity,Norm Capacity)表示首先在Q050上分配内存。

我们先以q050为例进行分析,其次是Q050。分配(缓冲、请求能力、标准能力)方法:

boolean allocate(PooledByteBuflt;Tgt; buf, int reqCapacity, int normCapacity) { if (head == null || normCapacity gt; maxCapacity) { return false; } //从head节点往下遍历 for (PoolChunklt;Tgt; cur = head;;) { long handle = cur.allocate(normCapacity); if (handle lt; 0) { cur = cur.next; if (cur == null) { return false; } } else { cur.initBuf(buf, handle, reqCapacity); if (cur.usage() gt;= maxUsage) { remove(cur); nextList.add(cur); } return true; } }}


首先会从head节点往下遍历

longhandle = cur . allocate(norm capacity)

意味着对于每个块,尝试进行分配。

if(处理lt;0)如果没有分配,通过cur = cur.next找到下一个节点,继续分配。我们说过chunk也是通过双向链表关联的,所以要熟悉这个逻辑。

如果句柄大于0,内存已经分配,那么byteBuf由cur.initbuf (buf,handle,reqcapacity)初始化。

if(cur . usage()gt;= maxUsage)意味着当前chunk的内存使用量大于它的最大使用量,那么它将通过remove(cur)从当前chunkList中删除,并通过nextList.add(cur)添加到下一个chunkList中。

让我们回到PoolArena的allocateNormal方法:

让我们看看第二步。

PoolChunklt。Tgtc = newChunk(pageSize,maxOrder,pageShifts,chunkSize)

这里的参数pageSize是8192,也就是8k。

MaxOrder是11

PageShifts是13,2的13次方正好是8192,也就是8k。

ChunkSize是1677216,也就是16MB。

这里的参数值可以通过调试来跟踪。

因为我们的示例是堆外内存,newChunk (pagesize,maxorder,pageshifts,chunksize)将转到DirectArena的newChunk方法:

protected PoolChunklt;ByteBuffergt; newChunk(int pageSize, int maxOrder, int pageShifts, int chunkSize) { return new PoolChunklt;ByteBuffergt;( this, allocateDirect(chunkSize), pageSize, maxOrder, pageShifts, chunkSize);}


这里直接通过构造函数创建了一个chunk

allocated direct(chunk size)这里我们通过jdk的api申请了一个直接内存,我们按照PoolChunk的构造函数:

PoolChunk(PoolArenalt;Tgt; arena, T memory, int pageSize, int maxOrder, int pageShifts, int chunkSize) { unpooled = false; this.arena = arena; //memeory为一个ByteBuf this.memory = memory; //8k this.pageSize = pageSize; //13 this.pageShifts = pageShifts; //11 this.maxOrder = maxOrder; this.chunkSize = chunkSize; unusable = (byte) (maxOrder + 1); log2ChunkSize = log2(chunkSize); subpageOverflowMask = ~(pageSize - 1); freeBytes = chunkSize; assert maxOrder lt; 30 : "maxOrder should be lt; 30, but is: " + maxOrder; maxSubpageAllocs = 1 lt;lt; maxOrder; //节点数量为4096 memoryMap = new byte[maxSubpageAllocs lt;lt; 1]; //也是4096个节点 depthMap = new byte[memoryMap.length]; int memoryMapIndex = 1; //d相当于一个深度, 赋值的内容代表当前节点的深度 for (int d = 0; d lt;= maxOrder; ++ d) { int depth = 1 lt;lt; d; for (int p = 0; p lt; depth; ++ p) { memoryMap[memoryMapIndex] = (byte) d; depthMap[memoryMapIndex] = (byte) d; memoryMapIndex ++; } } subpages = newSubpageArray(maxSubpageAllocs);}


首先将参数传入的值进行赋值

This.memory = memory是保存参数中创建的堆外内存,也就是chunk指向的连续内存。在这个块中分配的ByteBuf将在这个存储器中被读取和写入。

我们重点关注内存映射=新字节[maxsubpagealloct;lt;1]

以及depth map = new byte[memory map . length]。

看内存映射= new byte[maxsubpageallocslt;lt;1]

这里初始化一个maxSubpageAllocs lt大小的字节数组memoryMaplt;1,也就是4096。

depth map = new byte[memoryMap . length]还用memory map的大小初始化一个字节数组,即4096。

在继续下一步之前,让我们看一下chunk的层次关系。


这是一个二叉树结构。左边的数字代表层次,右边代表连续的一块内存。每个父节点又分为几个子节点。顶层代表0-16MB的内存范围,分为两层,从0-8 MB到8-16 MB,以此类推。最后到达第11层,分为8k,也就是一页纸的大小。

如果我们分配一个8mb的缓冲区,我们就分配第二层的第一个节点,也就是0-8的连续内存。分配完成后,该节点将被设置为不可用。具体逻辑后面会解释。

结合上图,我们来看一下构造方法中的for循环:

for (int d = 0; d lt;= maxOrder; ++ d) { int depth = 1 lt;lt; d; for (int p = 0; p lt; depth; ++ p) { memoryMap[memoryMapIndex] = (byte) d; depthMap[memoryMapIndex] = (byte) d; memoryMapIndex ++; }}

这个for循环实际上是把上面的结构包装成一个字节数组memoryMap,外环用来控制层数,内环用来控制里面每一层的节点。在这里的循环之后,memoryMap和depthMap的内容如下:

[0, 0, 1, 1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 3, 3, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4, 4...........]

这里注意,因为程序中数组的下标是从1开始设置的,所以默认情况下第0个节点元素是0。

这里,数字表示层级和当前层级的节点,相同数量的数字是该层级中的节点数量。

其中0是2(因为这里下标从1开始,第0个位置是默认值0,但实际上第0层只有一个元素,是头节点),1是2,2是4,3是8,4是16,n是2的n次方,直到11,也就是11有2的11次方。



我们再回到PoolArena的allocateNormal方法中private synchronized void allocateNormal(PooledByteBuflt;Tgt; buf, int reqCapacity, int normCapacity) { //首先在原来的chunk上进行内存分配(1) if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //创建chunk进行内存分配(2) PoolChunklt;Tgt; c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle gt; 0; //初始化byteBuf(3) c.initBuf(buf, handle, reqCapacity); qInit.add(c);}

我们继续分析长柄= C .分配(定额容量)的步骤。


跟到allocate(normCapacity)中long allocate(int normCapacity) { if ((normCapacity subpageOverflowMask) != 0) { return allocateRun(normCapacity); } else { return allocateSubpage(normCapacity); }}

如果分配是基于页面的,请转到allocateRun(normCapacity)方法并遵循以下步骤:

private long allocateRun(int normCapacity) { int d = maxOrder - (log2(normCapacity) - pageShifts); int id = allocateNode(d); if (id lt; 0) { return id; } freeBytes -= runLength(id); return id;}

int = max order-(log2(norm capacity)-page shift)表示图5-8-5中的哪一层是根据定额容量计算的。

Intid = allocateNode(d)表示根据层次关系分配一个节点,其中id表示内存映射中的下标。


我们跟到allocateNode方法中private int allocateNode(int d) { //下标初始值为1 int id = 1; //代表当前层级第一个节点的初始下标 int initial = - (1 lt;lt; d); //获取第一个节点的值 byte val = value(id); //如果值大于层级, 说明chunk不可用 if (val gt; d) { return -1; } //当前下标对应的节点值如果小于层级, 或者当前下标小于层级的初始下标 while (val lt; d || (id initial) == 0) { //当前下标乘以2, 代表下当前节点的子节点的起始位置 id lt;lt;= 1; //获得id位置的值 val = value(id); //如果当前节点值大于层数(节点不可用) if (val gt; d) { //id为偶数则+1, id为奇数则-1(拿的是其兄弟节点) id ^= 1; //获取id的值 val = value(id); } } byte value = value(id); assert value == d (id initial) == 1 lt;lt; d : String.format("val = %d, id initial = %d, d = %d", value, id initial, d); //将找到的节点设置为不可用 setValue(id, unusable); //逐层往上标记被使用 updateParentsAlloc(id); return id;}

但在这里,我们实际上是从第一个节点往下看,找到D级的未使用节点,通过注释就能理解它的逻辑。

找到相关节点后,通过setValue将当前节点设置为不可用,其中id是当前节点的下标,unusable表示不可用值。这里是12。因为我们的层次结构只有12级,将其设置为12级相当于标记不可用。

设置为不可用后,通过updateParentsAlloc(id)设置为逐层使用


我们跟进updateParentsAlloc方法private void updateParentsAlloc(int id) { while (id gt; 1) { //取到当前节点的父节点的id int parentId = id gt;gt;gt; 1; //获取当前节点的值 byte val1 = value(id); //找到当前节点的兄弟节点 byte val2 = value(id ^ 1); //如果当前节点值小于兄弟节点, 则保存当前节点值到val, 否则, 保存兄弟节点值到val //如果当前节点是不可用, 则当前节点值是12, 大于兄弟节点的值, 所以这里将兄弟节点的值进行保存 byte val = val1 lt; val2 ? val1 : val2; //将val的值设置为父节点下标所对应的值 setValue(parentId, val); //id设置为父节点id, 继续循环 id = parentId; }}

事实上,该循环用父节点的值替换了同级节点的值。我们可以通过评论进行仔细的逻辑分析。

如果真的很难理解,我会用画图的方式帮你理解:

为简单起见,我们在这里只设置了三层:


这里,我们模拟它的分配场景,假设只有三层,其中index表示数组memoryMap的下标,value表示它的值,memoryMap中的值为[0,0,1,1,2,2,2,2]

我们需要分配一个4MB的byteBuf,调用allocateNode(int d)时传入的D是2,这是第二层。

按照我们上面分析的逻辑,这里会找到第二层的第一个节点,也就是0-4mb的节点。找到它后,它将被设置为不可用,因此memoryMap中的值为[0,0,1,1,12,2,2,2]

二叉树的结构将变成:


注意红色部分,将索引为4的节点设置为不可用。

该节点设置为不可用后,向上设置将不可用,循环中兄弟节点值较小的节点将被父节点替换,即索引为2的节点的值将被索引为5的节点的值替换,这样数组的值将变为[0,1,2,1,12,2,2]

二叉树的结构变成:


注意,这里的红色节点只代表节点的变化,并不是当前节点不可用。真实不可用状态的判断是基于12的值。

这样,如果再分配一个4MB内存的ByteBuf,按照它的逻辑,就会找到第二层的第二个节点,也就是4-8MB。

按照我们的逻辑,如果索引设置为2,就会设置为不可用状态,值设置为12,数组值就会变成[0,1,12,1,12,12,2,2]二叉树,如下图所示:


这样我们可以看到,通过分配两个4mb的byteBuf,当前节点及其父节点将被设置为不可用状态。当索引=2的节点设置为不可用时,将不会再找到该节点下的子节点。

依此类推,直到分配完所有内存,索引为1的节点将变得不可用,因此所有页面都被分配,并且块中不再有可用的节点。

我们再回到PoolArena的allocateNormal方法中private synchronized void allocateNormal(PooledByteBuflt;Tgt; buf, int reqCapacity, int normCapacity) { //首先在原来的chunk上进行内存分配(1) if (q050.allocate(buf, reqCapacity, normCapacity) || q025.allocate(buf, reqCapacity, normCapacity) || q000.allocate(buf, reqCapacity, normCapacity) || qInit.allocate(buf, reqCapacity, normCapacity) || q075.allocate(buf, reqCapacity, normCapacity)) { ++allocationsNormal; return; } //创建chunk进行内存分配(2) PoolChunklt;Tgt; c = newChunk(pageSize, maxOrder, pageShifts, chunkSize); long handle = c.allocate(normCapacity); ++allocationsNormal; assert handle gt; 0; //初始化byteBuf(3) c.initBuf(buf, handle, reqCapacity); qInit.add(c);}

从上面的逻辑我们知道,long handle = c . allocate(norm capacity)这个步骤实际上是返回了memoryMap的一个下标,通过这个下标我们可以唯一定位一块内存。

继续跟随,通过c.initbuf (buf,handle,reqcapacity)初始化ByteBuf后,通过qInit.add(c)将新创建的chunk添加到chunkList中


我们跟到initBuf方法中去void initBuf(PooledByteBuflt;Tgt; buf, long handle, int reqCapacity) { int memoryMapIdx = memoryMapIdx(handle); int bitmapIdx = bitmapIdx(handle); if (bitmapIdx == 0) { byte val = value(memoryMapIdx); assert val == unusable : String.valueOf(val); buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache()); } else { initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity); }}

这里找到了memorymapdx(handle)的下标,其实就是handle的值。

BitmapIdx(handle)是子页中使用的逻辑。如果是页级分配,这里只返回0,所以进入if块。

If首先断言当前节点是否不可用,然后通过init方法初始化它。

run offset(memorymapdx)表示偏移量,相当于分配给缓冲区的内存从chunk中申请的内存的第一个地址偏移了多少。

参数runLength(memorymapdx)表示根据下标可以得到最大可分配长度。

让我们跟随init,这里我们将转到PooledByteBuf的init方法:

void init(PoolChunklt;Tgt; chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) { //初始化 assert handle gt;= 0; assert chunk != null; //在哪一块内存上进行分配的 this.chunk = chunk; //这一块内存上的哪一块连续内存 this.handle = handle; memory = chunk.memory; this.offset = offset; this.length = length; this.maxLength = maxLength; tmpNioBuf = null; this.cache = cache;}

这又是我们熟悉的部分,初始化属性。

以上是DirectUnsafePooledByteBuf在页面级完全分配的完整流程,逻辑也很复杂。要想真正掌握,还需要多下功夫调试分析。关于Netty distributed ByteBuf使用页面级内存分配的更多信息,请关注搜源网其他相关文章!