Skip to content

Commit

Permalink
(fix) Declare rpcClient to be volatile, fix sofastack#319 (sofastack#323
Browse files Browse the repository at this point in the history
)

* (fix) Declare rpcClient to be volatile, fix sofastack#319

* (fix) improve check null with rpcClient (sofastack#326)

* (feat) minor changes
  • Loading branch information
killme2008 authored and fengjiachun committed Nov 1, 2019
1 parent e321198 commit b727ab2
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,7 @@ public void apply(final Task task) {
}

} catch (final Exception e) {
LOG.error("Fail to apply task.", e);
Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public abstract class AbstractBoltClientService implements ClientService {
ProtobufMsgFactory.load();
}

protected RpcClient rpcClient;
protected volatile RpcClient rpcClient;
protected ThreadPoolExecutor rpcExecutor;
protected RpcOptions rpcOptions;
protected JRaftRpcAddressParser rpcAddressParser;
Expand All @@ -73,7 +73,12 @@ public RpcClient getRpcClient() {

@Override
public boolean isConnected(final Endpoint endpoint) {
return this.rpcClient.checkConnection(endpoint.toString());
final RpcClient rc = this.rpcClient;
return rc != null && isConnected(rc, endpoint);
}

private static boolean isConnected(final RpcClient rpcClient, final Endpoint endpoint) {
return rpcClient.checkConnection(endpoint.toString());
}

@Override
Expand Down Expand Up @@ -124,18 +129,19 @@ public synchronized void shutdown() {

@Override
public boolean connect(final Endpoint endpoint) {
if (this.rpcClient == null) {
throw new IllegalStateException("Client service is not inited.");
final RpcClient rc = this.rpcClient;
if (rc == null) {
throw new IllegalStateException("Client service is uninitialized.");
}
if (isConnected(endpoint)) {
if (isConnected(rc, endpoint)) {
return true;
}
try {
final PingRequest req = PingRequest.newBuilder() //
.setSendTimestamp(System.currentTimeMillis()) //
.build();
final ErrorResponse resp = (ErrorResponse) this.rpcClient.invokeSync(endpoint.toString(), req,
this.defaultInvokeCtx, this.rpcOptions.getRpcConnectTimeoutMs());
final ErrorResponse resp = (ErrorResponse) rc.invokeSync(endpoint.toString(), req, this.defaultInvokeCtx,
this.rpcOptions.getRpcConnectTimeoutMs());
return resp.getErrorCode() == 0;
} catch (final InterruptedException e) {
Thread.currentThread().interrupt();
Expand All @@ -148,8 +154,12 @@ public boolean connect(final Endpoint endpoint) {

@Override
public boolean disconnect(final Endpoint endpoint) {
LOG.info("Disconnect from {}", endpoint);
this.rpcClient.closeConnection(endpoint.toString());
final RpcClient rc = this.rpcClient;
if (rc == null) {
return true;
}
LOG.info("Disconnect from {}.", endpoint);
rc.closeConnection(endpoint.toString());
return true;
}

Expand All @@ -175,10 +185,18 @@ public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoin
final InvokeContext ctx,
final RpcResponseClosure<T> done, final int timeoutMs,
final Executor rpcExecutor) {
final RpcClient rc = this.rpcClient;

final FutureImpl<Message> future = new FutureImpl<>();
try {
if (rc == null) {
future.failure(new IllegalStateException("Client service is uninitialized."));
// should be in another thread to avoid dead locking.
Utils.runClosureInThread(done, new Status(RaftError.EINTERNAL, "Client service is uninitialized."));
return future;
}
final Url rpcUrl = this.rpcAddressParser.parse(endpoint.toString());
this.rpcClient.invokeWithCallback(rpcUrl, request, ctx, new InvokeCallback() {
rc.invokeWithCallback(rpcUrl, request, ctx, new InvokeCallback() {

@SuppressWarnings("unchecked")
@Override
Expand Down
50 changes: 27 additions & 23 deletions jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,16 @@
*/
package com.alipay.sofa.jraft.core;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.io.File;
import java.nio.ByteBuffer;
import java.util.ArrayList;
Expand Down Expand Up @@ -65,16 +75,6 @@
import com.alipay.sofa.jraft.util.Utils;
import com.codahale.metrics.ConsoleReporter;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

public class NodeTest {

static final Logger LOG = LoggerFactory.getLogger(NodeTest.class);
Expand All @@ -99,8 +99,8 @@ public void teardown() throws Exception {
}
FileUtils.deleteDirectory(new File(this.dataPath));
NodeManager.getInstance().clear();
startedCounter.set(0);
stoppedCounter.set(0);
this.startedCounter.set(0);
this.stoppedCounter.set(0);
}

@Test
Expand Down Expand Up @@ -152,11 +152,15 @@ public void testNodeTaskOverload() throws Exception {
final ByteBuffer data = ByteBuffer.wrap(("hello" + i).getBytes());
final Task task = new Task(data, status -> {
System.out.println(status);
if (!status.isOk()) {
assertTrue(status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM);
try {
if (!status.isOk()) {
assertTrue(
status.getRaftError() == RaftError.EBUSY || status.getRaftError() == RaftError.EPERM);
}
c.incrementAndGet();
} finally {
latch.countDown();
}
c.incrementAndGet();
latch.countDown();
});
node.apply(task);
}
Expand Down Expand Up @@ -277,7 +281,7 @@ public void testTripleNodesWithReplicatorStateListener() throws Exception {
}
// elect leader
cluster.waitLeader();
assertEquals(4, startedCounter.get());
assertEquals(4, this.startedCounter.get());
assertEquals(2, cluster.getLeader().getReplicatorStatueListeners().size());
assertEquals(2, cluster.getFollowers().get(0).getReplicatorStatueListeners().size());
assertEquals(2, cluster.getFollowers().get(1).getReplicatorStatueListeners().size());
Expand All @@ -294,20 +298,20 @@ public void testTripleNodesWithReplicatorStateListener() throws Exception {

class UserReplicatorStateListener implements Replicator.ReplicatorStateListener {
@Override
public void onCreated(PeerId peer) {
public void onCreated(final PeerId peer) {
LOG.info("Replicator has created");
startedCounter.incrementAndGet();
NodeTest.this.startedCounter.incrementAndGet();
}

@Override
public void onError(PeerId peer, Status status) {
public void onError(final PeerId peer, final Status status) {
LOG.info("Replicator has errors");
}

@Override
public void onDestroyed(PeerId peer) {
public void onDestroyed(final PeerId peer) {
LOG.info("Replicator has been destroyed");
stoppedCounter.incrementAndGet();
NodeTest.this.stoppedCounter.incrementAndGet();
}
}

Expand Down Expand Up @@ -335,7 +339,7 @@ public void testLeaderTransferWithReplicatorStateListener() throws Exception {
assertTrue(leader.transferLeadershipTo(targetPeer).isOk());
Thread.sleep(1000);
cluster.waitLeader();
assertEquals(2, startedCounter.get());
assertEquals(2, this.startedCounter.get());

for (Node node : cluster.getNodes()) {
node.clearReplicatorStateListeners();
Expand Down

0 comments on commit b727ab2

Please sign in to comment.