forked from opensearch-project/sql
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
add TakeOrderedOperator (opensearch-project#2863)
--------- Signed-off-by: Heng Qian <[email protected]>
- Loading branch information
1 parent
14a80a9
commit 0e70a50
Showing
14 changed files
with
883 additions
and
60 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
70 changes: 70 additions & 0 deletions
70
core/src/main/java/org/opensearch/sql/planner/physical/SortHelper.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package org.opensearch.sql.planner.physical; | ||
|
||
import static org.opensearch.sql.ast.tree.Sort.NullOrder.NULL_FIRST; | ||
import static org.opensearch.sql.ast.tree.Sort.SortOrder.ASC; | ||
|
||
import com.google.common.collect.Ordering; | ||
import java.util.ArrayList; | ||
import java.util.Comparator; | ||
import java.util.List; | ||
import org.apache.commons.lang3.tuple.Pair; | ||
import org.opensearch.sql.ast.tree.Sort.SortOption; | ||
import org.opensearch.sql.data.model.ExprValue; | ||
import org.opensearch.sql.data.utils.ExprValueOrdering; | ||
import org.opensearch.sql.expression.Expression; | ||
|
||
public interface SortHelper { | ||
|
||
/** | ||
* Construct an expr comparator for sorting on ExprValue. | ||
* | ||
* @param sortList list of sort fields and their related sort options. | ||
* @return A comparator for ExprValue | ||
*/ | ||
static Comparator<ExprValue> constructExprComparator( | ||
List<Pair<SortOption, Expression>> sortList) { | ||
return (o1, o2) -> compareWithExpressions(o1, o2, constructComparator(sortList)); | ||
} | ||
|
||
/** | ||
* Construct an expr ordering for efficiently taking the top-k elements on ExprValue. | ||
* | ||
* @param sortList list of sort fields and their related sort options. | ||
* @return An guava ordering for ExprValue | ||
*/ | ||
static Ordering<ExprValue> constructExprOrdering(List<Pair<SortOption, Expression>> sortList) { | ||
return Ordering.from(constructExprComparator(sortList)); | ||
} | ||
|
||
private static List<Pair<Expression, Comparator<ExprValue>>> constructComparator( | ||
List<Pair<SortOption, Expression>> sortList) { | ||
List<Pair<Expression, Comparator<ExprValue>>> comparators = new ArrayList<>(); | ||
for (Pair<SortOption, Expression> pair : sortList) { | ||
SortOption option = pair.getLeft(); | ||
ExprValueOrdering ordering = | ||
ASC.equals(option.getSortOrder()) | ||
? ExprValueOrdering.natural() | ||
: ExprValueOrdering.natural().reverse(); | ||
ordering = | ||
NULL_FIRST.equals(option.getNullOrder()) ? ordering.nullsFirst() : ordering.nullsLast(); | ||
comparators.add(Pair.of(pair.getRight(), ordering)); | ||
} | ||
return comparators; | ||
} | ||
|
||
private static int compareWithExpressions( | ||
ExprValue o1, ExprValue o2, List<Pair<Expression, Comparator<ExprValue>>> comparators) { | ||
for (Pair<Expression, Comparator<ExprValue>> comparator : comparators) { | ||
Expression expression = comparator.getKey(); | ||
int result = | ||
comparator | ||
.getValue() | ||
.compare( | ||
expression.valueOf(o1.bindingTuples()), expression.valueOf(o2.bindingTuples())); | ||
if (result != 0) { | ||
return result; | ||
} | ||
} | ||
return 0; | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
88 changes: 88 additions & 0 deletions
88
core/src/main/java/org/opensearch/sql/planner/physical/TakeOrderedOperator.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,88 @@ | ||
/* | ||
* Copyright OpenSearch Contributors | ||
* SPDX-License-Identifier: Apache-2.0 | ||
*/ | ||
|
||
package org.opensearch.sql.planner.physical; | ||
|
||
import com.google.common.collect.Ordering; | ||
import java.util.Collections; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import lombok.EqualsAndHashCode; | ||
import lombok.Getter; | ||
import lombok.ToString; | ||
import org.apache.commons.lang3.tuple.Pair; | ||
import org.opensearch.sql.ast.tree.Sort.SortOption; | ||
import org.opensearch.sql.data.model.ExprValue; | ||
import org.opensearch.sql.expression.Expression; | ||
|
||
/** | ||
* TakeOrdered Operator. This operator will sort input data as the order of {@link this#sortList} | ||
* specifies and return {@link this#limit} rows from the {@link this#offset} index. | ||
* | ||
* <p>Functionally, this operator is a combination of {@link SortOperator} and {@link | ||
* LimitOperator}. But it can reduce the time complexity from O(nlogn) to O(n), and memory from O(n) | ||
* to O(k) due to use guava {@link com.google.common.collect.Ordering}. | ||
* | ||
* <p>Overall, it's an optimization to replace `Limit(Sort)` in physical plan level since it's all | ||
* about execution. Because most execution engine may not support this operator, it doesn't have a | ||
* related logical operator. | ||
*/ | ||
@ToString | ||
@EqualsAndHashCode(callSuper = false) | ||
public class TakeOrderedOperator extends PhysicalPlan { | ||
@Getter private final PhysicalPlan input; | ||
|
||
@Getter private final List<Pair<SortOption, Expression>> sortList; | ||
@Getter private final Integer limit; | ||
@Getter private final Integer offset; | ||
@EqualsAndHashCode.Exclude private final Ordering<ExprValue> ordering; | ||
@EqualsAndHashCode.Exclude private Iterator<ExprValue> iterator; | ||
|
||
/** | ||
* TakeOrdered Operator Constructor. | ||
* | ||
* @param input input {@link PhysicalPlan} | ||
* @param limit the limit value from LimitOperator | ||
* @param offset the offset value from LimitOperator | ||
* @param sortList list of sort field from SortOperator | ||
*/ | ||
public TakeOrderedOperator( | ||
PhysicalPlan input, | ||
Integer limit, | ||
Integer offset, | ||
List<Pair<SortOption, Expression>> sortList) { | ||
this.input = input; | ||
this.sortList = sortList; | ||
this.limit = limit; | ||
this.offset = offset; | ||
this.ordering = SortHelper.constructExprOrdering(sortList); | ||
} | ||
|
||
@Override | ||
public <R, C> R accept(PhysicalPlanNodeVisitor<R, C> visitor, C context) { | ||
return visitor.visitTakeOrdered(this, context); | ||
} | ||
|
||
@Override | ||
public void open() { | ||
super.open(); | ||
iterator = ordering.leastOf(input, offset + limit).stream().skip(offset).iterator(); | ||
} | ||
|
||
@Override | ||
public List<PhysicalPlan> getChild() { | ||
return Collections.singletonList(input); | ||
} | ||
|
||
@Override | ||
public boolean hasNext() { | ||
return iterator.hasNext(); | ||
} | ||
|
||
@Override | ||
public ExprValue next() { | ||
return iterator.next(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.