Skip to content

Commit

Permalink
Merge pull request apache#1403 from metamx/mergerMakerTests
Browse files Browse the repository at this point in the history
Improvements around resource handling in IndexMerger / IndexIO / QueryableIndex
  • Loading branch information
xvrl committed Jun 4, 2015
2 parents f1721fa + ed8eb5c commit 395ba79
Show file tree
Hide file tree
Showing 9 changed files with 1,293 additions and 72 deletions.
12 changes: 8 additions & 4 deletions processing/src/main/java/io/druid/segment/IndexIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -466,10 +466,14 @@ public MMappedIndex mapDir(File inDir) throws IOException

public static void validateTwoSegments(File dir1, File dir2) throws IOException
{
validateTwoSegments(
new QueryableIndexIndexableAdapter(loadIndex(dir1)),
new QueryableIndexIndexableAdapter(loadIndex(dir2))
);
try(QueryableIndex queryableIndex1 = loadIndex(dir1)) {
try(QueryableIndex queryableIndex2 = loadIndex(dir2)) {
validateTwoSegments(
new QueryableIndexIndexableAdapter(queryableIndex1),
new QueryableIndexIndexableAdapter(queryableIndex2)
);
}
}
}

public static void validateTwoSegments(final IndexableAdapter adapter1, final IndexableAdapter adapter2)
Expand Down
42 changes: 24 additions & 18 deletions processing/src/main/java/io/druid/segment/IndexMaker.java
Original file line number Diff line number Diff line change
Expand Up @@ -333,24 +333,26 @@ public static File convert(
final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress
) throws IOException
{
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(IndexIO.loadIndex(inDir));
return makeIndexFiles(
ImmutableList.of(adapter),
outDir,
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@Override
public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
try (QueryableIndex index = IndexIO.loadIndex(inDir)) {
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
return makeIndexFiles(
ImmutableList.of(adapter),
outDir,
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
return input.get(0);
}
},
indexSpec
);
@Nullable
@Override
public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
{
return input.get(0);
}
},
indexSpec
);
}
}


Expand Down Expand Up @@ -842,6 +844,7 @@ private static class NullsAtZeroConvertingIntList extends AbstractList<Integer>
{
private final List<Integer> delegate;
private final boolean delegateHasNullAtZero;

NullsAtZeroConvertingIntList(List<Integer> delegate, final boolean delegateHasNullAtZero)
{
this.delegate = delegate;
Expand Down Expand Up @@ -961,7 +964,10 @@ public VSizeIndexedInts apply(final List<Integer> input)
if (input == null) {
return VSizeIndexedInts.fromList(ImmutableList.<Integer>of(0), dictionarySize);
} else {
return VSizeIndexedInts.fromList(new NullsAtZeroConvertingIntList(input, false), dictionarySize);
return VSizeIndexedInts.fromList(
new NullsAtZeroConvertingIntList(input, false),
dictionarySize
);
}
}
}
Expand Down
33 changes: 32 additions & 1 deletion processing/src/main/java/io/druid/segment/IndexMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.base.Function;
import com.google.common.base.Objects;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
Expand Down Expand Up @@ -337,6 +336,38 @@ public Iterable<Rowboat> apply(
return makeIndexFiles(indexes, outDir, progress, mergedDimensions, mergedMetrics, rowMergerFn, indexSpec);
}

// Faster than IndexMaker
public static File convert(final File inDir, final File outDir, final IndexSpec indexSpec) throws IOException
{
return convert(inDir, outDir, indexSpec, new BaseProgressIndicator());
}

public static File convert(
final File inDir, final File outDir, final IndexSpec indexSpec, final ProgressIndicator progress
) throws IOException
{
try (QueryableIndex index = IndexIO.loadIndex(inDir)) {
final IndexableAdapter adapter = new QueryableIndexIndexableAdapter(index);
return makeIndexFiles(
ImmutableList.of(adapter),
outDir,
progress,
Lists.newArrayList(adapter.getDimensionNames()),
Lists.newArrayList(adapter.getMetricNames()),
new Function<ArrayList<Iterable<Rowboat>>, Iterable<Rowboat>>()
{
@Nullable
@Override
public Iterable<Rowboat> apply(ArrayList<Iterable<Rowboat>> input)
{
return input.get(0);
}
},
indexSpec
);
}
}

public static File append(
List<IndexableAdapter> indexes, File outDir, IndexSpec indexSpec
) throws IOException
Expand Down
5 changes: 3 additions & 2 deletions processing/src/main/java/io/druid/segment/QueryableIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@
import io.druid.segment.data.Indexed;
import org.joda.time.Interval;

import java.io.Closeable;
import java.io.IOException;

/**
*/
public interface QueryableIndex extends ColumnSelector
public interface QueryableIndex extends ColumnSelector, Closeable
{
public Interval getDataInterval();
public int getNumRows();
Expand All @@ -37,6 +38,6 @@ public interface QueryableIndex extends ColumnSelector
* The close method shouldn't actually be here as this is nasty. We will adjust it in the future.
* @throws java.io.IOException if an exception was thrown closing the index
*/
@Deprecated
//@Deprecated // This is still required for SimpleQueryableIndex. It should not go away unitl SimpleQueryableIndex is fixed
public void close() throws IOException;
}
103 changes: 103 additions & 0 deletions processing/src/test/java/io/druid/segment/CloserRule.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Licensed to Metamarkets Group Inc. (Metamarkets) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Metamarkets licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package io.druid.segment;

import com.metamx.common.logger.Logger;
import org.junit.rules.TestRule;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;

import java.io.Closeable;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;

public class CloserRule implements TestRule
{
private final boolean throwException;

public CloserRule(boolean throwException)
{
this.throwException = throwException;
}

private static final Logger log = new Logger(CloserRule.class);
private final List<AutoCloseable> autoCloseables = new LinkedList<>();

@Override
public Statement apply(
final Statement base, Description description
)
{
return new Statement()
{
@Override
public void evaluate() throws Throwable
{
Throwable baseThrown = null;
try {
base.evaluate();
}
catch (Throwable e) {
baseThrown = e;
}
finally {
Throwable exception = null;
for (AutoCloseable autoCloseable : autoCloseables) {
try {
autoCloseable.close();
}
catch (Exception e) {
exception = suppressOrSet(exception, e);
}
}
autoCloseables.clear();
if (exception != null) {
if (throwException && baseThrown == null) {
throw exception;
} else if (baseThrown != null) {
baseThrown.addSuppressed(exception);
} else {
log.error(exception, "Exception closing resources");
}
}
if (baseThrown != null) {
throw baseThrown;
}
}
}
};
}

private static Throwable suppressOrSet(Throwable prior, Throwable other)
{
if (prior == null) {
prior = new IOException("Error closing resources");
}
prior.addSuppressed(other);
return prior;
}

public <T extends AutoCloseable> T closeLater(T autoCloseable)
{
autoCloseables.add(autoCloseable);
return autoCloseable;
}
}
Loading

0 comments on commit 395ba79

Please sign in to comment.