Skip to content

Commit

Permalink
fix apache#79 limit change dependent routing result
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoht committed Jul 13, 2016
1 parent 9a6866e commit 2921a56
Show file tree
Hide file tree
Showing 23 changed files with 327 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

package com.dangdang.ddframe.rdb.sharding.parser;

import java.util.Collection;
import java.util.List;

import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLDeleteStatement;
import com.alibaba.druid.sql.ast.statement.SQLInsertStatement;
Expand All @@ -35,6 +32,9 @@
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import java.util.Collection;
import java.util.List;

/**
* 不包含OR语句的SQL构建器解析.
*
Expand Down Expand Up @@ -66,7 +66,7 @@ public SQLParsedResult parse() {
sqlStatement.accept(visitor);
SQLParsedResult result;
if (sqlVisitor.getParseContext().isHasOrCondition()) {
result = new OrParser(sqlStatement, visitor).parse();
result = new OrParser(sqlStatement, visitor).parse(sqlVisitor.getParseContext().getParsedResult());
} else {
sqlVisitor.getParseContext().mergeCurrentConditionContext();
result = sqlVisitor.getParseContext().getParsedResult();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package com.dangdang.ddframe.rdb.sharding.parser.result.merger;

import com.google.common.base.Optional;
import lombok.Getter;
import lombok.RequiredArgsConstructor;
import lombok.ToString;
Expand All @@ -31,7 +32,15 @@
@ToString
public class Limit {

public static final String OFFSET_NAME = "limit_offset";

public static final String COUNT_NAME = "limit_count";

private final int offset;

private final int rowCount;

private final Optional<Integer> offsetParameterIndex;

private final Optional<Integer> rowCountParameterIndex;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@

package com.dangdang.ddframe.rdb.sharding.parser.result.router;

import com.google.common.base.Joiner;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;

import com.google.common.base.Joiner;

/**
* SQL构建器.
*
Expand Down Expand Up @@ -91,6 +91,16 @@ public SQLBuilder buildSQL(final String originToken, final String newToken) {
return this;
}

/**
* 是否包含占位符.
*
* @param token 占位符
* @return true 包含 false 不包含
*/
public boolean containsToken(final String token) {
return tokenMap.containsKey(token);
}

/**
* 生成SQL语句.
*
Expand Down Expand Up @@ -134,6 +144,22 @@ public String toString() {
return result.toString();
}

public SQLBuilder cloneBuilder() {
SQLBuilder result = new SQLBuilder();
for (Object each : segments) {
if (each instanceof StringToken) {
StringToken token = (StringToken) each;
StringToken newToken = new StringToken();
newToken.value = token.value;
result.segments.add(newToken);
result.tokenMap.put(newToken.value, newToken);
} else {
result.segments.add(each);
}
}
return result;
}

private class StringToken {

private String value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.alibaba.druid.sql.dialect.mysql.ast.expr.MySqlSelectGroupByExpr;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlSelectQueryBlock;
import com.alibaba.druid.sql.dialect.mysql.visitor.MySqlOutputVisitor;
import com.dangdang.ddframe.rdb.sharding.exception.SQLParserException;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AbstractSortableColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.AggregationColumn.AggregationType;
Expand Down Expand Up @@ -162,26 +163,42 @@ public boolean visit(final MySqlSelectGroupByExpr x) {
public boolean visit(final MySqlSelectQueryBlock.Limit x) {
print("LIMIT ");
int offset = 0;
Optional<Integer> offSetIndex;
if (null != x.getOffset()) {
if (x.getOffset() instanceof SQLNumericLiteralExpr) {
offset = ((SQLNumericLiteralExpr) x.getOffset()).getNumber().intValue();
print("0, ");
offSetIndex = Optional.absent();
printToken(Limit.OFFSET_NAME);
print(", ");
} else {
offset = ((Number) getParameters().get(((SQLVariantRefExpr) x.getOffset()).getIndex())).intValue();
getParameters().set(((SQLVariantRefExpr) x.getOffset()).getIndex(), 0);
offSetIndex = Optional.of(((SQLVariantRefExpr) x.getOffset()).getIndex());
print("?, ");
}
} else {
offSetIndex = Optional.absent();
}
int rowCount;
Optional<Integer> rowCountIndex;
if (x.getRowCount() instanceof SQLNumericLiteralExpr) {
rowCount = ((SQLNumericLiteralExpr) x.getRowCount()).getNumber().intValue();
print(rowCount + offset);
rowCountIndex = Optional.absent();
printToken(Limit.COUNT_NAME);
} else {
rowCount = ((Number) getParameters().get(((SQLVariantRefExpr) x.getRowCount()).getIndex())).intValue();
getParameters().set(((SQLVariantRefExpr) x.getRowCount()).getIndex(), rowCount + offset);
rowCountIndex = Optional.of(((SQLVariantRefExpr) x.getRowCount()).getIndex());
print("?");
}
getParseContext().getParsedResult().getMergeContext().setLimit(new Limit(offset, rowCount));
if (offset < 0 || rowCount < 0) {
throw new SQLParserException("LIMIT offset and row count can not be a negative value");
}
// "LIMIT {rowCount} OFFSET {offset}" will transform to "LIMIT {offset}, {rowCount}".So exchange parameter index
if (offSetIndex.isPresent() && rowCountIndex.isPresent() && offSetIndex.get() > rowCountIndex.get()) {
Optional<Integer> tmp = rowCountIndex;
rowCountIndex = offSetIndex;
offSetIndex = tmp;
}
getParseContext().getParsedResult().getMergeContext().setLimit(new Limit(offset, rowCount, offSetIndex, rowCountIndex));
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,14 @@ public OrParser(final SQLStatement sqlStatement, final SQLASTOutputVisitor depen
*  解析SQL.
*
* @return SQL解析结果
* @param parsedResult 初步解析结果
*/
public SQLParsedResult parse() {
SQLParsedResult result = orVisitor.getParseContext().getParsedResult();
public SQLParsedResult parse(final SQLParsedResult parsedResult) {
Optional<AbstractOrASTNode> rootASTNode = orVisitor.visitHandle(sqlStatement);
if (rootASTNode.isPresent()) {
result.getConditionContexts().addAll(rootASTNode.get().getCondition());
parsedResult.getConditionContexts().clear();
parsedResult.getConditionContexts().addAll(rootASTNode.get().getCondition());
}
return result;
return parsedResult;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public SQLRouteResult route(final List<Object> parameters) {
each.setNewConditionValue(parameters);
}
}
return engine.routeSQL(sqlParsedResult);
return engine.routeSQL(sqlParsedResult, parameters);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@

package com.dangdang.ddframe.rdb.sharding.router;

import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLBuilder;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;

Expand All @@ -30,16 +32,20 @@
@Getter
@Slf4j
@ToString
@EqualsAndHashCode
@EqualsAndHashCode(exclude = "sqlBuilder")
public class SQLExecutionUnit {

private final String dataSource;

private final String sql;
@Setter
private String sql;

public SQLExecutionUnit(final String dataSource, final String sql) {
private final SQLBuilder sqlBuilder;

public SQLExecutionUnit(final String dataSource, final SQLBuilder sqlBuilder) {
this.dataSource = dataSource;
this.sql = sql;
this.sqlBuilder = sqlBuilder.cloneBuilder();
sql = sqlBuilder.toSQL();
log.trace("route sql to db: [{}] sql: [{}]", dataSource, sql);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.dangdang.ddframe.rdb.sharding.metrics.MetricsContext;
import com.dangdang.ddframe.rdb.sharding.parser.SQLParserFactory;
import com.dangdang.ddframe.rdb.sharding.parser.result.SQLParsedResult;
import com.dangdang.ddframe.rdb.sharding.parser.result.merger.Limit;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.ConditionContext;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLBuilder;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLStatementType;
Expand Down Expand Up @@ -77,7 +78,7 @@ public SQLRouteResult route(final String logicSql) throws SQLParserException {
* @throws SQLParserException SQL解析失败异常
*/
public SQLRouteResult route(final String logicSql, final List<Object> parameters) throws SQLParserException {
return routeSQL(parseSQL(logicSql, parameters));
return routeSQL(parseSQL(logicSql, parameters), parameters);
}

/**
Expand All @@ -97,7 +98,7 @@ SQLParsedResult parseSQL(final String logicSql, final List<Object> parameters) {
return result;
}

SQLRouteResult routeSQL(final SQLParsedResult parsedResult) {
SQLRouteResult routeSQL(final SQLParsedResult parsedResult, final List<Object> parameters) {
Context context = MetricsContext.start("Route SQL");
SQLRouteResult result = new SQLRouteResult(parsedResult.getRouteContext().getSqlStatementType(), parsedResult.getMergeContext());
for (ConditionContext each : parsedResult.getConditionContexts()) {
Expand All @@ -109,6 +110,7 @@ public String apply(final Table input) {
}
})), parsedResult.getRouteContext().getSqlBuilder(), parsedResult.getRouteContext().getSqlStatementType()));
}
processLimit(result.getExecutionUnits(), parsedResult, parameters);
MetricsContext.stop(context);
log.debug("final route result:{}", result.getExecutionUnits());
log.debug("merge context:{}", result.getMergeContext());
Expand All @@ -130,4 +132,34 @@ private Collection<SQLExecutionUnit> routeSQL(final ConditionContext conditionCo
}
return result.getSQLExecutionUnits(sqlBuilder);
}

private void processLimit(final Set<SQLExecutionUnit> sqlExecutionUnits, final SQLParsedResult parsedResult, final List<Object> parameters) {
if (!parsedResult.getMergeContext().hasLimit()) {
return;
}
int offset;
int rowCount;
Limit limit = parsedResult.getMergeContext().getLimit();
if (sqlExecutionUnits.size() > 1) {
offset = 0;
rowCount = limit.getOffset() + limit.getRowCount();
} else {
offset = limit.getOffset();
rowCount = limit.getRowCount();
}
if (parsedResult.getRouteContext().getSqlBuilder().containsToken(Limit.OFFSET_NAME) || parsedResult.getRouteContext().getSqlBuilder().containsToken(Limit.COUNT_NAME)) {
for (SQLExecutionUnit each : sqlExecutionUnits) {
SQLBuilder sqlBuilder = each.getSqlBuilder();
sqlBuilder.buildSQL(Limit.OFFSET_NAME, String.valueOf(offset));
sqlBuilder.buildSQL(Limit.COUNT_NAME, String.valueOf(rowCount));
each.setSql(sqlBuilder.toSQL());
}
}
if (limit.getOffsetParameterIndex().isPresent()) {
parameters.set(limit.getOffsetParameterIndex().get(), offset);
}
if (limit.getRowCountParameterIndex().isPresent()) {
parameters.set(limit.getRowCountParameterIndex().get(), rowCount);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ Collection<SQLExecutionUnit> getSQLExecutionUnits(final SQLBuilder sqlBuilder) {
Collection<SQLExecutionUnit> result = new ArrayList<>();
for (CartesianTableReference each : routingTableReferences) {
each.buildSQL(sqlBuilder);
result.add(new SQLExecutionUnit(dataSource, sqlBuilder.toSQL()));
result.add(new SQLExecutionUnit(dataSource, sqlBuilder));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ Collection<SQLExecutionUnit> getSQLExecutionUnits(final SQLBuilder sqlBuilder) {
Collection<SQLExecutionUnit> result = new ArrayList<>();
for (SingleRoutingTableFactor each : routingTableFactors) {
each.buildSQL(sqlBuilder);
result.add(new SQLExecutionUnit(dataSource, sqlBuilder.toSQL()));
result.add(new SQLExecutionUnit(dataSource, sqlBuilder));
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import com.dangdang.ddframe.rdb.sharding.executor.fixture.TestDMLExecutionEventListener;
import com.dangdang.ddframe.rdb.sharding.executor.fixture.TestDQLExecutionEventListener;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.PreparedStatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLBuilder;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
Expand Down Expand Up @@ -351,6 +353,10 @@ private PreparedStatementExecutorWrapper createPreparedStatementExecutorWrapperF
}

private PreparedStatementExecutorWrapper createPreparedStatementExecutorWrapper(final PreparedStatement preparedStatement, final String dataSource, final String sql) {
return new PreparedStatementExecutorWrapper(preparedStatement, Collections.emptyList(), new SQLExecutionUnit(dataSource, sql));
try {
return new PreparedStatementExecutorWrapper(preparedStatement, Collections.emptyList(), new SQLExecutionUnit(dataSource, (SQLBuilder) new SQLBuilder().append(sql)));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@
import com.dangdang.ddframe.rdb.sharding.executor.fixture.TestDMLExecutionEventListener;
import com.dangdang.ddframe.rdb.sharding.executor.fixture.TestDQLExecutionEventListener;
import com.dangdang.ddframe.rdb.sharding.executor.wrapper.StatementExecutorWrapper;
import com.dangdang.ddframe.rdb.sharding.parser.result.router.SQLBuilder;
import com.dangdang.ddframe.rdb.sharding.router.SQLExecutionUnit;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import java.io.IOException;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
Expand Down Expand Up @@ -470,10 +472,18 @@ public void assertExecuteWithColumnNames() throws SQLException {
}

private StatementExecutorWrapper createStatementExecutorWrapperForDQL(final Statement statement, final String dataSource) {
return new StatementExecutorWrapper(statement, new SQLExecutionUnit(dataSource, SELECT_FROM_DUAL));
try {
return new StatementExecutorWrapper(statement, new SQLExecutionUnit(dataSource, (SQLBuilder) new SQLBuilder().append(SELECT_FROM_DUAL)));
} catch (final IOException e) {
throw new RuntimeException(e);
}
}

private StatementExecutorWrapper createStatementExecutorWrapperForDML(final Statement statement, final String dataSource) {
return new StatementExecutorWrapper(statement, new SQLExecutionUnit(dataSource, DELETE_FROM_DUAL));
try {
return new StatementExecutorWrapper(statement, new SQLExecutionUnit(dataSource, (SQLBuilder) new SQLBuilder().append(DELETE_FROM_DUAL)));
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit 2921a56

Please sign in to comment.