Skip to content

Commit

Permalink
(feat) Impl SegmentList to replace ArrayDequeue in LogManagerImpl, so…
Browse files Browse the repository at this point in the history
…fastack#335 (sofastack#377)

* (feat) Impl SegmentList to replace ArrayDequeue in LogManagerImpl, sofastack#335

* (fix) comments

* (feat) MockStateMachine prevents log duplication

* (fix) set SEGMENT_SHIFT=7

* (feat) minor changes, replace ArrayDeque in BallotBox

* (fix) format

* (fix) NPE when unsafe not supported
  • Loading branch information
killme2008 authored and fengjiachun committed Dec 30, 2019
1 parent ad3e958 commit 64daa8b
Show file tree
Hide file tree
Showing 23 changed files with 769 additions and 109 deletions.
22 changes: 11 additions & 11 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/BallotBox.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import com.alipay.sofa.jraft.entity.Ballot;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.BallotBoxOptions;
import com.alipay.sofa.jraft.util.ArrayDeque;
import com.alipay.sofa.jraft.util.Describer;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.SegmentList;

/**
* Ballot box for voting.
Expand All @@ -45,22 +45,22 @@
@ThreadSafe
public class BallotBox implements Lifecycle<BallotBoxOptions>, Describer {

private static final Logger LOG = LoggerFactory.getLogger(BallotBox.class);
private static final Logger LOG = LoggerFactory.getLogger(BallotBox.class);

private FSMCaller waiter;
private ClosureQueue closureQueue;
private final StampedLock stampedLock = new StampedLock();
private long lastCommittedIndex = 0;
private long pendingIndex;
private final ArrayDeque<Ballot> pendingMetaQueue = new ArrayDeque<>();
private FSMCaller waiter;
private ClosureQueue closureQueue;
private final StampedLock stampedLock = new StampedLock();
private long lastCommittedIndex = 0;
private long pendingIndex;
private final SegmentList<Ballot> pendingMetaQueue = new SegmentList<>(false);

@OnlyForTest
long getPendingIndex() {
return this.pendingIndex;
}

@OnlyForTest
ArrayDeque<Ballot> getPendingMetaQueue() {
SegmentList<Ballot> getPendingMetaQueue() {
return this.pendingMetaQueue;
}

Expand Down Expand Up @@ -95,7 +95,7 @@ public boolean init(final BallotBoxOptions opts) {
*/
public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
// TODO use lock-free algorithm here?
final long stamp = stampedLock.writeLock();
final long stamp = this.stampedLock.writeLock();
long lastCommittedIndex = 0;
try {
if (this.pendingIndex == 0) {
Expand Down Expand Up @@ -127,7 +127,7 @@ public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final
// logs, since we use the new configuration to deal the quorum of the
// removal request, we think it's safe to commit all the uncommitted
// previous logs, which is not well proved right now
this.pendingMetaQueue.removeRange(0, (int) (lastCommittedIndex - this.pendingIndex) + 1);
this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
LOG.debug("Committed log fromIndex={}, toIndex={}.", this.pendingIndex, lastCommittedIndex);
this.pendingIndex = lastCommittedIndex + 1;
this.lastCommittedIndex = lastCommittedIndex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1048,7 +1048,6 @@ private static void onTimeout(final ThreadId id) {
void destroy() {
final ThreadId savedId = this.id;
LOG.info("Replicator {} is going to quit", savedId);
this.id = null;
releaseReader();
// Unregister replicator metric set
if (this.nodeMetrics.isEnabled()) {
Expand All @@ -1058,6 +1057,7 @@ void destroy() {
this.state = State.Destroyed;
notifyReplicatorStatusListener((Replicator) savedId.getData(), ReplicatorEvent.DESTROYED);
savedId.unlockAndDestroy();
this.id = null;
}

private void releaseReader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import com.alipay.sofa.jraft.util.LogExceptionHandler;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.SegmentList;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.Utils;
import com.lmax.disruptor.EventFactory;
Expand Down Expand Up @@ -90,8 +91,7 @@ public class LogManagerImpl implements LogManager {
private long nextWaitId;
private LogId diskId = new LogId(0, 0);
private LogId appliedId = new LogId(0, 0);
// TODO use a lock-free concurrent list instead?
private ArrayDeque<LogEntry> logsInMemory = new ArrayDeque<>();
private final SegmentList<LogEntry> logsInMemory = new SegmentList<>(true);
private volatile long firstLogIndex;
private volatile long lastLogIndex;
private volatile LogId lastSnapshotId = new LogId(0, 0);
Expand Down Expand Up @@ -253,16 +253,8 @@ public void shutdown() {
private void clearMemoryLogs(final LogId id) {
this.writeLock.lock();
try {
int index = 0;
for (final int size = this.logsInMemory.size(); index < size; index++) {
final LogEntry entry = this.logsInMemory.get(index);
if (entry.getId().compareTo(id) > 0) {
break;
}
}
if (index > 0) {
this.logsInMemory.removeRange(0, index);
}

this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().compareTo(id) <= 0);
} finally {
this.writeLock.unlock();
}
Expand Down Expand Up @@ -698,7 +690,8 @@ public void clearBufferedLogs() {
private String descLogsInMemory() {
final StringBuilder sb = new StringBuilder();
boolean wasFirst = true;
for (final LogEntry logEntry : this.logsInMemory) {
for (int i = 0; i < this.logsInMemory.size(); i++) {
LogEntry logEntry = this.logsInMemory.get(i);
if (!wasFirst) {
sb.append(",");
} else {
Expand Down Expand Up @@ -933,20 +926,12 @@ public void run(final Status status) {
}

private boolean truncatePrefix(final long firstIndexKept) {
int index = 0;
for (final int size = this.logsInMemory.size(); index < size; index++) {
final LogEntry entry = this.logsInMemory.get(index);
if (entry.getId().getIndex() >= firstIndexKept) {
break;
}
}
if (index > 0) {
this.logsInMemory.removeRange(0, index);
}

this.logsInMemory.removeFromFirstWhen(entry -> entry.getId().getIndex() < firstIndexKept);

// TODO maybe it's fine here
Requires.requireTrue(firstIndexKept >= this.firstLogIndex,
"Try to truncate logs before %d, but the firstLogIndex is %d", firstIndexKept, this.firstLogIndex);
"Try to truncate logs before %d, but the firstLogIndex is %d", firstIndexKept, this.firstLogIndex);

this.firstLogIndex = firstIndexKept;
if (firstIndexKept > this.lastLogIndex) {
Expand All @@ -963,7 +948,7 @@ private boolean truncatePrefix(final long firstIndexKept) {
private boolean reset(final long nextLogIndex) {
this.writeLock.lock();
try {
this.logsInMemory = new ArrayDeque<>();
this.logsInMemory.clear();
this.firstLogIndex = nextLogIndex;
this.lastLogIndex = nextLogIndex - 1;
this.configManager.truncatePrefix(this.firstLogIndex);
Expand All @@ -982,14 +967,9 @@ private void unsafeTruncateSuffix(final long lastIndexKept) {
lastIndexKept);
return;
}
while (!this.logsInMemory.isEmpty()) {
final LogEntry entry = this.logsInMemory.peekLast();
if (entry.getId().getIndex() > lastIndexKept) {
this.logsInMemory.pollLast();
} else {
break;
}
}

this.logsInMemory.removeFromLastWhen(entry -> entry.getId().getIndex() > lastIndexKept);

this.lastLogIndex = lastIndexKept;
final long lastTermKept = unsafeGetTerm(lastIndexKept);
Requires.requireTrue(this.lastLogIndex == 0 || lastTermKept != 0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,20 @@ public abstract class RepeatedTimer implements Describer {
private Timeout timeout;
private boolean stopped;
private volatile boolean running;
private boolean destroyed;
private boolean invoking;
private volatile boolean destroyed;
private volatile boolean invoking;
private volatile int timeoutMs;
private final String name;

public int getTimeoutMs() {
return this.timeoutMs;
}

public RepeatedTimer(String name, int timeoutMs) {
public RepeatedTimer(final String name, final int timeoutMs) {
this(name, timeoutMs, new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048));
}

public RepeatedTimer(String name, int timeoutMs, Timer timer) {
public RepeatedTimer(final String name, final int timeoutMs, final Timer timer) {
super();
this.name = name;
this.timeoutMs = timeoutMs;
Expand All @@ -81,12 +81,7 @@ protected int adjustTimeout(final int timeoutMs) {
}

public void run() {
this.lock.lock();
try {
this.invoking = true;
} finally {
this.lock.unlock();
}
this.invoking = true;
try {
onTrigger();
} catch (final Throwable t) {
Expand Down Expand Up @@ -157,7 +152,7 @@ public void start() {
}

private void schedule() {
if(this.timeout != null) {
if (this.timeout != null) {
this.timeout.cancel();
}
final TimerTask timerTask = timeout -> {
Expand Down Expand Up @@ -216,8 +211,6 @@ public void destroy() {
if (!this.running) {
invokeDestroyed = true;
}
// Timer#stop is idempotent
this.timer.stop();
if (this.stopped) {
return;
}
Expand All @@ -231,6 +224,7 @@ public void destroy() {
}
} finally {
this.lock.unlock();
this.timer.stop();
if (invokeDestroyed) {
onDestroy();
}
Expand Down Expand Up @@ -272,8 +266,8 @@ public void describe(final Printer out) {

@Override
public String toString() {
return "RepeatedTimer{" + "timeout=" + timeout + ", stopped=" + stopped + ", running=" + running
+ ", destroyed=" + destroyed + ", invoking=" + invoking + ", timeoutMs=" + timeoutMs + ", name='" + name
+ '\'' + '}';
return "RepeatedTimer{" + "timeout=" + this.timeout + ", stopped=" + this.stopped + ", running=" + this.running
+ ", destroyed=" + this.destroyed + ", invoking=" + this.invoking + ", timeoutMs=" + this.timeoutMs
+ ", name='" + this.name + '\'' + '}';
}
}
Loading

0 comments on commit 64daa8b

Please sign in to comment.