Skip to content

Commit

Permalink
fix#70
Browse files Browse the repository at this point in the history
  • Loading branch information
cstongwei authored and zilongTong committed Jun 16, 2022
1 parent 4baf31f commit 6bfc0d2
Show file tree
Hide file tree
Showing 23 changed files with 5,361 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
package com.tencent.tsf.femas.adaptor.paas.config;

import com.tencent.tsf.femas.common.context.Context;
import com.tencent.tsf.femas.common.context.factory.ContextFactory;
import com.tencent.tsf.femas.common.entity.Service;
import com.tencent.tsf.femas.common.util.StringUtils;
import com.tencent.tsf.femas.config.AbstractConfigHttpClientManager;
import com.tencent.tsf.femas.config.grpc.paas.*;
import io.grpc.ManagedChannel;
import io.grpc.netty.NettyChannelBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;

public class FemasConfigGrpcClientManager extends AbstractConfigHttpClientManager {

private static final Logger log = LoggerFactory.getLogger(FemasConfigGrpcClientManager.class);
private volatile Context context = ContextFactory.getContextInstance();

private ManagedChannel channel;

public FemasConfigGrpcClientManager() {
String host = GrpcHepler.getPaasServerHost();
int port = GrpcHepler.getPaasGrpcPort();
channel = NettyChannelBuilder
.forAddress(host,port)
.usePlaintext()
.build();
}
@Override
public String getType() {
return PollingType.grpc.name();
}
@Override
public void reportEvent(Service service, String eventId, String data) {
if (context.isEmptyPaasServer()) {
log.debug("reportEvent failed, could not find the paas address profile");
return;
}
ReportEventRequest eventRequest = ReportEventRequest.newBuilder()
.setNamespaceId(service.getNamespace())
.setEventId(eventId)
.setServiceName(service.getName())
.setData(data)
.build();
try {
PollingResult result = createStub().reportServiceEvent(eventRequest);
boolean success = GrpcHepler.success(result);
if(!success){
log.error("init namespace failed,message={}",result.getMessage());
}
}catch (Exception e) {
log.error("init namespace failed", e);
}

}

@Override
public void reportApis(String namespaceId, String serviceName, String applicationVersion, String data) {
if (context.isEmptyPaasServer()) {
log.debug("reportApis failed ,could not find the paas address profile");
return;
}
ServiceApiRequest serviceApiRequest =ServiceApiRequest.newBuilder()
.setNamespaceId(namespaceId)
.setServiceName(serviceName)
.setApplicationVersion(applicationVersion)
.setData(data).build();
try {
PollingResult pollingResult = createStub().reportServiceApi(serviceApiRequest);
boolean success = GrpcHepler.success(pollingResult);
if(!success){
log.error("reportApis failed,message={}",pollingResult.getMessage());
}
} catch (Exception e) {
// 无配置 paas server 时,报错不打印
log.warn("config http manager reportApis failed, msg:{}", e.getMessage());
}
}

@Override
public String fetchKVValue(String key, String namespaceId) {
if (context.isEmptyPaasServer()) {
log.debug("fetchKVValue failed , could not find the paas address profile");
return null;
}
try {
SimpleParam simpleParam = SimpleParam.newBuilder().setParam(key).build();
PollingResult pollingResult = createStub().fetchBreakerRule(simpleParam);
boolean success = GrpcHepler.success(pollingResult);
if(!success){
log.error("config http manager fetchKVValue failed,message={}",pollingResult.getMessage());
}else {
return pollingResult.getResult();
}
} catch (Exception e) {
log.error("config http manager fetchKVValue failed", e);
}
return null;
}



@Override
public void initNamespace(String registryAddress, String namespaceId) {
if (StringUtils.isEmpty(namespaceId)) {
log.error("namespace is empty");
}
if (context.isEmptyPaasServer()) {
log.debug("initNamespace failed , could not find the paas address profile");
return;
}
InitNamespaceRequest initNamespaceRequest = InitNamespaceRequest.newBuilder()
.setNamespaceId(namespaceId)
.setRegistryAddress(registryAddress)
.build();
try {
PollingResult pollingResult = createStub().initNamespace(initNamespaceRequest);
boolean success = GrpcHepler.success(pollingResult);
if(!success){
log.error("init namespace failed,message={}",pollingResult.getMessage());
}
} catch (Exception e) {
log.error("init namespace failed, msg:{}", e.getMessage());
}
}

private PaasPollingGrpc.PaasPollingBlockingStub createStub() {
return PaasPollingGrpc.newBlockingStub(channel).withDeadlineAfter(10,TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
public class FemasConfigHttpClientManager extends AbstractConfigHttpClientManager {

private static final Logger log = LoggerFactory.getLogger(FemasConfigHttpClientManager.class);

private static final String webContext = "/atom";
private static final String fetchKeyUrl = "/v1/sdk/fetchData";
private static final String reportCircuitEvent = "/v1/sdk/reportServiceEvent";
Expand Down Expand Up @@ -77,6 +78,12 @@ public FemasConfigHttpClientManager() {
this.httpClient = ApacheHttpClientHolder.getHttpClient(httpClientFactory);
}

@Override
public String getType() {
return PollingType.http.name();
}

@Override
public void reportEvent(Service service, String eventId, String data) {
if (context.isEmptyPaasServer()) {
log.debug("reportEvent failed, could not find the paas address profile");
Expand All @@ -94,6 +101,7 @@ public void reportEvent(Service service, String eventId, String data) {
}
}

@Override
public void reportApis(String namespaceId, String serviceName, String applicationVersion, String data) {
if (context.isEmptyPaasServer()) {
log.debug("reportApis failed ,could not find the paas address profile");
Expand All @@ -119,6 +127,7 @@ public void reportApis(String namespaceId, String serviceName, String applicatio
}
}

@Override
public String fetchKVValue(String key, String namespaceId) {

final Map<String, Object> params = new HashMap<>(3);
Expand Down Expand Up @@ -173,6 +182,7 @@ public Map<String, String> builderHeader() {
return header;
}

@Override
public void initNamespace(String registryAddress, String namespaceId) {
if (StringUtils.isEmpty(namespaceId)) {
log.error("namespace is empty");
Expand Down
18 changes: 17 additions & 1 deletion femas-admin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,23 @@
<artifactId>mysql-connector-java</artifactId>
<version>8.0.20</version>
</dependency>

<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-api</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
</dependencies>


Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.tencent.tsf.femas.endpoint.configlisten;

import com.tencent.tsf.femas.config.grpc.paas.*;
import com.tencent.tsf.femas.context.ApplicationContextHelper;
import com.tencent.tsf.femas.service.namespace.NamespaceMangerService;
import com.tencent.tsf.femas.service.registry.ServiceManagerService;
import com.tencent.tsf.femas.service.rule.ConvertService;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;


/**
* @ClassName Grpc
* @Description TODO
* @Author cstongwei
* @Date 2022/5/30 10:28
* @Version 1.0.0
**/
@Slf4j
public class GrpcLongPollingEndpoint extends PaasPollingGrpc.PaasPollingImplBase{

ConvertService convertService;

ServiceManagerService serviceManagerService;

NamespaceMangerService namespaceMangerService;

public GrpcLongPollingEndpoint() {
convertService = ApplicationContextHelper.getBean(ConvertService.class);
serviceManagerService = ApplicationContextHelper.getBean(ServiceManagerService.class);
namespaceMangerService = ApplicationContextHelper.getBean(NamespaceMangerService.class);
}

@Override
public void fetchBreakerRule(SimpleParam request, StreamObserver<PollingResult> responseObserver) {
try {
String fetchBreakerRule = convertService.convert(request.getParam());
if(Context.current().isCancelled()){
log.debug("request is cancelled!");
responseObserver.onError(Status.CANCELLED.withDescription("request is cancelled!").asRuntimeException());
return;
}
PollingResult result;
if(fetchBreakerRule!=null){
result = GrpcHepler.ok(fetchBreakerRule);
}else{
result = GrpcHepler.ok();
}
responseObserver.onNext(result);
responseObserver.onCompleted();
}catch (Throwable e){
responseObserver.onNext(GrpcHepler.fail(e.toString()));
responseObserver.onCompleted();
log.error("error",e);
}

}

@Override
public void reportServiceApi(ServiceApiRequest request, StreamObserver<PollingResult> responseObserver) {
if (log.isDebugEnabled()) {
log.debug("ServiceApiRequest: {}", request);
}
try {
serviceManagerService.reportServiceApi(request.getNamespaceId(),
request.getServiceName(), request.getApplicationVersion(),
request.getData());
if(Context.current().isCancelled()){
log.debug("request is cancelled!");
responseObserver.onError(Status.CANCELLED.withDescription("request is cancelled!").asRuntimeException());
return;
}
responseObserver.onNext(GrpcHepler.ok());
responseObserver.onCompleted();
}catch (Throwable e){
responseObserver.onNext(GrpcHepler.fail(e.getCause().toString()));
responseObserver.onCompleted();
log.error("error",e);
}

}

@Override
public void reportServiceEvent(ReportEventRequest request, StreamObserver<PollingResult> responseObserver) {
try {
serviceManagerService.reportServiceEvent(request.getNamespaceId(), request.getServiceName(), request.getEventId(), request.getData());
if(Context.current().isCancelled()){
log.debug("request is cancelled!");
responseObserver.onError(Status.CANCELLED.withDescription("request is cancelled!").asRuntimeException());
return;
}
responseObserver.onNext(GrpcHepler.ok());
responseObserver.onCompleted();
}catch (Throwable e){
responseObserver.onNext(GrpcHepler.fail(e.getCause().toString()));
responseObserver.onCompleted();
log.error("error",e);
}
}

@Override
public void initNamespace(InitNamespaceRequest request, StreamObserver<PollingResult> responseObserver) {
try {
namespaceMangerService.initNamespace(request.getRegistryAddress(), request.getNamespaceId());
if(Context.current().isCancelled()){
log.debug("request is cancelled!");
responseObserver.onError(Status.CANCELLED.withDescription("request is cancelled!").asRuntimeException());
return;
}
responseObserver.onNext(GrpcHepler.ok());
responseObserver.onCompleted();
}catch (Throwable e){
responseObserver.onNext(GrpcHepler.fail(e.getCause().toString()));
responseObserver.onCompleted();
log.error("error",e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.tencent.tsf.femas.endpoint.adaptor.AbstractBaseEndpoint;
import com.tencent.tsf.femas.entity.registry.ServiceApiRequest;
import com.tencent.tsf.femas.enums.ServiceInvokeEnum;
import com.tencent.tsf.femas.service.namespace.NamespaceMangerService;
import com.tencent.tsf.femas.service.registry.ServiceManagerService;
import com.tencent.tsf.femas.service.rule.ConvertService;
import com.tencent.tsf.femas.storage.StorageResult;
Expand Down Expand Up @@ -38,11 +39,14 @@ public class LongPollingEndpoint extends AbstractBaseEndpoint {

private final ServiceManagerService serviceManagerService;

private final NamespaceMangerService namespaceMangerService;

public LongPollingEndpoint(ConvertService convertService,
ServiceManagerService serviceManagerService) {
ServiceManagerService serviceManagerService,NamespaceMangerService namespaceMangerService) {
// this.kvStoreManager = kvStoreManager;
this.convertService = convertService;
this.serviceManagerService = serviceManagerService;
this.namespaceMangerService = namespaceMangerService;
}

@GetMapping("fetchData")
Expand Down Expand Up @@ -87,6 +91,7 @@ public void reportServiceEvent(String namespaceId, String serviceName, String ev
*/
@PostMapping("initNamespace")
public void initNamespace(String registryAddress, String namespaceId) {
executor.invoke(ServiceInvokeEnum.ApiInvokeEnum.NAMESPACE_MANGER_INIT, registryAddress, namespaceId);
namespaceMangerService.initNamespace(registryAddress,namespaceId);
// executor.invoke(ServiceInvokeEnum.ApiInvokeEnum.NAMESPACE_MANGER_INIT, registryAddress, namespaceId);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ public class FemasConstant extends ContextConstant {

public static final String FEMAS_REGISTRY_TYPE_KEY = "femas_registry_type";

public static final String FEMAS_PAAS_POLLING_TYPE = "femas_paas_polling_type";

/**
* 请求发起方的应用 ID
*/
Expand Down
21 changes: 21 additions & 0 deletions femas-config/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,27 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Loading

0 comments on commit 6bfc0d2

Please sign in to comment.