Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
resolve conflict in RouterUtil.java from MyCATApache  https://github.com/MyCATApache/Mycat-Server.git
  • Loading branch information
runfriends committed Dec 23, 2014
2 parents c6738fe + 556ef17 commit d4f9ddd
Show file tree
Hide file tree
Showing 15 changed files with 1,403 additions and 622 deletions.
2 changes: 1 addition & 1 deletion src/main/java/org/opencloudb/config/Versions.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,6 @@ public interface Versions {
byte PROTOCOL_VERSION = 10;

/** 服务器版本 */
byte[] SERVER_VERSION = "5.5-mycat-1.3-alpha".getBytes();
byte[] SERVER_VERSION = "5.5.8-mycat-1.3".getBytes();

}
5 changes: 3 additions & 2 deletions src/main/java/org/opencloudb/parser/druid/DruidParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import java.sql.SQLNonTransientException;

import org.opencloudb.cache.LayerCachePool;
import org.opencloudb.config.model.SchemaConfig;
import org.opencloudb.route.RouteResultset;

Expand All @@ -21,7 +22,7 @@ public interface DruidParser {
* @param schema
* @param stmt
*/
public void parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, String originSql) throws SQLNonTransientException;
public void parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, String originSql,LayerCachePool cachePool) throws SQLNonTransientException;

/**
* statement方式解析
Expand All @@ -44,7 +45,7 @@ public interface DruidParser {
* @param stmt
* @throws SQLNonTransientException
*/
public void changeSql(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt) throws SQLNonTransientException;
public void changeSql(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt,LayerCachePool cachePool) throws SQLNonTransientException;
/**
* 获取解析到的信息
* @return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Map;

import org.apache.log4j.Logger;
import org.opencloudb.cache.LayerCachePool;
import org.opencloudb.config.model.SchemaConfig;
import org.opencloudb.mpp.RangeValue;
import org.opencloudb.parser.druid.DruidParser;
Expand Down Expand Up @@ -49,7 +50,7 @@ public List<Condition> getConditions() {
* @param schema
* @param stmt
*/
public void parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, String originSql) throws SQLNonTransientException {
public void parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, String originSql,LayerCachePool cachePool) throws SQLNonTransientException {
ctx = new DruidShardingParseInfo();
//设置为原始sql,如果有需要改写sql的,可以通过修改SQLStatement中的属性,然后调用SQLStatement.toString()得到改写的sql
ctx.setSql(originSql);
Expand All @@ -59,7 +60,7 @@ public void parser(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt, S
statementParse(schema, rrs, stmt);

//改写sql:如insert语句主键自增长的可以
changeSql(schema, rrs, stmt);
changeSql(schema, rrs, stmt,cachePool);
}

/**
Expand All @@ -76,7 +77,7 @@ public void statementParse(SchemaConfig schema, RouteResultset rrs, SQLStatement
*/
@Override
public void changeSql(SchemaConfig schema, RouteResultset rrs,
SQLStatement stmt) throws SQLNonTransientException {
SQLStatement stmt,LayerCachePool cachePool) throws SQLNonTransientException {

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ public void statementParse(SchemaConfig schema, RouteResultset rrs, SQLStatement
MySqlDeleteStatement delete = (MySqlDeleteStatement)stmt;
String tableName = removeBackquote(delete.getTableName().getSimpleName().toUpperCase());
ctx.addTable(tableName);
ctx.setSql(stmt.toString());
}
}

124 changes: 112 additions & 12 deletions src/main/java/org/opencloudb/parser/druid/impl/DruidSelectParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
import java.util.LinkedHashMap;
import java.util.List;

import org.opencloudb.cache.LayerCachePool;
import org.opencloudb.config.ErrorCode;
import org.opencloudb.config.model.SchemaConfig;
import org.opencloudb.config.model.TableConfig;
import org.opencloudb.mpp.OrderCol;
import org.opencloudb.route.RouteResultset;
import org.opencloudb.route.util.RouterUtil;

import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLOrderingSpecification;
Expand Down Expand Up @@ -92,48 +96,144 @@ public void statementParse(SchemaConfig schema, RouteResultset rrs, SQLStatement
* 改写sql:需要加limit的加上
*/
@Override
public void changeSql(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt) throws SQLNonTransientException {
public void changeSql(SchemaConfig schema, RouteResultset rrs, SQLStatement stmt,LayerCachePool cachePool) throws SQLNonTransientException {
RouterUtil.tryRouteForTables(schema, ctx, rrs, true,cachePool);
if(rrs == null) {
String msg = " find no Route:" + ctx.getSql();
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
rrs.setFinishedRoute(true);

// if(!isNeedChangeLimit(rrs,schema)){
// return;
// }

SQLSelectStatement selectStmt = (SQLSelectStatement)stmt;
SQLSelectQuery sqlSelectQuery = selectStmt.getSelect().getQuery();
if(sqlSelectQuery instanceof MySqlSelectQueryBlock) {
MySqlSelectQueryBlock mysqlSelectQuery = (MySqlSelectQueryBlock)selectStmt.getSelect().getQuery();
int limitStart = 0;
int limitSize = schema.getDefaultMaxLimit();
if(isNeedAddLimit(schema, rrs, mysqlSelectQuery) ) {
boolean isNeedAddLimit = isNeedAddLimit(schema, rrs, mysqlSelectQuery);
if(isNeedAddLimit) {
Limit limit = new Limit();
limit.setRowCount(new SQLIntegerExpr(limitSize));
mysqlSelectQuery.setLimit(limit);
rrs.changeNodeSqlAfterAddLimit(stmt.toString());
}
Limit limit = mysqlSelectQuery.getLimit();
if(limit != null) {
SQLIntegerExpr offset = (SQLIntegerExpr)limit.getOffset();
SQLIntegerExpr count = (SQLIntegerExpr)limit.getRowCount();
if(offset != null) {
limitStart = offset.getNumber().intValue();
rrs.setLimitStart(limitStart);
}
if(count != null) {
limitSize = count.getNumber().intValue();
rrs.setLimitSize(limitSize);
}

if(isNeedChangeLimit(rrs, schema)) {
Limit changedLimit = new Limit();
changedLimit.setRowCount(new SQLIntegerExpr(limitStart + limitSize));

if(offset != null) {
if(limitStart < 0) {
String msg = "You have an error in your SQL syntax; check the manual that " +
"corresponds to your MySQL server version for the right syntax to use near '" + limitStart + "'";
throw new SQLNonTransientException(ErrorCode.ER_PARSE_ERROR + " - " + msg);
} else {
changedLimit.setOffset(new SQLIntegerExpr(0));
//TODO
}
}

mysqlSelectQuery.setLimit(changedLimit);
rrs.changeNodeSqlAfterAddLimit(stmt.toString());
// rrs.setSqlChanged(true);
}

Limit changedLimit = new Limit();
changedLimit.setRowCount(new SQLIntegerExpr(limitStart + limitSize));
changedLimit.setOffset(new SQLIntegerExpr(0));
mysqlSelectQuery.setLimit(changedLimit);

//设置改写后的sql
ctx.setSql(stmt.toString());

rrs.setLimitStart(limitStart);
rrs.setLimitSize(limitSize);
}

rrs.setCacheAble(isNeedCache(schema, rrs, mysqlSelectQuery));
}

}

private boolean isNeedChangeLimit(RouteResultset rrs, SchemaConfig schema) {
if(rrs.getNodes() == null) {
return false;
} else {
if(rrs.getNodes().length > 1) {
return true;
}
return false;

}
}

private boolean isNeedCache(SchemaConfig schema, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery) {
TableConfig tc = schema.getTables().get(ctx.getTables().get(0));
if((ctx.getTables().size() == 1 && tc.isGlobalTable())
) {//|| (ctx.getTables().size() == 1) && tc.getRule() == null && tc.getDataNodes().size() == 1
return false;
} else {
//单表主键查询
if(ctx.getTables().size() == 1) {
String tableName = ctx.getTables().get(0);
String primaryKey = schema.getTables().get(tableName).getPrimaryKey();
// schema.getTables().get(ctx.getTables().get(0)).getParentKey() != null;
if(ctx.getTablesAndConditions().get(tableName) != null
&& ctx.getTablesAndConditions().get(tableName).get(primaryKey) != null
&& tc.getDataNodes().size() > 1) {//有主键条件
return false;
}
}
return true;
}
}

/**
* 单表且是全局表
* 单表且rule为空且nodeNodes只有一个
* @param schema
* @param rrs
* @param mysqlSelectQuery
* @return
*/
private boolean isNeedAddLimit(SchemaConfig schema, RouteResultset rrs, MySqlSelectQueryBlock mysqlSelectQuery) {
if (!rrs.hasPrimaryKeyToCache() && schema.getDefaultMaxLimit() != -1
&& mysqlSelectQuery.getLimit() == null && ctx.getTables().size() > 0) {
// ctx.getTablesAndConditions().get(key))

if(schema.getDefaultMaxLimit() == -1) {
return false;
} else if (mysqlSelectQuery.getLimit() != null) {//语句中已有limit
return false;
} else if(ctx.getTables().size() == 1) {
if(schema.getTables().get(ctx.getTables().get(0)).isGlobalTable()) {
return true;//TODO
}
String tableName = ctx.getTables().get(0);
String primaryKey = schema.getTables().get(tableName).getPrimaryKey();
// schema.getTables().get(ctx.getTables().get(0)).getParentKey() != null;
if(ctx.getTablesAndConditions().get(tableName) == null) {//无条件
return true;
}

if (ctx.getTablesAndConditions().get(tableName).get(primaryKey) != null) {//条件中带主键
return false;
}
return true;
} else if(rrs.hasPrimaryKeyToCache() && ctx.getTables().size() == 1){//只有一个表且条件中有主键,不需要limit了,因为主键只能查到一条记录
return false;
} else {//多表或无表
return false;
}
return false;

}

private String[] buildGroupByCols(List<SQLExpr> groupByItems) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@

import java.sql.SQLNonTransientException;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.opencloudb.config.model.SchemaConfig;
import org.opencloudb.config.model.TableConfig;
import org.opencloudb.mpp.ColumnRoutePair;
import org.opencloudb.route.RouteResultset;
import org.opencloudb.route.util.RouterUtil;

import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.SQLStatement;
import com.alibaba.druid.sql.ast.statement.SQLUpdateSetItem;
import com.alibaba.druid.sql.dialect.mysql.ast.statement.MySqlUpdateStatement;
Expand All @@ -17,27 +23,54 @@ public void statementParse(SchemaConfig schema, RouteResultset rrs, SQLStatement
String tableName = removeBackquote(update.getTableName().getSimpleName().toUpperCase());

List<SQLUpdateSetItem> updateSetItem = update.getItems();
TableConfig tc = schema.getTables().get(tableName);
String partitionColumn = tc.getPartitionColumn();
String joinKey = tc.getJoinKey();
if(schema.isNoSharding()) {//整个schema都不分库或者该表不拆分
RouterUtil.routeForTableMeta(rrs, schema, tableName, rrs.getStatement());
rrs.setFinishedRoute(true);
return;
}

if(tc.isGlobalTable() || (partitionColumn == null && joinKey == null)) {
RouterUtil.routeToMultiNode(false, rrs, schema.getAllDataNodes(), rrs.getStatement());
rrs.setFinishedRoute(true);
return;
}

if(updateSetItem != null && updateSetItem.size() > 0) {
String partitionColumn = schema.getTables().get(tableName).getPartitionColumn();
String joinKey = schema.getTables().get(tableName).getJoinKey();
boolean hasParent = (schema.getTables().get(tableName).getParentTC() != null);
for(SQLUpdateSetItem item : updateSetItem) {
String column = removeBackquote(item.getColumn().toString().toUpperCase());
if(partitionColumn.equals(column)) {
String msg = "partion key can't be updated: " + tableName + " -> " + partitionColumn;
if(partitionColumn != null && partitionColumn.equals(column)) {
String msg = "partion key can't be updated " + tableName + "->" + partitionColumn;
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
if(hasParent) {
if(column.equals(joinKey)) {
String msg = "parent relation column can't be updated " + tableName + " -> " + joinKey;
String msg = "parent relation column can't be updated " + tableName + "->" + joinKey;
LOGGER.warn(msg);
throw new SQLNonTransientException(msg);
}
rrs.setCacheAble(true);
}
}
}

// if(ctx.getTablesAndConditions().size() > 0) {
// Map<String, Set<ColumnRoutePair>> map = ctx.getTablesAndConditions().get(tableName);
// if(map != null) {
// for(Map.Entry<String, Set<ColumnRoutePair>> entry : map.entrySet()) {
// String column = entry.getKey();
// Set<ColumnRoutePair> value = entry.getValue();
// if(column.toUpperCase().equals(anObject))
// }
// }
//
// }
// System.out.println();

if(schema.getTables().get(tableName).isGlobalTable() && ctx.getTablesAndConditions().size() > 1) {
throw new SQLNonTransientException("global table not supported multi table related update "+ tableName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public class MyCATSequnceProcessor {
private static final Logger LOGGER = Logger.getLogger(MyCATSequnceProcessor.class);
private LinkedBlockingQueue<SessionSQLPair> seqSQLQueue = new LinkedBlockingQueue<SessionSQLPair>();
private ExecutorService sqlExecutor=Executors.newSingleThreadExecutor();
private volatile boolean running;
private volatile boolean running=true;

public MyCATSequnceProcessor() {
sqlExecutor.submit(new ExecuteThread());
Expand Down Expand Up @@ -68,7 +68,6 @@ private void outRawData(ServerConnection sc,String value) {

private void executeSeq(SessionSQLPair pair) {
try {

// @micmiu 扩展NodeToString实现自定义全局序列号
NodeToString strHandler = new ExtNodeToString4SEQ(MycatServer
.getInstance().getConfig().getSystem()
Expand Down
8 changes: 8 additions & 0 deletions src/main/java/org/opencloudb/route/RouteResultset.java
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ public boolean isCallStatement() {
public void setCallStatement(boolean callStatement) {
this.callStatement = callStatement;
}

public void changeNodeSqlAfterAddLimit(String sql) {
if (nodes != null) {
for (RouteResultsetNode node : nodes) {
node.setStatement(sql);
}
}
}

@Override
public String toString() {
Expand Down
16 changes: 10 additions & 6 deletions src/main/java/org/opencloudb/route/impl/AbstractRouteStrategy.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,17 @@ public RouteResultset route(SystemConfig sysConfig, SchemaConfig schema,int sqlT

// check if there is sharding in schema
if (schema.isNoSharding() && ServerParse.SHOW != sqlType) {
return RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
rrs = RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
// return RouterUtil.routeToSingleNode(rrs, schema.getDataNode(), stmt);
} else {
RouteResultset returnedSet=routeSystemInfo(schema, sqlType, stmt, rrs);
if(returnedSet==null){
rrs = routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool);
// return routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool);
}
}
RouteResultset returnedSet=routeSystemInfo(schema, sqlType, stmt, rrs);
if(returnedSet==null){
return routeNormalSqlWithAST(schema, stmt, rrs, charset, cachePool);
}
return returnedSet;

return rrs;
}

/**
Expand Down
Loading

0 comments on commit d4f9ddd

Please sign in to comment.