Skip to content

Commit

Permalink
reset connect backoff timer (sofastack#504)
Browse files Browse the repository at this point in the history
* reset connect backoff timer

* 1.3.4.bugfix

* 1.3.4.bugfix_2
  • Loading branch information
fengjiachun authored Sep 22, 2020
1 parent 91ebdb1 commit 3a1bdcf
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 21 deletions.
2 changes: 1 addition & 1 deletion jraft-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>jraft-parent</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
</parent>
<artifactId>jraft-core</artifactId>
<packaging>jar</packaging>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3437,7 +3437,9 @@ public void describe(final Printer out) {

// snapshotExecutor
out.println("snapshotExecutor: ");
this.snapshotExecutor.describe(out);
if (this.snapshotExecutor != null) {
this.snapshotExecutor.describe(out);
}

// replicators
out.println("replicatorGroup: ");
Expand Down
2 changes: 1 addition & 1 deletion jraft-example/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>jraft-parent</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
</parent>
<artifactId>jraft-example</artifactId>
<packaging>jar</packaging>
Expand Down
2 changes: 1 addition & 1 deletion jraft-extension/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>jraft-parent</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
</parent>

<artifactId>jraft-extension</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion jraft-extension/rpc-grpc-impl/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>jraft-extension</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
</parent>

<artifactId>rpc-grpc-impl</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import io.grpc.CallOptions;
import io.grpc.Channel;
Expand All @@ -44,10 +45,11 @@
import com.alipay.sofa.jraft.rpc.InvokeCallback;
import com.alipay.sofa.jraft.rpc.InvokeContext;
import com.alipay.sofa.jraft.rpc.RpcClient;
import com.alipay.sofa.jraft.rpc.RpcUtils;
import com.alipay.sofa.jraft.util.DirectExecutor;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.Requires;
import com.alipay.sofa.jraft.util.Utils;
import com.alipay.sofa.jraft.util.SystemPropertyUtil;
import com.google.protobuf.Message;

/**
Expand All @@ -60,7 +62,11 @@ public class GrpcClient implements RpcClient {

private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);

// Threshold for the per-endpoint counters in transientFailures: once checkChannel has
// observed an endpoint's channel in TRANSIENT_FAILURE more than this many times, it
// calls resetConnectBackoff() on that channel. Read from the system property
// "jraft.grpc.max.connect.failures"; 20 is passed as the second argument
// (presumably the default when the property is unset — confirm in SystemPropertyUtil).
private static final int MAX_FAILURES = SystemPropertyUtil.getInt(
"jraft.grpc.max.connect.failures", 20);

// Channels keyed by remote endpoint.
private final Map<Endpoint, ManagedChannel> managedChannelPool = new ConcurrentHashMap<>();
// Per-endpoint count of TRANSIENT_FAILURE observations made by checkChannel; an entry is
// removed when its count exceeds MAX_FAILURES, and the whole map is cleared on shutdown().
private final Map<Endpoint, AtomicInteger> transientFailures = new ConcurrentHashMap<>();
private final Map<String, Message> parserClasses;
private final MarshallerRegistry marshallerRegistry;
private volatile ReplicatorGroup replicatorGroup;
Expand All @@ -79,6 +85,7 @@ public boolean init(final RpcOptions opts) {
/**
 * Shuts this client down: delegates to {@code closeAllChannels()} (defined outside
 * this view) and then discards every per-endpoint transient-failure counter.
 */
@Override
public void shutdown() {
closeAllChannels();
// Drop the failure counts as well, so no per-endpoint counter state is kept after shutdown.
this.transientFailures.clear();
}

@Override
Expand Down Expand Up @@ -181,17 +188,25 @@ private ManagedChannel getChannel(final Endpoint endpoint) {
ch.notifyWhenStateChanged(ConnectivityState.READY, () -> {
final ReplicatorGroup rpGroup = replicatorGroup;
if (rpGroup != null) {
Utils.runInThread(() -> {
final PeerId peer = new PeerId();
if (peer.parse(ep.toString())) {
LOG.info("Peer {} is connected.", peer);
rpGroup.checkReplicator(peer, true);
} else {
LOG.error("Fail to parse peer: {}.", ep);
}
});
try {
RpcUtils.runInThread(() -> {
final PeerId peer = new PeerId();
if (peer.parse(ep.toString())) {
LOG.info("Peer {} is connected.", peer);
rpGroup.checkReplicator(peer, true);
} else {
LOG.error("Fail to parse peer: {}.", ep);
}
});
} catch (final Throwable t) {
LOG.error("Fail to check replicator {}.", ep, t);
}
}
});
ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE,
() -> LOG.warn("Channel in TRANSIENT_FAILURE state: {}.", ep));
ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN,
() -> LOG.warn("Channel in SHUTDOWN state: {}.", ep));

return ch;
});
Expand Down Expand Up @@ -222,6 +237,14 @@ private boolean checkChannel(final Endpoint endpoint, final boolean createIfAbse
return false;
}
final ConnectivityState st = ch.getState(true);
return st == ConnectivityState.CONNECTING || st == ConnectivityState.READY || st == ConnectivityState.IDLE;
if (st == ConnectivityState.TRANSIENT_FAILURE) {
final AtomicInteger num = this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger());
if (num.incrementAndGet() > MAX_FAILURES) {
this.transientFailures.remove(endpoint);
LOG.warn("Channel[{}] in {} state {} times, will be reset connect backoff.", endpoint, st, num.get());
ch.resetConnectBackoff();
}
}
return st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN;
}
}
2 changes: 1 addition & 1 deletion jraft-rheakv/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>jraft-parent</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
</parent>

<artifactId>jraft-rheakv</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion jraft-rheakv/rheakv-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>jraft-rheakv</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
</parent>

<artifactId>jraft-rheakv-core</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion jraft-rheakv/rheakv-pd/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
<parent>
<artifactId>jraft-rheakv</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
</parent>

<artifactId>jraft-rheakv-pd</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion jraft-test/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<artifactId>jraft-parent</artifactId>
<groupId>com.alipay.sofa</groupId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
</parent>
<artifactId>jraft-test</artifactId>
<packaging>jar</packaging>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

<groupId>com.alipay.sofa</groupId>
<artifactId>jraft-parent</artifactId>
<version>1.3.4</version>
<version>1.3.4.bugfix_2</version>
<packaging>pom</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down

0 comments on commit 3a1bdcf

Please sign in to comment.