Skip to content

Commit

Permalink
Allow overlapping intervals for the compaction task (apache#10912)
Browse files Browse the repository at this point in the history
* Allow overlapping intervals for the compaction task

* unused import

* line indentation

Co-authored-by: Maytas Monsereenusorn <[email protected]>
  • Loading branch information
jihoonson and maytasm authored Mar 23, 2021
1 parent 51d2c61 commit a041933
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,12 @@
import org.apache.druid.common.guava.SettableSupplier;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.JodaUtils;
import org.apache.druid.java.util.common.guava.Comparators;
import org.joda.time.Interval;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

/**
* Produce a stream of intervals generated by a given set of intervals as input and a given
Expand All @@ -51,19 +47,7 @@ public class IntervalsByGranularity
*/
public IntervalsByGranularity(Collection<Interval> intervals, Granularity granularity)
{
// eliminate dups, sort intervals:
Set<Interval> intervalSet = new HashSet<>(intervals);
List<Interval> inputIntervals = new ArrayList<>(intervals.size());
inputIntervals.addAll(intervalSet);
inputIntervals.sort(Comparators.intervalsByStartThenEnd());

// sanity check
if (JodaUtils.containOverlappingIntervals(inputIntervals)) {
throw new IAE("Intervals contain overlapping intervals [%s]", intervals);
}

// all good:
sortedNonOverlappingIntervals = inputIntervals;
this.sortedNonOverlappingIntervals = JodaUtils.condenseIntervals(intervals);
this.granularity = granularity;
}

Expand All @@ -73,9 +57,8 @@ public IntervalsByGranularity(Collection<Interval> intervals, Granularity granul
*/
public Iterator<Interval> granularityIntervalsIterator()
{
Iterator<Interval> ite;
if (sortedNonOverlappingIntervals.isEmpty()) {
ite = Collections.emptyIterator();
return Collections.emptyIterator();
} else {
// The filter after transform & concat is to remove duplicates.
// This can happen when condense left intervals that did not overlap but
Expand All @@ -85,7 +68,7 @@ public Iterator<Interval> granularityIntervalsIterator()
// intervals will be returned, both with the same value 2013-01-01T00:00:00.000Z/2013-02-01T00:00:00.000Z.
// Thus dups can be created given the right conditions....
final SettableSupplier<Interval> previous = new SettableSupplier<>();
ite = FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
return FluentIterable.from(sortedNonOverlappingIntervals).transformAndConcat(granularity::getIterable)
.filter(interval -> {
if (previous.get() != null && previous.get().equals(interval)) {
return false;
Expand All @@ -94,7 +77,5 @@ public Iterator<Interval> granularityIntervalsIterator()
return true;
}).iterator();
}
return ite;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@

import com.google.common.collect.ImmutableList;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.granularity.IntervalsByGranularity;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

import java.util.Collections;
import java.util.Iterator;
Expand All @@ -34,8 +36,8 @@

public class IntervalsByGranularityTest
{
private static final long SECONDS_IN_YEAR = 31536000;

@Rule
public ExpectedException expectedException = ExpectedException.none();

@Test
public void testTrivialIntervalExplosion()
Expand All @@ -46,17 +48,17 @@ public void testTrivialIntervalExplosion()

IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
Granularity.fromString("DAY")
Granularities.DAY
);

// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = getCount(granularityIntervals);
Assert.assertTrue(count == 62 + 365);
long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
Assert.assertEquals(62 + 365, count);

granularityIntervals = intervals.granularityIntervalsIterator();
count = getCountWithNoHasNext(granularityIntervals);
Assert.assertTrue(count == 62 + 365);
Assert.assertEquals(62 + 365, count);
}


Expand All @@ -69,13 +71,13 @@ public void testDups()

IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
Granularity.fromString("DAY")
Granularities.DAY
);

// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = getCount(granularityIntervals);
Assert.assertTrue(count == 61);
long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
Assert.assertEquals(61, count);
}


Expand All @@ -88,30 +90,37 @@ public void testCondenseForManyIntervals()
Interval first = Intervals.of("2012-01-01T00Z/P1Y");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first),
Granularity.fromString("SECOND")
Granularities.SECOND
);
Assert.assertEquals(
ImmutableList.of(Intervals.of("2012-01-01T00Z/2013-01-01T00Z")),
ImmutableList.copyOf(JodaUtils.condensedIntervalsIterator(intervals.granularityIntervalsIterator()))
);
}

/**
* This test iterates huge intervals (2.5 years) with the SECOND granularity.
* The motivation behind this test is ensuring that IntervalsByGranularity can handle
* these huge intervals with a tiny granularity. However, this test takes a long time
* to populate all intervals based on the SECOND granularity (more than 1 min), so
* is ignored by default. We should make this test not a unit test, but a load test.
*/
@Ignore
@Test
public void testIntervalExplosion()
public void testIterateHugeIntervalsWithTinyGranularity()
{
Interval first = Intervals.of("2012-01-01T00Z/2012-12-31T00Z");
Interval second = Intervals.of("2002-01-01T00Z/2002-12-31T00Z");
Interval third = Intervals.of("2021-01-01T00Z/2021-06-30T00Z");
IntervalsByGranularity intervals = new IntervalsByGranularity(
ImmutableList.of(first, second, third),
Granularity.fromString("SECOND")
Granularities.SECOND
);

// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = getCount(granularityIntervals);
Assert.assertTrue(count == 78537600);

long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
Assert.assertEquals(78537600, count);
}

@Test
Expand All @@ -132,7 +141,6 @@ public void testSimpleEliminateRepeated()
ImmutableList.of(Intervals.of("2012-01-01T00Z/2012-02-01T00Z")),
ImmutableList.copyOf(intervals.granularityIntervalsIterator())
);

}

@Test
Expand Down Expand Up @@ -160,10 +168,9 @@ public void testALittleMoreComplexEliminateRepeated()
),
ImmutableList.copyOf(intervals.granularityIntervalsIterator())
);

}

@Test(expected = IAE.class)
@Test
public void testOverlappingShouldThrow()
{
List<Interval> inputIntervals = ImmutableList.of(
Expand All @@ -174,10 +181,13 @@ public void testOverlappingShouldThrow()

IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
Granularity.fromString("DAY")
Granularities.DAY
);
}

Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
Assert.assertEquals(14, count);
}

@Test
public void testWithGranularity()
Expand All @@ -190,13 +200,13 @@ public void testWithGranularity()

IntervalsByGranularity intervals = new IntervalsByGranularity(
inputIntervals,
Granularity.fromString("MONTH")
Granularities.MONTH
);

// get count:
Iterator<Interval> granularityIntervals = intervals.granularityIntervalsIterator();
long count = getCount(granularityIntervals);
Assert.assertTrue(count == 2);
long count = verifyIteratorAndReturnIntervalCount(granularityIntervals);
Assert.assertEquals(2, count);
}

@Test(expected = UnsupportedOperationException.class)
Expand All @@ -223,7 +233,7 @@ public void testEmptyInput()
Assert.assertFalse(intervals.granularityIntervalsIterator().hasNext());
}

private long getCount(Iterator<Interval> granularityIntervalIterator)
private long verifyIteratorAndReturnIntervalCount(Iterator<Interval> granularityIntervalIterator)
{
long count = 0;
Interval previous = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonInclude.Include;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
Expand Down Expand Up @@ -142,8 +144,6 @@ public class CompactionTask extends AbstractBatchIndexTask
@Nullable
private final AggregatorFactory[] metricsSpec;
@Nullable
private final Granularity segmentGranularity;
@Nullable
private final ClientCompactionTaskGranularitySpec granularitySpec;
@Nullable
private final ParallelIndexTuningConfig tuningConfig;
Expand Down Expand Up @@ -207,7 +207,6 @@ public CompactionTask(

this.dimensionsSpec = dimensionsSpec == null ? dimensions : dimensionsSpec;
this.metricsSpec = metricsSpec;
this.segmentGranularity = segmentGranularity;
// Prior to apache/druid#10843 users could specify segmentGranularity using `segmentGranularity`
// Now users should prefer to use `granularitySpec`
// In case users accidentally specify both, and they are conflicting, warn the user instead of proceeding
Expand Down Expand Up @@ -308,6 +307,7 @@ public AggregatorFactory[] getMetricsSpec()
return metricsSpec;
}

@JsonInclude(Include.NON_NULL)
@JsonProperty
@Nullable
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.java.util.common.guava.Comparators;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
Expand Down Expand Up @@ -798,6 +799,79 @@ public void testCompactThenAppend() throws Exception
Assert.assertEquals(expectedSegments, usedSegments);
}

@Test
public void testPartialIntervalCompactWithFinerSegmentGranularityThenFullIntervalCompact() throws Exception
{
// This test fails with segment lock because of the bug reported in https://github.com/apache/druid/issues/10911.
if (lockGranularity == LockGranularity.SEGMENT) {
return;
}

// Ingest the baseline segments (assumed to cover 2014-01-01/2014-01-02,
// matching the interval queried below — TODO confirm against runIndexTask).
runIndexTask();

// Snapshot the segments that are visible before any compaction runs.
final Set<DataSegment> expectedSegments = new HashSet<>(
getStorageCoordinator().retrieveUsedSegmentsForIntervals(
DATA_SOURCE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
Segments.ONLY_VISIBLE
)
);

final Builder builder = new Builder(
DATA_SOURCE,
segmentLoaderFactory,
RETRY_POLICY_FACTORY
);

// First pass: compact only a one-hour slice using MINUTE segment granularity,
// i.e. a granularity finer than the pre-existing segments.
final Interval partialInterval = Intervals.of("2014-01-01T01:00:00/2014-01-01T02:00:00");
final CompactionTask partialCompactionTask = builder
.interval(partialInterval)
.segmentGranularity(Granularities.MINUTE)
.build();

final Pair<TaskStatus, List<DataSegment>> partialCompactionResult = runTask(partialCompactionTask);
Assert.assertTrue(partialCompactionResult.lhs.isSuccess());
// All segments in the previous expectedSegments should still appear as they have larger segment granularity.
expectedSegments.addAll(partialCompactionResult.rhs);

final Set<DataSegment> segmentsAfterPartialCompaction = new HashSet<>(
getStorageCoordinator().retrieveUsedSegmentsForIntervals(
DATA_SOURCE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
Segments.ONLY_VISIBLE
)
);

// The partial compaction must only have added segments, never dropped the originals.
Assert.assertEquals(expectedSegments, segmentsAfterPartialCompaction);

// Second pass: compact the whole day. segmentGranularity is left null here;
// NOTE(review): this appears to let the task derive granularity from the existing
// segments rather than forcing one — confirm against CompactionTask semantics.
final CompactionTask fullCompactionTask = builder
.interval(Intervals.of("2014-01-01/2014-01-02"))
.segmentGranularity(null)
.build();

final Pair<TaskStatus, List<DataSegment>> fullCompactionResult = runTask(fullCompactionTask);
Assert.assertTrue(fullCompactionResult.lhs.isSuccess());

final List<DataSegment> segmentsAfterFullCompaction = new ArrayList<>(
getStorageCoordinator().retrieveUsedSegmentsForIntervals(
DATA_SOURCE,
Collections.singletonList(Intervals.of("2014-01-01/2014-01-02")),
Segments.ONLY_VISIBLE
)
);
// Sort by interval start/end so the positional assertions below are deterministic.
segmentsAfterFullCompaction.sort(
(s1, s2) -> Comparators.intervalsByStartThenEnd().compare(s1.getInterval(), s2.getInterval())
);

// After full compaction we expect exactly three consecutive hourly segments:
// 00-01, 01-02 (the previously MINUTE-compacted hour), and 02-03.
Assert.assertEquals(3, segmentsAfterFullCompaction.size());
for (int i = 0; i < segmentsAfterFullCompaction.size(); i++) {
Assert.assertEquals(
Intervals.of(StringUtils.format("2014-01-01T%02d/2014-01-01T%02d", i, i + 1)),
segmentsAfterFullCompaction.get(i).getInterval()
);
}
}

@Test
public void testRunIndexAndCompactForSameSegmentAtTheSameTime() throws Exception
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.granularity.Granularities;
Expand All @@ -44,16 +43,12 @@ public abstract class BaseGranularitySpec implements GranularitySpec
public static final Granularity DEFAULT_SEGMENT_GRANULARITY = Granularities.DAY;
public static final Granularity DEFAULT_QUERY_GRANULARITY = Granularities.NONE;

protected List<Interval> inputIntervals;
protected final List<Interval> inputIntervals;
protected final Boolean rollup;

public BaseGranularitySpec(List<Interval> inputIntervals, Boolean rollup)
{
if (inputIntervals != null) {
this.inputIntervals = ImmutableList.copyOf(inputIntervals);
} else {
this.inputIntervals = Collections.emptyList();
}
this.inputIntervals = inputIntervals == null ? Collections.emptyList() : inputIntervals;
this.rollup = rollup == null ? DEFAULT_ROLLUP : rollup;
}

Expand Down

0 comments on commit a041933

Please sign in to comment.