Skip to content

Commit

Permalink
(feat) batch putIfAbsent impl (sofastack#201)
Browse files Browse the repository at this point in the history
* put if absent

put if absent

* putIfAbsent test update
  • Loading branch information
SteNicholas authored and fengjiachun committed Jul 6, 2019
1 parent fcd642b commit 991b943
Show file tree
Hide file tree
Showing 3 changed files with 150 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -937,8 +937,9 @@ public Boolean bCompareAndPut(final String key, final byte[] expect, final byte[
return FutureHelper.get(compareAndPut(key, expect, update), this.futureTimeoutMillis);
}

private void internalCompareAndPut(final byte[] key, final byte[] expect, final byte[] update, final CompletableFuture<Boolean> future,
final int retriesLeft, final Errors lastCause) {
private void internalCompareAndPut(final byte[] key, final byte[] expect, final byte[] update,
final CompletableFuture<Boolean> future, final int retriesLeft,
final Errors lastCause) {
final Region region = this.pdClient.findRegionByKey(key, ErrorsHelper.isInvalidEpoch(lastCause));
final RegionEngine regionEngine = getRegionEngine(region.getId(), true);
final RetryRunner retryRunner = retryCause -> internalCompareAndPut(key, expect, update, future, retriesLeft - 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,13 +574,15 @@ public void batchCompareAndPut(final KVStateOutputList kvStates) {
setData(kvState.getDone(), Boolean.FALSE);
}
}
this.db.write(this.writeOptions, batch);
if(batch.count() > 0) {
this.db.write(this.writeOptions, batch);
}
for (final KVState kvState : segment) {
setSuccess(kvState.getDone(), getData(kvState.getDone()));
}
} catch (final Exception e) {
LOG.error("Failed to [BATCH_COMPARE_PUT], [size = {}] {}.", segment.size(),
StackTraceUtil.stackTrace(e));
StackTraceUtil.stackTrace(e));
setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_COMPARE_PUT]", e);
}
return null;
Expand Down Expand Up @@ -684,6 +686,57 @@ public void putIfAbsent(final byte[] key, final byte[] value, final KVStoreClosu
}
}

@Override
public void batchPutIfAbsent(final KVStateOutputList kvStates) {
if (kvStates.isSingletonList()) {
final KVState kvState = kvStates.getSingletonElement();
final KVOperation op = kvState.getOp();
putIfAbsent(op.getKey(), op.getValue(), kvState.getDone());
return;
}
final Timer.Context timeCtx = getTimeContext("BATCH_PUT_IF_ABSENT");
final Lock readLock = this.readWriteLock.readLock();
readLock.lock();
try {
Partitions.manyToOne(kvStates, MAX_BATCH_WRITE_SIZE, (Function<List<KVState>, Void>) segment -> {
try (final WriteBatch batch = new WriteBatch()) {
final List<byte[]> keys = Lists.newArrayListWithCapacity(segment.size());
final Map<byte[], byte[]> values = Maps.newHashMapWithExpectedSize(segment.size());
for (final KVState kvState : segment) {
final KVOperation op = kvState.getOp();
final byte[] key = op.getKey();
final byte[] value = op.getValue();
keys.add(key);
values.put(key, value);
}
final Map<byte[], byte[]> prevValMap = this.db.multiGet(keys);
for (final KVState kvState : segment) {
final byte[] key = kvState.getOp().getKey();
final byte[] prevVal = prevValMap.get(key);
if(prevVal == null) {
batch.put(key, values.get(key));
}
setData(kvState.getDone(), prevVal);
}
if(batch.count() > 0) {
this.db.write(this.writeOptions, batch);
}
for (final KVState kvState : segment) {
setSuccess(kvState.getDone(), getData(kvState.getDone()));
}
} catch (final Exception e) {
LOG.error("Failed to [BATCH_PUT_IF_ABSENT], [size = {}] {}.", segment.size(),
StackTraceUtil.stackTrace(e));
setCriticalError(Lists.transform(kvStates, KVState::getDone), "Fail to [BATCH_PUT_IF_ABSENT]", e);
}
return null;
});
} finally {
readLock.unlock();
timeCtx.stop();
}
}

@Override
public void tryLockWith(final byte[] key, final byte[] fencingKey, final boolean keepLease,
final DistributedLock.Acquirer acquirer, final KVStoreClosure closure) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import com.alipay.sofa.jraft.rhea.storage.BaseKVStoreClosure;
import com.alipay.sofa.jraft.rhea.storage.KVEntry;
import com.alipay.sofa.jraft.rhea.storage.KVIterator;
import com.alipay.sofa.jraft.rhea.storage.KVOperation;
import com.alipay.sofa.jraft.rhea.storage.KVState;
import com.alipay.sofa.jraft.rhea.storage.KVStateOutputList;
import com.alipay.sofa.jraft.rhea.storage.KVStoreAccessHelper;
import com.alipay.sofa.jraft.rhea.storage.KVStoreClosure;
import com.alipay.sofa.jraft.rhea.storage.LocalLock;
Expand All @@ -60,6 +63,7 @@
import static com.alipay.sofa.jraft.rhea.KeyValueTool.makeValue;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
Expand Down Expand Up @@ -335,10 +339,10 @@ public void run(Status status) {
@Test
public void compareAndPutTest() {
final byte[] key = makeKey("put_test");
byte[] value = makeValue("put_test_value");
final byte[] value = makeValue("put_test_value");
this.kvStore.put(key, value, null);

byte[] update = makeValue("put_test_update");
final byte[] update = makeValue("put_test_update");
KVStoreClosure kvStoreClosure = new BaseKVStoreClosure() {
@Override
public void run(Status status) {
Expand All @@ -359,6 +363,53 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) {
assertEquals(kvStoreClosure.getData(), Boolean.FALSE);
}

/**
* Test method: {@link RocksRawKVStore#batchCompareAndPut(KVStateOutputList)}
*/
@Test
public void batchCompareAndPutTest() {
final KVStateOutputList kvStates = KVStateOutputList.newInstance();
final int batchWriteSize = RocksRawKVStore.MAX_BATCH_WRITE_SIZE + 1;
for(int i = 1; i <= batchWriteSize; i++) {
final byte[] key = makeKey("put_test" + i);
final byte[] value = makeValue("put_test_value" + i);
kvStates.add(KVState.of(KVOperation.createPut(key, value), null));
}
this.kvStore.batchPut(kvStates);
kvStates.clear();

for(int i = 1; i <= batchWriteSize; i++) {
final byte[] key = makeKey("put_test" + i);
final byte[] value = makeValue("put_test_value" + i);
final byte[] update = makeValue("put_test_update" + i);
KVStoreClosure kvStoreClosure = new BaseKVStoreClosure() {
@Override
public void run(Status status) {
assertEquals(status, Status.OK());
}
};
kvStates.add(KVState.of(KVOperation.createCompareAndPut(key, update, value), kvStoreClosure));
}
this.kvStore.batchCompareAndPut(kvStates);
kvStates.forEach(kvState -> assertEquals(kvState.getDone().getData(), Boolean.FALSE));
kvStates.clear();

for(int i = 1; i <= batchWriteSize; i++) {
final byte[] key = makeKey("put_test" + i);
final byte[] value = makeValue("put_test_value" + i);
final byte[] update = makeValue("put_test_update" + i);
KVStoreClosure kvStoreClosure = new BaseKVStoreClosure() {
@Override
public void run(Status status) {
assertEquals(status, Status.OK());
}
};
kvStates.add(KVState.of(KVOperation.createCompareAndPut(key, value, update), kvStoreClosure));
}
this.kvStore.batchCompareAndPut(kvStates);
kvStates.forEach(kvState -> assertEquals(kvState.getDone().getData(), Boolean.TRUE));
}

/**
* Test method: {@link RocksRawKVStore#merge(byte[], byte[], KVStoreClosure)}
*/
Expand Down Expand Up @@ -411,7 +462,7 @@ public void execute(RawKVStore kvStore, KVStoreClosure closure) {
* Test method: {@link RocksRawKVStore#putIfAbsent(byte[], byte[], KVStoreClosure)}
*/
@Test
public void putIfAbsent() {
public void putIfAbsentTest() {
byte[] key = makeKey("put_if_absent_test");
byte[] value = makeValue("put_if_absent_test_value");
TestClosure closure = new TestClosure();
Expand All @@ -421,6 +472,43 @@ public void putIfAbsent() {
assertArrayEquals(value, (byte[]) closure.getData());
}

/**
* Test method: {@link RocksRawKVStore#batchPutIfAbsent(KVStateOutputList)}
*/
@Test
public void batchPutIfAbsentTest() {
final KVStateOutputList kvStates = KVStateOutputList.newInstance();
final int batchWriteSize = RocksRawKVStore.MAX_BATCH_WRITE_SIZE + 1;
for(int i = 1; i <= batchWriteSize; i++) {
final byte[] key = makeKey("put_test" + i);
final byte[] value = makeValue("put_test_value" + i);
KVStoreClosure kvStoreClosure = new BaseKVStoreClosure() {
@Override
public void run(Status status) {
assertEquals(status, Status.OK());
}
};
kvStates.add(KVState.of(KVOperation.createPutIfAbsent(key, value), kvStoreClosure));
}
this.kvStore.batchPutIfAbsent(kvStates);
kvStates.forEach(kvState -> assertNull(kvState.getDone().getData()));
kvStates.clear();

for(int i = 1; i <= batchWriteSize; i++) {
final byte[] key = makeKey("put_test" + i);
final byte[] value = makeValue("put_test_value" + i);
KVStoreClosure kvStoreClosure = new BaseKVStoreClosure() {
@Override
public void run(Status status) {
assertEquals(status, Status.OK());
}
};
kvStates.add(KVState.of(KVOperation.createPutIfAbsent(key, value), kvStoreClosure));
}
this.kvStore.batchPutIfAbsent(kvStates);
kvStates.forEach(kvState -> assertArrayEquals(kvState.getOp().getValue(), (byte[]) kvState.getDone().getData()));
}

/**
* Test method: {@link RocksRawKVStore#tryLockWith(byte[], byte[], boolean, DistributedLock.Acquirer, KVStoreClosure)}
*/
Expand All @@ -436,7 +524,7 @@ public void tryLockWith() throws InterruptedException {
new Thread(() -> {
final DistributedLock<byte[]> lock2 = new LocalLock(lockKey, 3, TimeUnit.SECONDS, this.kvStore);
try {
assertTrue(!lock2.tryLock());
assertFalse(lock2.tryLock());
} finally {
latch.countDown();
}
Expand Down

0 comments on commit 991b943

Please sign in to comment.