Skip to content

Commit

Permalink
minor change for 'reverse scan' cr (sofastack#429)
Browse files Browse the repository at this point in the history
  • Loading branch information
fengjiachun authored Apr 29, 2020
1 parent 5c9bfe3 commit 0489220
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,7 @@ public void handleScanRequest(final ScanRequest request,
response.setRegionEpoch(getRegionEpoch());
try {
KVParameterRequires.requireSameEpoch(request, getRegionEpoch());
BaseKVStoreClosure KVStoreClosure = new BaseKVStoreClosure() {
final BaseKVStoreClosure kvStoreClosure = new BaseKVStoreClosure() {

@SuppressWarnings("unchecked")
@Override
Expand All @@ -470,10 +470,10 @@ public void run(final Status status) {
};
if (request.isReverse()) {
this.rawKVStore.reverseScan(request.getStartKey(), request.getEndKey(), request.getLimit(),
request.isReadOnlySafe(), request.isReturnValue(), KVStoreClosure);
request.isReadOnlySafe(), request.isReturnValue(), kvStoreClosure);
} else {
this.rawKVStore.scan(request.getStartKey(), request.getEndKey(), request.getLimit(),
request.isReadOnlySafe(), request.isReturnValue(), KVStoreClosure);
request.isReadOnlySafe(), request.isReturnValue(), kvStoreClosure);
}
} catch (final Throwable t) {
LOG.error("Failed to handle: {}, {}.", request, StackTraceUtil.stackTrace(t));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -623,8 +623,8 @@ private FutureGroup<List<KVEntry>> internalScan(final byte[] startKey, final byt
final ListRetryCallable<KVEntry> retryCallable = retryCause -> internalScan(subStartKey, subEndKey,
readOnlySafe, returnValue, retriesLeft - 1, retryCause);
final ListFailoverFuture<KVEntry> future = new ListFailoverFuture<>(retriesLeft, retryCallable);
internalRegionScan(region, subStartKey, subEndKey, false, readOnlySafe, returnValue, future, retriesLeft, lastError,
this.onlyLeaderRead);
internalRegionScan(region, subStartKey, subEndKey, false, readOnlySafe, returnValue, future, retriesLeft,
lastError, this.onlyLeaderRead);
futures.add(future);
}
return new FutureGroup<>(futures);
Expand Down Expand Up @@ -761,8 +761,8 @@ private FutureGroup<List<KVEntry>> internalReverseScan(final byte[] startKey, fi
final ListRetryCallable<KVEntry> retryCallable = retryCause -> internalReverseScan(subStartKey, subEndKey,
readOnlySafe, returnValue, retriesLeft - 1, retryCause);
final ListFailoverFuture<KVEntry> future = new ListFailoverFuture<>(retriesLeft, retryCallable);
internalRegionScan(region, subStartKey, subEndKey, true, readOnlySafe, returnValue, future, retriesLeft, lastError,
this.onlyLeaderRead );
internalRegionScan(region, subStartKey, subEndKey, true, readOnlySafe, returnValue, future, retriesLeft,
lastError, this.onlyLeaderRead );
futures.add(future);
}
return new FutureGroup<>(futures);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,13 +128,14 @@ public long getSafeEndValueForSequence(final long startVal, final int step) {
}

/**
* If limit == 0, it will be modified to Integer.MAX_VALUE on the server
* and then queried. So 'limit == 0' means that the number of queries is
* not limited. This is because serialization uses varint to compress
* numbers. In the case of 0, only 1 byte is occupied, and Integer.MAX_VALUE
* takes 5 bytes.
* @param limit
* @return
* If limit == 0, it will be modified to Integer.MAX_VALUE on the server
* and then queried. So 'limit == 0' means that the number of queries is
* not limited. This is because serialization uses varint to compress
* numbers. In the case of 0, only 1 byte is occupied, and Integer.MAX_VALUE
* takes 5 bytes.
*
* @param limit input limit
* @return normalize limit
*/
protected int normalizeLimit(final int limit) {
return limit > 0 ? limit : Integer.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public void scan(final byte[] startKey, final byte[] endKey, final int limit,
final KVStoreClosure closure) {
final Timer.Context timeCtx = getTimeContext("SCAN");
final List<KVEntry> entries = Lists.newArrayList();
int maxCount = normalizeLimit(limit);
final int maxCount = normalizeLimit(limit);
final ConcurrentNavigableMap<byte[], byte[]> subMap;
final byte[] realStartKey = BytesUtil.nullToEmpty(startKey);
if (endKey == null) {
Expand Down Expand Up @@ -174,7 +174,7 @@ public void reverseScan(final byte[] startKey, final byte[] endKey, final int li
final KVStoreClosure closure) {
final Timer.Context timeCtx = getTimeContext("REVERSE_SCAN");
final List<KVEntry> entries = Lists.newArrayList();
int maxCount = normalizeLimit(limit);
final int maxCount = normalizeLimit(limit);
final ConcurrentNavigableMap<byte[], byte[]> subMap;
final byte[] realEndKey = BytesUtil.nullToEmpty(endKey);
if (startKey == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,11 @@ public void run(final Status status, final long index, final byte[] reqCtx) {
}
RaftRawKVStore.this.readIndexExecutor.execute(() -> {
if (isLeader()) {
LOG.warn("Fail to [reverse scan] with 'ReadIndex': {}, try to applying to the state machine.", status);
LOG.warn("Fail to [reverseScan] with 'ReadIndex': {}, try to applying to the state machine.", status);
// If 'read index' read fails, try to applying to the state machine at the leader node
applyOperation(KVOperation.createReverseScan(startKey, endKey, limit, returnValue), closure);
} else {
LOG.warn("Fail to [reverse scan] with 'ReadIndex': {}.", status);
LOG.warn("Fail to [reverseScan] with 'ReadIndex': {}.", status);
// Client will retry to leader node
new KVClosureAdapter(closure, null).run(status);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ public void scan(final byte[] startKey, final byte[] endKey, final int limit,
final KVStoreClosure closure) {
final Timer.Context timeCtx = getTimeContext("SCAN");
final List<KVEntry> entries = Lists.newArrayList();
int maxCount = normalizeLimit(limit);
final int maxCount = normalizeLimit(limit);
final Lock readLock = this.readWriteLock.readLock();
readLock.lock();
try (final RocksIterator it = this.db.newIterator()) {
Expand Down

0 comments on commit 0489220

Please sign in to comment.