Spring Boot 声明式事务

1. 生效原理

1.1 编写测试Demo来测试事务

编写一个普通的 Service 来简单构造一个事务场景。

@Service
public class DemoService {

    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        System.out.println("test1 run...");
        int i = 1 / 0;
        System.out.println("test1 finish...");
    }

}

在启动类上标注 @EnableTransactionManagement 注解来启动注解事务。

@EnableTransactionManagement
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication.run(DemoApplication.class, args);
        DemoService demoService = ctx.getBean(DemoService.class);
        demoService.test1();
    }

}

运行主启动类,发现控制台没有打印 test1 finish... ,并输出异常信息。

test1 run...
Exception in thread "main" java.lang.ArithmeticException: / by zero
    at com.example.demo.service.DemoService.test1(DemoService.java:12)
    at com.example.demo.service.DemoService$$FastClassBySpringCGLIB$$203c87bf.invoke(<generated>)

并且从控制台的异常信息栈中发现了cglib的身影,因为编写的 Service 没有接口,使用cglib创建的代理对象。

接下来咱来开始分析注解声明式事务的生效原理。

1.2 @EnableTransactionManagement

@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
    boolean proxyTargetClass() default false;

    AdviceMode mode() default AdviceMode.PROXY;

    int order() default Ordered.LOWEST_PRECEDENCE;
}

注解内部的定义咱暂且不关心,只记住默认使用 PROXY - 代理方式来增强代码即可。

@EnableTransactionManagement 注解上面声明了 @Import ,它导了一个Selector:TransactionManagementConfigurationSelector

1.3 TransactionManagementConfigurationSelector

咱已经很清楚, ImportSelector 的作用是筛选组件,返回组件的全限定类名,让IOC容器来创建这些组件。

public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {

    @Override
    protected String[] selectImports(AdviceMode adviceMode) {
        switch (adviceMode) {
            case PROXY:
                return new String[] {AutoProxyRegistrar.class.getName(), ProxyTransactionManagementConfiguration.class.getName()};
            case ASPECTJ:
                return new String[] {TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME};
            default:
                return null;
        }
    }

}

@EnableTransactionManagement 注解默认使用 PROXY 来增强事务,那这个switch结构中就应该返回两个类的全限定类名:AutoProxyRegistrarProxyTransactionManagementConfiguration ,可以看得出来,声明式事务最终起作用是上述两个组件的功能。下面咱分别来看这两个类。

1.4 AutoProxyRegistrar

public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
        boolean candidateFound = false;
        Set<String> annoTypes = importingClassMetadata.getAnnotationTypes();
        for (String annoType : annoTypes) {
            AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annoType);
            if (candidate == null) {
                continue;
            }
            Object mode = candidate.get("mode");
            Object proxyTargetClass = candidate.get("proxyTargetClass");
            if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
                Boolean.class == proxyTargetClass.getClass()) {
                candidateFound = true;
                // PROXY模式下会额外注册Bean
                if (mode == AdviceMode.PROXY) {
                    AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
                    if ((Boolean) proxyTargetClass) {
                        AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
                        return;
                    }
                }
            }
        }
        if (!candidateFound) {
            String name = getClass().getSimpleName();
            // logger......
        }
    }

}

它又实现了 ImportBeanDefinitionRegistrar ,又是手动向IOC容器中导入组件。

注意中间部分的一个if判断:如果 @EnableTransactionManagement 注解中设置 adviceModePROXY (默认PROXY),则会利用 AopUtils 创建组件,并且如果 @EnableTransactionManagement 设置 proxyTargetClass 为true,则还会额外导入组件(默认为false)。下面咱看看它又向容器里注册了什么组件。

1.4.1 AopUtils.registerAutoProxyCreatorIfNecessary

public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry) {
    return registerAutoProxyCreatorIfNecessary(registry, null);
}

public static BeanDefinition registerAutoProxyCreatorIfNecessary(BeanDefinitionRegistry registry,
                                                                 @Nullable Object source) {
    return registerOrEscalateApcAsRequired(InfrastructureAdvisorAutoProxyCreator.class, registry, source);
}

@Nullable
    private static BeanDefinition registerOrEscalateApcAsRequired(Class<?> cls, BeanDefinitionRegistry registry,
                                                                  @Nullable Object source) {

    Assert.notNull(registry, "BeanDefinitionRegistry must not be null");

    if (registry.containsBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME)) {
        BeanDefinition apcDefinition = registry.getBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME);
        if (!cls.getName().equals(apcDefinition.getBeanClassName())) {
            int currentPriority = findPriorityForClass(apcDefinition.getBeanClassName());
            int requiredPriority = findPriorityForClass(cls);
            if (currentPriority < requiredPriority) {
                apcDefinition.setBeanClassName(cls.getName());
            }
        }
        return null;
    }

    RootBeanDefinition beanDefinition = new RootBeanDefinition(cls);
    beanDefinition.setSource(source);
    beanDefinition.getPropertyValues().add("order", Ordered.HIGHEST_PRECEDENCE);
    beanDefinition.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
    registry.registerBeanDefinition(AUTO_PROXY_CREATOR_BEAN_NAME, beanDefinition);
    return beanDefinition;
}

从上面的方法一级一级向下执行,最终来到 registerOrEscalateApcAsRequired 方法(注意在第二层方法中传入了一个 InfrastructureAdvisorAutoProxyCreator.class (基础增强器自动代理创建器),可能下面就是注册这个类型的组件)。

(仔细观察一下,这种类名的命名风格分明就是AOP组件的命名诶。。。先保留这个疑问,继续往下看)

先看看它传的这个 InfrastructureAdvisorAutoProxyCreator 类是什么东西吧:

1.4.1.1 InfrastructureAdvisorAutoProxyCreator

文档注释原文翻译:

Auto-proxy creator that considers infrastructure Advisor beans only, ignoring any application-defined Advisors.

自动代理创建器,仅考虑基础结构Advisor类型的Bean,而忽略任何应用程序定义的Advisor。

注释解释的不是很清楚,咱再看看这个类的继承结构:

public class InfrastructureAdvisorAutoProxyCreator extends AbstractAdvisorAutoProxyCreator
public abstract class AbstractAdvisorAutoProxyCreator extends AbstractAutoProxyCreator
public abstract class AbstractAutoProxyCreator extends ProxyProcessorSupport
            implements SmartInstantiationAwareBeanPostProcessor, BeanFactoryAware

可以发现它也是个后置处理器,并且是在Bean创建前后执行的后置处理器(InstantiationAwareBeanPostProcessor),而且它来自 spring-aop 包。那既然是这样,它与之前AOP部分咱看到的思路就完全一致了(该类/父类中一定会有寻找增强器、过滤增强器,最终生成代理包装Bean为代理对象的方法)。

回到上面的 AopUtils 类,registerAutoProxyCreatorIfNecessary 方法注册了一个 InfrastructureAdvisorAutoProxyCreator ,跟之前咱在AOP部分看到的 @EnableAspectJAutoProxy 注解注册的 AnnotationAwareAspectJAutoProxyCreator 几乎完全一致了,那下面的方法也不用看了,思路真的完全一致。

1.5 ProxyTransactionManagementConfiguration

注意这个配置类还继承了父类,父类的配置也会被加载。

@Configuration
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {

	@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor() {
		BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
		advisor.setTransactionAttributeSource(transactionAttributeSource());
		advisor.setAdvice(transactionInterceptor());
		if (this.enableTx != null) {
			advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
		}
		return advisor;
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public TransactionAttributeSource transactionAttributeSource() {
		return new AnnotationTransactionAttributeSource();
	}

	@Bean
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public TransactionInterceptor transactionInterceptor() {
		TransactionInterceptor interceptor = new TransactionInterceptor();
		interceptor.setTransactionAttributeSource(transactionAttributeSource());
		if (this.txManager != null) {
			interceptor.setTransactionManager(this.txManager);
		}
		return interceptor;
	}

}

很明显它向IOC容器中注册了3个Bean。一个一个来看:

1.5.1 transactionAdvisor:事务增强器

BeanFactoryTransactionAttributeSourceAdvisor 的文档注释原文翻译:

Advisor driven by a TransactionAttributeSource, used to include a transaction advice bean for methods that are transactional.

TransactionAttributeSource 驱动的增强器,用于为开启事务的Bean的方法附加事务通知。

文档注释大概是描述是它给业务方法增强事务通知,咱先放一边。注意看这个类名的最后:Advisor ,它是一个增强器!

看一眼这个类的继承和一些成员:

public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {

    @Nullable
    private TransactionAttributeSource transactionAttributeSource;

    private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut() {
        @Override
        @Nullable
        protected TransactionAttributeSource getTransactionAttributeSource() {
            return transactionAttributeSource;
        }
    };

从这部分源码中可以得知非常关键的点:它是利用切入点来增强方法 (源码中看到了pointcut)。源码中的pointcut属性的创建又要借助 TransactionAttributeSource 。这部分依赖关系如下:

Advisor → → →依赖→ → → Pointcut → → →依赖→ → → Source

1.5.1.1 TransactionAttributeSourcePointcut

通过前面AOP部分的阅读,咱也知道,所有的切入点类都会实现 Pointcut 接口,TransactionAttributeSourcePointcut 的类继承和部分源码:

abstract class TransactionAttributeSourcePointcut extends StaticMethodMatcherPointcut implements Serializable {

    @Override
    public boolean matches(Method method, Class<?> targetClass) {
        if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
            PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
            PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
            return false;
        }
        TransactionAttributeSource tas = getTransactionAttributeSource();
        return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
    }

它实现了 ClassFilter 接口(matches 是重写的方法,源码不再展开), matches 方法有两部分判断逻辑:是否为 TransactionalProxyPlatformTransactionManagerPersistenceExceptionTranslator 的实现类,以及让 TransactionAttributeSource 获取事务属性看是否为空。前半部分好理解,后半部分需要借助 TransactionAttributeSource 来判断,正好配置类中事务增强器的下边就要创建一个 AnnotationTransactionAttributeSource ,那咱就继续往下看。

1.5.2 AnnotationTransactionAttributeSource:注解事务配置源

@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
    return new AnnotationTransactionAttributeSource();
}

创建出来的只是一个普通的 AnnotationTransactionAttributeSource 而已,它的文档注释原文翻译:

Implementation of the org.springframework.transaction.interceptor.TransactionAttributeSource interface for working with transaction metadata in JDK 1.5+ annotation format. This class reads Spring's JDK 1.5+ Transactional annotation and exposes corresponding transaction attributes to Spring's transaction infrastructure. Also supports JTA 1.2's javax.transaction.Transactional and EJB3's javax.ejb.TransactionAttribute annotation (if present). This class may also serve as base class for a custom TransactionAttributeSource, or get customized through TransactionAnnotationParser strategies.

org.springframework.transaction.interceptor.TransactionAttributeSource接口的实现,用于处理JDK 1.5+注释格式的事务元数据。

此类读取Spring的 @Transactional 注解,并将相应的事务属性公开给Spring的事务基础结构。此外,还支持JTA 1.2的 javax.transaction.Transactional 和EJB3的 javax.ejb.TransactionAttribute 注解(如果存在)。此类也可用作自定义 TransactionAttributeSource 的基类,或通过 TransactionAnnotationParser 策略进行自定义。

说了这么多,我们只关心一句话:它读取 @Transactional 注解。由此可见 AnnotationTransactionAttributeSource 是读取 @Transactional 注解的。

上面的Bean在创建时直接调了构造方法,这个构造方法咱还是要看一下的:

public AnnotationTransactionAttributeSource() {
    this(true);
}

public AnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
    this.publicMethodsOnly = publicMethodsOnly;
this.annotationParsers = new LinkedHashSet<>(2);
this.annotationParsers.add(new SpringTransactionAnnotationParser());
if (jta12Present) {
    this.annotationParsers.add(new JtaTransactionAnnotationParser());
}
if (ejb3Present) {
    this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
}
}

注意下面重载的构造方法中,它给 annotationParsers 中添加了一个 SpringTransactionAnnotationParser

1.5.2.1 SpringTransactionAnnotationParser

public class SpringTransactionAnnotationParser implements TransactionAnnotationParser, Serializable {

    @Override
    @Nullable
    public TransactionAttribute parseTransactionAnnotation(AnnotatedElement ae) {
        // 搜索被标注的元素(类、方法)上是否最终标注了@Transactional注解
        AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(
            ae, Transactional.class, false, false);
        if (attributes != null) {
            return parseTransactionAnnotation(attributes);
        }
        else {
            return null;
        }
    }

    public TransactionAttribute parseTransactionAnnotation(Transactional ann) {
        return parseTransactionAnnotation(AnnotationUtils.getAnnotationAttributes(ann, false, false));
    }

    protected TransactionAttribute parseTransactionAnnotation(AnnotationAttributes attributes) {
        RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
        // 解析事务传播行为
        Propagation propagation = attributes.getEnum("propagation");
        rbta.setPropagationBehavior(propagation.value());
        // 解析事务隔离级别
        Isolation isolation = attributes.getEnum("isolation");
        rbta.setIsolationLevel(isolation.value());
        // 解析超时
        rbta.setTimeout(attributes.getNumber("timeout").intValue());
        // 解析只读事务
        rbta.setReadOnly(attributes.getBoolean("readOnly"));
        rbta.setQualifier(attributes.getString("value"));
        ArrayList<RollbackRuleAttribute> rollBackRules = new ArrayList<>();
        // 解析回滚异常
        Class<?>[] rbf = attributes.getClassArray("rollbackFor");
        for (Class<?> rbRule : rbf) {
            RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
            rollBackRules.add(rule);
        }
        String[] rbfc = attributes.getStringArray("rollbackForClassName");
        for (String rbRule : rbfc) {
            RollbackRuleAttribute rule = new RollbackRuleAttribute(rbRule);
            rollBackRules.add(rule);
        }
        // 解析不回滚异常的“白名单”
        Class<?>[] nrbf = attributes.getClassArray("noRollbackFor");
        for (Class<?> rbRule : nrbf) {
            NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
            rollBackRules.add(rule);
        }
        String[] nrbfc = attributes.getStringArray("noRollbackForClassName");
        for (String rbRule : nrbfc) {
            NoRollbackRuleAttribute rule = new NoRollbackRuleAttribute(rbRule);
            rollBackRules.add(rule);
        }
        rbta.getRollbackRules().addAll(rollBackRules);
        return rbta;
    }
}

这里面的核心方法如上述源码,可以发现它的核心功能是解析 **@Transactional** 注解的信息

至此上面的配置类中需要的事务增强器、事务切入点、事务配置源、事务注解解析器都解析完,回到配置类中,还有一个拦截器:

1.5.3 TransactionInterceptor

@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor() {
    TransactionInterceptor interceptor = new TransactionInterceptor();
    interceptor.setTransactionAttributeSource(transactionAttributeSource());
    if (this.txManager != null) {
        interceptor.setTransactionManager(this.txManager);
    }
    return interceptor;
}

在Bean的创建过程中,它也把事务配置源保存起来了,并且还注入了事务管理器。而 TransactionInterceptor 本身的类定义:

public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable

发现它实现了 MethodInterceptor !它也是一个AOP的增强器。那它的核心作用大概率就是控制事务咯?咱先不着急,它的工作原理咱到下一篇再看,本篇先把需要配置的组件都解析完。


ProxyTransactionManagementConfiguration 的配置读完之后,别忘了它还继承了一个父类,下面咱看看这个父类里都干了什么:

1.6 AbstractTransactionManagementConfiguration的配置

@Configuration
public abstract class AbstractTransactionManagementConfiguration implements ImportAware {

    @Nullable
    protected AnnotationAttributes enableTx;

    @Nullable
    protected PlatformTransactionManager txManager;


    @Override
    public void setImportMetadata(AnnotationMetadata importMetadata) {
        this.enableTx = AnnotationAttributes.fromMap(
                importMetadata.getAnnotationAttributes(EnableTransactionManagement.class.getName(), false));
        if (this.enableTx == null) {
            throw new IllegalArgumentException(
                    "@EnableTransactionManagement is not present on importing class " + importMetadata.getClassName());
        }
    }

    @Autowired(required = false)
    void setConfigurers(Collection<TransactionManagementConfigurer> configurers) {
        if (CollectionUtils.isEmpty(configurers)) {
            return;
        }
        if (configurers.size() > 1) {
            throw new IllegalStateException("Only one TransactionManagementConfigurer may exist");
        }
        TransactionManagementConfigurer configurer = configurers.iterator().next();
        this.txManager = configurer.annotationDrivenTransactionManager();
    }


    @Bean(name = TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public static TransactionalEventListenerFactory transactionalEventListenerFactory() {
        return new TransactionalEventListenerFactory();
    }

}

可以发现在最底下它又创建了一个组件,类型是 TransactionalEventListenerFactory

1.6.1 TransactionalEventListenerFactory

它的文档注释原文翻译:

EventListenerFactory implementation that handles TransactionalEventListener annotated methods.

EventListenerFactory的实现类,用于处理带有 @TransactionalEventListener 注解的方法。

发现它又提到了一个注解:@TransactionalEventListener ,实际上 TransactionalEventListenerFactory 这个组件是做事务监听机制的。

【如果小伙伴还不是很了解 @TransactionalEventListener ,请继续往下看,熟悉的小伙伴请跳过6.1节】

1.6.1.1 【扩展】@TransactionalEventListener

自 SpringFramework4.2 之后,出现了一种能在事务动作发生前后注入监听器的机制。

举几个应用场景的例子:

  • 执行完数据库操作后发送消息

  • 执行数据库操作之前记录日志

  • 业务逻辑出错时事务回滚之后发邮件警报

类似于这种事务动作执行前后进行附加操作的问题,在SpringFramework4.2之后就可以通过 @TransactionalEventListener 注解来实现。

@TransactionalEventListener 可提供4种监听时机,来执行附加操作:

  • BEFORE_COMMIT:提交之前

  • AFTER_COMMIT:提交之后

  • AFTER_ROLLBACK:回滚之后

  • AFTER_COMPLETION:事务完成之后

1.6.1.2 @TransactionalEventListener的使用方式简单Demo

@Service
public class DemoService {

    @Autowired
    private USerDao userDao;

    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Transactional(rollbackFor = Exception.class)
    public void test() {
        // 执行清空用户的数据库操作
        userDao.deleteAll();

        // 使用事件广播器来广播 用户清除事件
        applicationEventPublisher.publishEvent(new UserCleanEvent());
    }
}

@Component
class MyTransactionListener {

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    private void onTestEvent(UserCleanEvent event) {
        System.out.println("UserCleanEvent detected ......");
    }

}

// 定义 用户清除事件,它需要继承ApplicationEvent 
class UserCleanEvent extends ApplicationEvent {

}

1.7 小结

  1. Spring的注解事务底层是借助AOP的机制,创建了一个 InfrastructureAdvisorAutoProxyCreator 组件来创建代理对象。

  2. 注解事务要想生效,需要事务增强器、事务切入点解析器、事务配置源、事务拦截器等组件。

2. 工作原理

2.0 测试Demo

@Service
public class DemoService {

    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        System.out.println("test1 run...");
        int i = 1 / 0;
        System.out.println("test1 finish...");
    }

}

@Transactional 标注的注解所在类,在IOC容器初始化时被动态代理为一个代理对象。由于上面定义的Service是类,其代理方式为cglib代理。故加事务的方法在执行时首先被调用的方法是:intercept

下面将以Debug步骤来逐步观察和分析事务控制流程。

2.1 intercept

public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable {
    // ......
    try {
        // ......
        List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);
        Object retVal;
        // ......
        retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
        // ......
    }

整个方法在之前的AOP部分分析过了,关键的步骤是上述源码中的两步:获取拦截器调用链,执行代理+目标方法的调用。

第一步获取拦截器链的内容过程咱就不看了,前面AOP部分已经分析过了,咱看下返回的拦截器都有什么:

只有一个拦截器,而且恰好是上一篇分析的 TransactionInterceptor

下面进入proceed方法。

2.2 proceed

public Object proceed() throws Throwable {
    //  We start with an index of -1 and increment early.
    if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
        return invokeJoinpoint();
    }

    Object interceptorOrInterceptionAdvice =
            this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
    if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
        // ......
    }
    else {
        // It's an interceptor, so we just invoke it: The pointcut will have
        // been evaluated statically before this object was constructed.
        return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
    }
}

第一次进入 proceed 方法,由于此时 -1 ≠ (1 - 1) ,第一个if结构不进入。

自然进入下面的拦截器执行部分, TransactionInterceptor 不属于 InterceptorAndDynamicMethodMatcher ,自然走下面的else结构,执行 invoke 方法。

2.3 invoke

来到 TransactionInterceptor

public Object invoke(MethodInvocation invocation) throws Throwable {
    // Work out the target class: may be null.
    // The TransactionAttributeSource should be passed the target class
    // as well as the method, which may be from an interface.
    Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);

    // Adapt to TransactionAspectSupport's invokeWithinTransaction...
    return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed);
}

首先做一次目标方法执行的空校验,AopUtils.getTargetClass() 是为了获取被代理的目标类,之后执行 invokeWithinTransaction 方法,套用事务。

2.4 invokeWithinTransaction

该方法在父类 TransactionAspectSupport 中定义。

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 获取@Transactional的属性配置
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    // 获取事务管理器(IOC容器中获取)
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // 创建事务
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            // 回滚事务
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);
        }
        // 提交事务
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }

    else {
        // 与上面相似,省略。。。
    }
}

由源码可以很明显看出来,它使用的是环绕通知

下面的 try-catch-finally 中,可以发现方法正常执行后,没有问题,会在finally块下面执行 commitTransactionAfterReturning 方法来提交事务,出现异常时会进入catch块,执行 completeTransactionAfterThrowing 方法来回滚事务。

下面根据不同情况来分别Debug看效果。

2.4.1 成功提交事务

将DemoService的test方法中去掉除零运算,Debug运行之后一切正常,try块没有抛出异常,进入下面的 commitTransactionAfterReturning 方法:

try {
    retVal = invocation.proceedWithInvocation();
}
// ......
// 提交事务
commitTransactionAfterReturning(txInfo);
return retVal;
}

准备进入 commitTransactionAfterReturning 方法时的Debug状态:

下面进入 commitTransactionAfterReturning 方法:

2.4.1.1 commitTransactionAfterReturning

protected void commitTransactionAfterReturning(@Nullable TransactionInfo txInfo) {
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
        if (logger.isTraceEnabled()) {
            logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
    }
}

核心方法很简单:拿到事务管理器,执行 commit

2.4.1.2 commit

public final void commit(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
            "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    // 校验事务状态,如果在事务链中已经有操作将当前事务标记为 “需要回滚” ,则直接回滚事务
    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    if (defStatus.isLocalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Transactional code has requested rollback");
        }
        processRollback(defStatus, false);
        return;
    }

    if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) {
        if (defStatus.isDebug()) {
            logger.debug("Global transaction is marked as rollback-only but transactional code requested commit");
        }
        processRollback(defStatus, true);
        return;
    }

    // 没有异常标记,提交事务
    processCommit(defStatus);
}

2.4.1.3 processCommit

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;

        try {
            boolean unexpectedRollback = false;
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;

            // 是否有保存点
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                // 保存点不会真正提交事务
                status.releaseHeldSavepoint();
            }
                // 全新的事务会执行commit操作(与事务传播行为有关)
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }

            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                    "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        // catch ......

    }
    finally {
        // 清除缓存资源
        cleanupAfterCompletion(status);
    }
}

由于当前测试的Demo仅仅是单方法事务,所以它是一个全新的事务,进入else-if块,执行 **doCommit** 方法。(又看到xxx和doXXX了)

2.4.1.4 doCommit

protected void doCommit(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Committing JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.commit();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not commit JDBC transaction", ex);
    }
}

至此发现了我们熟悉的面孔,这也是jdbc最底层的API:拿 Connection 对象,调用 **commit** 方法,事务成功提交。

2.4.2 失败回滚事务

将DemoService的test方法中加入除零运算,Debug运行之后发现出现异常,进入 invokeWithinTransaction 方法中的 catch 块:

try {
    retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
    completeTransactionAfterThrowing(txInfo, ex);
    throw ex;
}

准备进入 completeTransactionAfterThrowing 方法时的Debug状态:

下面进入 completeTransactionAfterThrowing 方法:

2.4.2.1 completeTransactionAfterThrowing

protected void completeTransactionAfterThrowing(@Nullable TransactionInfo txInfo, Throwable ex) {
    // 回滚必须要保证当前正在一个事务中
    if (txInfo != null && txInfo.getTransactionStatus() != null) {
    if (logger.isTraceEnabled()) {
        logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() +
                     "] after exception: " + ex);
    }
    // 4.2.1.1 回滚的事务必须为RuntimeException或Error类型
    if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
        try {
            // 回滚事务
            txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
        }
        // catch ......
    }
    else {
        // We don't roll back on this exception.
        // Will still roll back if TransactionStatus.isRollbackOnly() is true.
        // 不满足回滚条件,即便抛出异常也会提交事务
        try {
            txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
        }
        // catch ......
    }
}
}

在回滚之前,它要判定抛出的异常类型。

2.4.2.1.1 rollbackOn

public boolean rollbackOn(Throwable ex) {
    return (ex instanceof RuntimeException || ex instanceof Error);
}

可以发现它是校验异常的类型是否为 RuntimeExceptionError

判断完成后,回到 completeTransactionAfterThrowing 方法,它要拿事务管理器来调 rollback 方法。

2.4.2.2 rollback

public final void rollback(TransactionStatus status) throws TransactionException {
    if (status.isCompleted()) {
        throw new IllegalTransactionStateException(
                "Transaction is already completed - do not call commit or rollback more than once per transaction");
    }

    DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
    processRollback(defStatus, false);
}

方法中先校验一下事务是否已经完成,之后会执行 processRollback 方法。

2.4.2.3 processRollback

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;

        try {
            triggerBeforeCompletion(status);

            // 如果事务状态中发现了保存点,证明当前事务是NESTD类型(子事务),会回滚到保存点
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            }
                // 全新的事务才会完全回滚
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                doRollback(status);
            }
            // ......
        }
        finally {
            // 清除事务缓存
            cleanupAfterCompletion(status);
        }
    }

测试Demo依然是单事务,进入 doRollback

2.4.2.4 doRollback

protected void doRollback(DefaultTransactionStatus status) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
    Connection con = txObject.getConnectionHolder().getConnection();
    if (status.isDebug()) {
        logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
    }
    try {
        con.rollback();
    }
    catch (SQLException ex) {
        throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
    }
}

又发现了我们熟悉的jdbc操作:拿 Connection 对象,调用 **rollback** 方法,事务成功回滚。

2.5 小结

  1. 声明式事务由动态代理进入,核心方法是 invokeWithinTransaction

  2. 事务管理器的提交和回滚最终是调用jdbc的底层API来进行提交和回滚。

3. 事务传播行为原理

前面的两篇咱们看了声明式事务的生效原理和工作原理,咱们也知道Spring有7种事务传播行为,这个在开发中也是可能会遇到的。本篇和下一篇会解析声明式事务的事务传播行为原理。

事务传播行为的7种类型:

事务传播行为
描述

PROPAGATION_REQUIRED

【默认值:必需】当前方法必须在事务中运行,如果当前线程中没有事务,则开启一个新的事务;如果当前线程中已经存在事务,则方法将会在该事务中运行。

PROPAGATION_SUPPORTS

【支持】当前方法单独运行时不需要事务,但如果当前线程中存在事务时,方法会在事务中运行

PROPAGATION_MANDATORY

【强制】当前方法必须在事务中运行,如果当前线程中不存在事务,则抛出异常

PROPAGATION_REQUIRES_NEW

【新事务】当前方法必须在独立的事务中运行,如果当前线程中已经存在事务,则将该事务挂起,重新开启一个事务,直到方法运行结束再释放之前的事务

PROPAGATION_NOT_SUPPORTED

【不支持】当前方法不会在事务中运行,如果当前线程中存在事务,则将事务挂起,直到方法运行结束

PROPAGATION_NEVER

【不允许】当前方法不允许在事务中运行,如果当前线程中存在事务,则抛出异常

PROPAGATION_NESTED

【嵌套】当前方法必须在事务中运行,如果当前线程中存在事务,则将该事务标注保存点,形成嵌套事务。嵌套事务中的子事务出现异常不会影响到父事务保存点之前的操作。

3.0 修改测试Demo

修改测试代码如下:

@Service
public class DemoService {
    
    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        System.out.println("test1 run...");
        int i = 1 / 0;
        System.out.println("test1 finish...");
    }
    
}

@Service
public class DemoService2 {
    
    @Autowired
    private DemoService demoService;
    
    @Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
    public void test2() {
        System.out.println("test2 run...");
        demoService.test1();
        System.out.println("test2 finish...");
    }
    
}

@EnableTransactionManagement
@SpringBootApplication
public class DemoApplication {
    
    public static void main(String[] args) {
        ConfigurableApplicationContext ctx = SpringApplication.run(DemoApplication.class, args);
        DemoService2 demoService2 = ctx.getBean(DemoService2.class);
        demoService2.test2();
    }
    
}

默认情况下,SpringFramework 中 @Transactional 的事务传播行为是 Propagation.REQUIRED

Support a current transaction, create a new one if none exists.

支持当前事务,如果不存在则创建新事务。

在上面的测试代码中,应不会打印 "test2 finish..."

这其中的工作机制要回到 invokeWithinTransaction 方法中的 createTransactionIfNecessary 方法中,这部分会真正的开启事务。

下面咱还是以Debug的方式来分步调试,观察事务的开启时机:

3.1 【REQUIRED】第一次Debug

3.1.1 invokeWithinTransaction

咱们只关注关键部分:

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // 1.2 如果当前执行方法需要事务,则开启事务
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

        Object retVal;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);
        }
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    // ......
}

createTransactionIfNecessary 方法会根据切入点判断是否需要开启事务,而切入点就是要执行的 test2 方法。

3.1.2 createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                                                       @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

    // If no name specified, apply method identification as transaction name.
    // 如果未指定名称,则将方法名当做事务名称
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }

    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // 1.3 获取事务状态。
            status = tm.getTransaction(txAttr);
        }
        else {
            if (logger.isDebugEnabled()) {
                logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
                             "] because no transaction manager has been configured");
            }
        }
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

上面先指定了当前事务的名称,下面会获取事务状态,而这个事务状态要从 DataSourceTransactionManager 中获取。

3.1.3 getTransaction

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    // 获取/创建事务对象
    Object transaction = doGetTransaction();

// log ......

if (definition == null) {
    // Use defaults if no transaction definition given.
    definition = new DefaultTransactionDefinition();
}

// ......
}

第一个 definition 的判断是否为空,Debug发现它不为null,经过方法调用栈的追溯,发现它来自 invokeWithinTransaction 方法:

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {

    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    // 事务定义信息已经在这里获取到了
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

好吧,我们之前没有抓到这个点,那我们来重新Debug。

3.2 【REQUIRED】第二次Debug

把断点打在 tas.getTransactionAttribute 上,重新Debug,并进入到这个方法,发现来到了 AbstractFallbackTransactionAttributeSource 中。

private final Map<Object, TransactionAttribute> attributeCache = new ConcurrentHashMap<>(1024);

public TransactionAttribute getTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
    if (method.getDeclaringClass() == Object.class) {
        return null;
    }

    // First, see if we have a cached value.
    Object cacheKey = getCacheKey(method, targetClass);
    // 这里取出来的 cached 不是null
    TransactionAttribute cached = this.attributeCache.get(cacheKey);
    if (cached != null) {
        if (cached == NULL_TRANSACTION_ATTRIBUTE) {
            return null;
        }
        else {
            return cached;
        }
    }
    else {
        // We need to work it out.
        TransactionAttribute txAttr = computeTransactionAttribute(method, targetClass);
        // Put it in the cache.
        if (txAttr == null) {
            this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
        }
        else {
            String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
            if (txAttr instanceof DefaultTransactionAttribute) {
                ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
            }
            if (logger.isTraceEnabled()) {
                logger.trace("Adding transactional method '" + methodIdentification + "' with attribute: " + txAttr);
            }
            this.attributeCache.put(cacheKey, txAttr);
        }
        return txAttr;
    }
}

在这个类中,attributeCache 是一个 Map 。通过Debug,走到 this.attributeCache.get(cacheKey) 这一句时发现返回值不为null,直接返回走了!在咱看来这个方法是第一次执行,而且是我在主启动类里手动调用的,为什么 attributeCache 里会有缓存呢?

注意观察上面方法中的else部分,有对 attributeCacheput 操作,由此大概可以断定是之前有执行过这个方法,当时 attributeCache 中还没有,才进入到else中,对 attributeCache 执行 put 操作。

我们把断点打在else中的第一行,再次Debug。

3.3 【REQUIRED】第三次Debug

getTransactionAttribute 的else结构中追踪,发现在IOC容器启动时就已经执行进来了。而执行该方法的调用栈中发现了一个方法:

**org.springframework.aop.framework.autoproxy.AbstractAutoProxyCreator#postProcessAfterInitialization**

而且往上看还看到了 wrapIfNecessary ,证明这是在AOP部分就已经触发了事务信息的加载

仔细看这部分的方法调用,咱会发现这部分其实它想找一些可以应用在当前创建Bean的增强器。

往上倒一级,看 TransactionAttributeSourcePointcutmatches 方法:

public boolean matches(Method method, Class<?> targetClass) {
    if (TransactionalProxy.class.isAssignableFrom(targetClass) ||
            PlatformTransactionManager.class.isAssignableFrom(targetClass) ||
            PersistenceExceptionTranslator.class.isAssignableFrom(targetClass)) {
        return false;
    }
    TransactionAttributeSource tas = getTransactionAttributeSource();
    return (tas == null || tas.getTransactionAttribute(method, targetClass) != null);
}

发现它在这里来触发加载事务定义信息的。(其实在第21篇的5.1.1节已经介绍过它了,不再赘述)

那咱大概就知道了它的触发时机了:因为在之前开启注解事务时,触发自动配置,而自动配置中注入了一个 **InfrastructureAdvisorAutoProxyCreator** ,它配合 **BeanFactoryTransactionAttributeSourceAdvisor** (事务增强器)来完成事务织入,在第一次事务织入时要获取所有切入点,之后它会搜索所有切入点,判断创建的Bean是否可以被织入事务通知 ,在搜索时刚好来到这里要解析事务定义信息,所以会触发解析和缓存动作。

3.3.1 继续往下走,回到getTransaction

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    Object transaction = doGetTransaction();

// log ......

if (definition == null) {
    // Use defaults if no transaction definition given.
    definition = new DefaultTransactionDefinition();
}

// 3.2 判断当前线程中是否存在事务
if (isExistingTransaction(transaction)) {
    // Existing transaction found -> check propagation behavior to find out how to behave.
    return handleExistingTransaction(definition, transaction, debugEnabled);
}

// ......
}

确定事务定义信息不为空后,下一步要调用 isExistingTransaction ,判断当前线程中是否存在事务。

3.3.2 isExistingTransaction

来到 DataSourceTransactionManager

protected boolean isExistingTransaction(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
return (txObject.hasConnectionHolder() && txObject.getConnectionHolder().isTransactionActive());
}

从return的结构中看出,如果 ConnectionHolder 存在且激活,就表明当前线程已经存在事务。

通过Debug,发现 ConnectionHolder 为null,这个方法返回false,不进入上面的片段。

3.3.2.1 ConnectionHolder

文档注释原文翻译:

Resource holder wrapping a JDBC Connection. DataSourceTransactionManager binds instances of this class to the thread, for a specific javax.sql.DataSource. Inherits rollback-only support for nested JDBC transactions and reference count functionality from the base class. Note: This is an SPI class, not intended to be used by applications.

包装JDBC连接的资源持有者。对于特定的 javax.sql.DataSourceDataSourceTransactionManager 将此类的实例绑定到线程。

从父类继承对嵌套JDBC事务和引用计数功能的仅回滚支持。

注意:这是SPI类,不适合应用程序使用。

文档注释很容易理解,它是持有jdbc的 Connection 对象的,DataSourceDataSourceTransactionManager 可以借助它实现线程绑定。

3.3.3 判断超时和事务传播行为类型

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    // ......
    // Check definition settings for new transaction.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
    throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
}

// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
    throw new IllegalTransactionStateException(
        "No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
         definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
         definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
    // ......
}

这部分先判断超时时间的设置是否合理(默认的 TransactionDefinition.TIMEOUT_DEFAULT = -1),之后下面要筛选事务传播行为类型:

  • 如果是MANDATORY类型,则直接抛出异常,因为此时还没有事务

  • 如果是REQUIREDREQUIRES_NEWNESTED类型,则创建一个新的事务

  • 其余情况,返回空事务

下面咱先以REQUIRED行为来继续Debug,看它的处理方式。

3.3.4 【REQUIRED】进入else if结构

SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
    logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
    boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
    DefaultTransactionStatus status = newTransactionStatus(
        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
    doBegin(transaction, definition);
    prepareSynchronization(status, definition);
    return status;
}
catch (RuntimeException | Error ex) {
    resume(null, suspendedResources);
    throw ex;
}

这里面执行了几个关键的步骤:

  • 挂起null(相当于无操作)

  • 创建一个新的事务状态,并标记为新事务【关键】

  • 开启事务连接【关键】

  • 准备事务同步工作

分步骤来看:

3.3.4.1 suspend(null)

protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
    if (TransactionSynchronizationManager.isSynchronizationActive()) {
        List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
        try {
            Object suspendedResources = null;
            if (transaction != null) {
                suspendedResources = doSuspend(transaction);
            }
            String name = TransactionSynchronizationManager.getCurrentTransactionName();
            TransactionSynchronizationManager.setCurrentTransactionName(null);
            boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
            Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
            boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
            TransactionSynchronizationManager.setActualTransactionActive(false);
            return new SuspendedResourcesHolder(
                    suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
        }
        catch (RuntimeException | Error ex) {
            // doSuspend failed - original transaction is still active...
            doResumeSynchronization(suspendedSynchronizations);
            throw ex;
        }
    }
    else if (transaction != null) {
        // Transaction active but no synchronization active.
        Object suspendedResources = doSuspend(transaction);
        return new SuspendedResourcesHolder(suspendedResources);
    }
    else {
        // Neither transaction nor synchronization active.
        return null;
    }
}

这个方法既然是挂起和恢复的,从这段实现中只有一个点是我们应该关注的:doSuspend

3.3.4.1.1 doSuspend

protected Object doSuspend(Object transaction) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}

这个方法的逻辑比较简单,它会获取上一次事务的数据源连接对象,并将其从当前 ThreadLocal 中移除。这里面获取数据源的部分很简单:

protected DataSource obtainDataSource() {
    DataSource dataSource = getDataSource();
    Assert.state(dataSource != null, "No DataSource set");
    return dataSource;
}

关键的部分在 TransactionSynchronizationManager.unbindResource 中:

3.3.4.1.2 TransactionSynchronizationManager.unbindResource

public static Object unbindResource(Object key) throws IllegalStateException {
    Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
    Object value = doUnbindResource(actualKey);
    if (value == null) {
        throw new IllegalStateException(
            "No value for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]");
    }
    return value;
}

又看到了doXXX,进到 doUnbindResource 中:

3.3.4.1.3 doUnbindResource

private static final ThreadLocal<Map<Object, Object>> resources = new NamedThreadLocal<>("Transactional resources");

private static Object doUnbindResource(Object actualKey) {
    Map<Object, Object> map = resources.get();
if (map == null) {
    return null;
}
Object value = map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
    resources.remove();
}
// Transparently suppress a ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
    value = null;
}
if (value != null && logger.isTraceEnabled()) {
    logger.trace("Removed value [" + value + "] for key [" + actualKey + "] from thread [" +
                 Thread.currentThread().getName() + "]");
}
return value;
}

可以看到它对 ThreadLocal 中的事务对象进行移除操作,完成事务解除绑定。

至此,suspend(null) 执行完毕。

3.3.4.2 newTransactionStatus

注意源码中传入构造方法中的参数 newTransaction :true (第三个参数),标明马上要开启一个新事务

DefaultTransactionStatus status = newTransactionStatus(
    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);

protected DefaultTransactionStatus newTransactionStatus(
    TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
    boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

    boolean actualNewSynchronization = newSynchronization &&
        !TransactionSynchronizationManager.isSynchronizationActive();
    return new DefaultTransactionStatus(
        transaction, newTransaction, actualNewSynchronization,
        definition.isReadOnly(), debug, suspendedResources);
}

3.3.4.3 doBegin

protected void doBegin(Object transaction, TransactionDefinition definition) {
    DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
    Connection con = null;

    try {
        // 如果当前线程中没有ConnectionHolder,则会获取新的数据库连接,并放入ConnectionHolder(线程绑定)
        if (!txObject.hasConnectionHolder() ||
            txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
            // 从DataSource中取connection
            Connection newCon = obtainDataSource().getConnection();
            if (logger.isDebugEnabled()) {
                logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
            }
            txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
        }

        txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
        con = txObject.getConnectionHolder().getConnection();

        Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
        txObject.setPreviousIsolationLevel(previousIsolationLevel);

        // Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
        // so we don't want to do it unnecessarily (for example if we've explicitly
        // configured the connection pool to set it already).
        if (con.getAutoCommit()) {
            txObject.setMustRestoreAutoCommit(true);
            if (logger.isDebugEnabled()) {
                logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
            }
            // 开启事务
            con.setAutoCommit(false);
        }

        prepareTransactionalConnection(con, definition);
        // 标记事务状态为激活
        txObject.getConnectionHolder().setTransactionActive(true);

        int timeout = determineTimeout(definition);
        if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
            txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
        }

        // Bind the connection holder to the thread.
        if (txObject.isNewConnectionHolder()) {
            TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
        }
    }

    catch (Throwable ex) {
        if (txObject.isNewConnectionHolder()) {
            DataSourceUtils.releaseConnection(con, obtainDataSource());
            txObject.setConnectionHolder(null, false);
        }
        throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex);
    }
}

try块中的第一个if结构体中,看到了 obtainDataSource().getConnection() ,获取到真正的数据库连接。

之后下面的if结构中,发现了 con.setAutoCommit(false) ,表明关闭自动提交,即开启事务

3.3.4.4 prepareSynchronization

protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) {
    if (status.isNewSynchronization()) {
        TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
        TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(
            definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ?
            definition.getIsolationLevel() : null);
        TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
        TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
        TransactionSynchronizationManager.initSynchronization();
    }
}

这部分是将事务状态和事务定义信息放入事务同步管理器中,逻辑很简单,不再展开。

3.3.5 回到createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    // ......
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        }
        // ......
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

最后一步准备事务信息:

3.3.6 prepareTransactionInfo

protected TransactionInfo prepareTransactionInfo(@Nullable PlatformTransactionManager tm,
                                                 @Nullable TransactionAttribute txAttr, String joinpointIdentification,
                                                 @Nullable TransactionStatus status) {

    TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
    if (txAttr != null) {
        // We need a transaction for this method...
        if (logger.isTraceEnabled()) {
            logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]");
        }
        // The transaction manager will flag an error if an incompatible tx already exists.
        txInfo.newTransactionStatus(status);
    }
    else {
        // The TransactionInfo.hasTransaction() method will return false. We created it only
        // to preserve the integrity of the ThreadLocal stack maintained in this class.
        if (logger.isTraceEnabled()) {
            logger.trace("No need to create transaction for [" + joinpointIdentification +
                         "]: This method is not transactional.");
        }
    }

    // We always bind the TransactionInfo to the thread, even if we didn't create
    // a new transaction here. This guarantees that the TransactionInfo stack
    // will be managed correctly even if no transaction was created by this aspect.
    txInfo.bindToThread();
    return txInfo;
}

中间大段的日志打印就不看了,最后有一个 txInfo.bindToThread()

3.3.7 txInfo.bindToThread()

private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
    new NamedThreadLocal<>("Current aspect-driven transaction");

private void bindToThread() {
    // Expose current TransactionStatus, preserving any existing TransactionStatus
    // for restoration after this transaction is complete.
    this.oldTransactionInfo = transactionInfoHolder.get();
    transactionInfoHolder.set(this);
}

可以发现又是直接把当前的事务信息放入 ThreadLocal 中。

至此,createTransactionIfNecessary 方法执行完成,DemoService2 的事务成功创建。

3.4 【REQUIRED】DemoService2执行DemoService

@Transactional(rollbackFor = Exception.class, propagation = Propagation.REQUIRED)
public void test2() {
    System.out.println("test2 run...");
    demoService.test1();
    System.out.println("test2 finish...");
}

DemoService2test2 执行中,会执行 DemoServicetest1 方法。

此时又会触发开启事务,来到 invokeWithinTransaction 方法:

3.4.1 invokeWithinTransaction

protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
                                         final InvocationCallback invocation) throws Throwable {
    // ......

    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

        // ......
    }
    // ......
}

再次进入 createTransactionIfNecessary 方法:

3.4.2 createTransactionIfNecessary

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
                                                       @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    // .....
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            status = tm.getTransaction(txAttr);
        }
        // ......
    }
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}

进入 getTransaction

3.4.3 getTransaction

public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    Object transaction = doGetTransaction();

// ......

if (isExistingTransaction(transaction)) {
    // Existing transaction found -> check propagation behavior to find out how to behave.
    return handleExistingTransaction(definition, transaction, debugEnabled);
}

首先去获取事务:

protected Object doGetTransaction() {
    DataSourceTransactionObject txObject = new DataSourceTransactionObject();
    txObject.setSavepointAllowed(isNestedTransactionAllowed());
    // 由于之前已经在ThreadLocal中放入DemoService2的事务,此时可以去除
    ConnectionHolder conHolder =
        (ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
    txObject.setConnectionHolder(conHolder, false);
    return txObject;
}

之后去下面的 isExistingTransaction 方法,很明显此时已经存在事务,进入 handleExistingTransaction 方法。

3.4.4 handleExistingTransaction

private TransactionStatus handleExistingTransaction(
    TransactionDefinition definition, Object transaction, boolean debugEnabled)
    throws TransactionException {

    // 如果当前方法的事务行为是NEVER(不允许),则抛出异常
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
        throw new IllegalTransactionStateException(
            "Existing transaction found for transaction marked with propagation 'never'");
    }

    // 如果当前方法的事务行为是NOT_SUPPORTED,挂起当前事务,执行完成之后,再次唤醒
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction");
        }
        Object suspendedResources = suspend(transaction);
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        return prepareTransactionStatus(
            definition, null, false, newSynchronization, debugEnabled, suspendedResources);
    }

    // 如果当前方法的事务行为是REQUIRED_NEW,挂起当前事务,并新建一个事务执行,执行完之后,唤醒上个事务
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                         definition.getName() + "]");
        }
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // 如果当前方法的事务行为是NESTED,创建一个保存点
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                "Transaction manager does not allow nested transactions by default - " +
                "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
                                         status.createAndHoldSavepoint();
                                         return status;
                                         }
                                         else {
                                         // Nested transaction through nested begin and commit/rollback calls.
                                         // Usually only for JTA: Spring synchronization might get activated here
                                         // in case of a pre-existing JTA transaction.
                                         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                                         DefaultTransactionStatus status = newTransactionStatus(
                                         definition, transaction, true, newSynchronization, debugEnabled, null);
                                         doBegin(transaction, definition);
                                         prepareSynchronization(status, definition);
                                         return status;
                                         }
                                         }

                                         // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
                                         if (debugEnabled) {
                                         logger.debug("Participating in existing transaction");
                                         }
                                         if (isValidateExistingTransaction()) {
                                         if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
                                         Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
                                         if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
                                         Constants isoConstants = DefaultTransactionDefinition.constants;
                                         throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                         definition + "] specifies isolation level which is incompatible with existing transaction: " +
                                         (currentIsolationLevel != null ?
                                         isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
                                         "(unknown)"));
                                         }
                                         }
                                         if (!definition.isReadOnly()) {
                                         if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
                                         throw new IllegalTransactionStateException("Participating transaction with definition [" +
                                         definition + "] is not marked as read-only but existing transaction is");
                                         }
                                         }
                                         }
                                         boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                                         return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
                                         }

关键的判断逻辑都在注释中标注好了,默认情况下 @Transactional 注解中的事务传播行为是REQUIRED,均不属于上面的if判断结构条件,最终到最后的 prepareTransactionStatus 方法,返回出去,全程没有再开启新事务,也没有挂起事务。

至此,REQUIRED模式得以体现。

3.5 【REQUIRES_NEW】过程

DemoServicetest1 方法中 @Transactional 注解的 propagation 修改为 REQUIRES_NEW 。重新Debug,来到 handleExistingTransaction 方法中:

private TransactionStatus handleExistingTransaction(
    TransactionDefinition definition, Object transaction, boolean debugEnabled)
    throws TransactionException {
    // ......
    // 如果当前方法的事务行为是REQUIRED_NEW,挂起当前事务,并新建一个事务执行,执行完之后,唤醒上个事务
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
        if (debugEnabled) {
            logger.debug("Suspending current transaction, creating new transaction with name [" +
                         definition.getName() + "]");
        }
        SuspendedResourcesHolder suspendedResources = suspend(transaction);
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error beginEx) {
            resumeAfterBeginException(transaction, suspendedResources, beginEx);
            throw beginEx;
        }
    }

    // ......
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

进入这一组分支中,这里面的步骤与之前几乎完全一致,这里咱只关注几个不太相同的部分。

3.5.1 suspend执行完

通过Debug发现它返回了test2的事务信息:

3.5.2 创建TransactionStatus后

通过Debug发现它的 suspendedResources 包含了test2的事务:

3.5.3 test1方法commit

private void cleanupAfterCompletion(DefaultTransactionStatus status) {
    status.setCompleted();
if (status.isNewSynchronization()) {
    TransactionSynchronizationManager.clear();
}
if (status.isNewTransaction()) {
    doCleanupAfterCompletion(status.getTransaction());
}
if (status.getSuspendedResources() != null) {
    if (status.isDebug()) {
        logger.debug("Resuming suspended transaction after completion of inner transaction");
    }
    Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
    resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}

在清理test1的事务缓存后,最底下有一个 resume 方法,它负责激活上一个事务:

3.5.4 resume

protected final void resume(@Nullable Object transaction, @Nullable SuspendedResourcesHolder resourcesHolder)
    throws TransactionException {

    if (resourcesHolder != null) {
        Object suspendedResources = resourcesHolder.suspendedResources;
        if (suspendedResources != null) {
            doResume(transaction, suspendedResources);
        }
        List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
        if (suspendedSynchronizations != null) {
            TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
            TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
            doResumeSynchronization(suspendedSynchronizations);
        }
    }
}

它做空校验后,会拿到当前线程中的上一个事务,并执行 doResume 方法,最终再绑定到事务同步管理器上。

3.5.5 doResume

protected void doResume(@Nullable Object transaction, Object suspendedResources) {
    TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}

发现这里重新绑定了之前被挂起的事务。

至此,REQUIRES_NEW模式也得以体现。

3.6 【NESTED】Debug过程

DemoServicetest1 方法中 @Transactional 注解的 propagation 修改为 NESTED 。重新Debug,来到 handleExistingTransaction 方法中:

private TransactionStatus handleExistingTransaction(
        TransactionDefinition definition, Object transaction, boolean debugEnabled)
        throws TransactionException {
    // ......
    // 如果当前方法的事务行为是NESTED,创建一个保存点
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        if (!isNestedTransactionAllowed()) {
            throw new NestedTransactionNotSupportedException(
                    "Transaction manager does not allow nested transactions by default - " +
                    "specify 'nestedTransactionAllowed' property with value 'true'");
        }
        if (debugEnabled) {
            logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
        }
        if (useSavepointForNestedTransaction()) {
            // Create savepoint within existing Spring-managed transaction,
            // through the SavepointManager API implemented by TransactionStatus.
            // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
            DefaultTransactionStatus status =
                    prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
            status.createAndHoldSavepoint();
            return status;
        }
        else {
            // Nested transaction through nested begin and commit/rollback calls.
            // Usually only for JTA: Spring synchronization might get activated here
            // in case of a pre-existing JTA transaction.
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, null);
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
    }

    // ......
    return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}

这部分流程就跟之前不太一样了,因为涉及到保存点的概念。下面还是Debug到方法的if结构中来看:

3.6.1 useSavepointForNestedTransaction

protected boolean useSavepointForNestedTransaction() {
    return true;
}

没什么好说的,直接进if结构体吧。

3.6.2 prepareTransactionStatus

protected final DefaultTransactionStatus prepareTransactionStatus(
    TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction,
    boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {

    DefaultTransactionStatus status = newTransactionStatus(
        definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
    prepareSynchronization(status, definition);
    return status;
}

这里面分为两个部分:创建 TransactionStatus ,设置同步。这两步也很简单,之前也都看过了,不再赘述。

3.6.3 status.createAndHoldSavepoint

这里是设置保存点的部分。

public void createAndHoldSavepoint() throws TransactionException {
    setSavepoint(getSavepointManager().createSavepoint());
}

很明显核心的部分是拿到 SavepointManager 调用 createSavepoint 方法。

3.6.3.1 getSavepointManager

protected SavepointManager getSavepointManager() {
    Object transaction = this.transaction;
    if (!(transaction instanceof SavepointManager)) {
        throw new NestedTransactionNotSupportedException(
            "Transaction object [" + this.transaction + "] does not support savepoints");
    }
    return (SavepointManager) transaction;
}

很简单,它只是把当前的事务做了一次强转。

3.6.3.2 createSavepoint

public Object createSavepoint() throws TransactionException {
    ConnectionHolder conHolder = getConnectionHolderForSavepoint();
    try {
        if (!conHolder.supportsSavepoints()) {
            throw new NestedTransactionNotSupportedException(
                    "Cannot create a nested transaction because savepoints are not supported by your JDBC driver");
        }
        if (conHolder.isRollbackOnly()) {
            throw new CannotCreateTransactionException(
                    "Cannot create savepoint for transaction which is already marked as rollback-only");
        }
        return conHolder.createSavepoint();
    }
    catch (SQLException ex) {
        throw new CannotCreateTransactionException("Could not create JDBC savepoint", ex);
    }
}

这里面它拿 ConnectionHolder ,最底下的return中调了 createSavepoint 方法来实际的创建保存点。

public Savepoint createSavepoint() throws SQLException {
    this.savepointCounter++;
    return getConnection().setSavepoint(SAVEPOINT_NAME_PREFIX + this.savepointCounter);
}

发现了原生jdbc的操作:Connection 对象的 setSavepoint 方法。

3.6.4 test1方法commit

与之前没什么不同,直接commit即可。最后的清除缓存部分,因为当前事务不是全新的事务,所以没有任何动作,直接返回。

3.7 小结

  1. 声明式事务有7种事务传播行为,默认是REQUIRED。

  2. 事务传播行为的加载过程,是在事务通知织入代理对象时已经创建好了。

  3. 事务传播行为的核心控制点在 getTransactionhandleExistingTransaction 方法中。

最后更新于