Skip to content

Commit

Permalink
add Transaction event param for spi
Browse files Browse the repository at this point in the history
  • Loading branch information
cherrylzhao committed Jul 13, 2018
1 parent f968940 commit 1c70533
Show file tree
Hide file tree
Showing 5 changed files with 31 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package io.shardingsphere.core.transaction.spi;

import io.shardingsphere.core.transaction.event.AbstractTransactionEvent;

/**
* Transaction Spi interface.
*
Expand All @@ -29,19 +31,19 @@ public interface Transaction {
*
* @throws Exception Exception
*/
void begin() throws Exception;
void begin(AbstractTransactionEvent transactionEvent) throws Exception;

/**
* Do transaction commit.
*
* @throws Exception Exception
*/
void commit() throws Exception;
void commit(AbstractTransactionEvent transactionEvent) throws Exception;

/**
* Do transaction rollback.
*
* @throws Exception Exception
*/
void rollback() throws Exception;
void rollback(AbstractTransactionEvent transactionEvent) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package io.shardingsphere.transaction.xa;

import io.shardingsphere.core.transaction.event.AbstractTransactionEvent;
import io.shardingsphere.core.transaction.spi.Transaction;

import javax.transaction.HeuristicMixedException;
Expand All @@ -36,17 +37,17 @@ public class AtomikosXaTransaction implements Transaction {
private static UserTransaction userTransaction = AtomikosUserTransaction.getInstance();

@Override
public void begin() throws SystemException, NotSupportedException {
public void begin(AbstractTransactionEvent transactionEvent) throws SystemException, NotSupportedException {
userTransaction.begin();
}

@Override
public void commit() throws HeuristicRollbackException, RollbackException, HeuristicMixedException, SystemException {
public void commit(AbstractTransactionEvent transactionEvent) throws HeuristicRollbackException, RollbackException, HeuristicMixedException, SystemException {
userTransaction.commit();
}

@Override
public void rollback() throws SystemException {
public void rollback(AbstractTransactionEvent transactionEvent) throws SystemException {
userTransaction.rollback();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,43 +17,40 @@

package io.shardingsphere.transaction.xa;

import io.shardingsphere.core.transaction.event.AbstractTransactionEvent;
import io.shardingsphere.core.transaction.event.WeakXaTransactionEvent;
import io.shardingsphere.core.transaction.spi.Transaction;
import io.shardingsphere.core.util.EventBusInstance;
import lombok.AllArgsConstructor;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;

/**
* Weak XA transaction implement for Transaction spi.
*
* @author zhaojun
*/
@AllArgsConstructor
public class WeakXaTransaction implements Transaction {

static {
EventBusInstance.getInstance().register(new WeakXaTransactionListener(new WeakXaTransaction()));
}

private final boolean autoCommit = true;

private final Map<String, Connection> cachedConnections;

@Override
public void begin() throws SQLException {
for (Connection each : cachedConnections.values()) {
each.setAutoCommit(autoCommit);
public void begin(AbstractTransactionEvent transactionEvent) throws SQLException {
WeakXaTransactionEvent weakXaTransactionEvent = (WeakXaTransactionEvent) transactionEvent;
for (Connection each : weakXaTransactionEvent.getCachedConnections().values()) {
each.setAutoCommit(weakXaTransactionEvent.isAutoCommit());
}
}

@Override
public void commit() throws SQLException {
public void commit(AbstractTransactionEvent transactionEvent) throws SQLException {
WeakXaTransactionEvent weakXaTransactionEvent = (WeakXaTransactionEvent) transactionEvent;
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
for (Connection each : weakXaTransactionEvent.getCachedConnections().values()) {
try {
each.commit();
} catch (final SQLException ex) {
Expand All @@ -64,9 +61,10 @@ public void commit() throws SQLException {
}

@Override
public void rollback() throws SQLException {
public void rollback(AbstractTransactionEvent transactionEvent) throws SQLException {
WeakXaTransactionEvent weakXaTransactionEvent = (WeakXaTransactionEvent) transactionEvent;
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : cachedConnections.values()) {
for (Connection each : weakXaTransactionEvent.getCachedConnections().values()) {
try {
each.rollback();
} catch (final SQLException ex) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@
import io.shardingsphere.core.transaction.event.WeakXaTransactionEvent;
import lombok.AllArgsConstructor;

import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;

/**
* Weak-XA Transaction Listener.
Expand All @@ -48,56 +45,15 @@ public class WeakXaTransactionListener {
public void listen(final WeakXaTransactionEvent weakXaTransactionEvent) throws SQLException {
switch (weakXaTransactionEvent.getTclType()) {
case BEGIN:
weakXaTransaction.begin();
weakXaTransaction.begin(weakXaTransactionEvent);
break;
case COMMIT:
weakXaTransaction.commit();
weakXaTransaction.commit(weakXaTransactionEvent);
break;
case ROLLBACK:
weakXaTransaction.rollback();
weakXaTransaction.rollback(weakXaTransactionEvent);
break;
default:
}
}

private void doBegin(final WeakXaTransactionEvent weakXaTransactionEvent) throws SQLException {
for (Connection each : weakXaTransactionEvent.getCachedConnections().values()) {
each.setAutoCommit(weakXaTransactionEvent.isAutoCommit());
}
}

private void doCommit(final WeakXaTransactionEvent weakXaTransactionEvent) throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : weakXaTransactionEvent.getCachedConnections().values()) {
try {
each.commit();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}

private void doRollback(final WeakXaTransactionEvent weakXaTransactionEvent) throws SQLException {
Collection<SQLException> exceptions = new LinkedList<>();
for (Connection each : weakXaTransactionEvent.getCachedConnections().values()) {
try {
each.rollback();
} catch (final SQLException ex) {
exceptions.add(ex);
}
}
throwSQLExceptionIfNecessary(exceptions);
}

private void throwSQLExceptionIfNecessary(final Collection<SQLException> exceptions) throws SQLException {
if (exceptions.isEmpty()) {
return;
}
SQLException ex = new SQLException();
for (SQLException each : exceptions) {
ex.setNextException(each);
}
throw ex;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.common.eventbus.AllowConcurrentEvents;
import com.google.common.eventbus.Subscribe;
import io.shardingsphere.core.transaction.event.XaTransactionEvent;
import lombok.AllArgsConstructor;

import javax.transaction.HeuristicMixedException;
import javax.transaction.HeuristicRollbackException;
Expand All @@ -32,8 +33,11 @@
*
* @author zhaojun
*/
@AllArgsConstructor
public class XaTransactionListener {

private AtomikosXaTransaction atomikosXaTransaction;

/**
* Listen event.
*
Expand All @@ -50,13 +54,13 @@ public class XaTransactionListener {
public void listen(final XaTransactionEvent xaTransactionEvent) throws SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException {
switch (xaTransactionEvent.getTclType()) {
case BEGIN:
AtomikosUserTransaction.getInstance().begin();
atomikosXaTransaction.begin(xaTransactionEvent);
break;
case COMMIT:
AtomikosUserTransaction.getInstance().commit();
atomikosXaTransaction.commit(xaTransactionEvent);
break;
case ROLLBACK:
AtomikosUserTransaction.getInstance().rollback();
atomikosXaTransaction.rollback(xaTransactionEvent);
break;
default:
}
Expand Down

0 comments on commit 1c70533

Please sign in to comment.