From 5c021b8179477e7980a0c5a1551c7c8e6af876ff Mon Sep 17 00:00:00 2001 From: tuohai666 Date: Fri, 20 Apr 2018 10:42:26 +0800 Subject: [PATCH] for #675 initial succeed --- .../StatementExecuteBackendHandler.java | 134 ++++++++++++------ ...ySQLPacketStatementExecuteQueryResult.java | 27 +++- .../frontend/mysql/MySQLFrontendHandler.java | 6 +- .../mysql/packet/MySQLPacketPayload.java | 1 + .../mysql/packet/command/CommandPacket.java | 15 ++ .../command/UnsupportedCommandPacket.java | 11 ++ .../statement/close/ComStmtClosePacket.java | 11 ++ .../execute/ComStmtExecutePacket.java | 19 ++- .../prepare/ComStmtPreparePacket.java | 11 ++ .../text/fieldlist/ComFieldListPacket.java | 11 ++ .../command/text/initdb/ComInitDbPacket.java | 11 ++ .../command/text/query/ComQueryPacket.java | 11 ++ .../command/text/quit/ComQuitPacket.java | 11 ++ 13 files changed, 225 insertions(+), 54 deletions(-) diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/common/StatementExecuteBackendHandler.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/common/StatementExecuteBackendHandler.java index 4a718f221fa14..1365f85d6852c 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/common/StatementExecuteBackendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/common/StatementExecuteBackendHandler.java @@ -65,9 +65,27 @@ public final class StatementExecuteBackendHandler implements BackendHandler { private final PreparedStatementRoutingEngine routingEngine; + private List connections; + + private List resultSets; + + private MergedResult mergedResult; + + private int currentSequenceId; + + private int columnCount; + + private final List columnTypes; + + private boolean noMoreValues; + public StatementExecuteBackendHandler(final List preparedStatementParameters, final int statementId, final DatabaseType databaseType, final boolean showSQL) { this.preparedStatementParameters = preparedStatementParameters; routingEngine = new PreparedStatementRoutingEngine(PreparedStatementRegistry.getInstance().getSQL(statementId), ShardingRuleRegistry.getInstance().getShardingRule(), databaseType, showSQL); + connections = new ArrayList<>(1024); + resultSets = new ArrayList<>(1024); + columnTypes = new ArrayList<>(32); + noMoreValues = false; } @Override @@ -77,24 +95,23 @@ public CommandResponsePackets execute() { if (routeResult.getExecutionUnits().isEmpty()) { return new CommandResponsePackets(new OKPacket(1, 0, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue(), 0, "")); } - List columnTypes = new ArrayList<>(32); List result = new LinkedList<>(); for (SQLExecutionUnit each : routeResult.getExecutionUnits()) { // TODO multiple threads - result.add(execute(routeResult.getSqlStatement(), each, columnTypes)); + result.add(execute(routeResult.getSqlStatement(), each)); } - return merge(routeResult.getSqlStatement(), result, columnTypes); + return merge(routeResult.getSqlStatement(), result); } - private CommandResponsePackets execute(final SQLStatement sqlStatement, final SQLExecutionUnit sqlExecutionUnit, final List columnTypes) { + private CommandResponsePackets execute(final SQLStatement sqlStatement, final SQLExecutionUnit sqlExecutionUnit) { switch (sqlStatement.getType()) { case DQL: - return executeQuery(ShardingRuleRegistry.getInstance().getDataSourceMap().get(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSql(), columnTypes); + return executeQuery(ShardingRuleRegistry.getInstance().getDataSourceMap().get(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSql()); case DML: case DDL: return executeUpdate(ShardingRuleRegistry.getInstance().getDataSourceMap().get(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSql(), sqlStatement); default: - return executeCommon(ShardingRuleRegistry.getInstance().getDataSourceMap().get(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSql(), columnTypes); + return executeCommon(ShardingRuleRegistry.getInstance().getDataSourceMap().get(sqlExecutionUnit.getDataSource()), sqlExecutionUnit.getSql()); } } @@ -112,15 +129,25 @@ private void setJDBCPreparedStatementParameters(final PreparedStatement prepared } } - private CommandResponsePackets executeQuery(final DataSource dataSource, final String sql, final List columnTypes) { - try ( - Connection connection = dataSource.getConnection(); - PreparedStatement preparedStatement = connection.prepareStatement(sql)) { + private CommandResponsePackets executeQuery(final DataSource dataSource, final String sql) { + PreparedStatement preparedStatement = null; + try { + Connection connection = dataSource.getConnection(); + connections.add(connection); + preparedStatement = connection.prepareStatement(sql); + preparedStatement.setFetchSize(Integer.MIN_VALUE); setJDBCPreparedStatementParameters(preparedStatement); - ResultSet resultSet = preparedStatement.executeQuery(); - return getDatabaseProtocolPackets(resultSet, columnTypes); + resultSets.add(preparedStatement.executeQuery()); + return getDatabaseProtocolPackets(); } catch (final SQLException ex) { return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), "", ex.getSQLState(), ex.getMessage())); + } finally { +// if (preparedStatement != null) { +// try { +// preparedStatement.close(); +// } catch (SQLException ignore) { +// } +// } } } @@ -151,17 +178,17 @@ private CommandResponsePackets executeUpdate(final DataSource dataSource, final } } } - } - private CommandResponsePackets executeCommon(final DataSource dataSource, final String sql, final List columnTypes) { + private CommandResponsePackets executeCommon(final DataSource dataSource, final String sql) { try ( Connection connection = dataSource.getConnection(); PreparedStatement preparedStatement = connection.prepareStatement(sql)) { setJDBCPreparedStatementParameters(preparedStatement); boolean hasResultSet = preparedStatement.execute(); if (hasResultSet) { - return getDatabaseProtocolPackets(preparedStatement.getResultSet(), columnTypes); + resultSets.add(preparedStatement.getResultSet()); + return getDatabaseProtocolPackets(); } else { return new CommandResponsePackets(new OKPacket(1, preparedStatement.getUpdateCount(), 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue(), 0, "")); } @@ -170,11 +197,11 @@ private CommandResponsePackets executeCommon(final DataSource dataSource, final } } - private CommandResponsePackets getDatabaseProtocolPackets(final ResultSet resultSet, final List columnTypes) throws SQLException { + private CommandResponsePackets getDatabaseProtocolPackets() throws SQLException { CommandResponsePackets result = new CommandResponsePackets(); int currentSequenceId = 0; - ResultSetMetaData resultSetMetaData = resultSet.getMetaData(); - int columnCount = resultSetMetaData.getColumnCount(); + ResultSetMetaData resultSetMetaData = resultSets.get(resultSets.size() - 1).getMetaData(); + columnCount = resultSetMetaData.getColumnCount(); if (0 == columnCount) { result.addPacket(new OKPacket(++currentSequenceId, 0, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue(), 0, "")); return result; @@ -188,14 +215,6 @@ private CommandResponsePackets getDatabaseProtocolPackets(final ResultSet result columnTypes.add(columnType); } result.addPacket(new EofPacket(++currentSequenceId, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue())); - while (resultSet.next()) { - List data = new ArrayList<>(columnCount); - for (int i = 1; i <= columnCount; i++) { - data.add(resultSet.getObject(i)); - } - result.addPacket(new BinaryResultSetRowPacket(++currentSequenceId, columnCount, data, columnTypes)); - } - result.addPacket(new EofPacket(++currentSequenceId, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue())); return result; } @@ -208,7 +227,7 @@ private long getGeneratedKey(final PreparedStatement preparedStatement) throws S return result; } - private CommandResponsePackets merge(final SQLStatement sqlStatement, final List packets, final List columnTypes) { + private CommandResponsePackets merge(final SQLStatement sqlStatement, final List packets) { if (1 == packets.size()) { return packets.iterator().next(); } @@ -225,7 +244,7 @@ private CommandResponsePackets merge(final SQLStatement sqlStatement, final List return mergeDML(headPackets); } if (SQLType.DQL == sqlStatement.getType() || SQLType.DAL == sqlStatement.getType()) { - return mergeDQLorDAL(sqlStatement, packets, columnTypes); + return mergeDQLorDAL(sqlStatement, packets); } return packets.get(0); } @@ -241,44 +260,71 @@ private CommandResponsePackets mergeDML(final CommandResponsePackets firstPacket return new CommandResponsePackets(new OKPacket(1, affectedRows, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue(), 0, "")); } - private CommandResponsePackets mergeDQLorDAL(final SQLStatement sqlStatement, final List packets, final List columnTypes) { + private CommandResponsePackets mergeDQLorDAL(final SQLStatement sqlStatement, final List packets) { List queryResults = new ArrayList<>(packets.size()); - for (CommandResponsePackets each : packets) { +// for (CommandResponsePackets each : packets) { +// // TODO replace to a common PacketQueryResult +// queryResults.add(new MySQLPacketStatementExecuteQueryResult(each, resultSet, columnTypes)); +// } + for (int i = 0; i < packets.size(); i++) { // TODO replace to a common PacketQueryResult - queryResults.add(new MySQLPacketStatementExecuteQueryResult(each)); + queryResults.add(new MySQLPacketStatementExecuteQueryResult(packets.get(i), resultSets.get(i), columnTypes)); } - MergedResult mergedResult; try { mergedResult = MergeEngineFactory.newInstance(ShardingRuleRegistry.getInstance().getShardingRule(), queryResults, sqlStatement).merge(); } catch (final SQLException ex) { return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), "", ex.getSQLState(), ex.getMessage())); } - return buildPackets(packets, mergedResult, columnTypes); + return buildPackets(packets); } - private CommandResponsePackets buildPackets(final List packets, final MergedResult mergedResult, final List columnTypes) { + private CommandResponsePackets buildPackets(final List packets) { CommandResponsePackets result = new CommandResponsePackets(); Iterator databaseProtocolPacketsSampling = packets.iterator().next().getDatabaseProtocolPackets().iterator(); FieldCountPacket fieldCountPacketSampling = (FieldCountPacket) databaseProtocolPacketsSampling.next(); result.addPacket(fieldCountPacketSampling); + ++currentSequenceId; int columnCount = fieldCountPacketSampling.getColumnCount(); for (int i = 0; i < columnCount; i++) { result.addPacket(databaseProtocolPacketsSampling.next()); + ++currentSequenceId; } result.addPacket(databaseProtocolPacketsSampling.next()); - int currentSequenceId = result.size(); - try { - while (mergedResult.next()) { - List data = new ArrayList<>(columnCount); - for (int i = 1; i <= columnCount; i++) { - data.add(mergedResult.getValue(i, Object.class)); + ++currentSequenceId; + return result; + } + + public boolean hasMoreResultValue() throws SQLException { + if (noMoreValues) { + + return false; + } + if (!mergedResult.next()) { + noMoreValues = true; + for (Connection each : connections) { + if (null != each) { + try { + each.close(); + } catch (SQLException ignore) { + } } - result.addPacket(new BinaryResultSetRowPacket(++currentSequenceId, columnCount, data, columnTypes)); } + } + return true; + } + + public DatabaseProtocolPacket getResultValue() { + if (noMoreValues) { + return new EofPacket(++currentSequenceId, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue()); + } + try { + List data = new ArrayList<>(columnCount); + for (int i = 1; i <= columnCount; i++) { + data.add(mergedResult.getValue(i, Object.class)); + } + return new BinaryResultSetRowPacket(++currentSequenceId, columnCount, data, columnTypes); } catch (final SQLException ex) { - return new CommandResponsePackets(new ErrPacket(1, ex.getErrorCode(), "", ex.getSQLState(), ex.getMessage())); + return new ErrPacket(1, ex.getErrorCode(), "", ex.getSQLState(), ex.getMessage()); } - result.addPacket(new EofPacket(++currentSequenceId, 0, StatusFlag.SERVER_STATUS_AUTOCOMMIT.getValue())); - return result; } } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/mysql/MySQLPacketStatementExecuteQueryResult.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/mysql/MySQLPacketStatementExecuteQueryResult.java index b75110c4ed404..81b8f98a62441 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/mysql/MySQLPacketStatementExecuteQueryResult.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/backend/mysql/MySQLPacketStatementExecuteQueryResult.java @@ -19,6 +19,7 @@ import io.shardingjdbc.core.merger.QueryResult; import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; +import io.shardingjdbc.proxy.transport.mysql.constant.ColumnType; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandResponsePackets; import io.shardingjdbc.proxy.transport.mysql.packet.command.statement.execute.BinaryResultSetRowPacket; import io.shardingjdbc.proxy.transport.mysql.packet.command.text.query.ColumnDefinition41Packet; @@ -26,9 +27,13 @@ import lombok.RequiredArgsConstructor; import java.io.InputStream; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; import java.util.Calendar; import java.util.HashMap; import java.util.Iterator; +import java.util.List; import java.util.Map; /** @@ -45,11 +50,15 @@ public final class MySQLPacketStatementExecuteQueryResult implements QueryResult private final Map columnLabelAndIndexMap; - private final Iterator data; + private final ResultSet resultSet; + + private final List columnTypes; + + private int currentSequenceId; private BinaryResultSetRowPacket currentRow; - public MySQLPacketStatementExecuteQueryResult(final CommandResponsePackets packets) { + public MySQLPacketStatementExecuteQueryResult(final CommandResponsePackets packets, final ResultSet resultSet, final List columnTypes) { Iterator packetIterator = packets.getDatabaseProtocolPackets().iterator(); columnCount = ((FieldCountPacket) packetIterator.next()).getColumnCount(); columnIndexAndLabelMap = new HashMap<>(columnCount, 1); @@ -60,14 +69,18 @@ public MySQLPacketStatementExecuteQueryResult(final CommandResponsePackets packe columnLabelAndIndexMap.put(columnDefinition41Packet.getName(), i); } packetIterator.next(); - data = packetIterator; + this.resultSet = resultSet; + this.columnTypes = columnTypes; } @Override - public boolean next() { - DatabaseProtocolPacket databaseProtocolPacket = data.next(); - if (databaseProtocolPacket instanceof BinaryResultSetRowPacket) { - currentRow = (BinaryResultSetRowPacket) databaseProtocolPacket; + public boolean next() throws SQLException { + if (resultSet.next()) { + List data = new ArrayList<>(columnCount); + for (int i = 1; i <= columnCount; i++) { + data.add(resultSet.getObject(i)); + } + currentRow = new BinaryResultSetRowPacket(++currentSequenceId, columnCount, data, columnTypes); return true; } return false; diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/frontend/mysql/MySQLFrontendHandler.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/frontend/mysql/MySQLFrontendHandler.java index c77652d86f59d..b5b2fe65dcfcc 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/frontend/mysql/MySQLFrontendHandler.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/frontend/mysql/MySQLFrontendHandler.java @@ -69,9 +69,11 @@ public void run() { int sequenceId = mysqlPacketPayload.readInt1(); CommandPacket commandPacket = CommandPacketFactory.getCommandPacket(sequenceId, mysqlPacketPayload); for (DatabaseProtocolPacket each : commandPacket.execute().getDatabaseProtocolPackets()) { - context.write(each); + context.writeAndFlush(each); + } + while (commandPacket.hasMoreResultValue()) { + context.writeAndFlush(commandPacket.getResultValue()); } - context.flush(); } }); } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/MySQLPacketPayload.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/MySQLPacketPayload.java index 3c42679741e92..04e9cd69a9336 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/MySQLPacketPayload.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/MySQLPacketPayload.java @@ -33,6 +33,7 @@ * @see binary protocol value * * @author zhangliang + * @author zhangyonglun */ @RequiredArgsConstructor @Getter diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/CommandPacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/CommandPacket.java index 31f3780d2bf4c..eaaf20f41b5ed 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/CommandPacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/CommandPacket.java @@ -17,6 +17,7 @@ package io.shardingjdbc.proxy.transport.mysql.packet.command; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacket; /** @@ -36,4 +37,18 @@ public CommandPacket(final int sequenceId) { * @return result packets to be sent */ public abstract CommandResponsePackets execute(); + + /** + * Has more result value. + * + * @return has more result value + */ + public abstract boolean hasMoreResultValue(); + + /** + * Get result value. + * + * @return result to be sent + */ + public abstract DatabaseProtocolPacket getResultValue(); } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/UnsupportedCommandPacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/UnsupportedCommandPacket.java index 1779e42728c69..d8e6fd5ca9b2c 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/UnsupportedCommandPacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/UnsupportedCommandPacket.java @@ -17,6 +17,7 @@ package io.shardingjdbc.proxy.transport.mysql.packet.command; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingjdbc.proxy.transport.mysql.packet.generic.ErrPacket; @@ -50,4 +51,14 @@ public CommandResponsePackets execute() { @Override public void write(final MySQLPacketPayload mysqlPacketPayload) { } + + @Override + public boolean hasMoreResultValue() { + return false; + } + + @Override + public DatabaseProtocolPacket getResultValue() { + return null; + } } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/close/ComStmtClosePacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/close/ComStmtClosePacket.java index 9e13b858efbda..427b4cb2accf0 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/close/ComStmtClosePacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/close/ComStmtClosePacket.java @@ -17,6 +17,7 @@ package io.shardingjdbc.proxy.transport.mysql.packet.command.statement.close; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandPacket; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandResponsePackets; @@ -47,4 +48,14 @@ public CommandResponsePackets execute() { log.debug("COM_STMT_CLOSE received for Sharding-Proxy: {}", statementId); return new CommandResponsePackets(new DummyPacket()); } + + @Override + public boolean hasMoreResultValue() { + return false; + } + + @Override + public DatabaseProtocolPacket getResultValue() { + return null; + } } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/execute/ComStmtExecutePacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/execute/ComStmtExecutePacket.java index 13cb6a3c55be7..f0e937cdb574a 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/execute/ComStmtExecutePacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/execute/ComStmtExecutePacket.java @@ -23,6 +23,7 @@ import io.shardingjdbc.core.parsing.parser.sql.SQLStatement; import io.shardingjdbc.proxy.backend.common.StatementExecuteBackendHandler; import io.shardingjdbc.proxy.config.ShardingRuleRegistry; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.constant.ColumnType; import io.shardingjdbc.proxy.transport.mysql.constant.NewParametersBoundFlag; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacketPayload; @@ -32,6 +33,7 @@ import lombok.Getter; import lombok.extern.slf4j.Slf4j; +import java.sql.SQLException; import java.util.ArrayList; import java.util.List; @@ -63,6 +65,8 @@ public final class ComStmtExecutePacket extends CommandPacket { private final List preparedStatementParameters = new ArrayList<>(32); + private final StatementExecuteBackendHandler statementExecuteBackendHandler; + public ComStmtExecutePacket(final int sequenceId, final MySQLPacketPayload mysqlPacketPayload) { super(sequenceId); statementId = mysqlPacketPayload.readInt4(); @@ -77,6 +81,7 @@ public ComStmtExecutePacket(final int sequenceId, final MySQLPacketPayload mysql } newParametersBoundFlag = NewParametersBoundFlag.valueOf(mysqlPacketPayload.readInt1()); setParameterList(mysqlPacketPayload, numParameters, newParametersBoundFlag); + statementExecuteBackendHandler = new StatementExecuteBackendHandler(preparedStatementParameters, statementId, DatabaseType.MySQL, true); } private void setParameterList(final MySQLPacketPayload mysqlPacketPayload, final int numParameters, final NewParametersBoundFlag newParametersBoundFlag) { @@ -144,6 +149,18 @@ public void write(final MySQLPacketPayload mysqlPacketPayload) { @Override public CommandResponsePackets execute() { log.debug("COM_STMT_EXECUTE received for Sharding-Proxy: {}", statementId); - return new StatementExecuteBackendHandler(preparedStatementParameters, statementId, DatabaseType.MySQL, true).execute(); + return statementExecuteBackendHandler.execute(); + } + + public boolean hasMoreResultValue() { + try { + return statementExecuteBackendHandler.hasMoreResultValue(); + } catch (SQLException ex) { + return false; + } + } + + public DatabaseProtocolPacket getResultValue() { + return statementExecuteBackendHandler.getResultValue(); } } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/prepare/ComStmtPreparePacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/prepare/ComStmtPreparePacket.java index 483ce4e576875..f4147d551df8d 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/prepare/ComStmtPreparePacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/statement/prepare/ComStmtPreparePacket.java @@ -24,6 +24,7 @@ import io.shardingjdbc.core.parsing.parser.sql.dml.insert.InsertStatement; import io.shardingjdbc.core.parsing.parser.sql.dql.select.SelectStatement; import io.shardingjdbc.proxy.config.ShardingRuleRegistry; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.constant.ColumnType; import io.shardingjdbc.proxy.transport.mysql.constant.StatusFlag; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacketPayload; @@ -75,6 +76,16 @@ public CommandResponsePackets execute() { return result; } + @Override + public boolean hasMoreResultValue() { + return false; + } + + @Override + public DatabaseProtocolPacket getResultValue() { + return null; + } + private int getNumColumns(final SQLStatement sqlStatement) { if (sqlStatement instanceof SelectStatement) { // TODO select * cannot know items num diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/fieldlist/ComFieldListPacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/fieldlist/ComFieldListPacket.java index 259565eed1505..648be8556bf26 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/fieldlist/ComFieldListPacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/fieldlist/ComFieldListPacket.java @@ -20,6 +20,7 @@ import io.shardingjdbc.core.constant.DatabaseType; import io.shardingjdbc.core.constant.ShardingConstant; import io.shardingjdbc.proxy.backend.common.SQLExecuteBackendHandler; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandPacket; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandPacketType; @@ -60,4 +61,14 @@ public CommandResponsePackets execute() { // TODO use common database type return new SQLExecuteBackendHandler(sql, DatabaseType.MySQL, true).execute(); } + + @Override + public boolean hasMoreResultValue() { + return false; + } + + @Override + public DatabaseProtocolPacket getResultValue() { + return null; + } } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/initdb/ComInitDbPacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/initdb/ComInitDbPacket.java index 7f3c70a9ce92b..5d45c48370d5c 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/initdb/ComInitDbPacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/initdb/ComInitDbPacket.java @@ -18,6 +18,7 @@ package io.shardingjdbc.proxy.transport.mysql.packet.command.text.initdb; import io.shardingjdbc.core.constant.ShardingConstant; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.constant.StatusFlag; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandPacket; @@ -57,4 +58,14 @@ public CommandResponsePackets execute() { } return new CommandResponsePackets(new ErrPacket(getSequenceId() + 1, 1049, "", "", String.format("Unknown database '%s'", schemaName))); } + + @Override + public boolean hasMoreResultValue() { + return false; + } + + @Override + public DatabaseProtocolPacket getResultValue() { + return null; + } } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/query/ComQueryPacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/query/ComQueryPacket.java index eae292de4e961..45ede30ec9862 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/query/ComQueryPacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/query/ComQueryPacket.java @@ -19,6 +19,7 @@ import io.shardingjdbc.core.constant.DatabaseType; import io.shardingjdbc.proxy.backend.common.SQLExecuteBackendHandler; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandPacket; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandResponsePackets; @@ -50,4 +51,14 @@ public CommandResponsePackets execute() { log.debug("COM_QUERY received for Sharding-Proxy: {}", sql); return new SQLExecuteBackendHandler(sql, DatabaseType.MySQL, true).execute(); } + + @Override + public boolean hasMoreResultValue() { + return false; + } + + @Override + public DatabaseProtocolPacket getResultValue() { + return null; + } } diff --git a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/quit/ComQuitPacket.java b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/quit/ComQuitPacket.java index 83d9f24deb8f5..cb386989f35f4 100644 --- a/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/quit/ComQuitPacket.java +++ b/sharding-proxy/src/main/java/io/shardingjdbc/proxy/transport/mysql/packet/command/text/quit/ComQuitPacket.java @@ -17,6 +17,7 @@ package io.shardingjdbc.proxy.transport.mysql.packet.command.text.quit; +import io.shardingjdbc.proxy.transport.common.packet.DatabaseProtocolPacket; import io.shardingjdbc.proxy.transport.mysql.constant.StatusFlag; import io.shardingjdbc.proxy.transport.mysql.packet.MySQLPacketPayload; import io.shardingjdbc.proxy.transport.mysql.packet.command.CommandPacket; @@ -45,4 +46,14 @@ public CommandResponsePackets execute() { public void write(final MySQLPacketPayload mysqlPacketPayload) { mysqlPacketPayload.writeInt1(CommandPacketType.COM_QUIT.getValue()); } + + @Override + public boolean hasMoreResultValue() { + return false; + } + + @Override + public DatabaseProtocolPacket getResultValue() { + return null; + } }