Skip to content

Commit

Permalink
Merge pull request apache#1472 from himanshug/combiner
Browse files Browse the repository at this point in the history
Use Combiner to merge InputRows at the Mapper during Hadoop Batch Ingestion
  • Loading branch information
fjy committed Jul 20, 2015
2 parents 0208273 + 0eec1bb commit c4ed8fe
Show file tree
Hide file tree
Showing 20 changed files with 949 additions and 101 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ protected void setup(Context context)
@Override
protected void innerMap(
InputRow inputRow,
Writable value,
Object value,
Context context
) throws IOException, InterruptedException
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ protected void setup(Context context)
@Override
protected void innerMap(
InputRow inputRow,
Writable value,
Object value,
Context context
) throws IOException, InterruptedException
{
Expand Down Expand Up @@ -340,7 +340,7 @@ protected void setup(Context context)
@Override
protected void innerMap(
InputRow inputRow,
Writable value,
Object value,
Context context
) throws IOException, InterruptedException
{
Expand Down Expand Up @@ -378,7 +378,7 @@ public DeterminePartitionsDimSelectionMapperHelper(HadoopDruidIndexerConfig conf
}

public void emitDimValueCounts(
TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
TaskInputOutputContext<?, ?, BytesWritable, Text> context,
DateTime timestamp,
Map<String, Iterable<String>> dims
) throws IOException, InterruptedException
Expand Down Expand Up @@ -891,7 +891,7 @@ public static DimValueCount fromText(Text text)
}

private static void write(
TaskInputOutputContext<? extends Writable, ? extends Writable, BytesWritable, Text> context,
TaskInputOutputContext<?, ?, BytesWritable, Text> context,
final byte[] groupKey,
DimValueCount dimValueCount
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,11 @@

import com.metamx.common.RE;

public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Writable, Writable, KEYOUT, VALUEOUT>
public abstract class HadoopDruidIndexerMapper<KEYOUT, VALUEOUT> extends Mapper<Object, Object, KEYOUT, VALUEOUT>
{
private static final Logger log = new Logger(HadoopDruidIndexerMapper.class);

private HadoopDruidIndexerConfig config;
protected HadoopDruidIndexerConfig config;
private InputRowParser parser;
protected GranularitySpec granularitySpec;

Expand All @@ -70,7 +70,7 @@ public InputRowParser getParser()

@Override
protected void map(
Writable key, Writable value, Context context
Object key, Object value, Context context
) throws IOException, InterruptedException
{
try {
Expand Down Expand Up @@ -99,7 +99,7 @@ protected void map(
}
}

public final static InputRow parseInputRow(Writable value, InputRowParser parser)
public final static InputRow parseInputRow(Object value, InputRowParser parser)
{
if(parser instanceof StringInputRowParser && value instanceof Text) {
//Note: This is to ensure backward compatibility with 0.7.0 and before
Expand All @@ -109,7 +109,7 @@ public final static InputRow parseInputRow(Writable value, InputRowParser parser
}
}

abstract protected void innerMap(InputRow inputRow, Writable value, Context context)
abstract protected void innerMap(InputRow inputRow, Object value, Context context)
throws IOException, InterruptedException;

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ public class HadoopTuningConfig implements TuningConfig
private static final int DEFAULT_ROW_FLUSH_BOUNDARY = 80000;
private static final int DEFAULT_BUFFER_SIZE = 128 * 1024 * 1024;
private static final float DEFAULT_AGG_BUFFER_RATIO = 0.5f;
private static final boolean DEFAULT_USE_COMBINER = false;

public static HadoopTuningConfig makeDefaultTuningConfig()
{
Expand All @@ -61,7 +62,8 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
false,
false,
DEFAULT_BUFFER_SIZE,
DEFAULT_AGG_BUFFER_RATIO
DEFAULT_AGG_BUFFER_RATIO,
DEFAULT_USE_COMBINER
);
}

Expand All @@ -81,6 +83,7 @@ public static HadoopTuningConfig makeDefaultTuningConfig()
private final boolean ingestOffheap;
private final int bufferSize;
private final float aggregationBufferRatio;
private final boolean useCombiner;

@JsonCreator
public HadoopTuningConfig(
Expand All @@ -99,7 +102,8 @@ public HadoopTuningConfig(
final @JsonProperty("persistInHeap") boolean persistInHeap,
final @JsonProperty("ingestOffheap") boolean ingestOffheap,
final @JsonProperty("bufferSize") Integer bufferSize,
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio
final @JsonProperty("aggregationBufferRatio") Float aggregationBufferRatio,
final @JsonProperty("useCombiner") Boolean useCombiner
)
{
this.workingPath = workingPath;
Expand All @@ -120,6 +124,7 @@ public HadoopTuningConfig(
this.ingestOffheap = ingestOffheap;
this.bufferSize = bufferSize == null ? DEFAULT_BUFFER_SIZE : bufferSize;
this.aggregationBufferRatio = aggregationBufferRatio == null ? DEFAULT_AGG_BUFFER_RATIO : aggregationBufferRatio;
this.useCombiner = useCombiner == null ? DEFAULT_USE_COMBINER : useCombiner.booleanValue();
}

@JsonProperty
Expand Down Expand Up @@ -216,6 +221,12 @@ public float getAggregationBufferRatio()
return aggregationBufferRatio;
}

// Whether a Hadoop Combiner should merge InputRows at the Mapper during
// batch ingestion (the feature this commit introduces). Exposed as a
// JSON property so it round-trips through (de)serialization of the
// tuning config; defaults to false when absent (see DEFAULT_USE_COMBINER).
@JsonProperty
public boolean getUseCombiner()
{
return useCombiner;
}

public HadoopTuningConfig withWorkingPath(String path)
{
return new HadoopTuningConfig(
Expand All @@ -234,7 +245,8 @@ public HadoopTuningConfig withWorkingPath(String path)
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}

Expand All @@ -256,7 +268,8 @@ public HadoopTuningConfig withVersion(String ver)
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}

Expand All @@ -278,7 +291,8 @@ public HadoopTuningConfig withShardSpecs(Map<DateTime, List<HadoopyShardSpec>> s
persistInHeap,
ingestOffheap,
bufferSize,
aggregationBufferRatio
aggregationBufferRatio,
useCombiner
);
}
}
Loading

0 comments on commit c4ed8fe

Please sign in to comment.