Skip to content

Commit

Permalink
Merge branch 'merger/215' of https://github.com/iamhuzl/sharding-jdbc
Browse files Browse the repository at this point in the history
…into iamhuzl-merger/215
  • Loading branch information
gaohongtao committed Jan 17, 2017
2 parents 32b350d + ae8d5c5 commit 4e63e67
Showing 1 changed file with 39 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,48 +39,70 @@ public final class StreamingOrderByReducerResultSet extends AbstractDelegateResu

private final List<OrderByColumn> orderByColumns;

private final List<ResultSet> resultSets = new LinkedList<>();

private final List<OrderByDelegateResultSet> delegateResultSets = new LinkedList<>();
private OrderByDelegateResultSet lastDelegateResultSet ;


public StreamingOrderByReducerResultSet(final ResultSetMergeContext resultSetMergeContext) throws SQLException {
super(resultSetMergeContext.getShardingResultSets().getResultSets());
orderByColumns = resultSetMergeContext.getCurrentOrderByKeys();
resultSets.addAll(resultSetMergeContext.getShardingResultSets().getResultSets());
List<ResultSet> mergeResultSets = resultSetMergeContext.getShardingResultSets().getResultSets();
for(ResultSet each: mergeResultSets){
delegateResultSets.add(new OrderByDelegateResultSet(each,orderByColumns));
}
}

@Override
protected boolean firstNext() throws SQLException {
initialResultSetCursors();
return doNext();
}

private void initialResultSetCursors() throws SQLException {
for (ResultSet each : resultSets) {
for (OrderByDelegateResultSet each : delegateResultSets) {
each.next();
}
return doNext();
}

@Override
protected boolean afterFirstNext() throws SQLException {
if (!getDelegate().next()) {
resultSets.remove(getDelegate());
}
lastDelegateResultSet.next();
return doNext();
}

private boolean doNext() throws SQLException {
setDelegateResultSet();
return !resultSets.isEmpty();
return !delegateResultSets.isEmpty();
}

private void setDelegateResultSet() throws SQLException {
OrderByResultSetRow chosenOrderByValue = null;
for (ResultSet each : resultSets) {
OrderByResultSetRow eachOrderByValue = new OrderByResultSetRow(each, orderByColumns);
for (OrderByDelegateResultSet each : delegateResultSets) {
OrderByResultSetRow eachOrderByValue = each.orderByValue;
if (null == chosenOrderByValue || chosenOrderByValue.compareTo(eachOrderByValue) > 0) {
chosenOrderByValue = eachOrderByValue;
setDelegate(each);
setDelegate(each.delegate);
lastDelegateResultSet = each;
}
}
log.trace("Chosen order by value: {}, current result set hashcode: {}", chosenOrderByValue, getDelegate().hashCode());
}

class OrderByDelegateResultSet {
private ResultSet delegate;
private List<OrderByColumn> orderByColumns;
private OrderByResultSetRow orderByValue ;

public OrderByDelegateResultSet(ResultSet delegate, List<OrderByColumn> orderByColumns) throws SQLException {
this.delegate = delegate;
this.orderByColumns = orderByColumns;
}

public boolean next() throws SQLException {
boolean result = delegate.next();
if(result) {
orderByValue = new OrderByResultSetRow(delegate, orderByColumns);
}else {
delegateResultSets.remove(this);
}
return result;
}

}
}

0 comments on commit 4e63e67

Please sign in to comment.