Skip to content

Commit

Permalink
feat/grpc impl (sofastack#433)
Browse files Browse the repository at this point in the history
* jraft grpc rpc implement (sofastack#427)
Co-authored-by: SteNicholas <[email protected]>
  • Loading branch information
fengjiachun authored May 12, 2020
1 parent 7f1f74e commit 59853a7
Show file tree
Hide file tree
Showing 91 changed files with 7,060 additions and 1,612 deletions.
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ env:
- TESTFOLDER=jraft-core
- TESTFOLDER=jraft-rheakv/rheakv-core
- TESTFOLDER=jraft-rheakv/rheakv-pd
- TESTFOLDER=jraft-extension/rpc-grpc-impl

cache:
directories:
Expand Down
2 changes: 1 addition & 1 deletion jraft-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>jraft-parent</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.1</version>
<version>1.3.2.beta1</version>
</parent>
<artifactId>jraft-core</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

import com.alipay.remoting.util.ConcurrentHashSet;
import com.alipay.sofa.jraft.entity.NodeId;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.OnlyForTest;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.concurrent.ConcurrentHashSet;

/**
* Raft nodes manager.
Expand Down
69 changes: 46 additions & 23 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.TimeoutNowResponse;
import com.alipay.sofa.jraft.rpc.RpcResponseClosure;
import com.alipay.sofa.jraft.rpc.RpcResponseClosureAdapter;
import com.alipay.sofa.jraft.rpc.RpcResponseFactory;
import com.alipay.sofa.jraft.rpc.impl.core.DefaultRaftClientService;
import com.alipay.sofa.jraft.storage.LogManager;
import com.alipay.sofa.jraft.storage.LogStorage;
Expand All @@ -112,6 +111,7 @@
import com.alipay.sofa.jraft.util.Platform;
import com.alipay.sofa.jraft.util.RepeatedTimer;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;
import com.alipay.sofa.jraft.util.SignalHelper;
import com.alipay.sofa.jraft.util.ThreadHelper;
import com.alipay.sofa.jraft.util.ThreadId;
Expand Down Expand Up @@ -1608,15 +1608,19 @@ public Message handlePreVoteRequest(final RequestVoteRequest request) {
try {
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}
final PeerId candidateId = new PeerId();
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received PreVoteRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
request.getServerId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse candidateId failed: %s.", request.getServerId());
}
boolean granted = false;
// noinspection ConstantConditions
Expand Down Expand Up @@ -1700,15 +1704,19 @@ public Message handleRequestVoteRequest(final RequestVoteRequest request) {
try {
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}
final PeerId candidateId = new PeerId();
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse candidateId failed: %s.",
request.getServerId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse candidateId failed: %s.", request.getServerId());
}

// noinspection ConstantConditions
Expand Down Expand Up @@ -1840,16 +1848,20 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
try {
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s is not in active state, state %s.",
getNodeId(), this.state.name());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}

final PeerId serverId = new PeerId();
if (!serverId.parse(request.getServerId())) {
LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(),
request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s.",
request.getServerId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse serverId failed: %s.", request.getServerId());
}

// Check stale term
Expand Down Expand Up @@ -1881,8 +1893,10 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi

if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", getNodeId());
return RpcResponseFactory.newResponse(RaftError.EBUSY, "Node %s:%s is installing snapshot.",
this.groupId, this.serverId);
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EBUSY,
"Node %s:%s is installing snapshot.", this.groupId, this.serverId);
}

final long prevLogIndex = request.getPrevLogIndex();
Expand Down Expand Up @@ -1939,10 +1953,12 @@ public Message handleAppendEntriesRequest(final AppendEntriesRequest request, fi
"Corrupted log entry received from leader, index={}, term={}, expectedChecksum={}, realChecksum={}",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
realChecksum);
return RpcResponseFactory.newResponse(RaftError.EINVAL,
"The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
realChecksum);
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
"The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(),
realChecksum);
}
entries.add(logEntry);
}
Expand Down Expand Up @@ -3152,21 +3168,28 @@ public Message handleTimeoutNowRequest(final TimeoutNowRequest request, final Rp
@Override
public Message handleInstallSnapshot(final InstallSnapshotRequest request, final RpcRequestClosure done) {
if (this.snapshotExecutor == null) {
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Not supported snapshot");
return RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EINVAL, "Not supported snapshot");
}
final PeerId serverId = new PeerId();
if (!serverId.parse(request.getServerId())) {
LOG.warn("Node {} ignore InstallSnapshotRequest from {} bad server id.", getNodeId(), request.getServerId());
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Parse serverId failed: %s", request.getServerId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse serverId failed: %s", request.getServerId());
}

this.writeLock.lock();
try {
if (!this.state.isActive()) {
LOG.warn("Node {} ignore InstallSnapshotRequest as it is not in active state {}.", getNodeId(),
this.state);
return RpcResponseFactory.newResponse(RaftError.EINVAL, "Node %s:%s is not in active state, state %s.",
this.groupId, this.serverId, this.state.name());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(InstallSnapshotResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s:%s is not in active state, state %s.", this.groupId, this.serverId, this.state.name());
}

if (request.getTerm() < this.currTerm) {
Expand Down
Loading

0 comments on commit 59853a7

Please sign in to comment.