Commit 367c50d

Allow SegmentAnalyzer to read columns from StorageAdapter, allow SegmentMetadataQuery to query IncrementalIndexSegments on realtime node

jon-wei committed Sep 17, 2015
1 parent 193fb4f

Showing 7 changed files with 166 additions and 25 deletions.
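At its core, the change teaches segment-metadata analysis to fall back to a segment's StorageAdapter when no QueryableIndex is available, which is the case for in-memory realtime segments. A condensed sketch of that dispatch, assembled from the SegmentMetadataQueryRunnerFactory hunk below (the wrapper class is hypothetical; every call on it is taken from this diff):

import java.util.Map;

import io.druid.query.metadata.SegmentAnalyzer;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.segment.QueryableIndex;
import io.druid.segment.Segment;

public class AnalyzeDispatchSketch
{
  public static Map<String, ColumnAnalysis> analyze(SegmentAnalyzer analyzer, Segment segment)
  {
    final QueryableIndex index = segment.asQueryableIndex();
    if (index == null) {
      // IncrementalIndexSegments (in-memory hydrants on realtime nodes) have no QueryableIndex,
      // so analyze through the StorageAdapter instead of returning an empty result.
      return analyzer.analyze(segment.asStorageAdapter());
    }
    return analyzer.analyze(index);
  }
}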
processing/src/main/java/io/druid/query/metadata/SegmentAnalyzer.java
@@ -19,20 +19,26 @@

import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.primitives.Longs;
import com.metamx.common.logger.Logger;
import com.metamx.common.StringUtils;
import io.druid.query.metadata.metadata.ColumnAnalysis;
import io.druid.segment.QueryableIndex;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.BitmapIndex;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.column.ComplexColumn;
import io.druid.segment.column.ValueType;
import io.druid.segment.data.Indexed;
import io.druid.segment.serde.ComplexMetricSerde;
import io.druid.segment.serde.ComplexMetrics;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class SegmentAnalyzer
@@ -61,7 +67,7 @@ public Map<String, ColumnAnalysis> analyze(QueryableIndex index)

final ColumnAnalysis analysis;
final ValueType type = capabilities.getType();
- switch(type) {
+ switch (type) {
case LONG:
analysis = analyzeLongColumn(column);
break;
@@ -82,7 +88,55 @@ public Map<String, ColumnAnalysis> analyze(QueryableIndex index)
columns.put(columnName, analysis);
}

- columns.put(Column.TIME_COLUMN_NAME, lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP));
+ columns.put(
+     Column.TIME_COLUMN_NAME,
+     lengthBasedAnalysis(index.getColumn(Column.TIME_COLUMN_NAME), NUM_BYTES_IN_TIMESTAMP)
+ );

return columns;
}

public Map<String, ColumnAnalysis> analyze(StorageAdapter adapter)
{
Preconditions.checkNotNull(adapter, "Adapter cannot be null");
Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
List<String> columnNames = getStorageAdapterColumnNames(adapter);

int numRows = adapter.getNumRows();
for (String columnName : columnNames) {
final ColumnCapabilities capabilities = adapter.getColumnCapabilities(columnName);
final ColumnAnalysis analysis;

/**
* StorageAdapter doesn't provide a way to get column values, so size is
* not calculated for STRING and COMPLEX columns.
*/
ValueType capType = capabilities.getType();
switch (capType) {
case LONG:
analysis = lengthBasedAnalysisForAdapter(capType.name(), capabilities, numRows, Longs.BYTES);
break;
case FLOAT:
analysis = lengthBasedAnalysisForAdapter(capType.name(), capabilities, numRows, NUM_BYTES_IN_TEXT_FLOAT);
break;
case STRING:
analysis = new ColumnAnalysis(capType.name(), 0, adapter.getDimensionCardinality(columnName), null);
break;
case COMPLEX:
analysis = new ColumnAnalysis(capType.name(), 0, null, null);
break;
default:
log.warn("Unknown column type[%s].", capType);
analysis = ColumnAnalysis.error(String.format("unknown_type_%s", capType));
}

columns.put(columnName, analysis);
}

columns.put(
Column.TIME_COLUMN_NAME,
lengthBasedAnalysisForAdapter(ValueType.LONG.name(), null, numRows, NUM_BYTES_IN_TIMESTAMP)
);

return columns;
}
@@ -154,4 +208,26 @@ public ColumnAnalysis analyzeComplexColumn(Column column)

return new ColumnAnalysis(typeName, size, null, null);
}

private List<String> getStorageAdapterColumnNames(StorageAdapter adapter)
{
Indexed<String> dims = adapter.getAvailableDimensions();
Iterable<String> metrics = adapter.getAvailableMetrics();
Iterable<String> columnNames = Iterables.concat(dims, metrics);
List<String> sortedColumnNames = Lists.newArrayList(columnNames);
Collections.sort(sortedColumnNames);
return sortedColumnNames;
}

private ColumnAnalysis lengthBasedAnalysisForAdapter(
String type, ColumnCapabilities capabilities,
int numRows, final int numBytes
)
{
if (capabilities != null && capabilities.hasMultipleValues()) {
return ColumnAnalysis.error("multi_value");
}
return new ColumnAnalysis(type, numRows * numBytes, null, null);
}

}
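Note the trade-off in the new adapter path above: with no access to column values, only fixed-width types get a size estimate (rows times bytes per value), STRING and COMPLEX report size 0 (STRING keeps its cardinality), and multi-value columns yield ColumnAnalysis.error("multi_value"). A worked example of the arithmetic, with a made-up row count:

import com.google.common.primitives.Longs;

public class AdapterSizeArithmetic
{
  public static void main(String[] args)
  {
    // Hypothetical row count; the real code reads it from adapter.getNumRows().
    int numRows = 10000;
    // A LONG column is estimated at numRows * Longs.BYTES: 10000 * 8 = 80000 bytes.
    System.out.println("LONG column: " + numRows * Longs.BYTES + " bytes");
    // The __time column gets the same treatment, with NUM_BYTES_IN_TIMESTAMP instead.
  }
}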
processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryQueryToolChest.java
@@ -114,10 +114,6 @@ public SegmentAnalysis apply(SegmentAnalysis arg1, SegmentAnalysis arg2)
return arg1;
}

- if (!query.isMerge()) {
-   throw new ISE("Merging when a merge isn't supposed to happen[%s], [%s]", arg1, arg2);
- }

List<Interval> newIntervals = JodaUtils.condenseIntervals(
Iterables.concat(arg1.getIntervals(), arg2.getIntervals())
);
processing/src/main/java/io/druid/query/metadata/SegmentMetadataQueryRunnerFactory.java
@@ -82,15 +82,17 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
SegmentMetadataQuery query = (SegmentMetadataQuery) inQ;

final QueryableIndex index = segment.asQueryableIndex();
+ final Map<String, ColumnAnalysis> analyzedColumns;
+ long totalSize = 0;
if (index == null) {
-   return Sequences.empty();
+   // IncrementalIndexSegments (used by in-memory hydrants in the realtime service) do not have a QueryableIndex
+   analyzedColumns = analyzer.analyze(segment.asStorageAdapter());
+ } else {
+   analyzedColumns = analyzer.analyze(index);
+   // Initialize with the size of the whitespace, 1 byte per
+   totalSize = analyzedColumns.size() * index.getNumRows();
}

- final Map<String, ColumnAnalysis> analyzedColumns = analyzer.analyze(index);
-
- // Initialize with the size of the whitespace, 1 byte per
- long totalSize = analyzedColumns.size() * index.getNumRows();

Map<String, ColumnAnalysis> columns = Maps.newTreeMap();
ColumnIncluderator includerator = query.getToInclude();
for (Map.Entry<String, ColumnAnalysis> entry : analyzedColumns.entrySet()) {
processing/src/main/java/io/druid/segment/QueryableIndexStorageAdapter.java
@@ -104,6 +104,12 @@ public int getDimensionCardinality(String dimension)
return column.getDictionaryEncoding().getCardinality();
}

@Override
public int getNumRows()
{
return index.getNumRows();
}

@Override
public DateTime getMinTime()
{
@@ -136,6 +142,12 @@ public Capabilities getCapabilities()
return Capabilities.builder().dimensionValuesSorted(true).build();
}

@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return index.getColumn(column).getCapabilities();
}

@Override
public DateTime getMaxIngestedEventTime()
{
@@ -275,7 +287,10 @@ public void reset()
}

@Override
- public DimensionSelector makeDimensionSelector(final String dimension, @Nullable final ExtractionFn extractionFn)
+ public DimensionSelector makeDimensionSelector(
+     final String dimension,
+     @Nullable final ExtractionFn extractionFn
+ )
{
final Column columnDesc = index.getColumn(dimension);
if (columnDesc == null) {
@@ -296,8 +311,7 @@ public DimensionSelector makeDimensionSelector(final String dimension, @Nullable

if (column == null) {
return NULL_DIMENSION_SELECTOR;
- }
- else if (columnDesc.getCapabilities().hasMultipleValues()) {
+ } else if (columnDesc.getCapabilities().hasMultipleValues()) {
return new DimensionSelector()
{
@Override
@@ -325,7 +339,9 @@ public String lookupName(int id)
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return column.lookupId(name);
}
@@ -388,7 +404,9 @@ public String lookupName(int id)
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return column.lookupId(name);
}
3 changes: 3 additions & 0 deletions processing/src/main/java/io/druid/segment/StorageAdapter.java
@@ -17,6 +17,7 @@

package io.druid.segment;

import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed;
import org.joda.time.DateTime;
import org.joda.time.Interval;
@@ -42,5 +43,7 @@ public interface StorageAdapter extends CursorFactory
public DateTime getMinTime();
public DateTime getMaxTime();
public Capabilities getCapabilities();
public ColumnCapabilities getColumnCapabilities(String column);
public int getNumRows();
public DateTime getMaxIngestedEventTime();
}
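With getColumnCapabilities() and getNumRows() added to the interface, a StorageAdapter now carries enough surface to enumerate and describe its columns without a QueryableIndex. A minimal consumer sketch, assuming only methods visible in this diff (the class name and printing are illustrative):

import java.util.Collections;
import java.util.List;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.ColumnCapabilities;

public class DescribeAdapterColumns
{
  public static void describe(StorageAdapter adapter)
  {
    // Same dimension-plus-metric enumeration SegmentAnalyzer.getStorageAdapterColumnNames() uses.
    List<String> names = Lists.newArrayList(
        Iterables.concat(adapter.getAvailableDimensions(), adapter.getAvailableMetrics())
    );
    Collections.sort(names);
    for (String name : names) {
      ColumnCapabilities caps = adapter.getColumnCapabilities(name);
      System.out.println(name + ": " + caps.getType() + " over " + adapter.getNumRows() + " rows");
    }
  }
}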
processing/src/main/java/io/druid/segment/incremental/IncrementalIndexStorageAdapter.java
@@ -42,6 +42,7 @@
import io.druid.segment.SingleScanTimeDimSelector;
import io.druid.segment.StorageAdapter;
import io.druid.segment.column.Column;
import io.druid.segment.column.ColumnCapabilities;
import io.druid.segment.data.Indexed;
import io.druid.segment.data.IndexedInts;
import io.druid.segment.data.ListIndexed;
@@ -102,7 +103,7 @@ public Iterable<String> getAvailableMetrics()
@Override
public int getDimensionCardinality(String dimension)
{
- if(dimension.equals(Column.TIME_COLUMN_NAME)) {
+ if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return Integer.MAX_VALUE;
}
IncrementalIndex.DimDim dimDim = index.getDimension(dimension);
@@ -112,6 +113,12 @@ public int getDimensionCardinality(String dimension)
return dimDim.size();
}

@Override
public int getNumRows()
{
return index.size();
}

@Override
public DateTime getMinTime()
{
@@ -130,6 +137,12 @@ public Capabilities getCapabilities()
return Capabilities.builder().dimensionValuesSorted(false).build();
}

@Override
public ColumnCapabilities getColumnCapabilities(String column)
{
return index.getCapabilities(column);
}

@Override
public DateTime getMaxIngestedEventTime()
{
@@ -278,7 +291,10 @@ public void reset()
}

@Override
- public DimensionSelector makeDimensionSelector(final String dimension, @Nullable final ExtractionFn extractionFn)
+ public DimensionSelector makeDimensionSelector(
+     final String dimension,
+     @Nullable final ExtractionFn extractionFn
+ )
{
if (dimension.equals(Column.TIME_COLUMN_NAME)) {
return new SingleScanTimeDimSelector(makeLongColumnSelector(dimension), extractionFn);
@@ -310,7 +326,7 @@ public IndexedInts getRow()
}
}
// check for null entry
- if(vals.isEmpty() && dimValLookup.contains(null)){
+ if (vals.isEmpty() && dimValLookup.contains(null)) {
int id = dimValLookup.getId(null);
if (id < maxId) {
vals.add(id);
@@ -369,7 +385,9 @@ public String lookupName(int id)
public int lookupId(String name)
{
if (extractionFn != null) {
throw new UnsupportedOperationException("cannot perform lookup when applying an extraction function");
throw new UnsupportedOperationException(
"cannot perform lookup when applying an extraction function"
);
}
return dimValLookup.getId(name);
}
processing/src/test/java/io/druid/query/metadata/SegmentAnalyzerTest.java
@@ -44,13 +44,38 @@
public class SegmentAnalyzerTest
{
@Test
- public void testIncrementalDoesNotWork() throws Exception
+ public void testIncrementalWorks() throws Exception
{
final List<SegmentAnalysis> results = getSegmentAnalysises(
new IncrementalIndexSegment(TestIndex.getIncrementalTestIndex(false), null)
);

- Assert.assertEquals(0, results.size());
+ Assert.assertEquals(1, results.size());

final SegmentAnalysis analysis = results.get(0);
Assert.assertEquals(null, analysis.getId());

final Map<String, ColumnAnalysis> columns = analysis.getColumns();

Assert.assertEquals(
TestIndex.COLUMNS.length,
columns.size()
); // All columns including time and empty/null column

for (String dimension : TestIndex.DIMENSIONS) {
final ColumnAnalysis columnAnalysis = columns.get(dimension);

Assert.assertEquals(dimension, ValueType.STRING.name(), columnAnalysis.getType());
Assert.assertTrue(dimension, columnAnalysis.getCardinality() > 0);
}

for (String metric : TestIndex.METRICS) {
final ColumnAnalysis columnAnalysis = columns.get(metric);

Assert.assertEquals(metric, ValueType.FLOAT.name(), columnAnalysis.getType());
Assert.assertTrue(metric, columnAnalysis.getSize() > 0);
Assert.assertNull(metric, columnAnalysis.getCardinality());
}
}

@Test
@@ -66,7 +91,10 @@ public void testMappedWorks() throws Exception
Assert.assertEquals("test_1", analysis.getId());

final Map<String, ColumnAnalysis> columns = analysis.getColumns();
- Assert.assertEquals(TestIndex.COLUMNS.length -1, columns.size()); // All columns including time and excluding empty/null column
+ Assert.assertEquals(
+     TestIndex.COLUMNS.length - 1,
+     columns.size()
+ ); // All columns including time and excluding empty/null column

for (String dimension : TestIndex.DIMENSIONS) {
final ColumnAnalysis columnAnalysis = columns.get(dimension);
@@ -107,7 +135,7 @@ private List<SegmentAnalysis> getSegmentAnalysises(Segment index)
final SegmentMetadataQuery query = new SegmentMetadataQuery(
new LegacyDataSource("test"), QuerySegmentSpecs.create("2011/2012"), null, null, null, false
);
- HashMap<String,Object> context = new HashMap<String, Object>();
+ HashMap<String, Object> context = new HashMap<String, Object>();
return Sequences.toList(query.run(runner, context), Lists.<SegmentAnalysis>newArrayList());
}
}
