Skip to content

Commit

Permalink
Feature/fix node test (sofastack#790)
Browse files Browse the repository at this point in the history
* (fix) testLeaderFail

* (fix) node test assertion

* (feat) minor change

* (feat) revert assertion in chaos test
  • Loading branch information
killme2008 authored Mar 14, 2022
1 parent 530224e commit f69e7e9
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,50 +17,59 @@
package com.alipay.sofa.jraft.core;

import java.util.concurrent.CountDownLatch;

import java.util.concurrent.atomic.AtomicInteger;
import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.error.RaftError;

import static org.junit.Assert.assertEquals;

public class ExpectClosure implements Closure {
private int expectedErrCode;
private String expectErrMsg;
private CountDownLatch latch;
private final int expectedErrCode;
private final String expectErrMsg;
private final CountDownLatch latch;
private AtomicInteger successCount;

public ExpectClosure(CountDownLatch latch) {
public ExpectClosure(final CountDownLatch latch) {
this(RaftError.SUCCESS, latch);
}

public ExpectClosure(RaftError expectedErrCode, CountDownLatch latch) {
public ExpectClosure(final RaftError expectedErrCode, final CountDownLatch latch) {
this(expectedErrCode, null, latch);

}

public ExpectClosure(RaftError expectedErrCode, String expectErrMsg, CountDownLatch latch) {
public ExpectClosure(final RaftError expectedErrCode, final String expectErrMsg, final CountDownLatch latch) {
super();
this.expectedErrCode = expectedErrCode.getNumber();
this.expectErrMsg = expectErrMsg;
this.latch = latch;
}

public ExpectClosure(int code, String expectErrMsg, CountDownLatch latch) {
public ExpectClosure(final int code, final String expectErrMsg, final CountDownLatch latch) {
this(code, expectErrMsg, latch, null);
}

public ExpectClosure(final int code, final String expectErrMsg, final CountDownLatch latch,
final AtomicInteger successCount) {
super();
this.expectedErrCode = code;
this.expectErrMsg = expectErrMsg;
this.latch = latch;
this.successCount = successCount;
}

@Override
public void run(Status status) {
public void run(final Status status) {
if (this.expectedErrCode >= 0) {
assertEquals(this.expectedErrCode, status.getCode());
}
if (this.expectErrMsg != null) {
assertEquals(this.expectErrMsg, status.getErrorMsg());
}
latch.countDown();
if (status.isOk() && this.successCount != null) {
this.successCount.incrementAndGet();
}
this.latch.countDown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.alipay.sofa.jraft.Iterator;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.entity.LeaderChangeContext;
import com.alipay.sofa.jraft.entity.RaftOutter.SnapshotMeta;
import com.alipay.sofa.jraft.error.RaftError;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotWriter;
Expand Down Expand Up @@ -164,7 +165,8 @@ public void onSnapshotSave(final SnapshotWriter writer, final Closure done) {

@Override
public boolean onSnapshotLoad(final SnapshotReader reader) {
this.lastAppliedIndex.set(0);
SnapshotMeta meta = reader.load();
this.lastAppliedIndex.set(meta.getLastIncludedIndex());
this.loadSnapshotTimes++;
final String path = reader.getPath() + File.separator + "data";
final File file = new File(path);
Expand Down
44 changes: 24 additions & 20 deletions jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,6 @@ private void sendTestTaskAndWait(final Node node) throws InterruptedException {
this.sendTestTaskAndWait(node, 0, RaftError.SUCCESS);
}

private void sendTestTaskAndWait(final Node node, final RaftError err) throws InterruptedException {
this.sendTestTaskAndWait(node, 0, err);
}

private void sendTestTaskAndWait(final Node node, final int start, final RaftError err) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(10);
for (int i = start; i < start + 10; i++) {
Expand All @@ -430,14 +426,16 @@ private void sendTestTaskAndWait(final Node node, final int start, final RaftErr
}

@SuppressWarnings("SameParameterValue")
private void sendTestTaskAndWait(final String prefix, final Node node, final int code) throws InterruptedException {
private int sendTestTaskAndWait(final String prefix, final Node node, final int code) throws InterruptedException {
final CountDownLatch latch = new CountDownLatch(10);
final AtomicInteger successCount = new AtomicInteger(0);
for (int i = 0; i < 10; i++) {
final ByteBuffer data = ByteBuffer.wrap((prefix + i).getBytes());
final Task task = new Task(data, new ExpectClosure(code, null, latch));
final Task task = new Task(data, new ExpectClosure(code, null, latch, successCount));
node.apply(task);
}
waitLatch(latch);
return successCount.get();
}

@Test
Expand Down Expand Up @@ -1632,7 +1630,7 @@ public void testLeaderFail() throws Exception {
// apply something when follower
final List<Node> followers = cluster.getFollowers();
assertFalse(followers.isEmpty());
this.sendTestTaskAndWait("follower apply ", followers.get(0), -1);
int success = this.sendTestTaskAndWait("follower apply ", followers.get(0), -1);

// elect new leader
cluster.waitLeader();
Expand Down Expand Up @@ -1668,7 +1666,7 @@ public void testLeaderFail() throws Exception {
assertTrue(cluster.start(oldLeader.getEndpoint()));
assertTrue(cluster.ensureSame(-1));
for (final MockStateMachine fsm : cluster.getFsms()) {
assertEquals(30, fsm.getLogs().size());
assertEquals(30 + success, fsm.getLogs().size());
}
cluster.stopAll();
}
Expand Down Expand Up @@ -3073,7 +3071,8 @@ public void testChangePeers() throws Exception {
@Test
public void testChangePeersAddMultiNodes() throws Exception {
final PeerId peer0 = new PeerId(TestUtils.getMyIp(), TestUtils.INIT_PORT);
final TestCluster cluster = new TestCluster("testChangePeers", this.dataPath, Collections.singletonList(peer0));
final TestCluster cluster = new TestCluster("testChangePeersAddMultiNodes", this.dataPath,
Collections.singletonList(peer0));
assertTrue(cluster.start(peer0.getEndpoint()));

cluster.waitLeader();
Expand Down Expand Up @@ -3241,7 +3240,7 @@ public void testChangePeersChaosWithSnapshot() throws Exception {
// start cluster
final List<PeerId> peers = new ArrayList<>();
peers.add(new PeerId("127.0.0.1", TestUtils.INIT_PORT));
final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers, 1000);
final TestCluster cluster = new TestCluster("testChangePeersChaosWithSnapshot", this.dataPath, peers, 1000);
assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 2));
// start other peers
for (int i = 1; i < 10; i++) {
Expand Down Expand Up @@ -3281,18 +3280,21 @@ public void testChangePeersChaosWithSnapshot() throws Exception {
assertTrue(st.getErrorMsg(), st.isOk());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
for (final MockStateMachine fsm : cluster.getFsms()) {
assertTrue(fsm.getLogs().size() >= 5000);
try {
for (final MockStateMachine fsm : cluster.getFsms()) {
assertTrue(fsm.getLogs().size() >= 5000);
}
} finally {
cluster.stopAll();
}
cluster.stopAll();
}

@Test
public void testChangePeersChaosWithoutSnapshot() throws Exception {
// start cluster
final List<PeerId> peers = new ArrayList<>();
peers.add(new PeerId("127.0.0.1", TestUtils.INIT_PORT));
final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers, 1000);
final TestCluster cluster = new TestCluster("testChangePeersChaosWithoutSnapshot", this.dataPath, peers, 1000);
assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 100000));
// start other peers
for (int i = 1; i < 10; i++) {
Expand Down Expand Up @@ -3332,19 +3334,22 @@ public void testChangePeersChaosWithoutSnapshot() throws Exception {
assertTrue(done.await().isOk());
cluster.ensureSame();
assertEquals(10, cluster.getFsms().size());
for (final MockStateMachine fsm : cluster.getFsms()) {
assertTrue(fsm.getLogs().size() >= tasks);
assertTrue(fsm.getLogs().size() - tasks < 100);
try {
for (final MockStateMachine fsm : cluster.getFsms()) {
final int logSize = fsm.getLogs().size();
assertTrue("logSize=" + logSize, logSize >= tasks);
}
} finally {
cluster.stopAll();
}
cluster.stopAll();
}

@Test
public void testChangePeersChaosApplyTasks() throws Exception {
// start cluster
final List<PeerId> peers = new ArrayList<>();
peers.add(new PeerId("127.0.0.1", TestUtils.INIT_PORT));
final TestCluster cluster = new TestCluster("unittest", this.dataPath, peers, 1000);
final TestCluster cluster = new TestCluster("testChangePeersChaosApplyTasks", this.dataPath, peers, 1000);
assertTrue(cluster.start(peers.get(0).getEndpoint(), false, 100000));
// start other peers
for (int i = 1; i < 10; i++) {
Expand Down Expand Up @@ -3409,7 +3414,6 @@ public void testChangePeersChaosApplyTasks() throws Exception {
for (final MockStateMachine fsm : cluster.getFsms()) {
final int logSize = fsm.getLogs().size();
assertTrue("logSize= " + logSize, logSize >= 5000 * threads);
assertTrue("logSize= " + logSize, logSize - 5000 * threads < 100);
}
} finally {
cluster.stopAll();
Expand Down

0 comments on commit f69e7e9

Please sign in to comment.