Skip to content

Commit

Permalink
Merge pull request apache#458 from metamx/stream-cache
Browse files Browse the repository at this point in the history
use and populate cache at compute node level
  • Loading branch information
xvrl committed Apr 9, 2014
2 parents e867c93 + 19d6aec commit c05f169
Show file tree
Hide file tree
Showing 6 changed files with 461 additions and 287 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<metamx.java-util.version>0.25.3</metamx.java-util.version>
<metamx.java-util.version>0.25.4</metamx.java-util.version>
<apache.curator.version>2.4.0</apache.curator.version>
<druid.api.version>0.1.11</druid.api.version>
</properties>
Expand Down

This file was deleted.

164 changes: 164 additions & 0 deletions server/src/main/java/io/druid/client/CachingQueryRunner.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
/*
* Druid - a distributed column store.
* Copyright (C) 2014 Metamarkets Group Inc.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
*/

package io.druid.client;

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Function;
import com.google.common.base.Throwables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.MoreExecutors;
import com.metamx.common.guava.BaseSequence;
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryRunner;
import io.druid.query.QueryToolChest;
import io.druid.query.SegmentDescriptor;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class CachingQueryRunner<T> implements QueryRunner<T>
{

private final String segmentIdentifier;
private final SegmentDescriptor segmentDescriptor;
private final QueryRunner<T> base;
private final QueryToolChest toolChest;
private final Cache cache;
private final ObjectMapper mapper;
private final CacheConfig cacheConfig;

public CachingQueryRunner(
String segmentIdentifier,
SegmentDescriptor segmentDescriptor,
ObjectMapper mapper,
Cache cache,
QueryToolChest toolchest,
QueryRunner<T> base,
CacheConfig cacheConfig
)
{
this.base = base;
this.segmentIdentifier = segmentIdentifier;
this.segmentDescriptor = segmentDescriptor;
this.toolChest = toolchest;
this.cache = cache;
this.mapper = mapper;
this.cacheConfig = cacheConfig;
}

@Override
public Sequence<T> run(Query<T> query)
{
final CacheStrategy strategy = toolChest.getCacheStrategy(query);

final boolean populateCache = query.getContextPopulateCache(true)
&& strategy != null
&& cacheConfig.isPopulateCache();

final boolean useCache = query.getContextUseCache(true)
&& strategy != null
&& cacheConfig.isPopulateCache();

final Cache.NamedKey key = CacheUtil.computeSegmentCacheKey(
segmentIdentifier,
segmentDescriptor,
strategy.computeCacheKey(query)
);

if(useCache) {
final Function cacheFn = strategy.pullFromCache();
final byte[] cachedResult = cache.get(key);
if(cachedResult != null) {
final TypeReference cacheObjectClazz = strategy.getCacheObjectClazz();

return Sequences.map(
new BaseSequence<>(
new BaseSequence.IteratorMaker<T, Iterator<T>>()
{
@Override
public Iterator<T> make()
{
try {
if (cachedResult.length == 0) {
return Iterators.emptyIterator();
}

return mapper.readValues(
mapper.getFactory().createParser(cachedResult),
cacheObjectClazz
);
}
catch (IOException e) {
throw Throwables.propagate(e);
}
}

@Override
public void cleanup(Iterator<T> iterFromMake)
{
}
}
),
cacheFn
);
}
}

if (populateCache) {
final Function cacheFn = strategy.prepareForCache();
final List<Object> cacheResults = Lists.newLinkedList();

return Sequences.withEffect(
Sequences.map(
base.run(query),
new Function<T, T>()
{
@Override
public T apply(T input)
{
cacheResults.add(cacheFn.apply(input));
return input;
}
}
),
new Runnable()
{
@Override
public void run()
{
CacheUtil.populate(cache, mapper, key, cacheResults);
}
},
MoreExecutors.sameThreadExecutor()
);
} else {
return base.run(query);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import com.metamx.emitter.EmittingLogger;
import com.metamx.emitter.service.ServiceEmitter;
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.client.CachePopulatingQueryRunner;
import io.druid.client.CachingQueryRunner;
import io.druid.client.cache.Cache;
import io.druid.client.cache.CacheConfig;
import io.druid.collections.CountingMap;
Expand Down Expand Up @@ -410,7 +410,7 @@ public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
new BySegmentQueryRunner<T>(
adapter.getIdentifier(),
adapter.getDataInterval().getStart(),
new CachePopulatingQueryRunner<T>(
new CachingQueryRunner<T>(
adapter.getIdentifier(),
segmentDescriptor,
objectMapper,
Expand All @@ -424,4 +424,4 @@ public ServiceMetricEvent.Builder apply(@Nullable final Query<T> input)
segmentSpec
);
}
}
}
Loading

0 comments on commit c05f169

Please sign in to comment.