Skip to content

Commit

Permalink
feat(raft): leave joint consensus when committed
Browse files Browse the repository at this point in the history
Co-authored-by: deepthidevaki <[email protected]>
  • Loading branch information
lenaschoenburg and deepthidevaki committed Aug 22, 2023
1 parent 8fba14f commit a52860f
Showing 1 changed file with 43 additions and 16 deletions.
59 changes: 43 additions & 16 deletions atomix/cluster/src/main/java/io/atomix/raft/roles/LeaderRole.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

Expand Down Expand Up @@ -88,6 +89,15 @@ public synchronized CompletableFuture<RaftRole> start() {
commitInitialEntriesFuture = commitInitialEntries();
lastZbEntry = findLastZeebeEntry();

if (raft.getCluster().inJointConsensus()) {
raft.getThreadContext()
.execute(
() -> {
final var currentMembers = raft.getCluster().getConfiguration().members();
configure(currentMembers, List.of());
});
}

return super.start().thenRun(this::startTimers).thenApply(v -> this);
}

Expand Down Expand Up @@ -117,7 +127,7 @@ public CompletableFuture<ReconfigureResponse> onReconfigure(final ReconfigureReq
// If the leader index is 0 or is greater than the commitIndex, reject the promote requests.
// Configuration changes should not be allowed until the leader has committed a no-op entry.
// See https://groups.google.com/forum/#!topic/raft-dev/t4xj6dJTP6E
if (configuring() || initializing()) {
if (configuring() || initializing() || raft.getCluster().inJointConsensus()) {
return CompletableFuture.completedFuture(
logResponse(ReconfigureResponse.builder().withStatus(RaftResponse.Status.ERROR).build()));
}
Expand All @@ -141,23 +151,39 @@ public CompletableFuture<ReconfigureResponse> onReconfigure(final ReconfigureReq
final CompletableFuture<ReconfigureResponse> future = new CompletableFuture<>();
configure(updatedMembers, currentMembers)
.whenComplete(
(index, error) -> {
if (error == null) {
future.complete(
logResponse(
ReconfigureResponse.builder()
.withStatus(RaftResponse.Status.OK)
.withIndex(index)
.withTerm(raft.getCluster().getConfiguration().term())
.withTime(raft.getCluster().getConfiguration().time())
.withMembers(updatedMembers)
.build()));
(jointConsensusIndex, jointConsensusError) -> {
if (jointConsensusError == null) {
configure(updatedMembers, List.of())
.whenComplete(
(leftJointConsensusIndex, leftJointConsensusError) -> {
if (leftJointConsensusError == null) {
future.complete(
logResponse(
ReconfigureResponse.builder()
.withStatus(RaftResponse.Status.OK)
.withIndex(leftJointConsensusIndex)
.withTerm(raft.getCluster().getConfiguration().term())
.withTime(raft.getCluster().getConfiguration().time())
.withMembers(updatedMembers)
.build()));
} else {
future.complete(
logResponse(
ReconfigureResponse.builder()
.withStatus(RaftResponse.Status.ERROR)
.withError(
RaftError.Type.PROTOCOL_ERROR,
leftJointConsensusError.getMessage())
.build()));
}
});
} else {
future.complete(
logResponse(
ReconfigureResponse.builder()
.withStatus(RaftResponse.Status.ERROR)
.withError(RaftError.Type.PROTOCOL_ERROR, error.getMessage())
.withError(
RaftError.Type.PROTOCOL_ERROR, jointConsensusError.getMessage())
.build()));
}
});
Expand Down Expand Up @@ -196,8 +222,9 @@ private void stepDown() {
/** Sets the current node as the cluster leader. */
private void takeLeadership() {
raft.setLeader(raft.getCluster().getLocalMember().memberId());
raft.getCluster().reset();
raft.getCluster()
.getRemoteMemberStates()
.getReplicationTargets()
.forEach(member -> member.openReplicationContext(raft.getLog()));
}

Expand Down Expand Up @@ -279,13 +306,13 @@ private boolean initializing() {

/** Commits the given configuration. */
private CompletableFuture<Long> configure(
final Collection<RaftMember> updatedMembers, final Collection<RaftMember> currentMembers) {
final Collection<RaftMember> newMembers, final Collection<RaftMember> oldMembers) {
raft.checkThread();

final long term = raft.getTerm();

final var configurationEntry =
new ConfigurationEntry(System.currentTimeMillis(), updatedMembers, currentMembers);
new ConfigurationEntry(System.currentTimeMillis(), newMembers, oldMembers);
final IndexedRaftLogEntry entry;
try {
entry = appendEntry(new RaftLogEntry(term, configurationEntry));
Expand Down

0 comments on commit a52860f

Please sign in to comment.