
Commit

continue hadoop job for sparse intervals
dclim committed Jan 7, 2016
1 parent 3048b1f commit 2308c8c
Showing 2 changed files with 78 additions and 5 deletions.
FSSpideringIterator.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
@@ -31,12 +32,15 @@
 */
public class FSSpideringIterator implements Iterator<FileStatus>
{
-  public static FSSpideringIterator spiderPathPropogateExceptions(FileSystem fs, Path path)
+  public static FSSpideringIterator spiderPathPropagateExceptions(FileSystem fs, Path path)
  {
    try {
      final FileStatus[] statii = fs.listStatus(path);
      return new FSSpideringIterator(fs, statii == null ? new FileStatus[]{} : statii);
    }
    catch (FileNotFoundException e) {
      return new FSSpideringIterator(fs, new FileStatus[]{});
    }
    catch (IOException e) {
      throw new RuntimeException(e);
    }
@@ -48,7 +52,7 @@ public static Iterable<FileStatus> spiderIterable(final FileSystem fs, final Pat
    {
      public Iterator<FileStatus> iterator()
      {
-        return spiderPathPropogateExceptions(fs, path);
+        return spiderPathPropagateExceptions(fs, path);
      }
    };
  }
@@ -82,7 +86,7 @@ public FileStatus next()
    while (hasNext()) {
      if (statii[index].isDir()) {
        if (statuses == null) {
-          statuses = spiderPathPropogateExceptions(fs, statii[index].getPath());
+          statuses = spiderPathPropagateExceptions(fs, statii[index].getPath());
        } else if (statuses.hasNext()) {
          return statuses.next();
        }
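The effect of the new catch block is that spidering a path which does not exist now yields an empty iterator instead of a RuntimeException wrapping the FileNotFoundException, so an indexing job whose interval contains hours with no data can keep going. Below is a minimal sketch of that behavior, not part of the commit; it assumes the class is compiled where FSSpideringIterator is visible (same package), that a local Hadoop FileSystem is available, and the missing path is made up for illustration.

// Sketch: spidering a missing hourly directory now yields no files instead of failing.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SparseIntervalSketch
{
  public static void main(String[] args) throws Exception
  {
    FileSystem fs = FileSystem.getLocal(new Configuration());
    Path missingHour = new Path("/tmp/no-such-dir/y=2015/m=11/d=06/H=01"); // hypothetical path

    // Before this commit, a file system that signals a missing path by throwing
    // FileNotFoundException caused a RuntimeException here; with the new catch
    // block the loop simply iterates over nothing.
    for (FileStatus status : FSSpideringIterator.spiderIterable(fs, missingHour)) {
      System.out.println(status.getPath());
    }
    System.out.println("no input for this hour; the job can continue");
  }
}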
GranularityPathSpecTest.java
@@ -20,14 +20,29 @@
package io.druid.indexer.path;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.metamx.common.Granularity;
import io.druid.granularity.QueryGranularity;
import io.druid.indexer.HadoopDruidIndexerConfig;
import io.druid.indexer.HadoopIOConfig;
import io.druid.indexer.HadoopIngestionSpec;
import io.druid.indexer.HadoopTuningConfig;
import io.druid.jackson.DefaultObjectMapper;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.granularity.UniformGranularitySpec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.security.UserGroupInformation;
import org.joda.time.Interval;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;

import org.junit.rules.TemporaryFolder;

public class GranularityPathSpecTest
{
@@ -38,6 +53,9 @@ public class GranularityPathSpecTest

  private final ObjectMapper jsonMapper = new DefaultObjectMapper();

  @Rule
  public final TemporaryFolder testFolder = new TemporaryFolder();

  @Before public void setUp()
  {
    granularityPathSpec = new GranularityPathSpec();
@@ -70,7 +88,7 @@ public class GranularityPathSpecTest
  {
    Granularity granularity = Granularity.DAY;
    granularityPathSpec.setDataGranularity(granularity);
-    Assert.assertEquals(granularity,granularityPathSpec.getDataGranularity());
+    Assert.assertEquals(granularity, granularityPathSpec.getDataGranularity());
  }

  @Test
@@ -85,6 +103,57 @@ public void testSerdeNoInputFormat() throws Exception
    testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, null);
  }

  @Test
  public void testAddInputPath() throws Exception
  {
    UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"}));
    HadoopIngestionSpec spec = new HadoopIngestionSpec(
        new DataSchema(
            "foo",
            null,
            new AggregatorFactory[0],
            new UniformGranularitySpec(
                Granularity.DAY,
                QueryGranularity.MINUTE,
                ImmutableList.of(new Interval("2015-11-06T00:00Z/2015-11-07T00:00Z"))
            ),
            jsonMapper
        ),
        new HadoopIOConfig(null, null, null),
        new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null)
    );

    granularityPathSpec.setDataGranularity(Granularity.HOUR);
    granularityPathSpec.setFilePattern(".*");
    granularityPathSpec.setInputFormat(TextInputFormat.class);

    Job job = Job.getInstance();
    String formatStr = "file:%s/%s;org.apache.hadoop.mapreduce.lib.input.TextInputFormat";

    testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=00");
    testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=02");
    testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=05");
    testFolder.newFile("test/y=2015/m=11/d=06/H=00/file1");
    testFolder.newFile("test/y=2015/m=11/d=06/H=02/file2");
    testFolder.newFile("test/y=2015/m=11/d=06/H=05/file3");
    testFolder.newFile("test/y=2015/m=11/d=06/H=05/file4");

    granularityPathSpec.setInputPath(testFolder.getRoot().getPath() + "/test");

    granularityPathSpec.addInputPaths(HadoopDruidIndexerConfig.fromSpec(spec), job);

    String actual = job.getConfiguration().get("mapreduce.input.multipleinputs.dir.formats");

    String expected = Joiner.on(",").join(Lists.newArrayList(
        String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=00/file1"),
        String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=02/file2"),
        String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=05/file3"),
        String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=05/file4")
    ));

    Assert.assertEquals("Did not find expected input paths", expected, actual);
  }

  private void testSerde(
      String inputPath,
      String filePattern,
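The test builds an hourly y=/m=/d=/H= layout with gaps (H=01, H=03, and H=04 have no directories) and asserts that only the four existing files are registered as job inputs. As a companion, here is a small hypothetical sketch of that "sparse intervals" behavior; the class, method, and path pattern below are illustrative assumptions, not the production GranularityPathSpec code.

// Hypothetical sketch: enumerate the hourly buckets of an interval and keep only
// the directories that exist, so sparse hours contribute no input paths.
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.joda.time.DateTime;
import org.joda.time.Interval;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class SparseHourlyInputs
{
  // Mirrors the directory layout used by the test: y=2015/m=11/d=06/H=00, H=02, H=05
  private static final DateTimeFormatter HOUR_PATH =
      DateTimeFormat.forPattern("'y'=yyyy/'m'=MM/'d'=dd/'H'=HH");

  public static List<String> existingHourlyPaths(String inputRoot, Interval interval)
  {
    List<String> paths = new ArrayList<>();
    for (DateTime hour = interval.getStart(); hour.isBefore(interval.getEnd()); hour = hour.plusHours(1)) {
      File dir = new File(inputRoot, HOUR_PATH.print(hour));
      if (dir.exists()) {
        paths.add(dir.getPath()); // missing hours are simply skipped
      }
    }
    return paths;
  }
}

For the test's interval 2015-11-06T00:00Z/2015-11-07T00:00Z, this sketch would return only the H=00, H=02, and H=05 directories under the temporary folder.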
