diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java index cdd2d1da4c13..df6a4079d5cb 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQuery.java @@ -48,7 +48,7 @@ public class ScanQuery extends BaseQuery private final String resultFormat; private final int batchSize; - private final int limit; + private final long limit; private final DimFilter dimFilter; private final List columns; @@ -58,7 +58,7 @@ public ScanQuery( @JsonProperty("intervals") QuerySegmentSpec querySegmentSpec, @JsonProperty("resultFormat") String resultFormat, @JsonProperty("batchSize") int batchSize, - @JsonProperty("limit") int limit, + @JsonProperty("limit") long limit, @JsonProperty("filter") DimFilter dimFilter, @JsonProperty("columns") List columns, @JsonProperty("context") Map context @@ -67,7 +67,7 @@ public ScanQuery( super(dataSource, querySegmentSpec, false, context); this.resultFormat = resultFormat == null ? RESULT_FORMAT_LIST : resultFormat; this.batchSize = (batchSize == 0) ? 4096 * 5 : batchSize; - this.limit = (limit == 0) ? Integer.MAX_VALUE : limit; + this.limit = (limit == 0) ? Long.MAX_VALUE : limit; Preconditions.checkArgument(this.batchSize > 0, "batchSize must be greater than 0"); Preconditions.checkArgument(this.limit > 0, "limit must be greater than 0"); this.dimFilter = dimFilter; @@ -87,7 +87,7 @@ public int getBatchSize() } @JsonProperty - public int getLimit() + public long getLimit() { return limit; } @@ -217,7 +217,7 @@ public int hashCode() int result = super.hashCode(); result = 31 * result + (resultFormat != null ? resultFormat.hashCode() : 0); result = 31 * result + batchSize; - result = 31 * result + limit; + result = 31 * result + (int) (limit ^ (limit >>> 32)); result = 31 * result + (dimFilter != null ? dimFilter.hashCode() : 0); result = 31 * result + (columns != null ? columns.hashCode() : 0); return result; @@ -260,7 +260,7 @@ public static class ScanQueryBuilder private Map context; private String resultFormat; private int batchSize; - private int limit; + private long limit; private DimFilter dimFilter; private List columns; @@ -346,7 +346,7 @@ public ScanQueryBuilder batchSize(int b) return this; } - public ScanQueryBuilder limit(int l) + public ScanQueryBuilder limit(long l) { limit = l; return this; diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java index 6253992952c6..139cc6d0ad0e 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryEngine.java @@ -60,7 +60,7 @@ public Sequence process( ) { if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) != null) { - int count = (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); + long count = (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); if (count >= query.getLimit()) { return Sequences.empty(); } @@ -104,9 +104,9 @@ public Sequence process( final Filter filter = Filters.convertToCNFFromQueryContext(query, Filters.toFilter(query.getDimensionsFilter())); if (responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) == null) { - responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0); + responseContext.put(ScanQueryRunnerFactory.CTX_COUNT, 0L); } - final int limit = query.getLimit() - (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); + final long limit = query.getLimit() - (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT); return Sequences.concat( Sequences.map( adapter.makeCursors( @@ -145,7 +145,7 @@ public Iterator make() final int batchSize = query.getBatchSize(); return new Iterator() { - private int offset = 0; + private long offset = 0; @Override public boolean hasNext() @@ -159,7 +159,7 @@ public ScanResultValue next() if (System.currentTimeMillis() >= timeoutAt) { throw new QueryInterruptedException(new TimeoutException()); } - int lastOffset = offset; + long lastOffset = offset; Object events = null; String resultFormat = query.getResultFormat(); if (ScanQuery.RESULT_FORMAT_VALUE_VECTOR.equals(resultFormat)) { @@ -171,7 +171,7 @@ public ScanResultValue next() } responseContext.put( ScanQueryRunnerFactory.CTX_COUNT, - (int) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) + (long) responseContext.get(ScanQueryRunnerFactory.CTX_COUNT) + (offset - lastOffset) ); responseContext.put( ScanQueryRunnerFactory.CTX_TIMEOUT_AT, diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java index 78a11073f55e..f102d1c3ec67 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryLimitRowIterator.java @@ -32,8 +32,8 @@ public class ScanQueryLimitRowIterator implements CloseableIterator yielder; private String resultFormat; - private int limit = 0; - private int count = 0; + private long limit = 0; + private long count = 0; public ScanQueryLimitRowIterator( QueryRunner baseRunner, ScanQuery query, @@ -76,7 +76,8 @@ public ScanResultValue next() return batch; } else { // last batch - int left = limit - count; + // single batch length is <= Integer.MAX_VALUE, so this should not overflow + int left = (int) (limit - count); count = limit; return new ScanResultValue(batch.getSegmentId(), batch.getColumns(), events.subList(0, left)); } diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java index 222efda0a8e1..69801c4c6ede 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryQueryToolChest.java @@ -50,7 +50,7 @@ public Sequence run( ) { ScanQuery scanQuery = (ScanQuery) query; - if (scanQuery.getLimit() == Integer.MAX_VALUE) { + if (scanQuery.getLimit() == Long.MAX_VALUE) { return runner.run(query, responseContext); } return new BaseSequence<>( diff --git a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java index 6b1244c5ff8a..712249eac5b7 100644 --- a/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java +++ b/extensions-contrib/scan-query/src/main/java/io/druid/query/scan/ScanQueryRunnerFactory.java @@ -72,7 +72,7 @@ public Sequence run( ) { final Number queryTimeout = query.getContextValue(QueryContextKeys.TIMEOUT, null); - final long timeoutAt = queryTimeout == null + final long timeoutAt = (queryTimeout == null || queryTimeout.longValue() == 0L) ? JodaUtils.MAX_INSTANT : System.currentTimeMillis() + queryTimeout.longValue(); responseContext.put(CTX_TIMEOUT_AT, timeoutAt); return Sequences.concat( @@ -119,7 +119,8 @@ public Sequence run( } // it happens in unit tests - if (responseContext.get(CTX_TIMEOUT_AT) == null) { + final Number timeoutAt = (Number) responseContext.get(CTX_TIMEOUT_AT); + if (timeoutAt == null || timeoutAt.longValue() == 0L) { responseContext.put(CTX_TIMEOUT_AT, JodaUtils.MAX_INSTANT); }; return engine.process((ScanQuery) query, segment, responseContext);