Skip to content

Commit

Permalink
Make "pinned memory" from PooledByteBufAllocator reflect buffers in u…
Browse files Browse the repository at this point in the history
…se (netty#11990)

Motivation:
The pinned memory accounting was being done from a bad place.
Namely, it was being done at the same time as the PoolChunk free memory accounting was being done.
That means the thread local caching was messing up the accuracy of the numbers.

Modification:
Move pinned memory accounting to the PooledByteBuf init and deallocation methods, so it's tied to the buffer life cycle.

Result:
The pinned memory accounting is updated as part of the pooled buffer life cycle, and is no longer being tricked by the activities of the thread-local buffer cache.

Fixes netty#11984
  • Loading branch information
chrisvest authored Jan 11, 2022
1 parent d60ea59 commit 3324707
Show file tree
Hide file tree
Showing 3 changed files with 116 additions and 7 deletions.
30 changes: 23 additions & 7 deletions buffer/src/main/java/io/netty/buffer/PoolChunk.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.PriorityQueue;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
Expand Down Expand Up @@ -160,6 +161,11 @@ final class PoolChunk<T> implements PoolChunkMetric {
*/
private final PoolSubpage<T>[] subpages;

/**
* Accounting of pinned memory – memory that is currently in use by ByteBuf instances.
*/
private final AtomicInteger pinnedBytes;

private final int pageSize;
private final int pageShifts;
private final int chunkSize;
Expand All @@ -172,7 +178,6 @@ final class PoolChunk<T> implements PoolChunkMetric {
private final Deque<ByteBuffer> cachedNioBuffers;

int freeBytes;
int pinnedBytes;

PoolChunkList<T> parent;
PoolChunk<T> prev;
Expand Down Expand Up @@ -202,6 +207,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
insertAvailRun(0, pages, initHandle);

cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
pinnedBytes = new AtomicInteger();
}

/** Creates a special chunk that is not pooled. */
Expand All @@ -217,6 +223,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
subpages = null;
chunkSize = size;
cachedNioBuffers = null;
pinnedBytes = new AtomicInteger();
}

private static LongPriorityQueue[] newRunsAvailqueueArray(int size) {
Expand Down Expand Up @@ -343,7 +350,6 @@ private long allocateRun(int runSize) {

int pinnedSize = runSize(pageShifts, handle);
freeBytes -= pinnedSize;
pinnedBytes += pinnedSize;
return handle;
}
}
Expand Down Expand Up @@ -453,7 +459,6 @@ private long allocateSubpage(int sizeIdx) {
*/
void free(long handle, int normCapacity, ByteBuffer nioBuffer) {
int runSize = runSize(pageShifts, handle);
pinnedBytes -= runSize;
if (isSubpage(handle)) {
int sizeIdx = arena.size2SizeIdx(normCapacity);
PoolSubpage<T> head = arena.findSubpagePoolHead(sizeIdx);
Expand Down Expand Up @@ -557,8 +562,9 @@ private static long toRunHandle(int runOffset, int runPages, int inUsed) {
void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity,
PoolThreadCache threadCache) {
if (isRun(handle)) {
int maxLength = runSize(pageShifts, handle);
buf.init(this, nioBuffer, handle, runOffset(handle) << pageShifts,
reqCapacity, runSize(pageShifts, handle), arena.parent.threadCache());
reqCapacity, maxLength, arena.parent.threadCache());
} else {
initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
}
Expand All @@ -577,6 +583,18 @@ void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle,
buf.init(this, nioBuffer, handle, offset, reqCapacity, s.elemSize, threadCache);
}

void incrementPinnedMemory(int delta) {
assert delta > 0;
int result = pinnedBytes.addAndGet(delta);
assert result > 0;
}

void decrementPinnedMemory(int delta) {
assert delta > 0;
int result = pinnedBytes.addAndGet(-delta);
assert result >= 0;
}

@Override
public int chunkSize() {
return chunkSize;
Expand All @@ -590,9 +608,7 @@ public int freeBytes() {
}

public int pinnedBytes() {
synchronized (arena) {
return pinnedBytes;
}
return pinnedBytes.get();
}

@Override
Expand Down
3 changes: 3 additions & 0 deletions buffer/src/main/java/io/netty/buffer/PooledByteBuf.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
assert !PoolChunk.isSubpage(handle) || chunk.arena.size2SizeIdx(maxLength) <= chunk.arena.smallMaxSizeIdx:
"Allocated small sub-page handle for a buffer size that isn't \"small.\"";

chunk.incrementPinnedMemory(maxLength);
this.chunk = chunk;
memory = chunk.memory;
tmpNioBuf = nioBuffer;
Expand Down Expand Up @@ -117,6 +118,7 @@ public final ByteBuf capacity(int newCapacity) {
}

// Reallocation required.
chunk.decrementPinnedMemory(maxLength);
chunk.arena.reallocate(this, newCapacity, true);
return this;
}
Expand Down Expand Up @@ -170,6 +172,7 @@ protected final void deallocate() {
final long handle = this.handle;
this.handle = -1;
memory = null;
chunk.decrementPinnedMemory(maxLength);
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import org.junit.jupiter.api.Timeout;
Expand Down Expand Up @@ -821,4 +823,92 @@ private void testUsedHeapMemory(int initialCapacity) {
trimCaches(allocator);
assertEquals(0, allocator.pinnedDirectMemory());
}

@Test
public void pinnedMemoryMustReflectBuffersInUseWithThreadLocalCaching() {
pinnedMemoryMustReflectBuffersInUse(true);
}

@Test
public void pinnedMemoryMustReflectBuffersInUseWithoutThreadLocalCaching() {
pinnedMemoryMustReflectBuffersInUse(false);
}

private static void pinnedMemoryMustReflectBuffersInUse(boolean useThreadLocalCaching) {
int smallCacheSize;
int normalCacheSize;
if (useThreadLocalCaching) {
smallCacheSize = PooledByteBufAllocator.defaultSmallCacheSize();
normalCacheSize = PooledByteBufAllocator.defaultNormalCacheSize();
} else {
smallCacheSize = 0;
normalCacheSize = 0;
}
int directMemoryCacheAlignment = 0;
PooledByteBufAllocator alloc = new PooledByteBufAllocator(
PooledByteBufAllocator.defaultPreferDirect(),
PooledByteBufAllocator.defaultNumHeapArena(),
PooledByteBufAllocator.defaultNumDirectArena(),
PooledByteBufAllocator.defaultPageSize(),
PooledByteBufAllocator.defaultMaxOrder(),
smallCacheSize,
normalCacheSize,
useThreadLocalCaching,
directMemoryCacheAlignment);
PooledByteBufAllocatorMetric metric = alloc.metric();
AtomicLong capSum = new AtomicLong();

for (long index = 0; index < 10000; index++) {
ThreadLocalRandom rnd = ThreadLocalRandom.current();
int bufCount = rnd.nextInt(1, 100);
List<ByteBuf> buffers = new ArrayList<ByteBuf>(bufCount);

if (index % 2 == 0) {
// ensure that we allocate a small buffer
for (int i = 0; i < bufCount; i++) {
ByteBuf buf = alloc.directBuffer(rnd.nextInt(8, 128));
buffers.add(buf);
capSum.addAndGet(buf.capacity());
}
} else {
// allocate a larger buffer
for (int i = 0; i < bufCount; i++) {
ByteBuf buf = alloc.directBuffer(rnd.nextInt(1024, 1024 * 100));
buffers.add(buf);
capSum.addAndGet(buf.capacity());
}
}

if (index % 100 == 0) {
long used = usedMemory(metric.directArenas());
long pinned = alloc.pinnedDirectMemory();
assertThat(capSum.get()).isLessThanOrEqualTo(pinned);
assertThat(pinned).isLessThanOrEqualTo(used);
}

for (ByteBuf buffer : buffers) {
buffer.release();
}
capSum.set(0);
// After releasing all buffers, pinned memory must be zero
assertThat(alloc.pinnedDirectMemory()).isZero();
}
}

/**
* Returns an estimate of bytes used by currently in-use buffers
*/
private static long usedMemory(List<PoolArenaMetric> arenas) {
long totalUsed = 0;
for (PoolArenaMetric arenaMetrics : arenas) {
for (PoolChunkListMetric arenaMetric : arenaMetrics.chunkLists()) {
for (PoolChunkMetric chunkMetric : arenaMetric) {
// chunkMetric.chunkSize() returns maximum of bytes that can be served out of the chunk
// and chunkMetric.freeBytes() returns the bytes that are not yet allocated by in-use buffers
totalUsed += chunkMetric.chunkSize() - chunkMetric.freeBytes();
}
}
}
return totalUsed;
}
}

0 comments on commit 3324707

Please sign in to comment.