Skip to content

Commit

Permalink
[ISSUE#246]Implement for ReplicatorGroupTest Unit Class. (sofastack#249)
Browse files Browse the repository at this point in the history
  • Loading branch information
zongtanghu authored and fengjiachun committed Aug 17, 2019
1 parent be1309d commit 61570a9
Show file tree
Hide file tree
Showing 2 changed files with 199 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,191 @@
*/
package com.alipay.sofa.jraft.core;

import com.alipay.sofa.jraft.ReplicatorGroup;
import com.alipay.sofa.jraft.entity.NodeId;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.NodeOptions;
import com.alipay.sofa.jraft.option.RaftOptions;
import com.alipay.sofa.jraft.option.ReplicatorGroupOptions;
import com.alipay.sofa.jraft.rpc.RaftClientService;
import com.alipay.sofa.jraft.rpc.RpcRequests;
import com.alipay.sofa.jraft.rpc.impl.FutureImpl;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.storage.SnapshotStorage;
import com.alipay.sofa.jraft.test.TestUtils;
import com.alipay.sofa.jraft.util.Endpoint;
import java.util.ArrayList;
import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.runners.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;

@RunWith(value = MockitoJUnitRunner.class)
public class ReplicatorGroupTest {

private TimerManager timerManager;
private ReplicatorGroup replicatorGroup;
@Mock
private BallotBox ballotBox;
@Mock
private LogManager logManager;
@Mock
private NodeImpl node;
private ReplicatorGroupOptions rgOpts;
@Mock
private RaftClientService rpcService;
@Mock
private SnapshotStorage snapshotStorage;
private NodeOptions options = new NodeOptions();
private final RaftOptions raftOptions = new RaftOptions();
private final PeerId peerId1 = new PeerId("localhost", 8082);
private final PeerId peerId2 = new PeerId("localhost", 8083);
private final PeerId peerId3 = new PeerId("localhost", 8084);

@Before
public void setup() {
this.timerManager = new TimerManager();
this.timerManager.init(5);
replicatorGroup = new ReplicatorGroupImpl();
rgOpts = new ReplicatorGroupOptions();
rgOpts.setHeartbeatTimeoutMs(heartbeatTimeout(this.options.getElectionTimeoutMs()));
rgOpts.setElectionTimeoutMs(this.options.getElectionTimeoutMs());
rgOpts.setLogManager(this.logManager);
rgOpts.setBallotBox(this.ballotBox);
rgOpts.setNode(node);
rgOpts.setRaftRpcClientService(this.rpcService);
rgOpts.setSnapshotStorage(snapshotStorage);
rgOpts.setRaftOptions(this.raftOptions);
rgOpts.setTimerManager(this.timerManager);
Mockito.when(this.logManager.getLastLogIndex()).thenReturn(10L);
Mockito.when(this.logManager.getTerm(10)).thenReturn(1L);
Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
Mockito.when(this.node.getNodeId()).thenReturn(new NodeId("test", new PeerId("localhost", 8081)));
mockSendEmptyEntries();
assertTrue(this.replicatorGroup.init(node.getNodeId(), rgOpts));
}

@Test
public void testAddReplicatorAndFailed() {
replicatorGroup.resetTerm(1);
assertFalse(replicatorGroup.addReplicator(peerId1));
}

@Test
public void testAddReplicatorSuccess() {
Mockito.when(this.rpcService.connect(peerId1.getEndpoint())).thenReturn(true);
replicatorGroup.resetTerm(1);
assertTrue(replicatorGroup.addReplicator(peerId1));
}

@Test
public void testStopReplicator() {
Mockito.when(this.rpcService.connect(peerId1.getEndpoint())).thenReturn(true);
replicatorGroup.resetTerm(1);
replicatorGroup.addReplicator(peerId1);
assertTrue(replicatorGroup.stopReplicator(peerId1));
}

@Test
public void testStopAllReplicator() {
Mockito.when(this.rpcService.connect(peerId1.getEndpoint())).thenReturn(true);
Mockito.when(this.rpcService.connect(peerId2.getEndpoint())).thenReturn(true);
Mockito.when(this.rpcService.connect(peerId3.getEndpoint())).thenReturn(true);
replicatorGroup.resetTerm(1);
replicatorGroup.addReplicator(peerId1);
replicatorGroup.addReplicator(peerId2);
replicatorGroup.addReplicator(peerId3);
assertTrue(replicatorGroup.contains(peerId1));
assertTrue(replicatorGroup.contains(peerId2));
assertTrue(replicatorGroup.contains(peerId3));
assertTrue(replicatorGroup.stopAll());
}

@Test
public void testTransferLeadershipToAndStop() {
Mockito.when(this.rpcService.connect(peerId1.getEndpoint())).thenReturn(true);
Mockito.when(this.rpcService.connect(peerId2.getEndpoint())).thenReturn(true);
Mockito.when(this.rpcService.connect(peerId3.getEndpoint())).thenReturn(true);
replicatorGroup.resetTerm(1);
replicatorGroup.addReplicator(peerId1);
replicatorGroup.addReplicator(peerId2);
replicatorGroup.addReplicator(peerId3);
long logIndx = 8;
assertTrue(replicatorGroup.transferLeadershipTo(peerId1, 8));
Replicator r = (Replicator) replicatorGroup.getReplicator(peerId1).lock();
assertEquals(r.getTimeoutNowIndex(), logIndx);
replicatorGroup.getReplicator(peerId1).unlock();
assertTrue(replicatorGroup.stopTransferLeadership(peerId1));
assertEquals(r.getTimeoutNowIndex(), 0);
}

@After
public void teardown() {
this.timerManager.shutdown();
}

private int heartbeatTimeout(final int electionTimeout) {
return Math.max(electionTimeout / this.raftOptions.getElectionHeartbeatFactor(), 10);
}

private void mockSendEmptyEntries() {
final RpcRequests.AppendEntriesRequest request1 = createEmptyEntriesRequestToPeer1();
final RpcRequests.AppendEntriesRequest request2 = createEmptyEntriesRequestToPeer2();
final RpcRequests.AppendEntriesRequest request3 = createEmptyEntriesRequestToPeer3();

Mockito.when(this.rpcService.appendEntries(eq(peerId1.getEndpoint()), eq(request1), eq(-1), Mockito.any()))
.thenReturn(new FutureImpl<>());
Mockito.when(this.rpcService.appendEntries(eq(peerId2.getEndpoint()), eq(request2), eq(-1), Mockito.any()))
.thenReturn(new FutureImpl<>());
Mockito.when(this.rpcService.appendEntries(eq(peerId3.getEndpoint()), eq(request3), eq(-1), Mockito.any()))
.thenReturn(new FutureImpl<>());
}

private RpcRequests.AppendEntriesRequest createEmptyEntriesRequestToPeer1() {
final RpcRequests.AppendEntriesRequest request = RpcRequests.AppendEntriesRequest.newBuilder(). //
setGroupId("test"). //
setServerId(new PeerId("localhost", 8081).toString()). //
setPeerId(this.peerId1.toString()). //
setTerm(1). //
setPrevLogIndex(10). //
setPrevLogTerm(1). //
setCommittedIndex(0).build();
return request;
}

private RpcRequests.AppendEntriesRequest createEmptyEntriesRequestToPeer2() {
final RpcRequests.AppendEntriesRequest request = RpcRequests.AppendEntriesRequest.newBuilder(). //
setGroupId("test"). //
setServerId(new PeerId("localhost", 8081).toString()). //
setPeerId(this.peerId2.toString()). //
setTerm(1). //
setPrevLogIndex(10). //
setPrevLogTerm(1). //
setCommittedIndex(0).build();
return request;
}

private RpcRequests.AppendEntriesRequest createEmptyEntriesRequestToPeer3() {
final RpcRequests.AppendEntriesRequest request = RpcRequests.AppendEntriesRequest.newBuilder(). //
setGroupId("test"). //
setServerId(new PeerId("localhost", 8081).toString()). //
setPeerId(this.peerId3.toString()). //
setTerm(1). //
setPrevLogIndex(10). //
setPrevLogTerm(1). //
setCommittedIndex(0).build();
return request;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,12 @@ public void setup() {
}

private void mockSendEmptyEntries() {
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
Mockito.when(this.rpcService.appendEntries(eq(peerId.getEndpoint()), eq(request), eq(-1), Mockito.any()))
.thenReturn(new FutureImpl<>());
}

private RpcRequests.AppendEntriesRequest createEmptyEntriesRequestt() {
private RpcRequests.AppendEntriesRequest createEmptyEntriesRequest() {
final RpcRequests.AppendEntriesRequest request = RpcRequests.AppendEntriesRequest.newBuilder(). //
setGroupId("test"). //
setServerId(new PeerId("localhost", 8082).toString()). //
Expand Down Expand Up @@ -158,7 +158,7 @@ private Replicator getReplicator() {
public void testOnRpcReturnedRpcError() {
final Replicator r = getReplicator();
assertNull(r.getBlockTimer());
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesResponse response = RpcRequests.AppendEntriesResponse.newBuilder(). //
setSuccess(false). //
setLastLogIndex(12).setTerm(2).build();
Expand All @@ -173,7 +173,7 @@ public void testOnRpcReturnedRpcError() {
@Test
public void testOnRpcReturnedTermMismatch() {
final Replicator r = getReplicator();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesResponse response = RpcRequests.AppendEntriesResponse.newBuilder(). //
setSuccess(false). //
setLastLogIndex(12).setTerm(2).build();
Expand All @@ -192,7 +192,7 @@ public void testOnRpcReturnedTermMismatch() {
public void testOnRpcReturnedMoreLogs() {
final Replicator r = getReplicator();
assertEquals(11, r.getRealNextIndex());
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesResponse response = RpcRequests.AppendEntriesResponse.newBuilder(). //
setSuccess(false). //
setLastLogIndex(12).setTerm(1).build();
Expand Down Expand Up @@ -227,7 +227,7 @@ public void testOnRpcReturnedMoreLogs() {
public void testOnRpcReturnedLessLogs() {
final Replicator r = getReplicator();
assertEquals(11, r.getRealNextIndex());
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesResponse response = RpcRequests.AppendEntriesResponse.newBuilder(). //
setSuccess(false). //
setLastLogIndex(8).setTerm(1).build();
Expand Down Expand Up @@ -263,7 +263,7 @@ public void testOnRpcReturnedWaitMoreEntries() throws Exception {
final Replicator r = getReplicator();
assertEquals(-1, r.getWaitId());

final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesResponse response = RpcRequests.AppendEntriesResponse.newBuilder(). //
setSuccess(true). //
setLastLogIndex(10).setTerm(1).build();
Expand Down Expand Up @@ -380,7 +380,7 @@ public void testSetErrorTimeout() throws Exception {
final Replicator r = getReplicator();
id.unlock();
assertNull(r.getHeartbeatInFly());
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
Mockito.when(
this.rpcService.appendEntries(eq(peerId.getEndpoint()), eq(request), eq(opts.getElectionTimeoutMs() / 2),
Mockito.any())).thenReturn(new FutureImpl<>());
Expand All @@ -395,7 +395,7 @@ public void testOnHeartbeatReturnedRpcError() {
id.unlock();
final ScheduledFuture<?> timer = r.getHeartbeatTimer();
assertNotNull(timer);
Replicator.onHeartbeatReturned(id, new Status(-1, "test"), this.createEmptyEntriesRequestt(), null,
Replicator.onHeartbeatReturned(id, new Status(-1, "test"), this.createEmptyEntriesRequest(), null,
Utils.monotonicMs());
assertNotNull(r.getHeartbeatTimer());
assertNotSame(timer, r.getHeartbeatTimer());
Expand All @@ -410,16 +410,16 @@ public void testOnHeartbeatReturnedOK() {
final RpcRequests.AppendEntriesResponse response = RpcRequests.AppendEntriesResponse.newBuilder(). //
setSuccess(false). //
setLastLogIndex(10).setTerm(1).build();
Replicator.onHeartbeatReturned(id, Status.OK(), this.createEmptyEntriesRequestt(), response,
Utils.monotonicMs());
Replicator
.onHeartbeatReturned(id, Status.OK(), this.createEmptyEntriesRequest(), response, Utils.monotonicMs());
assertNotNull(r.getHeartbeatTimer());
assertNotSame(timer, r.getHeartbeatTimer());
}

@Test
public void testOnHeartbeatReturnedTermMismatch() {
final Replicator r = getReplicator();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesResponse response = RpcRequests.AppendEntriesResponse.newBuilder(). //
setSuccess(false). //
setLastLogIndex(12).setTerm(2).build();
Expand Down Expand Up @@ -478,7 +478,7 @@ public void testSendHeartbeat() {
id.unlock();

assertNull(r.getHeartbeatInFly());
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
Mockito.when(
this.rpcService.appendEntries(eq(peerId.getEndpoint()), eq(request),
eq(this.opts.getElectionTimeoutMs() / 2), Mockito.any())).thenReturn(new FutureImpl<>());
Expand Down Expand Up @@ -657,7 +657,7 @@ public void testOnRpcReturnedOutOfOrder() {
final Replicator r = getReplicator();
assertEquals(-1, r.getWaitId());

final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequestt();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesResponse response = RpcRequests.AppendEntriesResponse.newBuilder(). //
setSuccess(true). //
setLastLogIndex(10).setTerm(1).build();
Expand Down

0 comments on commit 61570a9

Please sign in to comment.