Skip to content

Commit

Permalink
Add timestampSpec to metadata.drd and SegmentMetadataQuery (apache#3227)
Browse files Browse the repository at this point in the history
* save TimestampSpec in metadata.drd

* add timestampSpec info in SegmentMetadataQuery
  • Loading branch information
kaijianding authored and fjy committed Jul 25, 2016
1 parent d5ed3f1 commit 3dc2974
Show file tree
Hide file tree
Showing 18 changed files with 229 additions and 20 deletions.
22 changes: 22 additions & 0 deletions api/src/main/java/io/druid/data/input/impl/TimestampSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import com.metamx.common.parsers.TimestampParser;
import org.joda.time.DateTime;

import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
*/
Expand Down Expand Up @@ -130,4 +132,24 @@ public int hashCode()
result = 31 * result + (missingValue != null ? missingValue.hashCode() : 0);
return result;
}

//simple merge strategy on timestampSpec that checks if all are equal or else
//returns null. this can be improved in future but is good enough for most use-cases.
public static TimestampSpec mergeTimestampSpec(List<TimestampSpec> toMerge) {
if (toMerge == null || toMerge.size() == 0) {
return null;
}

TimestampSpec result = toMerge.get(0);
for (int i = 1; i < toMerge.size(); i++) {
if (toMerge.get(i) == null) {
continue;
}
if (!Objects.equals(result, toMerge.get(i))) {
return null;
}
}

return result;
}
}
4 changes: 4 additions & 0 deletions docs/content/querying/segmentmetadataquery.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ dimension columns.

* `intervals` in the result will contain the list of intervals associated with the queried segments.

#### timestampSpec

* `timestampSpec` in the result will contain timestampSpec of data stored in segments. this can be null if timestampSpec of segments was unknown or unmergeable (if merging is enabled).

#### queryGranularity

* `queryGranularity` in the result will contain query granularity of data stored in segments. this can be null if query granularity of segments was unknown or unmergeable (if merging is enabled).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ private static IncrementalIndex makeIncrementalIndex(
final HadoopTuningConfig tuningConfig = config.getSchema().getTuningConfig();
final IncrementalIndexSchema indexSchema = new IncrementalIndexSchema.Builder()
.withMinTimestamp(theBucket.time.getMillis())
.withTimestampSpec(config.getSchema().getDataSchema().getParser().getParseSpec().getTimestampSpec())
.withDimensionsSpec(config.getSchema().getDataSchema().getParser())
.withQueryGranularity(config.getSchema().getDataSchema().getGranularitySpec().getQueryGranularity())
.withMetrics(aggs)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import com.metamx.emitter.service.ServiceMetricEvent;
import io.druid.common.guava.CombiningSequence;
import io.druid.common.utils.JodaUtils;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.CacheStrategy;
import io.druid.query.DruidMetrics;
Expand Down Expand Up @@ -332,6 +333,13 @@ public static SegmentAnalysis mergeAnalyses(
}
}

final TimestampSpec timestampSpec = TimestampSpec.mergeTimestampSpec(
Lists.newArrayList(
arg1.getTimestampSpec(),
arg2.getTimestampSpec()
)
);

final QueryGranularity queryGranularity = QueryGranularity.mergeQueryGranularities(
Lists.newArrayList(
arg1.getQueryGranularity(),
Expand All @@ -354,6 +362,7 @@ public static SegmentAnalysis mergeAnalyses(
arg1.getSize() + arg2.getSize(),
arg1.getNumRows() + arg2.getNumRows(),
aggregators.isEmpty() ? null : aggregators,
timestampSpec,
queryGranularity
);
}
Expand All @@ -368,6 +377,7 @@ public static SegmentAnalysis finalizeAnalysis(SegmentAnalysis analysis)
analysis.getSize(),
analysis.getNumRows(),
analysis.getAggregators(),
analysis.getTimestampSpec(),
analysis.getQueryGranularity()
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.metamx.common.guava.Sequence;
import com.metamx.common.guava.Sequences;
import com.metamx.common.logger.Logger;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.AbstractPrioritizedCallable;
import io.druid.query.BaseQuery;
Expand Down Expand Up @@ -127,6 +128,16 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
aggregators = null;
}

final TimestampSpec timestampSpec;
if (query.hasTimestampSpec()) {
if (metadata == null) {
metadata = segment.asStorageAdapter().getMetadata();
}
timestampSpec = metadata != null ? metadata.getTimestampSpec() : null;
} else {
timestampSpec = null;
}

final QueryGranularity queryGranularity;
if (query.hasQueryGranularity()) {
if (metadata == null) {
Expand All @@ -146,6 +157,7 @@ public Sequence<SegmentAnalysis> run(Query<SegmentAnalysis> inQ, Map<String, Obj
totalSize,
numRows,
aggregators,
timestampSpec,
queryGranularity
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;
import org.joda.time.Interval;
Expand All @@ -37,6 +38,7 @@ public class SegmentAnalysis implements Comparable<SegmentAnalysis>
private final long size;
private final long numRows;
private final Map<String, AggregatorFactory> aggregators;
private final TimestampSpec timestampSpec;
private final QueryGranularity queryGranularity;

@JsonCreator
Expand All @@ -47,6 +49,7 @@ public SegmentAnalysis(
@JsonProperty("size") long size,
@JsonProperty("numRows") long numRows,
@JsonProperty("aggregators") Map<String, AggregatorFactory> aggregators,
@JsonProperty("timestampSpec") TimestampSpec timestampSpec,
@JsonProperty("queryGranularity") QueryGranularity queryGranularity
)
{
Expand All @@ -56,6 +59,7 @@ public SegmentAnalysis(
this.size = size;
this.numRows = numRows;
this.aggregators = aggregators;
this.timestampSpec = timestampSpec;
this.queryGranularity = queryGranularity;
}

Expand Down Expand Up @@ -89,6 +93,12 @@ public long getNumRows()
return numRows;
}

@JsonProperty
public TimestampSpec getTimestampSpec()
{
return timestampSpec;
}

@JsonProperty
public QueryGranularity getQueryGranularity()
{
Expand All @@ -111,6 +121,7 @@ public String toString()
", size=" + size +
", numRows=" + numRows +
", aggregators=" + aggregators +
", timestampSpec=" + timestampSpec +
", queryGranularity=" + queryGranularity +
'}';
}
Expand All @@ -134,6 +145,7 @@ public boolean equals(Object o)
Objects.equals(interval, that.interval) &&
Objects.equals(columns, that.columns) &&
Objects.equals(aggregators, that.aggregators) &&
Objects.equals(timestampSpec, that.timestampSpec) &&
Objects.equals(queryGranularity, that.queryGranularity);
}

Expand All @@ -144,7 +156,7 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hash(id, interval, columns, size, numRows, aggregators, queryGranularity);
return Objects.hash(id, interval, columns, size, numRows, aggregators, timestampSpec, queryGranularity);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ public enum AnalysisType
INTERVAL,
AGGREGATORS,
MINMAX,
TIMESTAMPSPEC,
QUERYGRANULARITY;

@JsonValue
Expand Down Expand Up @@ -188,6 +189,11 @@ public boolean hasAggregators()
return analysisTypes.contains(AnalysisType.AGGREGATORS);
}

public boolean hasTimestampSpec()
{
return analysisTypes.contains(AnalysisType.TIMESTAMPSPEC);
}

public boolean hasQueryGranularity()
{
return analysisTypes.contains(AnalysisType.QUERYGRANULARITY);
Expand Down
39 changes: 34 additions & 5 deletions processing/src/main/java/io/druid/segment/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package io.druid.segment;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.druid.data.input.impl.TimestampSpec;
import io.druid.granularity.QueryGranularity;
import io.druid.query.aggregation.AggregatorFactory;

Expand All @@ -42,6 +43,9 @@ public class Metadata
@JsonProperty
private AggregatorFactory[] aggregators;

@JsonProperty
private TimestampSpec timestampSpec;

@JsonProperty
private QueryGranularity queryGranularity;

Expand All @@ -61,6 +65,17 @@ public Metadata setAggregators(AggregatorFactory[] aggregators)
return this;
}

public TimestampSpec getTimestampSpec()
{
return timestampSpec;
}

public Metadata setTimestampSpec(TimestampSpec timestampSpec)
{
this.timestampSpec = timestampSpec;
return this;
}

public QueryGranularity getQueryGranularity()
{
return queryGranularity;
Expand Down Expand Up @@ -111,6 +126,7 @@ public static Metadata merge(
? new ArrayList<AggregatorFactory[]>()
: null;

List<TimestampSpec> timestampSpecsToMerge = new ArrayList<>();
List<QueryGranularity> gransToMerge = new ArrayList<>();

for (Metadata metadata : toBeMerged) {
Expand All @@ -120,6 +136,10 @@ public static Metadata merge(
aggregatorsToMerge.add(metadata.getAggregators());
}

if (timestampSpecsToMerge != null && metadata.getTimestampSpec() != null) {
timestampSpecsToMerge.add(metadata.getTimestampSpec());
}

if (gransToMerge != null) {
gransToMerge.add(metadata.getQueryGranularity());
}
Expand All @@ -128,6 +148,7 @@ public static Metadata merge(
//if metadata and hence aggregators and queryGranularity for some segment being merged are unknown then
//final merged segment should not have same in metadata
aggregatorsToMerge = null;
timestampSpecsToMerge = null;
gransToMerge = null;
}
}
Expand All @@ -143,6 +164,10 @@ public static Metadata merge(
result.setAggregators(overrideMergedAggregators);
}

if (timestampSpecsToMerge != null) {
result.setTimestampSpec(TimestampSpec.mergeTimestampSpec(timestampSpecsToMerge));
}

if (gransToMerge != null) {
result.setQueryGranularity(QueryGranularity.mergeQueryGranularities(gransToMerge));
}
Expand Down Expand Up @@ -171,17 +196,21 @@ public boolean equals(Object o)
if (!Arrays.equals(aggregators, metadata.aggregators)) {
return false;
}
return !(queryGranularity != null
? !queryGranularity.equals(metadata.queryGranularity)
: metadata.queryGranularity != null);
if (timestampSpec != null ? !timestampSpec.equals(metadata.timestampSpec) : metadata.timestampSpec != null) {
return false;
}
return queryGranularity != null
? queryGranularity.equals(metadata.queryGranularity)
: metadata.queryGranularity == null;

}

@Override
public int hashCode()
{
int result = container.hashCode();
result = 31 * result + (aggregators != null ? Arrays.hashCode(aggregators) : 0);
result = 31 * result + Arrays.hashCode(aggregators);
result = 31 * result + (timestampSpec != null ? timestampSpec.hashCode() : 0);
result = 31 * result + (queryGranularity != null ? queryGranularity.hashCode() : 0);
return result;
}
Expand All @@ -190,9 +219,9 @@ public int hashCode()
public String toString()
{
return "Metadata{" +

"container=" + container +
", aggregators=" + Arrays.toString(aggregators) +
", timestampSpec=" + timestampSpec +
", queryGranularity=" + queryGranularity +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,6 +410,7 @@ public IncrementalIndex(

this.metadata = new Metadata()
.setAggregators(getCombiningAggregators(metrics))
.setTimestampSpec(incrementalIndexSchema.getTimestampSpec())
.setQueryGranularity(this.gran);

this.aggs = initAggs(metrics, rowSupplier, deserializeComplexMetrics);
Expand Down
Loading

0 comments on commit 3dc2974

Please sign in to comment.