Skip to content

Commit

Permalink
Support Savepoint for Proxy (apache#10468)
Browse files Browse the repository at this point in the history
* Support Savepoint for Proxy

* Fix code style and license

* Complete testcases for BackendTransactionManager
  • Loading branch information
TeslaCN authored May 25, 2021
1 parent 3e52fd7 commit 156fa6a
Show file tree
Hide file tree
Showing 16 changed files with 430 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction;

import org.apache.shardingsphere.infra.transaction.TransactionHolder;
import org.apache.shardingsphere.proxy.backend.communication.jdbc.connection.BackendConnection;
import org.apache.shardingsphere.proxy.backend.context.ProxyContext;
import org.apache.shardingsphere.transaction.ShardingTransactionManagerEngine;
import org.apache.shardingsphere.infra.transaction.TransactionHolder;
import org.apache.shardingsphere.transaction.core.TransactionType;
import org.apache.shardingsphere.transaction.spi.ShardingTransactionManager;

Expand Down Expand Up @@ -92,4 +92,37 @@ public void rollback() throws SQLException {
}
}
}

@Override
public void setSavepoint(final String savepointName) throws SQLException {
if (!connection.getTransactionStatus().isInTransaction()) {
return;
}
if (TransactionType.LOCAL == transactionType || null == shardingTransactionManager) {
localTransactionManager.setSavepoint(savepointName);
}
// TODO Non-local transaction manager
}

@Override
public void rollbackTo(final String savepointName) throws SQLException {
if (!connection.getTransactionStatus().isInTransaction()) {
return;
}
if (TransactionType.LOCAL == transactionType || null == shardingTransactionManager) {
localTransactionManager.rollbackTo(savepointName);
}
// TODO Non-local transaction manager
}

@Override
public void releaseSavepoint(final String savepointName) throws SQLException {
if (!connection.getTransactionStatus().isInTransaction()) {
return;
}
if (TransactionType.LOCAL == transactionType || null == shardingTransactionManager) {
localTransactionManager.releaseSavepoint(savepointName);
}
// TODO Non-local transaction manager
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.shardingsphere.proxy.backend.communication.jdbc.transaction;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

/**
* Connection savepoint manager for local transaction.
*/
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public final class ConnectionSavepointManager {

private static final ConnectionSavepointManager INSTANCE = new ConnectionSavepointManager();

private static final Map<Connection, Map<String, Savepoint>> CONNECTION_SAVEPOINT_MAP = new ConcurrentHashMap<>(128);

/**
* Get instance of connection savepoint manager.
*
* @return instance of connection savepoint manager
*/
public static ConnectionSavepointManager getInstance() {
return INSTANCE;
}

/**
* Set savepoint.
*
* @param connection connection
* @param savepointName savepoint name
* @throws SQLException SQL Exception
*/
public void setSavepoint(final Connection connection, final String savepointName) throws SQLException {
Savepoint result = connection.setSavepoint(savepointName);
CONNECTION_SAVEPOINT_MAP.computeIfAbsent(connection, unused -> new LinkedHashMap<>()).put(savepointName, result);
}

/**
* Rollback to savepoint.
*
* @param connection connection
* @param savepointName savepoint name
* @throws SQLException SQL Exception
*/
public void rollbackToSavepoint(final Connection connection, final String savepointName) throws SQLException {
Optional<Savepoint> result = lookupSavepoint(connection, savepointName);
if (result.isPresent()) {
connection.rollback(result.get());
}
}

/**
* Release savepoint.
*
* @param connection connection
* @param savepointName savepoint name
* @throws SQLException SQL Exception
*/
public void releaseSavepoint(final Connection connection, final String savepointName) throws SQLException {
Optional<Savepoint> result = lookupSavepoint(connection, savepointName);
if (result.isPresent()) {
connection.releaseSavepoint(result.get());
}
}

private Optional<Savepoint> lookupSavepoint(final Connection connection, final String savepointName) {
return Optional.ofNullable(CONNECTION_SAVEPOINT_MAP.get(connection)).map(savepointMap -> savepointMap.get(savepointName));
}

/**
* Transaction finished.
*
* @param connection connection
*/
public void transactionFinished(final Connection connection) {
CONNECTION_SAVEPOINT_MAP.remove(connection);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ private Collection<SQLException> commitConnections() {
each.commit();
} catch (final SQLException ex) {
result.add(ex);
} finally {
ConnectionSavepointManager.getInstance().transactionFinished(each);
}
}
return result;
Expand All @@ -79,6 +81,8 @@ private Collection<SQLException> rollbackConnections() {
each.rollback();
} catch (final SQLException ex) {
result.add(ex);
} finally {
ConnectionSavepointManager.getInstance().transactionFinished(each);
}
}
return result;
Expand All @@ -94,4 +98,41 @@ private void throwSQLExceptionIfNecessary(final Collection<SQLException> excepti
}
throw ex;
}

@Override
public void setSavepoint(final String savepointName) throws SQLException {
if (!connection.getTransactionStatus().isInTransaction()) {
return;
}
for (Connection each : connection.getCachedConnections().values()) {
ConnectionSavepointManager.getInstance().setSavepoint(each, savepointName);
}
connection.getConnectionPostProcessors().add(target -> {
try {
ConnectionSavepointManager.getInstance().setSavepoint(target, savepointName);
} catch (final SQLException ex) {
throw new RuntimeException(ex);
}
});
}

@Override
public void rollbackTo(final String savepointName) throws SQLException {
if (!connection.getTransactionStatus().isInTransaction()) {
return;
}
for (Connection each : connection.getCachedConnections().values()) {
ConnectionSavepointManager.getInstance().rollbackToSavepoint(each, savepointName);
}
}

@Override
public void releaseSavepoint(final String savepointName) throws SQLException {
if (!connection.getTransactionStatus().isInTransaction()) {
return;
}
for (Connection each : connection.getCachedConnections().values()) {
ConnectionSavepointManager.getInstance().releaseSavepoint(each, savepointName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,28 @@ public interface TransactionManager {
* @throws SQLException SQL Exception
*/
void rollback() throws SQLException;

/**
* Set savepoint.
*
* @param savepointName savepoint name
* @throws SQLException SQL Exception
*/
void setSavepoint(String savepointName) throws SQLException;

/**
* Rollback to savepoint.
*
* @param savepointName savepoint name
* @throws SQLException SQL Exception
*/
void rollbackTo(String savepointName) throws SQLException;

/**
* Release savepoint.
*
* @param savepointName savepoint name
* @throws SQLException SQL Exception
*/
void releaseSavepoint(String savepointName) throws SQLException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@
import org.apache.shardingsphere.proxy.backend.response.header.ResponseHeader;
import org.apache.shardingsphere.proxy.backend.response.header.update.UpdateResponseHeader;
import org.apache.shardingsphere.proxy.backend.text.TextProtocolBackendHandler;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.ReleaseSavepointStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackToSavepointStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.SavepointStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
import org.apache.shardingsphere.transaction.core.TransactionOperationType;

Expand Down Expand Up @@ -51,6 +54,15 @@ public ResponseHeader execute() throws SQLException {
case BEGIN:
backendTransactionManager.begin();
break;
case SAVEPOINT:
backendTransactionManager.setSavepoint(((SavepointStatement) tclStatement).getSavepointName());
break;
case ROLLBACK_TO_SAVEPOINT:
backendTransactionManager.rollbackTo(((RollbackToSavepointStatement) tclStatement).getSavepointName());
break;
case RELEASE_SAVEPOINT:
backendTransactionManager.releaseSavepoint(((ReleaseSavepointStatement) tclStatement).getSavepointName());
break;
case COMMIT:
backendTransactionManager.commit();
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import org.apache.shardingsphere.proxy.backend.text.skip.SkipBackendHandler;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.BeginTransactionStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.CommitStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.ReleaseSavepointStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.RollbackToSavepointStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.SavepointStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.SetAutoCommitStatement;
import org.apache.shardingsphere.sql.parser.sql.common.statement.tcl.TCLStatement;
import org.apache.shardingsphere.transaction.core.TransactionOperationType;
Expand Down Expand Up @@ -55,6 +58,15 @@ public static TextProtocolBackendHandler newInstance(final TCLStatement tclState
}
return new TransactionBackendHandler(tclStatement, TransactionOperationType.BEGIN, backendConnection);
}
if (tclStatement instanceof SavepointStatement) {
return new TransactionBackendHandler(tclStatement, TransactionOperationType.SAVEPOINT, backendConnection);
}
if (tclStatement instanceof ReleaseSavepointStatement) {
return new TransactionBackendHandler(tclStatement, TransactionOperationType.RELEASE_SAVEPOINT, backendConnection);
}
if (tclStatement instanceof RollbackToSavepointStatement) {
return new TransactionBackendHandler(tclStatement, TransactionOperationType.ROLLBACK_TO_SAVEPOINT, backendConnection);
}
if (tclStatement instanceof CommitStatement) {
return new TransactionBackendHandler(tclStatement, TransactionOperationType.COMMIT, backendConnection);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -145,6 +146,54 @@ public void assertRollbackWithoutTransaction() throws SQLException {
verify(shardingTransactionManager, times(0)).rollback();
}

@Test
public void assertSetSavepointForLocalTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, true);
String savepointName = "JDBC_SAVEPOINT_0";
backendTransactionManager.setSavepoint(savepointName);
verify(localTransactionManager).setSavepoint(savepointName);
}

@Test
public void assertSetSavepointWithoutTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, false);
String savepointName = "JDBC_SAVEPOINT_0";
backendTransactionManager.setSavepoint(savepointName);
verify(localTransactionManager, never()).setSavepoint(savepointName);
}

@Test
public void assertRollbackToSavepointForLocalTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, true);
String savepointName = "JDBC_SAVEPOINT_0";
backendTransactionManager.rollbackTo(savepointName);
verify(localTransactionManager).rollbackTo(savepointName);
}

@Test
public void assertRollbackToSavepointWithoutTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, false);
String savepointName = "JDBC_SAVEPOINT_0";
backendTransactionManager.rollbackTo(savepointName);
verify(localTransactionManager, never()).rollbackTo(savepointName);
}

@Test
public void assertReleaseSavepointForLocalTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, true);
String savepointName = "JDBC_SAVEPOINT_0";
backendTransactionManager.releaseSavepoint(savepointName);
verify(localTransactionManager).releaseSavepoint(savepointName);
}

@Test
public void assertReleaseSavepointWithoutTransaction() throws SQLException {
newBackendTransactionManager(TransactionType.LOCAL, false);
String savepointName = "JDBC_SAVEPOINT_0";
backendTransactionManager.releaseSavepoint(savepointName);
verify(localTransactionManager, never()).releaseSavepoint(savepointName);
}

private void newBackendTransactionManager(final TransactionType transactionType, final boolean inTransaction) {
when(backendConnection.getTransactionStatus().getTransactionType()).thenReturn(transactionType);
when(transactionStatus.isInTransaction()).thenReturn(inTransaction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ execute
| commit
| rollback
| savepoint
| releaseSavepoint
| rollbackToSavepoint
| grant
| revoke
| createUser
Expand Down
Loading

0 comments on commit 156fa6a

Please sign in to comment.