Skip to content

Commit

Permalink
fix: grpc conn refresh (sofastack#690)
Browse files Browse the repository at this point in the history
* fix: when the grpc connection fails too many times, reset the connect backoff and reconnect immediately

* by CR

* grpc conn fix
  • Loading branch information
Block authored Nov 23, 2021
1 parent 3054d55 commit 613fdde
Show file tree
Hide file tree
Showing 4 changed files with 279 additions and 52 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc;

import org.junit.Ignore;
import org.junit.Test;

import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;

/**
 * Manual simulation of gRPC connection refresh. Kept {@code @Ignore}d:
 * it appears to rely on a host name that only resolves in the author's
 * environment.
 *
 * @author jiachun.fjc
 */
public class ConnectionRefreshTest {

    @Ignore
    @Test
    public void simulation() throws InterruptedException {
        ProtobufMsgFactory.load();

        // Local server on the loopback address; the client deliberately
        // targets a different host below.
        final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
        server.registerProcessor(new PingRequestProcessor());
        server.init(null);

        final Endpoint remote = new Endpoint("my.test.host1.com", 19991);

        final RpcClient rpcClient = RpcFactoryHelper.rpcFactory().createRpcClient();
        rpcClient.init(null);

        final RpcRequests.PingRequest ping = RpcRequests.PingRequest.newBuilder() //
            .setSendTimestamp(System.currentTimeMillis()) //
            .build();

        // Ping once per second; errors are printed, not asserted.
        int round = 0;
        while (round < 1000) {
            try {
                System.out.println(rpcClient.invokeSync(remote, ping, 3000));
            } catch (final Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(1000);
            round++;
        }
    }
}
4 changes: 2 additions & 2 deletions jraft-core/src/test/resources/log4j2.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@

<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{1}:%L - %msg%n"/>
</Console>

<RollingFile name="RollingFile" filename="log/jraft-test.log"
filepattern="log/%d{YYYYMMddHHmmss}-jraft-test.log">
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss} [%t] %-5p %c{1}:%L - %msg%n"/>
<PatternLayout pattern="%d{YYYY-MM-dd HH:mm:ss.SSS} [%t] %-5p %c{1}:%L - %msg%n"/>
<Policies>
<SizeBasedTriggeringPolicy size="100 MB"/>
</Policies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,13 @@
*/
public class GrpcClient implements RpcClient {

private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);
private static final Logger LOG = LoggerFactory.getLogger(GrpcClient.class);

private static final int MAX_FAILURES = SystemPropertyUtil.getInt(
"jraft.grpc.max.connect.failures", 20);
private static final int RESET_CONN_THRESHOLD = SystemPropertyUtil.getInt(
"jraft.grpc.max.conn.failures.to_reset", 2);

private final Map<Endpoint, ManagedChannel> managedChannelPool = new ConcurrentHashMap<>();
private final Map<Endpoint, AtomicInteger> transientFailures = new ConcurrentHashMap<>();
private final Map<Endpoint, ManagedChannel> managedChannelPool = new ConcurrentHashMap<>();
private final Map<Endpoint, AtomicInteger> transientFailures = new ConcurrentHashMap<>();
private final Map<String, Message> parserClasses;
private final MarshallerRegistry marshallerRegistry;
private volatile ReplicatorGroup replicatorGroup;
Expand Down Expand Up @@ -140,10 +140,16 @@ public void invokeAsync(final Endpoint endpoint, final Object request, final Inv
Requires.requireNonNull(endpoint, "endpoint");
Requires.requireNonNull(request, "request");

final Channel ch = getChannel(endpoint);
final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE;

final Channel ch = getCheckedChannel(endpoint);
if (ch == null) {
executor.execute(() -> callback.complete(null, new RemotingException("Fail to connect: " + endpoint)));
return;
}

final MethodDescriptor<Message, Message> method = getCallMethod(request);
final CallOptions callOpts = CallOptions.DEFAULT.withDeadlineAfter(timeoutMs, TimeUnit.MILLISECONDS);
final Executor executor = callback.executor() != null ? callback.executor() : DirectExecutor.INSTANCE;

ClientCalls.asyncUnaryCall(ch.newCall(method, callOpts), (Message) request, new StreamObserver<Message>() {

Expand Down Expand Up @@ -178,39 +184,102 @@ private MethodDescriptor<Message, Message> getCallMethod(final Object request) {
.build();
}

private ManagedChannel getChannel(final Endpoint endpoint) {
return this.managedChannelPool.computeIfAbsent(endpoint, ep -> {
final ManagedChannel ch = ManagedChannelBuilder.forAddress(ep.getIp(), ep.getPort()) //
.usePlaintext() //
.directExecutor() //
.maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE) //
.build();
// channel connection event
ch.notifyWhenStateChanged(ConnectivityState.READY, () -> {
final ReplicatorGroup rpGroup = replicatorGroup;
if (rpGroup != null) {
try {
RpcUtils.runInThread(() -> {
final PeerId peer = new PeerId();
if (peer.parse(ep.toString())) {
LOG.info("Peer {} is connected.", peer);
rpGroup.checkReplicator(peer, true);
} else {
LOG.error("Fail to parse peer: {}.", ep);
}
});
} catch (final Throwable t) {
LOG.error("Fail to check replicator {}.", ep, t);
}
}
});
ch.notifyWhenStateChanged(ConnectivityState.TRANSIENT_FAILURE,
() -> LOG.warn("Channel in TRANSIENT_FAILURE state: {}.", ep));
ch.notifyWhenStateChanged(ConnectivityState.SHUTDOWN,
() -> LOG.warn("Channel in SHUTDOWN state: {}.", ep));
private ManagedChannel getCheckedChannel(final Endpoint endpoint) {
final ManagedChannel ch = getChannel(endpoint, true);

if (checkConnectivity(endpoint, ch)) {
return ch;
});
}

return null;
}

/**
 * Looks up the pooled channel for {@code endpoint}; when
 * {@code createIfAbsent} is true a missing channel is created and cached,
 * otherwise {@code null} may be returned.
 */
private ManagedChannel getChannel(final Endpoint endpoint, final boolean createIfAbsent) {
    return createIfAbsent //
        ? this.managedChannelPool.computeIfAbsent(endpoint, this::newChannel) //
        : this.managedChannelPool.get(endpoint);
}

/**
 * Builds a plaintext, direct-executor gRPC channel for {@code endpoint}
 * and registers the first connectivity-state listener.
 */
private ManagedChannel newChannel(final Endpoint endpoint) {
    final ManagedChannelBuilder<?> builder = ManagedChannelBuilder.forAddress(endpoint.getIp(), endpoint.getPort()) //
        .usePlaintext() //
        .directExecutor() //
        .maxInboundMessageSize(GrpcRaftRpcFactory.RPC_MAX_INBOUND_MESSAGE_SIZE);
    final ManagedChannel ch = builder.build();

    LOG.info("Creating new channel to: {}.", endpoint);

    // The init channel state is IDLE
    notifyWhenStateChanged(ConnectivityState.IDLE, endpoint, ch);

    return ch;
}

/**
 * Evicts the channel for {@code endpoint} from the pool.
 *
 * @return the removed channel, or {@code null} if none was pooled
 */
private ManagedChannel removeChannel(final Endpoint endpoint) {
    final ManagedChannel removed = this.managedChannelPool.remove(endpoint);
    return removed;
}

/**
 * Arms a one-shot listener that fires {@code onStateChanged} when the
 * channel leaves the given {@code state}.
 */
private void notifyWhenStateChanged(final ConnectivityState state, final Endpoint endpoint, final ManagedChannel ch) {
    final Runnable onChange = () -> onStateChanged(endpoint, ch);
    ch.notifyWhenStateChanged(state, onChange);
}

/**
 * Reacts to a connectivity-state transition: dispatches to the matching
 * notify hook and, for every state except SHUTDOWN, re-arms the one-shot
 * listener so the next transition is observed too.
 */
private void onStateChanged(final Endpoint endpoint, final ManagedChannel ch) {
    final ConnectivityState state = ch.getState(false);

    LOG.info("The channel {} is in state: {}.", endpoint, state);

    if (state == ConnectivityState.READY) {
        notifyReady(endpoint);
    } else if (state == ConnectivityState.TRANSIENT_FAILURE) {
        notifyFailure(endpoint);
    } else if (state == ConnectivityState.SHUTDOWN) {
        notifyShutdown(endpoint);
    }

    // SHUTDOWN is terminal; CONNECTING/IDLE/READY/TRANSIENT_FAILURE keep watching.
    if (state != ConnectivityState.SHUTDOWN) {
        notifyWhenStateChanged(state, endpoint, ch);
    }
}

/**
 * Called when the channel reaches READY: clears the transient-failure
 * counter for the endpoint and, if a replicator group is installed, asks
 * it to re-check the replicator for this peer.
 */
private void notifyReady(final Endpoint endpoint) {
    LOG.info("The channel {} has successfully established.", endpoint);

    // A successful connection resets the failure bookkeeping.
    clearConnFailuresCount(endpoint);

    final ReplicatorGroup group = this.replicatorGroup;
    if (group == null) {
        return;
    }

    try {
        RpcUtils.runInThread(() -> {
            final PeerId peer = new PeerId();
            if (!peer.parse(endpoint.toString())) {
                LOG.error("Fail to parse peer: {}.", endpoint);
                return;
            }
            LOG.info("Peer {} is connected.", peer);
            group.checkReplicator(peer, true);
        });
    } catch (final Throwable t) {
        LOG.error("Fail to check replicator {}.", endpoint, t);
    }
}

/**
 * Called when the channel enters TRANSIENT_FAILURE; only logs here —
 * failure counting is done in {@code checkConnectivity}.
 */
private void notifyFailure(final Endpoint endpoint) {
    LOG.warn("There has been some transient failure on this channel {}.", endpoint);
}

/** Called when the channel enters SHUTDOWN; logging only. */
private void notifyShutdown(final Endpoint endpoint) {
    LOG.warn("This channel {} has started shutting down. Any new RPCs should fail immediately.", endpoint);
}

private void closeAllChannels() {
Expand All @@ -219,33 +288,71 @@ private void closeAllChannels() {
LOG.info("Shutdown managed channel: {}, {}.", entry.getKey(), ch);
ManagedChannelHelper.shutdownAndAwaitTermination(ch);
}
this.managedChannelPool.clear();
}

/**
 * Removes the endpoint's channel from the pool and shuts it down,
 * awaiting termination.
 *
 * Fix: diff extraction left both the old and the new declaration of
 * {@code ch} in the method (a duplicate-variable compile error); only the
 * current {@code removeChannel}-based line is kept.
 */
private void closeChannel(final Endpoint endpoint) {
    final ManagedChannel ch = removeChannel(endpoint);
    LOG.info("Close connection: {}, {}.", endpoint, ch);
    if (ch != null) {
        ManagedChannelHelper.shutdownAndAwaitTermination(ch);
    }
}

private boolean checkChannel(final Endpoint endpoint, final boolean createIfAbsent) {
ManagedChannel ch = this.managedChannelPool.get(endpoint);
if (ch == null && createIfAbsent) {
ch = getChannel(endpoint);
}
final ManagedChannel ch = getChannel(endpoint, createIfAbsent);

if (ch == null) {
return false;
}
final ConnectivityState st = ch.getState(true);
if (st == ConnectivityState.TRANSIENT_FAILURE) {
final AtomicInteger num = this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger());
if (num.incrementAndGet() > MAX_FAILURES) {
this.transientFailures.remove(endpoint);
LOG.warn("Channel[{}] in {} state {} times, will be reset connect backoff.", endpoint, st, num.get());

return checkConnectivity(endpoint, ch);
}

/**
 * Increments and returns the per-endpoint transient-failure counter,
 * creating it on first use.
 */
private int incConnFailuresCount(final Endpoint endpoint) {
    return this.transientFailures.computeIfAbsent(endpoint, ep -> new AtomicInteger()).incrementAndGet();
}

/** Drops the transient-failure counter for {@code endpoint}. */
private void clearConnFailuresCount(final Endpoint endpoint) {
    this.transientFailures.remove(endpoint);
}

/**
 * Decides whether the channel may be used. Any state other than
 * TRANSIENT_FAILURE/SHUTDOWN is considered healthy. Otherwise a
 * per-endpoint failure counter is incremented: just before
 * RESET_CONN_THRESHOLD the connect backoff is short-circuited so gRPC
 * reconnects immediately; once the threshold is reached the channel is
 * evicted from the pool and terminated, and {@code false} is returned.
 *
 * Fix: diff extraction left the removed one-line {@code return} of the
 * old implementation stranded in the middle of this method (unreachable
 * code / compile error); it is deleted, restoring the new control flow.
 *
 * @return true if the channel may be used for the next call
 */
private boolean checkConnectivity(final Endpoint endpoint, final ManagedChannel ch) {
    final ConnectivityState st = ch.getState(false);

    if (st != ConnectivityState.TRANSIENT_FAILURE && st != ConnectivityState.SHUTDOWN) {
        return true;
    }

    final int c = incConnFailuresCount(endpoint);
    if (c < RESET_CONN_THRESHOLD) {
        if (c == RESET_CONN_THRESHOLD - 1) {
            // For sub-channels that are in TRANSIENT_FAILURE state, short-circuit the backoff timer and make
            // them reconnect immediately. May also attempt to invoke NameResolver#refresh
            ch.resetConnectBackoff();
        }
        return true;
    }

    clearConnFailuresCount(endpoint);

    final ManagedChannel removedCh = removeChannel(endpoint);

    if (removedCh == null) {
        // The channel has been removed and closed by another
        return false;
    }

    LOG.warn("Channel[{}] in [INACTIVE] state {} times, it has been removed from the pool.", endpoint, c);

    if (removedCh != ch) {
        // Now that it's removed, close it
        ManagedChannelHelper.shutdownAndAwaitTermination(removedCh, 100);
    }

    ManagedChannelHelper.shutdownAndAwaitTermination(ch, 100);

    return false;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft.rpc;

import org.junit.Ignore;
import org.junit.Test;

import com.alipay.sofa.jraft.rpc.impl.PingRequestProcessor;
import com.alipay.sofa.jraft.util.Endpoint;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;

/**
 * Manual simulation (ignored by default) exercising gRPC connection
 * refresh: it repeatedly pings a remote endpoint once per second so the
 * client's connection handling can be observed under failures.
 *
 * @author jiachun.fjc
 */
public class ConnectionRefreshTest {

    @Ignore
    @Test
    public void simulation() throws InterruptedException {
        ProtobufMsgFactory.load();

        // Local server bound to loopback; note the client below targets a
        // different host name.
        final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
        server.registerProcessor(new PingRequestProcessor());
        server.init(null);

        // NOTE(review): presumably resolvable only in the author's
        // environment — likely why the test is @Ignore'd; confirm.
        final Endpoint target = new Endpoint("my.test.host1.com", 19991);

        final RpcClient client = RpcFactoryHelper.rpcFactory().createRpcClient();
        client.init(null);

        final RpcRequests.PingRequest req = RpcRequests.PingRequest.newBuilder() //
            .setSendTimestamp(System.currentTimeMillis()) //
            .build();

        // Ping once per second for up to 1000 rounds; failures are printed
        // rather than failing the test.
        for (int i = 0; i < 1000; i++) {
            try {
                final Object resp = client.invokeSync(target, req, 3000);
                System.out.println(resp);
            } catch (final Exception e) {
                e.printStackTrace();
            }
            Thread.sleep(1000);
        }
    }
}

0 comments on commit 613fdde

Please sign in to comment.