Skip to content

Commit

Permalink
Add configurable GrpcClient health check retry times (alibaba#9056)
Browse files Browse the repository at this point in the history
* Add configuration for GrpcClient health check retry.

* Add some unit tests for GrpcClient health check retry.

* reformat code style

* Move some gRPC params to GrpcConsts

* add GRpcConfigLabel

* reformat style

* reformat style

* reformat style

* reformat style

* reformat style
  • Loading branch information
karsonto authored Sep 2, 2022
1 parent 30794c4 commit 0aa695e
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,10 @@ public abstract class RpcClient implements Closeable {

private static final Pattern EXCLUDE_PROTOCOL_PATTERN = Pattern.compile("(?<=\\w{1,5}://)(.*)");

protected int healthCheckRetryTimes = 1;

protected Long healthCheckTimeOut = 3000L;

static {
PayloadRegistry.init();
}
Expand Down Expand Up @@ -309,8 +313,7 @@ public final void start() throws NacosException {
if (isShutdown()) {
break;
}
ReconnectContext reconnectContext = reconnectionSignal
.poll(keepAliveTime, TimeUnit.MILLISECONDS);
ReconnectContext reconnectContext = reconnectionSignal.poll(keepAliveTime, TimeUnit.MILLISECONDS);
if (reconnectContext == null) {
// check alive time.
if (System.currentTimeMillis() - lastActiveTimeStamp >= keepAliveTime) {
Expand All @@ -328,8 +331,8 @@ public final void start() throws NacosException {
break;
}

boolean statusFLowSuccess = RpcClient.this.rpcClientStatus
.compareAndSet(rpcClientStatus, RpcClientStatus.UNHEALTHY);
boolean statusFLowSuccess = RpcClient.this.rpcClientStatus.compareAndSet(
rpcClientStatus, RpcClientStatus.UNHEALTHY);
if (statusFLowSuccess) {
reconnectContext = new ReconnectContext(null, false);
} else {
Expand Down Expand Up @@ -396,8 +399,9 @@ public final void start() throws NacosException {
}

if (connectToServer != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect to server [{}] on start up, connectionId = {}",
name, connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Success to connect to server [{}] on start up, connectionId = {}", name,
connectToServer.serverInfo.getAddress(), connectToServer.getConnectionId());
this.currentConnection = connectToServer;
rpcClientStatus.set(RpcClientStatus.RUNNING);
eventLinkedBlockingQueue.offer(new ConnectionEvent(ConnectionEvent.CONNECTED));
Expand Down Expand Up @@ -431,8 +435,8 @@ public Response requestReply(Request request) {
ConnectResetRequest connectResetRequest = (ConnectResetRequest) request;
if (StringUtils.isNotBlank(connectResetRequest.getServerIp())) {
ServerInfo serverInfo = resolveServerInfo(
connectResetRequest.getServerIp() + Constants.COLON + connectResetRequest
.getServerPort());
connectResetRequest.getServerIp() + Constants.COLON
+ connectResetRequest.getServerPort());
switchServerAsync(serverInfo, false);
} else {
switchServerAsync();
Expand Down Expand Up @@ -464,12 +468,16 @@ private boolean healthCheck() {
if (this.currentConnection == null) {
return false;
}
try {
Response response = this.currentConnection.request(healthCheckRequest, 3000L);
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
// ignore
int reTryTimes = healthCheckRetryTimes;
while (reTryTimes > 0) {
reTryTimes--;
try {
Response response = this.currentConnection.request(healthCheckRequest, healthCheckTimeOut);
// not only check server is ok, also check connection is register.
return response != null && response.isSuccess();
} catch (NacosException e) {
// ignore
}
}
return false;
}
Expand Down Expand Up @@ -520,8 +528,9 @@ protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequest
// 2.create a new channel to new server
Connection connectionNew = connectToServer(serverInfo);
if (connectionNew != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Success to connect a server [{}], connectionId = {}",
name, serverInfo.getAddress(), connectionNew.getConnectionId());
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Success to connect a server [{}], connectionId = {}", name,
serverInfo.getAddress(), connectionNew.getConnectionId());
// successfully create a new connect.
if (currentConnection != null) {
LoggerUtils.printIfInfoEnabled(LOGGER,
Expand Down Expand Up @@ -558,8 +567,8 @@ protected void reconnect(final ServerInfo recommendServerInfo, boolean onRequest
if (reConnectTimes > 0
&& reConnectTimes % RpcClient.this.serverListFactory.getServerList().size() == 0) {
LoggerUtils.printIfInfoEnabled(LOGGER,
"[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}", name,
reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
"[{}] Fail to connect server, after trying {} times, last try server is {}, error = {}",
name, reConnectTimes, serverInfo, lastException == null ? "unknown" : lastException);
if (Integer.MAX_VALUE == retryTurns) {
retryTurns = 50;
} else {
Expand Down Expand Up @@ -687,8 +696,9 @@ public Response request(Request request, long timeoutMills) throws NacosExceptio
}
}

LoggerUtils.printIfErrorEnabled(LOGGER, "Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
request, retryTimes, e.getMessage());
LoggerUtils.printIfErrorEnabled(LOGGER,
"Send request fail, request = {}, retryTimes = {}, errorMessage = {}", request, retryTimes,
e.getMessage());

exceptionThrow = e;

Expand Down Expand Up @@ -737,9 +747,9 @@ public void asyncRequest(Request request, RequestCallBack callback) throws Nacos
// Do nothing.
}
}
LoggerUtils
.printIfErrorEnabled(LOGGER, "[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
name, request, retryTimes, e.getMessage());
LoggerUtils.printIfErrorEnabled(LOGGER,
"[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}", name, request,
retryTimes, e.getMessage());
exceptionToThrow = e;

}
Expand Down Expand Up @@ -785,9 +795,9 @@ public RequestFuture requestFuture(Request request) throws NacosException {
// Do nothing.
}
}
LoggerUtils
.printIfErrorEnabled(LOGGER, "[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}",
name, request, retryTimes, e.getMessage());
LoggerUtils.printIfErrorEnabled(LOGGER,
"[{}] Send request fail, request = {}, retryTimes = {}, errorMessage = {}", name, request,
retryTimes, e.getMessage());
exceptionToThrow = e;

}
Expand Down Expand Up @@ -832,8 +842,8 @@ protected Response handleServerRequest(final Request request) {
Response response = serverRequestHandler.requestReply(request);

if (response != null) {
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}", name,
request.getClass().getSimpleName(), request.getRequestId());
LoggerUtils.printIfInfoEnabled(LOGGER, "[{}] Ack server push request, request = {}, requestId = {}",
name, request.getClass().getSimpleName(), request.getRequestId());
return response;
}
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@

import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand All @@ -60,23 +61,15 @@ public abstract class GrpcClient extends RpcClient {

static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);

protected static final String NACOS_SERVER_GRPC_PORT_OFFSET_KEY = "nacos.server.grpc.port.offset";

protected static final String NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME = "nacos.remote.client.grpc.pool.alive";

protected static final String NACOS_CLIENT_GRPC_TIMEOUT = "nacos.remote.client.grpc.timeout";

protected static final String NACOS_CLIENT_GRPC_QUEUESIZE = "nacos.remote.client.grpc.queue.size";

private ThreadPoolExecutor grpcExecutor = null;

private Integer threadPoolCoreSize;

private Integer threadPoolMaxSize;

private static final long DEFAULT_MAX_INBOUND_MESSAGE_SIZE = 10 * 1024 * 1024L;
private static final String DEFAULT_MAX_INBOUND_MESSAGE_SIZE = String.valueOf(10 * 1024 * 1024L);

private static final long DEFAULT_KEEP_ALIVE_TIME = 6 * 60 * 1000;
private static final String DEFAULT_KEEP_ALIVE_TIME = String.valueOf(6 * 60 * 1000);

private Properties configProperties = new Properties();

Expand All @@ -86,7 +79,7 @@ public abstract class GrpcClient extends RpcClient {

private static final String KEEP_ALIVE = "10";

private Long timeOut;
private long timeOut = 3000L;

@Override
public ConnectionType getConnectionType() {
Expand All @@ -98,32 +91,34 @@ public ConnectionType getConnectionType() {
*/
public GrpcClient(String name) {
super(name);
initGrpcClient(null);
initGrpcClient(null, null);
}

public GrpcClient(String name, Properties configProperties) {
super(name);
initGrpcClient(configProperties);
initGrpcClient(configProperties, GrpcConsts.getRpcParams());
}

private void initGrpcClient(Properties configProperties) {
private void initGrpcClient(Properties configProperties, Set<String> configName) {
if (!Objects.isNull(configProperties)) {
if (configProperties.contains(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME)) {
this.configProperties.put(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME,
configProperties.getProperty(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
}
if (configProperties.contains(NACOS_CLIENT_GRPC_TIMEOUT)) {
this.configProperties.put(NACOS_CLIENT_GRPC_TIMEOUT,
configProperties.getProperty(NACOS_CLIENT_GRPC_TIMEOUT));
}
if (configProperties.contains(NACOS_CLIENT_GRPC_QUEUESIZE)) {
this.configProperties.put(NACOS_CLIENT_GRPC_QUEUESIZE,
configProperties.getProperty(NACOS_CLIENT_GRPC_QUEUESIZE));
for (String name : configName) {
if (configProperties.contains(name)) {
this.configProperties.put(name, configProperties.getProperty(name));
}
}
}
addDefaultConfig();
checkInitProperties(this.configProperties);
}

/**
 * Registers the default value for each gRPC client setting on {@code configProperties} by
 * delegating to the three-argument {@code addDefaultConfig} overload, once per key.
 *
 * <p>NOTE(review): the overload visibly lets a JVM system property with the same name take
 * effect; how it treats a value that is already present is not visible here - confirm there.
 */
private void addDefaultConfig() {
addDefaultConfig(configProperties, GrpcConsts.NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME, KEEP_ALIVE);
addDefaultConfig(configProperties, GrpcConsts.NACOS_CLIENT_GRPC_TIMEOUT, DEFAULT_TIME_OUT);
addDefaultConfig(configProperties, GrpcConsts.NACOS_CLIENT_GRPC_QUEUESIZE, QUEUE_SIZE);
addDefaultConfig(configProperties, GrpcConsts.MAX_INBOUND_MESSAGE_SIZE, DEFAULT_MAX_INBOUND_MESSAGE_SIZE);
addDefaultConfig(configProperties, GrpcConsts.KEEP_ALIVE_TIME, DEFAULT_KEEP_ALIVE_TIME);
}

private void addDefaultConfig(Properties configProperties, String name, String defaultConfig) {
if (null != System.getProperty(name)) {
configProperties.put(name, System.getProperty(name));
Expand All @@ -133,10 +128,17 @@ private void addDefaultConfig(Properties configProperties, String name, String d
}

/**
 * Parses the numeric settings that this client keeps as fields out of the given properties.
 * A field keeps its current value when the corresponding key is absent.
 *
 * @param configProperties the properties to read from
 * @throws NumberFormatException if a present value cannot be parsed as a number
 */
private void checkInitProperties(Properties configProperties) {
    // Look values up by key: Properties.contains(Object) matches VALUES (Hashtable.contains),
    // so guarding with it would leave all three fields permanently at their defaults.
    String requestTimeout = configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_TIMEOUT);
    if (requestTimeout != null) {
        this.timeOut = Long.parseLong(requestTimeout);
    }
    String healthRetryTimes = configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES);
    if (healthRetryTimes != null) {
        this.healthCheckRetryTimes = Integer.parseInt(healthRetryTimes);
    }
    String healthTimeout = configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT);
    if (healthTimeout != null) {
        this.healthCheckTimeOut = Long.parseLong(healthTimeout);
    }
}

/**
Expand Down Expand Up @@ -167,8 +169,8 @@ protected Integer getThreadPoolMaxSize() {

protected ThreadPoolExecutor createGrpcExecutor(String serverIp) {
Long keepAliveTime = Long.parseLong(
this.configProperties.getProperty(NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
int queueSize = Integer.parseInt(this.configProperties.getProperty(NACOS_CLIENT_GRPC_QUEUESIZE));
this.configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME));
int queueSize = Integer.parseInt(this.configProperties.getProperty(GrpcConsts.NACOS_CLIENT_GRPC_QUEUESIZE));
ThreadPoolExecutor grpcExecutor = new ThreadPoolExecutor(getThreadPoolCoreSize(), getThreadPoolMaxSize(),
keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queueSize),
new ThreadFactoryBuilder().daemon(true).nameFormat("nacos-grpc-client-executor-" + serverIp + "-%d")
Expand Down Expand Up @@ -213,15 +215,11 @@ private ManagedChannel createNewManagedChannel(String serverIp, int serverPort)
}

private int getInboundMessageSize() {
String messageSize = System.getProperty("nacos.remote.client.grpc.maxinbound.message.size",
String.valueOf(DEFAULT_MAX_INBOUND_MESSAGE_SIZE));
return Integer.parseInt(messageSize);
return Integer.parseInt(configProperties.getProperty(GrpcConsts.MAX_INBOUND_MESSAGE_SIZE));
}

private int keepAliveTimeMillis() {
String keepAliveTimeMillis = System.getProperty("nacos.remote.grpc.keep.alive.millis",
String.valueOf(DEFAULT_KEEP_ALIVE_TIME));
return Integer.parseInt(keepAliveTimeMillis);
return Integer.parseInt(configProperties.getProperty(GrpcConsts.KEEP_ALIVE_TIME));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public GrpcClusterClient(String name) {

@Override
public int rpcPortOffset() {
return Integer.parseInt(System.getProperty(NACOS_SERVER_GRPC_PORT_OFFSET_KEY,
return Integer.parseInt(System.getProperty(GrpcConsts.NACOS_SERVER_GRPC_PORT_OFFSET_KEY,
String.valueOf(Constants.CLUSTER_GRPC_PORT_DEFAULT_OFFSET)));
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
/*
* Copyright 1999-2020 Alibaba Group Holding Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.alibaba.nacos.common.remote.client.grpc;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* GrpcConsts.
*
* @author karsonto
*/
public class GrpcConsts {

    /** System property key for the server gRPC port offset. */
    public static final String NACOS_SERVER_GRPC_PORT_OFFSET_KEY = "nacos.server.grpc.port.offset";

    /** Common prefix of every client-side gRPC setting declared below. */
    public static final String NACOS_CLIENT_GRPC = "nacos.remote.client.grpc";

    @GRpcConfigLabel
    public static final String NACOS_CLIENT_GRPC_THREADPOOL_KEEPALIVETIME = NACOS_CLIENT_GRPC + ".pool.alive";

    @GRpcConfigLabel
    public static final String NACOS_CLIENT_GRPC_TIMEOUT = NACOS_CLIENT_GRPC + ".timeout";

    @GRpcConfigLabel
    public static final String NACOS_CLIENT_GRPC_QUEUESIZE = NACOS_CLIENT_GRPC + ".queue.size";

    @GRpcConfigLabel
    public static final String NACOS_CLIENT_GRPC_HEALTHCHECK_RETRY_TIMES = NACOS_CLIENT_GRPC + ".health.retry";

    @GRpcConfigLabel
    public static final String NACOS_CLIENT_GRPC_HEALTHCHECK_TIMEOUT = NACOS_CLIENT_GRPC + ".health.timeout";

    @GRpcConfigLabel
    public static final String MAX_INBOUND_MESSAGE_SIZE = NACOS_CLIENT_GRPC + ".maxinbound.message.size";

    @GRpcConfigLabel
    public static final String KEEP_ALIVE_TIME = NACOS_CLIENT_GRPC + ".keep.alive.millis";

    /** Read-only set of the values of every {@link GRpcConfigLabel}-annotated String constant. */
    private static final Set<String> CONFIG_NAMES;

    /**
     * Marks a String constant of this class as a configuration key to be returned by
     * {@link #getRpcParams()}.
     */
    @Documented
    @Target(ElementType.FIELD)
    @Retention(RetentionPolicy.RUNTIME)
    protected @interface GRpcConfigLabel {

    }

    static {
        Set<String> names = new HashSet<>();
        for (Field declaredField : GrpcConsts.class.getDeclaredFields()) {
            boolean isLabelledString = declaredField.getType().equals(String.class)
                    && declaredField.isAnnotationPresent(GRpcConfigLabel.class);
            if (!isLabelledString) {
                continue;
            }
            try {
                // Labelled constants are static members of this very class, so they are readable
                // without setAccessible(true); null is the receiver for a static field.
                names.add((String) declaredField.get(null));
            } catch (IllegalAccessException e) {
                // Fail loudly: a partially populated key set would silently drop user settings.
                throw new IllegalStateException(
                        "Cannot read gRPC config constant " + declaredField.getName(), e);
            }
        }
        CONFIG_NAMES = Collections.unmodifiableSet(names);
    }

    /**
     * Returns the configuration keys collected from the labelled constants.
     *
     * @return an unmodifiable set of property names
     */
    public static Set<String> getRpcParams() {
        return CONFIG_NAMES;
    }
}
Loading

0 comments on commit 0aa695e

Please sign in to comment.