Netty源碼解析 — PoolChunk實現原理

本文主要分享Netty中PoolChunk如何管理內存。
源碼分析基於Netty 4.1.52

內存管理算法

首先說明PoolChunk內存組織方式。
PoolChunk的內存大小默認是16M,Netty將它劃分為2048個page,每個page為8K。
PoolChunk上可以分配Normal內存塊。
Normal內存塊大小必須是page的倍數。

PoolChunk通過runsAvail字段管理內存塊。
PoolChunk#runsAvail是PriorityQueue數組,其中PriorityQueue存放的是handle。
handle可以理解為一個句柄,維護一個內存塊的信息,由以下部分組成

  • o: runOffset ,在chunk中page偏移索引,從0開始,15bit
  • s: size,當前位置可分配的page數量,15bit
  • u: isUsed,是否使用?, 1bit
  • e: isSubpage,是否在subpage中, 1bit
  • b: bitmapIdx,內存塊在subpage中的索引,不在subpage則為0, 32bit

前面《內存對齊類SizeClasses》文章說過,SizeClasses將sizeClasses表格中isMultipageSize為1的行取出可以組成一個新表格,這裡稱為Page表格

runsAvail數組默認長度為40,每個位置index上放的handle代表了存在一個可用內存塊,並且可分配pageSize大於等於(pageIdx=index)上的pageSize,小於(pageIdex=index+1)的pageSize。
如runsAvail[11]上的handle的size可分配pageSize可能為16 ~ 19,
假如runsAvail[11]上handle的size為18,如果該handle分配了7個page,剩下的11個page,這時要將handle移動runsAvail[8](當然,handle的信息要調整)。
這時如果要找分配6個page,就可以從runsAvail[5]開始查找runsAvail數組,如果前面runsAvail[5]~runsAvail[7]都沒有handle,就找到了runsAvail[8]。
分配6個page之後,剩下的5個page,handle移動runsAvail[4]。

先看一下PoolChunk的構造函數

PoolChunk(PoolArena<T> arena, T memory, int pageSize, int pageShifts, int chunkSize, int maxPageIdx, int offset) {
    // #1
    unpooled = false;
    this.arena = arena;
    this.memory = memory;
    this.pageSize = pageSize;
    this.pageShifts = pageShifts;
    this.chunkSize = chunkSize;
    this.offset = offset;
    freeBytes = chunkSize;

    runsAvail = newRunsAvailqueueArray(maxPageIdx);
    runsAvailMap = new IntObjectHashMap<Long>();
    subpages = new PoolSubpage[chunkSize >> pageShifts];

    // #2
    int pages = chunkSize >> pageShifts;
    long initHandle = (long) pages << SIZE_SHIFT;
    insertAvailRun(0, pages, initHandle);

    cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}

#1
unpooled: 是否使用內存池
arena:該PoolChunk所屬的PoolArena
memory:底層的內存塊,對於堆內存,它是一個byte數組,對於直接內存,它是(jvm)ByteBuffer,但無論是哪種形式,其內存大小默認都是16M。
pageSize:page大小,默認為8K。
chunkSize:整個PoolChunk的內存大小,默認為16777216,即16M。
offset:底層內存對齊偏移量,默認為0。
runsAvail:初始化runsAvail
runsAvailMap:記錄了每個內存塊開始位置和結束位置的runOffset和handle映射。

#2 insertAvailRun方法在runsAvail數組最後位置插入一個handle,該handle代表page偏移位置為0的地方可以分配16M的內存塊

內存分配

PoolChunk#allocate

boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int sizeIdx, PoolThreadCache cache) {
    final long handle;
    // #1
    if (sizeIdx <= arena.smallMaxSizeIdx) {
        // small
        handle = allocateSubpage(sizeIdx);
        if (handle < 0) {
            return false;
        }
        assert isSubpage(handle);
    } else {
        // #2
        int runSize = arena.sizeIdx2size(sizeIdx);
        handle = allocateRun(runSize);
        if (handle < 0) {
            return false;
        }
    }

    // #3
    ByteBuffer nioBuffer = cachedNioBuffers != null? cachedNioBuffers.pollLast() : null;
    initBuf(buf, nioBuffer, handle, reqCapacity, cache);
    return true;
}

#1 處理Small內存塊申請,調用allocateSubpage方法處理,後續文章解析。
#2 處理Normal內存塊申請
sizeIdx2size方法根據內存塊索引查找對應內存塊size。sizeIdx2size是PoolArena父類SizeClasses提供的方法,可參考系列文章《內存對齊類SizeClasses》。
allocateRun方法負責分配Normal內存塊,返回handle存儲了分配的內存塊大小和偏移量。

#3 使用handle和底層內存類(ByteBuffer)初始化ByteBuf了。

private long allocateRun(int runSize) {
    // #1
    int pages = runSize >> pageShifts;
    // #2
    int pageIdx = arena.pages2pageIdx(pages);

    synchronized (runsAvail) {
        //find first queue which has at least one big enough run
        // #3
        int queueIdx = runFirstBestFit(pageIdx);
        if (queueIdx == -1) {
            return -1;
        }

        //get run with min offset in this queue
        PriorityQueue<Long> queue = runsAvail[queueIdx];
        long handle = queue.poll();

        assert !isUsed(handle);
        // #4
        removeAvailRun(queue, handle);
        // #5
        if (handle != -1) {
            handle = splitLargeRun(handle, pages);
        }
        // #6
        freeBytes -= runSize(pageShifts, handle);
        return handle;
    }
}

#1 計算所需的page數量
#2 計算對應的pageIdx
注意,pages2pageIdx方法會將申請內存大小對齊為上述Page表格中的一個size。例如申請172032位元組(21個page)的內存塊,pages2pageIdx方法計算結果為13,實際分配196608(24個page)的內存塊。
#3 從pageIdx開始遍歷runsAvail,找到第一個handle。
該handle上可以分配所需內存塊。
#4 從runsAvail,runsAvailMap移除該handle信息
#5#3步驟找到的handle上劃分出所要的內存塊。
#6 減少可用內存位元組數

private long splitLargeRun(long handle, int needPages) {
    assert needPages > 0;

    // #1
    int totalPages = runPages(handle);
    assert needPages <= totalPages;

    int remPages = totalPages - needPages;

    // #2 
    if (remPages > 0) {
        int runOffset = runOffset(handle);

        // keep track of trailing unused pages for later use
        int availOffset = runOffset + needPages;
        long availRun = toRunHandle(availOffset, remPages, 0);
        insertAvailRun(availOffset, remPages, availRun);

        // not avail
        return toRunHandle(runOffset, needPages, 1);
    }

    //mark it as used
    handle |= 1L << IS_USED_SHIFT;
    return handle;
}

#1 totalPages,從handle中獲取當前位置可用page數。
remPages,分配後剩餘page數。
#2 剩餘page數大於0
availOffset,計算剩餘page開始偏移量
生成一個新的handle,availRun
insertAvailRun將availRun插入到runsAvail,runsAvailMap中

內存釋放

void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
    ...

    // #1
    int pages = runPages(handle);

    synchronized (runsAvail) {
        // collapse continuous runs, successfully collapsed runs
        // will be removed from runsAvail and runsAvailMap
        // #2
        long finalRun = collapseRuns(handle);

        // #3
        finalRun &= ~(1L << IS_USED_SHIFT);
        //if it is a subpage, set it to run
        finalRun &= ~(1L << IS_SUBPAGE_SHIFT);
        insertAvailRun(runOffset(finalRun), runPages(finalRun), finalRun);
        freeBytes += pages << pageShifts;
    }

    if (nioBuffer != null && cachedNioBuffers != null &&
        cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
        cachedNioBuffers.offer(nioBuffer);
    }
}

#1 計算釋放的page數
#2 如果可以,將前後的可用內存塊進行合併
#3 插入新的handle

collapseRuns

private long collapseRuns(long handle) {
    return collapseNext(collapsePast(handle));
}

collapsePast方法合併前面的可用內存塊
collapseNext方法合併後面的可用內存塊

private long collapseNext(long handle) {
    for (;;) {
        // #1
        int runOffset = runOffset(handle);
        int runPages = runPages(handle);

        Long nextRun = getAvailRunByOffset(runOffset + runPages);
        if (nextRun == null) {
            return handle;
        }

        int nextOffset = runOffset(nextRun);
        int nextPages = runPages(nextRun);

        //is continuous
        // #2
        if (nextRun != handle && runOffset + runPages == nextOffset) {
            //remove next run
            removeAvailRun(nextRun);
            handle = toRunHandle(runOffset, runPages + nextPages, 0);
        } else {
            return handle;
        }
    }
}

#1 getAvailRunByOffset方法從runsAvailMap中找到下一個內存塊的handle。
#2 如果是連續的內存塊,則移除下一個內存塊handle,並將其page合併生成一個新的handle。

下面來看一個例子

大家可以結合例子中runsAvail和內存使用情況的變化,理解上面的代碼。
實際上,2個Page的內存塊是通過Subpage分配,回收時會放回線程緩存中而不是直接釋放存塊,但為了展示PoolChunk中內存管理過程,圖中不考慮這些場景。

PoolChunk在Netty 4.1.52版本修改了算法,引入了jemalloc 4的算法 — //github.com/netty/netty/commit/0d701d7c3c51263a1eef56d5a549ef2075b9aa9e#diff-6850686cf7ebc7b9ddb873389ded45ebf40e6c1ccf411c44b744e7d3ca2ff774
Netty 4.1.52之前的版本,PoolChunk引入的是jemalloc 3的算法,使用二叉樹管理內存塊。有興趣的同學可以參考我後續的文章《PoolChunk實現(jemalloc 3的算法)》

如果您覺得本文不錯,歡迎關注我的微信公眾號,系列文章持續更新中。您的關注是我堅持的動力!