Skip to content

Commit

Permalink
optimize: optimized globaltransaction compatibility issues (apache#6366)
Browse files Browse the repository at this point in the history
  • Loading branch information
funky-eyes authored Mar 4, 2024
1 parent 7ce96e4 commit 13a5ada
Show file tree
Hide file tree
Showing 32 changed files with 587 additions and 108 deletions.
1 change: 1 addition & 0 deletions changes/en-us/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ Add changes here for all PR submitted to the 2.x branch.
- [[#6345](https://github.com/apache/incubator-seata/pull/6345)] compatible with tcc module
- [[#6356](https://github.com/apache/incubator-seata/pull/6356)] remove authentication from the health check page
- [[#6360](https://github.com/apache/incubator-seata/pull/6360)] optimize 401 issues for some links
- [[#6366](https://github.com/apache/incubator-seata/pull/6366)] optimized globaltransaction compatibility issues
- [[#6369](https://github.com/apache/incubator-seata/pull/6369)] optimize arm64 ci
- [[#6386](https://github.com/apache/incubator-seata/pull/6386)] replace `byte-buddy` to JDK proxy in `ConfigurationCache`

Expand Down
2 changes: 2 additions & 0 deletions changes/zh-cn/2.x.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,12 @@
- [[#6356](https://github.com/apache/incubator-seata/pull/6356)] 去除健康检查页面的鉴权
- [[#6360](https://github.com/apache/incubator-seata/pull/6360)] 优化部分链接 401 的问题
- [[#6350](https://github.com/apache/incubator-seata/pull/6350)] 移除 enableDegrade 配置
- [[#6366](https://github.com/apache/incubator-seata/pull/6366)] 优化globaltransaction向下兼容性
- [[#6369](https://github.com/apache/incubator-seata/pull/6369)] 优化 arm64 ci
- [[#6386](https://github.com/apache/incubator-seata/pull/6386)]`ConfigurationCache` 类中,将 `byte-buddy` 替换为JDK代理



### refactor:
- [[#6269](https://github.com/apache/incubator-seata/pull/6269)] 统一Seata异常规范

Expand Down
6 changes: 6 additions & 0 deletions compatible/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,12 @@
<version>1.27.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>5.8.2</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seata</groupId>
<artifactId>seata-tcc</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
/**
* Extracting TCC Context from Method
*/
@Deprecated
public final class ActionContextUtil {

private ActionContextUtil() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
/**
* Handler the Tx Participant Aspect : Setting Context, Creating Branch Record
*/
@Deprecated
public class ActionInterceptorHandler extends org.apache.seata.integration.tx.api.interceptor.ActionInterceptorHandler {


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,78 +18,78 @@

import io.seata.spring.annotation.GlobalLock;
import io.seata.spring.annotation.GlobalTransactional;
import io.seata.tm.api.FailureHandler;
import org.apache.seata.common.LockStrategyMode;
import org.apache.seata.core.model.GlobalLockConfig;
import org.apache.seata.integration.tx.api.annotation.AspectTransactional;
import org.apache.seata.integration.tx.api.interceptor.InvocationWrapper;
import org.apache.seata.integration.tx.api.util.ClassUtils;
import org.apache.seata.rm.GlobalLockExecutor;
import org.apache.seata.tm.api.transaction.Propagation;

import java.lang.reflect.Method;
import java.util.Objects;
import java.util.Set;

/**
* The type Global transactional interceptor handler.
*/
@Deprecated
public class GlobalTransactionalInterceptorHandler extends org.apache.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler {


public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set<String> methodsToProxy) {
public GlobalTransactionalInterceptorHandler(org.apache.seata.tm.api.FailureHandler failureHandler, Set<String> methodsToProxy) {
super(failureHandler, methodsToProxy);
}

public GlobalTransactionalInterceptorHandler(FailureHandler failureHandler, Set<String> methodsToProxy, AspectTransactional aspectTransactional) {
public GlobalTransactionalInterceptorHandler(org.apache.seata.tm.api.FailureHandler failureHandler, Set<String> methodsToProxy,
AspectTransactional aspectTransactional) {
super(failureHandler, methodsToProxy, aspectTransactional);
}

@Override
protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
Class<?> targetClass = invocation.getTarget().getClass();
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
final GlobalTransactional globalTransactionalAnnotation = getAnnotation(specificMethod, targetClass, GlobalTransactional.class);
final GlobalLock globalLockAnnotation = getAnnotation(specificMethod, targetClass, GlobalLock.class);
boolean localDisable = disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
transactional = new AspectTransactional(globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(), globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.rollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
org.apache.seata.tm.api.transaction.Propagation.valueOf(globalTransactionalAnnotation.propagation().name()),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes(),
org.apache.seata.common.LockStrategyMode.valueOf(globalTransactionalAnnotation.lockStrategyMode().name()));
} else {
transactional = this.aspectTransactional;
}
return handleGlobalTransaction(invocation, transactional);
} else if (globalLockAnnotation != null) {
return handleGlobalLock(invocation, globalLockAnnotation);
}
}
public GlobalLockConfig getGlobalLockConfig(Method method, Class<?> targetClass) {
final GlobalLock globalLockAnno = getAnnotation(method, targetClass, GlobalLock.class);
if (globalLockAnno != null) {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
} else {
return null;
}
return invocation.proceed();
}

@Override
public AspectTransactional getAspectTransactional(Method method, Class<?> targetClass) {
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(method, targetClass, GlobalTransactional.class);
return globalTransactionalAnnotation != null ? new AspectTransactional(
globalTransactionalAnnotation.timeoutMills(), globalTransactionalAnnotation.name(),
globalTransactionalAnnotation.rollbackFor(), globalTransactionalAnnotation.rollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(), globalTransactionalAnnotation.noRollbackForClassName(),
propagation2ApacheSeataPropagation(globalTransactionalAnnotation.propagation()),
globalTransactionalAnnotation.lockRetryInterval(), globalTransactionalAnnotation.lockRetryTimes(),
lockStrategyMode2ApacheSeataLockStrategyMode(globalTransactionalAnnotation.lockStrategyMode())) : null;
}

private Object handleGlobalLock(final InvocationWrapper methodInvocation, final GlobalLock globalLockAnno) throws Throwable {
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
private Propagation propagation2ApacheSeataPropagation(io.seata.tm.api.transaction.Propagation propagation){
switch (propagation) {
case NEVER:
return Propagation.NEVER;
case REQUIRES_NEW:
return Propagation.REQUIRES_NEW;
case NOT_SUPPORTED:
return Propagation.NOT_SUPPORTED;
case SUPPORTS:
return Propagation.SUPPORTS;
case MANDATORY:
return Propagation.MANDATORY;
default:
return Propagation.REQUIRED;
}
}

@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
private LockStrategyMode lockStrategyMode2ApacheSeataLockStrategyMode(io.seata.common.LockStrategyMode lockStrategyMode){
if (Objects.requireNonNull(lockStrategyMode) == io.seata.common.LockStrategyMode.OPTIMISTIC) {
return LockStrategyMode.OPTIMISTIC;
}
return LockStrategyMode.PESSIMISTIC;
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,16 @@
*/
package io.seata.integration.tx.api.interceptor.parser;

import io.seata.integration.tx.api.interceptor.handler.GlobalTransactionalInterceptorHandler;
import io.seata.spring.annotation.GlobalLock;
import io.seata.spring.annotation.GlobalTransactional;
import org.apache.seata.common.util.CollectionUtils;
import org.apache.seata.integration.tx.api.interceptor.handler.ProxyInvocationHandler;
import org.apache.seata.tm.api.FailureHandlerHolder;

import java.lang.reflect.Method;

@Deprecated
public class GlobalTransactionalInterceptorParser extends org.apache.seata.integration.tx.api.interceptor.parser.GlobalTransactionalInterceptorParser {

@Override
Expand All @@ -33,24 +37,21 @@ protected boolean existsAnnotation(Class<?>... classes) {
continue;
}
GlobalTransactional trxAnnoOld = clazz.getAnnotation(GlobalTransactional.class);
org.apache.seata.spring.annotation.GlobalTransactional trxAnnoNew = clazz.getAnnotation(org.apache.seata.spring.annotation.GlobalTransactional.class);

if (trxAnnoOld != null || trxAnnoNew != null) {
if (trxAnnoOld != null) {
return true;
}
Method[] methods = clazz.getMethods();
for (Method method : methods) {
trxAnnoOld = method.getAnnotation(GlobalTransactional.class);
trxAnnoNew = method.getAnnotation(org.apache.seata.spring.annotation.GlobalTransactional.class);
if (trxAnnoOld != null || trxAnnoNew != null) {
if (trxAnnoOld != null) {
methodsToProxy.add(method.getName());
result = true;
}

GlobalLock lockAnnoOld = method.getAnnotation(GlobalLock.class);
org.apache.seata.spring.annotation.GlobalLock lockAnnoNew = method.getAnnotation(org.apache.seata.spring.annotation.GlobalLock.class);

if (lockAnnoOld != null || lockAnnoNew != null) {
if (lockAnnoOld != null) {
methodsToProxy.add(method.getName());
result = true;
}
Expand All @@ -59,4 +60,10 @@ protected boolean existsAnnotation(Class<?>... classes) {
}
return result;
}

@Override
public ProxyInvocationHandler createProxyInvocationHandler(){
return new GlobalTransactionalInterceptorHandler(FailureHandlerHolder.getFailureHandler(), methodsToProxy);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package io.seata.integration.tx.api.json;



@Deprecated
public interface JsonParser extends org.apache.seata.integration.tx.api.json.JsonParser {
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,5 +21,6 @@
* extract remoting bean info
*
*/
@Deprecated
public interface RemotingParser extends org.apache.seata.integration.tx.api.remoting.RemotingParser {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.seata.tm.api;

import java.util.concurrent.TimeUnit;
import io.netty.util.HashedWheelTimer;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import io.seata.core.model.GlobalStatus;
import org.apache.seata.common.thread.NamedThreadFactory;
import org.apache.seata.core.exception.TransactionException;
import org.apache.seata.core.logger.StackTraceLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The type Default failure handler.
*
*/
public class DefaultFailureHandlerImpl implements FailureHandler {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultFailureHandlerImpl.class);

/**
* Retry 1 hours by default
*/
private static final int RETRY_MAX_TIMES = 6 * 60;

private static final long SCHEDULE_INTERVAL_SECONDS = 10;

private static final long TICK_DURATION = 1;

private static final int TICKS_PER_WHEEL = 8;

private static final HashedWheelTimer TIMER = new HashedWheelTimer(
new NamedThreadFactory("failedTransactionRetry", 1),
TICK_DURATION, TimeUnit.SECONDS, TICKS_PER_WHEEL);

@Override
public void onBeginFailure(GlobalTransaction tx, Throwable cause) {
LOGGER.warn("Failed to begin transaction. ", cause);
}

@Override
public void onCommitFailure(GlobalTransaction tx, Throwable cause) {
LOGGER.warn("Failed to commit transaction[" + tx.getXid() + "]", cause);
TIMER.newTimeout(new DefaultFailureHandlerImpl.CheckTimerTask(tx, GlobalStatus.Committed), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

@Override
public void onRollbackFailure(GlobalTransaction tx, Throwable originalException) {
LOGGER.warn("Failed to rollback transaction[" + tx.getXid() + "]", originalException);
TIMER.newTimeout(new DefaultFailureHandlerImpl.CheckTimerTask(tx, GlobalStatus.Rollbacked), SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}

@Override
public void onRollbacking(GlobalTransaction tx, Throwable originalException) {
StackTraceLogger.warn(LOGGER, originalException, "Retrying to rollback transaction[{}]", new String[] {tx.getXid()});
TIMER.newTimeout(new DefaultFailureHandlerImpl.CheckTimerTask(tx, GlobalStatus.RollbackRetrying), SCHEDULE_INTERVAL_SECONDS,
TimeUnit.SECONDS);
}

protected class CheckTimerTask implements TimerTask {

private final GlobalTransaction tx;

private final GlobalStatus required;

private int count = 0;

private boolean isStopped = false;

protected CheckTimerTask(final GlobalTransaction tx, GlobalStatus required) {
this.tx = tx;
this.required = required;
}

@Override
public void run(Timeout timeout) throws Exception {
if (!isStopped) {
if (++count > RETRY_MAX_TIMES) {
LOGGER.error("transaction [{}] retry fetch status times exceed the limit [{} times]", tx.getXid(), RETRY_MAX_TIMES);
return;
}
isStopped = shouldStop(tx, required);
TIMER.newTimeout(this, SCHEDULE_INTERVAL_SECONDS, TimeUnit.SECONDS);
}
}
}

private boolean shouldStop(final GlobalTransaction tx, GlobalStatus required) {
try {
GlobalStatus status = tx.getStatus();
LOGGER.info("transaction [{}] current status is [{}]", tx.getXid(), status);
if (status == required || status == GlobalStatus.Finished) {
return true;
}
} catch (TransactionException e) {
LOGGER.error("fetch GlobalTransaction status error", e);
}
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,4 +167,8 @@ public GlobalTransactionRole getGlobalTransactionRole() {
public long getCreateTime() {
return this.instance.getCreateTime();
}

public org.apache.seata.tm.api.DefaultGlobalTransaction getInstance() {
return instance;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,5 @@
*/
package io.seata.tm.api;

public interface FailureHandler extends org.apache.seata.tm.api.FailureHandler {
public interface FailureHandler extends org.apache.seata.tm.api.FailureHandler<GlobalTransaction> {
}
Loading

0 comments on commit 13a5ada

Please sign in to comment.