Refactor CachingClusteredClient.run() (apache#4489)
* Refactor CachingClusteredClient

* Comments

* Refactoring

* Readability fixes
leventov authored and jihoonson committed Jul 23, 2017
1 parent b154ff0 commit 7408a7c
Showing 22 changed files with 488 additions and 449 deletions.
6 changes: 4 additions & 2 deletions common/src/main/java/io/druid/timeline/TimelineLookup.java
@@ -22,6 +22,8 @@
import io.druid.timeline.partition.PartitionHolder;
import org.joda.time.Interval;

import java.util.List;


public interface TimelineLookup<VersionType, ObjectType>
{
@@ -35,7 +37,7 @@ public interface TimelineLookup<VersionType, ObjectType>
* @return Holders representing the interval that the objects exist for, PartitionHolders
* are guaranteed to be complete. Holders returned sorted by the interval.
*/
- public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);
+ public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval interval);

/**
* Does a lookup for the objects representing the given time interval. Will also return
@@ -46,7 +48,7 @@ public interface TimelineLookup<VersionType, ObjectType>
* @return Holders representing the interval that the objects exist for, PartitionHolders
* can be incomplete. Holders returned sorted by the interval.
*/
- public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);
+ public List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval);

public PartitionHolder<ObjectType> findEntry(Interval interval, VersionType version);

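As an aside, a minimal sketch (not part of this commit) of what the narrowed return type buys call sites: with List instead of Iterable, callers get size() and isEmpty() without copying the holders first. The helper and its names are illustrative only.

import io.druid.timeline.TimelineLookup;
import io.druid.timeline.TimelineObjectHolder;
import org.joda.time.Interval;

import java.util.List;

class TimelineLookupUsageSketch
{
  static <V, O> void printHolderCount(TimelineLookup<V, O> timeline, Interval interval)
  {
    // lookup() now returns a List, so size() and isEmpty() are available
    // without materializing the holders into a separate collection.
    List<TimelineObjectHolder<V, O>> holders = timeline.lookup(interval);
    if (holders.isEmpty()) {
      System.out.println("no complete partition holders for " + interval);
    } else {
      System.out.println(holders.size() + " holders for " + interval);
    }
  }
}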
@@ -213,7 +213,7 @@ public List<TimelineObjectHolder<VersionType, ObjectType>> lookup(Interval inter
}

@Override
- public Iterable<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval)
+ public List<TimelineObjectHolder<VersionType, ObjectType>> lookupWithIncompletePartitions(Interval interval)
{
try {
lock.readLock().lock();
@@ -19,18 +19,18 @@

package io.druid.java.util.common.guava;

- import com.google.common.base.Function;
+ import java.util.function.Function;

/**
*/
public class MappedSequence<T, Out> implements Sequence<Out>
{
private final Sequence<T> baseSequence;
- private final Function<T, Out> fn;
+ private final Function<? super T, ? extends Out> fn;

public MappedSequence(
Sequence<T> baseSequence,
- Function<T, Out> fn
+ Function<? super T, ? extends Out> fn
)
{
this.baseSequence = baseSequence;
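A standalone sketch (plain JDK, nothing Druid-specific) of why the new "? super" / "? extends" wildcards matter: a mapper declared on a supertype, such as Function<Object, Integer>, can now be passed where the old invariant Function<T, Out> signature would have rejected it. Names here are illustrative.

import java.util.function.Function;

class WildcardMapperSketch
{
  // Mirrors the relaxed signature: the mapper may consume a supertype of T
  // and produce a subtype of Out.
  static <T, Out> Out applyMapper(T value, Function<? super T, ? extends Out> fn)
  {
    return fn.apply(value);
  }

  public static void main(String[] args)
  {
    // A mapper defined on Object works for String inputs, and its Integer
    // result can be widened to Number by the caller.
    Function<Object, Integer> hash = Object::hashCode;
    Number hashed = applyMapper("druid", hash);
    System.out.println(hashed);
  }
}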
@@ -19,16 +19,16 @@

package io.druid.java.util.common.guava;

- import com.google.common.base.Function;
+ import java.util.function.Function;

/**
*/
public class MappingAccumulator<OutType, InType, MappedType> implements Accumulator<OutType, InType>
{
- private final Function<InType, MappedType> fn;
+ private final Function<? super InType, ? extends MappedType> fn;
private final Accumulator<OutType, MappedType> accumulator;

- public MappingAccumulator(Function<InType, MappedType> fn, Accumulator<OutType, MappedType> accumulator)
+ MappingAccumulator(Function<? super InType, ? extends MappedType> fn, Accumulator<OutType, MappedType> accumulator)
{
this.fn = fn;
this.accumulator = accumulator;
@@ -19,17 +19,17 @@

package io.druid.java.util.common.guava;

- import com.google.common.base.Function;
+ import java.util.function.Function;

/**
*/
public class MappingYieldingAccumulator<OutType, InType, MappedType> extends YieldingAccumulator<OutType, InType>
{
- private final Function<InType, MappedType> fn;
+ private final Function<? super InType, ? extends MappedType> fn;
private final YieldingAccumulator<OutType, MappedType> baseAccumulator;

public MappingYieldingAccumulator(
- Function<InType, MappedType> fn,
+ Function<? super InType, ? extends MappedType> fn,
YieldingAccumulator<OutType, MappedType> baseAccumulator
)
{
@@ -31,16 +31,16 @@
*/
public class MergeSequence<T> extends YieldingSequenceBase<T>
{
- private final Ordering<T> ordering;
- private final Sequence<Sequence<T>> baseSequences;
+ private final Ordering<? super T> ordering;
+ private final Sequence<? extends Sequence<T>> baseSequences;

public MergeSequence(
- Ordering<T> ordering,
- Sequence<Sequence<T>> baseSequences
+ Ordering<? super T> ordering,
+ Sequence<? extends Sequence<? extends T>> baseSequences
)
{
this.ordering = ordering;
- this.baseSequences = baseSequences;
+ this.baseSequences = (Sequence<? extends Sequence<T>>) baseSequences;
}

@Override
@@ -62,37 +62,32 @@ public T apply(Yielder<T> input)

    pQueue = baseSequences.accumulate(
        pQueue,
-       new Accumulator<PriorityQueue<Yielder<T>>, Sequence<T>>()
-       {
-         @Override
-         public PriorityQueue<Yielder<T>> accumulate(PriorityQueue<Yielder<T>> queue, Sequence<T> in)
-         {
-           final Yielder<T> yielder = in.toYielder(
-               null,
-               new YieldingAccumulator<T, T>()
-               {
-                 @Override
-                 public T accumulate(T accumulated, T in)
-                 {
-                   yield();
-                   return in;
-                 }
-               }
-           );
-
-           if (!yielder.isDone()) {
-             queue.add(yielder);
-           } else {
-             try {
-               yielder.close();
-             }
-             catch (IOException e) {
-               throw Throwables.propagate(e);
-             }
-           }
-
-           return queue;
-         }
-       }
+       (queue, in) -> {
+         final Yielder<T> yielder = in.toYielder(
+             null,
+             new YieldingAccumulator<T, T>()
+             {
+               @Override
+               public T accumulate(T accumulated, T in)
+               {
+                 yield();
+                 return in;
+               }
+             }
+         );
+
+         if (!yielder.isDone()) {
+           queue.add(yielder);
+         } else {
+           try {
+             yielder.close();
+           }
+           catch (IOException e) {
+             throw new RuntimeException(e);
+           }
+         }
+
+         return queue;
+       }
    );

@@ -19,6 +19,11 @@

package io.druid.java.util.common.guava;

import com.google.common.collect.Ordering;

import java.util.concurrent.Executor;
import java.util.function.Function;

/**
* A Sequence represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose
* a way for you to extract values from it, instead you provide it with a worker (an Accumulator) and that defines
@@ -57,4 +62,22 @@ public interface Sequence<T>
* @see Yielder
*/
<OutType> Yielder<OutType> toYielder(OutType initValue, YieldingAccumulator<OutType, T> accumulator);

default <U> Sequence<U> map(Function<? super T, ? extends U> mapper)
{
return new MappedSequence<>(this, mapper);
}

default <R> Sequence<R> flatMerge(
Function<? super T, ? extends Sequence<? extends R>> mapper,
Ordering<? super R> ordering
)
{
return new MergeSequence<>(ordering, this.map(mapper));
}

default Sequence<T> withEffect(Runnable effect, Executor effectExecutor)
{
return Sequences.withEffect(this, effect, effectExecutor);
}
}
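A hedged usage sketch of the new default methods, chaining map(), flatMerge(), and withEffect() and then draining the result with an Accumulator lambda. It assumes the Sequences.simple() helper seen elsewhere in this diff and Guava's Ordering; the values and the executor are placeholders.

import com.google.common.collect.Ordering;
import io.druid.java.util.common.guava.Sequence;
import io.druid.java.util.common.guava.Sequences;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SequenceDefaultMethodsSketch
{
  public static void main(String[] args)
  {
    ExecutorService exec = Executors.newSingleThreadExecutor();

    Sequence<Integer> sequence = Sequences
        .simple(Arrays.asList("a", "bb", "ccc"))
        // map() wraps this sequence in a MappedSequence
        .map(String::length)
        // flatMerge() maps each element to a sub-sequence and merges them by the ordering
        .flatMerge(n -> Sequences.simple(Arrays.asList(n, n * 10)), Ordering.<Integer>natural())
        // withEffect() runs the callback on the executor once the sequence is fully consumed
        .withEffect(() -> System.out.println("sequence consumed"), exec);

    // Drain with an Accumulator lambda, as the refactored MergeSequence does.
    List<Integer> values = sequence.accumulate(new ArrayList<>(), (acc, in) -> {
      acc.add(in);
      return acc;
    });
    System.out.println(values);

    exec.shutdown();
  }
}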
@@ -80,9 +80,9 @@ public static <T> Sequence<T> concat(Sequence<Sequence<T>> sequences)
return new ConcatSequence<>(sequences);
}

- public static <From, To> Sequence<To> map(Sequence<From> sequence, Function<From, To> fn)
+ public static <From, To> Sequence<To> map(Sequence<From> sequence, Function<? super From, ? extends To> fn)
{
- return new MappedSequence<>(sequence, fn);
+ return new MappedSequence<>(sequence, fn::apply);
}

public static <T> Sequence<T> filter(Sequence<T> sequence, Predicate<T> pred)
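The fn::apply adapter above bridges Guava's Function (which older call sites still pass) to java.util.function.Function (which MappedSequence now expects). A minimal standalone illustration of that bridge:

import com.google.common.base.Function;

class FunctionBridgeSketch
{
  public static void main(String[] args)
  {
    // A Guava Function, as legacy callers of Sequences.map() still supply.
    Function<String, Integer> guavaFn = new Function<String, Integer>()
    {
      @Override
      public Integer apply(String input)
      {
        return input.length();
      }
    };

    // A method reference to apply() adapts it to the JDK functional interface.
    java.util.function.Function<String, Integer> jdkFn = guavaFn::apply;
    System.out.println(jdkFn.apply("druid"));
  }
}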
@@ -19,12 +19,12 @@

package io.druid.java.util.common.guava;

- import com.google.common.base.Function;
import com.google.common.collect.Lists;
import io.druid.java.util.common.StringUtils;
import org.junit.Test;

import java.util.List;
+ import java.util.function.Function;

/**
*/
@@ -51,7 +51,7 @@ public Integer apply(Integer input)
SequenceTestHelper.testAll(
StringUtils.format("Run %,d: ", i),
new MappedSequence<>(Sequences.simple(vals), fn),
- Lists.transform(vals, fn)
+ Lists.transform(vals, fn::apply)
);
}
}
@@ -23,6 +23,8 @@
import org.joda.time.Interval;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

/**
*/
@@ -64,6 +66,12 @@ public Interval getInterval()
return interval;
}

public <U> BySegmentResultValueClass<U> mapResults(Function<? super T, ? extends U> mapper)
{
List<U> mappedResults = results.stream().map(mapper).collect(Collectors.toList());
return new BySegmentResultValueClass<>(mappedResults, segmentId, interval);
}

@Override
public String toString()
{
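A hedged usage sketch of the new mapResults() method: every per-segment value is rewritten while the segment id and interval are preserved. It assumes the three-argument constructor and a getResults() getter on BySegmentResultValueClass in package io.druid.query; the segment id and interval are placeholders.

import io.druid.query.BySegmentResultValueClass;
import org.joda.time.Interval;

import java.util.Arrays;

class MapResultsSketch
{
  public static void main(String[] args)
  {
    BySegmentResultValueClass<String> bySegment = new BySegmentResultValueClass<>(
        Arrays.asList("a", "bb", "ccc"),
        "example-segment-id",                   // placeholder segment id
        new Interval("2017-01-01/2017-01-02")   // placeholder interval
    );

    // mapResults() transforms the values but keeps the segment id and interval intact.
    BySegmentResultValueClass<Integer> lengths = bySegment.mapResults(String::length);
    System.out.println(lengths.getResults());
  }
}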
2 changes: 1 addition & 1 deletion processing/src/main/java/io/druid/query/QueryPlus.java
@@ -117,7 +117,7 @@ public QueryPlus<T> withQuerySegmentSpec(QuerySegmentSpec spec)
/**
* Returns a QueryPlus object with {@link QueryMetrics} from this QueryPlus object, and the provided {@link Query}.
*/
- public QueryPlus<T> withQuery(Query<T> replacementQuery)
+ public <U> QueryPlus<U> withQuery(Query<U> replacementQuery)
{
return new QueryPlus<>(replacementQuery, queryMetrics);
}
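A brief, illustrative helper (not part of the commit) showing what the relaxed withQuery() signature allows: the replacement query may now carry a different result type, while the QueryMetrics attached to the original QueryPlus are kept, as the method's Javadoc states.

import io.druid.query.Query;
import io.druid.query.QueryPlus;

class WithQuerySketch
{
  // Before this change the replacement had to be a Query<T>; now the result
  // type may change to U while the attached QueryMetrics are carried over.
  static <T, U> QueryPlus<U> swapQuery(QueryPlus<T> queryPlus, Query<U> replacement)
  {
    return queryPlus.withQuery(replacement);
  }
}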
2 changes: 2 additions & 0 deletions processing/src/main/java/io/druid/query/QueryToolChest.java
@@ -24,6 +24,7 @@
import io.druid.query.aggregation.MetricManipulationFn;
import io.druid.timeline.LogicalSegment;

import javax.annotation.Nullable;
import java.util.List;

/**
@@ -122,6 +123,7 @@ public Function<ResultType, ResultType> makePostComputeManipulatorFn(QueryType q
*
* @return A CacheStrategy that can be used to populate and read from the Cache
*/
@Nullable
public <T> CacheStrategy<ResultType, T, QueryType> getCacheStrategy(QueryType query)
{
return null;
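With getCacheStrategy() now marked @Nullable (the base implementation returns null), callers are expected to treat a null strategy as "caching not supported". A hedged sketch of such a call site; the helper and its generics are illustrative:

import io.druid.query.CacheStrategy;
import io.druid.query.Query;
import io.druid.query.QueryToolChest;

class CacheStrategyCallSiteSketch
{
  static <T, QueryType extends Query<T>> boolean supportsCaching(
      QueryToolChest<T, QueryType> toolChest,
      QueryType query
  )
  {
    // Null means this tool chest does not provide a cache strategy for the query type.
    CacheStrategy<T, ?, QueryType> strategy = toolChest.getCacheStrategy(query);
    return strategy != null;
  }
}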
7 changes: 7 additions & 0 deletions processing/src/main/java/io/druid/query/Result.java
@@ -23,6 +23,8 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import org.joda.time.DateTime;

import java.util.function.Function;

/**
*/
public class Result<T> implements Comparable<Result<T>>
@@ -42,6 +44,11 @@ public Result(
this.value = value;
}

public <U> Result<U> map(Function<? super T, ? extends U> mapper)
{
return new Result<>(timestamp, mapper.apply(value));
}

@Override
public int compareTo(Result<T> tResult)
{
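A brief usage sketch of the new Result.map(): the timestamp is kept and only the value is transformed. It assumes the two-argument constructor shown above plus the usual getTimestamp()/getValue() getters; the values are placeholders.

import io.druid.query.Result;
import org.joda.time.DateTime;

class ResultMapSketch
{
  public static void main(String[] args)
  {
    Result<String> raw = new Result<>(new DateTime("2017-07-23"), "some-value");

    // map() transforms the value while leaving the timestamp untouched.
    Result<Integer> length = raw.map(String::length);
    System.out.println(length.getTimestamp() + " -> " + length.getValue());
  }
}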
@@ -124,12 +124,9 @@ public static <T> Set<T> filterShards(DimFilter dimFilter, Iterable<T> input, Fu
if (dimFilter != null && shard != null) {
Map<String, Range<String>> domain = shard.getDomain();
for (Map.Entry<String, Range<String>> entry : domain.entrySet()) {
- Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache.get(entry.getKey());
- if (optFilterRangeSet == null) {
-   RangeSet<String> filterRangeSet = dimFilter.getDimensionRangeSet(entry.getKey());
-   optFilterRangeSet = Optional.fromNullable(filterRangeSet);
-   dimensionRangeCache.put(entry.getKey(), optFilterRangeSet);
- }
+ String dimension = entry.getKey();
+ Optional<RangeSet<String>> optFilterRangeSet = dimensionRangeCache
+     .computeIfAbsent(dimension, d-> Optional.fromNullable(dimFilter.getDimensionRangeSet(d)));
if (optFilterRangeSet.isPresent() && optFilterRangeSet.get().subRangeSet(entry.getValue()).isEmpty()) {
include = false;
}
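The hunk above replaces a get / null-check / put sequence with Map.computeIfAbsent(), caching Optional.fromNullable(...) so that a dimension with no range set is also remembered and not recomputed. A standalone sketch of the same pattern, with expensiveLookup() standing in for dimFilter.getDimensionRangeSet():

import com.google.common.base.Optional;

import java.util.HashMap;
import java.util.Map;

class ComputeIfAbsentSketch
{
  public static void main(String[] args)
  {
    Map<String, Optional<Integer>> cache = new HashMap<>();

    // The mapping function runs only on the first request for a key; wrapping
    // the possibly-null result in Optional lets "absent" be cached as well.
    Optional<Integer> value = cache.computeIfAbsent("dim", d -> Optional.fromNullable(expensiveLookup(d)));
    System.out.println(value.orNull());
  }

  // Stand-in for a lookup that may legitimately return null.
  private static Integer expensiveLookup(String dimension)
  {
    return dimension.length() > 3 ? dimension.length() : null;
  }
}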
@@ -210,7 +210,7 @@ private Sequence<Row> mergeGroupByResults(
makePreComputeManipulatorFn(
subquery,
MetricManipulatorFns.finalizing()
- )
+ )::apply
);
} else {
finalizingResults = subqueryResult;
@@ -119,7 +119,7 @@ public Sequence<SegmentAnalysis> doRun(
makeOrdering(updatedQuery),
createMergeFn(updatedQuery)
),
- MERGE_TRANSFORM_FN
+ MERGE_TRANSFORM_FN::apply
);
}

2 changes: 2 additions & 0 deletions server/src/main/java/io/druid/client/BrokerServerView.java
@@ -44,6 +44,7 @@
import io.druid.timeline.VersionedIntervalTimeline;
import io.druid.timeline.partition.PartitionChunk;

import javax.annotation.Nullable;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -291,6 +292,7 @@ private void serverRemovedSegment(DruidServerMetadata server, DataSegment segmen
}


@Nullable
@Override
public VersionedIntervalTimeline<String, ServerSelector> getTimeline(DataSource dataSource)
{

