Skip to content

Commit

Permalink
Add option for select query to get next page without modifying return…
Browse files Browse the repository at this point in the history
…ed paging identifiers
  • Loading branch information
navis committed Apr 1, 2016
1 parent 2fc5918 commit 29bb005
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 198 deletions.
12 changes: 11 additions & 1 deletion docs/content/querying/select-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,15 @@ This can be used with the next query's pagingSpec:
"pagingSpec":{"pagingIdentifiers": {"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9" : 5}, "threshold":5}

}
```

Note that in the second query, an offset is specified and that it is 1 greater than the largest offset found in the initial results. To return the next "page", this offset must be incremented by 1 (should be decremented by 1 for descending query), with each new query, but with option `fromNext` enabled, this operation is not needed. When an empty results set is received, the very last page has been returned.

`fromNext` options is in pagingSpec:

Note that in the second query, an offset is specified and that it is 1 greater than the largest offset found in the initial results. To return the next "page", this offset must be incremented by 1 (should be decremented by 1 for descending query), with each new query. When an empty results set is received, the very last page has been returned.
```json
{
...
"pagingSpec":{"pagingIdentifiers": {}, "threshold":5, "fromNext": true}
}
```
79 changes: 62 additions & 17 deletions processing/src/main/java/io/druid/query/select/PagingSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,60 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.common.StringUtils;

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

/**
*/
public class PagingSpec
{
private final LinkedHashMap<String, Integer> pagingIdentifiers;
public static PagingSpec newSpec(int threshold)
{
return new PagingSpec(null, threshold);
}

public static Map<String, Integer> merge(Iterable<Map<String, Integer>> cursors)
{
Map<String, Integer> next = Maps.newHashMap();
for (Map<String, Integer> cursor : cursors) {
for (Map.Entry<String, Integer> entry : cursor.entrySet()) {
next.put(entry.getKey(), entry.getValue());
}
}
return next;
}

public static Map<String, Integer> next(Map<String, Integer> cursor, boolean descending)
{
for (Map.Entry<String, Integer> entry : cursor.entrySet()) {
entry.setValue(descending ? entry.getValue() - 1 : entry.getValue() + 1);
}
return cursor;
}

private final Map<String, Integer> pagingIdentifiers;
private final int threshold;
private final boolean fromNext;

@JsonCreator
public PagingSpec(
@JsonProperty("pagingIdentifiers") LinkedHashMap<String, Integer> pagingIdentifiers,
@JsonProperty("threshold") int threshold
@JsonProperty("pagingIdentifiers") Map<String, Integer> pagingIdentifiers,
@JsonProperty("threshold") int threshold,
@JsonProperty("fromNext") boolean fromNext
)
{
this.pagingIdentifiers = pagingIdentifiers == null ? new LinkedHashMap<String, Integer>() : pagingIdentifiers;
this.pagingIdentifiers = pagingIdentifiers == null ? Maps.<String, Integer>newHashMap() : pagingIdentifiers;
this.threshold = threshold;
this.fromNext = fromNext;
}

public PagingSpec(Map<String, Integer> pagingIdentifiers, int threshold)
{
this(pagingIdentifiers, threshold, false);
}

@JsonProperty
Expand All @@ -57,6 +89,12 @@ public int getThreshold()
return threshold;
}

@JsonProperty
public boolean isFromNext()
{
return fromNext;
}

public byte[] getCacheKey()
{
final byte[][] pagingKeys = new byte[pagingIdentifiers.size()][];
Expand All @@ -75,7 +113,7 @@ public byte[] getCacheKey()

final byte[] thresholdBytes = ByteBuffer.allocate(Ints.BYTES).putInt(threshold).array();

final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length);
final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length + 1);

for (byte[] pagingKey : pagingKeys) {
queryCacheKey.put(pagingKey);
Expand All @@ -86,22 +124,37 @@ public byte[] getCacheKey()
}

queryCacheKey.put(thresholdBytes);
queryCacheKey.put(isFromNext() ? (byte) 0x01 : 0x00);

return queryCacheKey.array();
}

public PagingOffset getOffset(String identifier, boolean descending)
{
Integer offset = pagingIdentifiers.get(identifier);
if (offset == null) {
offset = PagingOffset.toOffset(0, descending);
} else if (fromNext) {
offset = descending ? offset - 1 : offset + 1;
}
return PagingOffset.of(offset, threshold);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof PagingSpec)) {
if (o == null || getClass() != o.getClass()) {
return false;
}

PagingSpec that = (PagingSpec) o;

if (fromNext != that.fromNext) {
return false;
}
if (threshold != that.threshold) {
return false;
}
Expand All @@ -117,6 +170,7 @@ public int hashCode()
{
int result = pagingIdentifiers.hashCode();
result = 31 * result + threshold;
result = 31 * result + (fromNext ? 1 : 0);
return result;
}

Expand All @@ -126,16 +180,7 @@ public String toString()
return "PagingSpec{" +
"pagingIdentifiers=" + pagingIdentifiers +
", threshold=" + threshold +
", fromNext=" + fromNext +
'}';
}

public PagingOffset getOffset(String identifier, boolean descending)
{
Integer offset = pagingIdentifiers.get(identifier);
if (offset == null) {
offset = PagingOffset.toOffset(0, descending);
}
return PagingOffset.of(offset, threshold);
}

}
Loading

0 comments on commit 29bb005

Please sign in to comment.