Skip to content

Commit

Permalink
refector code
Browse files Browse the repository at this point in the history
  • Loading branch information
shiming committed Apr 27, 2017
1 parent 77574cd commit e8458af
Show file tree
Hide file tree
Showing 10 changed files with 18 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,8 @@ public class Constants {
public static final String ASYNC_KEY = "async";
public static final int RPCTYPE_ASYNC = 1;
public static final int RPCTYPE_BLOCKING = 2;
public static final int RPC_ASYNC_DEFAULT_TIMEOUT = 5000;
public static final int RPC_ASYNC_DEFAULT_TIMEOUT = 10000;

public static final String PROVIDER_ADDRESS = "provider";

public static final String CONSUMER_ADDRESS = "consumer";
public static final String REMOTE_ADDRESS = "remote";

}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.quancheng.saluki.core.grpc.util.MetadataKeyUtil;

import io.grpc.Attributes;
import io.grpc.Attributes.Key;
import io.grpc.CallOptions;
import io.grpc.ClientCall;
import io.grpc.Grpc;
Expand All @@ -32,8 +33,6 @@
import io.grpc.ResolvedServerInfo;
import io.grpc.ResolvedServerInfoGroup;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.Attributes.Key;

public abstract class AbstractRetryingRpcListener<RequestT, ResponseT, ResultT> extends ClientCall.Listener<ResponseT> implements Runnable {

Expand Down Expand Up @@ -85,14 +84,8 @@ public void onClose(Status status, Metadata trailers) {
} else {
if (retryCount > retryOptions.getReties() || !retryOptions.isEnableRetry()) {
String errorCause = trailers.get(MetadataKeyUtil.GRPC_ERRORCAUSE_VALUE);
StatusRuntimeException newException;
if (errorCause != null) {
newException = status.withDescription(errorCause).asRuntimeException();
completionFuture.setException(newException);
} else {
newException = status.asRuntimeException();
newException.printStackTrace();
}
Exception serverException = status.withDescription(errorCause).asRuntimeException();
completionFuture.setException(serverException);
notify.resetChannel();
return;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.protobuf.Message;
import com.quancheng.saluki.core.common.GrpcURL;
import com.quancheng.saluki.core.grpc.client.GrpcAsyncCall;
import com.quancheng.saluki.core.grpc.exception.RpcErrorMsgConstant;
import com.quancheng.saluki.core.grpc.exception.RpcServiceException;

import io.grpc.MethodDescriptor;
Expand All @@ -42,14 +43,15 @@ public GrpcBlockingUnaryCommand(GrpcAsyncCall grpcAsyncCall, GrpcURL refUrl,
this.grpcAsyncCall = grpcAsyncCall;
this.methodDesc = methodDesc;
this.request = request;

}

@Override
protected Message run() throws Exception {
try {
return grpcAsyncCall.unaryFuture(request, methodDesc).get();
} catch (InterruptedException | ExecutionException e) {
RpcServiceException rpcService = new RpcServiceException(e);
RpcServiceException rpcService = new RpcServiceException(e, RpcErrorMsgConstant.BIZ_DEFAULT_EXCEPTION);
throw rpcService;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ protected Message run() throws Exception {
RpcServiceException rpcService = new RpcServiceException(e, RpcErrorMsgConstant.SERVICE_TIMEOUT);
throw rpcService;
} else {
RpcServiceException rpcService = new RpcServiceException(e);
RpcServiceException rpcService = new RpcServiceException(e, RpcErrorMsgConstant.BIZ_DEFAULT_EXCEPTION);
throw rpcService;
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import com.quancheng.saluki.core.grpc.client.hystrix.GrpcBlockingUnaryCommand;
import com.quancheng.saluki.core.grpc.client.hystrix.GrpcFutureUnaryCommand;
import com.quancheng.saluki.core.grpc.exception.RpcFrameworkException;
import com.quancheng.saluki.core.grpc.exception.RpcServiceException;
import com.quancheng.saluki.core.grpc.service.ClientServerMonitor;
import com.quancheng.saluki.core.grpc.service.MonitorService;
import com.quancheng.saluki.core.grpc.util.GrpcReflectUtil;
Expand Down Expand Up @@ -109,11 +108,6 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl
collect(serviceName, methodName, reqProtoBufer, respProtoBufer, start, true);
RpcFrameworkException rpcFramwork = new RpcFrameworkException(e);
throw rpcFramwork;
} catch (Exception e) {
collect(serviceName, methodName, reqProtoBufer, respProtoBufer, start, true);
log.error(e.getMessage(), e);
RpcServiceException rpcService = new RpcServiceException(e);
throw rpcService;
} finally {
log.info(String.format("Service: %s Method: %s RemoteAddress: %s", serviceName, methodName,
getProviderServer()));
Expand All @@ -124,7 +118,7 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwabl

public InetSocketAddress getProviderServer() {
InetSocketAddress currentServer = (InetSocketAddress) attributes.get(GrpcAsyncCall.CURRENT_ADDR_KEY);
RpcContext.getContext().set(Constants.PROVIDER_ADDRESS, currentServer);
RpcContext.getContext().set(Constants.REMOTE_ADDRESS, currentServer);
return currentServer;
}

Expand All @@ -151,14 +145,14 @@ private RetryOptions createRetryOption(String methodName) {
private void collect(String serviceName, String methodName, Message request, Message response, long start,
boolean error) {
try {
String provider = getProviderServer().getHostName();
if (request == null || response == null) {
return;
}
long elapsed = System.currentTimeMillis() - start; // 计算调用耗时
int concurrent = getConcurrent(serviceName, methodName).get(); // 当前并发数
String service = serviceName; // 获取服务名称
String method = methodName; // 获取方法名
String provider = getProviderServer().getHostName();
String host = refUrl.getHost();
Integer port = refUrl.getPort();
if (clientServerMonitor == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public String getMessage() {

return "error_message: " + message + ", status: " + motanErrorMsg.getStatus() + ", error_code: "
+ motanErrorMsg.getErrorCode() + ",remoteaddress="
+ RpcContext.getContext().getAttachment(Constants.PROVIDER_ADDRESS);
+ RpcContext.getContext().getAttachment(Constants.REMOTE_ADDRESS);
}

public int getStatus() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
@Override
public void request(int numMessages) {
InetSocketAddress remoteAddress = (InetSocketAddress) call.getAttributes().get(Grpc.TRANSPORT_ATTR_REMOTE_ADDR);
RpcContext.getContext().setAttachment(Constants.CONSUMER_ADDRESS, remoteAddress.getHostString());
RpcContext.getContext().setAttachment(Constants.REMOTE_ADDRESS, remoteAddress.getHostString());
copyMetadataToThreadLocal(headers);
super.request(numMessages);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,7 @@ public void invoke(Message request, StreamObserver<Message> responseObserver) {
responseObserver.onError(statusException);
} finally {
log.info(String.format("Service: %s Method: %s RemoteAddress: %s", providerUrl.getServiceInterface(),
method.getName(),
RpcContext.getContext().getAttachment(Constants.CONSUMER_ADDRESS)));
method.getName(), RpcContext.getContext().getAttachment(Constants.REMOTE_ADDRESS)));
getConcurrent().decrementAndGet();
}
}
Expand All @@ -112,7 +111,7 @@ private void collect(Message request, Message response, long start, boolean erro
int concurrent = getConcurrent().get(); // 当前并发数
String service = providerUrl.getServiceInterface(); // 获取服务名称
String method = this.method.getName(); // 获取方法名
String consumer = RpcContext.getContext().getAttachment(Constants.CONSUMER_ADDRESS);// 远程服务器地址
String consumer = RpcContext.getContext().getAttachment(Constants.REMOTE_ADDRESS);// 远程服务器地址
String host = providerUrl.getHost();
int rpcPort = providerUrl.getPort();
int registryRpcPort = providerUrl.getParameter(Constants.REGISTRY_RPC_PORT_KEY, rpcPort);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ public class HelloServiceImpl implements HelloService {
public HelloReply sayHello(HelloRequest request) {
HelloReply reply = new HelloReply();
int registryPort = 0;
//Preconditions.checkState(registryPort != 0, "RegistryPort can not be null", registryPort);
Preconditions.checkState(registryPort != 0, "RegistryPort can not be null", registryPort);
reply.setMessage(request.getName());
return reply;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,6 @@ spring.application.name=example-server
server.port=8080
saluki.grpc.group=example
saluki.grpc.version=1.0.0
saluki.grpc.host=10.9.27.125
saluki.grpc.host=192.168.1.6
saluki.grpc.realityRpcPort=12201
saluki.grpc.registryAddress=localhost:8500

0 comments on commit e8458af

Please sign in to comment.