diff --git a/common/src/main/java/io/druid/common/guava/CombiningSequence.java b/common/src/main/java/io/druid/common/guava/CombiningSequence.java index ceb714e43aec..6ff60ca5b42a 100644 --- a/common/src/main/java/io/druid/common/guava/CombiningSequence.java +++ b/common/src/main/java/io/druid/common/guava/CombiningSequence.java @@ -19,7 +19,6 @@ package io.druid.common.guava; -import com.google.common.base.Function; import com.google.common.collect.Ordering; import com.metamx.common.guava.Accumulator; import com.metamx.common.guava.Sequence; @@ -81,7 +80,7 @@ public Yielder toYielder(OutType initValue, final YieldingAcc return makeYielder(baseYielder, combiningAccumulator, false); } - public Yielder makeYielder( + public Yielder makeYielder( Yielder yielder, final CombiningYieldingAccumulator combiningAccumulator, boolean finalValue @@ -102,13 +101,13 @@ public Yielder makeYielder( finalFinalValue = true; if(!combiningAccumulator.yielded()) { - return Yielders.done(null, yielder); + return Yielders.done(retVal, yielder); } else { finalYielder = Yielders.done(null, yielder); } } else { - return Yielders.done(null, yielder); + return Yielders.done(combiningAccumulator.getRetVal(), yielder); } } @@ -124,6 +123,7 @@ public OutType get() @Override public Yielder next(OutType initValue) { + combiningAccumulator.reset(); return makeYielder(finalYielder, combiningAccumulator, finalFinalValue); } diff --git a/common/src/test/java/io/druid/common/guava/ComplexSequenceTest.java b/common/src/test/java/io/druid/common/guava/ComplexSequenceTest.java new file mode 100644 index 000000000000..7848eb239310 --- /dev/null +++ b/common/src/test/java/io/druid/common/guava/ComplexSequenceTest.java @@ -0,0 +1,116 @@ +/* + * Licensed to Metamarkets Group Inc. (Metamarkets) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. Metamarkets licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package io.druid.common.guava; + +import com.google.common.collect.Ordering; +import com.google.common.primitives.Ints; +import com.metamx.common.guava.Sequence; +import com.metamx.common.guava.Sequences; +import com.metamx.common.guava.Yielder; +import com.metamx.common.guava.YieldingAccumulator; +import com.metamx.common.guava.nary.BinaryFn; +import org.junit.Assert; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +public class ComplexSequenceTest +{ + @Test + public void testComplexSequence() + { + Sequence complex; + check("[3, 5]", complex = concat(combine(simple(3)), combine(simple(5)))); + check("[8]", complex = combine(complex)); + check("[8, 6, 3, 5]", complex = concat(complex, concat(combine(simple(2, 4)), simple(3, 5)))); + check("[22]", complex = combine(complex)); + check("[22]", concat(complex, simple())); + } + + private void check(String expected, Sequence complex) + { + List combined = Sequences.toList(complex, new ArrayList()); + Assert.assertEquals(expected, combined.toString()); + + Yielder yielder = complex.toYielder( + null, + new YieldingAccumulator() + { + @Override + public Integer accumulate(Integer accumulated, Integer in) + { + yield(); + return in; + } + } + ); + + List combinedByYielder = new ArrayList<>(); + while (!yielder.isDone()) { + combinedByYielder.add(yielder.get()); + yielder = yielder.next(null); + } + + Assert.assertEquals(expected, combinedByYielder.toString()); + } + + private Sequence simple(int... values) + { + return Sequences.simple(Ints.asList(values)); + } + + private Sequence combine(Sequence sequence) + { + return CombiningSequence.create(sequence, alwaysSame, plus); + } + + private Sequence concat(Sequence... sequences) + { + return Sequences.concat(Arrays.asList(sequences)); + } + + private final Ordering alwaysSame = new Ordering() + { + @Override + public int compare(Integer left, Integer right) + { + return 0; + } + }; + + private final BinaryFn plus = new BinaryFn() + { + @Override + public Integer apply(Integer arg1, Integer arg2) + { + if (arg1 == null) { + return arg2; + } + + if (arg2 == null) { + return arg1; + } + + return arg1 + arg2; + } + }; +}