Skip to content

Commit

Permalink
feat: raft impl v3, part 49
Browse files Browse the repository at this point in the history
re-add raft vote
  • Loading branch information
areyouok committed Feb 2, 2024
1 parent 86c2d4f commit de6a249
Show file tree
Hide file tree
Showing 9 changed files with 738 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ private FrameCallResult afterRaftLogInit(Pair<Integer, Long> initResult, int sna
initResult.getLeft(), initResult.getRight(), groupConfig.getGroupId());

gc.getCommitManager().startCommitFiber();
gc.getVoteManager().startFiber();
gc.getApplyManager().init(getFiberGroup(), prepareFuture);
return Fiber.frameReturn();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
*/
package com.github.dtprj.dongting.raft.impl;

import com.github.dtprj.dongting.fiber.Fiber;
import com.github.dtprj.dongting.fiber.FiberGroup;
import com.github.dtprj.dongting.fiber.FrameCall;
import com.github.dtprj.dongting.fiber.FrameCallResult;
import com.github.dtprj.dongting.log.DtLog;
import com.github.dtprj.dongting.log.DtLogs;
import com.github.dtprj.dongting.net.NioNet;
Expand All @@ -27,7 +30,10 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -257,6 +263,16 @@ public static void changeToObserver(RaftStatusImpl raftStatus, int leaderId) {
raftStatus.setRole(RaftRole.observer);
}

public static void changeToLeader(RaftStatusImpl raftStatus) {
log.info("change to leader. term={}", raftStatus.getCurrentTerm());
resetStatus(raftStatus);
raftStatus.setRole(RaftRole.leader);
raftStatus.setFirstIndexOfCurrentTerm(raftStatus.getLastLogIndex() + 1);
for (RaftMember node : raftStatus.getReplicateList()) {
node.setNextIndex(raftStatus.getLastLogIndex() + 1);
}
}

public static boolean writeNotFinished(RaftStatusImpl raftStatus) {
if (raftStatus.getLastSyncLogIndex() != raftStatus.getLastLogIndex()) {
log.info("write not finished, lastPersistLogIndex={}, lastLogIndex={}",
Expand All @@ -265,4 +281,20 @@ public static boolean writeNotFinished(RaftStatusImpl raftStatus) {
}
return false;
}

public static <T> Set<T> union(Collection<T> c1, Collection<T> c2) {
HashSet<T> set = new HashSet<>();
set.addAll(c1);
set.addAll(c2);
return set;
}

public static FrameCallResult waitWriteFinish(RaftStatusImpl raftStatus, FrameCall<Void> resumePoint) {
if (writeNotFinished(raftStatus)) {
return raftStatus.getLogSyncFinishCondition().await(
1000, v -> waitWriteFinish(raftStatus, resumePoint));
} else {
return Fiber.resume(null, resumePoint);
}
}
}
Loading

0 comments on commit de6a249

Please sign in to comment.