Skip to content

Commit

Permalink
[pinpoint-apm#4558] Add grpc stream executor server interceptor
Browse files Browse the repository at this point in the history
  • Loading branch information
jaehong-kim committed Oct 10, 2019
1 parent 2ab3b54 commit 9a4a66f
Show file tree
Hide file tree
Showing 19 changed files with 528 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public final class SpanReceiverConfiguration implements DataReceiverGroupConfigu
private static final String GRPC_WORKER_EXECUTOR_QUEUE_SIZE = GRPC_PREFIX + ".worker.executor.queue.size";
private static final String GRPC_WORKER_EXECUTOR_MONITOR_ENABLE = GRPC_PREFIX + ".worker.executor.monitor.enable";

private static final String GRPC_STREAM_SCHEDULER_THREAD_SIZE = GRPC_PREFIX + ".stream.scheduler.thread.size";
private static final String GRPC_CALL_INIT_REQUEST_COUNT = GRPC_PREFIX + ".stream.call.init.request.count";
private static final String GRPC_STREAM_SCHEDULER_PERIOD_MILLIS = GRPC_PREFIX + ".stream.scheduler.period.millis";

private static final String TCP_ENABLE = PREFIX + ".tcp";
private static final String TCP_BIND_IP = PREFIX + ".tcp.ip";
private static final String TCP_BIND_PORT = PREFIX + ".tcp.port";
Expand Down Expand Up @@ -73,6 +77,9 @@ public final class SpanReceiverConfiguration implements DataReceiverGroupConfigu
private final int grpcWorkerExecutorThreadSize;
private final int grpcWorkerExecutorQueueSize;
private final boolean grpcWorkerExecutorMonitorEnable;
private final int grpcStreamSchedulerThreadSize;
private final int grpcStreamCallInitRequestCount;
private final int grpcStreamSchedulerPeriodMillis;
private final ServerOption grpcServerOption;

public SpanReceiverConfiguration(Properties properties, DeprecatedConfiguration deprecatedConfiguration) {
Expand Down Expand Up @@ -111,6 +118,10 @@ public SpanReceiverConfiguration(Properties properties, DeprecatedConfiguration
this.grpcWorkerExecutorQueueSize = CollectorConfiguration.readInt(properties, GRPC_WORKER_EXECUTOR_QUEUE_SIZE, 1024 * 5);
Assert.isTrue(grpcWorkerExecutorQueueSize > 0, "grpcWorkerExecutorQueueSize must be greater than 0");
this.grpcWorkerExecutorMonitorEnable = CollectorConfiguration.readBoolean(properties, GRPC_WORKER_EXECUTOR_MONITOR_ENABLE);
this.grpcStreamSchedulerThreadSize = CollectorConfiguration.readInt(properties, GRPC_STREAM_SCHEDULER_THREAD_SIZE, 1);
Assert.isTrue(grpcStreamSchedulerThreadSize > 0, "grpcStreamSchedulerThreadSize must be greater than 0");
this.grpcStreamSchedulerPeriodMillis = CollectorConfiguration.readInt(properties, GRPC_STREAM_SCHEDULER_PERIOD_MILLIS, 1000);
this.grpcStreamCallInitRequestCount = CollectorConfiguration.readInt(properties, GRPC_CALL_INIT_REQUEST_COUNT, 64);

// Server option
final ServerOption.Builder serverOptionBuilder = GrpcPropertiesServerOptionBuilder.newBuilder(properties, GRPC_PREFIX);
Expand Down Expand Up @@ -304,6 +315,18 @@ public boolean isGrpcWorkerExecutorMonitorEnable() {
return grpcWorkerExecutorMonitorEnable;
}

public int getGrpcStreamSchedulerThreadSize() {
return grpcStreamSchedulerThreadSize;
}

public int getGrpcStreamCallInitRequestCount() {
return grpcStreamCallInitRequestCount;
}

public int getGrpcStreamSchedulerPeriodMillis() {
return grpcStreamSchedulerPeriodMillis;
}

public ServerOption getGrpcServerOption() {
return grpcServerOption;
}
Expand All @@ -330,6 +353,9 @@ public String toString() {
sb.append(", grpcWorkerExecutorThreadSize=").append(grpcWorkerExecutorThreadSize);
sb.append(", grpcWorkerExecutorQueueSize=").append(grpcWorkerExecutorQueueSize);
sb.append(", grpcWorkerExecutorMonitorEnable=").append(grpcWorkerExecutorMonitorEnable);
sb.append(", grpcStreamSchedulerThreadSize=").append(grpcStreamSchedulerThreadSize);
sb.append(", grpcStreamCallInitRequestCount=").append(grpcStreamCallInitRequestCount);
sb.append(", grpcStreamSchedulerPeriodMillis=").append(grpcStreamSchedulerPeriodMillis);
sb.append(", grpcServerOption=").append(grpcServerOption);
sb.append('}');
return sb.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ public final class StatReceiverConfiguration implements DataReceiverGroupConfigu
private static final String GRPC_WORKER_EXECUTOR_QUEUE_SIZE = GRPC_PREFIX + ".worker.executor.queue.size";
private static final String GRPC_WORKER_EXECUTOR_MONITOR_ENABLE = GRPC_PREFIX + ".worker.executor.monitor.enable";

private static final String GRPC_STREAM_SCHEDULER_THREAD_SIZE = GRPC_PREFIX + ".stream.scheduler.thread.size";
private static final String GRPC_CALL_INIT_REQUEST_COUNT = GRPC_PREFIX + ".stream.call.init.request.count";
private static final String GRPC_STREAM_SCHEDULER_PERIOD_MILLIS = GRPC_PREFIX + ".stream.scheduler.period.millis";

private static final String TCP_ENABLE = PREFIX + ".tcp";
private static final String TCP_BIND_IP = PREFIX + ".tcp.ip";
Expand Down Expand Up @@ -72,6 +75,9 @@ public final class StatReceiverConfiguration implements DataReceiverGroupConfigu
private final int grpcWorkerExecutorThreadSize;
private final int grpcWorkerExecutorQueueSize;
private final boolean grpcWorkerExecutorMonitorEnable;
private final int grpcStreamSchedulerThreadSize;
private final int grpcStreamCallInitRequestCount;
private final int grpcStreamSchedulerPeriodMillis;
private final ServerOption grpcServerOption;

public StatReceiverConfiguration(Properties properties, DeprecatedConfiguration deprecatedConfiguration) {
Expand Down Expand Up @@ -109,6 +115,10 @@ public StatReceiverConfiguration(Properties properties, DeprecatedConfiguration
this.grpcWorkerExecutorQueueSize = CollectorConfiguration.readInt(properties, GRPC_WORKER_EXECUTOR_QUEUE_SIZE, 1024 * 5);
Assert.isTrue(grpcWorkerExecutorQueueSize > 0, "grpcWorkerExecutorQueueSize must be greater than 0");
this.grpcWorkerExecutorMonitorEnable = CollectorConfiguration.readBoolean(properties, GRPC_WORKER_EXECUTOR_MONITOR_ENABLE);
this.grpcStreamSchedulerThreadSize = CollectorConfiguration.readInt(properties, GRPC_STREAM_SCHEDULER_THREAD_SIZE, 1);
Assert.isTrue(grpcStreamSchedulerThreadSize > 0, "grpcStreamSchedulerThreadSize must be greater than 0");
this.grpcStreamSchedulerPeriodMillis = CollectorConfiguration.readInt(properties, GRPC_STREAM_SCHEDULER_PERIOD_MILLIS, 1000);
this.grpcStreamCallInitRequestCount = CollectorConfiguration.readInt(properties, GRPC_CALL_INIT_REQUEST_COUNT, 64);

// Server option
final ServerOption.Builder serverOptionBuilder = GrpcPropertiesServerOptionBuilder.newBuilder(properties, GRPC_PREFIX);
Expand Down Expand Up @@ -302,6 +312,18 @@ public boolean isGrpcWorkerExecutorMonitorEnable() {
return grpcWorkerExecutorMonitorEnable;
}

public int getGrpcStreamSchedulerThreadSize() {
return grpcStreamSchedulerThreadSize;
}

public int getGrpcStreamCallInitRequestCount() {
return grpcStreamCallInitRequestCount;
}

public int getGrpcStreamSchedulerPeriodMillis() {
return grpcStreamSchedulerPeriodMillis;
}

public ServerOption getGrpcServerOption() {
return grpcServerOption;
}
Expand All @@ -328,8 +350,11 @@ public String toString() {
sb.append(", grpcWorkerExecutorThreadSize=").append(grpcWorkerExecutorThreadSize);
sb.append(", grpcWorkerExecutorQueueSize=").append(grpcWorkerExecutorQueueSize);
sb.append(", grpcWorkerExecutorMonitorEnable=").append(grpcWorkerExecutorMonitorEnable);
sb.append(", grpcStreamSchedulerThreadSize=").append(grpcStreamSchedulerThreadSize);
sb.append(", grpcStreamCallInitRequestCount=").append(grpcStreamCallInitRequestCount);
sb.append(", grpcStreamSchedulerPeriodMillis=").append(grpcStreamSchedulerPeriodMillis);
sb.append(", grpcServerOption=").append(grpcServerOption);
sb.append('}');
return sb.toString();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.common.util.Assert;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.grpc.server.StreamExecutorRejectedExecutionRequestScheduler;
import com.navercorp.pinpoint.grpc.server.StreamExecutorServerInterceptor;
import com.navercorp.pinpoint.grpc.server.MetadataServerTransportFilter;
import com.navercorp.pinpoint.grpc.server.ServerFactory;
import com.navercorp.pinpoint.grpc.server.ServerOption;
import com.navercorp.pinpoint.grpc.server.StreamExecutorService;
import com.navercorp.pinpoint.grpc.server.TransportMetadataFactory;
import com.navercorp.pinpoint.grpc.server.TransportMetadataServerInterceptor;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import io.grpc.ServerServiceDefinition;
import io.grpc.ServerTransportFilter;
import org.slf4j.Logger;
Expand All @@ -40,6 +44,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/**
* @author Taejin Koo
Expand Down Expand Up @@ -106,21 +111,43 @@ public void afterPropertiesSet() throws Exception {
}

// Add service
addService();

this.server = serverFactory.build();
if (logger.isInfoEnabled()) {
logger.info("Start {} server {}", this.beanName, this.server);
}
this.server.start();
}

private void addService() {
for (Object service : serviceList) {
if (service instanceof BindableService) {
this.serverFactory.addService((BindableService) service);
final ServerInterceptor interceptor = getStreamExecutorServerInterceptor(service);
if (interceptor != null) {
this.serverFactory.addService(ServerInterceptors.intercept((BindableService) service, interceptor));
} else {
this.serverFactory.addService((BindableService) service);
}
} else if (service instanceof ServerServiceDefinition) {
this.serverFactory.addService((ServerServiceDefinition) service);
final ServerInterceptor interceptor = getStreamExecutorServerInterceptor(service);
if (interceptor != null) {
this.serverFactory.addService(ServerInterceptors.intercept((ServerServiceDefinition) service, interceptor));
} else {
this.serverFactory.addService((ServerServiceDefinition) service);
}
} else {
throw new IllegalStateException("unsupported service type " + service);
}
}
}

this.server = serverFactory.build();
if (logger.isInfoEnabled()) {
logger.info("Start {} server {}", this.beanName, this.server);
private ServerInterceptor getStreamExecutorServerInterceptor(Object service) {
if (service instanceof StreamExecutorService) {
final ServerInterceptor interceptor = ((StreamExecutorService) service).getStreamExecutorServerInterceptor();
return interceptor;
}
this.server.start();
return null;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import com.google.protobuf.Empty;
import com.google.protobuf.GeneratedMessageV3;
import com.navercorp.pinpoint.collector.receiver.DispatchHandler;
import com.navercorp.pinpoint.common.annotations.VisibleForTesting;
import com.navercorp.pinpoint.grpc.MessageFormatUtils;
import com.navercorp.pinpoint.grpc.StatusError;
import com.navercorp.pinpoint.grpc.StatusErrors;
import com.navercorp.pinpoint.grpc.server.StreamExecutorServerInterceptor;
import com.navercorp.pinpoint.grpc.server.StreamExecutorService;
import com.navercorp.pinpoint.grpc.trace.PSpan;
import com.navercorp.pinpoint.grpc.trace.PSpanChunk;
import com.navercorp.pinpoint.grpc.trace.PSpanMessage;
Expand All @@ -34,7 +35,6 @@
import com.navercorp.pinpoint.io.request.Message;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.thrift.io.DefaultTBaseLocator;
import io.grpc.Context;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.StatusRuntimeException;
Expand All @@ -45,22 +45,27 @@
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;

/**
* @author jaehong.kim
*/
public class SpanService extends SpanGrpc.SpanImplBase {
public class SpanService extends SpanGrpc.SpanImplBase implements StreamExecutorService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final boolean isDebug = logger.isDebugEnabled();
private final DispatchHandler dispatchHandler;
private final ServerRequestFactory serverRequestFactory = new ServerRequestFactory();
private final Executor executor;
private final int initRequestCount;
private final ScheduledExecutorService scheduledExecutorService;
private final int periodMillis;

public SpanService(DispatchHandler dispatchHandler, Executor executor) {
public SpanService(DispatchHandler dispatchHandler, Executor executor, final int initRequestCount, final ScheduledExecutorService scheduledExecutorService, final int periodMillis) {
this.dispatchHandler = Objects.requireNonNull(dispatchHandler, "dispatchHandler must not be null");
Objects.requireNonNull(executor, "executor must not be null");
this.executor = Context.currentContextExecutor(executor);
this.executor = Objects.requireNonNull(executor, "executor must not be null");
this.initRequestCount = initRequestCount;
this.scheduledExecutorService = Objects.requireNonNull(scheduledExecutorService, "scheduledExecutorService must not be null");
this.periodMillis = periodMillis;
}

@Override
Expand All @@ -74,10 +79,10 @@ public void onNext(PSpanMessage spanMessage) {

if (spanMessage.hasSpan()) {
final Message<PSpan> message = newMessage(spanMessage.getSpan(), DefaultTBaseLocator.SPAN);
doExecute(message, responseObserver);
send(message, responseObserver);
} else if (spanMessage.hasSpanChunk()) {
final Message<PSpanChunk> message = newMessage(spanMessage.getSpanChunk(), DefaultTBaseLocator.SPANCHUNK);
doExecute(message, responseObserver);
send(message, responseObserver);
} else {
if (isDebug) {
logger.debug("Found empty span message {}", MessageFormatUtils.debugLog(spanMessage));
Expand Down Expand Up @@ -111,21 +116,6 @@ private <T> Message<T> newMessage(T requestData, short serviceType) {
return new DefaultMessage<>(header, headerEntity, requestData);
}

@VisibleForTesting
void doExecute(final Message<? extends GeneratedMessageV3> message, StreamObserver<Empty> responseObserver) {
try {
executor.execute(new Runnable() {
@Override
public void run() {
send(message, responseObserver);
}
});
} catch (RejectedExecutionException ree) {
// Defense code
logger.warn("Failed to request. Rejected execution, executor={}", executor);
}
}

private void send(final Message<? extends GeneratedMessageV3> message, StreamObserver<Empty> responseObserver) {
try {
ServerRequest<? extends GeneratedMessageV3> request = serverRequestFactory.newServerRequest(message);
Expand All @@ -140,4 +130,10 @@ private void send(final Message<? extends GeneratedMessageV3> message, StreamObs
}
}
}

@Override
public StreamExecutorServerInterceptor getStreamExecutorServerInterceptor() {
final StreamExecutorServerInterceptor interceptor = new StreamExecutorServerInterceptor(this.executor, initRequestCount, this.scheduledExecutorService, this.periodMillis);
return interceptor;
}
}
Loading

0 comments on commit 9a4a66f

Please sign in to comment.