From 2308c8c07f08c02dd7eb161250dd0d5d8c1e7998 Mon Sep 17 00:00:00 2001 From: dclim Date: Mon, 23 Nov 2015 19:27:45 -0700 Subject: [PATCH] continue hadoop job for sparse intervals --- .../indexer/hadoop/FSSpideringIterator.java | 10 ++- .../indexer/path/GranularityPathSpecTest.java | 73 ++++++++++++++++++- 2 files changed, 78 insertions(+), 5 deletions(-) diff --git a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/FSSpideringIterator.java b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/FSSpideringIterator.java index c7587f092399..22d558628d70 100644 --- a/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/FSSpideringIterator.java +++ b/indexing-hadoop/src/main/java/io/druid/indexer/hadoop/FSSpideringIterator.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import java.io.FileNotFoundException; import java.io.IOException; import java.util.Iterator; import java.util.NoSuchElementException; @@ -31,12 +32,15 @@ */ public class FSSpideringIterator implements Iterator { - public static FSSpideringIterator spiderPathPropogateExceptions(FileSystem fs, Path path) + public static FSSpideringIterator spiderPathPropagateExceptions(FileSystem fs, Path path) { try { final FileStatus[] statii = fs.listStatus(path); return new FSSpideringIterator(fs, statii == null ? new FileStatus[]{} : statii); } + catch (FileNotFoundException e) { + return new FSSpideringIterator(fs, new FileStatus[]{}); + } catch (IOException e) { throw new RuntimeException(e); } @@ -48,7 +52,7 @@ public static Iterable spiderIterable(final FileSystem fs, final Pat { public Iterator iterator() { - return spiderPathPropogateExceptions(fs, path); + return spiderPathPropagateExceptions(fs, path); } }; } @@ -82,7 +86,7 @@ public FileStatus next() while (hasNext()) { if (statii[index].isDir()) { if (statuses == null) { - statuses = spiderPathPropogateExceptions(fs, statii[index].getPath()); + statuses = spiderPathPropagateExceptions(fs, statii[index].getPath()); } else if (statuses.hasNext()) { return statuses.next(); } diff --git a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java index f206b9f0df2f..302446cad818 100644 --- a/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java +++ b/indexing-hadoop/src/test/java/io/druid/indexer/path/GranularityPathSpecTest.java @@ -20,14 +20,29 @@ package io.druid.indexer.path; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; import com.metamx.common.Granularity; +import io.druid.granularity.QueryGranularity; +import io.druid.indexer.HadoopDruidIndexerConfig; +import io.druid.indexer.HadoopIOConfig; +import io.druid.indexer.HadoopIngestionSpec; +import io.druid.indexer.HadoopTuningConfig; import io.druid.jackson.DefaultObjectMapper; +import io.druid.query.aggregation.AggregatorFactory; +import io.druid.segment.indexing.DataSchema; +import io.druid.segment.indexing.granularity.UniformGranularitySpec; +import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.security.UserGroupInformation; +import org.joda.time.Interval; import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; - +import org.junit.rules.TemporaryFolder; public class GranularityPathSpecTest { @@ -38,6 +53,9 @@ public class GranularityPathSpecTest private final ObjectMapper jsonMapper = new DefaultObjectMapper(); + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(); + @Before public void setUp() { granularityPathSpec = new GranularityPathSpec(); @@ -70,7 +88,7 @@ public class GranularityPathSpecTest { Granularity granularity = Granularity.DAY; granularityPathSpec.setDataGranularity(granularity); - Assert.assertEquals(granularity,granularityPathSpec.getDataGranularity()); + Assert.assertEquals(granularity, granularityPathSpec.getDataGranularity()); } @Test @@ -85,6 +103,57 @@ public void testSerdeNoInputFormat() throws Exception testSerde("/test/path", "*.test", "pat_pat", Granularity.SECOND, null); } + @Test + public void testAddInputPath() throws Exception + { + UserGroupInformation.setLoginUser(UserGroupInformation.createUserForTesting("test", new String[]{"testGroup"})); + HadoopIngestionSpec spec = new HadoopIngestionSpec( + new DataSchema( + "foo", + null, + new AggregatorFactory[0], + new UniformGranularitySpec( + Granularity.DAY, + QueryGranularity.MINUTE, + ImmutableList.of(new Interval("2015-11-06T00:00Z/2015-11-07T00:00Z")) + ), + jsonMapper + ), + new HadoopIOConfig(null, null, null), + new HadoopTuningConfig(null, null, null, null, null, null, false, false, false, false, null, false, false, null) + ); + + granularityPathSpec.setDataGranularity(Granularity.HOUR); + granularityPathSpec.setFilePattern(".*"); + granularityPathSpec.setInputFormat(TextInputFormat.class); + + Job job = Job.getInstance(); + String formatStr = "file:%s/%s;org.apache.hadoop.mapreduce.lib.input.TextInputFormat"; + + testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=00"); + testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=02"); + testFolder.newFolder("test", "y=2015", "m=11", "d=06", "H=05"); + testFolder.newFile("test/y=2015/m=11/d=06/H=00/file1"); + testFolder.newFile("test/y=2015/m=11/d=06/H=02/file2"); + testFolder.newFile("test/y=2015/m=11/d=06/H=05/file3"); + testFolder.newFile("test/y=2015/m=11/d=06/H=05/file4"); + + granularityPathSpec.setInputPath(testFolder.getRoot().getPath() + "/test"); + + granularityPathSpec.addInputPaths(HadoopDruidIndexerConfig.fromSpec(spec), job); + + String actual = job.getConfiguration().get("mapreduce.input.multipleinputs.dir.formats"); + + String expected = Joiner.on(",").join(Lists.newArrayList( + String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=00/file1"), + String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=02/file2"), + String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=05/file3"), + String.format(formatStr, testFolder.getRoot(), "test/y=2015/m=11/d=06/H=05/file4") + )); + + Assert.assertEquals("Did not find expected input paths", expected, actual); + } + private void testSerde( String inputPath, String filePattern,