Skip to content

Commit

Permalink
feature: support propagation.never, propagation.mandatory, transactio…
Browse files Browse the repository at this point in the history
…n suspend and resume api (apache#2359)
  • Loading branch information
lightClouds917 authored Mar 20, 2020
1 parent 6ddcf27 commit 7b796c3
Show file tree
Hide file tree
Showing 8 changed files with 178 additions and 15 deletions.
10 changes: 10 additions & 0 deletions common/src/main/java/io/seata/common/util/StringUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,4 +258,14 @@ public static String trim(final String str) {
public static boolean isEmpty(final CharSequence cs) {
return cs == null || cs.length() == 0;
}

/**
* Checks if a CharSequence is not empty ("") and not null.
*
* @param cs the CharSequence to check, may be null
* @return {@code true} if the CharSequence is not empty and not null
*/
public static boolean isNotEmpty(final CharSequence cs) {
return !isEmpty(cs);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.seata.core.model.GlobalStatus;
import io.seata.saga.engine.sequence.SpringJvmUUIDSeqGenerator;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.transaction.SuspendedResourcesHolder;

/**
*
Expand Down Expand Up @@ -68,6 +69,18 @@ public void rollback() throws TransactionException {

}

@Override
public SuspendedResourcesHolder suspend(boolean unbindXid)
throws TransactionException {
return null;
}

@Override
public void resume(SuspendedResourcesHolder suspendedResourcesHolder)
throws TransactionException {

}

@Override
public GlobalStatus getStatus() throws TransactionException {
return status;
Expand All @@ -87,4 +100,4 @@ public void globalReport(GlobalStatus globalStatus) throws TransactionException
public GlobalStatus getLocalStatus() {
return status;
}
}
}
36 changes: 33 additions & 3 deletions tm/src/main/java/io/seata/tm/api/DefaultGlobalTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/
package io.seata.tm.api;

import io.seata.common.util.StringUtils;
import io.seata.config.ConfigurationFactory;
import io.seata.core.constants.ConfigurationKeys;
import io.seata.core.context.RootContext;
import io.seata.core.exception.TransactionException;
import io.seata.core.model.GlobalStatus;
import io.seata.core.model.TransactionManager;
import io.seata.tm.TransactionManagerHolder;
import io.seata.tm.api.transaction.SuspendedResourcesHolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -134,7 +136,7 @@ public void commit() throws TransactionException {
}
} finally {
if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
RootContext.unbind();
suspend(true);
}
}
if (LOGGER.isInfoEnabled()) {
Expand Down Expand Up @@ -170,14 +172,42 @@ public void rollback() throws TransactionException {
}
} finally {
if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
RootContext.unbind();
suspend(true);
}
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("[{}] rollback status: {}", xid, status);
}
}

@Override
public SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException {
String xid = RootContext.getXID();
if (StringUtils.isNotEmpty(xid) && unbindXid) {
RootContext.unbind();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Suspending current transaction,xid = {}",xid);
}
} else {
xid = null;
}
return new SuspendedResourcesHolder(xid);
}

@Override
public void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException {
if (suspendedResourcesHolder == null) {
return;
}
String xid = suspendedResourcesHolder.getXid();
if (StringUtils.isNotEmpty(xid)) {
RootContext.bind(xid);
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Resumimg the transaction,xid = {}", xid);
}
}
}

@Override
public GlobalStatus getStatus() throws TransactionException {
if (xid == null) {
Expand Down Expand Up @@ -206,7 +236,7 @@ public void globalReport(GlobalStatus globalStatus) throws TransactionException
}

if (RootContext.getXID() != null && xid.equals(RootContext.getXID())) {
RootContext.unbind();
suspend(true);
}
}

Expand Down
21 changes: 21 additions & 0 deletions tm/src/main/java/io/seata/tm/api/GlobalTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.seata.core.exception.TransactionException;
import io.seata.core.model.GlobalStatus;
import io.seata.tm.api.transaction.SuspendedResourcesHolder;

/**
* Global transaction.
Expand Down Expand Up @@ -68,6 +69,26 @@ public interface GlobalTransaction {
*/
void rollback() throws TransactionException;

/**
* Suspend the global transaction.
*
* @param unbindXid if true,suspend the global transaction.
* @return the SuspendedResourcesHolder which holds the suspend resources
* @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
* @see SuspendedResourcesHolder
*/
SuspendedResourcesHolder suspend(boolean unbindXid) throws TransactionException;

/**
* Resume the global transaction.
*
* @param suspendedResourcesHolder the suspended resources to resume
* @throws TransactionException Any exception that fails this will be wrapped with TransactionException and thrown
* out.
* @see SuspendedResourcesHolder
*/
void resume(SuspendedResourcesHolder suspendedResourcesHolder) throws TransactionException;

/**
* Ask TC for current status of the corresponding global transaction.
*
Expand Down
41 changes: 31 additions & 10 deletions tm/src/main/java/io/seata/tm/api/TransactionalTemplate.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.seata.core.exception.TransactionException;
import io.seata.core.model.GlobalStatus;
import io.seata.tm.api.transaction.Propagation;
import io.seata.tm.api.transaction.SuspendedResourcesHolder;
import io.seata.tm.api.transaction.TransactionHook;
import io.seata.tm.api.transaction.TransactionHookManager;
import io.seata.tm.api.transaction.TransactionInfo;
Expand Down Expand Up @@ -52,29 +53,44 @@ public Object execute(TransactionalExecutor business) throws Throwable {
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 1.1 get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

// 1.2 Handle the Transaction propatation and the branchType
Propagation propagation = txInfo.getPropagation();
String previousXid = null;
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
previousXid = RootContext.unbind();
suspendedResourcesHolder = tx.suspend(true);
return business.execute();
case REQUIRES_NEW:
previousXid = RootContext.unbind();
suspendedResourcesHolder = tx.suspend(true);
break;
case SUPPORTS:
if (StringUtils.isEmpty(RootContext.getXID())) {
if (!existingTransaction()) {
return business.execute();
}
break;
case REQUIRED:
break;
case NEVER:
if (existingTransaction()) {
throw new TransactionException(
String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
,RootContext.getXID()));
} else {
return business.execute();
}
case MANDATORY:
if (!existingTransaction()) {
throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
}
break;
default:
throw new ShouldNeverHappenException("Not Supported Propagation:" + propagation);
throw new TransactionException("Not Supported Propagation:" + propagation);
}

// 1.1 get or create a transaction
GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

try {

Expand Down Expand Up @@ -104,13 +120,18 @@ public Object execute(TransactionalExecutor business) throws Throwable {
cleanUp();
}
} finally {
if (previousXid != null) {
RootContext.bind(previousXid);
}
tx.resume(suspendedResourcesHolder);
}

}

public boolean existingTransaction() {
return StringUtils.isNotEmpty(RootContext.getXID());

}



private void completeTransactionAfterThrowing(TransactionInfo txInfo, GlobalTransaction tx, Throwable ex) throws TransactionalExecutor.ExecutionException {
//roll back
if (txInfo != null && txInfo.rollbackOn(ex)) {
Expand Down
13 changes: 12 additions & 1 deletion tm/src/main/java/io/seata/tm/api/transaction/Propagation.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,17 @@ public enum Propagation {
/**
* The SUPPORTS
*/
SUPPORTS
SUPPORTS,

/**
* The NEVER
*/
NEVER,

/**
* The MANDATORY
*/
MANDATORY

}

Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Copyright 1999-2019 Seata.io Group.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.tm.api.transaction;

/**
* Holder for suspended resources to support propagation or nested logic.
* Used by {@code suspend} and {@code resume}
*
* @author wangzhongxiang
*/
public class SuspendedResourcesHolder {

/**The xid*/
private String xid;


public SuspendedResourcesHolder() {
}

public SuspendedResourcesHolder(String xid) {
this.xid = xid;
}

public String getXid() {
return xid;
}

public void setXid(String xid) {
this.xid = xid;
}
}
13 changes: 13 additions & 0 deletions tm/src/test/java/io/seata/tm/api/TransactionTemplateTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.seata.tm.api;

import io.seata.core.context.RootContext;
import io.seata.core.model.GlobalStatus;
import io.seata.core.model.TransactionManager;
import io.seata.tm.TransactionManagerHolder;
Expand All @@ -24,6 +25,7 @@
import io.seata.tm.api.transaction.TransactionHookManager;
import io.seata.tm.api.transaction.TransactionInfo;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
Expand Down Expand Up @@ -155,4 +157,15 @@ private void verifyRollBack(TransactionHook transactionHook) {
verify(transactionHook).afterCompletion();
}


@Test
public void testExistingTransaction(){
RootContext.bind(DEFAULT_XID);
TransactionalTemplate template = new TransactionalTemplate();
Assertions.assertTrue(template.existingTransaction(),"Existing transaction");

RootContext.unbind();
Assertions.assertFalse(template.existingTransaction(),"No existing transaction");
}

}

0 comments on commit 7b796c3

Please sign in to comment.