Skip to content

Commit

Permalink
scan-query: Use long as limit. (apache#4081)
Browse files Browse the repository at this point in the history
* scan-query: Use long instead of int as limit type

* Use MAX_INSTANT queryTimeout, if timeout == 0
  • Loading branch information
KenjiTakahashi authored and fjy committed Mar 20, 2017
1 parent 64248d3 commit 8510a52
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class ScanQuery extends BaseQuery<ScanResultValue>

private final String resultFormat;
private final int batchSize;
private final int limit;
private final long limit;
private final DimFilter dimFilter;
private final List<String> columns;

Expand All @@ -58,7 +58,7 @@ public ScanQuery(
@JsonProperty("intervals") QuerySegmentSpec querySegmentSpec,
@JsonProperty("resultFormat") String resultFormat,
@JsonProperty("batchSize") int batchSize,
@JsonProperty("limit") int limit,
@JsonProperty("limit") long limit,
@JsonProperty("filter") DimFilter dimFilter,
@JsonProperty("columns") List<String> columns,
@JsonProperty("context") Map<String, Object> context
Expand All @@ -67,7 +67,7 @@ public ScanQuery(
super(dataSource, querySegmentSpec, false, context);
this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat;
this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize;
this.limit = (limit == 0) ? Integer.MAX_VALUE : limit;
this.limit = (limit == 0) ? Long.MAX_VALUE : limit;
Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0");
Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0");
this.dimFilter = dimFilter;
Expand All @@ -87,7 +87,7 @@ public int getBatchSize()
}

@JsonProperty
public int getLimit()
public long getLimit()
{
return limit;
}
Expand Down Expand Up @@ -217,7 +217,7 @@ public int hashCode()
int result = super.hashCode();
result = 31 * result + (resultFormat != null ? resultFormat.hashCode() : 0);
result = 31 * result + batchSize;
result = 31 * result + limit;
result = 31 * result + (int) (limit ^ (limit >>> 32));
result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0);
result = 31 * result + (columns != null ? columns.hashCode() : 0);
return result;
Expand Down Expand Up @@ -260,7 +260,7 @@ public static class ScanQueryBuilder
private Map<String, Object> context;
private String resultFormat;
private int batchSize;
private int limit;
private long limit;
private DimFilter dimFilter;
private List<String> columns;

Expand Down Expand Up @@ -346,7 +346,7 @@ public ScanQueryBuilder batchSize(int b)
return this;
}

public ScanQueryBuilder limit(int l)
public ScanQueryBuilder limit(long l)
{
limit = l;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public Sequence<ScanResultValue> process(
)
{
if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) {
int count = (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
if (count >= query.getLimit()) {
return Sequences.empty();
}
Expand Down Expand Up @@ -104,9 +104,9 @@ public Sequence<ScanResultValue> process(
final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter()));

if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) {
responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0);
responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L);
}
final int limit = query.getLimit() - (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
final long limit = query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT);
return Sequences.concat(
Sequences.map(
adapter.makeCursors(
Expand Down Expand Up @@ -145,7 +145,7 @@ public Iterator<ScanResultValue> make()
final int batchSize = query.getBatchSize();
return new Iterator<ScanResultValue>()
{
private int offset = 0;
private long offset = 0;

@Override
public boolean hasNext()
Expand All @@ -159,7 +159,7 @@ public ScanResultValue next()
if (System.currentTimeMillis() >= timeoutAt) {
throw new QueryInterruptedException(new TimeoutException());
}
int lastOffset = offset;
long lastOffset = offset;
Object events = null;
String resultFormat = query.getResultFormat();
if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) {
Expand All @@ -171,7 +171,7 @@ public ScanResultValue next()
}
responseContext.put(
ScanQueryRunnerFactory.CTX_COUNT,
(int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset)
(long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset)
);
responseContext.put(
ScanQueryRunnerFactory.CTX_TIMEOUT_AT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ public class ScanQueryLimitRowIterator implements CloseableIterator<ScanResultVa
{
private Yielder<ScanResultValue> yielder;
private String resultFormat;
private int limit = 0;
private int count = 0;
private long limit = 0;
private long count = 0;

public ScanQueryLimitRowIterator(
QueryRunner<ScanResultValue> baseRunner, ScanQuery query,
Expand Down Expand Up @@ -76,7 +76,8 @@ public ScanResultValue next()
return batch;
} else {
// last batch
int left = limit - count;
// single batch length is <= Integer.MAX_VALUE, so this should not overflow
int left = (int) (limit - count);
count = limit;
return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public Sequence<ScanResultValue> run(
)
{
ScanQuery scanQuery = (ScanQuery) query;
if (scanQuery.getLimit() == Integer.MAX_VALUE) {
if (scanQuery.getLimit() == Long.MAX_VALUE) {
return runner.run(query, responseContext);
}
return new BaseSequence<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Sequence<ScanResultValue> run(
)
{
final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null);
final long timeoutAt = queryTimeout == null
final long timeoutAt = (queryTimeout == null || queryTimeout.longValue() == 0L)
? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue();
responseContext.put(CTX_TIMEOUT_AT, timeoutAt);
return Sequences.concat(
Expand Down Expand Up @@ -119,7 +119,8 @@ public Sequence<ScanResultValue> run(
}

// it happens in unit tests
if (responseContext.get(CTX_TIMEOUT_AT) == null) {
final Number timeoutAt = (Number) responseContext.get(CTX_TIMEOUT_AT);
if (timeoutAt == null || timeoutAt.longValue() == 0L) {
responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT);
};
return engine.process((ScanQuery) query, segment, responseContext);
Expand Down

0 comments on commit 8510a52

Please sign in to comment.