Skip to content

Commit

Permalink
CombiningSequence: Delay making next yielder on creation until it is …
Browse files Browse the repository at this point in the history
…actually asked for. (apache#2892)

This fixes the behavior of limited combining sequences (otherwise limit = 1 would
actually yield 2 elements).
  • Loading branch information
gianm authored and xvrl committed Apr 29, 2016
1 parent 2203a81 commit 488d12d
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 25 deletions.
14 changes: 10 additions & 4 deletions common/src/main/java/io/druid/common/guava/CombiningSequence.java
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public <OutType> Yielder<OutType> toYielder(OutType initValue, final YieldingAcc
}

public <OutType> Yielder<OutType> makeYielder(
Yielder<T> yielder,
final Yielder<T> yielder,
final CombiningYieldingAccumulator<OutType, T> combiningAccumulator,
boolean finalValue
)
Expand All @@ -92,7 +92,7 @@ public <OutType> Yielder<OutType> makeYielder(

if(!yielder.isDone()) {
retVal = combiningAccumulator.getRetVal();
finalYielder = yielder.next(yielder.get());
finalYielder = null;
finalFinalValue = false;
} else {
if(!finalValue && combiningAccumulator.accumulatedSomething()) {
Expand Down Expand Up @@ -124,7 +124,11 @@ public OutType get()
public Yielder<OutType> next(OutType initValue)
{
combiningAccumulator.reset();
return makeYielder(finalYielder, combiningAccumulator, finalFinalValue);
return makeYielder(
finalYielder == null ? yielder.next(yielder.get()) : finalYielder,
combiningAccumulator,
finalFinalValue
);
}

@Override
Expand All @@ -136,7 +140,9 @@ public boolean isDone()
@Override
public void close() throws IOException
{
finalYielder.close();
if (finalYielder != null) {
finalYielder.close();
}
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.common.guava;

import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Ordering;
Expand Down Expand Up @@ -47,7 +48,7 @@ public class CombiningSequenceTest
@Parameterized.Parameters
public static Collection<Object[]> valuesToTry()
{
return Arrays.asList(new Object[][] {
return Arrays.asList(new Object[][]{
{1}, {2}, {3}, {4}, {5}, {1000}
});
}
Expand Down Expand Up @@ -194,27 +195,48 @@ public void testNothing() throws Exception
private void testCombining(List<Pair<Integer, Integer>> pairs, List<Pair<Integer, Integer>> expected)
throws IOException
{
Sequence<Pair<Integer, Integer>> seq = new CombiningSequence<Pair<Integer, Integer>>(
Sequences.simple(pairs),
Ordering.natural().onResultOf(Pair.<Integer, Integer>lhsFn()),
new BinaryFn<Pair<Integer, Integer>, Pair<Integer, Integer>, Pair<Integer, Integer>>()
{
@Override
public Pair<Integer, Integer> apply(
Pair<Integer, Integer> lhs, Pair<Integer, Integer> rhs
)
{
if (lhs == null) {
return rhs;
}
for (int limit = 0; limit < expected.size() + 1; limit++) {
// limit = 0 doesn't work properly; it returns 1 element
final int expectedLimit = limit == 0 ? 1 : limit;

testCombining(
pairs,
Lists.newArrayList(Iterables.limit(expected, expectedLimit)),
limit
);
}
}

if (rhs == null) {
return lhs;
private void testCombining(
List<Pair<Integer, Integer>> pairs,
List<Pair<Integer, Integer>> expected,
int limit
) throws IOException
{
Sequence<Pair<Integer, Integer>> seq = Sequences.limit(
new CombiningSequence<>(
Sequences.simple(pairs),
Ordering.natural().onResultOf(Pair.<Integer, Integer>lhsFn()),
new BinaryFn<Pair<Integer, Integer>, Pair<Integer, Integer>, Pair<Integer, Integer>>()
{
@Override
public Pair<Integer, Integer> apply(
Pair<Integer, Integer> lhs, Pair<Integer, Integer> rhs
)
{
if (lhs == null) {
return rhs;
}

if (rhs == null) {
return lhs;
}

return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs);
}
}

return Pair.of(lhs.lhs, lhs.rhs + rhs.rhs);
}
}
),
limit
);

List<Pair<Integer, Integer>> merged = Sequences.toList(seq, Lists.<Pair<Integer, Integer>>newArrayList());
Expand All @@ -233,7 +255,9 @@ public Pair<Integer, Integer> accumulate(
)
{
count++;
if(count % yieldEvery == 0) yield();
if (count % yieldEvery == 0) {
yield();
}
return rhs;
}
}
Expand Down

0 comments on commit 488d12d

Please sign in to comment.