Skip to content

Commit

Permalink
Merge pull request apache#2290 from gianm/index-merger-v9-stuff
Browse files Browse the repository at this point in the history
Respect buildV9Directly in PlumberSchools, so it works on standalone realtime.
  • Loading branch information
jon-wei committed Jan 19, 2016
2 parents 0c31f00 + 1dcf22e commit df2906a
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 48 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ public class IndexGeneratorJobTest
{

@Parameterized.Parameters(name = "partitionType={0}, interval={1}, shardInfoForEachSegment={2}, data={3}, " +
"inputFormatName={4}")
"inputFormatName={4}, buildV9Directly={5}")
public static Collection<Object[]> constructFeed()
{
return Arrays.asList(
final List<Object[]> baseConstructors = Arrays.asList(
new Object[][]{
{
false,
Expand Down Expand Up @@ -273,22 +273,39 @@ public static Collection<Object[]> constructFeed()
}
}
);

// Run each baseConstructor with/without buildV9Directly.
final List<Object[]> constructors = Lists.newArrayList();
for (Object[] baseConstructor : baseConstructors) {
final Object[] c1 = new Object[baseConstructor.length + 1];
final Object[] c2 = new Object[baseConstructor.length + 1];
System.arraycopy(baseConstructor, 0, c1, 0, baseConstructor.length);
System.arraycopy(baseConstructor, 0, c2, 0, baseConstructor.length);
c1[c1.length - 1] = true;
c2[c2.length - 1] = false;
constructors.add(c1);
constructors.add(c2);
}

return constructors;
}

@Rule
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

private final boolean useCombiner;
private final String partitionType;
private final Interval interval;
private final Object[][][] shardInfoForEachSegment;
private final List<String> data;
private final String inputFormatName;
private final InputRowParser inputRowParser;
private final boolean buildV9Directly;

private ObjectMapper mapper;
private HadoopDruidIndexerConfig config;
private File dataFile;
private File tmpDir;
private Interval interval;
private String partitionType;
private Object[][][] shardInfoForEachSegment;
private List<String> data;
private boolean useCombiner;
private String inputFormatName;
private InputRowParser inputRowParser;

public IndexGeneratorJobTest(
boolean useCombiner,
Expand All @@ -297,7 +314,8 @@ public IndexGeneratorJobTest(
Object[][][] shardInfoForEachSegment,
List<String> data,
String inputFormatName,
InputRowParser inputRowParser
InputRowParser inputRowParser,
boolean buildV9Directly
) throws IOException
{
this.useCombiner = useCombiner;
Expand All @@ -307,6 +325,7 @@ public IndexGeneratorJobTest(
this.data = data;
this.inputFormatName = inputFormatName;
this.inputRowParser = inputRowParser;
this.buildV9Directly = buildV9Directly;
}

private void writeDataToLocalSequenceFile(File outputFile, List<String> data) throws IOException
Expand Down Expand Up @@ -396,7 +415,7 @@ public void setUp() throws Exception
false,
useCombiner,
null,
null
buildV9Directly
)
)
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.druid.query.QueryRunner;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.QueryableIndex;
import io.druid.segment.SegmentUtils;
import io.druid.segment.incremental.IndexSizeExceededException;
Expand Down Expand Up @@ -68,6 +69,7 @@ public class YeOldePlumberSchool implements PlumberSchool
private final DataSegmentPusher dataSegmentPusher;
private final File tmpSegmentDir;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;

private static final Logger log = new Logger(YeOldePlumberSchool.class);
Expand All @@ -79,6 +81,7 @@ public YeOldePlumberSchool(
@JacksonInject("segmentPusher") DataSegmentPusher dataSegmentPusher,
@JacksonInject("tmpSegmentDir") File tmpSegmentDir,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO
)
{
Expand All @@ -87,6 +90,7 @@ public YeOldePlumberSchool(
this.dataSegmentPusher = dataSegmentPusher;
this.tmpSegmentDir = tmpSegmentDir;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
}

Expand All @@ -106,6 +110,9 @@ public Plumber findPlumber(
// Set of spilled segments. Will be merged at the end.
final Set<File> spilled = Sets.newHashSet();

// IndexMerger implementation.
final IndexMerger theIndexMerger = config.getBuildV9Directly() ? indexMergerV9 : indexMerger;

return new Plumber()
{
@Override
Expand Down Expand Up @@ -174,7 +181,7 @@ public void finishJob()
}

fileToUpload = new File(tmpSegmentDir, "merged");
indexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec());
theIndexMerger.mergeQueryableIndex(indexes, schema.getAggregators(), fileToUpload, config.getIndexSpec());
}

// Map merged segment so we can extract dimensions
Expand Down Expand Up @@ -219,7 +226,7 @@ private void spillIfSwappable()
log.info("Spilling index[%d] with rows[%d] to: %s", indexToPersist.getCount(), rowsToPersist, dirToPersist);

try {
indexMerger.persist(
theIndexMerger.persist(
indexToPersist.getIndex(),
dirToPersist,
config.getIndexSpec()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,12 @@ private static Interval makeInterval(IndexIngestionSpec ingestionSchema)
);
}

static RealtimeTuningConfig convertTuningConfig(ShardSpec shardSpec, int rowFlushBoundary, IndexSpec indexSpec)
static RealtimeTuningConfig convertTuningConfig(
ShardSpec shardSpec,
int rowFlushBoundary,
IndexSpec indexSpec,
boolean buildV9Directly
)
{
return new RealtimeTuningConfig(
rowFlushBoundary,
Expand All @@ -136,7 +141,7 @@ static RealtimeTuningConfig convertTuningConfig(ShardSpec shardSpec, int rowFlus
null,
shardSpec,
indexSpec,
null
buildV9Directly
);
}

Expand Down Expand Up @@ -355,19 +360,22 @@ public DataSegment push(File file, DataSegment segment) throws IOException
final FireDepartmentMetrics metrics = new FireDepartmentMetrics();
final Firehose firehose = firehoseFactory.connect(ingestionSchema.getDataSchema().getParser());
final Supplier<Committer> committerSupplier = Committers.supplierFromFirehose(firehose);
final IndexMerger indexMerger = ingestionSchema.getTuningConfig().getBuildV9Directly()
? toolbox.getIndexMergerV9()
: toolbox.getIndexMerger();
final Plumber plumber = new YeOldePlumberSchool(
interval,
version,
wrappedDataSegmentPusher,
tmpDir,
indexMerger,
toolbox.getIndexMerger(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO()
).findPlumber(
schema,
convertTuningConfig(shardSpec, myRowFlushBoundary, ingestionSchema.getTuningConfig().getIndexSpec()),
convertTuningConfig(
shardSpec,
myRowFlushBoundary,
ingestionSchema.getTuningConfig().getIndexSpec(),
ingestionSchema.tuningConfig.getBuildV9Directly()
),
metrics
);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import io.druid.query.QueryRunnerFactory;
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.query.QueryToolChest;
import io.druid.segment.IndexMerger;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeIOConfig;
import io.druid.segment.indexing.RealtimeTuningConfig;
Expand Down Expand Up @@ -293,9 +292,6 @@ public String getVersion(final Interval interval)
);
this.queryRunnerFactoryConglomerate = toolbox.getQueryRunnerFactoryConglomerate();

IndexMerger indexMerger = spec.getTuningConfig().getBuildV9Directly()
? toolbox.getIndexMergerV9()
: toolbox.getIndexMerger();
// NOTE: This pusher selects path based purely on global configuration and the DataSegment, which means
// NOTE: that redundant realtime tasks will upload to the same location. This can cause index.zip and
// NOTE: descriptor.json to mismatch, or it can cause historical nodes to load different instances of the
Expand All @@ -308,7 +304,8 @@ public String getVersion(final Interval interval)
segmentPublisher,
toolbox.getSegmentHandoffNotifierFactory(),
toolbox.getQueryExecutorService(),
indexMerger,
toolbox.getIndexMerger(),
toolbox.getIndexMergerV9(),
toolbox.getIndexIO(),
toolbox.getCache(),
toolbox.getCacheConfig(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import io.druid.indexing.common.actions.LockListAction;
import io.druid.indexing.common.actions.TaskAction;
import io.druid.indexing.common.actions.TaskActionClient;
import io.druid.indexing.common.actions.TaskActionClientFactory;
import io.druid.query.aggregation.AggregatorFactory;
import io.druid.query.aggregation.LongSumAggregatorFactory;
import io.druid.segment.IndexIO;
Expand Down Expand Up @@ -342,7 +341,8 @@ public void testConvertProps()
RealtimeTuningConfig realtimeTuningConfig = IndexTask.convertTuningConfig(
spec,
config.getRowFlushBoundary(),
config.getIndexSpec()
config.getIndexSpec(),
config.getBuildV9Directly()
);
Assert.assertEquals(realtimeTuningConfig.getMaxRowsInMemory(), config.getRowFlushBoundary());
Assert.assertEquals(realtimeTuningConfig.getShardSpec(), spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,20 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;

@RunWith(Parameterized.class)
public class RealtimeIndexTaskTest
{
private static final Logger log = new Logger(RealtimeIndexTaskTest.class);
Expand All @@ -143,10 +148,25 @@ public class RealtimeIndexTaskTest
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();

private final boolean buildV9Directly;

private DateTime now;
private ListeningExecutorService taskExec;
private Map<SegmentDescriptor, Pair<Executor, Runnable>> handOffCallbacks;
private SegmentHandoffNotifierFactory handoffNotifierFactory;

@Parameterized.Parameters(name = "buildV9Directly = {0}")
public static Collection<?> constructorFeeder() throws IOException
{
return ImmutableList.of(
new Object[]{true},
new Object[]{false}
);
}

public RealtimeIndexTaskTest(boolean buildV9Directly)
{
this.buildV9Directly = buildV9Directly;
}

@Before
public void setUp()
Expand Down Expand Up @@ -572,7 +592,7 @@ private RealtimeIndexTask makeRealtimeTask(final String taskId)
null,
null,
null,
null
buildV9Directly
);
return new RealtimeIndexTask(
taskId,
Expand Down Expand Up @@ -650,7 +670,7 @@ public void registerQuery(Query query, ListenableFuture future)
)
);
handOffCallbacks = Maps.newConcurrentMap();
handoffNotifierFactory = new SegmentHandoffNotifierFactory()
final SegmentHandoffNotifierFactory handoffNotifierFactory = new SegmentHandoffNotifierFactory()
{
@Override
public SegmentHandoffNotifier createSegmentHandoffNotifier(String dataSource)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.realtime.FireDepartmentMetrics;
Expand All @@ -54,6 +55,7 @@ public class FlushingPlumberSchool extends RealtimePlumberSchool
private final DataSegmentAnnouncer segmentAnnouncer;
private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
Expand All @@ -67,6 +69,7 @@ public FlushingPlumberSchool(
@JacksonInject DataSegmentAnnouncer segmentAnnouncer,
@JacksonInject @Processing ExecutorService queryExecutorService,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
Expand All @@ -82,6 +85,7 @@ public FlushingPlumberSchool(
null,
queryExecutorService,
indexMerger,
indexMergerV9,
indexIO,
cache,
cacheConfig,
Expand All @@ -94,6 +98,7 @@ public FlushingPlumberSchool(
this.segmentAnnouncer = segmentAnnouncer;
this.queryExecutorService = queryExecutorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");
this.cache = cache;
this.cacheConfig = cacheConfig;
Expand All @@ -118,7 +123,7 @@ public Plumber findPlumber(
conglomerate,
segmentAnnouncer,
queryExecutorService,
indexMerger,
config.getBuildV9Directly() ? indexMergerV9 : indexMerger,
indexIO,
cache,
cacheConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import io.druid.query.QueryRunnerFactoryConglomerate;
import io.druid.segment.IndexIO;
import io.druid.segment.IndexMerger;
import io.druid.segment.IndexMergerV9;
import io.druid.segment.indexing.DataSchema;
import io.druid.segment.indexing.RealtimeTuningConfig;
import io.druid.segment.loading.DataSegmentPusher;
Expand All @@ -51,6 +52,7 @@ public class RealtimePlumberSchool implements PlumberSchool
private final SegmentHandoffNotifierFactory handoffNotifierFactory;
private final ExecutorService queryExecutorService;
private final IndexMerger indexMerger;
private final IndexMergerV9 indexMergerV9;
private final IndexIO indexIO;
private final Cache cache;
private final CacheConfig cacheConfig;
Expand All @@ -66,6 +68,7 @@ public RealtimePlumberSchool(
@JacksonInject SegmentHandoffNotifierFactory handoffNotifierFactory,
@JacksonInject @Processing ExecutorService executorService,
@JacksonInject IndexMerger indexMerger,
@JacksonInject IndexMergerV9 indexMergerV9,
@JacksonInject IndexIO indexIO,
@JacksonInject Cache cache,
@JacksonInject CacheConfig cacheConfig,
Expand All @@ -80,6 +83,7 @@ public RealtimePlumberSchool(
this.handoffNotifierFactory = handoffNotifierFactory;
this.queryExecutorService = executorService;
this.indexMerger = Preconditions.checkNotNull(indexMerger, "Null IndexMerger");
this.indexMergerV9 = Preconditions.checkNotNull(indexMergerV9, "Null IndexMergerV9");
this.indexIO = Preconditions.checkNotNull(indexIO, "Null IndexIO");

this.cache = cache;
Expand Down Expand Up @@ -107,7 +111,7 @@ public Plumber findPlumber(
dataSegmentPusher,
segmentPublisher,
handoffNotifierFactory.createSegmentHandoffNotifier(schema.getDataSource()),
indexMerger,
config.getBuildV9Directly() ? indexMergerV9 : indexMerger,
indexIO,
cache,
cacheConfig,
Expand Down
Loading

0 comments on commit df2906a

Please sign in to comment.