Skip to content

Commit

Permalink
Merge pull request apache#1504 from metamx/fix-1447
Browse files Browse the repository at this point in the history
fix for apache#1447
  • Loading branch information
fjy committed Jul 14, 2015
2 parents 14a0707 + 5fe27fe commit 3f7ba58
Show file tree
Hide file tree
Showing 9 changed files with 28 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ static RealtimeTuningConfig convertTuningConfig(ShardSpec spec, IndexTuningConfi
config.getIndexSpec(),
null,
null,
null,
null
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ public Plumber findPlumber(
indexSpec,
false,
false,
null
null,
0.3F
)
)
);
Expand Down Expand Up @@ -282,6 +283,8 @@ public Plumber findPlumber(
task.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity(),
task2.getRealtimeIngestionSchema().getDataSchema().getGranularitySpec().getSegmentGranularity()
);
Assert.assertEquals(task.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(),
task2.getRealtimeIngestionSchema().getTuningConfig().getAggregationBufferRatio(), 0.0f);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,7 @@ public void testRealtimeIndexTask() throws Exception{
null,
null,
null,
null,
null
);
FireDepartment fireDepartment = new FireDepartment(dataSchema, realtimeIOConfig, realtimeTuningConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class RealtimeTuningConfig implements TuningConfig
private static final boolean defaultPersistInHeap = false;
private static final boolean defaultIngestOffheap = false;
private static final int defaultBufferSize = 128 * 1024* 1024; // 128M

private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;

// Might make sense for this to be a builder
public static RealtimeTuningConfig makeDefaultTuningConfig()
Expand All @@ -66,7 +66,8 @@ public static RealtimeTuningConfig makeDefaultTuningConfig()
defaultIndexSpec,
defaultPersistInHeap,
defaultIngestOffheap,
defaultBufferSize
defaultBufferSize,
DEFAULT_AGG_BUFFER_RATIO
);
}

Expand All @@ -82,6 +83,7 @@ public static RealtimeTuningConfig makeDefaultTuningConfig()
private final boolean persistInHeap;
private final boolean ingestOffheap;
private final int bufferSize;
private final float aggregationBufferRatio;

@JsonCreator
public RealtimeTuningConfig(
Expand All @@ -96,7 +98,8 @@ public RealtimeTuningConfig(
@JsonProperty("indexSpec") IndexSpec indexSpec,
@JsonProperty("persistInHeap") Boolean persistInHeap,
@JsonProperty("ingestOffheap") Boolean ingestOffheap,
@JsonProperty("buffersize") Integer bufferSize
@JsonProperty("buffersize") Integer bufferSize,
@JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
)
{
this.maxRowsInMemory = maxRowsInMemory == null ? defaultMaxRowsInMemory : maxRowsInMemory;
Expand All @@ -115,7 +118,7 @@ public RealtimeTuningConfig(
this.persistInHeap = persistInHeap == null ? defaultPersistInHeap : persistInHeap;
this.ingestOffheap = ingestOffheap == null ? defaultIngestOffheap : ingestOffheap;
this.bufferSize = bufferSize == null ? defaultBufferSize : bufferSize;

this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
}

@JsonProperty
Expand Down Expand Up @@ -188,6 +191,12 @@ public int getBufferSize(){
return bufferSize;
}

@JsonProperty
public float getAggregationBufferRatio()
{
return aggregationBufferRatio;
}

public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
{
return new RealtimeTuningConfig(
Expand All @@ -202,7 +211,8 @@ public RealtimeTuningConfig withVersioningPolicy(VersioningPolicy policy)
indexSpec,
persistInHeap,
ingestOffheap,
bufferSize
bufferSize,
aggregationBufferRatio
);
}

Expand All @@ -220,7 +230,8 @@ public RealtimeTuningConfig withBasePersistDirectory(File dir)
indexSpec,
persistInHeap,
ingestOffheap,
bufferSize
bufferSize,
aggregationBufferRatio
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ private FireHydrant makeNewCurrIndex(long minTimestamp, DataSchema schema)
newIndex = new OffheapIncrementalIndex(
indexSchema,
// Assuming half space for aggregates
new OffheapBufferPool(config.getBufferSize()),
new OffheapBufferPool((int) ((double) config.getBufferSize() * config.getAggregationBufferRatio())),
true,
config.getBufferSize()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public void testSerde() throws Exception
)
),
new RealtimeTuningConfig(
null, null, null, null, null, null, null, null, null, false, false, null
null, null, null, null, null, null, null, null, null, false, false, null, null
)
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ public Plumber findPlumber(
null,
null,
null,
null,
null
);
plumber = new TestPlumber(new Sink(new Interval("0/P5000Y"), schema, tuningConfig, new DateTime().toString()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ public InputRowParser withParseSpec(ParseSpec parseSpec)
null,
null,
null,
null,
null
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ public void testSwap() throws Exception
null,
false,
false,
null,
null
);
final Sink sink = new Sink(interval, schema, tuningConfig, version);
Expand Down

0 comments on commit 3f7ba58

Please sign in to comment.