diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md index b429ec857fad2..03cad09615106 100644 --- a/RELEASE-NOTES.md +++ b/RELEASE-NOTES.md @@ -36,6 +36,7 @@ 1. [ISSUE #806](https://github.com/sharding-sphere/sharding-sphere/issues/806) SQL parse error with `NOT IN` 1. [ISSUE #827](https://github.com/sharding-sphere/sharding-sphere/issues/827) Fix endless loop for bad SQL like `SELECT * FROM table WHERE id IN ()` 1. [ISSUE #919](https://github.com/sharding-sphere/sharding-sphere/issues/919) Use groovy to parse inline expression may cause memory leak +1. [ISSUE #1011](https://github.com/sharding-sphere/sharding-sphere/issues/1011) Can't resolve placeholder in spring boot configuration yaml 1. [ISSUE #1015](https://github.com/sharding-sphere/sharding-sphere/issues/1015) Support the statement of `SELECT id, COUNT(*) FROM table GROUP BY 1,2` ## 2.0.3 diff --git a/RELEASE-NOTES_ZH.md b/RELEASE-NOTES_ZH.md index e83cee0f988bc..0544a38674a37 100644 --- a/RELEASE-NOTES_ZH.md +++ b/RELEASE-NOTES_ZH.md @@ -36,6 +36,7 @@ 1. [ISSUE #806](https://github.com/sharding-sphere/sharding-sphere/issues/806) `NOT IN`解析异常 1. [ISSUE #827](https://github.com/sharding-sphere/sharding-sphere/issues/827) 将`SELECT * FROM table WHERE id IN ()`这种SQL跳出死循环 1. [ISSUE #919](https://github.com/sharding-sphere/sharding-sphere/issues/919) 使用Groovy解析行表达式可能导致内存泄漏 +1. [ISSUE #1011](https://github.com/sharding-sphere/sharding-sphere/issues/1011) 无法在Spring Boot的yaml中处理占位符 1. [ISSUE #1015](https://github.com/sharding-sphere/sharding-sphere/issues/1015) 支持使用`SELECT id, COUNT(*) FROM table GROUP BY 1,2` ## 2.0.3 diff --git a/sharding-core/src/test/java/io/shardingsphere/core/rewrite/SQLRewriteEngineTest.java b/sharding-core/src/test/java/io/shardingsphere/core/rewrite/SQLRewriteEngineTest.java index 90508d14cc63d..ae7e045ba07cc 100644 --- a/sharding-core/src/test/java/io/shardingsphere/core/rewrite/SQLRewriteEngineTest.java +++ b/sharding-core/src/test/java/io/shardingsphere/core/rewrite/SQLRewriteEngineTest.java @@ -121,12 +121,12 @@ public void assertRewriteForTableName() { public void assertRewriteForOrderByAndGroupByDerivedColumns() { selectStatement.getSqlTokens().add(new TableToken(18, 0, "table_x")); ItemsToken itemsToken = new ItemsToken(12); - itemsToken.getItems().addAll(Arrays.asList("x.id as ORDER_BY_DERIVED_0", "x.name as GROUP_BY_DERIVED_0")); + itemsToken.getItems().addAll(Arrays.asList("x.id as GROUP_BY_DERIVED_0", "x.name as ORDER_BY_DERIVED_0")); selectStatement.getSqlTokens().add(itemsToken); SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, "SELECT x.age FROM table_x x GROUP BY x.id ORDER BY x.name", DatabaseType.MySQL, selectStatement, null, Collections.emptyList()); assertThat(rewriteEngine.rewrite(true).toSQL(null, tableTokens, null, shardingDataSourceMetaData).getSql(), is( - "SELECT x.age, x.id as ORDER_BY_DERIVED_0, x.name as GROUP_BY_DERIVED_0 FROM table_1 x GROUP BY x.id ORDER BY x.name")); + "SELECT x.age, x.id as GROUP_BY_DERIVED_0, x.name as ORDER_BY_DERIVED_0 FROM table_1 x GROUP BY x.id ORDER BY x.name")); } @Test @@ -153,9 +153,9 @@ public void assertRewriteForAutoGeneratedKeyColumn() { insertStatement.getSqlTokens().add(itemsToken); insertStatement.getSqlTokens().add(new InsertValuesToken(39, "table_x")); InsertShardingCondition shardingCondition = new InsertShardingCondition("(?, ?, ?)", parameters); - shardingCondition.getDataNodes().add(new DataNode("db0.table_x")); + shardingCondition.getDataNodes().add(new DataNode("db0.table_1")); TableUnit tableUnit = new TableUnit("db0"); - tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_x")); + tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_1")); SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, "INSERT INTO table_x (name, age) VALUES (?, ?)", DatabaseType.MySQL, insertStatement, new ShardingConditions(Collections.singletonList(shardingCondition)), parameters); assertThat(rewriteEngine.rewrite(true).toSQL(tableUnit, tableTokens, null, shardingDataSourceMetaData).getSql(), is("INSERT INTO table_1 (name, age, id) VALUES (?, ?, ?)")); @@ -178,9 +178,9 @@ public void assertRewriteForAutoGeneratedKeyColumnWithoutColumnsWithParameter() insertStatement.getSqlTokens().add(new InsertColumnToken(21, ")")); insertStatement.getSqlTokens().add(new InsertValuesToken(29, "table_x")); InsertShardingCondition shardingCondition = new InsertShardingCondition("(?, ?)", parameters); - shardingCondition.getDataNodes().add(new DataNode("db0.table_x")); + shardingCondition.getDataNodes().add(new DataNode("db0.table_1")); TableUnit tableUnit = new TableUnit("db0"); - tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_x")); + tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_1")); SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, "INSERT INTO `table_x` VALUES (?)", DatabaseType.MySQL, insertStatement, new ShardingConditions(Collections.singletonList(shardingCondition)), parameters); assertThat(rewriteEngine.rewrite(true).toSQL(tableUnit, tableTokens, null, shardingDataSourceMetaData).getSql(), is("INSERT INTO table_1(name, id) VALUES (?, ?)")); @@ -200,9 +200,9 @@ public void assertRewriteForAutoGeneratedKeyColumnWithoutColumnsWithoutParameter insertStatement.getSqlTokens().add(new InsertColumnToken(21, ")")); insertStatement.getSqlTokens().add(new InsertValuesToken(29, "table_x")); InsertShardingCondition shardingCondition = new InsertShardingCondition("(10, 1)", Collections.emptyList()); - shardingCondition.getDataNodes().add(new DataNode("db0.table_x")); + shardingCondition.getDataNodes().add(new DataNode("db0.table_1")); TableUnit tableUnit = new TableUnit("db0"); - tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_x")); + tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_1")); SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, "INSERT INTO `table_x` VALUES (10)", DatabaseType.MySQL, insertStatement, new ShardingConditions(Collections.singletonList(shardingCondition)), Collections.emptyList()); assertThat(rewriteEngine.rewrite(true).toSQL(tableUnit, tableTokens, null, shardingDataSourceMetaData).getSql(), is("INSERT INTO table_1(name, id) VALUES (10, 1)")); @@ -225,9 +225,9 @@ public void assertRewriteColumnWithoutColumnsWithoutParameter() { insertStatement.getSqlTokens().add(new InsertColumnToken(21, ")")); insertStatement.getSqlTokens().add(new InsertValuesToken(29, "table_x")); InsertShardingCondition shardingCondition = new InsertShardingCondition("(10, 1)", parameters); - shardingCondition.getDataNodes().add(new DataNode("db0.table_x")); + shardingCondition.getDataNodes().add(new DataNode("db0.table_1")); TableUnit tableUnit = new TableUnit("db0"); - tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_x")); + tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_1")); SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, "INSERT INTO `table_x` VALUES (10, 1)", DatabaseType.MySQL, insertStatement, new ShardingConditions(Collections.singletonList(shardingCondition)), parameters); assertThat(rewriteEngine.rewrite(true).toSQL(tableUnit, tableTokens, null, shardingDataSourceMetaData).getSql(), is("INSERT INTO table_1(name, id) VALUES (10, 1)")); @@ -250,9 +250,9 @@ public void assertRewriteColumnWithoutColumnsWithParameter() { insertStatement.getSqlTokens().add(new InsertColumnToken(21, ")")); insertStatement.getSqlTokens().add(new InsertValuesToken(29, "table_x")); InsertShardingCondition shardingCondition = new InsertShardingCondition("(?, ?)", parameters); - shardingCondition.getDataNodes().add(new DataNode("db0.table_x")); + shardingCondition.getDataNodes().add(new DataNode("db0.table_1")); TableUnit tableUnit = new TableUnit("db0"); - tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_x")); + tableUnit.getRoutingTables().add(new RoutingTable("table_x", "table_1")); SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, "INSERT INTO `table_x` VALUES (?, ?)", DatabaseType.MySQL, insertStatement, new ShardingConditions(Collections.singletonList(shardingCondition)), parameters); assertThat(rewriteEngine.rewrite(true).toSQL(tableUnit, tableTokens, null, shardingDataSourceMetaData).getSql(), is("INSERT INTO table_1(name, id) VALUES (?, ?)")); diff --git a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/main/java/io/shardingsphere/jdbc/orchestration/spring/boot/util/PropertyUtil.java b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/main/java/io/shardingsphere/jdbc/orchestration/spring/boot/util/PropertyUtil.java index 027944be83646..adc74df4ca0e4 100644 --- a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/main/java/io/shardingsphere/jdbc/orchestration/spring/boot/util/PropertyUtil.java +++ b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/main/java/io/shardingsphere/jdbc/orchestration/spring/boot/util/PropertyUtil.java @@ -20,7 +20,11 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.springframework.beans.factory.config.PlaceholderConfigurerSupport; import org.springframework.core.env.Environment; import org.springframework.core.env.PropertyResolver; @@ -55,7 +59,8 @@ public static T handle(final Environment environment, final String prefix, f return (T) v2(environment, prefix, targetClass); } } - + + @SuppressWarnings("unchecked") private static Object v1(final Environment environment, final String prefix) { try { Class resolverClass = Class.forName("org.springframework.boot.bind.RelaxedPropertyResolver"); @@ -63,7 +68,21 @@ private static Object v1(final Environment environment, final String prefix) { Method getSubPropertiesMethod = resolverClass.getDeclaredMethod("getSubProperties", String.class); Object resolverObject = resolverConstructor.newInstance(environment); String prefixParam = prefix.endsWith(".") ? prefix : prefix + "."; - return getSubPropertiesMethod.invoke(resolverObject, prefixParam); + Method getPropertyMethod = resolverClass.getDeclaredMethod("getProperty", String.class); + Map dataSourceProps = (Map) getSubPropertiesMethod.invoke(resolverObject, prefixParam); + Map propertiesWithPlaceholderResolved = new HashMap<>(); + for (Map.Entry entry : dataSourceProps.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (value instanceof String && ((String) value).contains( + PlaceholderConfigurerSupport.DEFAULT_PLACEHOLDER_PREFIX)) { + String resolvedValue = (String) getPropertyMethod.invoke(resolverObject, prefixParam + key); + propertiesWithPlaceholderResolved.put(key, resolvedValue); + } else { + propertiesWithPlaceholderResolved.put(key, value); + } + } + return Collections.unmodifiableMap(propertiesWithPlaceholderResolved); } catch (final ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { throw new ShardingException(ex.getMessage(), ex); diff --git a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/OrchestrationSpringBootMasterSlaveTest.java b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/OrchestrationSpringBootMasterSlaveTest.java index 9336dccd101f7..162c99c842be9 100644 --- a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/OrchestrationSpringBootMasterSlaveTest.java +++ b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/OrchestrationSpringBootMasterSlaveTest.java @@ -57,9 +57,12 @@ public void assertWithMasterSlaveDataSource() { assertTrue(dataSource instanceof MasterSlaveDataSource); for (DataSource each : ((MasterSlaveDataSource) dataSource).getAllDataSources().values()) { assertThat(((BasicDataSource) each).getMaxTotal(), is(16)); + assertThat(((BasicDataSource) each).getUsername(), is("root")); } Map configMap = new ConcurrentHashMap<>(); configMap.put("key1", "value1"); + configMap.put("key2", "value1"); + configMap.put("username", "root"); assertThat(ConfigMapContext.getInstance().getMasterSlaveConfig(), is(configMap)); } } diff --git a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/resources/application-masterslave.properties b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/resources/application-masterslave.properties index 9aeda78a5f4fe..449f2020af43d 100644 --- a/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/resources/application-masterslave.properties +++ b/sharding-jdbc-orchestration-spring/sharding-jdbc-orchestration-spring-boot-starter/src/test/resources/application-masterslave.properties @@ -10,7 +10,7 @@ sharding.jdbc.datasource.ds_master.max-total=16 sharding.jdbc.datasource.ds_slave_0.type=org.apache.commons.dbcp2.BasicDataSource sharding.jdbc.datasource.ds_slave_0.driver-class-name=org.h2.Driver sharding.jdbc.datasource.ds_slave_0.url=jdbc:h2:mem:demo_ds_slave_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MYSQL -sharding.jdbc.datasource.ds_slave_0.username=root +sharding.jdbc.datasource.ds_slave_0.username=${sharding.jdbc.config.masterslave.config-map.username} sharding.jdbc.datasource.ds_slave_0.password= sharding.jdbc.datasource.ds_slave_0.max-total=16 @@ -32,3 +32,5 @@ sharding.jdbc.config.orchestration.zookeeper.namespace=orchestration-spring-boot sharding.jdbc.config.orchestration.zookeeper.server-lists=localhost:3181 sharding.jdbc.config.masterslave.config-map.key1=value1 +sharding.jdbc.config.masterslave.config-map.key2=${sharding.jdbc.config.masterslave.config-map.key1} +sharding.jdbc.config.masterslave.config-map.username=root diff --git a/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/main/java/io/shardingsphere/jdbc/spring/boot/util/PropertyUtil.java b/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/main/java/io/shardingsphere/jdbc/spring/boot/util/PropertyUtil.java index a1be6a2192fa8..e6943dc5cb889 100644 --- a/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/main/java/io/shardingsphere/jdbc/spring/boot/util/PropertyUtil.java +++ b/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/main/java/io/shardingsphere/jdbc/spring/boot/util/PropertyUtil.java @@ -20,7 +20,11 @@ import java.lang.reflect.Constructor; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import org.springframework.beans.factory.config.PlaceholderConfigurerSupport; import org.springframework.core.env.Environment; import org.springframework.core.env.PropertyResolver; @@ -55,7 +59,8 @@ public static T handle(final Environment environment, final String prefix, f return (T) v2(environment, prefix, targetClass); } } - + + @SuppressWarnings("unchecked") private static Object v1(final Environment environment, final String prefix) { try { Class resolverClass = Class.forName("org.springframework.boot.bind.RelaxedPropertyResolver"); @@ -63,7 +68,21 @@ private static Object v1(final Environment environment, final String prefix) { Method getSubPropertiesMethod = resolverClass.getDeclaredMethod("getSubProperties", String.class); Object resolverObject = resolverConstructor.newInstance(environment); String prefixParam = prefix.endsWith(".") ? prefix : prefix + "."; - return getSubPropertiesMethod.invoke(resolverObject, prefixParam); + Method getPropertyMethod = resolverClass.getDeclaredMethod("getProperty", String.class); + Map dataSourceProps = (Map) getSubPropertiesMethod.invoke(resolverObject, prefixParam); + Map propertiesWithPlaceholderResolved = new HashMap<>(); + for (Map.Entry entry : dataSourceProps.entrySet()) { + String key = entry.getKey(); + Object value = entry.getValue(); + if (value instanceof String && ((String) value).contains( + PlaceholderConfigurerSupport.DEFAULT_PLACEHOLDER_PREFIX)) { + String resolvedValue = (String) getPropertyMethod.invoke(resolverObject, prefixParam + key); + propertiesWithPlaceholderResolved.put(key, resolvedValue); + } else { + propertiesWithPlaceholderResolved.put(key, value); + } + } + return Collections.unmodifiableMap(propertiesWithPlaceholderResolved); } catch (final ClassNotFoundException | NoSuchMethodException | SecurityException | InstantiationException | IllegalAccessException | IllegalArgumentException | InvocationTargetException ex) { throw new ShardingException(ex.getMessage(), ex); diff --git a/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/SpringBootMasterSlaveTest.java b/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/SpringBootMasterSlaveTest.java index 9d5cf801674f6..69b0b8a513a6e 100644 --- a/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/SpringBootMasterSlaveTest.java +++ b/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/java/io/shardingsphere/jdbc/spring/boot/type/SpringBootMasterSlaveTest.java @@ -53,6 +53,8 @@ public void assertWithMasterSlaveDataSource() { } Map configMap = new ConcurrentHashMap<>(); configMap.put("key1", "value1"); + configMap.put("key2", "value1"); + configMap.put("username", "root"); assertThat(ConfigMapContext.getInstance().getMasterSlaveConfig(), is(configMap)); } } diff --git a/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/resources/application-masterslave.properties b/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/resources/application-masterslave.properties index 0ad9fee5973a0..38673fdbfd2a3 100644 --- a/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/resources/application-masterslave.properties +++ b/sharding-jdbc-spring/sharding-jdbc-spring-boot-starter/src/test/resources/application-masterslave.properties @@ -10,7 +10,7 @@ sharding.jdbc.datasource.ds_master.max-total=16 sharding.jdbc.datasource.ds_slave_0.type=org.apache.commons.dbcp2.BasicDataSource sharding.jdbc.datasource.ds_slave_0.driver-class-name=org.h2.Driver sharding.jdbc.datasource.ds_slave_0.url=jdbc:h2:mem:demo_ds_slave_0;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MYSQL -sharding.jdbc.datasource.ds_slave_0.username=root +sharding.jdbc.datasource.ds_slave_0.username=${sharding.jdbc.config.masterslave.config-map.username} sharding.jdbc.datasource.ds_slave_0.password= sharding.jdbc.datasource.ds_slave_0.max-total=16 @@ -26,3 +26,5 @@ sharding.jdbc.config.masterslave.master-data-source-name=ds_master sharding.jdbc.config.masterslave.slave-data-source-names=ds_slave_0,ds_slave_1 sharding.jdbc.config.masterslave.config-map.key1=value1 +sharding.jdbc.config.masterslave.config-map.key2=${sharding.jdbc.config.masterslave.config-map.key1} +sharding.jdbc.config.masterslave.config-map.username=root diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/ConnectionManager.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/ConnectionManager.java index 09f643b4b0f52..cc161664a18d9 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/ConnectionManager.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/ConnectionManager.java @@ -20,18 +20,14 @@ import io.shardingsphere.core.routing.router.masterslave.MasterVisitedManager; import io.shardingsphere.proxy.backend.common.ProxyMode; import io.shardingsphere.proxy.config.RuleRegistry; -import lombok.Getter; import javax.sql.DataSource; import java.sql.Connection; -import java.sql.ResultSet; import java.sql.SQLException; import java.util.Collection; import java.util.HashMap; import java.util.LinkedList; -import java.util.List; import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; /** * Connection manager. @@ -44,9 +40,6 @@ public final class ConnectionManager implements AutoCloseable { private final Collection cachedConnections = new LinkedList<>(); - @Getter - private final List resultSets = new CopyOnWriteArrayList<>(); - /** * Get connection of current thread datasource. * diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java index 26a004b732b93..c25ec07a74fb1 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCBackendHandler.java @@ -17,6 +17,7 @@ package io.shardingsphere.proxy.backend.common.jdbc; +import com.google.common.collect.Lists; import io.netty.channel.EventLoopGroup; import io.shardingsphere.core.constant.SQLType; import io.shardingsphere.core.constant.TransactionType; @@ -35,6 +36,7 @@ import io.shardingsphere.proxy.config.RuleRegistry; import io.shardingsphere.proxy.metadata.ProxyShardingRefreshHandler; import io.shardingsphere.proxy.transport.common.packet.DatabaseProtocolPacket; +import io.shardingsphere.proxy.transport.mysql.constant.ColumnType; import io.shardingsphere.proxy.transport.mysql.constant.ServerErrorCode; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePackets; import io.shardingsphere.proxy.transport.mysql.packet.command.text.query.FieldCountPacket; @@ -44,7 +46,6 @@ import io.shardingsphere.proxy.util.ExecutorContext; import io.shardingsphere.transaction.xa.AtomikosUserTransaction; import lombok.Getter; -import lombok.Setter; import javax.transaction.Status; import javax.transaction.SystemException; @@ -52,11 +53,12 @@ import java.sql.SQLException; import java.sql.Statement; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; +import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; @@ -70,32 +72,34 @@ public abstract class JDBCBackendHandler implements BackendHandler { private final String sql; + private final RuleRegistry ruleRegistry; + + private final EventLoopGroup userGroup; + + private final ConnectionManager connectionManager; + + private final List queryResults; + private MergedResult mergedResult; private int currentSequenceId; - @Setter private int columnCount; + private List columnTypes; + private boolean isMerged; private boolean hasMoreResultValueFlag; - private final List queryResults = new CopyOnWriteArrayList<>(); - - private final RuleRegistry ruleRegistry; - - private final EventLoopGroup userGroup; - - private final ConnectionManager connectionManager; - public JDBCBackendHandler(final String sql) { this.sql = sql; - isMerged = false; - hasMoreResultValueFlag = true; ruleRegistry = RuleRegistry.getInstance(); userGroup = ExecutorContext.getInstance().getUserGroup(); connectionManager = new ConnectionManager(); + queryResults = new LinkedList<>(); + isMerged = false; + hasMoreResultValueFlag = true; } @Override @@ -119,13 +123,11 @@ private CommandResponsePackets execute(final SQLRouteResult routeResult) throws return new CommandResponsePackets(new ErrPacket(1, ServerErrorCode.ER_ERROR_ON_MODIFYING_GTID_EXECUTED_TABLE, sqlStatement.getTables().isSingleTable() ? sqlStatement.getTables().getSingleTableName() : "unknown_table")); } - List> futureList = new ArrayList<>(1024); - for (SQLExecutionUnit each : routeResult.getExecutionUnits()) { - String actualSQL = each.getSqlUnit().getSql(); - Statement statement = createStatement(connectionManager.getConnection(each.getDataSource()), actualSQL, isReturnGeneratedKeys); - futureList.add(userGroup.submit(createExecuteWorker(statement, isReturnGeneratedKeys, actualSQL))); - } - List packets = buildCommandResponsePackets(futureList); + Iterator sqlExecutionUnits = routeResult.getExecutionUnits().iterator(); + SQLExecutionUnit firstSQLExecutionUnit = sqlExecutionUnits.next(); + List> futureList = asyncExecute(isReturnGeneratedKeys, Lists.newArrayList(sqlExecutionUnits)); + JDBCExecuteResponse firstJDBCExecuteResponse = syncExecute(isReturnGeneratedKeys, firstSQLExecutionUnit); + List packets = buildCommandResponsePackets(firstJDBCExecuteResponse, futureList); CommandResponsePackets result = merge(sqlStatement, packets); if (!ruleRegistry.isMasterSlaveOnly()) { ProxyShardingRefreshHandler.build(routeResult).execute(); @@ -140,13 +142,41 @@ private boolean isUnsupportedXA(final SQLType sqlType) throws SystemException { protected abstract Statement createStatement(Connection connection, String actualSQL, boolean isReturnGeneratedKeys) throws SQLException; - protected abstract Callable createExecuteWorker(Statement statement, boolean isReturnGeneratedKeys, String actualSQL); + protected abstract JDBCExecuteWorker createExecuteWorker(Statement statement, boolean isReturnGeneratedKeys, String actualSQL); + + private List> asyncExecute(final boolean isReturnGeneratedKeys, final Collection sqlExecutionUnits) throws SQLException { + List> result = new LinkedList<>(); + for (SQLExecutionUnit each : sqlExecutionUnits) { + final String actualSQL = each.getSqlUnit().getSql(); + final Statement statement = createStatement(connectionManager.getConnection(each.getDataSource()), actualSQL, isReturnGeneratedKeys); + result.add(userGroup.submit(new Callable() { + + @Override + public JDBCExecuteResponse call() { + return createExecuteWorker(statement, isReturnGeneratedKeys, actualSQL).execute(); + } + })); + } + return result; + } + + private JDBCExecuteResponse syncExecute(final boolean isReturnGeneratedKeys, final SQLExecutionUnit sqlExecutionUnit) throws SQLException { + String actualSQL = sqlExecutionUnit.getSqlUnit().getSql(); + Statement statement = createStatement(connectionManager.getConnection(sqlExecutionUnit.getDataSource()), actualSQL, isReturnGeneratedKeys); + return createExecuteWorker(statement, isReturnGeneratedKeys, actualSQL).execute(); + } - private List buildCommandResponsePackets(final List> futureList) { - List result = new ArrayList<>(); - for (Future each : futureList) { + private List buildCommandResponsePackets(final JDBCExecuteResponse firstJDBCExecuteResponse, final List> futureList) { + List result = new ArrayList<>(futureList.size() + 1); + result.add(firstJDBCExecuteResponse.getCommandResponsePackets()); + columnCount = firstJDBCExecuteResponse.getColumnCount(); + columnTypes = firstJDBCExecuteResponse.getColumnTypes(); + queryResults.add(firstJDBCExecuteResponse.getQueryResult()); + for (Future each : futureList) { try { - result.add(each.get()); + JDBCExecuteResponse executeResponse = each.get(); + result.add(executeResponse.getCommandResponsePackets()); + queryResults.add(executeResponse.getQueryResult()); } catch (final InterruptedException | ExecutionException ex) { throw new ShardingException(ex.getMessage(), ex); } @@ -244,11 +274,11 @@ public final DatabaseProtocolPacket getResultValue() { for (int i = 1; i <= columnCount; i++) { data.add(mergedResult.getValue(i, Object.class)); } - return newDatabaseProtocolPacket(++currentSequenceId, data); + return newDatabaseProtocolPacket(++currentSequenceId, data, columnTypes); } catch (final SQLException ex) { return new ErrPacket(1, ex); } } - protected abstract DatabaseProtocolPacket newDatabaseProtocolPacket(int sequenceId, List data); + protected abstract DatabaseProtocolPacket newDatabaseProtocolPacket(int sequenceId, List data, List columnTypes); } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteResponse.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteResponse.java new file mode 100644 index 0000000000000..3e627c784c784 --- /dev/null +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteResponse.java @@ -0,0 +1,71 @@ +/* + * Copyright 2016-2018 shardingsphere.io. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + *

+ */ + +package io.shardingsphere.proxy.backend.common.jdbc; + +import io.shardingsphere.core.merger.QueryResult; +import io.shardingsphere.proxy.transport.common.packet.DatabaseProtocolPacket; +import io.shardingsphere.proxy.transport.mysql.constant.ColumnType; +import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePackets; +import io.shardingsphere.proxy.transport.mysql.packet.command.text.query.ColumnDefinition41Packet; +import lombok.Getter; +import lombok.RequiredArgsConstructor; + +import java.util.ArrayList; +import java.util.List; + +/** + * JDBC execute response. + * + * @author zhangliang + */ +@RequiredArgsConstructor +@Getter +public final class JDBCExecuteResponse { + + private final CommandResponsePackets commandResponsePackets; + + private final QueryResult queryResult; + + JDBCExecuteResponse(final CommandResponsePackets commandResponsePackets) { + this(commandResponsePackets, null); + } + + /** + * Get column types. + * + * @return column types + */ + public List getColumnTypes() { + List result = new ArrayList<>(commandResponsePackets.getDatabaseProtocolPackets().size()); + for (DatabaseProtocolPacket each : commandResponsePackets.getDatabaseProtocolPackets()) { + if (each instanceof ColumnDefinition41Packet) { + result.add(((ColumnDefinition41Packet) each).getColumnType()); + } + } + return result; + } + + /** + * Get column count. + * + * @return column count + */ + public int getColumnCount() { + return 1 == commandResponsePackets.getDatabaseProtocolPackets().size() ? 0 : commandResponsePackets.getDatabaseProtocolPackets().size() - 2; + } +} diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteWorker.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteWorker.java index 90358c3135f72..f089ea38d0193 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteWorker.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/JDBCExecuteWorker.java @@ -21,21 +21,18 @@ import io.shardingsphere.proxy.backend.common.jdbc.resultset.MemoryQueryResult; import io.shardingsphere.proxy.backend.common.jdbc.resultset.StreamQueryResult; import io.shardingsphere.proxy.config.RuleRegistry; -import io.shardingsphere.proxy.transport.mysql.constant.ColumnType; import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePackets; import io.shardingsphere.proxy.transport.mysql.packet.command.text.query.ColumnDefinition41Packet; import io.shardingsphere.proxy.transport.mysql.packet.command.text.query.FieldCountPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket; import io.shardingsphere.proxy.transport.mysql.packet.generic.OKPacket; -import lombok.Getter; import lombok.RequiredArgsConstructor; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; -import java.util.concurrent.Callable; /** * Execute worker via JDBC to connect databases. @@ -44,7 +41,7 @@ * @author zhangliang */ @RequiredArgsConstructor -public abstract class JDBCExecuteWorker implements Callable { +public abstract class JDBCExecuteWorker { private static final Integer FETCH_ONE_ROW_A_TIME = Integer.MIN_VALUE; @@ -52,54 +49,45 @@ public abstract class JDBCExecuteWorker implements Callable columnTypes; - private final RuleRegistry ruleRegistry; public JDBCStatementBackendHandler(final List preparedStatementParameters, final int statementId, final DatabaseType databaseType) { super(PreparedStatementRegistry.getInstance().getSQL(statementId)); this.preparedStatementParameters = preparedStatementParameters; this.databaseType = databaseType; - columnTypes = new CopyOnWriteArrayList<>(); ruleRegistry = RuleRegistry.getInstance(); } @@ -80,8 +72,8 @@ private List getComStmtExecuteParameters() { } @Override - protected Callable createExecuteWorker(final Statement statement, final boolean isReturnGeneratedKeys, final String actualSQL) { - return new JDBCStatementExecuteWorker((PreparedStatement) statement, isReturnGeneratedKeys, this); + protected JDBCStatementExecuteWorker createExecuteWorker(final Statement statement, final boolean isReturnGeneratedKeys, final String actualSQL) { + return new JDBCStatementExecuteWorker((PreparedStatement) statement, isReturnGeneratedKeys); } @Override @@ -94,7 +86,7 @@ protected PreparedStatement createStatement(final Connection connection, final S } @Override - protected DatabaseProtocolPacket newDatabaseProtocolPacket(final int sequenceId, final List data) { + protected DatabaseProtocolPacket newDatabaseProtocolPacket(final int sequenceId, final List data, final List columnTypes) { return new BinaryResultSetRowPacket(sequenceId, getColumnCount(), data, columnTypes); } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/statement/JDBCStatementExecuteWorker.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/statement/JDBCStatementExecuteWorker.java index 9f2e212ebc04d..6993c96cc6b13 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/statement/JDBCStatementExecuteWorker.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/statement/JDBCStatementExecuteWorker.java @@ -18,7 +18,6 @@ package io.shardingsphere.proxy.backend.common.jdbc.statement; import io.shardingsphere.proxy.backend.common.jdbc.JDBCExecuteWorker; -import io.shardingsphere.proxy.transport.mysql.constant.ColumnType; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -34,8 +33,8 @@ public final class JDBCStatementExecuteWorker extends JDBCExecuteWorker { private final PreparedStatement preparedStatement; - public JDBCStatementExecuteWorker(final PreparedStatement preparedStatement, final boolean isReturnGeneratedKeys, final JDBCStatementBackendHandler jdbcStatementBackendHandler) { - super(preparedStatement, isReturnGeneratedKeys, jdbcStatementBackendHandler); + public JDBCStatementExecuteWorker(final PreparedStatement preparedStatement, final boolean isReturnGeneratedKeys) { + super(preparedStatement, isReturnGeneratedKeys); this.preparedStatement = preparedStatement; } @@ -43,9 +42,4 @@ public JDBCStatementExecuteWorker(final PreparedStatement preparedStatement, fin protected boolean executeSQL(final boolean isReturnGeneratedKeys) throws SQLException { return preparedStatement.execute(); } - - @Override - protected void setColumnType(final ColumnType columnType) { - ((JDBCStatementBackendHandler) getJdbcBackendHandler()).getColumnTypes().add(columnType); - } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/text/JDBCTextBackendHandler.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/text/JDBCTextBackendHandler.java index 6c38df66fb624..c7c1422d75f06 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/text/JDBCTextBackendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/text/JDBCTextBackendHandler.java @@ -23,14 +23,13 @@ import io.shardingsphere.proxy.backend.common.jdbc.JDBCBackendHandler; import io.shardingsphere.proxy.config.RuleRegistry; import io.shardingsphere.proxy.transport.common.packet.DatabaseProtocolPacket; -import io.shardingsphere.proxy.transport.mysql.packet.command.CommandResponsePackets; +import io.shardingsphere.proxy.transport.mysql.constant.ColumnType; import io.shardingsphere.proxy.transport.mysql.packet.command.text.query.TextResultSetRowPacket; import java.sql.Connection; import java.sql.SQLException; import java.sql.Statement; import java.util.List; -import java.util.concurrent.Callable; /** * Text protocol backend handler via JDBC to connect databases. @@ -64,12 +63,12 @@ protected Statement createStatement(final Connection connection, final String ac } @Override - protected Callable createExecuteWorker(final Statement statement, final boolean isReturnGeneratedKeys, final String actualSQL) { - return new JDBCTextExecuteWorker(actualSQL, statement, isReturnGeneratedKeys, this); + protected JDBCTextExecuteWorker createExecuteWorker(final Statement statement, final boolean isReturnGeneratedKeys, final String actualSQL) { + return new JDBCTextExecuteWorker(actualSQL, statement, isReturnGeneratedKeys); } @Override - protected DatabaseProtocolPacket newDatabaseProtocolPacket(final int sequenceId, final List data) { + protected DatabaseProtocolPacket newDatabaseProtocolPacket(final int sequenceId, final List data, final List columnTypes) { return new TextResultSetRowPacket(sequenceId, data); } } diff --git a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/text/JDBCTextExecuteWorker.java b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/text/JDBCTextExecuteWorker.java index 1697220d4e8ed..705df32cc6f65 100644 --- a/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/text/JDBCTextExecuteWorker.java +++ b/sharding-proxy/src/main/java/io/shardingsphere/proxy/backend/common/jdbc/text/JDBCTextExecuteWorker.java @@ -35,8 +35,8 @@ public final class JDBCTextExecuteWorker extends JDBCExecuteWorker { private final String sql; - public JDBCTextExecuteWorker(final String sql, final Statement statement, final boolean isReturnGeneratedKeys, final JDBCTextBackendHandler jdbcTextBackendHandler) { - super(statement, isReturnGeneratedKeys, jdbcTextBackendHandler); + public JDBCTextExecuteWorker(final String sql, final Statement statement, final boolean isReturnGeneratedKeys) { + super(statement, isReturnGeneratedKeys); this.statement = statement; this.sql = sql; }