
Commit

Let PoolThreadCache work even if allocation and deallocation Thread are different

Motivation:

PoolThreadCache only cached allocations if the allocation and deallocation threads were the same. This is not optimal, as people often write from a different thread than the actual EventLoop thread.

Modification:

- Add MpscArrayQueue, which was forked from JCTools and lightly modified.
- Use MpscArrayQueue for the caches and always add a freed buffer back to the cache that belongs to the allocating thread (see the sketch after this list).
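
As an illustration of the pattern (not the actual Netty sources): every deallocating thread offers a freed buffer back to the cache owned by the allocating thread, and only that owner thread ever polls. A minimal sketch, with a bounded java.util.concurrent queue standing in for the forked MpscArrayQueue so it compiles as-is:

    import java.util.Queue;
    import java.util.concurrent.ArrayBlockingQueue;

    // Hypothetical sketch: many producer (deallocating) threads, a single
    // consumer (the owning allocation thread). Netty uses the forked JCTools
    // MpscArrayQueue; the bounded j.u.c. queue here is only a stand-in.
    final class OwnerCacheSketch<T> {
        private final Queue<T> queue;

        OwnerCacheSketch(int capacity) {
            queue = new ArrayBlockingQueue<T>(capacity);
        }

        // Called from any thread on deallocation; drops the entry when full.
        boolean add(T entry) {
            return queue.offer(entry);
        }

        // Called only from the owning thread on allocation.
        T allocate() {
            return queue.poll();
        }
    }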

Result:

PoolThreadCache is now also used when the allocation and deallocation threads are different, and so gives performance improvements in that case.

Performance when using the same thread for allocation and deallocation is noticeably worse than before.
normanmaurer committed May 27, 2015
1 parent bac2e3a commit 81fee66
Showing 11 changed files with 825 additions and 113 deletions.
8 changes: 8 additions & 0 deletions NOTICE.txt
@@ -98,6 +98,14 @@ a constant-size alphabet written by Yuta Mori. It can be obtained at:
   * HOMEPAGE:
     * https://code.google.com/p/libdivsufsort/
 
+This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
+which can be obtained at:
+
+  * LICENSE:
+    * license/LICENSE.jctools.txt (ASL2 License)
+  * HOMEPAGE:
+    * https://github.com/JCTools/JCTools
+
 This product optionally depends on 'JZlib', a re-implementation of zlib in
 pure Java, which can be obtained at:
 
14 changes: 6 additions & 8 deletions buffer/src/main/java/io/netty/buffer/PoolArena.java
@@ -237,19 +237,17 @@ private void allocateHuge(PooledByteBuf<T> buf, int reqCapacity) {
         buf.initUnpooled(newUnpooledChunk(reqCapacity), reqCapacity);
     }
 
-    void free(PoolChunk<T> chunk, long handle, int normCapacity, boolean sameThreads) {
+    void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
         if (chunk.unpooled) {
             allocationsHuge.decrement();
             destroyChunk(chunk);
         } else {
             SizeClass sizeClass = sizeClass(normCapacity);
-            if (sameThreads) {
-                PoolThreadCache cache = parent.threadCache();
-                if (cache.add(this, chunk, handle, normCapacity, sizeClass)) {
-                    // cached so not free it.
-                    return;
-                }
+            if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
+                // cached so not free it.
+                return;
             }
+
             freeChunk(chunk, handle, sizeClass);
         }
     }
@@ -378,7 +376,7 @@ void reallocate(PooledByteBuf<T> buf, int newCapacity, boolean freeOldMemory) {
         buf.setIndex(readerIndex, writerIndex);
 
         if (freeOldMemory) {
-            free(oldChunk, oldHandle, oldMaxLength, buf.initThread == Thread.currentThread());
+            free(oldChunk, oldHandle, oldMaxLength, buf.cache);
         }
     }
 
6 changes: 4 additions & 2 deletions buffer/src/main/java/io/netty/buffer/PoolChunk.java
@@ -358,7 +358,8 @@ void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
         if (bitmapIdx == 0) {
             byte val = value(memoryMapIdx);
             assert val == unusable : String.valueOf(val);
-            buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx));
+            buf.init(this, handle, runOffset(memoryMapIdx), reqCapacity, runLength(memoryMapIdx),
+                     arena.parent.threadCache());
         } else {
             initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
         }
@@ -379,7 +380,8 @@ private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx
 
         buf.init(
             this, handle,
-            runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize);
+            runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize, reqCapacity, subpage.elemSize,
+            arena.parent.threadCache());
     }
 
     private byte value(int id) {
162 changes: 69 additions & 93 deletions buffer/src/main/java/io/netty/buffer/PoolThreadCache.java
@@ -18,11 +18,15 @@
 
 
 import io.netty.buffer.PoolArena.SizeClass;
+import io.netty.util.Recycler;
+import io.netty.util.Recycler.Handle;
 import io.netty.util.ThreadDeathWatcher;
+import io.netty.util.internal.PlatformDependent;
 import io.netty.util.internal.logging.InternalLogger;
 import io.netty.util.internal.logging.InternalLoggerFactory;
 
 import java.nio.ByteBuffer;
+import java.util.Queue;
 
 /**
  * Acts a Thread cache for allocations. This implementation is moduled after
@@ -116,11 +120,11 @@ public void run() {
         ThreadDeathWatcher.watch(thread, freeTask);
     }
 
-    private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(
+    private static <T> MemoryRegionCache<T>[] createSubPageCaches(
             int cacheSize, int numCaches, SizeClass sizeClass) {
         if (cacheSize > 0) {
             @SuppressWarnings("unchecked")
-            SubPageMemoryRegionCache<T>[] cache = new SubPageMemoryRegionCache[numCaches];
+            MemoryRegionCache<T>[] cache = new MemoryRegionCache[numCaches];
             for (int i = 0; i < cache.length; i++) {
                 // TODO: maybe use cacheSize / cache.length
                 cache[i] = new SubPageMemoryRegionCache<T>(cacheSize, sizeClass);
@@ -131,14 +135,14 @@ private static <T> SubPageMemoryRegionCache<T>[] createSubPageCaches(
         }
     }
 
-    private static <T> NormalMemoryRegionCache<T>[] createNormalCaches(
+    private static <T> MemoryRegionCache<T>[] createNormalCaches(
            int cacheSize, int maxCachedBufferCapacity, PoolArena<T> area) {
         if (cacheSize > 0) {
             int max = Math.min(area.chunkSize, maxCachedBufferCapacity);
             int arraySize = Math.max(1, log2(max / area.pageSize) + 1);
 
             @SuppressWarnings("unchecked")
-            NormalMemoryRegionCache<T>[] cache = new NormalMemoryRegionCache[arraySize];
+            MemoryRegionCache<T>[] cache = new MemoryRegionCache[arraySize];
             for (int i = 0; i < cache.length; i++) {
                 cache[i] = new NormalMemoryRegionCache<T>(cacheSize);
             }
@@ -345,28 +349,15 @@ protected void initBuf(
         }
     }
 
-    /**
-     * Cache of {@link PoolChunk} and handles which can be used to allocate a buffer without locking at all.
-     *
-     * The {@link MemoryRegionCache} uses a LIFO implementation as this way it is more likely that the
-     * cached memory is still in the loaded cache-line and so no new read must happen (compared to FIFO).
-     */
     private abstract static class MemoryRegionCache<T> {
-        private final Entry<T>[] entries;
+        private final int size;
+        private final Queue<Entry<T>> queue;
         private final SizeClass sizeClass;
-        private final int maxUnusedCached;
-        private int head;
-        private int tail;
-        private int maxEntriesInUse;
-        private int entriesInUse;
+        private int allocations;
 
-        @SuppressWarnings("unchecked")
         MemoryRegionCache(int size, SizeClass sizeClass) {
-            entries = new Entry[powerOfTwo(size)];
-            for (int i = 0; i < entries.length; i++) {
-                entries[i] = new Entry<T>();
-            }
-            maxUnusedCached = size / 2;
+            this.size = powerOfTwo(size);
+            queue = PlatformDependent.newFixedMpscQueue(this.size);
             this.sizeClass = sizeClass;
         }
 
@@ -393,115 +384,100 @@ protected abstract void initBuf(PoolChunk<T> chunk, long handle,
         /**
          * Add to cache if not already full.
          */
-        public boolean add(PoolChunk<T> chunk, long handle) {
-            Entry<T> entry = entries[tail];
-            if (entry.chunk != null) {
-                // cache is full
-                return false;
-            }
-            entriesInUse --;
-
-            entry.chunk = chunk;
-            entry.handle = handle;
-            tail = nextIdx(tail);
-            return true;
+        @SuppressWarnings("unchecked")
+        public final boolean add(PoolChunk<T> chunk, long handle) {
+            return queue.offer(newEntry(chunk, handle));
         }
 
         /**
          * Allocate something out of the cache if possible and remove the entry from the cache.
          */
-        public boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
-            int index = prevIdx(tail);
-            Entry<T> entry = entries[index];
-            if (entry.chunk == null) {
+        public final boolean allocate(PooledByteBuf<T> buf, int reqCapacity) {
+            Entry<T> entry = queue.poll();
+            if (entry == null) {
                 return false;
             }
-
-            entriesInUse ++;
-            if (maxEntriesInUse < entriesInUse) {
-                maxEntriesInUse = entriesInUse;
-            }
             initBuf(entry.chunk, entry.handle, buf, reqCapacity);
-            // only null out the chunk as we only use the chunk to check if the buffer is full or not.
-            entry.chunk = null;
-            tail = index;
+
+            // allocations is not thread-safe which is fine as this is only called from the same thread all time.
+            ++ allocations;
             return true;
         }
 
         /**
          * Clear out this cache and free up all previous cached {@link PoolChunk}s and {@code handle}s.
          */
-        public int free() {
+        public final int free() {
+            return free(Integer.MAX_VALUE);
+        }
+
+        private int free(int max) {
             int numFreed = 0;
-            entriesInUse = 0;
-            maxEntriesInUse = 0;
-            for (int i = head;; i = nextIdx(i)) {
-                if (freeEntry(entries[i])) {
-                    numFreed++;
+            for (; numFreed < max; numFreed++) {
+                Entry<T> entry = queue.poll();
+                if (entry != null) {
+                    freeEntry(entry);
                 } else {
                     // all cleared
                     return numFreed;
                 }
             }
+            return numFreed;
         }
 
         /**
         * Free up cached {@link PoolChunk}s if not allocated frequently enough.
         */
-        private void trim() {
-            int free = size() - maxEntriesInUse;
-            entriesInUse = 0;
-            maxEntriesInUse = 0;
-
-            if (free <= maxUnusedCached) {
-                return;
-            }
+        public final void trim() {
+            int free = size - allocations;
+            allocations = 0;
 
-            int i = head;
-            for (; free > 0; free--) {
-                if (!freeEntry(entries[i])) {
-                    // all freed
-                    break;
-                }
-                i = nextIdx(i);
+            // We not even allocated all the number that are
+            if (free > 0) {
+                free(free);
             }
-
-            // Update head to point to te correct entry
-            // See https://github.com/netty/netty/issues/2924
-            head = i;
         }
 
         @SuppressWarnings({ "unchecked", "rawtypes" })
-        private boolean freeEntry(Entry entry) {
+        private void freeEntry(Entry entry) {
             PoolChunk chunk = entry.chunk;
-            if (chunk == null) {
-                return false;
-            }
+
+            // recycle now so PoolChunk can be GC'ed.
+            entry.recycle();
+
             chunk.arena.freeChunk(chunk, entry.handle, sizeClass);
-            entry.chunk = null;
-            return true;
         }
 
-        /**
-         * Return the number of cached entries.
-         */
-        private int size() {
-            return tail - head & entries.length - 1;
-        }
+        static final class Entry<T> {
+            final Handle recyclerHandle;
+            PoolChunk<T> chunk;
+            long handle = -1;
 
-        private int nextIdx(int index) {
-            // use bitwise operation as this is faster as using modulo.
-            return index + 1 & entries.length - 1;
-        }
+            Entry(Handle recyclerHandle) {
+                this.recyclerHandle = recyclerHandle;
+            }
 
-        private int prevIdx(int index) {
-            // use bitwise operation as this is faster as using modulo.
-            return index - 1 & entries.length - 1;
+            void recycle() {
+                chunk = null;
+                handle = -1;
+                RECYCLER.recycle(this, recyclerHandle);
+            }
         }
 
-        private static final class Entry<T> {
-            PoolChunk<T> chunk;
-            long handle;
+        @SuppressWarnings("rawtypes")
+        private static Entry newEntry(PoolChunk<?> chunk, long handle) {
+            Entry entry = RECYCLER.get();
+            entry.chunk = chunk;
+            entry.handle = handle;
+            return entry;
         }
+
+        @SuppressWarnings("rawtypes")
+        private static final Recycler<Entry> RECYCLER = new Recycler<Entry>() {
+            @Override
+            protected Entry newObject(Handle handle) {
+                return new Entry(handle);
+            }
+        };
     }
 }
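
A note on the reworked trim() above: instead of tracking entries in use, the cache now counts successful allocations since the last trim and frees the unused share of its capacity. A standalone sketch of that bookkeeping, assuming hypothetical names and an ordinary j.u.c. queue in place of the MPSC queue:

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Hypothetical sketch of the trim heuristic: `allocations` counts cache
    // hits since the last trim (touched only by the owner thread); any
    // capacity not "earned" by hits is polled off and released.
    final class TrimSketch<T> {
        private final int size;
        private final Queue<T> queue = new ConcurrentLinkedQueue<T>();
        private int allocations;

        TrimSketch(int size) {
            this.size = size;
        }

        T allocate() {
            T entry = queue.poll();
            if (entry != null) {
                ++allocations;
            }
            return entry;
        }

        void trim() {
            int free = size - allocations;
            allocations = 0;
            for (; free > 0; free--) {
                T entry = queue.poll();
                if (entry == null) {
                    break;             // cache already empty
                }
                release(entry);        // stand-in for freeEntry(...)
            }
        }

        private void release(T entry) { }
    }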
12 changes: 5 additions & 7 deletions buffer/src/main/java/io/netty/buffer/PooledByteBuf.java
@@ -32,7 +32,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
     protected int offset;
     protected int length;
     int maxLength;
-    Thread initThread;
+    PoolThreadCache cache;
     private ByteBuffer tmpNioBuf;
 
     @SuppressWarnings("unchecked")
@@ -41,7 +41,7 @@ protected PooledByteBuf(Recycler.Handle<? extends PooledByteBuf<T>> recyclerHand
         this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
     }
 
-    void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength) {
+    void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
         assert handle >= 0;
         assert chunk != null;
 
@@ -54,7 +54,7 @@ void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength
         setIndex(0, 0);
         discardMarks();
         tmpNioBuf = null;
-        initThread = Thread.currentThread();
+        this.cache = cache;
     }
 
     void initUnpooled(PoolChunk<T> chunk, int length) {
@@ -67,7 +67,7 @@ void initUnpooled(PoolChunk<T> chunk, int length) {
         this.length = maxLength = length;
         setIndex(0, 0);
         tmpNioBuf = null;
-        initThread = Thread.currentThread();
+        cache = null;
     }
 
     @Override
@@ -155,9 +155,7 @@ protected final void deallocate() {
             final long handle = this.handle;
             this.handle = -1;
             memory = null;
-            boolean sameThread = initThread == Thread.currentThread();
-            initThread = null;
-            chunk.arena.free(chunk, handle, maxLength, sameThread);
+            chunk.arena.free(chunk, handle, maxLength, cache);
             recycle();
         }
     }
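
The PooledByteBuf change above swaps the initThread field for a reference to the allocating thread's PoolThreadCache, so deallocate() no longer compares thread identities. A hypothetical, self-contained sketch of that hand-off:

    import java.util.Queue;

    // Hypothetical sketch: the buffer remembers the cache of the thread that
    // allocated it (null for unpooled buffers), so a release from any thread
    // can still feed that cache.
    final class BufSketch {
        private Queue<BufSketch> ownerCache;

        void init(Queue<BufSketch> ownerCache) {
            this.ownerCache = ownerCache;   // set by the allocating thread
        }

        void deallocate() {
            Queue<BufSketch> cache = ownerCache;
            if (cache == null || !cache.offer(this)) {
                freeToArena();              // cache absent or full
            }
        }

        private void freeToArena() {
            // stand-in for chunk.arena.free(chunk, handle, maxLength, cache)
        }
    }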
buffer/src/main/java/io/netty/buffer/PooledUnsafeDirectByteBuf.java
@@ -16,6 +16,7 @@
 
 package io.netty.buffer;
 
+import io.netty.buffer.PooledByteBufAllocator.PoolThreadLocalCache;
 import io.netty.util.Recycler;
 import io.netty.util.internal.PlatformDependent;
 
@@ -53,8 +54,9 @@ private PooledUnsafeDirectByteBuf(Recycler.Handle<PooledUnsafeDirectByteBuf> rec
     }
 
     @Override
-    void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength) {
-        super.init(chunk, handle, offset, length, maxLength);
+    void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
+              PoolThreadCache cache) {
+        super.init(chunk, handle, offset, length, maxLength, cache);
         initMemoryAddress();
     }
 
