Skip to content

Commit

Permalink
Merge pull request apache#1765 from metamx/brokerCpuMetrics
Browse files Browse the repository at this point in the history
Add CPUTimeMetricQueryRunner to ClientQuerySegmentWalker
  • Loading branch information
xvrl committed Oct 5, 2015
2 parents 166c4fc + 2bc4aed commit d8a1dd8
Showing 1 changed file with 39 additions and 22 deletions.
61 changes: 39 additions & 22 deletions server/src/main/java/io/druid/server/ClientQuerySegmentWalker.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,14 @@

package io.druid.server;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachingClusteredClient;
import io.druid.query.CPUTimeMetricQueryRunner;
import io.druid.query.FinalizeResultsQueryRunner;
import io.druid.query.PostProcessingOperator;
import io.druid.query.Query;
Expand All @@ -28,16 +35,12 @@
import io.druid.query.RetryQueryRunner;
import io.druid.query.RetryQueryRunnerConfig;
import io.druid.query.SegmentDescriptor;

import io.druid.query.UnionQueryRunner;
import java.util.Map;

import org.joda.time.Interval;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.inject.Inject;
import com.metamx.emitter.service.ServiceEmitter;
import javax.annotation.Nullable;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

/**
*/
Expand Down Expand Up @@ -80,23 +83,37 @@ public <T> QueryRunner<T> getQueryRunnerForSegments(Query<T> query, Iterable<Seg
private <T> QueryRunner<T> makeRunner(final Query<T> query)
{
final QueryToolChest<T, Query<T>> toolChest = warehouse.getToolChest(query);
final FinalizeResultsQueryRunner<T> baseRunner = new FinalizeResultsQueryRunner<T>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
new UnionQueryRunner<T>(
toolChest.preMergeQueryDecoration(
new RetryQueryRunner<T>(
baseClient,
toolChest,
retryConfig,
objectMapper
)
),
toolChest
final QueryRunner<T> baseRunner = CPUTimeMetricQueryRunner.safeBuild(
new FinalizeResultsQueryRunner<T>(
toolChest.postMergeQueryDecoration(
toolChest.mergeResults(
new UnionQueryRunner<T>(
toolChest.preMergeQueryDecoration(
new RetryQueryRunner<T>(
baseClient,
toolChest,
retryConfig,
objectMapper
)
),
toolChest
)
)
)
),
toolChest
),
toolChest
new Function<Query<T>, ServiceMetricEvent.Builder>()
{
@Nullable
@Override
public ServiceMetricEvent.Builder apply(Query<T> tQuery)
{
return toolChest.makeMetricBuilder(tQuery);
}
},
emitter,
new AtomicLong(0L),
true
);

final Map<String, Object> context = query.getContext();
Expand Down

0 comments on commit d8a1dd8

Please sign in to comment.