Some corner-case fixes (sofastack#462)
* (fix) replicator state

* (fix) Replicating logs before appliedIndex should be considered a success.

* (feat) prevent creating too many block timers when appending entries fails

* (feat) dump thread stacks when a test hangs too long

* (fix) code format

* (feat) revert readLock for Node#isLeader

* (feat) adds test cases for the fixes

* (feat) Adds RpcUtils to run replicator/rpc closures, sofastack#426

* (fix) testReadIndexFromLearner may hang when learner state is not synced

* (fix) format

* format

* format

Co-authored-by: jiachun.fjc <[email protected]>
killme2008 and fengjiachun authored Jun 11, 2020
1 parent e734a5b commit 0d8c96f
Showing 11 changed files with 294 additions and 36 deletions.
jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
@@ -2640,7 +2640,12 @@ private void handleVoteTimeout() {

@Override
public boolean isLeader() {
-return this.state == State.STATE_LEADER;
+this.readLock.lock();
+try {
+    return this.state == State.STATE_LEADER;
+} finally {
+    this.readLock.unlock();
+}
}

@Override
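NodeImpl performs its state transitions under the write lock, so a bare read of this.state could observe a value mid-transition; taking the read lock makes isLeader() serialize against those transitions (the commit bullet "revert readLock for Node#isLeader" suggests this restores earlier behavior). A minimal sketch of the pattern as a standalone class, not NodeImpl itself:

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Minimal sketch of the pattern: reads of mutable node state take the
    // read lock so they serialize against transitions performed under the
    // write lock.
    final class NodeStateHolder {
        enum State { STATE_LEADER, STATE_FOLLOWER }

        private final ReadWriteLock lock  = new ReentrantReadWriteLock();
        private State               state = State.STATE_FOLLOWER;

        boolean isLeader() {
            this.lock.readLock().lock();
            try {
                return this.state == State.STATE_LEADER;
            } finally {
                this.lock.readLock().unlock();
            }
        }

        void becomeLeader() {
            this.lock.writeLock().lock();
            try {
                this.state = State.STATE_LEADER;
            } finally {
                this.lock.writeLock().unlock();
            }
        }
    }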
32 changes: 20 additions & 12 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/Replicator.java
@@ -51,6 +51,7 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.TimeoutNowResponse;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.rpc.RpcUtils;
import com.alipay.sofa.jraft.storage.snapshot.SnapshotReader;
import com.alipay.sofa.jraft.util.ByteBufferCollector;
import com.alipay.sofa.jraft.util.OnlyForTest;
@@ -262,13 +263,13 @@ private static void notifyReplicatorStatusListener(final Replicator replicator,
try {
switch (event) {
case CREATED:
-Utils.runInThread(() -> listener.onCreated(peer));
+RpcUtils.runInThread(() -> listener.onCreated(peer));
break;
case ERROR:
-Utils.runInThread(() -> listener.onError(peer, status));
+RpcUtils.runInThread(() -> listener.onError(peer, status));
break;
case DESTROYED:
-Utils.runInThread(() -> listener.onDestroyed(peer));
+RpcUtils.runInThread(() -> listener.onDestroyed(peer));
break;
default:
break;
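Throughout Replicator.java (and the other files below), closures and listener callbacks move from Utils.runInThread to the new RpcUtils pool added in this commit ("Adds RpcUtils to run replicator/rpc closures, sofastack#426"). The diff does not state the motivation; a reasonable reading is that replicator/RPC closures get a dedicated executor so they cannot be starved by, or starve, work on the shared Utils pool. The call shape is unchanged; a tiny runnable demo:

    import com.alipay.sofa.jraft.rpc.RpcUtils;

    // Tiny demo of the call shape used throughout this commit: the task runs
    // on the dedicated JRAFT_RPC_CLOSURE_EXECUTOR pool defined below.
    public class RunInThreadDemo {
        public static void main(final String[] args) throws Exception {
            RpcUtils.runInThread(() -> System.out.println(
                "runs on " + Thread.currentThread().getName())).get();
        }
    }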
@@ -685,7 +686,7 @@ private void sendEmptyEntries(final boolean isHeartbeat,
// id is unlocked in installSnapshot
installSnapshot();
if (isHeartbeat && heartBeatClosure != null) {
-Utils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN,
+RpcUtils.runClosureInThread(heartBeatClosure, new Status(RaftError.EAGAIN,
"Fail to send heartbeat to peer %s", this.options.getPeerId()));
}
return;
@@ -839,7 +840,7 @@ public static void waitForCaughtUp(final ThreadId id, final long maxMargin, fina
final Replicator r = (Replicator) id.lock();

if (r == null) {
-Utils.runClosureInThread(done, new Status(RaftError.EINVAL, "No such replicator"));
+RpcUtils.runClosureInThread(done, new Status(RaftError.EINVAL, "No such replicator"));
return;
}
try {
@@ -905,6 +906,7 @@ static boolean continueSending(final ThreadId id, final int errCode) {
}
r.waitId = -1;
if (errCode == RaftError.ETIMEDOUT.getNumber()) {
r.blockTimer = null;
// Send empty entries after block timeout to check the correct
// _next_index, otherwise the replicator likely waits in
// _wait_more_entries and no further logs would be replicated even if the
@@ -921,7 +923,7 @@ static boolean continueSending(final ThreadId id, final int errCode) {
}

static void onBlockTimeout(final ThreadId arg) {
-Utils.runInThread(() -> onBlockTimeoutInNewThread(arg));
+RpcUtils.runInThread(() -> onBlockTimeoutInNewThread(arg));
}

void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCode) {
@@ -930,6 +932,11 @@ void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCode) {
// each individual error (e.g. we don't need check every
// heartbeat_timeout_ms whether a dead follower has come back), but it's just
// fine now.
if (this.blockTimer != null) {
// Already in blocking state, return immediately.
this.id.unlock();
return;
}
final long dueTime = startTimeMs + this.options.getDynamicHeartBeatTimeoutMs();
try {
LOG.debug("Blocking {} for {} ms", this.options.getPeerId(), this.options.getDynamicHeartBeatTimeoutMs());
@@ -938,6 +945,7 @@ void block(final long startTimeMs, @SuppressWarnings("unused") final int errorCode) {
this.statInfo.runningState = RunningState.BLOCKING;
this.id.unlock();
} catch (final Exception e) {
this.blockTimer = null;
LOG.error("Fail to add timer", e);
// id is unlocked in sendEmptyEntries.
sendEmptyEntries(false);
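The blockTimer null check makes block() schedule at most one pending timer per replicator, addressing the "prevent too many block timers" bullet: previously, every failed append while already blocked could stack another timer. The field is reset in continueSending on ETIMEDOUT (above) and on a successful append response (below), so a new block can be scheduled afterwards. A standalone sketch of the guard, with a hypothetical scheduler in place of jraft's timer manager:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    // At most one pending "unblock" timer; repeated failures while one is
    // pending are no-ops. The timer clears itself before resuming, mirroring
    // continueSending's r.blockTimer = null.
    final class BlockGuard {
        private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
        private ScheduledFuture<?> blockTimer;

        synchronized void block(final long delayMs, final Runnable resume) {
            if (this.blockTimer != null) {
                return; // already blocking
            }
            this.blockTimer = this.scheduler.schedule(() -> {
                synchronized (this) {
                    this.blockTimer = null;
                }
                resume.run();
            }, delayMs, TimeUnit.MILLISECONDS);
        }
    }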
@@ -983,7 +991,7 @@ public void onError(final ThreadId id, final Object data, final int errorCode) {
}
} else if (errorCode == RaftError.ETIMEDOUT.getNumber()) {
id.unlock();
-Utils.runInThread(() -> sendHeartbeat(id));
+RpcUtils.runInThread(() -> sendHeartbeat(id));
} else {
id.unlock();
// noinspection ConstantConditions
@@ -1034,7 +1042,7 @@ private void notifyOnCaughtUp(final int code, final boolean beforeDestroy) {
}
final CatchUpClosure savedClosure = this.catchUpClosure;
this.catchUpClosure = null;
-Utils.runClosureInThread(savedClosure, savedClosure.getStatus());
+RpcUtils.runClosureInThread(savedClosure, savedClosure.getStatus());
}

private static void onTimeout(final ThreadId id) {
@@ -1417,10 +1425,10 @@ private static boolean onAppendEntriesReturned(final ThreadId id, final Inflight
LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1,
r.options.getPeerId());
}
-} else {
-    // The request is probe request, change the state into Replicate.
-    r.state = State.Replicate;
}
+r.state = State.Replicate;
+r.blockTimer = null;
r.nextIndex += entriesSize;
r.hasSucceeded = true;
r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
@@ -1583,7 +1591,7 @@ public void run(final Status status) {
public static void sendHeartbeat(final ThreadId id, final RpcResponseClosure<AppendEntriesResponse> closure) {
final Replicator r = (Replicator) id.lock();
if (r == null) {
-Utils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Peer %s is not connected", id));
+RpcUtils.runClosureInThread(closure, new Status(RaftError.EHOSTDOWN, "Peer %s is not connected", id));
return;
}
// id is unlocked in sendEmptyEntries.
111 changes: 111 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/RpcUtils.java
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc;

import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.Closure;
import com.alipay.sofa.jraft.Status;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.alipay.sofa.jraft.util.ThreadPoolUtil;
import com.alipay.sofa.jraft.util.Utils;

/**
* RPC utilities
*
* @author boyan([email protected])
*/
public final class RpcUtils {

private static final Logger LOG = LoggerFactory.getLogger(RpcUtils.class);

/**
* Default jraft closure executor pool minimum size, number of CPUs by default.
*/
public static final int MIN_RPC_CLOSURE_EXECUTOR_POOL_SIZE = SystemPropertyUtil.getInt(
"jraft.rpc.closure.threadpool.size.min",
Utils.cpus());

/**
* Default jraft closure executor pool maximum size.
*/
public static final int MAX_RPC_CLOSURE_EXECUTOR_POOL_SIZE = SystemPropertyUtil.getInt(
"jraft.rpc.closure.threadpool.size.max",
Math.max(100, Utils.cpus() * 5));

/**
* Global thread pool to run rpc closure.
*/
private static ThreadPoolExecutor RPC_CLOSURE_EXECUTOR = ThreadPoolUtil
.newBuilder()
.poolName("JRAFT_RPC_CLOSURE_EXECUTOR")
.enableMetric(true)
.coreThreads(
MIN_RPC_CLOSURE_EXECUTOR_POOL_SIZE)
.maximumThreads(
MAX_RPC_CLOSURE_EXECUTOR_POOL_SIZE)
.keepAliveSeconds(60L)
.workQueue(new SynchronousQueue<>())
.threadFactory(
new NamedThreadFactory(
"JRaft-Rpc-Closure-Executor-",
true)) //
.build();

/**
* Run closure with OK status in thread pool.
*/
public static Future<?> runClosureInThread(final Closure done) {
if (done == null) {
return null;
}
return runClosureInThread(done, Status.OK());
}

/**
* Run a task in the thread pool, returning the future object.
*/
public static Future<?> runInThread(final Runnable runnable) {
return RPC_CLOSURE_EXECUTOR.submit(runnable);
}

/**
* Run closure with status in thread pool.
*/
public static Future<?> runClosureInThread(final Closure done, final Status status) {
if (done == null) {
return null;
}

return runInThread(() -> {
try {
done.run(status);
} catch (final Throwable t) {
LOG.error("Fail to run done closure", t);
}
});
}

private RpcUtils() {
}
}
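A usage sketch for the new utility (API names from the file above; the wrapper class is illustrative). Two details worth noting: the SynchronousQueue work queue means tasks hand off directly to threads, so the pool grows toward its maximum under load instead of queueing; and runClosureInThread never runs done inline, logging rather than propagating anything done throws.

    import java.util.concurrent.Future;

    import com.alipay.sofa.jraft.Status;
    import com.alipay.sofa.jraft.error.RaftError;
    import com.alipay.sofa.jraft.rpc.RpcUtils;

    public class RpcUtilsExample {
        public static void main(final String[] args) throws Exception {
            // Run a closure with an explicit status off the caller's thread.
            final Future<?> f = RpcUtils.runClosureInThread(
                status -> System.out.println("done: " + status),
                new Status(RaftError.EINVAL, "No such replicator"));
            f.get(); // demo only: wait for the closure to finish

            // A null closure is a no-op that returns null rather than throwing.
            assert RpcUtils.runClosureInThread(null) == null;
        }
    }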
jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/AbstractClientService.java
@@ -39,6 +39,7 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.PingRequest;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.rpc.RpcResponseFactory;
import com.alipay.sofa.jraft.rpc.RpcUtils;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.NamedThreadFactory;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
@@ -188,7 +189,7 @@ public <T extends Message> Future<Message> invokeWithDone(final Endpoint endpoin
if (rc == null) {
future.failure(new IllegalStateException("Client service is uninitialized."));
// should run in another thread to avoid deadlocking.
-Utils.runClosureInThread(done, new Status(RaftError.EINTERNAL, "Client service is uninitialized."));
+RpcUtils.runClosureInThread(done, new Status(RaftError.EINTERNAL, "Client service is uninitialized."));
return future;
}

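The "should run in another thread" comment is why the failure path completes the closure asynchronously: invokeWithDone may be called from a thread that already holds node or replicator locks, and done may try to take those locks again. Whether the locks involved are reentrant is not visible in this diff, so here is a generic illustration of the hazard using a deliberately non-reentrant mutex:

    import java.util.concurrent.Semaphore;

    // With a non-reentrant mutex, running the callback inline while the mutex
    // is held hangs if the callback re-acquires it; dispatching the callback
    // to another thread (as the diff does via RpcUtils) avoids that.
    final class InlineCallbackHazard {
        private final Semaphore mutex = new Semaphore(1);

        void doWork(final Runnable callback) throws InterruptedException {
            this.mutex.acquire();
            try {
                callback.run(); // hangs if callback also calls mutex.acquire()
            } finally {
                this.mutex.release();
            }
        }
    }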
jraft-core/src/main/java/com/alipay/sofa/jraft/storage/impl/LogManagerImpl.java
@@ -297,8 +297,8 @@ public void appendEntries(final List<LogEntry> entries, final StableClosure done
this.writeLock.lock();
try {
if (!entries.isEmpty() && !checkAndResolveConflict(entries, done)) {
+// If checkAndResolveConflict returns false, the done closure is called inside it.
entries.clear();
-Utils.runClosureInThread(done, new Status(RaftError.EINTERNAL, "Fail to checkAndResolveConflict."));
return;
}
for (int i = 0; i < entries.size(); i++) {
@@ -1008,6 +1008,8 @@ private boolean checkAndResolveConflict(final List<LogEntry> entries, final Stab
LOG.warn(
"Received entries of which the lastLog={} is not greater than appliedIndex={}, return immediately with nothing changed.",
lastLogEntry.getId().getIndex(), appliedIndex);
// Replicating logs at or before appliedIndex should be considered successful; respond OK.
Utils.runClosureInThread(done);
return false;
}
if (firstLogEntry.getId().getIndex() == this.lastLogIndex + 1) {
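The LogManagerImpl change pairs the two hunks above: checkAndResolveConflict now completes done itself, and for entries whose last index is not greater than appliedIndex it completes done with OK instead of letting appendEntries report EINTERNAL: a leader may legitimately re-send entries the node has already applied, and that must count as success (the "Replicate logs before appliedIndex" bullet). A sketch of the check with simplified parameters, reusing the jraft types shown in the diff:

    import com.alipay.sofa.jraft.storage.LogManager.StableClosure;
    import com.alipay.sofa.jraft.util.Utils;

    // Sketch, not LogManagerImpl itself: entries whose last index does not
    // advance past appliedIndex are already covered by applied state, so
    // acknowledge them as success instead of failing the replicator.
    final class StaleAppendCheck {
        static boolean shouldAppend(final long lastEntryIndex, final long appliedIndex,
                                    final StableClosure done) {
            if (lastEntryIndex <= appliedIndex) {
                Utils.runClosureInThread(done); // completes done with Status.OK()
                return false;                   // caller returns, nothing changed
            }
            return true;
        }
    }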
jraft-core/src/main/java/com/alipay/sofa/jraft/storage/snapshot/remote/CopySession.java
@@ -41,6 +41,7 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.GetFileRequest;
import com.alipay.sofa.jraft.rpc.RpcRequests.GetFileResponse;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.rpc.RpcUtils;
import com.alipay.sofa.jraft.storage.SnapshotThrottle;
import com.alipay.sofa.jraft.util.ByteBufferCollector;
import com.alipay.sofa.jraft.util.Endpoint;
@@ -203,7 +204,7 @@ private void onFinished() {
}

private void onTimer() {
-Utils.runInThread(this::sendNextRpc);
+RpcUtils.runInThread(this::sendNextRpc);
}

void onRpcReturned(final Status status, final GetFileResponse response) {
jraft-core/src/main/java/com/alipay/sofa/jraft/util/Utils.java
@@ -48,7 +48,7 @@
*
* 2018-Apr-07 10:12:35 AM
*/
-public class Utils {
+public final class Utils {

private static final Logger LOG = LoggerFactory.getLogger(Utils.class);

jraft-core/src/test/java/com/alipay/sofa/jraft/core/NodeTest.java
@@ -34,6 +34,7 @@

import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
@@ -104,10 +105,42 @@ public class NodeTest {

private long testStartMs;

private static DumpThread dumpThread;

static class DumpThread extends Thread {
private static long DUMP_TIMEOUT_MS = 5 * 60 * 1000;
private volatile boolean stopped = false;

@Override
public void run() {
while (!this.stopped) {
try {
Thread.sleep(DUMP_TIMEOUT_MS);
System.out.println("Test hang too long, dump threads");
TestUtils.dumpThreads();
} catch (InterruptedException e) {
// an interrupt is a reset request; continue waiting
continue;
}
}
}
}

@BeforeClass
-public static void setupRocksdbOptions() {
+public static void setupNodeTest() {
StorageOptionsFactory.registerRocksDBTableFormatConfig(RocksDBLogStorage.class, StorageOptionsFactory
.getDefaultRocksDBTableConfig().setBlockCacheSize(256 * SizeUnit.MB));
dumpThread = new DumpThread();
dumpThread.setName("NodeTest-DumpThread");
dumpThread.setDaemon(true);
dumpThread.start();
}

@AfterClass
public static void tearNodeTest() throws Exception {
dumpThread.stopped = true;
dumpThread.interrupt();
dumpThread.join(100);
}
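DumpThread is a watchdog for hung tests: it sleeps for DUMP_TIMEOUT_MS and, if a sleep ever completes without an interrupt, assumes the running test is stuck and dumps every thread's stack; setup() interrupts it before each test to restart the countdown, and tearNodeTest() stops it for good. TestUtils.dumpThreads() is not part of this diff; a minimal equivalent could be:

    import java.util.Map;

    // Sketch of a stack-dump helper comparable to TestUtils.dumpThreads(),
    // whose implementation is not shown in this commit.
    final class ThreadDumps {
        static void dumpThreads() {
            for (final Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
                System.out.println(e.getKey());
                for (final StackTraceElement frame : e.getValue()) {
                    System.out.println("    at " + frame);
                }
            }
        }
    }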

@Before
@@ -117,6 +150,7 @@ public void setup() throws Exception {
FileUtils.forceMkdir(new File(this.dataPath));
assertEquals(NodeImpl.GLOBAL_NUM_NODES.get(), 0);
this.testStartMs = Utils.monotonicMs();
dumpThread.interrupt(); // reset dump timeout
}

@After
@@ -1365,6 +1399,7 @@ public void testReadIndexFromLearner() throws Exception {
assertEquals(1, leader.listLearners().size());
}

Thread.sleep(100);
// read from learner
Node learner = cluster.getNodes().get(3);
assertNotNull(leader);
@@ -1741,6 +1776,7 @@ public void testRemoveLeader() throws Exception {
CountDownLatch latch = new CountDownLatch(1);
leader.removePeer(oldLeader, new ExpectClosure(latch));
waitLatch(latch);
Thread.sleep(100);

// elect new leader
cluster.waitLeader();
(The remaining file diffs were not loaded in this view.)