Skip to content

Commit

Permalink
Trimming out outside of given interval (apache#2798)
Browse files Browse the repository at this point in the history
* Trimming out outside of given interval (Fix for apache#2659)

* addressed comments
  • Loading branch information
navis authored and fjy committed Dec 8, 2016
1 parent bb9e35e commit f794246
Show file tree
Hide file tree
Showing 6 changed files with 119 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@ public Optional<Set<Interval>> getSegmentGranularIntervals()
);
}

public List<Interval> getInputIntervals()
{
return schema.getDataSchema()
.getGranularitySpec()
.inputIntervals();
}

public Optional<Iterable<Bucket>> getAllBuckets()
{
Optional<Set<Interval>> intervals = getSegmentGranularIntervals();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
package io.druid.indexer.path;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;

import io.druid.indexer.HadoopDruidIndexerConfig;
Expand Down Expand Up @@ -113,13 +112,10 @@ public void setPathFormat(String pathFormat)
@Override
public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOException
{
final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervals());
Optional<Set<Interval>> optionalIntervals = config.getSegmentGranularIntervals();
if (optionalIntervals.isPresent()) {
for (Interval segmentInterval : optionalIntervals.get()) {
for (Interval dataInterval : dataGranularity.getIterable(segmentInterval)) {
intervals.add(dataInterval);
}
final Set<Interval> intervals = Sets.newTreeSet(Comparators.intervalsByStartThenEnd());
for (Interval inputInterval : config.getInputIntervals()) {
for (Interval interval : dataGranularity.getIterable(inputInterval)) {
intervals.add(trim(inputInterval, interval));
}
}

Expand Down Expand Up @@ -158,4 +154,22 @@ public Job addInputPaths(HadoopDruidIndexerConfig config, Job job) throws IOExce

return job;
}

private Interval trim(Interval inputInterval, Interval interval)
{
long start = interval.getStartMillis();
long end = interval.getEndMillis();

boolean makeNew = false;
if (start < inputInterval.getStartMillis()) {
start = inputInterval.getStartMillis();
makeNew = true;
}
if (end > inputInterval.getEndMillis()) {
end = inputInterval.getEndMillis();
makeNew = true;
}
return makeNew ? new Interval(start, end) : interval;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import java.io.IOException;
import java.util.Arrays;

public class GranularityPathSpecTest
{
private GranularityPathSpec granularityPathSpec;
Expand Down Expand Up @@ -173,6 +176,67 @@ public void testAddInputPath() throws Exception
Assert.assertEquals("Did not find expected input paths", expected, actual);
}

@Test
public void testIntervalTrimming() throws Exception
{
UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"}));
HadoopIngestionSpec spec = new HadoopIngestionSpec(
new DataSchema(
"foo",
null,
new AggregatorFactory[0],
new UniformGranularitySpec(
Granularity.DAY,
QueryGranularity.ALL,
ImmutableList.of(new Interval("2015-01-01T11Z/2015-01-02T05Z"))
),
jsonMapper
),
new HadoopIOConfig(null, null, null),
new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null, null, null)
);

granularityPathSpec.setDataGranularity(Granularity.HOUR);
granularityPathSpec.setPathFormat("yyyy/MM/dd/HH");
granularityPathSpec.setFilePattern(".*");
granularityPathSpec.setInputFormat(TextInputFormat.class);

Job job = Job.getInstance();
String formatStr = "file:%s/%s;org.apache.hadoop.mapreduce.lib.input.TextInputFormat";

createFile(
testFolder,
"test/2015/01/01/00/file1", "test/2015/01/01/10/file2", "test/2015/01/01/18/file3", "test/2015/01/02/00/file1",
"test/2015/01/02/03/file2", "test/2015/01/02/05/file3", "test/2015/01/02/07/file4", "test/2015/01/02/09/file5"
);

granularityPathSpec.setInputPath(testFolder.getRoot().getPath() + "/test");

granularityPathSpec.addInputPaths(HadoopDruidIndexerConfig.fromSpec(spec), job);

String actual = job.getConfiguration().get("mapreduce.input.multipleinputs.dir.formats");

String expected = Joiner.on(",").join(
Lists.newArrayList(
String.format(formatStr, testFolder.getRoot(), "test/2015/01/01/18/file3"),
String.format(formatStr, testFolder.getRoot(), "test/2015/01/02/00/file1"),
String.format(formatStr, testFolder.getRoot(), "test/2015/01/02/03/file2")
)
);

Assert.assertEquals("Did not find expected input paths", expected, actual);
}

private void createFile(TemporaryFolder folder, String... files) throws IOException
{
for (String file : files) {
String[] split = file.split("/");
Assert.assertTrue(split.length > 1);
folder.newFolder(Arrays.copyOfRange(split, 0, split.length - 1));
folder.newFile(file);
}
}

private void testSerde(
String inputPath,
String filePattern,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.PeekingIterator;
Expand Down Expand Up @@ -107,7 +108,13 @@ public ArbitraryGranularitySpec(
@JsonProperty("intervals")
public Optional<SortedSet<Interval>> bucketIntervals()
{
return Optional.of((SortedSet<Interval>) intervals);
return Optional.<SortedSet<Interval>>of(intervals);
}

@Override
public List<Interval> inputIntervals()
{
return ImmutableList.copyOf(intervals);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,12 @@
import org.joda.time.DateTime;
import org.joda.time.Interval;

import java.util.List;
import java.util.SortedSet;

/**
* Tells the indexer how to group events based on timestamp. The events may then be further partitioned based
* on anything, using a ShardSpec.
* on anything, using a ShardSpec.
*/
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = UniformGranularitySpec.class)
@JsonSubTypes(value = {
Expand All @@ -47,14 +48,22 @@ public interface GranularitySpec
*
* @return set of all time groups
*/
public Optional<SortedSet<Interval>> bucketIntervals();
public Optional<SortedSet<Interval>> bucketIntervals();

/**
* Returns user provided intervals as-is state. used for configuring granular path spec
*
* @return
*/
public List<Interval> inputIntervals();

/**
* Time-grouping interval corresponding to some instant, if any.
*
* @param dt instant to return time interval for
*
* @return optional time interval
* */
*/
public Optional<Interval> bucketInterval(DateTime dt);

public Granularity getSegmentGranularity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,12 @@ public Optional<SortedSet<Interval>> bucketIntervals()
}
}

@Override
public List<Interval> inputIntervals()
{
return inputIntervals == null ? ImmutableList.<Interval>of() : ImmutableList.copyOf(inputIntervals);
}

@Override
public Optional<Interval> bucketInterval(DateTime dt)
{
Expand Down

0 comments on commit f794246

Please sign in to comment.