Skip to content

Commit

Permalink
refactor(raft): maintain specialized views of active and remote members
Browse files Browse the repository at this point in the history
  • Loading branch information
lenaschoenburg committed Aug 23, 2023
1 parent 3345818 commit cef975e
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,11 @@ public final class RaftClusterContext implements RaftCluster, AutoCloseable {
private final RaftContext raft;
private final DefaultRaftMember localMember;
private final Map<MemberId, RaftMemberContext> remoteMemberContexts = new HashMap<>();
private Configuration configuration;
private CompletableFuture<Void> bootstrapFuture;
private List<RaftMemberContext> replicationTargets = new ArrayList<>();
private List<RaftMemberContext> activeMembers = new ArrayList<>();
private boolean hasRemoteActiveMembers = false;
private volatile Configuration configuration;
private volatile CompletableFuture<Void> bootstrapFuture;

public RaftClusterContext(final MemberId localMemberId, final RaftContext raft) {
final Instant time = Instant.now();
Expand Down Expand Up @@ -83,12 +86,17 @@ public RaftClusterContext(final MemberId localMemberId, final RaftContext raft)
}
}

remoteMemberContexts
.values()
.forEach(
context -> {
context.resetState(raft.getLog());
});
remoteMemberContexts.values().forEach(context -> context.resetState(raft.getLog()));
activeMembers =
remoteMemberContexts.values().stream()
.filter(context -> context.getMember().getType() == Type.ACTIVE)
.toList();
replicationTargets =
remoteMemberContexts.values().stream()
.filter(context -> context.getMember().getType() != Type.INACTIVE)
.toList();

hasRemoteActiveMembers = !activeMembers.isEmpty();
}
}

Expand Down Expand Up @@ -241,7 +249,7 @@ public RaftMemberContext getMemberState(final MemberId id) {
*/
public <T extends Comparable<T>> Optional<T> getQuorumFor(
final Function<RaftMemberContext, T> calculateMemberValue) {
final var contexts = new ArrayList<>(getRemoteActiveMembers());
final var contexts = new ArrayList<>(activeMembers);

if (configuration.requiresJointConsensus()) {
final var oldMembers = configuration.oldMembers();
Expand Down Expand Up @@ -288,22 +296,22 @@ private <T extends Comparable<T>> Optional<T> getQuorumFor(
return Optional.of(calculateMemberValue.apply(context));
}

public boolean isSingleMemberCluster() {
return !hasRemoteActiveMembers;
}

/**
* @return A list of remote, active members.
* @return A list remote members which participate in voting, i.e. are active.
*/
public List<RaftMemberContext> getRemoteActiveMembers() {
return remoteMemberContexts.values().stream()
.filter(context -> context.getMember().getType() == Type.ACTIVE)
.toList();
public Set<RaftMember> getVotingMembers() {
return activeMembers.stream().map(RaftMemberContext::getMember).collect(Collectors.toSet());
}

/**
* @return A list of remote, active members.
*/
public List<RaftMemberContext> getReplicationTargets() {
return remoteMemberContexts.values().stream()
.filter(context -> context.getMember().getType() != Type.INACTIVE)
.toList();
return replicationTargets;
}

private void completeBootstrapFuture() {
Expand Down Expand Up @@ -373,6 +381,18 @@ public void configure(final Configuration configuration) {
updateMember(member, time);
}

activeMembers =
remoteMemberContexts.values().stream()
.filter(context -> context.getMember().getType() == Type.ACTIVE)
.toList();

replicationTargets =
remoteMemberContexts.values().stream()
.filter(context -> context.getMember().getType() != Type.INACTIVE)
.toList();

hasRemoteActiveMembers = !activeMembers.isEmpty();

// Transition the local member only if the member is being promoted and not demoted.
// Configuration changes that demote the local member are only applied to the local server
// upon commitment. This ensures that e.g. a leader that's removing itself from the quorum
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
import io.atomix.cluster.messaging.MessagingException.NoRemoteHandler;
import io.atomix.raft.RaftServer;
import io.atomix.raft.RaftServer.Role;
import io.atomix.raft.cluster.impl.DefaultRaftMember;
import io.atomix.raft.cluster.impl.RaftMemberContext;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.impl.RaftContext;
import io.atomix.raft.protocol.AppendResponse;
import io.atomix.raft.protocol.InternalAppendRequest;
Expand All @@ -31,10 +30,8 @@
import io.atomix.raft.utils.VoteQuorum;
import io.atomix.utils.concurrent.Scheduled;
import java.time.Duration;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/** Candidate state. */
public final class CandidateRole extends ActiveRole {
Expand All @@ -49,7 +46,7 @@ public CandidateRole(final RaftContext context) {

@Override
public synchronized CompletableFuture<RaftRole> start() {
if (raft.getCluster().getRemoteActiveMembers().isEmpty()) {
if (raft.getCluster().isSingleMemberCluster()) {
log.info("Single member cluster. Transitioning directly to leader.");
raft.setTerm(raft.getTerm() + 1);
raft.setLastVotedFor(raft.getCluster().getLocalMember().memberId());
Expand Down Expand Up @@ -107,10 +104,7 @@ private void sendVoteRequests() {
raft.setLastVotedFor(raft.getCluster().getLocalMember().memberId());

final AtomicBoolean complete = new AtomicBoolean();
final Set<DefaultRaftMember> votingMembers =
raft.getCluster().getRemoteActiveMembers().stream()
.map(RaftMemberContext::getMember)
.collect(Collectors.toSet());
final var votingMembers = raft.getCluster().getVotingMembers();

// Send vote requests to all nodes. The vote request that is sent
// to this node will be automatically successful.
Expand Down Expand Up @@ -183,7 +177,7 @@ private void sendVoteRequests() {

// Once we got the last log term, iterate through each current member
// of the cluster and vote each member for a vote.
for (final DefaultRaftMember member : votingMembers) {
for (final var member : votingMembers) {
log.debug("Requesting vote from {} for term {}", member, raft.getTerm());
final VoteRequest request =
VoteRequest.builder()
Expand All @@ -200,7 +194,7 @@ private void sendVoteRequests() {
private void sendVoteRequestToMember(
final AtomicBoolean complete,
final VoteQuorum quorum,
final DefaultRaftMember member,
final RaftMember member,
final VoteRequest request) {
raft.getProtocol()
.vote(member.memberId(), request)
Expand All @@ -217,7 +211,7 @@ private void sendVoteRequestToMember(
private void onVoteResponse(
final AtomicBoolean complete,
final VoteQuorum quorum,
final DefaultRaftMember member,
final RaftMember member,
final VoteRequest request,
final VoteResponse response,
final Throwable error) {
Expand Down Expand Up @@ -245,7 +239,7 @@ private void onVoteResponse(
private void onVoteResponseError(
final AtomicBoolean complete,
final VoteQuorum quorum,
final DefaultRaftMember member,
final RaftMember member,
final VoteRequest request,
final Throwable error) {
if (error.getCause() instanceof NoRemoteHandler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.atomix.raft.RaftServer;
import io.atomix.raft.cluster.RaftMember;
import io.atomix.raft.cluster.impl.DefaultRaftMember;
import io.atomix.raft.cluster.impl.RaftMemberContext;
import io.atomix.raft.impl.RaftContext;
import io.atomix.raft.protocol.AppendResponse;
import io.atomix.raft.protocol.ConfigureRequest;
Expand All @@ -39,10 +38,8 @@
import io.atomix.raft.storage.log.IndexedRaftLogEntry;
import io.atomix.raft.utils.VoteQuorum;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/** Follower state. */
public final class FollowerRole extends ActiveRole {
Expand All @@ -58,7 +55,7 @@ public FollowerRole(final RaftContext context, final ElectionTimerFactory electi
@Override
public synchronized CompletableFuture<RaftRole> start() {

if (raft.getCluster().getRemoteActiveMembers().isEmpty()) {
if (raft.getCluster().isSingleMemberCluster()) {
log.info("Single member cluster. Transitioning directly to candidate.");
raft.transition(RaftServer.Role.CANDIDATE);
return CompletableFuture.completedFuture(this);
Expand Down Expand Up @@ -115,10 +112,7 @@ private void handleClusterEvent(final ClusterMembershipEvent event) {
private void sendPollRequests() {
// Create a quorum that will track the number of nodes that have responded to the poll request.
final AtomicBoolean complete = new AtomicBoolean();
final Set<DefaultRaftMember> votingMembers =
raft.getCluster().getRemoteActiveMembers().stream()
.map(RaftMemberContext::getMember)
.collect(Collectors.toSet());
final var votingMembers = raft.getCluster().getVotingMembers();

// If there are no other members in the cluster, immediately transition to leader.
if (votingMembers.isEmpty()) {
Expand Down Expand Up @@ -157,7 +151,7 @@ private void sendPollRequests() {

// Once we got the last log term, iterate through each current member
// of the cluster and vote each member for a vote.
for (final DefaultRaftMember member : votingMembers) {
for (final RaftMember member : votingMembers) {
log.debug("Polling {} for next term {}", member, raft.getTerm() + 1);
final PollRequest request =
PollRequest.builder()
Expand Down Expand Up @@ -264,7 +258,7 @@ private void schedulePollRequests() {
private void handlePollResponse(
final AtomicBoolean complete,
final VoteQuorum quorum,
final DefaultRaftMember member,
final RaftMember member,
final PollResponse response,
final Throwable error) {
raft.checkThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,7 @@ public CompletableFuture<Long> appendEntries(final long index) {
// If there are no other active members in the cluster, update the commit index and complete the
// commit.
// The updated commit index will be sent to passive/reserve members on heartbeats.
if (raft.getCluster().getRemoteActiveMembers().isEmpty()) {
final long previousCommitIndex = raft.getCommitIndex();
if (raft.getCluster().isSingleMemberCluster()) {
raft.setCommitIndex(index);
completeCommits(index);
return CompletableFuture.completedFuture(index);
Expand Down

0 comments on commit cef975e

Please sign in to comment.