Skip to content

Commit

Permalink
(feat) execute the 'onTaskCommitted' method only when necessary (sofa…
Browse files Browse the repository at this point in the history
…stack#89)

* (feat) execute the 'onTaskCommitted' method only when necessary

* (feat) add taskClosures list
  • Loading branch information
fengjiachun authored and killme2008 committed Apr 8, 2019
1 parent bc4523d commit 2471c3a
Show file tree
Hide file tree
Showing 4 changed files with 153 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,39 @@ public interface ClosureQueue {
*
* @param firstIndex the first index of queue
*/
void resetFirstIndex(long firstIndex);
void resetFirstIndex(final long firstIndex);

/**
* Append a new closure into queue.
*
* @param closure the closure to append
*/
void appendPendingClosure(Closure closure);
void appendPendingClosure(final Closure closure);

/**
* Pop closure from queue until index(inclusion), returns the first
* popped out index, returns -1 when out of range, returns index+1
* when not found.
*
* @param index the index of queue
* @param closures closure list
* @param endIndex the index of queue
* @param closures closure list
* @return returns the first popped out index, returns -1 when out
* of range, returns index+1
* when not found.
*/
long popClosureUntil(long index, List<Closure> closures);
long popClosureUntil(final long endIndex, final List<Closure> closures);

/**
* Pop closure from queue until index(inclusion), returns the first
* popped out index, returns -1 when out of range, returns index+1
* when not found.
*
* @param endIndex the index of queue
* @param closures closure list
* @param taskClosures task closure list
* @return returns the first popped out index, returns -1 when out
* of range, returns index+1
* when not found.
*/
long popClosureUntil(final long endIndex, final List<Closure> closures, final List<TaskClosure> taskClosures);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,12 @@ public class ClosureQueueImpl implements ClosureQueue {

@OnlyForTest
public long getFirstIndex() {
return this.firstIndex;
return firstIndex;
}

@OnlyForTest
public LinkedList<Closure> getQueue() {
return this.queue;
return queue;
}

public ClosureQueueImpl() {
Expand All @@ -66,68 +66,78 @@ public ClosureQueueImpl() {
@Override
public void clear() {
List<Closure> savedQueue;
lock.lock();
this.lock.lock();
try {
this.firstIndex = 0;
savedQueue = this.queue;
this.queue = new LinkedList<>();
} finally {
lock.unlock();
this.lock.unlock();
}

Status status = new Status(RaftError.EPERM, "Leader stepped down");
for (Closure done : savedQueue) {
final Status status = new Status(RaftError.EPERM, "Leader stepped down");
for (final Closure done : savedQueue) {
if (done != null) {
Utils.runClosureInThread(done, status);
}
}
}

@Override
public void resetFirstIndex(long firstIndex) {
lock.lock();
public void resetFirstIndex(final long firstIndex) {
this.lock.lock();
try {
Requires.requireTrue(this.queue.isEmpty(), "Queue is not empty.");
this.firstIndex = firstIndex;
} finally {
lock.unlock();
this.lock.unlock();
}

}

@Override
public void appendPendingClosure(Closure closure) {
lock.lock();
public void appendPendingClosure(final Closure closure) {
this.lock.lock();
try {
this.queue.add(closure);
} finally {
lock.unlock();
this.lock.unlock();
}
}

@Override
public long popClosureUntil(long index, List<Closure> out) {
out.clear();
long outFirstIndex;
lock.lock();
public long popClosureUntil(final long endIndex, final List<Closure> closures) {
return popClosureUntil(endIndex, closures, null);
}

@Override
public long popClosureUntil(final long endIndex, final List<Closure> closures, final List<TaskClosure> taskClosures) {
closures.clear();
if (taskClosures != null) {
taskClosures.clear();
}
this.lock.lock();
try {
if (queue.isEmpty() || index < this.firstIndex) {
outFirstIndex = index + 1;
return outFirstIndex;
final int queueSize = this.queue.size();
if (queueSize == 0 || endIndex < this.firstIndex) {
return endIndex + 1;
}
if (index > this.firstIndex + queue.size() - 1) {
LOG.error("Invalid index={}, firstIndex={}, closureQueueSize={}", index, firstIndex, queue.size());
if (endIndex > this.firstIndex + queueSize - 1) {
LOG.error("Invalid endIndex={}, firstIndex={}, closureQueueSize={}", endIndex, this.firstIndex,
queueSize);
return -1;
}
outFirstIndex = this.firstIndex;
for (long i = this.firstIndex; i <= index; i++) {
out.add(queue.pollFirst());
final long outFirstIndex = this.firstIndex;
for (long i = outFirstIndex; i <= endIndex; i++) {
final Closure closure = this.queue.pollFirst();
if (taskClosures != null && closure instanceof TaskClosure) {
taskClosures.add((TaskClosure) closure);
}
closures.add(closure);
}
this.firstIndex = index + 1;
this.firstIndex = endIndex + 1;
return outFirstIndex;
} finally {
lock.unlock();
this.lock.unlock();
}
}

}
Loading

0 comments on commit 2471c3a

Please sign in to comment.