Skip to content

Commit

Permalink
Merge pull request apache#2576 from navis/paging-from-next
Browse files Browse the repository at this point in the history
Add option for select query to get next page without modifying returned paging identifiers
  • Loading branch information
fjy committed Apr 1, 2016
2 parents 4eb5a2c + 29bb005 commit eea7a47
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 198 deletions.
12 changes: 11 additions & 1 deletion docs/content/querying/select-query.md
Original file line number Diff line number Diff line change
Expand Up @@ -167,5 +167,15 @@ This can be used with the next query's pagingSpec:
"pagingSpec":{"pagingIdentifiers": {"wikipedia_2012-12-29T00:00:00.000Z_2013-01-10T08:00:00.000Z_2013-01-10T08:13:47.830Z_v9" : 5}, "threshold":5}

}
```

Note that in the second query, an offset is specified and that it is 1 greater than the largest offset found in the initial results. To return the next "page", this offset must be incremented by 1 (should be decremented by 1 for descending query), with each new query, but with option `fromNext` enabled, this operation is not needed. When an empty results set is received, the very last page has been returned.

`fromNext` options is in pagingSpec:

Note that in the second query, an offset is specified and that it is 1 greater than the largest offset found in the initial results. To return the next "page", this offset must be incremented by 1 (should be decremented by 1 for descending query), with each new query. When an empty results set is received, the very last page has been returned.
```json
{
...
"pagingSpec":{"pagingIdentifiers": {}, "threshold":5, "fromNext": true}
}
```
79 changes: 62 additions & 17 deletions processing/src/main/java/io/druid/query/select/PagingSpec.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,28 +21,60 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.Maps;
import com.google.common.primitives.Ints;
import com.metamx.common.StringUtils;

import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
import java.util.Map;

/**
*/
public class PagingSpec
{
private final LinkedHashMap<String, Integer> pagingIdentifiers;
public static PagingSpec newSpec(int threshold)
{
return new PagingSpec(null, threshold);
}

public static Map<String, Integer> merge(Iterable<Map<String, Integer>> cursors)
{
Map<String, Integer> next = Maps.newHashMap();
for (Map<String, Integer> cursor : cursors) {
for (Map.Entry<String, Integer> entry : cursor.entrySet()) {
next.put(entry.getKey(), entry.getValue());
}
}
return next;
}

public static Map<String, Integer> next(Map<String, Integer> cursor, boolean descending)
{
for (Map.Entry<String, Integer> entry : cursor.entrySet()) {
entry.setValue(descending ? entry.getValue() - 1 : entry.getValue() + 1);
}
return cursor;
}

private final Map<String, Integer> pagingIdentifiers;
private final int threshold;
private final boolean fromNext;

@JsonCreator
public PagingSpec(
@JsonProperty("pagingIdentifiers") LinkedHashMap<String, Integer> pagingIdentifiers,
@JsonProperty("threshold") int threshold
@JsonProperty("pagingIdentifiers") Map<String, Integer> pagingIdentifiers,
@JsonProperty("threshold") int threshold,
@JsonProperty("fromNext") boolean fromNext
)
{
this.pagingIdentifiers = pagingIdentifiers == null ? new LinkedHashMap<String, Integer>() : pagingIdentifiers;
this.pagingIdentifiers = pagingIdentifiers == null ? Maps.<String, Integer>newHashMap() : pagingIdentifiers;
this.threshold = threshold;
this.fromNext = fromNext;
}

public PagingSpec(Map<String, Integer> pagingIdentifiers, int threshold)
{
this(pagingIdentifiers, threshold, false);
}

@JsonProperty
Expand All @@ -57,6 +89,12 @@ public int getThreshold()
return threshold;
}

@JsonProperty
public boolean isFromNext()
{
return fromNext;
}

public byte[] getCacheKey()
{
final byte[][] pagingKeys = new byte[pagingIdentifiers.size()][];
Expand All @@ -75,7 +113,7 @@ public byte[] getCacheKey()

final byte[] thresholdBytes = ByteBuffer.allocate(Ints.BYTES).putInt(threshold).array();

final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length);
final ByteBuffer queryCacheKey = ByteBuffer.allocate(pagingKeysSize + pagingValuesSize + thresholdBytes.length + 1);

for (byte[] pagingKey : pagingKeys) {
queryCacheKey.put(pagingKey);
Expand All @@ -86,22 +124,37 @@ public byte[] getCacheKey()
}

queryCacheKey.put(thresholdBytes);
queryCacheKey.put(isFromNext() ? (byte) 0x01 : 0x00);

return queryCacheKey.array();
}

public PagingOffset getOffset(String identifier, boolean descending)
{
Integer offset = pagingIdentifiers.get(identifier);
if (offset == null) {
offset = PagingOffset.toOffset(0, descending);
} else if (fromNext) {
offset = descending ? offset - 1 : offset + 1;
}
return PagingOffset.of(offset, threshold);
}

@Override
public boolean equals(Object o)
{
if (this == o) {
return true;
}
if (!(o instanceof PagingSpec)) {
if (o == null || getClass() != o.getClass()) {
return false;
}

PagingSpec that = (PagingSpec) o;

if (fromNext != that.fromNext) {
return false;
}
if (threshold != that.threshold) {
return false;
}
Expand All @@ -117,6 +170,7 @@ public int hashCode()
{
int result = pagingIdentifiers.hashCode();
result = 31 * result + threshold;
result = 31 * result + (fromNext ? 1 : 0);
return result;
}

Expand All @@ -126,16 +180,7 @@ public String toString()
return "PagingSpec{" +
"pagingIdentifiers=" + pagingIdentifiers +
", threshold=" + threshold +
", fromNext=" + fromNext +
'}';
}

public PagingOffset getOffset(String identifier, boolean descending)
{
Integer offset = pagingIdentifiers.get(identifier);
if (offset == null) {
offset = PagingOffset.toOffset(0, descending);
}
return PagingOffset.of(offset, threshold);
}

}
Loading

0 comments on commit eea7a47

Please sign in to comment.