Skip to content

Commit

Permalink
[pinpoint-apm#4127] abstract ServerRequest from logic using ThriftReq…
Browse files Browse the repository at this point in the history
…uest
  • Loading branch information
minwoo-jung authored and emeroad committed May 18, 2018
1 parent 23cca4e commit 1a0bd32
Show file tree
Hide file tree
Showing 33 changed files with 190 additions and 75 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.navercorp.pinpoint.collector.service.AgentEventService;
import com.navercorp.pinpoint.common.server.bo.event.AgentEventBo;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.TAgentStat;
import com.navercorp.pinpoint.thrift.dto.TAgentStatBatch;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
Expand Down Expand Up @@ -50,8 +52,12 @@ public class AgentEventHandler implements SimpleHandler {
private AgentEventService agentEventService;

@Override
public void handleSimple(ThriftRequest thriftRequest) {
handleSimple(thriftRequest.getTbase());
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.navercorp.pinpoint.collector.handler;

import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
Expand Down Expand Up @@ -44,8 +46,12 @@ public class AgentInfoHandler implements SimpleHandler, RequestResponseHandler {
private ApplicationIndexDao applicationIndexDao;

@Override
public void handleSimple(ThriftRequest thriftRequest) {
handleSimple(thriftRequest.getTbase());
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}

public void handleSimple(TBase<?, ?> tbase) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import com.navercorp.pinpoint.collector.mapper.thrift.stat.AgentStatMapper;
import com.navercorp.pinpoint.collector.service.AgentStatService;
import com.navercorp.pinpoint.common.server.bo.stat.AgentStatBo;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.TAgentStat;
import com.navercorp.pinpoint.thrift.dto.TAgentStatBatch;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
Expand Down Expand Up @@ -51,8 +53,12 @@ public class AgentStatHandlerV2 implements SimpleHandler {
private List<AgentStatService> agentStatServiceList = Collections.emptyList();

@Override
public void handleSimple(ThriftRequest thriftRequest) {
handleSimple(thriftRequest.getTbase());
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.navercorp.pinpoint.collector.handler;

import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;

Expand All @@ -27,6 +28,6 @@ public interface SimpleHandler {

void handleSimple(TBase<?, ?> tBase);

void handleSimple(ThriftRequest thriftRequest);
void handleSimple(ServerRequest thriftRequest);

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.navercorp.pinpoint.common.service.ServiceTypeRegistryService;
import com.navercorp.pinpoint.common.trace.ServiceType;

import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
Expand Down Expand Up @@ -56,8 +58,12 @@ public class SpanChunkHandler implements SimpleHandler {
private SpanFactory spanFactory;

@Override
public void handleSimple(ThriftRequest thriftRequest) {
handleSimple(thriftRequest.getTbase());
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import com.navercorp.pinpoint.common.service.ServiceTypeRegistryService;
import com.navercorp.pinpoint.common.trace.ServiceType;

import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.commons.collections.CollectionUtils;
import org.apache.thrift.TBase;
Expand Down Expand Up @@ -66,8 +68,12 @@ public class SpanHandler implements SimpleHandler {
private SpanFactory spanFactory;

@Override
public void handleSimple(ThriftRequest thriftRequest) {
handleSimple(thriftRequest.getTbase());
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import com.navercorp.pinpoint.collector.handler.SimpleHandler;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.common.util.CollectionUtils;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import com.navercorp.pinpoint.io.request.ServerRequest;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -64,22 +64,22 @@ public void dispatchSendMessage(TBase<?, ?> tBase) {
}

@Override
public void dispatchSendMessage(ThriftRequest thriftRequest) {
public void dispatchSendMessage(ServerRequest serverRequest) {

// mark accepted time
acceptedTimeService.accept();

// TODO consider to change dispatch table automatically
List<SimpleHandler> simpleHandlerList = getSimpleHandler(thriftRequest);
List<SimpleHandler> simpleHandlerList = getSimpleHandler(serverRequest);
if (CollectionUtils.isEmpty(simpleHandlerList)) {
throw new UnsupportedOperationException("Handler not found. Unknown type of data received. thrfitRequest=" + thriftRequest);
throw new UnsupportedOperationException("Handler not found. Unknown type of data received. serverRequest=" + serverRequest);
}

for (SimpleHandler simpleHandler : simpleHandlerList) {
if (logger.isTraceEnabled()) {
logger.trace("simpleHandler name:{}", simpleHandler.getClass().getName());
}
simpleHandler.handleSimple(thriftRequest);
simpleHandler.handleSimple(serverRequest);
}


Expand All @@ -104,7 +104,7 @@ protected List<SimpleHandler> getSimpleHandler(TBase<?, ?> tBase) {
return Collections.emptyList();
}

protected List<SimpleHandler> getSimpleHandler(ThriftRequest thriftRequest) {
protected List<SimpleHandler> getSimpleHandler(ServerRequest serverRequest) {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.navercorp.pinpoint.collector.receiver;

import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import com.navercorp.pinpoint.io.request.ServerRequest;
import org.apache.thrift.TBase;

/**
Expand All @@ -29,7 +29,7 @@ public interface DispatchHandler {

void dispatchSendMessage(TBase<?, ?> tBase);

void dispatchSendMessage(ThriftRequest thriftRequest);
void dispatchSendMessage(ServerRequest serverRequest);

TBase dispatchRequestMessage(TBase<?, ?> tBase);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.navercorp.pinpoint.collector.receiver;

import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import com.navercorp.pinpoint.io.request.ServerRequest;
import org.apache.thrift.TBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -53,13 +53,13 @@ public void dispatchSendMessage(TBase<?, ?> tBase) {
}

@Override
public void dispatchSendMessage(ThriftRequest thriftRequest) {
public void dispatchSendMessage(ServerRequest serverRequest) {
if (checkAvailable()) {
this.delegate.dispatchSendMessage(thriftRequest);
this.delegate.dispatchSendMessage(serverRequest);
return;
}

logger.debug("Handler is disabled. Skipping send message {}.", thriftRequest);
logger.debug("Handler is disabled. Skipping send message {}.", serverRequest);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.navercorp.pinpoint.collector.receiver;

import com.navercorp.pinpoint.collector.handler.SimpleHandler;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.TSpan;
import com.navercorp.pinpoint.thrift.dto.TSpanChunk;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
Expand Down Expand Up @@ -61,7 +63,11 @@ protected List<SimpleHandler> getSimpleHandler(TBase<?, ?> tBase) {
}

@Override
protected List<SimpleHandler> getSimpleHandler(ThriftRequest thriftRequest) {
return getSimpleHandler(thriftRequest.getTbase());
protected List<SimpleHandler> getSimpleHandler(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
return getSimpleHandler(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.navercorp.pinpoint.collector.handler.AgentEventHandler;
import com.navercorp.pinpoint.collector.handler.AgentStatHandlerV2;
import com.navercorp.pinpoint.collector.handler.SimpleHandler;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.TAgentStat;
import com.navercorp.pinpoint.thrift.dto.TAgentStatBatch;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
Expand Down Expand Up @@ -60,8 +62,12 @@ protected List<SimpleHandler> getSimpleHandler(TBase<?, ?> tBase) {
}

@Override
protected List<SimpleHandler> getSimpleHandler(ThriftRequest thriftRequest) {
return getSimpleHandler(thriftRequest.getTbase());
protected List<SimpleHandler> getSimpleHandler(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
return getSimpleHandler(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.navercorp.pinpoint.collector.handler.AgentInfoHandler;
import com.navercorp.pinpoint.collector.handler.RequestResponseHandler;
import com.navercorp.pinpoint.collector.handler.SimpleHandler;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.*;
import org.apache.thrift.TBase;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -85,7 +87,11 @@ protected List<SimpleHandler> getSimpleHandler(TBase<?, ?> tBase) {
}

@Override
protected List<SimpleHandler> getSimpleHandler(ThriftRequest thriftRequest) {
return getSimpleHandler(thriftRequest.getTbase());
protected List<SimpleHandler> getSimpleHandler(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
return getSimpleHandler(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,11 @@

import com.navercorp.pinpoint.collector.receiver.DispatchHandler;
import com.navercorp.pinpoint.collector.util.PacketUtils;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.rpc.PinpointSocket;
import com.navercorp.pinpoint.rpc.packet.BasicPacket;
import com.navercorp.pinpoint.rpc.packet.RequestPacket;
import com.navercorp.pinpoint.rpc.packet.SendPacket;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import com.navercorp.pinpoint.thrift.io.DeserializerFactory;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseDeserializer;
import com.navercorp.pinpoint.thrift.io.HeaderTBaseSerializer;
Expand Down Expand Up @@ -64,8 +64,8 @@ public void handleSend(SendPacket packet, PinpointSocket pinpointSocket) {
final byte[] payload = getPayload(packet);
SocketAddress remoteAddress = pinpointSocket.getRemoteAddress();
try {
ThriftRequest thriftRequest = SerializationUtils.deserializeThriftRequest(payload, deserializerFactory);
dispatchHandler.dispatchSendMessage(thriftRequest);
ServerRequest serverRequest = SerializationUtils.deserializeServerRequest(payload, deserializerFactory);
dispatchHandler.dispatchSendMessage(serverRequest);
} catch (TException e) {
handleTException(payload, remoteAddress, e);
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@
import com.google.common.util.concurrent.MoreExecutors;
import com.navercorp.pinpoint.collector.config.DataReceiverGroupConfiguration;
import com.navercorp.pinpoint.common.server.util.AddressFilter;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.profiler.sender.DataSender;
import com.navercorp.pinpoint.profiler.sender.TcpDataSender;
import com.navercorp.pinpoint.profiler.sender.UdpDataSender;
import com.navercorp.pinpoint.rpc.client.DefaultPinpointClientFactory;
import com.navercorp.pinpoint.rpc.client.PinpointClientFactory;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;
import org.junit.Assert;
import org.junit.Test;
Expand Down Expand Up @@ -275,8 +275,8 @@ public void dispatchSendMessage(TBase<?, ?> tBase) {
}

@Override
public void dispatchSendMessage(ThriftRequest thriftRequest) {
LOGGER.debug("===================================== send {}", thriftRequest);
public void dispatchSendMessage(ServerRequest serverRequest) {
LOGGER.debug("===================================== send {}", serverRequest);
sendLatch.countDown();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.navercorp.pinpoint.collector.handler.RequestResponseHandler;
import com.navercorp.pinpoint.collector.handler.SimpleHandler;
import com.navercorp.pinpoint.common.server.util.AcceptedTimeService;
import com.navercorp.pinpoint.io.request.ServerRequest;
import com.navercorp.pinpoint.io.request.UnSupportedServerRequestTypeException;
import com.navercorp.pinpoint.thrift.dto.TResult;
import com.navercorp.pinpoint.thrift.dto.ThriftRequest;
import org.apache.thrift.TBase;
Expand Down Expand Up @@ -105,8 +107,12 @@ protected RequestResponseHandler getRequestResponseHandler(TBase<?, ?> tBase) {
}

@Override
protected List<SimpleHandler> getSimpleHandler(ThriftRequest thriftRequest) {
return getSimpleHandler(thriftRequest.getTbase());
protected List<SimpleHandler> getSimpleHandler(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
return getSimpleHandler(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}

}
Expand All @@ -121,8 +127,12 @@ public void handleSimple(TBase<?, ?> tbase) {
}

@Override
public void handleSimple(ThriftRequest thriftRequest) {
handleSimple(thriftRequest.getTbase());
public void handleSimple(ServerRequest serverRequest) {
if (serverRequest instanceof ThriftRequest) {
handleSimple(((ThriftRequest)serverRequest).getData());
}

throw new UnSupportedServerRequestTypeException(serverRequest.getClass() + "is not support type : " + serverRequest);
}

public int getExecutedCount() {
Expand Down
Loading

0 comments on commit 1a0bd32

Please sign in to comment.