Add metrics for Query Count statistics (apache#3470)
* Add metrics for Query Count statistics

This PR adds a new metrics monitor, “QueryCountStatsMonitor”, which emits
three new metrics:
1) query/success/count - number of successful queries
2) query/failed/count - number of failed queries
3) query/interrupted/count - number of interrupted or timed-out queries

fix bindings

* make fields final

* fix imports

* AsyncQueryForwardingServlet implements QueryCountStatsProvider

* remove unused import
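
The three counters described above can be illustrated with a minimal, self-contained sketch. It assumes a query is modelled as a `Callable`; the class `QueryOutcomeCounters` and the exception `InterruptedQuery` are hypothetical names made up for this illustration and are not part of Druid. In the actual change, `QueryResource` increments its counters at the end of a successful write, in the `QueryInterruptedException` handler, and in the general exception handler.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: none of these names are Druid classes.
class QueryOutcomeCounters
{
  // Stand-in for an interruption or timeout signal (assumption for this sketch).
  static class InterruptedQuery extends RuntimeException {}

  private final AtomicLong success = new AtomicLong();
  private final AtomicLong failed = new AtomicLong();
  private final AtomicLong interrupted = new AtomicLong();

  // Runs the query and bumps exactly one counter depending on how it ended.
  <T> T run(Callable<T> query)
  {
    try {
      T result = query.call();
      success.incrementAndGet();
      return result;
    }
    catch (InterruptedQuery e) {
      interrupted.incrementAndGet();
      return null;
    }
    catch (Exception e) {
      failed.incrementAndGet();
      return null;
    }
  }

  long getSuccess() { return success.get(); }
  long getFailed() { return failed.get(); }
  long getInterrupted() { return interrupted.get(); }

  public static void main(String[] args)
  {
    QueryOutcomeCounters c = new QueryOutcomeCounters();
    c.run(() -> "ok");
    c.run(() -> { throw new InterruptedQuery(); });
    c.run(() -> { throw new IllegalStateException("boom"); });
    // One query of each kind was run, so this prints "1 1 1".
    System.out.println(c.getSuccess() + " " + c.getInterrupted() + " " + c.getFailed());
  }
}
```

The more specific catch clause comes first so that an interruption is never counted as a plain failure.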
nishantmonu51 authored and fjy committed Dec 19, 2016
1 parent 8eee259 commit 35160e5
Showing 12 changed files with 180 additions and 47 deletions.
1 change: 1 addition & 0 deletions docs/content/configuration/index.md
@@ -169,6 +169,7 @@ The following monitors are available:
|`com.metamx.metrics.JvmMonitor`|Reports JVM-related statistics.|
|`io.druid.segment.realtime.RealtimeMetricsMonitor`|Reports statistics on Realtime nodes.|
|`io.druid.server.metrics.EventReceiverFirehoseMonitor`|Reports how many events have been queued in the EventReceiverFirehose.|
|`io.druid.server.metrics.QueryCountStatsMonitor`|Reports how many queries have been successful/failed/interrupted.|

### Emitting Metrics

9 changes: 9 additions & 0 deletions docs/content/operations/metrics.md
@@ -34,6 +34,9 @@ Available Metrics
|`query/node/bytes`|number of bytes returned from querying individual historical/realtime nodes.|id, status, server.| |
|`query/node/ttfb`|Time to first byte. Milliseconds elapsed until broker starts receiving the response from individual historical/realtime nodes.|id, status, server.|< 1s|
|`query/intervalChunk/time`|Only emitted if interval chunking is enabled. Milliseconds required to query an interval chunk.|id, status, chunkInterval (if interval chunking is enabled).|< 1s|
|`query/success/count`|number of queries successfully processed|This metric is only available if the QueryCountStatsMonitor module is included.||
|`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.||
|`query/interrupted/count`|number of queries interrupted due to cancellation or timeout|This metric is only available if the QueryCountStatsMonitor module is included.||

### Historical

@@ -45,6 +48,9 @@ Available Metrics
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`query/segmentAndCache/time`|Milliseconds taken to query individual segment or hit the cache (if it is enabled on the historical node).|id, segment.|several hundred milliseconds|
|`query/cpu/time`|Microseconds of CPU time taken to complete a query|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|Varies|
|`query/success/count`|number of queries successfully processed|This metric is only available if the QueryCountStatsMonitor module is included.||
|`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.||
|`query/interrupted/count`|number of queries interrupted due to cancellation or timeout|This metric is only available if the QueryCountStatsMonitor module is included.||

### Real-time

@@ -53,6 +59,9 @@ Available Metrics
|`query/time`|Milliseconds taken to complete a query.|Common: dataSource, type, interval, hasFilters, duration, context, remoteAddress, id. Aggregation Queries: numMetrics, numComplexMetrics. GroupBy: numDimensions. TopN: threshold, dimension.|< 1s|
|`query/wait/time`|Milliseconds spent waiting for a segment to be scanned.|id, segment.|several hundred milliseconds|
|`segment/scan/pending`|Number of segments in queue waiting to be scanned.||Close to 0|
|`query/success/count`|number of queries successfully processed|This metric is only available if the QueryCountStatsMonitor module is included.||
|`query/failed/count`|number of failed queries|This metric is only available if the QueryCountStatsMonitor module is included.||
|`query/interrupted/count`|number of queries interrupted due to cancellation or timeout|This metric is only available if the QueryCountStatsMonitor module is included.||

### Jetty

@@ -24,6 +24,7 @@
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.Provider;
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
@@ -34,6 +35,7 @@
import io.druid.query.Query;
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.log.RequestLogger;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.router.QueryHostFinder;
import io.druid.server.router.Router;
import org.eclipse.jetty.client.HttpClient;
@@ -56,11 +58,12 @@
import java.net.URLDecoder;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
* This class does async query processing and should be merged with QueryResource at some point
*/
public class AsyncQueryForwardingServlet extends AsyncProxyServlet
public class AsyncQueryForwardingServlet extends AsyncProxyServlet implements QueryCountStatsProvider
{
private static final EmittingLogger log = new EmittingLogger(AsyncQueryForwardingServlet.class);
@Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
@@ -72,6 +75,9 @@ public class AsyncQueryForwardingServlet extends AsyncProxyServlet

private static final int CANCELLATION_TIMEOUT_MILLIS = 500;
private static final int MAX_QUEUED_CANCELLATIONS = 64;
private final AtomicLong successfulQueryCount = new AtomicLong();
private final AtomicLong failedQueryCount = new AtomicLong();
private final AtomicLong interruptedQueryCount = new AtomicLong();

private static void handleException(HttpServletResponse response, ObjectMapper objectMapper, Exception exception)
throws IOException
@@ -100,6 +106,7 @@ private static void handleException(HttpServletResponse response, ObjectMapper o

private HttpClient broadcastClient;

@Inject
public AsyncQueryForwardingServlet(
QueryToolChestWarehouse warehouse,
@Json ObjectMapper jsonMapper,
@@ -191,6 +198,7 @@ public void onComplete(Result result)
}
);
}
interruptedQueryCount.incrementAndGet();
}
} else if (isQueryEndpoint && HttpMethod.POST.is(request.getMethod())) {
// query request
@@ -317,6 +325,24 @@ private Response.Listener newMetricsEmittingProxyResponseListener(
return new MetricsEmittingProxyResponseListener(request, response, query, start);
}

@Override
public long getSuccessfulQueryCount()
{
return successfulQueryCount.get();
}

@Override
public long getFailedQueryCount()
{
return failedQueryCount.get();
}

@Override
public long getInterruptedQueryCount()
{
return interruptedQueryCount.get();
}


private class MetricsEmittingProxyResponseListener extends ProxyResponseListener
{
@@ -345,11 +371,16 @@ public void onComplete(Result result)
{
final long requestTime = System.currentTimeMillis() - start;
try {
boolean success = result.isSucceeded();
if (success) {
successfulQueryCount.incrementAndGet();
} else {
failedQueryCount.incrementAndGet();
}
emitter.emit(
DruidMetrics.makeQueryTimeMetric(warehouse.getToolChest(query), jsonMapper, query, req.getRemoteAddr())
.build("query/time", requestTime)
);

requestLogger.log(
new RequestLogLine(
new DateTime(),
@@ -360,12 +391,14 @@ public void onComplete(Result result)
"query/time",
requestTime,
"success",
result.isSucceeded()
success
&& result.getResponse().getStatus() == javax.ws.rs.core.Response.Status.OK.getStatusCode()
)
)
)
);


}
catch (Exception e) {
log.error(e, "Unable to log query [%s]!", query);
@@ -379,6 +412,7 @@ public void onFailure(Response response, Throwable failure)
{
try {
final String errorMessage = failure.getMessage();
failedQueryCount.incrementAndGet();
requestLogger.log(
new RequestLogLine(
new DateTime(),
30 changes: 28 additions & 2 deletions server/src/main/java/io/druid/server/QueryResource.java
@@ -46,6 +46,7 @@
import io.druid.query.QueryToolChestWarehouse;
import io.druid.server.initialization.ServerConfig;
import io.druid.server.log.RequestLogger;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.security.Access;
import io.druid.server.security.Action;
import io.druid.server.security.AuthConfig;
@@ -73,18 +74,20 @@
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;

/**
*/
@Path("/druid/v2/")
public class QueryResource
public class QueryResource implements QueryCountStatsProvider
{
protected static final EmittingLogger log = new EmittingLogger(QueryResource.class);
@Deprecated // use SmileMediaTypes.APPLICATION_JACKSON_SMILE
protected static final String APPLICATION_SMILE = "application/smile";

protected static final int RESPONSE_CTX_HEADER_LEN_LIMIT = 7 * 1024;


protected final QueryToolChestWarehouse warehouse;
protected final ServerConfig config;
protected final ObjectMapper jsonMapper;
@@ -94,6 +97,9 @@ public class QueryResource
protected final RequestLogger requestLogger;
protected final QueryManager queryManager;
protected final AuthConfig authConfig;
private final AtomicLong successfulQueryCount = new AtomicLong();
private final AtomicLong failedQueryCount = new AtomicLong();
private final AtomicLong interruptedQueryCount = new AtomicLong();

@Inject
public QueryResource(
@@ -250,7 +256,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE

os.flush(); // Some types of OutputStream suppress flush errors in the .close() method.
os.close();

successfulQueryCount.incrementAndGet();
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(theToolChest, jsonMapper, theQuery, req.getRemoteAddr())
@@ -308,6 +314,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE
catch (QueryInterruptedException e) {
try {
log.warn(e, "Exception while processing queryId [%s]", queryId);
interruptedQueryCount.incrementAndGet();
final long queryTime = System.currentTimeMillis() - start;
emitter.emit(
DruidMetrics.makeQueryTimeMetric(toolChest, jsonMapper, query, req.getRemoteAddr())
@@ -347,6 +354,7 @@ public void write(OutputStream outputStream) throws IOException, WebApplicationE
: query.toString();

log.warn(e, "Exception occurred on request [%s]", queryString);
failedQueryCount.incrementAndGet();

try {
final long queryTime = System.currentTimeMillis() - start;
@@ -437,4 +445,22 @@ Response gotError(Exception e) throws IOException
.build();
}
}

@Override
public long getSuccessfulQueryCount()
{
return successfulQueryCount.get();
}

@Override
public long getFailedQueryCount()
{
return failedQueryCount.get();
}

@Override
public long getInterruptedQueryCount()
{
return interruptedQueryCount.get();
}
}
@@ -0,0 +1,62 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.metrics;

import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import com.metamx.metrics.AbstractMonitor;
import com.metamx.metrics.KeyedDiff;

import java.util.Map;

public class QueryCountStatsMonitor extends AbstractMonitor
{
  private final KeyedDiff keyedDiff = new KeyedDiff();
  private final QueryCountStatsProvider statsProvider;

  @Inject
  public QueryCountStatsMonitor(
      QueryCountStatsProvider statsProvider
  )
  {
    this.statsProvider = statsProvider;
  }

  @Override
  public boolean doMonitor(ServiceEmitter emitter)
  {
    final ServiceMetricEvent.Builder builder = new ServiceMetricEvent.Builder();
    Map<String, Long> diff = keyedDiff.to(
        "queryCountStats",
        ImmutableMap.of("query/success/count", statsProvider.getSuccessfulQueryCount(),
                        "query/failed/count", statsProvider.getFailedQueryCount(),
                        "query/interrupted/count", statsProvider.getInterruptedQueryCount()
        )
    );
    if (diff != null) {
      for (Map.Entry<String, Long> diffEntry : diff.entrySet()) {
        emitter.emit(builder.build(diffEntry.getKey(), diffEntry.getValue()));
      }
    }
    return true;
  }

}
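
The monitor above feeds cumulative counter values into a diff helper and emits only the change since the previous run; the `diff != null` check suggests that the first call yields nothing to emit. The sketch below illustrates that idea under stated assumptions. `SnapshotDiff` is a hypothetical stand-in written for this note, not the source of `com.metamx.metrics.KeyedDiff`, and its exact behaviour may differ from the real class.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the snapshot-diff idea; not the KeyedDiff implementation.
class SnapshotDiff
{
  // Last snapshot seen for each key.
  private final Map<String, Map<String, Long>> previous = new HashMap<>();

  // Returns (current - previous) per metric, or null on the first call for a key.
  Map<String, Long> to(String key, Map<String, Long> current)
  {
    Map<String, Long> prev = previous.put(key, new HashMap<>(current));
    if (prev == null) {
      return null;
    }
    Map<String, Long> diff = new HashMap<>();
    for (Map.Entry<String, Long> e : current.entrySet()) {
      Long before = prev.get(e.getKey());
      diff.put(e.getKey(), e.getValue() - (before == null ? 0L : before));
    }
    return diff;
  }

  public static void main(String[] args)
  {
    SnapshotDiff d = new SnapshotDiff();
    Map<String, Long> first = new HashMap<>();
    first.put("query/success/count", 10L);
    Map<String, Long> second = new HashMap<>();
    second.put("query/success/count", 25L);
    System.out.println(d.to("queryCountStats", first));   // prints null
    System.out.println(d.to("queryCountStats", second));  // prints {query/success/count=15}
  }
}
```

Emitting deltas rather than running totals means each emitted value covers one monitoring period, so downstream consumers can sum the values over any time range.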
@@ -0,0 +1,28 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package io.druid.server.metrics;

public interface QueryCountStatsProvider
{
  public long getSuccessfulQueryCount();

  public long getFailedQueryCount();

  public long getInterruptedQueryCount();
}
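
Both implementers in this commit back the interface with `AtomicLong` fields, which matters because query handlers run on many threads at once. The following sketch shows that pattern in isolation. It redeclares the interface locally so that it compiles on its own; `CountingProviderDemo` and `AtomicCounts` are hypothetical names for this illustration, and the thread pool size and iteration count are arbitrary.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

class CountingProviderDemo
{
  // Local copy of the interface from this commit, so the sketch is self-contained.
  interface QueryCountStatsProvider
  {
    long getSuccessfulQueryCount();

    long getFailedQueryCount();

    long getInterruptedQueryCount();
  }

  // Hypothetical implementation: counters are safe to bump from many threads.
  static class AtomicCounts implements QueryCountStatsProvider
  {
    final AtomicLong success = new AtomicLong();
    final AtomicLong failed = new AtomicLong();
    final AtomicLong interrupted = new AtomicLong();

    @Override
    public long getSuccessfulQueryCount() { return success.get(); }

    @Override
    public long getFailedQueryCount() { return failed.get(); }

    @Override
    public long getInterruptedQueryCount() { return interrupted.get(); }
  }

  public static void main(String[] args) throws InterruptedException
  {
    AtomicCounts counts = new AtomicCounts();
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Simulate 1000 successful queries completing on different threads.
    for (int i = 0; i < 1000; i++) {
      pool.execute(() -> counts.success.incrementAndGet());
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);
    System.out.println(counts.getSuccessfulQueryCount());
  }
}
```

A plain `long` field incremented with `++` could lose updates under this kind of concurrency, whereas `incrementAndGet()` is atomic.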
3 changes: 3 additions & 0 deletions services/src/main/java/io/druid/cli/CliBroker.java
@@ -49,6 +49,7 @@
import io.druid.server.http.BrokerResource;
import io.druid.server.initialization.jetty.JettyServerInitializer;
import io.druid.server.metrics.MetricsModule;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.router.TieredBrokerConfig;
import io.druid.sql.guice.SqlModule;
import org.eclipse.jetty.server.Server;
@@ -100,7 +101,9 @@ public void configure(Binder binder)
binder.bind(QuerySegmentWalker.class).to(ClientQuerySegmentWalker.class).in(LazySingleton.class);

binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);

Jerseys.addResource(binder, BrokerQueryResource.class);
binder.bind(QueryCountStatsProvider.class).to(BrokerQueryResource.class).in(LazySingleton.class);
Jerseys.addResource(binder, BrokerResource.class);
Jerseys.addResource(binder, ClientInfoResource.class);
LifecycleModule.register(binder, BrokerQueryResource.class);
2 changes: 2 additions & 0 deletions services/src/main/java/io/druid/cli/CliHistorical.java
@@ -37,6 +37,7 @@
import io.druid.java.util.common.logger.Logger;
import io.druid.query.QuerySegmentWalker;
import io.druid.query.lookup.LookupModule;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.QueryResource;
import io.druid.server.coordination.ServerManager;
import io.druid.server.coordination.ZkCoordinator;
@@ -82,6 +83,7 @@ public void configure(Binder binder)

binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig("historical"));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);
binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
Jerseys.addResource(binder, HistoricalResource.class);
LifecycleModule.register(binder, QueryResource.class);
2 changes: 2 additions & 0 deletions services/src/main/java/io/druid/cli/CliPeon.java
@@ -84,6 +84,7 @@
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierConfig;
import io.druid.segment.realtime.plumber.CoordinatorBasedSegmentHandoffNotifierFactory;
import io.druid.segment.realtime.plumber.SegmentHandoffNotifierFactory;
import io.druid.server.metrics.QueryCountStatsProvider;
import io.druid.server.QueryResource;
import io.druid.server.initialization.jetty.ChatHandlerServerModule;
import io.druid.server.initialization.jetty.JettyServerInitializer;
@@ -201,6 +202,7 @@ public void configure(Binder binder)
binder.bind(CoordinatorClient.class).in(LazySingleton.class);

binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class);
binder.bind(QueryCountStatsProvider.class).to(QueryResource.class).in(LazySingleton.class);
Jerseys.addResource(binder, QueryResource.class);
LifecycleModule.register(binder, QueryResource.class);
binder.bind(NodeTypeConfig.class).toInstance(new NodeTypeConfig(nodeType));