Skip to content

Commit

Permalink
introducing lists of existing columns in the fields of select queries…
Browse files Browse the repository at this point in the history
…' output (apache#2491)

* introducing lists of existing columns in the fields of select queries' output

* rebase master

* address the comment. add test code for select query caching

* change the cache code in SelectQueryQueryToolChest to 0x16
  • Loading branch information
jaehc authored and nishantmonu51 committed Aug 25, 2016
1 parent d624037 commit 2e0f253
Show file tree
Hide file tree
Showing 8 changed files with 223 additions and 15 deletions.
24 changes: 24 additions & 0 deletions processing/src/main/java/io/druid/query/select/SelectBinaryFn.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,14 @@

package io.druid.query.select;

import com.google.common.collect.Sets;
import com.metamx.common.guava.nary.BinaryFn;
import io.druid.granularity.AllGranularity;
import io.druid.granularity.QueryGranularity;
import io.druid.query.Result;

import java.util.Set;

import org.joda.time.DateTime;

import java.util.List;
Expand Down Expand Up @@ -77,6 +81,9 @@ public Result<SelectResultValue> apply(

SelectResultValueBuilder builder = new SelectResultValueBuilder.MergeBuilder(timestamp, pagingSpec, descending);

builder.addDimensions(mergeColumns(arg1.getValue().getDimensions(), arg2.getValue().getDimensions()));
builder.addMetrics(mergeColumns(arg1.getValue().getMetrics(), arg2.getValue().getMetrics()));

for (EventHolder event : arg1Val) {
builder.addEntry(event);
}
Expand All @@ -87,4 +94,21 @@ public Result<SelectResultValue> apply(

return builder.build();
}

private Set<String> mergeColumns(final Set<String> arg1, final Set<String> arg2)
{
if (arg1.isEmpty()) {
return arg2;
}

if (arg2.isEmpty()) {
return arg1;
}

if (arg1.equals(arg2)) {
return arg1;
}

return Sets.union(arg1, arg2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -108,12 +108,14 @@ public Result<SelectResultValue> apply(Cursor cursor)
for (DimensionSpec dim : dims) {
final DimensionSelector dimSelector = cursor.makeDimensionSelector(dim);
dimSelectors.put(dim.getOutputName(), dimSelector);
builder.addDimension(dim.getOutputName());
}

final Map<String, ObjectColumnSelector> metSelectors = Maps.newHashMap();
for (String metric : metrics) {
final ObjectColumnSelector metricSelector = cursor.makeObjectColumnSelector(metric);
metSelectors.put(metric, metricSelector);
builder.addMetric(metric);
}

final PagingOffset offset = query.getPagingOffset(segmentId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
*/
public class SelectQueryQueryToolChest extends QueryToolChest<Result<SelectResultValue>, SelectQuery>
{
private static final byte SELECT_QUERY = 0x13;
private static final byte SELECT_QUERY = 0x16;
private static final TypeReference<Object> OBJECT_TYPE_REFERENCE =
new TypeReference<Object>()
{
Expand Down Expand Up @@ -220,6 +220,8 @@ public Object apply(final Result<SelectResultValue> input)
return Arrays.asList(
input.getTimestamp().getMillis(),
input.getValue().getPagingIdentifiers(),
input.getValue().getDimensions(),
input.getValue().getMetrics(),
input.getValue().getEvents()
);
}
Expand Down Expand Up @@ -249,6 +251,16 @@ public Result<SelectResultValue> apply(Object input)
{
}
),
(Set<String>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<Set<String>>()
{
}
),
(Set<String>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<Set<String>>()
{
}
),
(List<EventHolder>) jsonMapper.convertValue(
resultIter.next(), new TypeReference<List<EventHolder>>()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,28 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
*/
public class SelectResultValue implements Iterable<EventHolder>
{
private final Map<String, Integer> pagingIdentifiers;
private final Set<String> dimensions;
private final Set<String> metrics;
private final List<EventHolder> events;

@JsonCreator
public SelectResultValue(
@JsonProperty("pagingIdentifiers") Map<String, Integer> pagingIdentifiers,
@JsonProperty("dimensions") Set<String> dimensions,
@JsonProperty("metrics") Set<String> metrics,
@JsonProperty("events") List<EventHolder> events
)
{
this.pagingIdentifiers = pagingIdentifiers;
this.dimensions = dimensions;
this.metrics = metrics;
this.events = events;
}

Expand All @@ -49,6 +56,18 @@ public Map<String, Integer> getPagingIdentifiers()
return pagingIdentifiers;
}

@JsonProperty
public Set<String> getDimensions()
{
return dimensions;
}

@JsonProperty
public Set<String> getMetrics()
{
return metrics;
}

@JsonProperty
public List<EventHolder> getEvents()
{
Expand Down Expand Up @@ -76,6 +95,15 @@ public boolean equals(Object o)
if (events != null ? !events.equals(that.events) : that.events != null) {
return false;
}

if (dimensions != null ? !dimensions.equals(that.dimensions) : that.dimensions != null) {
return false;
}

if (metrics != null ? !metrics.equals(that.metrics) : that.metrics != null) {
return false;
}

if (pagingIdentifiers != null
? !pagingIdentifiers.equals(that.pagingIdentifiers)
: that.pagingIdentifiers != null) {
Expand All @@ -89,6 +117,8 @@ public boolean equals(Object o)
public int hashCode()
{
int result = pagingIdentifiers != null ? pagingIdentifiers.hashCode() : 0;
result = 31 * result + (dimensions != null ? dimensions.hashCode() : 0);
result = 31 * result + (metrics != null ? metrics.hashCode() : 0);
result = 31 * result + (events != null ? events.hashCode() : 0);
return result;
}
Expand All @@ -98,6 +128,8 @@ public String toString()
{
return "SelectResultValue{" +
"pagingIdentifiers=" + pagingIdentifiers +
", dimensions=" + dimensions +
", metrics=" + metrics +
", events=" + events +
'}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Maps;
import com.google.common.collect.MinMaxPriorityQueue;
import com.google.common.collect.Queues;
import com.google.common.collect.Sets;
import com.google.common.primitives.Longs;
import com.metamx.common.guava.Comparators;
import io.druid.query.Result;
Expand All @@ -32,6 +33,7 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

/**
*/
Expand Down Expand Up @@ -59,6 +61,8 @@ public int compare(EventHolder o1, EventHolder o2)
protected final DateTime timestamp;
protected final PagingSpec pagingSpec;
protected final boolean descending;
protected Set<String> dimensions;
protected Set<String> metrics;

protected final Queue<EventHolder> pQueue;
protected final Map<String, Integer> pagingIdentifiers;
Expand All @@ -68,6 +72,8 @@ public SelectResultValueBuilder(DateTime timestamp, PagingSpec pagingSpec, boole
this.timestamp = timestamp;
this.pagingSpec = pagingSpec;
this.descending = descending;
this.dimensions = Sets.newHashSet();
this.metrics = Sets.newHashSet();
this.pagingIdentifiers = Maps.newLinkedHashMap();
this.pQueue = instantiatePQueue();
}
Expand All @@ -81,12 +87,32 @@ public void finished(String segmentId, int lastOffset)
{
pagingIdentifiers.put(segmentId, lastOffset);
}

public void addDimension(String dimension)
{
dimensions.add(dimension);
}

public void addDimensions(Set<String> dimensions)
{
this.dimensions.addAll(dimensions);
}

public void addMetric(String metric)
{
metrics.add(metric);
}

public void addMetrics(Set<String> metrics)
{
this.metrics.addAll(metrics);
}

public Result<SelectResultValue> build()
{
return new Result<SelectResultValue>(
timestamp,
new SelectResultValue(pagingIdentifiers, getEventHolders())
new SelectResultValue(pagingIdentifiers, dimensions, metrics, getEventHolders())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
package io.druid.query.select;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.metamx.common.ISE;
import io.druid.granularity.QueryGranularities;
import io.druid.query.Result;
Expand All @@ -32,6 +34,7 @@
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Set;

/**
*/
Expand All @@ -50,6 +53,8 @@ public void testApply() throws Exception
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("first", "fourth"),
Sets.newHashSet("sixth"),
Arrays.asList(
new EventHolder(
segmentId1,
Expand Down Expand Up @@ -90,6 +95,8 @@ public void testApply() throws Exception
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("second", "third"),
Sets.newHashSet("fifth"),
Arrays.asList(
new EventHolder(
segmentId2,
Expand Down Expand Up @@ -203,6 +210,61 @@ public void testApply() throws Exception
verifyEvents(exEvents, acEvents);
}

@Test
public void testColumnMerge() throws Exception
{
SelectBinaryFn binaryFn = new SelectBinaryFn(QueryGranularities.ALL, new PagingSpec(null, 5), false);

Result<SelectResultValue> res1 = new Result<>(
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("first", "second", "fourth"),
Sets.newHashSet("eight", "nineth"),
Lists.<EventHolder>newArrayList(
new EventHolder(
segmentId1,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"), "dim", "first"
)
))
)
);

Result<SelectResultValue> res2 = new Result<>(
new DateTime("2013-01-01"),
new SelectResultValue(
ImmutableMap.<String, Integer>of(),
Sets.newHashSet("third", "second", "fifth"),
Sets.newHashSet("seventh"),
Lists.<EventHolder>newArrayList(
new EventHolder(
segmentId2,
0,
ImmutableMap.<String, Object>of(
EventHolder.timestampKey,
new DateTime("2013-01-01T00"),
"dim",
"second"
)
))
)
);

Result<SelectResultValue> merged = binaryFn.apply(res1, res2);

Set<String> exDimensions = Sets.newHashSet("first", "second", "fourth", "third", "fifth");
Set<String> exMetrics = Sets.newHashSet("eight", "nineth", "seventh");

Set<String> acDimensions = merged.getValue().getDimensions();
Set<String> acMetrics = merged.getValue().getMetrics();

Assert.assertEquals(exDimensions, acDimensions);
Assert.assertEquals(exMetrics, acMetrics);
}

private void verifyIters(Iterator iter1, Iterator iter2)
{
while (iter1.hasNext()) {
Expand Down
Loading

0 comments on commit 2e0f253

Please sign in to comment.