Skip to content

Commit

Permalink
Merge pull request apache#490 from metamx/lazy-cache-population
Browse files Browse the repository at this point in the history
Broker populates cache lazily
  • Loading branch information
fjy committed Apr 17, 2014
2 parents d0e6568 + 6a52112 commit aba31ea
Showing 1 changed file with 32 additions and 13 deletions.
45 changes: 32 additions & 13 deletions server/src/main/java/io/druid/client/CachingClusteredClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.Ordering;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.inject.Inject;
import com.metamx.common.Pair;
Expand Down Expand Up @@ -332,26 +333,30 @@ private void addSequencesFromServer(ArrayList<Pair<DateTime, Sequence<T>>> listO
clientQueryable.run(rewrittenQuery.withQuerySegmentSpec(segmentSpec)),
new Function<Object, Sequence<T>>()
{
private final Function<T, Object> prepareForCache = strategy.prepareForCache();
private final Function<T, Object> cacheFn = strategy.prepareForCache();

@Override
public Sequence<T> apply(Object input)
{
Result<Object> result = (Result<Object>) input;
final BySegmentResultValueClass<T> value = (BySegmentResultValueClass<T>) result.getValue();
String segmentIdentifier = value.getSegmentId();
final Iterable<T> segmentResults = value.getResults();

CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", segmentIdentifier, value.getInterval())
);
if (cachePopulator != null) {
cachePopulator.populate(Iterables.transform(segmentResults, prepareForCache));
}
final List<Object> cacheData = Lists.newLinkedList();

return Sequences.simple(
Iterables.transform(
segmentResults,
return Sequences.withEffect(
Sequences.map(
Sequences.map(
Sequences.simple(value.getResults()),
new Function<T, T>()
{
@Override
public T apply(T input)
{
cacheData.add(cacheFn.apply(input));
return input;
}
}
),
toolChest.makeMetricManipulatorFn(
rewrittenQuery,
new MetricManipulationFn()
Expand All @@ -363,7 +368,21 @@ public Object manipulate(AggregatorFactory factory, Object object)
}
}
)
)
),
new Runnable()
{
@Override
public void run()
{
CachePopulator cachePopulator = cachePopulatorMap.get(
String.format("%s_%s", value.getSegmentId(), value.getInterval())
);
if (cachePopulator != null) {
cachePopulator.populate(cacheData);
}
}
},
MoreExecutors.sameThreadExecutor()
);
}
}
Expand Down

0 comments on commit aba31ea

Please sign in to comment.