Skip to content

Commit

Permalink
Merge pull request apache#1471 from metamx/direct-client-metrics
Browse files Browse the repository at this point in the history
add query/node/time metrics to DirectDruidClient
  • Loading branch information
drcrallen committed Jul 1, 2015
2 parents b4eec5b + 2da12de commit f2919b9
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 16 deletions.
2 changes: 2 additions & 0 deletions docs/content/operations/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Available Metrics
|Metric|Description|Dimensions|Normal Value|
|------|-----------|----------|------------|
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|`query/node/time`|Milliseconds taken to query individual historical/realtime nodes.|id, status, server.|< 1s|
|`query/node/ttfb`|Time to first byte. Milliseconds elapsed until broker starts receiving the response from individual historical/realtime nodes.|id, status, server.|< 1s|
|`query/intervalChunk/time`|Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk.|id, status, chunkInterval (if interval chunking is enabled).|< 1s|

### Historical
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package io.druid.query;

import com.google.common.base.Function;
import com.google.common.base.Strings;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.metamx.common.guava.Accumulator;
Expand Down Expand Up @@ -103,11 +104,7 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseC
builder.setDimension(userDimension.getKey(), userDimension.getValue());
}

String queryId = query.getId();
if (queryId == null) {
queryId = "";
}
builder.setDimension(DruidMetrics.ID, queryId);
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));

return new Sequence<T>()
{
Expand Down
8 changes: 6 additions & 2 deletions server/src/main/java/io/druid/client/BrokerServerView.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.google.common.collect.Ordering;
import com.google.inject.Inject;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import io.druid.client.selector.QueryableDruidServer;
import io.druid.client.selector.ServerSelector;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class BrokerServerView implements TimelineServerView
private final HttpClient httpClient;
private final ServerInventoryView baseView;
private final TierSelectorStrategy tierSelectorStrategy;
private final ServiceEmitter emitter;

private volatile boolean initialized = false;

Expand All @@ -77,7 +79,8 @@ public BrokerServerView(
@Smile ObjectMapper smileMapper,
@Client HttpClient httpClient,
ServerInventoryView baseView,
TierSelectorStrategy tierSelectorStrategy
TierSelectorStrategy tierSelectorStrategy,
ServiceEmitter emitter
)
{
this.warehouse = warehouse;
Expand All @@ -86,6 +89,7 @@ public BrokerServerView(
this.httpClient = httpClient;
this.baseView = baseView;
this.tierSelectorStrategy = tierSelectorStrategy;
this.emitter = emitter;

this.clients = Maps.newConcurrentMap();
this.selectors = Maps.newHashMap();
Expand Down Expand Up @@ -173,7 +177,7 @@ private QueryableDruidServer addServer(DruidServer server)

private DirectDruidClient makeDirectClient(DruidServer server)
{
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost());
return new DirectDruidClient(warehouse, queryWatcher, smileMapper, httpClient, server.getHost(), emitter);
}

private QueryableDruidServer removeServer(DruidServer server)
Expand Down
27 changes: 22 additions & 5 deletions server/src/main/java/io/druid/client/DirectDruidClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.fasterxml.jackson.dataformat.smile.SmileFactory;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Charsets;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.ByteSource;
Expand All @@ -41,13 +42,16 @@
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.http.client.HttpClient;
import com.metamx.http.client.Request;
import com.metamx.http.client.response.ClientResponse;
import com.metamx.http.client.response.HttpResponseHandler;
import com.metamx.http.client.response.StatusResponseHandler;
import com.metamx.http.client.response.StatusResponseHolder;
import io.druid.query.BySegmentResultValueClass;
import io.druid.query.DruidMetrics;
import io.druid.query.Query;
import io.druid.query.QueryInterruptedException;
import io.druid.query.QueryRunner;
Expand Down Expand Up @@ -94,6 +98,7 @@ public class DirectDruidClient<T> implements QueryRunner<T>
private final ObjectMapper objectMapper;
private final HttpClient httpClient;
private final String host;
private final ServiceEmitter emitter;

private final AtomicInteger openConnections;
private final boolean isSmile;
Expand All @@ -103,14 +108,16 @@ public DirectDruidClient(
QueryWatcher queryWatcher,
ObjectMapper objectMapper,
HttpClient httpClient,
String host
String host,
ServiceEmitter emitter
)
{
this.warehouse = warehouse;
this.queryWatcher = queryWatcher;
this.objectMapper = objectMapper;
this.httpClient = httpClient;
this.host = host;
this.emitter = emitter;

this.isSmile = this.objectMapper.getFactory() instanceof SmileFactory;
this.openConnections = new AtomicInteger();
Expand Down Expand Up @@ -152,9 +159,16 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
try {
log.debug("Querying url[%s]", url);

final long requestStartTime = System.currentTimeMillis();

final ServiceMetricEvent.Builder builder = toolChest.makeMetricBuilder(query);
builder.setDimension("server", host);
builder.setDimension(DruidMetrics.ID, Strings.nullToEmpty(query.getId()));


final HttpResponseHandler<InputStream, InputStream> responseHandler = new HttpResponseHandler<InputStream, InputStream>()
{
private long startTime;
private long responseStartTime;
private final AtomicLong byteCount = new AtomicLong(0);
private final BlockingQueue<InputStream> queue = new LinkedBlockingQueue<>();
private final AtomicBoolean done = new AtomicBoolean(false);
Expand All @@ -163,7 +177,9 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> context)
public ClientResponse<InputStream> handleResponse(HttpResponse response)
{
log.debug("Initial response from url[%s]", url);
startTime = System.currentTimeMillis();
responseStartTime = System.currentTimeMillis();
emitter.emit(builder.build("query/node/ttfb", responseStartTime - requestStartTime));

try {
final String responseContext = response.headers().get("X-Druid-Response-Context");
// context may be null in case of error or query timeout
Expand Down Expand Up @@ -256,9 +272,10 @@ public ClientResponse<InputStream> done(ClientResponse<InputStream> clientRespon
"Completed request to url[%s] with %,d bytes returned in %,d millis [%,f b/s].",
url,
byteCount.get(),
stopTime - startTime,
byteCount.get() / (0.0001 * (stopTime - startTime))
stopTime - responseStartTime,
byteCount.get() / (0.0001 * (stopTime - responseStartTime))
);
emitter.emit(builder.build("query/node/time", stopTime - requestStartTime));
synchronized (done) {
try {
// An empty byte array is put at the end to give the SequenceInputStream.close() as something to close out
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.metamx.common.Pair;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.http.client.HttpClient;
import io.druid.client.selector.HighestPriorityTierSelectorStrategy;
import io.druid.client.selector.RandomServerSelectorStrategy;
Expand All @@ -41,6 +42,7 @@
import io.druid.query.TableDataSource;
import io.druid.server.coordination.DruidServerMetadata;
import io.druid.server.initialization.ZkPathsConfig;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
Expand Down Expand Up @@ -364,7 +366,8 @@ public CallbackAction segmentViewInitialized()
getSmileMapper(),
EasyMock.createMock(HttpClient.class),
baseView,
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy())
new HighestPriorityTierSelectorStrategy(new RandomServerSelectorStrategy()),
new NoopServiceEmitter()
);
}

Expand Down
10 changes: 7 additions & 3 deletions server/src/test/java/io/druid/client/DirectDruidClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.druid.query.ReflectionQueryToolChestWarehouse;
import io.druid.query.Result;
import io.druid.query.timeboundary.TimeBoundaryQuery;
import io.druid.server.metrics.NoopServiceEmitter;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.easymock.Capture;
Expand Down Expand Up @@ -117,14 +118,16 @@ public void testRun() throws Exception
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo"
"foo",
new NoopServiceEmitter()
);
DirectDruidClient client2 = new DirectDruidClient(
new ReflectionQueryToolChestWarehouse(),
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo2"
"foo2",
new NoopServiceEmitter()
);

QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
Expand Down Expand Up @@ -225,7 +228,8 @@ public void testCancel() throws Exception
QueryRunnerTestHelper.NOOP_QUERYWATCHER,
new DefaultObjectMapper(),
httpClient,
"foo"
"foo",
new NoopServiceEmitter()
);

QueryableDruidServer queryableDruidServer1 = new QueryableDruidServer(
Expand Down

0 comments on commit f2919b9

Please sign in to comment.