AOP 之 @Transactional
使用
- @Transactional 可以添加在类上面,也可以添加在方法上面。同时存在的话,以方法上面的优先。
- rollbackFor 参数默认为 RuntimeException.class,而如果抛出了 Exception.class 类型的异常,事务就不会回滚。因此,可以设置 rollbackFor = Exception.class。
- propagation 参数默认为 Propagation.REQUIRED
@Transactional 或 @Async 注解失效的问题
@Service
@Transactional(rollbackFor = Exception.class)
public class UserService {
@Autowired
private UserMapper userMapper;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void addAddress() {
return userMapper.addAddress();
}
@Transactional(propagation = Propagation.REQUIRES)
public void addUser() {
/**
* 本意是这个方法需要新开启一个事务,即使后面 userMapper.addUser() 发生异常,addAddress() 也不会回滚事务。
* 然而,由于这两个方法都在 UserService 中,方法的调用相当于 this.addAddress();
* 这样的调用方式不会走代理,因此 Spring 不会读取到 addAddress 方法上面的 @Transactional(propagation = Propagation.REQUIRES_NEW)
* 所以,addAddress() 上面并没有新开启一个事务,所以当 userMapper.addUser() 发生异常,addAddress() 会回滚事务。
* 注解 @Async 和 @Transactional 失效情况同理。都是因为发生了 this 调用,方法没有走代理。
*/
addAddress();
userMapper.addUser();
}
}
解决 @Transactional 或 @Async 注解失效的问题
这两个注解 @Transactional 和 @Async 失效情况同理。都是因为发生了 this 调用,方法没有走代理。那解决方法自然是让它避免 this 调用,而走代理。
方法一,自己注入自己(一般人不会这么做)
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
// 自己注入自己
@Autowired
private UserService userService;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void addAddress() {
return userMapper.addAddress();
}
@Transactional(propagation = Propagation.REQUIRES)
public void addUser() {
// 使用注入的对象调用方法
userService.addAddress();
userMapper.addUser();
}
}
方法二,拆分为两个 Spring Bean 类,注入即可
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Autowired
private AddressService addressService;
@Transactional(propagation = Propagation.REQUIRES)
public void addUser() {
// 使用注入的 AddressService 对象调用方法
addressService.addAddress();
userMapper.addUser();
}
}
@Service
public class AddressService {
@Autowired
private AddressMapper addressMapper;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void addAddress() {
return addressMapper.addAddress();
}
}
方法三,实现 ApplicationContextAware 接口,从 Spring 容器中获取 bean 对象再调用
@Service
public class UserService implements ApplicationContextAware {
@Autowired
private UserMapper userMapper;
private ApplicationContext applicationContext;
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void addAddress() {
return userMapper.addAddress();
}
@Transactional(propagation = Propagation.REQUIRES)
public void addUser() {
// 使用注入的对象调用方法
UserService proxyUserService = applicationContext.getBean(UserService.class);
proxyUserService.addAddress();
userMapper.addUser();
}
}
方法四, 开启 cglib 代理,手动获取 Spring 代理类
@Configuration
@EnableAspectJAutoProxy(exposeProxy = true)
public class Config{}
@Service
public class UserService {
@Autowired
private UserMapper userMapper;
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void addAddress() {
return userMapper.addAddress();
}
@Transactional(propagation = Propagation.REQUIRES)
public void addUser() {
// 使用注入的对象调用方法
UserService proxyUserService = (UserService)AopContext.currentProxy();
proxyUserService.addAddress();
userMapper.addUser();
}
}
源码解读
SPI 读取 CacheAutoConfiguration 配置类
在 spring-boot-autoconfigure-2.5.4.jar 中通过 SPI 读取自动配置类 org.springframework.boot.autoconfigure.transaction.TransactionAutoConfiguration;
TransactionAutoConfiguration
可以看出,SpringBoot 默认就启用了 CGLIB 代理,并添加了 @EnableTransactionManagement 注解,开启了事务支持。
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(PlatformTransactionManager.class)
@AutoConfigureAfter({ JtaAutoConfiguration.class, HibernateJpaAutoConfiguration.class,
DataSourceTransactionManagerAutoConfiguration.class, Neo4jDataAutoConfiguration.class })
@EnableConfigurationProperties(TransactionProperties.class)
public class TransactionAutoConfiguration {
@Bean
@ConditionalOnMissingBean
public TransactionManagerCustomizers platformTransactionManagerCustomizers(
ObjectProvider<PlatformTransactionManagerCustomizer<?>> customizers) {
return new TransactionManagerCustomizers(customizers.orderedStream().collect(Collectors.toList()));
}
@Bean
@ConditionalOnMissingBean
@ConditionalOnSingleCandidate(ReactiveTransactionManager.class)
public TransactionalOperator transactionalOperator(ReactiveTransactionManager transactionManager) {
return TransactionalOperator.create(transactionManager);
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnSingleCandidate(PlatformTransactionManager.class)
public static class TransactionTemplateConfiguration {
@Bean
@ConditionalOnMissingBean(TransactionOperations.class)
public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) {
return new TransactionTemplate(transactionManager);
}
}
@Configuration(proxyBeanMethods = false)
@ConditionalOnBean(TransactionManager.class)
@ConditionalOnMissingBean(AbstractTransactionManagementConfiguration.class)
public static class EnableTransactionManagementConfiguration {
@Configuration(proxyBeanMethods = false)
@EnableTransactionManagement(proxyTargetClass = false)
@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "false")
public static class JdkDynamicAutoProxyConfiguration {
}
/**
* 可以看出,SpringBoot 默认就启用了 CGLIB 代理,并添加了 @EnableTransactionManagement 注解,开启了事务支持。
*/
@Configuration(proxyBeanMethods = false)
@EnableTransactionManagement(proxyTargetClass = true)
@ConditionalOnProperty(prefix = "spring.aop", name = "proxy-target-class", havingValue = "true",
matchIfMissing = true)
public static class CglibAutoProxyConfiguration {
}
}
}
@EnableTransactionManagement
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(TransactionManagementConfigurationSelector.class)
public @interface EnableTransactionManagement {
boolean proxyTargetClass() default false;
AdviceMode mode() default AdviceMode.PROXY;
int order() default Ordered.LOWEST_PRECEDENCE;
}
TransactionManagementConfigurationSelector
主要导入了下面两个类,并实例化为 Spring Bean
- AutoProxyRegistrar.class 主要为了实现开启 AOP 并生成代理对象的功能(和缓存切面一模一样)。
- ProxyTransactionManagementConfiguration.class 主要用来处理事务相关的 AOP 的 Advisor, PointCut, Advice
public class TransactionManagementConfigurationSelector extends AdviceModeImportSelector<EnableTransactionManagement> {
@Override
protected String[] selectImports(AdviceMode adviceMode) {
switch (adviceMode) {
case PROXY:
return new String[] {AutoProxyRegistrar.class.getName(),
ProxyTransactionManagementConfiguration.class.getName()};
case ASPECTJ:
return new String[] {determineTransactionAspectClass()};
default:
return null;
}
}
private String determineTransactionAspectClass() {
return (ClassUtils.isPresent("javax.transaction.Transactional", getClass().getClassLoader()) ?
TransactionManagementConfigUtils.JTA_TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME :
TransactionManagementConfigUtils.TRANSACTION_ASPECT_CONFIGURATION_CLASS_NAME);
}
}
AutoProxyRegistrar
这个类主要为了实现开启 AOP 并生成代理对象的功能。
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
主要是上面这一行,注册了类 InfrastructureAdvisorAutoProxyCreator.class。
这个类类似与 AnnotationAwareAspectJAutoProxyCreator 类(用于处理以 @AspectJ 注解形式开启 AOP)。参考 AopConfigUtils
二者都实现了 AbstractAdvisorAutoProxyCreator 类,而这个类主要是用来开启 AOP 并生成代理对象的。
AbstractAdvisorAutoProxyCreator 最终实现了 BeanPostProcessor。
public class AutoProxyRegistrar implements ImportBeanDefinitionRegistrar {
private final Log logger = LogFactory.getLog(getClass());
@Override
public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
boolean candidateFound = false;
Set<String> annTypes = importingClassMetadata.getAnnotationTypes();
for (String annType : annTypes) {
AnnotationAttributes candidate = AnnotationConfigUtils.attributesFor(importingClassMetadata, annType);
if (candidate == null) {
continue;
}
Object mode = candidate.get("mode");
Object proxyTargetClass = candidate.get("proxyTargetClass");
if (mode != null && proxyTargetClass != null && AdviceMode.class == mode.getClass() &&
Boolean.class == proxyTargetClass.getClass()) {
candidateFound = true;
if (mode == AdviceMode.PROXY) {
// 主要这一行,注册了类 InfrastructureAdvisorAutoProxyCreator.class
// 这个类类似与 AnnotationAwareAspectJAutoProxyCreator 类(用于处理以 @AspectJ 注解形式开启AOP)
// 二者都实现了 AbstractAdvisorAutoProxyCreator 类,而这个类主要是用来生成代理对象的。
// AbstractAdvisorAutoProxyCreator 最终实现了 BeanPostProcessor
AopConfigUtils.registerAutoProxyCreatorIfNecessary(registry);
if ((Boolean) proxyTargetClass) {
AopConfigUtils.forceAutoProxyCreatorToUseClassProxying(registry);
return;
}
}
}
}
// ......
}
}
ProxyTransactionManagementConfiguration
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyTransactionManagementConfiguration extends AbstractTransactionManagementConfiguration {
@Bean(name = TransactionManagementConfigUtils.TRANSACTION_ADVISOR_BEAN_NAME)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public BeanFactoryTransactionAttributeSourceAdvisor transactionAdvisor(
TransactionAttributeSource transactionAttributeSource, TransactionInterceptor transactionInterceptor) {
//缓存操作的Advisor,间接继承了 AbstractPointcutAdvisor,能返回指定的切点
// BeanFactoryTransactionAttributeSourceAdvisor 对象间接实现类 PointcutAdvisor 接口的 getPointcut 方法,
// 这个方法在 AOP 确定 advisor 能否在代理目标类上适用的时候会调用。具体在 AopUtils 里面
BeanFactoryTransactionAttributeSourceAdvisor advisor = new BeanFactoryTransactionAttributeSourceAdvisor();
//设置事务操作源,也就是切点源, 通过这个来设置 Advisor 中持有的 AOP 的 pointcut 属性
advisor.setTransactionAttributeSource(transactionAttributeSource);
//设置 advice,拦截器 TransactionInterceptor
advisor.setAdvice(transactionInterceptor);
if (this.enableTx != null) {
advisor.setOrder(this.enableTx.<Integer>getNumber("order"));
}
return advisor;
}
/**
* 其中包含 SpringTransactionAnnotationParser 类来解析和匹配 @Transactional 注解
* @return
*/
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionAttributeSource transactionAttributeSource() {
return new AnnotationTransactionAttributeSource();
}
@Bean
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public TransactionInterceptor transactionInterceptor(TransactionAttributeSource transactionAttributeSource) {
TransactionInterceptor interceptor = new TransactionInterceptor();
interceptor.setTransactionAttributeSource(transactionAttributeSource);
if (this.txManager != null) {
interceptor.setTransactionManager(this.txManager);
}
return interceptor;
}
}
BeanFactoryTransactionAttributeSourceAdvisor
AOP 中的 Advisor
├─BeanFactoryTransactionAttributeSourceAdvisor
│ ├─extends AbstractBeanFactoryPointcutAdvisor
│ │ ├─extends AbstractPointcutAdvisor implements BeanFactoryAware
│ │ │ ├─implements PointcutAdvisor, Ordered, Serializable
│ │ │ │ ├─PointcutAdvisor extends Advisor
AnnotationTransactionAttributeSource
AOP 中的方法注解扫描匹配
├─AnnotationTransactionAttributeSource
│ ├─ extends AbstractFallbackTransactionAttributeSource implements Serializable
│ │ ├─ implements TransactionAttributeSource, EmbeddedValueResolverAware
TransactionInterceptor
AOP 中的 Advice
├─TransactionInterceptor
│ ├─ extends TransactionAspectSupport implements MethodInterceptor, Serializable
│ │ ├─ MethodInterceptor extends Interceptor
│ │ │ ├─ Interceptor extends Advice
经过前面的分析已经知道,能够进入到这里的方法都是贴有 @Transactional 注解的方法。 在这里主要就是调用那些方法,获取到方法的结果然后,进入到父类 TransactionAspectSupport 中进一步处理。
public class TransactionInterceptor extends TransactionAspectSupport implements MethodInterceptor, Serializable {
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
}
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
final InvocationCallback invocation) throws Throwable {
// If the transaction attribute is null, the method is non-transactional
// 获取事务属性类 AnnotationTransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取方法上面有 @Transactional 注解的属性
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 获取事务管理器
final TransactionManager tm = determineTransactionManager(txAttr);
// ......
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)){
// Standard transaction demarcation with getTransaction and commit/rollback calls.
// 创建并开启事务
TransactionInfo txInfo=createTransactionIfNecessary(ptm,txAttr,joinpointIdentification);
Object retVal;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
// 事务的火炬传递
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// target invocation exception
// 事务回滚
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
} finally {
cleanupTransactionInfo(txInfo);
}
if (retVal != null && vavrPresent && VavrDelegate.isVavrTry(retVal)) {
// Set rollback-only in case of Vavr failure matching our rollback rules...
TransactionStatus status = txInfo.getTransactionStatus();
if (status != null && txAttr != null) {
retVal = VavrDelegate.evaluateTryFailure(retVal, txAttr, status);
}
}
// 事务提交
commitTransactionAfterReturning(txInfo);
return retVal;
}
// ......
开启事务的创建
在理解 spring 事务的时候必须要牢牢记住几个概念: Connection 连接、事务、和用户会话
我们在看了 jdbc 代码以后可以清楚地知道,事务就是由 connection 对象控制的,所以 connection 对象是和事务是绑定的,然后用户请求过来时又需要数据库连接来执行 sql 语句, 所以用户请求线程又是跟 connection 是绑定的。这点一定要弄清楚。
在 spring 中事务传播属性有如下几种:
- PROPAGATION_REQUIRED 如果当前没有事务,就新建一个事务,如果已经存在一个事务中,加入到这个事务中。这是最常见的选择。
- PROPAGATION_SUPPORTS 支持当前事务,如果当前没有事务,就以非事务方式执行。
- PROPAGATION_MANDATORY 使用当前的事务,如果当前没有事务,就抛出异常。
- PROPAGATION_REQUIRES_NEW 新建事务,如果当前存在事务,把当前事务挂起。
- PROPAGATION_NOT_SUPPORTED 以非事务方式执行操作,如果当前存在事务,就把当前事务挂起。
- PROPAGATION_NEVER 以非事务方式执行,如果当前存在事务,则抛出异常。
- PROPAGATION_NESTED 如果当前存在事务,则在嵌套事务内执行。如果当前没有事务,则执行与 PROPAGATION_REQUIRED 类似的操作。
其中最常见的,用得最多就 PROPAGATION_REQUIRED、PROPAGATION_REQUIRES_NEW、PROPAGATION_NESTED 这三种。
事务的传播属性是 spring 特有的,是 spring 用来控制方法事务的一种手段,说直白点就是用来控制方法是否使用同一事务的一种属性,以及按照什么规则回滚的一种手段。 如果事务注解和事务管理器都已经获取到,那么就开启事务了,具体如下:
// 主要看上面代码段中的这一行:
TransactionInfo txInfo=createTransactionIfNecessary(ptm,txAttr,joinpointIdentification);
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
@Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
// If no name specified, apply method identification as transaction name.
if (txAttr != null && txAttr.getName() == null) {
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
TransactionStatus status = null;
if (txAttr != null) {
if (tm != null) {
// 开启事务,重点看这里
status = tm.getTransaction(txAttr);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("Skipping transactional joinpoint [" + joinpointIdentification +
"] because no transaction manager has been configured");
}
}
}
// 创建事务信息对象,记录新老事务信息对象
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 这里重点看。.DataSourceTransactionObject 拿到对象
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 第一次进来 ConnectionHolder 肯定为空,所以不存在事务
// 这个方法主要判断 ConnectionHolder 是否为空,并且连接 connectionHolder 状态是活跃的
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(def, transaction, debugEnabled);
}
// Check definition settings for new transaction.
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
// 第一次进来,大部分会走这里(三种不同的事务传播属性)
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 先挂起
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
// 开启事务,重点。代码看后面的章节
return startTransaction(def, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
// ......
}
}
@Override
protected Object doGetTransaction() {
// 管理 Connection 对象,创建回滚点,按照回滚点回滚,释放回滚点
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
// DataSourceTransactionManager 默认是允许嵌套事务的
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// obtainDataSource() 获取数据源对象
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(obtainDataSource());
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
TransactionSynchronizationManager.getResource(obtainDataSource()) 这个方法创建事务对象,事务对象往往需要跟连接挂钩, 所以里面肯定会有连接对象 ConnectionHolder,在这个方法里面会首先从 ThreadLocal 中获取连接对象,如下所示:
@Nullable
public static Object getResource(Object key) {
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
return doGetResource(actualKey);
}
@Nullable
private static Object doGetResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.get(actualKey);
// Transparently remove ResourceHolder that was marked as void...
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
map.remove(actualKey);
// Remove entire ThreadLocal if empty...
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
这个 ThreadLocal 中 value 封装了一个 map,map 是数据源对象 DataSource 和连接对象的映射关系,也就是说,如果上一个事务中建立了映 射关系, 下一个事务就可以通过当前线程从 ThreadLocal 中获取到这个 map,这是针对两个事务使用同一个线程,嵌套事务这种, 然后根据当前使用的数据源对象对应的 actualKey 拿到对应的连接对象,然后设置到事务对象中。
startTransaction(def, transaction, debugEnabled, suspendedResources);
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction,
boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 创建事务状态
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启事务,重点
doBegin(transaction, definition);
// 开启事务后,改变事务状态
prepareSynchronization(status, definition);
return status;
}
该代码创建事务状态对象,事务状态就是记录事务流转过程中状态数据的,有一个数据非常重要,直接决定了提交,回滚和恢复绑定操作,就是 newTransaction 属性,这个属性要牢记。 如果为 true 就代表当前事务允许单独提交和回滚,一般是第一次创建事务或者事务传播属性为 PROPAGATION_REQUIRES_NEW 的时候。 如果为 false 则当前事务不能单独提交和回滚。
doBegin(transaction, definition);
该方法则是开启事务的核心方法
@Override
protected void doBegin(Object transaction, TransactionDefinition definition) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
Connection con = null;
try {
// 如果没有数据库连接
if (!txObject.hasConnectionHolder() ||
txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
// 从连接池中获取一个连接对象
Connection newCon = obtainDataSource().getConnection();
if (logger.isDebugEnabled()) {
logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
}
// 包装成 ConnectionHolder 对象, 然后设置到事务对象中
txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
}
txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
con = txObject.getConnectionHolder().getConnection();
// 从数据库连接中获取隔离级别
Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
txObject.setPreviousIsolationLevel(previousIsolationLevel);
txObject.setReadOnly(definition.isReadOnly());
// Switch to manual commit if necessary. This is very expensive in some JDBC drivers,
// so we don't want to do it unnecessarily (for example if we've explicitly
// configured the connection pool to set it already).
if (con.getAutoCommit()) {
txObject.setMustRestoreAutoCommit(true);
if (logger.isDebugEnabled()) {
logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
}
// 关闭连接的自动提交,即开启了事务
con.setAutoCommit(false);
}
/**
* 建立 ThreadLocal 的绑定关系这个绑定关系就是前面提到的,从绑定关系中可以拿到 map,map 中建立的是数据源对象和连接对象的映射,
* 此处的绑定关系为当前线程和 Map 对象,因为 ThreadLocal 是针对当前线程的副本。
*
* 设置只读事务。从这一点设置的时间点开始,到这个事务结束的过程中,其他事务提交的事务,该事务不可见。
* 设置只读事务就是告诉数据库,我这个事务没有新增,修改,删除操作,只有查询操作,不需要数据库锁库等操作,减少数据库的压力。
*/
prepareTransactionalConnection(con, definition);
// 上面关闭了自动提交,开启了事务,那么就需要把事务状态改为活动的。用于后面的操作来判断是否已经存在事务。
txObject.getConnectionHolder().setTransactionActive(true);
int timeout = determineTimeout(definition);
if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
}
// Bind the connection holder to the thread.
// 如果是新创建的事务,则建立当前线程和数据库连接的关系
if (txObject.isNewConnectionHolder()) {
TransactionSynchronizationManager.bindResource(obtainDataSource(), txObject.getConnectionHolder());
}
}
}
到这里,doBegin 基本结束了,事务已经开启了。
getConnection()
getConnection() 方法找到子类 AbstractRoutingDataSource 中的 getConnection 方法然后从 determineTargetDataSource()方法点进去, 这块从数据源获取到连接对象,有一个数据源要提一下,就是动态切换数据源的一个抽象类:AbstractRoutingDataSource,如果有多数据源的需求, 可以自己定义数据源对象然后往自定义对象中设置目标数据源和默认数据源,通过切面控制动态切换逻辑如下:
public abstract class AbstractRoutingDataSource extends AbstractDataSource implements InitializingBean {
@Override
public Connection getConnection() throws SQLException {
return determineTargetDataSource().getConnection();
}
protected DataSource determineTargetDataSource() {
Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
// 切面数据源使用的 key,是一个钩子函数,动态数据源的切换参考其他文章单独说明
Object lookupKey = determineCurrentLookupKey();
DataSource dataSource = this.resolvedDataSources.get(lookupKey);
if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
dataSource = this.resolvedDefaultDataSource;
}
if (dataSource == null) {
throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
}
return dataSource;
}
}
存在事务传播的代码
事务已经开启后,接下来我们重点看一下存在事务传播的代码。回到这个方法,主要看这一行:
return handleExistingTransaction(def, transaction, debugEnabled);
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
// Use defaults if no transaction definition given.
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 这里重点看。.DataSourceTransactionObject 拿到对象
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
// 第一次进来 ConnectionHolder 肯定为空,所以不存在事务
// 这个方法主要判断 ConnectionHolder 是否为空,并且连接 connectionHolder 状态是活跃的
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
// 如果已经存在事务,就会走这里
return handleExistingTransaction(def, transaction, debugEnabled);
}
}
接下来我们看一下 handleExistingTransaction 这个方法,重点看一下这两种:PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED。
private TransactionStatus handleExistingTransaction(
TransactionDefinition definition, Object transaction, boolean debugEnabled)
throws TransactionException {
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
if (debugEnabled) {
logger.debug("Suspending current transaction");
}
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
if (debugEnabled) {
logger.debug("Suspending current transaction, creating new transaction with name [" +
definition.getName() + "]");
}
// 挂起当前事务 PROPAGATION_REQUIRES_NEW(开启新事物)
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
/**
* 参考上面的代码段。可以发现:
* 参数 newTransaction = true, doBegin() 默认开启一个新事物.
*/
return startTransaction(definition, transaction, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error beginEx) {
resumeAfterBeginException(transaction, suspendedResources, beginEx);
throw beginEx;
}
}
// .PROPAGATION_NESTED 嵌套事务,其实就是按照回滚点来回滚事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException(
"Transaction manager does not allow nested transactions by default - " +
"specify 'nestedTransactionAllowed' property with value 'true'");
}
if (debugEnabled) {
logger.debug("Creating nested transaction with name [" + definition.getName() + "]");
}
if (useSavepointForNestedTransaction()) {
// Create savepoint within existing Spring-managed transaction,
// through the SavepointManager API implemented by TransactionStatus.
// Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization.
DefaultTransactionStatus status =
prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
status.createAndHoldSavepoint();
return status;
}
else {
// Nested transaction through nested begin and commit/rollback calls.
// Usually only for JTA: Spring synchronization might get activated here
// in case of a pre-existing JTA transaction.
return startTransaction(definition, transaction, debugEnabled, null);
}
}
// Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED.
if (debugEnabled) {
logger.debug("Participating in existing transaction");
}
if (isValidateExistingTransaction()) {
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] specifies isolation level which is incompatible with existing transaction: " +
(currentIsolationLevel != null ?
isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) :
"(unknown)"));
}
}
if (!definition.isReadOnly()) {
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" +
definition + "] is not marked as read-only but existing transaction is");
}
}
}
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
SuspendedResourcesHolder suspendedResources = suspend(transaction);
这个代码里面,判断了第一个事务下面的事务特性,根据传播特性来控制事务这个事务传播属性意思是,如果当前没事务创建事务, 如果当前有事务则把当前事务挂起并创建新的事务,反正就是要用自己创建的事务,接下来看下如何挂起事务,看下方法 suspend, 这个方法的作用呢,很简单其实就是把事务对象中的连接对象设置为 null,并且解除 ThreadLocal 的绑定关系,如下图所示:
protected final SuspendedResourcesHolder suspend(@Nullable Object transaction) throws TransactionException {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
try {
Object suspendedResources = null;
if (transaction != null) {
suspendedResources = doSuspend(transaction);
}
}
}
}
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
/**
* 其实就是从 threadLocal 中移除
* 删除旧的连接对象并且返回旧的连接对象,这个一定要注意,因为会根据这个返回的连接对象,在该事务提交的时候重新的建立绑定关系的。
*/
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}