Skip to content

Commit

Permalink
Disable caching on brokers for groupBy v2 (apache#3950)
Browse files Browse the repository at this point in the history
* Disable caching on brokers for groupBy v2

* Rename parameter

* address comments
  • Loading branch information
jihoonson authored and fjy committed Feb 21, 2017
1 parent bc33b68 commit 128274c
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 21 deletions.
13 changes: 13 additions & 0 deletions processing/src/main/java/io/druid/query/CacheStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,23 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Function;

import java.util.concurrent.ExecutorService;

/**
*/
public interface CacheStrategy<T, CacheType, QueryType extends Query<T>>
{
/**
 * Returns whether the given query is cacheable.
 * The {@code willMergeRunners} parameter lets an implementation distinguish whether the caller is a broker or a
 * data node.
 *
 * @param query            the query to be cached
 * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
 *                         called on the cached by-segment results
 *
 * @return true if the query is cacheable, otherwise false.
 */
boolean isCacheable(QueryType query, boolean willMergeRunners);

/**
* Computes the cache key for the given query
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,11 @@ public CacheStrategy<Row, Object, GroupByQuery> getCacheStrategy(final GroupByQu
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();
private final List<DimensionSpec> dims = query.getDimensions();

/**
 * Reports whether {@code query} is cacheable by delegating to the groupBy strategy that
 * {@code strategySelector} picks for it.
 *
 * @param query            the groupBy query to check
 * @param willMergeRunners forwarded unchanged to the selected strategy's {@code isCacheable}
 *
 * @return whatever the selected strategy reports
 */
@Override
public boolean isCacheable(GroupByQuery query, boolean willMergeRunners)
{
return strategySelector.strategize(query).isCacheable(willMergeRunners);
}

@Override
public byte[] computeCacheKey(GroupByQuery query)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,26 @@
import io.druid.data.input.Row;
import io.druid.java.util.common.guava.Sequence;
import io.druid.query.QueryRunner;
import io.druid.query.QueryRunnerFactory;
import io.druid.query.groupby.GroupByQuery;
import io.druid.segment.StorageAdapter;

import java.util.Map;
import java.util.concurrent.ExecutorService;

public interface GroupByStrategy
{

/**
 * Indicates whether this strategy is cacheable.
 * The {@code willMergeRunners} parameter lets an implementation distinguish whether the caller is a broker or a
 * data node.
 *
 * @param willMergeRunners indicates that {@link QueryRunnerFactory#mergeRunners(ExecutorService, Iterable)} will be
 *                         called on the cached by-segment results
 *
 * @return true if this strategy is cacheable, otherwise false.
 */
boolean isCacheable(boolean willMergeRunners);

Sequence<Row> mergeResults(
QueryRunner<Row> baseRunner,
GroupByQuery query,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ public GroupByStrategyV1(
this.bufferPool = bufferPool;
}

/**
 * Always reports this strategy as cacheable; {@code willMergeRunners} is ignored.
 */
@Override
public boolean isCacheable(boolean willMergeRunners)
{
return true;
}

@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,12 @@ public static DateTime getUniversalTimestamp(final GroupByQuery query)
}
}

/**
 * Returns {@code willMergeRunners} unchanged: this strategy reports itself as cacheable only to callers that
 * state they will merge runners over the cached by-segment results, and as not cacheable to every other caller.
 */
@Override
public boolean isCacheable(boolean willMergeRunners)
{
return willMergeRunners;
}

@Override
public Sequence<Row> mergeResults(
final QueryRunner<Row> baseRunner,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,12 @@ public CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery> get
{
return new CacheStrategy<SegmentAnalysis, SegmentAnalysis, SegmentMetadataQuery>()
{
/**
 * Always reports segment metadata queries as cacheable; neither argument is consulted.
 */
@Override
public boolean isCacheable(SegmentMetadataQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(SegmentMetadataQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,12 @@ public String apply(DimensionSpec input) {
:
Collections.<String>emptyList();

/**
 * Always reports search queries as cacheable; neither argument is consulted.
 */
@Override
public boolean isCacheable(SearchQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(SearchQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,12 @@ public String apply(DimensionSpec input) {
:
Collections.<String>emptyList();

/**
 * Always reports select queries as cacheable; neither argument is consulted.
 */
@Override
public boolean isCacheable(SelectQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(SelectQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ public CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>
{
return new CacheStrategy<Result<TimeBoundaryResultValue>, Object, TimeBoundaryQuery>()
{
/**
 * Always reports time boundary queries as cacheable; neither argument is consulted.
 */
@Override
public boolean isCacheable(TimeBoundaryQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(TimeBoundaryQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,12 @@ public CacheStrategy<Result<TimeseriesResultValue>, Object, TimeseriesQuery> get
{
private final List<AggregatorFactory> aggs = query.getAggregatorSpecs();

/**
 * Always reports timeseries queries as cacheable; neither argument is consulted.
 */
@Override
public boolean isCacheable(TimeseriesQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(TimeseriesQuery query)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,12 @@ public CacheStrategy<Result<TopNResultValue>, Object, TopNQuery> getCacheStrateg
.getMetricName(query.getDimensionSpec())
);

/**
 * Always reports topN queries as cacheable; neither argument is consulted.
 */
@Override
public boolean isCacheable(TopNQuery query, boolean willMergeRunners)
{
return true;
}

@Override
public byte[] computeCacheKey(TopNQuery query)
{
Expand Down
64 changes: 64 additions & 0 deletions server/src/main/java/io/druid/client/CacheUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.SegmentDescriptor;
import org.joda.time.Interval;

Expand Down Expand Up @@ -77,4 +81,64 @@ public static void populate(Cache cache, ObjectMapper mapper, Cache.NamedKey key
}
}

/**
 * Decides whether cached results may be read for {@code query} on the broker side, i.e. when the strategy is
 * asked with {@code willMergeRunners = false}.
 *
 * @param query       the query about to be run
 * @param strategy    cache strategy of the query; a null strategy yields false
 * @param cacheConfig cache configuration consulted by {@code useCache}
 *
 * @return true only if {@code useCache} allows reading the cache and the strategy reports the query as cacheable
 */
public static <T> boolean useCacheOnBrokers(
    Query<T> query,
    CacheStrategy<T, Object, Query<T>> strategy,
    CacheConfig cacheConfig
)
{
  // The strategy is only consulted after the generic checks passed, which also guarantees it is non-null.
  if (!useCache(query, strategy, cacheConfig)) {
    return false;
  }
  return strategy.isCacheable(query, false);
}

/**
 * Decides whether results of {@code query} may be written to the cache on the broker side, i.e. when the
 * strategy is asked with {@code willMergeRunners = false}.
 *
 * @param query       the query about to be run
 * @param strategy    cache strategy of the query; a null strategy yields false
 * @param cacheConfig cache configuration consulted by {@code populateCache}
 *
 * @return true only if {@code populateCache} allows writing the cache and the strategy reports the query as
 * cacheable
 */
public static <T> boolean populateCacheOnBrokers(
    Query<T> query,
    CacheStrategy<T, Object, Query<T>> strategy,
    CacheConfig cacheConfig
)
{
  // The strategy is only consulted after the generic checks passed, which also guarantees it is non-null.
  if (!populateCache(query, strategy, cacheConfig)) {
    return false;
  }
  return strategy.isCacheable(query, false);
}

/**
 * Decides whether cached results may be read for {@code query} on the data-node side, i.e. when the strategy is
 * asked with {@code willMergeRunners = true}.
 *
 * @param query       the query about to be run
 * @param strategy    cache strategy of the query; a null strategy yields false
 * @param cacheConfig cache configuration consulted by {@code useCache}
 *
 * @return true only if {@code useCache} allows reading the cache and the strategy reports the query as cacheable
 */
public static <T> boolean useCacheOnDataNodes(
    Query<T> query,
    CacheStrategy<T, Object, Query<T>> strategy,
    CacheConfig cacheConfig
)
{
  // The strategy is only consulted after the generic checks passed, which also guarantees it is non-null.
  if (!useCache(query, strategy, cacheConfig)) {
    return false;
  }
  return strategy.isCacheable(query, true);
}

/**
 * Decides whether results of {@code query} may be written to the cache on the data-node side, i.e. when the
 * strategy is asked with {@code willMergeRunners = true}.
 *
 * @param query       the query about to be run
 * @param strategy    cache strategy of the query; a null strategy yields false
 * @param cacheConfig cache configuration consulted by {@code populateCache}
 *
 * @return true only if {@code populateCache} allows writing the cache and the strategy reports the query as
 * cacheable
 */
public static <T> boolean populateCacheOnDataNodes(
    Query<T> query,
    CacheStrategy<T, Object, Query<T>> strategy,
    CacheConfig cacheConfig
)
{
  // The strategy is only consulted after the generic checks passed, which also guarantees it is non-null.
  if (!populateCache(query, strategy, cacheConfig)) {
    return false;
  }
  return strategy.isCacheable(query, true);
}

/**
 * Generic "may the cache be read" check shared by the broker and data-node variants.
 * The checks run in a fixed order and stop at the first one that fails: the query context flag, presence of a
 * cache strategy, the config-wide use-cache switch, and finally whether the config treats this query as cacheable.
 *
 * @param query       the query about to be run
 * @param strategy    cache strategy of the query, possibly null
 * @param cacheConfig cache configuration
 *
 * @return true if every check passes
 */
private static <T> boolean useCache(
    Query<T> query,
    CacheStrategy<T, Object, Query<T>> strategy,
    CacheConfig cacheConfig
)
{
  // NOTE(review): the literal true is presumably the default used when the context has no entry - confirm.
  if (!BaseQuery.getContextUseCache(query, true)) {
    return false;
  }
  if (strategy == null) {
    return false;
  }
  if (!cacheConfig.isUseCache()) {
    return false;
  }
  return cacheConfig.isQueryCacheable(query);
}

/**
 * Generic "may the cache be written" check shared by the broker and data-node variants.
 * The checks run in a fixed order and stop at the first one that fails: the query context flag, presence of a
 * cache strategy, the config-wide populate-cache switch, and finally whether the config treats this query as
 * cacheable.
 *
 * @param query       the query about to be run
 * @param strategy    cache strategy of the query, possibly null
 * @param cacheConfig cache configuration
 *
 * @return true if every check passes
 */
private static <T> boolean populateCache(
    Query<T> query,
    CacheStrategy<T, Object, Query<T>> strategy,
    CacheConfig cacheConfig
)
{
  // NOTE(review): the literal true is presumably the default used when the context has no entry - confirm.
  if (!BaseQuery.getContextPopulateCache(query, true)) {
    return false;
  }
  if (strategy == null) {
    return false;
  }
  if (!cacheConfig.isPopulateCache()) {
    return false;
  }
  return cacheConfig.isQueryCacheable(query);
}

}
18 changes: 9 additions & 9 deletions server/src/main/java/io/druid/client/CachingClusteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,13 @@ public CachingClusteredClient(
this.cacheConfig = cacheConfig;
this.backgroundExecutorService = MoreExecutors.listeningDecorator(backgroundExecutorService);

if (cacheConfig.isQueryCacheable(Query.GROUP_BY)) {
log.warn(
"Even though groupBy caching is enabled, v2 groupBys will not be cached. "
+ "Consider disabling cache on your broker and enabling it on your data nodes to enable v2 groupBy caching."
);
}

serverView.registerSegmentCallback(
Execs.singleThreaded("CCClient-ServerView-CB-%d"),
new ServerView.BaseSegmentCallback()
Expand All @@ -137,17 +144,10 @@ public Sequence<T> run(final Query<T> query, final Map<String, Object> responseC
final List<Pair<Interval, byte[]>> cachedResults = Lists.newArrayList();
final Map<String, CachePopulator> cachePopulatorMap = Maps.newHashMap();

final boolean useCache = BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final boolean populateCache = BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);
final boolean useCache = CacheUtil.useCacheOnBrokers(query, strategy, cacheConfig);
final boolean populateCache = CacheUtil.populateCacheOnBrokers(query, strategy, cacheConfig);
final boolean isBySegment = BaseQuery.getContextBySegment(query, false);


final ImmutableMap.Builder<String, Object> contextBuilder = new ImmutableMap.Builder<>();

final int priority = BaseQuery.getContextPriority(query, 0);
Expand Down
13 changes: 2 additions & 11 deletions server/src/main/java/io/druid/client/CachingQueryRunner.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;
import io.druid.java.util.common.logger.Logger;
import io.druid.query.BaseQuery;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
Expand Down Expand Up @@ -87,16 +86,8 @@ public CachingQueryRunner(
public Sequence<T> run(Query<T> query, Map<String, Object> responseContext)
{
final CacheStrategy strategy = toolChest.getCacheStrategy(query);

final boolean populateCache = BaseQuery.getContextPopulateCache(query, true)
&& strategy != null
&& cacheConfig.isPopulateCache()
&& cacheConfig.isQueryCacheable(query);

final boolean useCache = BaseQuery.getContextUseCache(query, true)
&& strategy != null
&& cacheConfig.isUseCache()
&& cacheConfig.isQueryCacheable(query);
final boolean populateCache = CacheUtil.populateCacheOnDataNodes(query, strategy, cacheConfig);
final boolean useCache = CacheUtil.useCacheOnDataNodes(query, strategy, cacheConfig);

final Cache.NamedKey key;
if (strategy != null && (useCache || populateCache)) {
Expand Down
6 changes: 5 additions & 1 deletion server/src/main/java/io/druid/client/cache/CacheConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ public int getCacheBulkMergeLimit()

/**
 * Tells whether the given query is cacheable according to this config, judged solely by the query's type.
 *
 * @param query the query to check
 *
 * @return the result of {@code isQueryCacheable(String)} for the query's type
 */
public boolean isQueryCacheable(Query query)
{
  final String queryType = query.getType();
  return isQueryCacheable(queryType);
}

/**
 * Tells whether queries of the given type are cacheable according to this config.
 *
 * @param queryType the query type name to look up
 *
 * @return false if {@code unCacheable} contains {@code queryType}, otherwise true
 */
public boolean isQueryCacheable(String queryType)
{
  // O(n) impl, but I don't think we'll ever have a million query types here
  return !unCacheable.contains(queryType);
}
}

0 comments on commit 128274c

Please sign in to comment.