Skip to content

Commit

Permalink
add JDBCExecuteQueryResponse & JDBCExecuteUpdateResponse
Browse files Browse the repository at this point in the history
  • Loading branch information
terrymanu committed Jul 20, 2018
1 parent 357064c commit bad1ba7
Show file tree
Hide file tree
Showing 10 changed files with 156 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import io.shardingsphere.core.exception.ShardingException;
import io.shardingsphere.core.merger.MergeEngineFactory;
import io.shardingsphere.core.merger.MergedResult;
import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.core.parsing.SQLJudgeEngine;
import io.shardingsphere.core.parsing.parser.sql.SQLStatement;
import io.shardingsphere.core.parsing.parser.sql.dml.insert.InsertStatement;
Expand All @@ -39,6 +38,7 @@
import io.shardingsphere.proxy.transport.mysql.constant.ColumnType;
import io.shardingsphere.proxy.transport.mysql.constant.ServerErrorCode;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.QueryResponsePackets;
import io.shardingsphere.proxy.transport.mysql.packet.command.text.query.FieldCountPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.EofPacket;
import io.shardingsphere.proxy.transport.mysql.packet.generic.ErrPacket;
Expand Down Expand Up @@ -73,11 +73,7 @@ public abstract class JDBCBackendHandler implements BackendHandler {

private final JDBCExecuteEngine executeEngine;

private final List<QueryResult> queryResults;

private int columnCount;

private List<ColumnType> columnTypes;
private SQLExecuteResponses responses;

private MergedResult mergedResult;

Expand All @@ -92,7 +88,6 @@ public JDBCBackendHandler(final String sql, final JDBCExecuteEngine executeEngin
this.executeEngine = executeEngine;
ruleRegistry = RuleRegistry.getInstance();
backendConnection = executeEngine.getBackendConnection();
queryResults = new LinkedList<>();
isMerged = false;
hasMoreResultValueFlag = true;
}
Expand All @@ -118,10 +113,7 @@ private CommandResponsePackets execute(final SQLRouteResult routeResult) throws
return new CommandResponsePackets(new ErrPacket(1,
ServerErrorCode.ER_ERROR_ON_MODIFYING_GTID_EXECUTED_TABLE, sqlStatement.getTables().isSingleTable() ? sqlStatement.getTables().getSingleTableName() : "unknown_table"));
}
SQLExecuteResponses responses = executeEngine.execute(routeResult, isReturnGeneratedKeys);
queryResults.addAll(responses.getQueryResults());
columnCount = responses.getColumnCount();
columnTypes = responses.getColumnTypes();
responses = executeEngine.execute(routeResult, isReturnGeneratedKeys);
CommandResponsePackets result = merge(sqlStatement, responses.getCommandResponsePacketsList());
if (!ruleRegistry.isMasterSlaveOnly()) {
ProxyShardingRefreshHandler.build(routeResult).execute();
Expand All @@ -135,43 +127,25 @@ private boolean isUnsupportedXA(final SQLType sqlType) throws SystemException {
}

private CommandResponsePackets merge(final SQLStatement sqlStatement, final Collection<CommandResponsePackets> packets) {
CommandResponsePackets headPackets = new CommandResponsePackets();
Collection<DatabasePacket> headPackets = new LinkedList<>();
for (CommandResponsePackets each : packets) {
if (null != each) {
headPackets.getPackets().add(each.getHeadPacket());
}
}
for (DatabasePacket each : headPackets.getPackets()) {
if (each instanceof ErrPacket) {
return new CommandResponsePackets(each);
if (each.getHeadPacket() instanceof ErrPacket) {
return new CommandResponsePackets(each.getHeadPacket());
}
headPackets.add(each.getHeadPacket());
}
}
if (SQLType.DML == sqlStatement.getType()) {
return mergeDML(headPackets);
CommandResponsePackets firstCommandResponsePackets = packets.iterator().next();
if (firstCommandResponsePackets instanceof QueryResponsePackets) {
return mergeQuery(sqlStatement, packets);
}
if (SQLType.DQL == sqlStatement.getType() || SQLType.DAL == sqlStatement.getType()) {
return mergeDQLorDAL(sqlStatement, packets);
}
return packets.iterator().next();
return mergeUpdate(headPackets);
}

private CommandResponsePackets mergeDML(final CommandResponsePackets firstPackets) {
int affectedRows = 0;
long lastInsertId = 0;
for (DatabasePacket each : firstPackets.getPackets()) {
if (each instanceof OKPacket) {
OKPacket okPacket = (OKPacket) each;
affectedRows += okPacket.getAffectedRows();
// TODO consider about insert multiple values
lastInsertId = okPacket.getLastInsertId();
}
}
return new CommandResponsePackets(new OKPacket(1, affectedRows, lastInsertId));
}

private CommandResponsePackets mergeDQLorDAL(final SQLStatement sqlStatement, final Collection<CommandResponsePackets> packets) {
private CommandResponsePackets mergeQuery(final SQLStatement sqlStatement, final Collection<CommandResponsePackets> packets) {
try {
mergedResult = MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), queryResults, sqlStatement, ruleRegistry.getShardingMetaData()).merge();
mergedResult = MergeEngineFactory.newInstance(ruleRegistry.getShardingRule(), responses.getQueryResults(), sqlStatement, ruleRegistry.getShardingMetaData()).merge();
isMerged = true;
} catch (final SQLException ex) {
return new CommandResponsePackets(new ErrPacket(1, ex));
Expand All @@ -185,7 +159,7 @@ private CommandResponsePackets buildPackets(final Collection<CommandResponsePack
FieldCountPacket fieldCountPacketSampling = (FieldCountPacket) databasePacketsSampling.next();
result.getPackets().add(fieldCountPacketSampling);
++currentSequenceId;
for (int i = 0; i < columnCount; i++) {
for (int i = 0; i < responses.getColumnCount(); i++) {
result.getPackets().add(databasePacketsSampling.next());
++currentSequenceId;
}
Expand All @@ -194,6 +168,20 @@ private CommandResponsePackets buildPackets(final Collection<CommandResponsePack
return result;
}

private CommandResponsePackets mergeUpdate(final Collection<DatabasePacket> packets) {
int affectedRows = 0;
long lastInsertId = 0;
for (DatabasePacket each : packets) {
if (each instanceof OKPacket) {
OKPacket okPacket = (OKPacket) each;
affectedRows += okPacket.getAffectedRows();
// TODO consider about insert multiple values
lastInsertId = okPacket.getLastInsertId();
}
}
return new CommandResponsePackets(new OKPacket(1, affectedRows, lastInsertId));
}

private SQLRouteResult doMasterSlaveRoute() {
SQLStatement sqlStatement = new SQLJudgeEngine(sql).judge();
SQLRouteResult result = new SQLRouteResult(sqlStatement);
Expand Down Expand Up @@ -223,15 +211,15 @@ public final DatabasePacket getResultValue() {
return new EofPacket(++currentSequenceId);
}
try {
List<Object> data = new ArrayList<>(columnCount);
for (int i = 1; i <= columnCount; i++) {
List<Object> data = new ArrayList<>(responses.getColumnCount());
for (int i = 1; i <= responses.getColumnCount(); i++) {
data.add(mergedResult.getValue(i, Object.class));
}
return newDatabasePacket(++currentSequenceId, data, columnTypes);
return newDatabasePacket(++currentSequenceId, data, responses.getColumnCount(), responses.getColumnTypes());
} catch (final SQLException ex) {
return new ErrPacket(1, ex);
}
}

protected abstract DatabasePacket newDatabasePacket(int sequenceId, List<Object> data, List<ColumnType> columnTypes);
protected abstract DatabasePacket newDatabasePacket(int sequenceId, List<Object> data, int columnCount, List<ColumnType> columnTypes);
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,28 +68,28 @@ protected JDBCExecuteResponse executeWithMetadata(final Statement statement, fin
try {
setFetchSize(statement);
if (!executeSQL(statement, sql, isReturnGeneratedKeys)) {
return new JDBCExecuteResponse(new CommandResponsePackets(new OKPacket(1, statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0)));
return new JDBCExecuteUpdateResponse(new CommandResponsePackets(new OKPacket(1, statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0)));
}
ResultSet resultSet = statement.getResultSet();
ResultSetMetaData resultSetMetaData = resultSet.getMetaData();
if (0 == resultSetMetaData.getColumnCount()) {
return new JDBCExecuteResponse(new CommandResponsePackets(new OKPacket(1)));
return new JDBCExecuteUpdateResponse(new CommandResponsePackets(new OKPacket(1)));
}
return new JDBCExecuteResponse(getHeaderPackets(resultSetMetaData), createQueryResult(resultSet));
return new JDBCExecuteQueryResponse(getHeaderPackets(resultSetMetaData), createQueryResult(resultSet));
} catch (final SQLException ex) {
return new JDBCExecuteResponse(new CommandResponsePackets(new ErrPacket(1, ex)));
return new JDBCExecuteUpdateResponse(new CommandResponsePackets(new ErrPacket(1, ex)));
}
}

protected JDBCExecuteResponse executeWithoutMetadata(final Statement statement, final String sql, final boolean isReturnGeneratedKeys) {
try {
setFetchSize(statement);
if (!executeSQL(statement, sql, isReturnGeneratedKeys)) {
return new JDBCExecuteResponse(new CommandResponsePackets(new OKPacket(1, statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0)));
return new JDBCExecuteUpdateResponse(new CommandResponsePackets(new OKPacket(1, statement.getUpdateCount(), isReturnGeneratedKeys ? getGeneratedKey(statement) : 0)));
}
return new JDBCExecuteResponse(createQueryResult(statement.getResultSet()));
return new JDBCExecuteQueryResponse(null, createQueryResult(statement.getResultSet()));
} catch (final SQLException ex) {
return new JDBCExecuteResponse(new CommandResponsePackets(new ErrPacket(1, ex)));
return new JDBCExecuteUpdateResponse(new CommandResponsePackets(new ErrPacket(1, ex)));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.proxy.backend.common.jdbc.execute;

import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.QueryResponsePackets;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* JDBC execute query response.
*
* @author zhangliang
*/
@RequiredArgsConstructor
public final class JDBCExecuteQueryResponse implements JDBCExecuteResponse {

private final QueryResponsePackets queryResponsePackets;

@Getter
private final QueryResult queryResult;

@Override
public QueryResponsePackets getCommandResponsePackets() {
return queryResponsePackets;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,29 +17,19 @@

package io.shardingsphere.proxy.backend.common.jdbc.execute;

import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* JDBC execute response.
*
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
public final class JDBCExecuteResponse {
public interface JDBCExecuteResponse {

private final CommandResponsePackets commandResponsePackets;

private final QueryResult queryResult;

public JDBCExecuteResponse(final QueryResult queryResult) {
this(null, queryResult);
}

public JDBCExecuteResponse(final CommandResponsePackets commandResponsePackets) {
this(commandResponsePackets, null);
}
/**
* Get command response packets.
*
* @return command response packets
*/
CommandResponsePackets getCommandResponsePackets();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.proxy.backend.common.jdbc.execute;

import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets;
import lombok.Getter;
import lombok.RequiredArgsConstructor;

/**
* JDBC execute update response.
*
* @author zhangliang
*/
@RequiredArgsConstructor
@Getter
public final class JDBCExecuteUpdateResponse implements JDBCExecuteResponse {

private final CommandResponsePackets commandResponsePackets;
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import lombok.Getter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
Expand All @@ -36,13 +35,13 @@
@Getter
public final class SQLExecuteResponses {

private final Collection<CommandResponsePackets> commandResponsePacketsList;
private final List<CommandResponsePackets> commandResponsePacketsList;

private final Collection<QueryResult> queryResults;
private final List<QueryResult> queryResults;

private final CommandResponsePackets firstCommandResponsePackets;

public SQLExecuteResponses(final Collection<CommandResponsePackets> commandResponsePacketsList, final Collection<QueryResult> queryResults) {
public SQLExecuteResponses(final List<CommandResponsePackets> commandResponsePacketsList, final List<QueryResult> queryResults) {
this.commandResponsePacketsList = commandResponsePacketsList;
this.queryResults = queryResults;
firstCommandResponsePackets = commandResponsePacketsList.iterator().next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.shardingsphere.core.routing.SQLRouteResult;
import io.shardingsphere.core.routing.SQLUnit;
import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteEngine;
import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteQueryResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.JDBCExecuteResponse;
import io.shardingsphere.proxy.backend.common.jdbc.execute.SQLExecuteResponses;
import io.shardingsphere.proxy.transport.mysql.packet.command.reponse.CommandResponsePackets;
Expand Down Expand Up @@ -98,18 +99,26 @@ private Collection<JDBCExecuteResponse> syncExecute(final boolean isReturnGenera
}

private SQLExecuteResponses buildCommandResponsePackets(final Collection<JDBCExecuteResponse> firstJDBCExecuteResponses, final List<Future<Collection<JDBCExecuteResponse>>> futureList) {
Collection<CommandResponsePackets> commandResponsePackets = new LinkedList<>();
Collection<QueryResult> queryResults = new LinkedList<>();
List<CommandResponsePackets> commandResponsePackets = new LinkedList<>();
List<QueryResult> queryResults = new LinkedList<>();
for (JDBCExecuteResponse each : firstJDBCExecuteResponses) {
commandResponsePackets.add(each.getCommandResponsePackets());
queryResults.add(each.getQueryResult());
if (null != each.getCommandResponsePackets()) {
commandResponsePackets.add(each.getCommandResponsePackets());
}
if (each instanceof JDBCExecuteQueryResponse) {
queryResults.add(((JDBCExecuteQueryResponse) each).getQueryResult());
}
}
for (Future<Collection<JDBCExecuteResponse>> each : futureList) {
try {
Collection<JDBCExecuteResponse> executeResponses = each.get();
for (JDBCExecuteResponse jdbcExecuteResponse : executeResponses) {
commandResponsePackets.add(jdbcExecuteResponse.getCommandResponsePackets());
queryResults.add(jdbcExecuteResponse.getQueryResult());
for (JDBCExecuteResponse executeResponse : executeResponses) {
if (executeResponse instanceof JDBCExecuteQueryResponse) {
queryResults.add(((JDBCExecuteQueryResponse) executeResponse).getQueryResult());
} else {
commandResponsePackets.add(executeResponse.getCommandResponsePackets());
}
commandResponsePackets.add(executeResponse.getCommandResponsePackets());
}
} catch (final InterruptedException | ExecutionException ex) {
throw new ShardingException(ex.getMessage(), ex);
Expand Down
Loading

0 comments on commit bad1ba7

Please sign in to comment.