Skip to content

Commit

Permalink
Calculate max split size based on numMapTask in DatasourceInputFormat (
Browse files Browse the repository at this point in the history
…apache#2882)

* Calculate max split size based on numMapTask

* updated docs & fixed possible ArithmeticException
  • Loading branch information
navis authored and fjy committed Jul 20, 2016
1 parent fd798d3 commit cd7337f
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
2 changes: 1 addition & 1 deletion docs/content/ingestion/update-existing-data.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ This is a type of `inputSpec` that reads data already stored inside Druid.
|-----|----|-----------|--------|
|type|String.|This should always be 'dataSource'.|yes|
|ingestionSpec|JSON object.|Specification of Druid segments to be loaded. See below.|yes|
|maxSplitSize|Number|Enables combining multiple segments into single Hadoop InputSplit according to size of segments. Default is none. |no|
|maxSplitSize|Number|Enables combining multiple segments into a single Hadoop InputSplit according to the size of the segments. If set to -1, Druid calculates the maximum split size based on the user-specified number of map tasks (`mapred.map.tasks` or `mapreduce.job.maps`). By default, one split is made for each segment. |no|

Here is what goes inside `ingestionSpec`:

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ public List<InputSplit> getSplits(JobContext context) throws IOException, Interr
logger.info("segments to read [%s]", segmentsStr);

long maxSize = conf.getLong(CONF_MAX_SPLIT_SIZE, 0);
if (maxSize < 0) {
long totalSize = 0;
for (WindowedDataSegment segment : segments) {
totalSize += segment.getSegment().getSize();
}
int mapTask = ((JobConf)conf).getNumMapTasks();
if (mapTask > 0) {
maxSize = totalSize / mapTask;
}
}

if (maxSize > 0) {
//combining is to happen, let us sort the segments list by size so that they
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import io.druid.jackson.DefaultObjectMapper;
import io.druid.timeline.DataSegment;
import io.druid.timeline.partition.NoneShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -55,7 +54,7 @@ public class DatasourceInputFormatTest
{
private List<WindowedDataSegment> segments;
private List<LocatedFileStatus> locations;
private Configuration config;
private JobConf config;
private JobContext context;

@Before
Expand Down Expand Up @@ -142,7 +141,7 @@ public void setUp() throws Exception
)
);

config = new Configuration();
config = new JobConf();
config.set(
DatasourceInputFormat.CONF_INPUT_SEGMENTS,
new DefaultObjectMapper().writeValueAsString(segments)
Expand Down Expand Up @@ -238,6 +237,34 @@ public void testGetSplitsCombineInTwo() throws Exception
Assert.assertArrayEquals(new String[]{"s1", "s2"}, splits.get(1).getLocations());
}

@Test
public void testGetSplitsCombineCalculated() throws Exception
{
  // A maxSplitSize of -1 asks the input format to derive the split size
  // automatically from the configured number of map tasks.
  config.set(DatasourceInputFormat.CONF_MAX_SPLIT_SIZE, "-1");
  config.setNumMapTasks(3);

  final List<InputSplit> splits = new DatasourceInputFormat().setSupplier(testFormatter).getSplits(context);

  // Three map tasks were requested, so three splits are expected.
  Assert.assertEquals(3, splits.size());

  // First split carries segment 0, hosted on s1/s2.
  final DatasourceInputSplit split0 = (DatasourceInputSplit) splits.get(0);
  Assert.assertEquals(Sets.newHashSet(segments.get(0)), Sets.newHashSet(split0.getSegments()));
  Assert.assertArrayEquals(new String[]{"s1", "s2"}, split0.getLocations());

  // Second split carries segment 2, hosted on s2/s3.
  final DatasourceInputSplit split1 = (DatasourceInputSplit) splits.get(1);
  Assert.assertEquals(Sets.newHashSet(segments.get(2)), Sets.newHashSet(split1.getSegments()));
  Assert.assertArrayEquals(new String[]{"s2", "s3"}, split1.getLocations());

  // Third split carries segment 1, hosted on s1/s2.
  final DatasourceInputSplit split2 = (DatasourceInputSplit) splits.get(2);
  Assert.assertEquals(Sets.newHashSet(segments.get(1)), Sets.newHashSet(split2.getSegments()));
  Assert.assertArrayEquals(new String[]{"s1", "s2"}, split2.getLocations());
}

@Test
public void testGetRecordReader() throws Exception
{
Expand Down

0 comments on commit cd7337f

Please sign in to comment.