Skip to content

Commit

Permalink
Feat: use AtomicIntHashCounter in AccurateRecorder to reduce memory f…
Browse files Browse the repository at this point in the history
…ootprint
  • Loading branch information
LinShunKang committed Mar 20, 2022
1 parent 5338541 commit fa52668
Show file tree
Hide file tree
Showing 10 changed files with 282 additions and 92 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cn.myperf4j.base.util.concurrent;

import cn.myperf4j.base.buffer.IntBuf;
import cn.myperf4j.base.util.UnsafeUtils;
import sun.misc.Unsafe;

Expand Down Expand Up @@ -96,17 +97,7 @@ public int getAndIncrement(int i) {
* @return the previous value
*/
public int getAndAdd(int i, int delta) {
final long offset = checkedByteOffset(i);
while (true) {
int current = getRaw(offset);
if (compareAndSetRaw(offset, current, current + delta)) {
return current;
}
}
}

private boolean compareAndSetRaw(long offset, int expect, int update) {
return unsafe.compareAndSwapInt(array, offset, expect, update);
return unsafe.getAndAddInt(array, checkedByteOffset(i), delta);
}

/**
Expand All @@ -127,18 +118,23 @@ public int incrementAndGet(int i) {
* @return the updated value
*/
public int addAndGet(int i, int delta) {
final long offset = checkedByteOffset(i);
while (true) {
int current = getRaw(offset);
int next = current + delta;
if (compareAndSetRaw(offset, current, next)) {
return next;
}
}
return getAndAdd(i, delta) + delta;
}

public void reset() {
final int[] array = this.array;
unsafe.setMemory(array, byteOffset(0), (long) array.length * scale, (byte) 0);
}

public long fillSortedKvs(IntBuf intBuf) {
long totalCount = 0L;
for (int i = 0, len = array.length; i < len; ++i) {
final int count = get(i);
if (count > 0) {
intBuf.write(i, count);
totalCount += count;
}
}
return totalCount;
}
}
Original file line number Diff line number Diff line change
@@ -1,17 +1,69 @@
package cn.myperf4j.base.util.concurrent;

import cn.myperf4j.base.buffer.IntBuf;

/**
* Created by LinShunkang on 2022/03/19
*/
public interface AtomicIntHashCounter {

/**
* Get the specified value of the specified key.
*
* @param key the key
* @return the specified value of the specified key, or 0 if this map contains no mapping for the key
*/
int get(int key);

/**
* Atomically increments the specified value of the specified key by one.
*
* @param key the key
* @return the previous value. Negative return indicates that update failure.
*/
int getAndIncrement(int key);

/**
* Atomically adds the given value of the specified key to the specified value.
*
* @param key the key
* @return the previous value. Negative return indicates that update failure.
*/
int getAndAdd(int key, int delta);

/**
* Atomically increments the specified value of the specified key by one.
*
* @param key the key
* @return the updated value
*/
int incrementAndGet(int key);

/**
* Atomically adds the given value of the specified key to the specified value.
*
* @param key the key
* @return the updated value. Non-positive number return indicates that update failure.
*/
int addAndGet(int key, int delta);

/**
* Returns the number of key-value mappings in this map.
*
* @return the number of key-value mappings in this map
*/
int size();

/**
* Removes all the mappings from this map (optional operation).
*/
void reset();

/**
* Write sorted key-value into intBuf, order by key.
*
* @param intBuf the IntBuf
* @return the total values writes into intBuf
*/
long fillSortedKvs(IntBuf intBuf);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package cn.myperf4j.base.util.concurrent;

import cn.myperf4j.base.buffer.IntBuf;
import cn.myperf4j.base.util.UnsafeUtils;
import sun.misc.Unsafe;

Expand Down Expand Up @@ -120,19 +121,19 @@ public int get(final int key) {
}

@Override
public int incrementAndGet(final int key) {
return addAndGet(key, 1);
public int getAndIncrement(int key) {
return getAndAdd(key, 1);
}

@Override
public int addAndGet(final int key, final int delta) {
public int getAndAdd(int key, int delta) {
if (delta == 0) {
return get(key);
}
return addAndGet0(this.array, key, delta);
return getAndAdd0(this.array, key, delta);
}

private int addAndGet0(final int[] array, final int key, final int delta) {
private int getAndAdd0(final int[] array, final int key, final int delta) {
final int mask = array.length - 1;
final int startIdx = hashIdx(key, mask);
int idx = startIdx;
Expand All @@ -143,17 +144,17 @@ private int addAndGet0(final int[] array, final int key, final int delta) {
if ((kv = getLongRaw(array, kOffset)) == 0L) { //try set
if (unsafe.compareAndSwapLong(array, kOffset, 0L, getKvLong(key, delta))) {
SIZE_UPDATER.incrementAndGet(this);
return delta;
return 0;
} else if ((int) (kv = getLongRaw(array, kOffset)) == key) {
if (tryAddLong(array, kOffset, key, delta)) {
return getValue(kv) + delta;
return getValue(kv);
} else {
throw new IllegalStateException("this should not have happened1!");
}
}
} else if (getKey(kv) == key) { //increase
if (tryAddLong(array, kOffset, key, delta)) {
return getValue(kv) + delta;
return getValue(kv);
} else {
throw new IllegalStateException("this should not have happened2!");
}
Expand All @@ -170,7 +171,7 @@ private boolean tryAddLong(final int[] array, final long byteOffset, final int k
while (true) {
final long current = getLongRaw(array, byteOffset);
final int value = getValue(current);
if (key != getKey(current) || value == 0) {
if (key != getKey(current)/* || value <= 0*/) {
return false;
}

Expand All @@ -181,6 +182,16 @@ private boolean tryAddLong(final int[] array, final long byteOffset, final int k
}
}

@Override
public int incrementAndGet(final int key) {
return getAndAdd(key, 1) + 1;
}

@Override
public int addAndGet(final int key, final int delta) {
return getAndAdd(key, delta) + delta;
}

@Override
public int size() {
return size;
Expand All @@ -193,6 +204,38 @@ public void reset() {
this.size = 0;
}

@Override
public long fillSortedKvs(IntBuf intBuf) {
long totalCount = 0L;
final int offset = intBuf.writerIndex();
final int[] array = this.array;
for (int k = 0, len = array.length; k < len; k += 2) {
final long kvLong = getLongRaw(array, byteOffset(k, len - 1));
final int key = getKey(kvLong);
final int value = getValue(kvLong);
if (value > 0) {
intBuf.write(key);
totalCount += value;
}
}

if (offset == intBuf.writerIndex()) {
return 0;
}

final int writerIndex = intBuf.writerIndex();
Arrays.sort(intBuf._buf(), offset, writerIndex);

for (int i = writerIndex - 1; i >= offset; --i) {
final int key = intBuf.getInt(i);
final int keyIdx = (i << 1) - offset; //2 * (i - offset) + offset
intBuf.setInt(keyIdx, key);
intBuf.setInt(keyIdx + 1, get(key));
}
intBuf.writerIndex((writerIndex << 1) - offset); //writerIndex + (writerIndex - offset)
return totalCount;
}

@Override
public String toString() {
return "FixedAtomicIntCounter{" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ public void testSimpleIncrease() {
Assert.assertEquals(1, intCounter.get(1));
Assert.assertEquals(3, intCounter.addAndGet(1, 2));
Assert.assertEquals(3, intCounter.get(1));
Assert.assertEquals(3, intCounter.getAndIncrement(1));
Assert.assertEquals(4, intCounter.get(1));
Assert.assertEquals(4, intCounter.getAndAdd(1, 2));
Assert.assertEquals(6, intCounter.get(1));

Assert.assertEquals(1, intCounter.size());
intCounter.reset();
Expand All @@ -42,7 +46,7 @@ public void testIncreaseFailure() {
Assert.assertEquals(8, intMap.size());

for (int i = 8; i < 1024; i++) {
Assert.assertEquals(-1, intMap.incrementAndGet(i));
Assert.assertEquals(0, intMap.incrementAndGet(i));
}
Assert.assertEquals(8, intMap.size());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package cn.myperf4j.bench;
package cn.myperf4j.bench.util.concurrent;

import cn.myperf4j.base.util.concurrent.AtomicIntArray;
import org.openjdk.jmh.annotations.Benchmark;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package cn.myperf4j.bench.util.concurrent;

import cn.myperf4j.base.util.concurrent.AtomicIntArray;
import cn.myperf4j.base.util.concurrent.AtomicIntHashCounter;
import cn.myperf4j.base.util.concurrent.FixedAtomicIntHashCounter;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
* Created by LinShunkang on 2022/03/20
*/
@BenchmarkMode({Mode.Throughput})
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@State(Scope.Thread)
public class AtomicIntHashCounterBench {

private AtomicIntHashCounter intHashCounter;

private AtomicIntArray intArray;

private ConcurrentMap<Integer, AtomicInteger> integerMap;

private final int[] keys = {0, 8, 16, 24, 32, 40, 48, 56};

@Setup
public void setup() {
intHashCounter = new FixedAtomicIntHashCounter(128);
intArray = new AtomicIntArray(128);
integerMap = new ConcurrentHashMap<>(128);
}

@Benchmark
public int intHashCounter() {
return intHashCounter.incrementAndGet(64);
}

// @Benchmark
public int intArray() {
return intArray.incrementAndGet(64);
}

@Benchmark
public int integerMap() {
return increase(integerMap, 64);
}

private int increase(ConcurrentMap<Integer, AtomicInteger> integerHashMap, int k) {
final AtomicInteger count = integerHashMap.get(k);
if (count != null) {
return count.incrementAndGet();
}

final AtomicInteger oldCounter = integerHashMap.putIfAbsent(k, new AtomicInteger(1));
if (oldCounter != null) {
return oldCounter.incrementAndGet();
}
return 0;
}

public static void main(String[] args) throws RunnerException {
// 使用一个单独进程执行测试,执行3遍warmup,然后执行5遍测试
final Options opt = new OptionsBuilder()
.include(AtomicIntHashCounterBench.class.getSimpleName())
.forks(2)
.threads(Runtime.getRuntime().availableProcessors() / 2 + 1)
.warmupIterations(3)
.measurementIterations(5)
.build();
new Runner(opt).run();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@ private MethodMetricsCalculator() {
public static MethodMetrics calMetrics(Recorder recorder, MethodTag methodTag, long startTime, long stopTime) {
IntBuf intBuf = null;
try {
int diffCount = recorder.getDiffCount();
final int diffCount = recorder.getDiffCount();
intBuf = intBufPool.acquire(diffCount << 1);
long totalCount = recorder.fillSortedRecords(intBuf);
final long totalCount = recorder.fillSortedRecords(intBuf);
return calMetrics(recorder, methodTag, startTime, stopTime, intBuf, totalCount, diffCount);
} catch (Exception e) {
Logger.error("MethodMetricsCalculator.calMetrics(" + recorder + ", " + methodTag + ", "
Expand Down Expand Up @@ -64,8 +64,8 @@ private static MethodMetrics calMetrics(Recorder recorder,
double sigma = 0.0D; //∑
long totalTime = 0L;
for (int i = 0, writerIdx = sortedRecords.writerIndex(); i < writerIdx; ) {
int timeCost = sortedRecords._getInt(i++);
int count = sortedRecords._getInt(i++);
final int timeCost = sortedRecords._getInt(i++);
final int count = sortedRecords._getInt(i++);

totalTime += (long) timeCost * count;
countMile += count;
Expand Down
Loading

0 comments on commit fa52668

Please sign in to comment.