Skip to content

Commit

Permalink
feature: custom saga transaction recovery strategy on transaction tim…
Browse files Browse the repository at this point in the history
…eout (apache#2240)
  • Loading branch information
long187 authored Feb 15, 2020
1 parent 1d6a37b commit 769f061
Show file tree
Hide file tree
Showing 21 changed files with 485 additions and 110 deletions.
4 changes: 2 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ before_script:

script:
- if [ "$TRAVIS_BRANCH" == "develop" ] && [ "$TRAVIS_PULL_REQUEST" == false ]; then
travis_wait 30 ./mvnw clean install -DskipTests=false -P image -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
travis_wait 30 ./mvnw clean install -DskipTests=false -q -P image -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
else
travis_wait 30 ./mvnw clean install -DskipTests=false -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
travis_wait 30 ./mvnw clean install -DskipTests=false -q -B -Dorg.slf4j.simpleLogger.log.org.apache.maven.cli.transfer.Slf4jMavenTransferListener=warn;
fi
after_success:
- bash <(curl -s https://codecov.io/bash)
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -31,6 +32,7 @@
import io.seata.saga.engine.StateMachineConfig;
import io.seata.saga.engine.config.DbStateMachineConfig;
import io.seata.saga.engine.exception.EngineExecutionException;
import io.seata.saga.engine.pcext.utils.EngineUtils;
import io.seata.saga.engine.sequence.SeqGenerator;
import io.seata.saga.engine.serializer.Serializer;
import io.seata.saga.engine.serializer.impl.ExceptionSerializer;
Expand Down Expand Up @@ -82,7 +84,17 @@ public class DbAndReportTcStateLogStore extends AbstractStore implements StateLo
public void recordStateMachineStarted(StateMachineInstance machineInstance, ProcessContext context) {

if (machineInstance != null) {
beginTransaction(machineInstance, context);
//if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,
//use parent transaction instead.
String parentId = machineInstance.getParentId();
if (StringUtils.hasLength(parentId)) {
if (StringUtils.isEmpty(machineInstance.getId())) {
machineInstance.setId(parentId);
}
} else {
beginTransaction(machineInstance, context);
}


if (StringUtils.isEmpty(machineInstance.getId()) && seqGenerator != null) {
machineInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_MACHINE_INST));
Expand All @@ -95,20 +107,10 @@ public void recordStateMachineStarted(StateMachineInstance machineInstance, Proc
}
}

private void beginTransaction(StateMachineInstance machineInstance, ProcessContext context) {
protected void beginTransaction(StateMachineInstance machineInstance, ProcessContext context) {

if (sagaTransactionalTemplate != null) {

//if parentId is not null, machineInstance is a SubStateMachine, do not start a new global transaction,
//use parent transaction instead.
String parentId = machineInstance.getParentId();
if (StringUtils.hasLength(parentId)) {
if (StringUtils.isEmpty(machineInstance.getId())) {
machineInstance.setId(parentId);
}
return;
}

StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
TransactionInfo transactionInfo = new TransactionInfo();
Expand Down Expand Up @@ -156,25 +158,25 @@ public void recordStateMachineFinished(StateMachineInstance machineInstance, Pro
machineInstance.setSerializedException(exceptionSerializer.serialize(machineInstance.getException()));
int effect = executeUpdate(stateLogStoreSqls.getRecordStateMachineFinishedSql(dbType),
STATE_MACHINE_INSTANCE_TO_STATEMENT_FOR_UPDATE, machineInstance);
if (effect < 0) {
if (effect < 1) {
LOGGER.warn("StateMachineInstance[{}] is recovery by server, skip recordStateMachineFinished.", machineInstance.getId());
} else {
StateMachineConfig stateMachineConfig = (StateMachineConfig) context.getVariable(
DomainConstants.VAR_NAME_STATEMACHINE_CONFIG);
if (machineInstance.isTimeout(stateMachineConfig.getTransOperationTimeout())) {
if (EngineUtils.isTimeout(machineInstance.getGmtUpdated(), stateMachineConfig.getTransOperationTimeout())) {
LOGGER.warn("StateMachineInstance[{}] is execution timeout, skip report transaction finished to server.", machineInstance.getId());
} else {
} else if (StringUtils.isEmpty(machineInstance.getParentId())) {
//if parentId is not null, machineInstance is a SubStateMachine, do not report global transaction.
reportTransactionFinished(machineInstance, context);
}
}
RootContext.unbind();
}
}

private void reportTransactionFinished(StateMachineInstance machineInstance, ProcessContext context) {
protected void reportTransactionFinished(StateMachineInstance machineInstance, ProcessContext context) {

//if parentId is not null, machineInstance is a SubStateMachine, do not report global transaction.
if (sagaTransactionalTemplate != null && StringUtils.isEmpty(machineInstance.getParentId())) {
if (sagaTransactionalTemplate != null) {

try {
GlobalTransaction globalTransaction = getGlobalTransaction(machineInstance, context);
Expand Down Expand Up @@ -226,8 +228,14 @@ public void recordStateMachineRestarted(StateMachineInstance machineInstance, Pr

if (machineInstance != null) {
//save to db
executeUpdate(stateLogStoreSqls.getUpdateStateMachineRunningStatusSql(dbType), machineInstance.isRunning(), new Timestamp(machineInstance.getGmtUpdated().getTime()),
machineInstance.getId());
Date gmtUpdated = new Date();
int effect = executeUpdate(stateLogStoreSqls.getUpdateStateMachineRunningStatusSql(dbType), machineInstance.isRunning(), new Timestamp(gmtUpdated.getTime()),
machineInstance.getId(), new Timestamp(machineInstance.getGmtUpdated().getTime()));
if (effect < 1) {
throw new EngineExecutionException(
"StateMachineInstance [id:" + machineInstance.getId() + "] is recovered by an other execution, restart denied", FrameworkErrorCode.OperationDenied);
}
machineInstance.setGmtUpdated(gmtUpdated);
}
}

Expand All @@ -236,7 +244,20 @@ public void recordStateStarted(StateInstance stateInstance, ProcessContext conte

if (stateInstance != null) {

branchRegister(stateInstance, context);
//if this state is for retry, do not register branch, but generate id
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {

stateInstance.setId(generateRetryStateInstanceId(stateInstance));
}
//if this state is for compensation, do not register branch, but generate id
else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {

stateInstance.setId(generateCompensateStateInstanceId(stateInstance));
}
else {
branchRegister(stateInstance, context);
}


if (StringUtils.isEmpty(stateInstance.getId()) && seqGenerator != null) {
stateInstance.setId(seqGenerator.generate(DomainConstants.SEQ_ENTITY_STATE_INST));
Expand All @@ -248,49 +269,38 @@ public void recordStateStarted(StateInstance stateInstance, ProcessContext conte
}
}

private void branchRegister(StateInstance stateInstance, ProcessContext context) {
protected void branchRegister(StateInstance stateInstance, ProcessContext context) {

if (sagaTransactionalTemplate != null) {

//if this state is for retry, do not register branch, but generate id
if (StringUtils.hasLength(stateInstance.getStateIdRetriedFor())) {

stateInstance.setId(generateRetryStateInstanceId(stateInstance));
}
//if this state is for compensation, do not register branch, but generate id
else if (StringUtils.hasLength(stateInstance.getStateIdCompensatedFor())) {

stateInstance.setId(generateCompensateStateInstanceId(stateInstance));
} else {
//Register branch
try {
StateMachineInstance machineInstance = stateInstance.getStateMachineInstance();
GlobalTransaction globalTransaction = getGlobalTransaction(machineInstance, context);
if (globalTransaction == null) {
throw new EngineExecutionException("Global transaction is not exists", FrameworkErrorCode.ObjectNotExists);
}

String resourceId = stateInstance.getStateMachineInstance().getStateMachine().getName() + "#" + stateInstance.getName();
long branchId = sagaTransactionalTemplate.branchRegister(resourceId, null, globalTransaction.getXid(), null, null);
stateInstance.setId(String.valueOf(branchId));
} catch (TransactionException e) {
throw new EngineExecutionException(e,
"Branch transaction error: " + e.getCode() + ", StateMachine:" + stateInstance.getStateMachineInstance()
.getStateMachine().getName() + ", XID: " + stateInstance.getStateMachineInstance().getId() + ", State:"
+ stateInstance.getName() + ", stateId: " + stateInstance.getId() + ", Reason: " + e.getMessage(),
FrameworkErrorCode.TransactionManagerError);
} catch (ExecutionException e) {
throw new EngineExecutionException(e,
"Branch transaction error: " + e.getCode() + ", StateMachine:" + stateInstance.getStateMachineInstance()
.getStateMachine().getName() + ", XID: " + stateInstance.getStateMachineInstance().getId() + ", State:"
+ stateInstance.getName() + ", stateId: " + stateInstance.getId() + ", Reason: " + e.getMessage(),
FrameworkErrorCode.TransactionManagerError);
//Register branch
try {
StateMachineInstance machineInstance = stateInstance.getStateMachineInstance();
GlobalTransaction globalTransaction = getGlobalTransaction(machineInstance, context);
if (globalTransaction == null) {
throw new EngineExecutionException("Global transaction is not exists", FrameworkErrorCode.ObjectNotExists);
}

String resourceId = stateInstance.getStateMachineInstance().getStateMachine().getName() + "#" + stateInstance.getName();
long branchId = sagaTransactionalTemplate.branchRegister(resourceId, null, globalTransaction.getXid(), null, null);
stateInstance.setId(String.valueOf(branchId));
} catch (TransactionException e) {
throw new EngineExecutionException(e,
"Branch transaction error: " + e.getCode() + ", StateMachine:" + stateInstance.getStateMachineInstance()
.getStateMachine().getName() + ", XID: " + stateInstance.getStateMachineInstance().getId() + ", State:"
+ stateInstance.getName() + ", stateId: " + stateInstance.getId() + ", Reason: " + e.getMessage(),
FrameworkErrorCode.TransactionManagerError);
} catch (ExecutionException e) {
throw new EngineExecutionException(e,
"Branch transaction error: " + e.getCode() + ", StateMachine:" + stateInstance.getStateMachineInstance()
.getStateMachine().getName() + ", XID: " + stateInstance.getStateMachineInstance().getId() + ", State:"
+ stateInstance.getName() + ", stateId: " + stateInstance.getId() + ", Reason: " + e.getMessage(),
FrameworkErrorCode.TransactionManagerError);
}
}
}

private GlobalTransaction getGlobalTransaction(StateMachineInstance machineInstance, ProcessContext context)
protected GlobalTransaction getGlobalTransaction(StateMachineInstance machineInstance, ProcessContext context)
throws ExecutionException, TransactionException {

GlobalTransaction globalTransaction = (GlobalTransaction) context.getVariable(DomainConstants.VAR_NAME_GLOBAL_TX);
Expand Down Expand Up @@ -392,7 +402,7 @@ public void recordStateFinished(StateInstance stateInstance, ProcessContext cont
}
}

private void branchReport(StateInstance stateInstance, ProcessContext context) {
protected void branchReport(StateInstance stateInstance, ProcessContext context) {

if (sagaTransactionalTemplate != null) {

Expand Down Expand Up @@ -721,8 +731,9 @@ public void toStatement(StateMachineInstance stateMachineInstance, PreparedState
stateMachineInstance.getCompensationStatus() != null ? stateMachineInstance.getCompensationStatus()
.name() : null);
statement.setBoolean(6, stateMachineInstance.isRunning());
statement.setString(7, stateMachineInstance.getId());
statement.setTimestamp(8, new Timestamp(stateMachineInstance.getGmtUpdated().getTime()));
statement.setTimestamp(7, new Timestamp(System.currentTimeMillis()));
statement.setString(8, stateMachineInstance.getId());
statement.setTimestamp(9, new Timestamp(stateMachineInstance.getGmtUpdated().getTime()));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import java.sql.Timestamp;
import java.util.List;

import io.seata.common.util.StringUtils;
import io.seata.saga.engine.store.StateLangStore;
import io.seata.saga.statelang.domain.RecoverStrategy;
import io.seata.saga.statelang.domain.StateMachine;
import io.seata.saga.statelang.domain.StateMachine.Status;
import io.seata.saga.statelang.domain.impl.StateMachineImpl;
Expand Down Expand Up @@ -80,7 +82,10 @@ public StateMachine toObject(ResultSet resultSet) throws SQLException {
stateMachine.setContent(resultSet.getString("content"));
stateMachine.setGmtCreate(resultSet.getTimestamp("gmt_create"));
stateMachine.setType(resultSet.getString("type"));
stateMachine.setRecoverStrategy(resultSet.getString("recover_strategy"));
String recoverStrategy = resultSet.getString("recover_strategy");
if (StringUtils.isNotBlank(recoverStrategy)) {
stateMachine.setRecoverStrategy(RecoverStrategy.valueOf(recoverStrategy));
}
stateMachine.setTenantId(resultSet.getString("tenant_id"));
stateMachine.setStatus(Status.valueOf(resultSet.getString("status")));
return stateMachine;
Expand All @@ -99,7 +104,7 @@ public void toStatement(StateMachine stateMachine, PreparedStatement statement)
statement.setString(7, stateMachine.getVersion());
statement.setString(8, stateMachine.getType());
statement.setString(9, stateMachine.getContent());
statement.setString(10, stateMachine.getRecoverStrategy());
statement.setString(10, stateMachine.getRecoverStrategy() != null ? stateMachine.getRecoverStrategy().name() : null);
statement.setString(11, stateMachine.getComment());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ public class StateLogStoreSqls {

private static final String RECORD_STATE_MACHINE_FINISHED_SQL
= "UPDATE ${TABLE_PREFIX}state_machine_inst SET gmt_end = ?, excep = ?, end_params = ?,status = ?, "
+ "compensation_status = ?, is_running = ?, gmt_updated = current_timestamp WHERE id = ? and gmt_updated = ?";
+ "compensation_status = ?, is_running = ?, gmt_updated = ? WHERE id = ? and gmt_updated = ?";

private static final String UPDATE_STATE_MACHINE_RUNNING_STATUS_SQL =
"UPDATE ${TABLE_PREFIX}state_machine_inst SET\n"
+ "is_running = ?, gmt_updated = ? where id = ?";
+ "is_running = ?, gmt_updated = ? where id = ? and gmt_updated = ?";

private static final String GET_STATE_MACHINE_INSTANCE_BY_ID_SQL = "SELECT " + STATE_MACHINE_INSTANCE_FIELDS
+ " FROM ${TABLE_PREFIX}state_machine_inst WHERE id = ?";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,10 @@ public interface StateMachineConfig {
* @return
*/
int getTransOperationTimeout();

/**
* get service invoke timeout
* @return
*/
int getServiceInvokeTimeout();
}
Original file line number Diff line number Diff line change
Expand Up @@ -153,4 +153,11 @@ StateMachineInstance skipAndForwardAsync(String stateMachineInstId, AsyncCallbac
* @return
*/
StateMachineConfig getStateMachineConfig();

/**
* Reload StateMachine Instance
* @param instId
* @return
*/
StateMachineInstance reloadStateMachineInstance(String instId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ public class DefaultStateMachineConfig implements StateMachineConfig, Applicatio

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultStateMachineConfig.class);

private static final int DEFAULT_TRANS_OPER_TIMEOUT = 60000 * 30;
private static final int DEFAULT_TRANS_OPER_TIMEOUT = 60000 * 30;
private static final int DEFAULT_SERVICE_INVOKE_TIMEOUT = 60000 * 5;

private int transOperationTimeout = DEFAULT_TRANS_OPER_TIMEOUT;
private int serviceInvokeTimeout = DEFAULT_SERVICE_INVOKE_TIMEOUT;

private StateLogRepository stateLogRepository;
private StateLogStore stateLogStore;
Expand Down Expand Up @@ -393,4 +395,12 @@ public void setTransOperationTimeout(int transOperationTimeout) {
this.transOperationTimeout = transOperationTimeout;
}

@Override
public int getServiceInvokeTimeout() {
return serviceInvokeTimeout;
}

public void setServiceInvokeTimeout(int serviceInvokeTimeout) {
this.serviceInvokeTimeout = serviceInvokeTimeout;
}
}
Loading

0 comments on commit 769f061

Please sign in to comment.