Skip to content

Commit

Permalink
Feature/2020.09.22 changes (sofastack#511)
Browse files Browse the repository at this point in the history
* (feat) Don't update disk id when setSnapshot to avoid some corner cases.

* (feat) Send heartbeat response directly, don't go through pipeline processing
  • Loading branch information
killme2008 authored Sep 23, 2020
1 parent 3a1bdcf commit 7fdf7ec
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ private static class NodeReadWriteLock extends LongHeldDetectingReadWriteLock {

private final Node node;

public NodeReadWriteLock(Node node) {
public NodeReadWriteLock(final Node node) {
super(MAX_BLOCKING_MS_TO_REPORT, TimeUnit.MILLISECONDS);
this.node = node;
}
Expand Down Expand Up @@ -1943,7 +1943,7 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
}

if (entriesCount == 0) {
// heartbeat
// heartbeat or probe request
final AppendEntriesResponse.Builder respBuilder = AppendEntriesResponse.newBuilder() //
.setSuccess(true) //
.setTerm(this.currTerm) //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import com.codahale.metrics.MetricFilter;
import com.codahale.metrics.MetricRegistry;
import com.codahale.metrics.MetricSet;
import com.google.protobuf.ByteString;
import com.google.protobuf.Message;
import com.google.protobuf.ZeroByteStringHelper;

Expand Down Expand Up @@ -694,9 +695,9 @@ private void sendEmptyEntries(final boolean isHeartbeat,
}
try {
final long monotonicSendTimeMs = Utils.monotonicMs();
final AppendEntriesRequest request = rb.build();

if (isHeartbeat) {
final AppendEntriesRequest request = rb.build();
// Sending a heartbeat request
this.heartbeatCounter++;
RpcResponseClosure<AppendEntriesResponse> heartbeatDone;
Expand All @@ -715,6 +716,10 @@ public void run(final Status status) {
this.heartbeatInFly = this.rpcService.appendEntries(this.options.getPeerId().getEndpoint(), request,
this.options.getElectionTimeoutMs() / 2, heartbeatDone);
} else {
// No entries and has empty data means a probe request.
// TODO(boyan) refactor, adds a new flag field?
rb.setData(ByteString.EMPTY);
final AppendEntriesRequest request = rb.build();
// Sending a probe request.
this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
this.statInfo.firstLogIndex = this.nextIndex;
Expand All @@ -737,7 +742,7 @@ public void run(final Status status) {
addInflight(RequestType.AppendEntries, this.nextIndex, 0, 0, seq, rpcFuture);
}
LOG.debug("Node {} send HeartbeatRequest to {} term {} lastCommittedIndex {}", this.options.getNode()
.getNodeId(), this.options.getPeerId(), this.options.getTerm(), request.getCommittedIndex());
.getNodeId(), this.options.getPeerId(), this.options.getTerm(), rb.getCommittedIndex());
} finally {
this.id.unlock();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executor;

import org.apache.commons.lang.StringUtils;

import com.alipay.sofa.jraft.JRaftUtils;
import com.alipay.sofa.jraft.Node;
import com.alipay.sofa.jraft.NodeManager;
Expand All @@ -48,7 +46,7 @@
*
* @author boyan ([email protected])
*
* 2018-Apr-04 3:00:13 PM
* 2018-Apr-04 3:00:13 PM
*/
public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEntriesRequest> implements
ConnectionClosedEventListener {
Expand All @@ -57,6 +55,7 @@ public class AppendEntriesRequestProcessor extends NodeRequestProcessor<AppendEn

/**
* Peer executor selector.
*
* @author dennis
*/
final class PeerExecutorSelector implements RpcProcessor.ExecutorSelector {
Expand Down Expand Up @@ -99,34 +98,42 @@ public Executor select(final String reqClass, final Object reqHeader) {
*/
class SequenceRpcRequestClosure extends RpcRequestClosure {

private final int reqSequence;
private final String groupId;
private final String peerId;
private final int reqSequence;
private final String groupId;
private final String peerId;
private final boolean isHeartbeat;

public SequenceRpcRequestClosure(RpcRequestClosure parent, int sequence, String groupId, String peerId,
Message defaultResp) {
public SequenceRpcRequestClosure(final RpcRequestClosure parent, final Message defaultResp,
final String groupId, final String peerId, final int sequence,
final boolean isHeartbeat) {
super(parent.getRpcCtx(), defaultResp);
this.reqSequence = sequence;
this.groupId = groupId;
this.peerId = peerId;
this.isHeartbeat = isHeartbeat;
}

@Override
public void sendResponse(final Message msg) {
sendSequenceResponse(this.groupId, this.peerId, this.reqSequence, getRpcCtx(), msg);
if (this.isHeartbeat) {
super.sendResponse(msg);
} else {
sendSequenceResponse(this.groupId, this.peerId, this.reqSequence, getRpcCtx(), msg);
}
}
}

/**
* Response message wrapper with a request sequence number and asyncContext.done
*
* @author dennis
*/
static class SequenceMessage implements Comparable<SequenceMessage> {
public final Message msg;
private final int sequence;
private final RpcContext rpcCtx;

public SequenceMessage(RpcContext rpcCtx, Message msg, int sequence) {
public SequenceMessage(final RpcContext rpcCtx, final Message msg, final int sequence) {
super();
this.rpcCtx = rpcCtx;
this.msg = msg;
Expand Down Expand Up @@ -302,8 +309,7 @@ void removePeerRequestContext(final String groupId, final String peerId) {
}

/**
* RAFT group peer request contexts
* Map<groupId, <peerId, ctx>>
* RAFT group peer request contexts Map<groupId, <peerId, ctx>>
*/
private final ConcurrentMap<String, ConcurrentMap<String, PeerRequestContext>> peerRequestContexts = new ConcurrentHashMap<>();

Expand All @@ -312,7 +318,7 @@ void removePeerRequestContext(final String groupId, final String peerId) {
*/
private final ExecutorSelector executorSelector;

public AppendEntriesRequestProcessor(Executor executor) {
public AppendEntriesRequestProcessor(final Executor executor) {
super(executor, RpcRequests.AppendEntriesResponse.getDefaultInstance());
this.executorSelector = new PeerExecutorSelector();
}
Expand All @@ -339,6 +345,12 @@ private int getAndIncrementNextRequiredSequence(final String groupId, final Stri
return getPeerRequestContext(groupId, peerId, conn).getAndIncrementNextRequiredSequence();
}

private boolean isHeartbeatRequest(final AppendEntriesRequest request) {
// No entries and no data means a true heartbeat request.
// TODO(boyan) refactor, adds a new flag field?
return request.getEntriesCount() == 0 && !request.hasData();
}

@Override
public Message processRequest0(final RaftServerService service, final AppendEntriesRequest request,
final RpcRequestClosure done) {
Expand All @@ -349,11 +361,19 @@ public Message processRequest0(final RaftServerService service, final AppendEntr
final String groupId = request.getGroupId();
final String peerId = request.getPeerId();

final int reqSequence = getAndIncrementSequence(groupId, peerId, done.getRpcCtx().getConnection());
boolean isHeartbeat = isHeartbeatRequest(request);
int reqSequence = -1;
if (!isHeartbeat) {
reqSequence = getAndIncrementSequence(groupId, peerId, done.getRpcCtx().getConnection());
}
final Message response = service.handleAppendEntriesRequest(request, new SequenceRpcRequestClosure(done,
reqSequence, groupId, peerId, defaultResp()));
defaultResp(), groupId, peerId, reqSequence, isHeartbeat));
if (response != null) {
sendSequenceResponse(groupId, peerId, reqSequence, done.getRpcCtx(), response);
if (isHeartbeat) {
done.getRpcCtx().sendResponse(response);
} else {
sendSequenceResponse(groupId, peerId, reqSequence, done.getRpcCtx(), response);
}
}
return null;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -619,9 +619,14 @@ public void setSnapshot(final SnapshotMeta meta) {
if (this.lastSnapshotId.compareTo(this.appliedId) > 0) {
this.appliedId = this.lastSnapshotId.copy();
}
if (this.lastSnapshotId.compareTo(this.diskId) > 0) {
this.diskId = this.lastSnapshotId.copy();
}
// NOTICE: not to update disk_id here as we are not sure if this node really
// has these logs on disk storage. Just leave disk_id as it was, which can keep
// these logs in memory all the time until they are flushed to disk. By this
// way we can avoid some corner cases which failed to get logs.
// See https://github.com/baidu/braft/pull/224/commits/8ef6fdbf70d23f5a4ee147356a889e2c0fa22aac
// if (this.lastSnapshotId.compareTo(this.diskId) > 0) {
// this.diskId = this.lastSnapshotId.copy();
// }

if (term == 0) {
// last_included_index is larger than last_index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,9 +321,8 @@ public void run(final Status status, final long index, final byte[] reqCtx) {
if (status.isOk()) {
readIndexSuccesses.incrementAndGet();
} else {
assertTrue(
"Unexpect status: " + status,
status.getErrorMsg().contains(errorMsg)
assertTrue("Unexpected status: " + status,
status.getErrorMsg().contains(errorMsg) || status.getRaftError() == RaftError.ETIMEDOUT
|| status.getErrorMsg().contains("Invalid state for readIndex: STATE_ERROR"));
}
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import com.alipay.sofa.jraft.rpc.impl.FutureImpl;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.storage.SnapshotStorage;

import com.google.protobuf.ByteString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
Expand Down Expand Up @@ -279,6 +279,7 @@ private RpcRequests.AppendEntriesRequest createEmptyEntriesRequestToPeer(final P
.setPrevLogIndex(10) //
.setPrevLogTerm(1) //
.setCommittedIndex(0) //
.setData(ByteString.EMPTY) //
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,21 +113,32 @@ public void setup() {
}

private void mockSendEmptyEntries() {
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
this.mockSendEmptyEntries(false);
}

private void mockSendEmptyEntries(final boolean isHeartbeat) {
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest(isHeartbeat);
Mockito.when(this.rpcService.appendEntries(eq(this.peerId.getEndpoint()), eq(request), eq(-1), Mockito.any()))
.thenReturn(new FutureImpl<>());
}

private RpcRequests.AppendEntriesRequest createEmptyEntriesRequest() {
return RpcRequests.AppendEntriesRequest.newBuilder() //
return this.createEmptyEntriesRequest(false);
}

private RpcRequests.AppendEntriesRequest createEmptyEntriesRequest(final boolean isHeartbeat) {
RpcRequests.AppendEntriesRequest.Builder rb = RpcRequests.AppendEntriesRequest.newBuilder() //
.setGroupId("test") //
.setServerId(new PeerId("localhost", 8082).toString()) //
.setPeerId(this.peerId.toString()) //
.setTerm(1) //
.setPrevLogIndex(10) //
.setPrevLogTerm(1) //
.setCommittedIndex(0) //
.build();
.setCommittedIndex(0);
if (!isHeartbeat) {
rb.setData(ByteString.EMPTY);
}
return rb.build();
}

@After
Expand Down Expand Up @@ -262,6 +273,7 @@ public void testOnRpcReturnedMoreLogs() {
setPeerId(this.peerId.toString()). //
setTerm(1). //
setPrevLogIndex(9). //
setData(ByteString.EMPTY). //
setPrevLogTerm(1). //
setCommittedIndex(0).build();
Mockito.when(this.rpcService.appendEntries(eq(this.peerId.getEndpoint()), eq(newReq), eq(-1), Mockito.any()))
Expand Down Expand Up @@ -300,6 +312,7 @@ public void testOnRpcReturnedLessLogs() {
.setTerm(1) //
.setPrevLogIndex(8) //
.setPrevLogTerm(1) //
.setData(ByteString.EMPTY) //
.setCommittedIndex(0) //
.build();
Mockito.when(this.rpcService.appendEntries(eq(this.peerId.getEndpoint()), eq(newReq), eq(-1), Mockito.any()))
Expand Down Expand Up @@ -440,7 +453,7 @@ public void testSetErrorTimeout() throws Exception {
final Replicator r = getReplicator();
this.id.unlock();
assertNull(r.getHeartbeatInFly());
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest(true);
Mockito.when(
this.rpcService.appendEntries(eq(this.peerId.getEndpoint()), eq(request),
eq(this.opts.getElectionTimeoutMs() / 2), Mockito.any())).thenReturn(new FutureImpl<>());
Expand Down Expand Up @@ -539,7 +552,7 @@ public void testSendHeartbeat() {
this.id.unlock();

assertNull(r.getHeartbeatInFly());
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest();
final RpcRequests.AppendEntriesRequest request = createEmptyEntriesRequest(true);
Mockito.when(
this.rpcService.appendEntries(eq(this.peerId.getEndpoint()), eq(request),
eq(this.opts.getElectionTimeoutMs() / 2), Mockito.any())).thenReturn(new FutureImpl<>());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ public void run(final Status status) {

assertEquals(1, this.logManager.getFirstLogIndex());
assertEquals(10, this.logManager.getLastLogIndex());
Thread.sleep(200); // waiting for setDiskId()
this.logManager.setAppliedId(new LogId(9, 1));

for (int i = 0; i < 10; i++) {
Expand Down

0 comments on commit 7fdf7ec

Please sign in to comment.