// 事务相关拦截器
class TransactionInterceptor {
public TransactionInterceptor(PlatformTransactionManager ptm, TransactionAttributeSource tas) {
// 设置事务管理器,默认为空
setTransactionManager(ptm);
// 设置解析到的@Transactional注解信息
setTransactionAttributeSource(tas);
}
public Object invoke(MethodInvocation invocation) throws Throwable {
// 获取到目标类对象,如果target不为null,获取target的类对象
Class<?> targetClass = (invocation.getThis() {
return this.target;
}!=null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// 使用事务执行目标方法,invocation::proceed传递的是一个lambda函数
return invokeWithinTransaction(invocation.getMethod(), targetClass, invocation::proceed) {
// 获取事务属性源,如果有事务,tas就不为空,上面配置的时候就已经设置了TransactionAttributeSource
TransactionAttributeSource tas = getTransactionAttributeSource();
// 获取已经解析了@Transactional注解事务属性,上面有getTransactionAttribute的详细源码讲解,并且预先缓存了
final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
// 确定事务管理器
final TransactionManager tm = determineTransactionManager(txAttr);
{
// 如果没有事务属性,表示没有解析到@Transactional注解,就没有事务
if (txAttr == null || this.beanFactory == null) {
// 如果TransactionInterceptor有设置就有,没有设置就没有
return getTransactionManager();
{
return this.transactionManager;
}
}
// 获取transactionManager的BeanName,就是@Transactional注解的value值
// @AliasFor("transactionManager")
String qualifier = txAttr.getQualifier();
// 如果执行了transactionManager的BeanName
if (StringUtils.hasText(qualifier)) {
// 从Spring中获取指定名称的transactionManager事务管理器
return determineQualifiedTransactionManager(this.beanFactory, qualifier);
{
// 先看一下之前是否缓存过
TransactionManager txManager = this.transactionManagerCache.get(qualifier);
// 未被缓存
if (txManager == null) {
// 从Spring中获取
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(beanFactory, TransactionManager.class, qualifier);
// 缓存起来
this.transactionManagerCache.putIfAbsent(qualifier, txManager);
}
return txManager;
}
}
// 如果单独调用了setTransactionManagerBeanName,指定要transactionManagerBeanName的事务管理器
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
// 和上面一样,就根据自己设置的beanName获取对应的txManager
return determineQualifiedTransactionManager(this.beanFactory, this.transactionManagerBeanName);
} else {
// 默认情况,都没设置beanName,先获取当前类的TransactionManager,如果设置了就有,否则就没
// 如果没有,那就根据类型去Spring中获取,上面都是根据beanName获取
TransactionManager defaultTransactionManager = getTransactionManager();
// 如果没有设置
if (defaultTransactionManager == null) {
// 查看是否被缓存过事务管理器
defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY);
// 双重校验
if (defaultTransactionManager == null) {
// 从Spring中获取TransactionManager类型的事务管理器
defaultTransactionManager = this.beanFactory.getBean(TransactionManager.class);
// 缓存事务管理器
this.transactionManagerCache.putIfAbsent(DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager);
}
}
return defaultTransactionManager;
}
}
// 如果事务管理器是响应式的ReactiveTransactionManager类型
if (this.reactiveAdapterRegistry != null && tm instanceof ReactiveTransactionManager) {
ReactiveTransactionSupport txSupport = this.transactionSupportCache.computeIfAbsent(method, key -> {
// 获取对应方法类型的适配器
ReactiveAdapter adapter = this.reactiveAdapterRegistry.getAdapter(method.getReturnType());
if (adapter == null) {
throw new IllegalStateException("Cannot apply reactive transaction to non-reactive return type: " + method.getReturnType());
}
// 将ReactiveTransactionSupport缓存到transactionSupportCache的map中
return new ReactiveTransactionSupport(adapter);
});
// ReactiveTransactionSupport的invokeWithinTransaction方法,里面就是一些响应式的逻辑,例如Mono...
return txSupport.invokeWithinTransaction(method, targetClass, invocation, txAttr, (ReactiveTransactionManager) tm);
}
// 将事务管理器转换成PlatformTransactionManager类型
// 因为事务管理器分为两大类,一类是响应式的ReactiveTransactionManager,一种是PlatformTransactionManager
// ReactiveTransactionManager响应式的上面单独处理,到了这里,事务管理器必然是PlatformTransactionManager类型
// 如果不是,就要抛出异常了
PlatformTransactionManager ptm = asPlatformTransactionManager(tm);
{
// 不是PlatformTransactionManager类型的事务管理器要抛出异常
if (transactionManager == null || transactionManager instanceof PlatformTransactionManager) {
return (PlatformTransactionManager) transactionManager;
}
throw new IllegalStateException("Specified transaction manager is not a PlatformTransactionManager: " + transactionManager);
}
// 获取方法的名称
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
{
// 方法识别,默认返回null
String methodIdentification = methodIdentification(method, targetClass);
if (methodIdentification == null) {
// 如果事务属性是DefaultTransactionAttribute类型,需要获取方法签名
// 例如:luck.spring.tx.Demo$TransactionalConfig.test
// 对应着解析注解的时候设置的代码
// String methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
// if (txAttr instanceof DefaultTransactionAttribute) {
// ((DefaultTransactionAttribute) txAttr).setDescriptor(methodIdentification);
// }
if (txAttr instanceof DefaultTransactionAttribute) {
methodIdentification = ((DefaultTransactionAttribute) txAttr).getDescriptor();
}
// 如果是其他类型事务属性
if (methodIdentification == null) {
// 通过类名.方法名拼接
methodIdentification = ClassUtils.getQualifiedMethodName(method, targetClass);
{
return (clazz != null ? clazz : method.getDeclaringClass()).getName() + '.' + method.getName();
}
}
}
return methodIdentification;
}
// 如果事务属性为null,或者事务管理器的类型不是CallbackPreferringPlatformTransactionManager类型
if (txAttr == null || !(ptm instanceof CallbackPreferringPlatformTransactionManager)) {
// 创建事务,如果有必要
TransactionInfo txInfo = createTransactionIfNecessary(ptm, txAttr, joinpointIdentification);
{
// 如果事务属性没有名称,使用方法签名做为名称
if (txAttr != null && txAttr.getName() == null) {
// 代理一下原有的事务属性,添加事务名称
txAttr = new DelegatingTransactionAttribute(txAttr) {
@Override
public String getName() {
return joinpointIdentification;
}
};
}
// 事务状态
TransactionStatus status = null;
// 如果存在事务属性,并且存在事务管理器
if (txAttr != null && tm != null) {
// 获取一个事务
status = tm.getTransaction(txAttr);
{
// 如果没有给出事务定义,则使用默认值,txAttr = TransactionAttribute事务属性就是TransactionDefinition
// 所以有事务定义信息
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
// 创建事务,这里我们配置的是JpaTransactionManager,不同的事务管理器配置事务的方式不同
Object transaction = doGetTransaction();
{
// 创建数据源事务对象
JpaTransactionObject txObject = new JpaTransactionObject();
// 设置此事务中是否允许保存点,如果支持嵌套事务,就允许有保存点,到时候可以通过这个回滚到指定的点位
txObject.setSavepointAllowed(isNestedTransactionAllowed());
// 获取EntityManagerFactory,使用TransactionSynchronizationManager事务同步管理器
// 检索绑定到当前线程的给定Key的资源EntityManager
// @param key要检查的键(通常是资源工厂)
// 返回一个绑定到当前线程的值(通常是活动的资源对象),如果没有则返回{@code null}
EntityManagerHolder emHolder = (EntityManagerHolder) TransactionSynchronizationManager.getResource(obtainEntityManagerFactory())
{
// 通过获取数据源的真实类型
Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);
// 获取资源值
Object value = doGetResource(actualKey);
{
// 获取到线程的所有资源
Map<Object, Object> map = resources.get();
// 如果没有,返回null
if (map == null) {
return null;
}
// 如果存在某些线程的资源
// 则根据数据源类型,获取当前线程绑定的值
Object value = map.get(actualKey);
// 如果缓存的值为ResourceHolder类型,并且hodler是空的
if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) {
// 异常这个Key
map.remove(actualKey);
// 所有线程的值都为空,删除整个ThreadLocal
if (map.isEmpty()) {
resources.remove();
}
value = null;
}
return value;
}
return value;
}
// 如果当前线程已经绑定了EntityManager
if (emHolder != null) {
// 将链接的Hodler设置都事务对象中,第二个参数表示:是否是新的EntityManager
txObject.setEntityManagerHolder(emHolder, false);
}
// 如果数据源不为空
if (getDataSource() != null) {
// 获取数据源,使用TransactionSynchronizationManager事务同步管理器
// 检索绑定到当前线程的给定Key的资源
// @param key要检查的键(通常是资源工厂)
// 返回一个绑定到当前线程的值(通常是活动的资源对象),如果没有则返回{@code null}
ConnectionHolder conHolder = (ConnectionHolder)TransactionSynchronizationManager.getResource(getDataSource());
// 设置JDBC连接,无论有没有绑定到当前线程
txObject.setConnectionHolder(conHolder);
}
// 返回事务对象
return txObject;
}
boolean debugEnabled = logger.isDebugEnabled();
// 判断事务是否存在
if (isExistingTransaction(transaction) {
return ((JpaTransactionObject) transaction).hasTransaction(){
return (this.entityManagerHolder != null && this.entityManagerHolder.isTransactionActive());
}
})
{
// 处理已存在的事务
return handleExistingTransaction(def, transaction, debugEnabled);
{
// 如果存在事务,并且传播行为设置不允许事务,抛出异常
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException("Existing transaction found for transaction marked with propagation 'never'");
}
// 如果传播行为为不支持事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
// 暂停已存在的事务
Object suspendedResources = suspend(transaction);
{
// 事务同步管理器是否处于同步状态,就是同步状态信息在当前线程TransactionSynchronization存不存在
// 存在,表示现在事务处于同步状态
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 开始暂停事务
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
{
// 获取当前线程所有的同步回调信息
List<TransactionSynchronization> suspendedSynchronizations = TransactionSynchronizationManager.getSynchronizations();
// 遍历所有的事务同步回调信息
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
// 调用对象的暂停方法
synchronization.suspend();
{
// 解绑事务管理器中绑定当前线程的资源
TransactionSynchronizationManager.unbindResource(this.resourceKey);
}
}
// 清空当前线程的事务同步回调信息
TransactionSynchronizationManager.clearSynchronization();
// 返回暂停的事务同步回调信息
return suspendedSynchronizations;
}
try {
// 需要暂停的资源
Object suspendedResources = null;
// 如果存在事务对象
if (transaction != null) {
// 开始暂停
suspendedResources = doSuspend(transaction);
{
// 解绑存在当前线程中的EntityManager
JpaTransactionObject txObject = (JpaTransactionObject) transaction;
txObject.setEntityManagerHolder(null, false);
EntityManagerHolder entityManagerHolder = (EntityManagerHolder)TransactionSynchronizationManager.unbindResource(obtainEntityManagerFactory());
// 解绑存在当前线程中的数据库链接
txObject.setConnectionHolder(null);
ConnectionHolder connectionHolder = null;
if (getDataSource() != null && TransactionSynchronizationManager.hasResource(getDataSource())) {
connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
}
// 返回暂停的资源信息
return new SuspendedResourcesHolder(entityManagerHolder, connectionHolder);
}
}
// 获取原有的事务名称
String name = TransactionSynchronizationManager.getCurrentTransactionName();
// 设置当前事务名称为null
TransactionSynchronizationManager.setCurrentTransactionName(null);
// 当前原有的事务是否仅读
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
// 设置当前可以读写
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
// 获取原有的隔离级别
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
// 设置当前事务的隔离界别为null
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
// 获取是否有实际活动的事务
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
// 设置为false
TransactionSynchronizationManager.setActualTransactionActive(false);
// 保存暂停之前的配置信息返回
return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
} catch (RuntimeException | Error ex) {
// 恢复事务
doResumeSynchronization(suspendedSynchronizations);
{
// 初始化同步信息
TransactionSynchronizationManager.initSynchronization();
{
// 如果事务处于活跃的,表示出问题了,因为现在是在恢复暂停的事务
if (isSynchronizationActive()) {
throw new IllegalStateException("Cannot activate transaction synchronization - already active");
}
synchronizations.set(new LinkedHashSet<>());
}
// 遍历所有暂停的事务
for (TransactionSynchronization synchronization : suspendedSynchronizations) {
// 调用事务同步信息的恢复方法
synchronization.resume();
{
// 绑定当前资源到事务同步管理器中
TransactionSynchronizationManager.bindResource(this.resourceKey, this.resourceHolder);
}
// 注册事务同步信息,将当前事务信息绑定到当前线程
TransactionSynchronizationManager.registerSynchronization(synchronization);
{
Set<TransactionSynchronization> synchs = synchronizations.get();
if (synchs == null) {
throw new IllegalStateException("Transaction synchronization is not active");
}
synchs.add(synchronization);
}
}
}
throw ex;
}
}
// 事务处于非同步状态,但是事务对象不为空
else if (transaction != null) {
// 暂停事务,上面有讲过
Object suspendedResources = doSuspend(transaction);
// 返回SuspendedResourcesHolder对象
return new SuspendedResourcesHolder(suspendedResources);
} else {
// 其他情况,返回null
return null;
}
}
// 判断是否是新的事务,当前事务标识是否是默认的SYNCHRONIZATION_ALWAYS,如果是表示是新的事务
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 预处理事务状态
return prepareTransactionStatus(definition, null, false, newSynchronization, debugEnabled, suspendedResources);
{
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, newTransaction, newSynchronization, debug, suspendedResources);
{
// 是否是一个新的事务,新创建的,并且没有进行任何同步信息
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
// 封装成DefaultTransactionStatus对象
return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}
// 预处理同步信息
prepareSynchronization(status, definition);
{
// 是否是新实物
if (status.isNewSynchronization()) {
// 设置真实的事务活动状态,如果存在事务对象,就表示有事务,那么事务的真实活动状态就为true
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 设置当前的隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null);
// 设置当前事务是否只读
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 设置当前事务的名称
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
// 初始化事务信息,上面有同样的代码解析
TransactionSynchronizationManager.initSynchronization();
}
}
return status;
}
}
// 如果传播行为为开启新事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
// 暂停当前事务,上面有这个方法的代码讲解
SuspendedResourcesHolder suspendedResources = suspend(transaction);
try {
// 判断当前是否是新事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 封装成事务状态对象
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
// 开启事务,有讲过,搜索doBegin(transaction, def)
doBegin(transaction, definition);
// 预处理事务,上面有该方法的详解,搜prepareSynchronization(status, def)
prepareSynchronization(status, definition);
// 返回创建的事务状态对象
return status;
} catch (RuntimeException | Error beginEx) {
// 内部事务开始失败后恢复外部事务
resumeAfterBeginException(transaction, suspendedResources, beginEx);
{
resume(transaction, suspendedResources);
{
// 如果前外层事务暂停成功,就会返回外层事务的resourcesHolder
if (resourcesHolder != null) {
// 获取暂停的资源值
Object suspendedResources = resourcesHolder.suspendedResources;
// 如果绑定了资源值
if (suspendedResources != null) {
// 恢复事务,将暂停的时候保存的资源继续绑定进去
doResume(transaction, suspendedResources);
{
// 继续绑定当前资源
TransactionSynchronizationManager.bindResource(obtainDataSource(), suspendedResources);
}
}
// 获取到被暂停的所有事务信息,因为一个业务,一个线程可能开启了N多事务
List<TransactionSynchronization> suspendedSynchronizations = resourcesHolder.suspendedSynchronizations;
// 如果存在暂停的事务
if (suspendedSynchronizations != null) {
// 将暂停时保存的资源进行一一恢复
TransactionSynchronizationManager.setActualTransactionActive(resourcesHolder.wasActive);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(resourcesHolder.isolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(resourcesHolder.readOnly);
TransactionSynchronizationManager.setCurrentTransactionName(resourcesHolder.name);
// 恢复所有的事务,上面有这个方法的详解,遍历所有事务信息,一一调用事务的恢复方法
doResumeSynchronization(suspendedSynchronizations);
}
}
}
}
throw beginEx;
}
}
// 如果传播行为为嵌套事务
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 如果不支持嵌套事务,抛出异常
if (!isNestedTransactionAllowed()) {
throw new NestedTransactionNotSupportedException();
}
// 判断对嵌套事务是否使用保存点,默认返回true
if (useSavepointForNestedTransaction()) {
// 预处理事务状态,上面有讲解
DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null);
// 给当前状态创建一个保存点,并保存到事务状态中
status.createAndHoldSavepoint();
{
SavepointManager sm = getSavepointManager();
{
Object transaction = this.transaction;
// 如果事务对象,如果存在保存点的事务对象就必须是SavepointManager类型
if (!(transaction instanceof SavepointManager)) {
throw new NestedTransactionNotSupportedException("Transaction object [" + this.transaction + "] does not support savepoints");
}
return (SavepointManager) transaction;
}
// 创建保存点
Object sp = sm.createSavepoint();
{
// 如果当前事务已经标记了仅回滚,没必要往下执行
if (getEntityManagerHolder().isRollbackOnly()) {
throw new CannotCreateTransactionException("Cannot create savepoint for transaction which is already marked as rollback-only");
}
// 搜索txObject.setTransactionData(transactionData);
return getSavepointManager(){
// 从EntityManagerHolder获取保存点管理器,在txObject.setTransactionData(transactionData)会判断
// 如果transactionData为SavepointManager,会给EntityManagerHolder设置,如果不是(Hibernate就不是),则不会设置
SavepointManager savepointManager = getEntityManagerHolder().getSavepointManager();
// 如果没有设置保存点管理器
if (savepointManager == null) {
// 抛出不支持事务异常
throw new NestedTransactionNotSupportedException("JpaDialect does not support savepoints - check your JPA provider's capabilities");
}
return savepointManager;
}
// 如果支持,使用保存点管理器创建保存点
.createSavepoint();
}
// 设置保存点信息
setSavepoint(sp);
}
return status;
} else {
// 其他的传播行为
// 是否是新事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 封装成事务状态对象
DefaultTransactionStatus status = newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, null);
// 开启事务,上面有讲解
doBegin(transaction, definition);
// 预处理事务同步信息
prepareSynchronization(status, definition);
return status;
}
}
// 是否要验证已存在的事务信息,默认为false
if (isValidateExistingTransaction()) {
// 如果隔离级别不为为默认值
if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
// 获取当前事务的隔离级别
Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
// 如果当前隔离级别为null,获取当前隔离级别不等于定义的隔离级别,表示在别的地方改了隔离级别,导致隔离级别不同步
if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) {
// 抛出异常
Constants isoConstants = DefaultTransactionDefinition.constants;
throw new IllegalTransactionStateException("Participating transaction with definition");
}
}
// 如果事务不是只读的
if (!definition.isReadOnly()) {
// 但是当前事务同步管理器保存当前事务的信息为只读,表示只读也被暗改了,抛出异常
if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) {
throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is");
}
}
}
// 判断是否是新的事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 预处理事务信息,上面有讲解
return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);
}
}
// 检查新事务的定义设置,如果定义的超时时间,小于默认的超时时间(-1),表示时间是无效的
if (def.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
// 如果传播行为为强制要有事务,如果没有事务抛出异常\
// 上面如果当前已存在事务,在上面就会按照已存在事务的逻辑处理
// 到了之前,表示之前没有存在的饰物,就要报错,PROPAGATION_MANDATORY表示要运行在已存在的事务中
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException();
}
// 如果传播行为为下面几种,表示需要开启事务
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
// 暂停当前事务,上面有讲解该方法,并且返回暂停之前的事务信息
SuspendedResourcesHolder suspendedResources = suspend(null){
// 当前同步信息是否激活,进行了初始化
// 之前调用过prepareSynchronization方法就会进行激活
// 如果之前存在事务,那么一定被激活过,除非手动重置了
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 获取到当前线程绑定的所有TransactionSynchronization,回调接口的suspend暂停方法
List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();
Object suspendedResources = null;
if (transaction != null) {
// 如果存在事务,将事务暂停
suspendedResources = doSuspend(transaction);
{
// 解绑存在当前线程中的EntityManager
JpaTransactionObject txObject = (JpaTransactionObject) transaction;
txObject.setEntityManagerHolder(null, false);
EntityManagerHolder entityManagerHolder = (EntityManagerHolder)TransactionSynchronizationManager.unbindResource(obtainEntityManagerFactory());
// 解绑存在当前线程中的数据库链接
txObject.setConnectionHolder(null);
ConnectionHolder connectionHolder = null;
if (getDataSource() != null && TransactionSynchronizationManager.hasResource(getDataSource())) {
connectionHolder = (ConnectionHolder) TransactionSynchronizationManager.unbindResource(getDataSource());
}
// 返回暂停的资源信息
return new SuspendedResourcesHolder(entityManagerHolder, connectionHolder);
}
}
// 保存暂停之前的配置
String name = TransactionSynchronizationManager.getCurrentTransactionName();
TransactionSynchronizationManager.setCurrentTransactionName(null);
boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly();
TransactionSynchronizationManager.setCurrentTransactionReadOnly(false);
Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel();
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null);
boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive();
TransactionSynchronizationManager.setActualTransactionActive(false);
// 封装成暂停的资源对象
return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive);
}
// 如果事务获取到了,但是设置活跃标识
if (transaction != null) {
// 将事务暂停
Object suspendedResources = doSuspend(transaction);
// 保存暂停的资源信息
return new SuspendedResourcesHolder(suspendedResources);
}
else {
// 不存在事务,返回null
return null;
}
}
try {
// 下面逻辑在上面都有详细解析
// 是否是新事务
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
// 封装成事务状态
DefaultTransactionStatus status = newTransactionStatus(def, transaction, true, newSynchronization, debugEnabled, suspendedResources){
// 是否获取到真实的新事务对象
boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive();
// 封装成状态,保存当前事务的以及暂停事务的所有信息
return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization,definition.isReadOnly(), debug, suspendedResources);
}
// 开始事务
doBegin(transaction, def){
// 事务对象
JpaTransactionObject txObject = (JpaTransactionObject) transaction;
// 如果存在数据库连接,并且事务没有进行同步,抛出异常
if (txObject.hasConnectionHolder() && !txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
throw new IllegalTransactionStateException(
"Pre-bound JDBC Connection found! JpaTransactionManager does not support " +
"running within DataSourceTransactionManager if told to manage the DataSource itself. " +
"It is recommended to use a single JpaTransactionManager for all transactions " +
"on a single DataSource, no matter whether JPA or JDBC access.");
}
try {
// 如果没有在当前线程绑定EntityManager,或者事务同步标识为true,默认为false,在开始处理事务的时候,会将标识设置true
if (!txObject.hasEntityManagerHolder() || txObject.getEntityManagerHolder().isSynchronizedWithTransaction()) {
// 创建一个支持事务的EntityManager,Spring容器的EntityManager是共享的,不支持事务
EntityManager newEm = createEntityManagerForTransaction(){
// 通过EntityManagerFactory创建
EntityManagerFactory emf = obtainEntityManagerFactory();
if (emf instanceof EntityManagerFactoryInfo) {
emf = ((EntityManagerFactoryInfo) emf).getNativeEntityManagerFactory();
}
Map<String, Object> properties = getJpaPropertyMap();
return (!CollectionUtils.isEmpty(properties) ? emf.createEntityManager(properties) : emf.createEntityManager());
}
// 设置到事务对象中
txObject.setEntityManagerHolder(new EntityManagerHolder(newEm), true);
}
// 获取EntityManager
EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
// 设置超时时间,默认为-1,不超时
final int timeoutToUse = determineTimeout(definition);
// 调用JPA的不同供应商对应的开启事务方法
Object transactionData = getJpaDialect().beginTransaction(em,new JpaTransactionDefinition(definition, timeoutToUse, txObject.isNewEntityManagerHolder())){
// 这里是Hibernate实现,将entityManager拆包为具体类型
// Session就是entityManager实现
Session session = getSession(entityManager);
// 设置超时事件
if (definition.getTimeout() != TransactionDefinition.TIMEOUT_DEFAULT) {
session.getTransaction().setTimeout(definition.getTimeout());
}
// 设置隔离级别
boolean isolationLevelNeeded = (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT);
Integer previousIsolationLevel = null;
Connection preparedCon = null;
// 如果需要设置隔离级别,并且事务标识只读
if (isolationLevelNeeded || definition.isReadOnly()) {
// 是否要提前处理JDBC连接,默认为true
if (this.prepareConnection) {
// 获取JDBC连接
preparedCon = HibernateConnectionHandle.doGetConnection(session){
Method methodToUse = connectionMethodToUse;
if (methodToUse == null) {
// 在Hibernate 4.x/5.x上查找SessionImpl的connection()方法的反射查找
methodToUse = session.getClass().getMethod("connection");
// 保存connection方法,connection方法不在session,而在SessionImpl中,从org.hibernate.engine.spi.SharedSessionContractImplementor来的
connectionMethodToUse = methodToUse;
}
}
// 给JDBC连接设置隔离级别
previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(preparedCon, definition){
// 当前事务是否只读
if (definition != null && definition.isReadOnly()) {
try {
// 设置到连接中
con.setReadOnly(true);
} catch (SQLException | RuntimeException ex) {
Throwable exToCheck = ex;
while (exToCheck != null) {
// 只有出现超时异常,才将异常抛出
// 其他的异常都不管
if (exToCheck.getClass().getSimpleName().contains("Timeout")) {
throw ex;
}
exToCheck = exToCheck.getCause();
}
}
}
// 之前的隔离级别
Integer previousIsolationLevel = null;
// 隔离级别不为默认值
if (definition != null && definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) {
// 获取连接对应的原始隔离级别
int currentIsolation = con.getTransactionIsolation();
// 如果原来的隔离级别和新的隔离级别不一样
if (currentIsolation != definition.getIsolationLevel()) {
// 保存原有的隔离级别
// 保存原有的隔离级别
previousIsolationLevel = currentIsolation;
// 设置新的隔离级别
con.setTransactionIsolation(definition.getIsolationLevel());
}
}
// 返回原有的隔离级别
return previousIsolationLevel;
}
}
// 如果设置不需要提前处理JDBC连接,此时不允许自定义隔离级别
else if (isolationLevelNeeded) {
throw new InvalidIsolationLevelException(getClass().getSimpleName() +" does not support custom isolation levels since the 'prepareConnection' flag is off.");
}
}
// 标准JPA事务开始,调用完整的JPA上下文设置
entityManager.getTransaction().begin();
// 调整刷新模式并返回以前的刷新模式
FlushMode previousFlushMode = prepareFlushMode(session, definition.isReadOnly()){
// 反射或者getHibernateFlushMode方法,获取刷新模式
FlushMode flushMode = (FlushMode) ReflectionUtils.invokeMethod(getFlushMode, session);
// 如果是只读事务
if (readOnly) {
// FlushMode.MANUAL: 只有在显示调用flush的时候才会刷新持久化上下文
// 如果不是这种,我们需要设置为这种,因为这种效率最高
if (!flushMode.equals(FlushMode.MANUAL)) {
session.setFlushMode(FlushMode.MANUAL);
return flushMode;
}
}
else {
// 对于非只读事务,如果是COMMIT,则设置为AUTO
if (flushMode.lessThan(FlushMode.COMMIT)) {
session.setFlushMode(FlushMode.AUTO);
return flushMode;
}
}
// 其他的,直接不管,没有修改,也不需要还原
return null;
}
// JpaTransactionDefinition符合下面条件
if (definition instanceof ResourceTransactionDefinition && ((ResourceTransactionDefinition) definition).isLocalResource()) {
// 在5.1版本中,显式地优化了事务本地EntityManager,与HibernateTransactionManager行为保持一致
// 将之前的刷新模式制空
previousFlushMode = null;
// 如果为只读事务
if (definition.isReadOnly()) {
// 设置事务只读
session.setDefaultReadOnly(true);
}
}
// 返回Session事务数据信息
return new SessionTransactionData(session, previousFlushMode, preparedCon, previousIsolationLevel, definition.isReadOnly());
}
// 保存事务数据对象
txObject.setTransactionData(transactionData){
// 保存事务数据
this.transactionData = transactionData;
// 设置当前EntityManager与事务进行了同步
getEntityManagerHolder().setTransactionActive(true);
// 如果事务数据类型为SavepointManager管理器,那么才支持嵌套事务
// 如果不是,那就不支持,Hibernate就不支持
if (transactionData instanceof SavepointManager) {
// 设置保存点管理器
getEntityManagerHolder().setSavepointManager((SavepointManager) transactionData);
}
}
// 设置事务只读
txObject.setReadOnly(definition.isReadOnly());
// 设置事务超时的时间
if (timeoutToUse != TransactionDefinition.TIMEOUT_DEFAULT) {
txObject.getEntityManagerHolder().setTimeoutInSeconds(timeoutToUse);
}
// 为数据源注册JPA EntityManager的JDBC连接
// 如果存在数据源
if (getDataSource() != null) {
// 获取JPA供应商提供的获取JDBC连接处理器
ConnectionHandle conHandle = getJpaDialect().getJdbcConnection(em, definition.isReadOnly()){
Session session = getSession(entityManager);
return new HibernateConnectionHandle(session);
}
// 如果存在连接管理器
if (conHandle != null) {
// 保存JDBC连接信息
ConnectionHolder conHolder = new ConnectionHolder(conHandle);
// 设置超时时间
if (timeoutToUse != TransactionDefinition.TIMEOUT_DEFAULT) {
conHolder.setTimeoutInSeconds(timeoutToUse);
}
// 当前当前连接资源
TransactionSynchronizationManager.bindResource(getDataSource(), conHolder);
// 将当前连接保存到事务对象中
txObject.setConnectionHolder(conHolder);
}
}
// 如果EntityManager是新建的
if (txObject.isNewEntityManagerHolder()) {
// 将EntityManager绑定到当前线程
TransactionSynchronizationManager.bindResource(obtainEntityManagerFactory(), txObject.getEntityManagerHolder());
}
// 设置当前EntityManager的标识,标识当前EntityManager与事务同步
txObject.getEntityManagerHolder().setSynchronizedWithTransaction(true);
}
catch (TransactionException ex) {
closeEntityManagerAfterFailedBegin(txObject){
// 如果是新创建的EntityManager
if (txObject.isNewEntityManagerHolder()) {
// 获取EntityManager
EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
try {
// 如果开启了事务,则回滚事务
if (em.getTransaction().isActive()) {
em.getTransaction().rollback(){
// 如果事务不是活跃的
if (!isActive()) {
// JPA法规要求在非活动事务上调用回滚时抛出IllegalStateException
if ( jpaCompliance.isJpaTransactionComplianceEnabled() ) {
throw new IllegalStateException("JPA compliance dictates throwing IllegalStateException when #rollback " +"is called on non-active transaction");
}
}
// 获取当前事务状态,这个是Hibernate的事务状态
TransactionStatus status = getStatus();
// 如果连接的事务状态已经是回滚状态或者非活跃状态(事务尚未开始),直接返回
if ( status == TransactionStatus.ROLLED_BACK || status == TransactionStatus.NOT_ACTIVE ) {
return;
}
// 如果该状态不能回滚,抛出异常
if ( !status.canRollback() ) {
throw new TransactionException( "Cannot rollback transaction in current status [" + status.name() + "]" );
}
// 如果状态不为FAILED_COMMIT(事务试图提交,但失败),或者允许提交失败后回滚
if ( status != TransactionStatus.FAILED_COMMIT || allowFailedCommitToPhysicallyRollback() ) {
// 调用内部的获取事务控制器进行回滚
internalGetTransactionDriverControl().rollback(){
try {
// 获取事务状态
TransactionStatus status = jdbcResourceTransaction.getStatus();
// 如果已经标识了当前事务仅回滚并且事务已经开始,或者事务是活跃的
if ( ( rollbackOnly && status != TransactionStatus.NOT_ACTIVE ) || status == TransactionStatus.ACTIVE ) {
jdbcResourceTransaction.rollback(){
try {
// 获取当前连接对象
Connection con = getConnectionForTransactionManagement();
// 执行回滚操作
con.rollback();
// 标识该事务已经回滚
status = TransactionStatus.ROLLED_BACK;
}
catch( SQLException e ) {
// 标识该事务回滚失败
status = TransactionStatus.FAILED_ROLLBACK;
throw new TransactionException( "Unable to rollback against JDBC Connection", e );
}
// 事务完成操作
afterCompletion(){
// 如果设置了初始自动提交
if ( initiallyAutoCommit ) {
// 还原自动提交表示
getConnectionForTransactionManagement().setAutoCommit( true );
// 标记事务已经关闭
status = TransactionStatus.NOT_ACTIVE;
}
// 设置自动提交为false,因为已经重置了AutoCommit为true
initiallyAutoCommit = false;
// 事务完成之后操作
afterTransaction(){
// 释放资源
resourceRegistry.releaseResources();
// 释放JDBC连接
releaseConnection();
}
}
}
JdbcResourceLocalTransactionCoordinatorImpl.this.afterCompletionCallback( false );
}
}
finally {
// 重置仅回滚标识
rollbackOnly = false;
}
}
}
}
}
}
finally {
// 关闭EntityManager
EntityManagerFactoryUtils.closeEntityManager(em);
}
// 清空事务对象的EntityManager
txObject.setEntityManagerHolder(null, false);
}
}
throw ex;
}
catch (Throwable ex) {
closeEntityManagerAfterFailedBegin(txObject);
throw new CannotCreateTransactionException("Could not open JPA EntityManager for transaction", ex);
}
}
// 预处理事务
prepareSynchronization(status, def)
{
// 是否是新事务
if (status.isNewSynchronization()) {
// 设置真实的事务活动状态,如果存在事务对象,就表示有事务,那么事务的真实活动状态就为true
TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction());
// 设置当前的隔离级别
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null);
// 设置当前事务是否只读
TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly());
// 设置当前事务的名称
TransactionSynchronizationManager.setCurrentTransactionName(definition.getName());
// 初始化事务信息,上面有同样的代码解析
TransactionSynchronizationManager.initSynchronization();
}
}
return status;
} catch (RuntimeException | Error ex) {
// 恢复当前事务信息
resume(null, suspendedResources);
throw ex;
}
}
// 除了PROPAGATION_REQUIRED,PROPAGATION_REQUIRES_NEW,PROPAGATION_NESTED这三中传播行为
// 其他的都不需要创建开启新的事物
else {
// 判断是否是新事物
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
// 预处理事物状态,封装成事物状态对象
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
}
// 预处理事物信息,封装成事物信息对象
return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
{
// 将事务管理器,事务属性,和方法签名封装成事务信息对象
TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification);
// 如果存在事务属性
if (txAttr != null) {
// 将事务状态保存到事务信息中
txInfo.newTransactionStatus(status);
}
// 将事务信息绑定到当前线程
txInfo.bindToThread();
{
// 从ThreadLocal中获取原有的事务信息
this.oldTransactionInfo = transactionInfoHolder.get();
// 设置为当前的事务信息
transactionInfoHolder.set(this);
}
return txInfo;
}
}
Object retVal;
try {
// 执行回调函数,这个的回调是是一个lambda
// 是org.springframework.aop.framework.ReflectiveMethodInvocation.proceed的变种
// 也就是这个invocation.proceedWithInvocation();就是触发ReflectiveMethodInvocation.proceed的执行
// 在Spring中JDK动态代理和CBLIB代理的拦截逻辑有详细讲解
retVal = invocation.proceedWithInvocation();
} catch (Throwable ex) {
// 如果执行目标方法中存在异常
completeTransactionAfterThrowing(txInfo, ex);
{
// 如果存在事务信息,并且创建了事务状态(创建了事务)
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 获取事务属性,以及事务属性中设置的回滚异常标识
// 如果当前异常与事务属性设置的异常符合条件,需要进行回滚
// rollbackOn: return (ex instanceof RuntimeException || ex instanceof Error);
if (txInfo.transactionAttribute != null && txInfo.transactionAttribute.rollbackOn(ex)) {
try {
// 调用事务管理器进行回滚
// 将事务状态信息传递
txInfo.getTransactionManager().rollback(txInfo.getTransactionStatus());
{
// 如果已经提交了
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
}
// 强转为事务状态类型
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 执行回滚
processRollback(defStatus, false);
{
try {
boolean unexpectedRollback = unexpected;
try {
// 触发提交之前的回调方法
triggerBeforeCompletion(status);
{
// 如果当前事务状态是一个新的事物
if (status.isNewSynchronization()) {
// 获取当前线程绑定到事务同步管理器中的所有事务
// 并且执行这些事务的提交之前的回调方法
TransactionSynchronizationUtils.triggerBeforeCompletion();
}
}
// 如果存在保存点,表示是嵌套事务
if (status.hasSavepoint()) {
// 将状态
status.rollbackToHeldSavepoint();
{
Object savepoint = getSavepoint();
// 获取保存点管理器,回滚
getSavepointManager().rollbackToSavepoint(savepoint);
{
// 获取到连接
ConnectionHolder conHolder = getConnectionHolderForSavepoint();
// 回滚到保存点
conHolder.getConnection().rollback((Savepoint) savepoint);
// 重置当前仅回滚标识,可能在之前逻辑设置了当前事务仅回滚
// 回滚之后要重置标识
conHolder.resetRollbackOnly();
}
// 重置保存点
getSavepointManager().releaseSavepoint(savepoint);
{
conHolder.getConnection().releaseSavepoint((Savepoint) savepoint);
}
// 删除保存点
setSavepoint(null);
}
}
// 如果不是嵌套事务
// 是一个新事务
else if (status.isNewTransaction()) {
// 回滚
doRollback(status);
{
// 获取到事务对象
JpaTransactionObject txObject = (JpaTransactionObject) status.getTransaction();
// 获取到事务对象
EntityTransaction tx = txObject.getEntityManagerHolder().getEntityManager().getTransaction();
// 如果活跃的
if (tx.isActive()) {
// 上面有讲解,调用EntityTransaction的回滚方法,在上面搜em.getTransaction().rollback()中有解释
tx.rollback();
}
// 如果不是新建的Entity
if (!txObject.isNewEntityManagerHolder()) {
// 清空持久化上下文管理的实体信息
txObject.getEntityManagerHolder().getEntityManager().clear();
}
}
}
// 如果不是一个新事物,表示之前已经存在事务
else {
// 当前是否存在事务
if (status.hasTransaction()) {
// 如果当前事务标记了仅回滚,并且全局事务的仅回滚标识为true,默认为true
if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
// 设置当前事务为仅状态
doSetRollbackOnly(status);
{
// 获取对事务对象
JpaTransactionObject txObject = (JpaTransactionObject) status.getTransaction();
// 设置当前事务为仅回滚
txObject.setRollbackOnly();
}
}
}
// 如果事务被全局标记为仅回滚,则返回是否提前失败
if (!isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = false;
}
}
} catch (RuntimeException | Error ex) {
// 触发当前事务同步管理器保存的当前线程所有的事务提交之后的回调(TransactionSynchronization)
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw ex;
}
// 触发当前事务同步管理器保存的当前线程所有的事务提交之后的回调
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
// 如果有意想不到的异常,则抛出异常,这个是外部传递过来的
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction rolled back because it has been marked as rollback-only");
}
} finally {
// 提交之后,清理逻辑
cleanupAfterCompletion(status);
{
// 设置当前事务已经提交
status.setCompleted();
// 如果是新事物同步信息
if (status.isNewSynchronization()) {
// 情况当前事物的所有配置
TransactionSynchronizationManager.clear();
}
// 如果是新事物对象信息
if (status.isNewTransaction()) {
// 清理事务对象
doCleanupAfterCompletion(status.getTransaction());
{
// 获取事务对象
JpaTransactionObject txObject = (JpaTransactionObject) transaction;
// 如果是新建的EntityManager
if (txObject.isNewEntityManagerHolder()) {
// 解绑当前线程中的EntityManager
TransactionSynchronizationManager.unbindResourceIfPossible(obtainEntityManagerFactory());
}
// 清空EntityManager的上下文
txObject.getEntityManagerHolder().clear();
// 如果存在JDBC连接
if (getDataSource() != null && txObject.hasConnectionHolder()) {
// 解绑绑定到当前线程的JDBC连接
TransactionSynchronizationManager.unbindResource(getDataSource());
ConnectionHandle conHandle = txObject.getConnectionHolder().getConnectionHandle();
if (conHandle != null) {
// 释放JDBC连接
getJpaDialect().releaseJdbcConnection(conHandle, txObject.getEntityManagerHolder().getEntityManager());
}
}
// 清空事务信息
getJpaDialect().cleanupTransaction(txObject.getTransactionData())
{
if (transactionData instanceof SessionTransactionData) {
((SessionTransactionData) transactionData).resetSessionState()
{
// 重置session刷新模式
if (this.previousFlushMode != null) {
this.session.setFlushMode(this.previousFlushMode);
}
// 重置事务的隔离级别,只读标识
if (this.preparedCon != null && this.session.isConnected()) {
DataSourceUtils.resetConnectionAfterTransaction(conToReset, this.previousIsolationLevel, this.readOnly);
}
}
}
// 如果是新建的EntityManager
if (txObject.isNewEntityManagerHolder()) {
// 关闭EntityManager
EntityManager em = txObject.getEntityManagerHolder().getEntityManager();
EntityManagerFactoryUtils.closeEntityManager(em);
}
}
}
// 如果存在暂停的事务资源
if (status.getSuspendedResources() != null) {
// 获取到事务对象
Object transaction = (status.hasTransaction() ? status.getTransaction() : null);
// 恢复事务
resume(transaction, (SuspendedResourcesHolder) status.getSuspendedResources());
}
}
}
}
}
} catch(TransactionSystemException ex2){
throw ex2;
} catch(RuntimeException | Error ex2){
throw ex2;
}
}
// 非Error和非RuntimeException,提交事务
else{
try {
// 提交事务
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()) {
// 如果已经提交了
if (status.isCompleted()) {
throw new IllegalTransactionStateException("Transaction is already completed - do not call commit or rollback more than once per transaction");
}
// 事务状态
DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status;
// 如果被标记了仅回滚,这里是判断当前事务是否设置了仅回滚
if (defStatus.isLocalRollbackOnly()) {
// 执行回滚逻辑,上面有详细讲解
processRollback(defStatus, false);
return;
}
// shouldCommitOnGlobalRollbackOnly:默认值为false
// 如果当前状态标记了整个大事务全局回滚,则进行回滚
// 这里是判断整个事务中,是否有被标记了仅回滚的事务,因为一次可能存在多个事务的情况,只要有一个事务被标记了,那么所有的事务将回滚
if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()
{
// 当事务类型是SmartTransactionObject,其中就是这种类型JpaTransactionObject
// 并且给txObject设置了RollbackOnly
// 因为this.transaction.isRollbackOnly()实际上获取的是return getConnectionHolder().isRollbackOnly();
// 而已存在的ConnectionHolder是会被绑定到当前线程的资源中,如果一个或者多个事务中,并没有将这些事务隔离,那么其他事务获取到的也是同一个ConnectionHolder
// 所以最终共享这个回滚的变量,有一个事务设置了,那么整个事务将被回滚
return ((this.transaction instanceof SmartTransactionObject) && ((SmartTransactionObject) this.transaction).isRollbackOnly());
}){
// 回滚
processRollback(defStatus, true);
return;
}
// 提交事务
processCommit(defStatus);
{
try {
boolean beforeCompletionInvoked = false;
try {
boolean unexpectedRollback = false;
// 预处理提交信息,默认空实现
prepareForCommit(status);
// 触发提交之前的所有事务回调
triggerBeforeCommit(status);
// 触发完成之前的所有事务回调
triggerBeforeCompletion(status);
// 回调执行完毕标识
beforeCompletionInvoked = true;
// 如果是嵌套事务,存在保存点
if (status.hasSavepoint()) {
unexpectedRollback = status.isGlobalRollbackOnly();
// 重置保存点
status.releaseHeldSavepoint();
{
Object savepoint = getSavepoint();
getSavepointManager().releaseSavepoint(savepoint);
setSavepoint(null);
}
}
// 如果是新事务
else if (status.isNewTransaction()) {
unexpectedRollback = status.isGlobalRollbackOnly();
// 提交事务
doCommit(status);
{
// 获取事务对象
JpaTransactionObject txObject = (JpaTransactionObject) status.getTransaction();
// 获取Jpa事务对象
EntityTransaction tx = txObject.getEntityManagerHolder().getEntityManager().getTransaction();
// 提交JPP事务
tx.commit(){
internalGetTransactionDriverControl().commit(){
jdbcResourceTransaction.commit(){
try {
// 获取到JDBC连接,进行提交
getConnectionForTransactionManagement().commit();
// 修改事务的状态为提交完成
status = TransactionStatus.COMMITTED;
}
catch( SQLException e ) {
// 修改事务状态为提交失败
status = TransactionStatus.FAILED_COMMIT;
throw new TransactionException( "Unable to commit against JDBC Connection", e );
}
// 提交之后的收尾,上面有讲解
afterCompletion();
}
}
}
}
} else if (isFailEarlyOnGlobalRollbackOnly()) {
unexpectedRollback = status.isGlobalRollbackOnly();
}
// 如果全局回滚,则抛出异常,但是事务已经提交了
if (unexpectedRollback) {
throw new UnexpectedRollbackException("Transaction silently rolled back because it has been marked as rollback-only");
}
} catch (UnexpectedRollbackException ex) {
// 触发当前事务同步管理器保存的当前线程所有的事务提交之后的回调
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
throw ex;
} catch (TransactionException ex) {
// 在调用commit的时候失败,是否需要调用回滚
// 默认为false
if (isRollbackOnCommitFailure()) {
// 提交出现异常执行回滚
doRollbackOnCommitException(status, ex);
{
try {
// 如果是新事务,直接回滚
if (status.isNewTransaction()) {
// 上面有详解
doRollback(status);
}
// 如果存在事务,并且标记了全局事务回滚
else if (status.hasTransaction() && isGlobalRollbackOnParticipationFailure()) {
// 标记当前事务为仅回滚
doSetRollbackOnly(status);
}
} catch (RuntimeException | Error rbex) {
// 触发当前事务同步管理器保存的当前线程所有的事务提交之后的回调
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
throw rbex;
}
// 触发当前事务同步管理器保存的当前线程所有的事务提交之后的回调
triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
}
} else {
// 触发当前事务同步管理器保存的当前线程所有的事务提交之后的回调
triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
}
throw ex;
} catch (RuntimeException | Error ex) {
// beforeCompletion已经执行过的标识
// 如果未执行过,标识执行之前出现了异常
if (!beforeCompletionInvoked) {
// 触发完成之前的所有事务回调
triggerBeforeCompletion(status);
}
// 提交出现异常执行回滚,上面有详解
doRollbackOnCommitException(status, ex);
throw ex;
}
try {
// 触发所有事务信息提交之后的回调
triggerAfterCommit(status);
} finally {
// 触发当前事务同步管理器保存的当前线程所有的事务提交之后的回调
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
} finally {
// 提交之后,清理逻辑,上面有讲解
cleanupAfterCompletion(status);
}
}
} ;
} catch (TransactionSystemException ex2) {
throw ex2;
} catch (RuntimeException | Error ex2) {
throw ex2;
}
}
}
}
throw ex;
} finally{
// 清理事务信息
cleanupTransactionInfo(txInfo);
{
if (txInfo != null) {
txInfo.restoreThreadLocalStatus();
{
// 还原之前的事务状态信息
transactionInfoHolder.set(this.oldTransactionInfo);
}
}
}
}
// 正常返回的情况下提交事务
commitTransactionAfterReturning(txInfo);
{
// 如果存在事务信息,并且存在事务状态
if (txInfo != null && txInfo.getTransactionStatus() != null) {
// 获取事务管理器提交事务,上面有详细讲解
// 搜txInfo.getTransactionManager().commit(txInfo.getTransactionStatus())
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}
// 返回执行结果
return retVal;
}
} else {
final ThrowableHolder throwableHolder = new ThrowableHolder();
// 事务属性不为空,并且事务管理器为CallbackPreferringPlatformTransactionManager类型
try {
// 调用事务管理器的execute方法,开始执行
Object result = ((CallbackPreferringPlatformTransactionManager) ptm).execute(txAttr, status -> {
// 预处理事务信息,上面有讲过
TransactionInfo txInfo = prepareTransactionInfo(ptm, txAttr, joinpointIdentification, status);
try {
// 执行回调函数,这个的回调是是一个lambda
// 是org.springframework.aop.framework.ReflectiveMethodInvocation.proceed的变种
// 也就是这个invocation.proceedWithInvocation();就是触发ReflectiveMethodInvocation.proceed的执行
// 在Spring中JDK动态代理和CBLIB代理的拦截逻辑有详细讲解
Object retVal = invocation.proceedWithInvocation();
return retVal;
} catch (Throwable ex) {
if (txAttr.rollbackOn(ex)) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new ThrowableHolderException(ex);
}
} else {
throwableHolder.throwable = ex;
return null;
}
} finally {
// 清理事务信息,上面有讲
cleanupTransactionInfo(txInfo);
}
});
if (throwableHolder.throwable != null) {
throw throwableHolder.throwable;
}
return result;
} catch (ThrowableHolderException ex) {
throw ex.getCause();
} catch (TransactionSystemException ex2) {
throw ex2;
} catch (Throwable ex2) {
throw ex2;
}
}
}
}
}
// 用于解析jpa中@Transactional事务注解
class CustomAnnotationTransactionAttributeSource {
private static final boolean jta12Present = ClassUtils.isPresent("javax.transaction.Transactional",
CustomAnnotationTransactionAttributeSource.class.getClassLoader());
private static final boolean ejb3Present = ClassUtils.isPresent("javax.ejb.TransactionAttribute",
CustomAnnotationTransactionAttributeSource.class.getClassLoader());
// 默认为true
public CustomAnnotationTransactionAttributeSource(boolean publicMethodsOnly) {
// 是否仅支持public方法
this.publicMethodsOnly = publicMethodsOnly;
// 注解解析器
this.annotationParsers = new LinkedHashSet<>(2);
// Spring事务注解解析器
this.annotationParsers.add(new SpringTransactionAnnotationParser());
if (jta12Present) {
// JTA事务注解解析器
this.annotationParsers.add(new JtaTransactionAnnotationParser());
}
if (ejb3Present) {
// EJB注解解析器
this.annotationParsers.add(new Ejb3TransactionAnnotationParser());
}
}
// 获取事务属性
public TransactionAttribute getTransactionAttribute(Method method, Class<?> targetClass) {
// 先看之前是否查找过事务注解,如果处理了就会缓存起来
Object cacheKey = getCacheKey(method, targetClass);
Object cached = this.attributeCache.get(cacheKey);
// 如果处理过
if (cached != null) {
// 方法没有事务注解
if (cached == NULL_TRANSACTION_ATTRIBUTE) {
return null;
} else {
// 存在事务注解,返回缓存的事务属性
return (TransactionAttribute) cached;
}
} else {
// 如果没有缓存,我们需要找事务注解
TransactionAttribute txAtt = computeTransactionAttribute(method, targetClass);
// 如果没有获取到事务属性,表示没有事务注解
if (txAtt == null) {
// 标识没有事务注解
this.attributeCache.put(cacheKey, NULL_TRANSACTION_ATTRIBUTE);
} else {
// 缓存事务属性信息
this.attributeCache.put(cacheKey, txAtt);
}
return txAtt;
}
}
// 找@Transactional注解计算事务属性
private TransactionAttribute computeTransactionAttribute(Method method, Class<?> targetClass) {
// publilc方法的逻辑校验,字面意思,如果当前类只支持public方法,但是该方法又不是public,就不处理
if (allowPublicMethodsOnly() && !Modifier.isPublic(method.getModifiers())) {
return null;
}
// 获取到用户实际的类
Class<?> userClass = ProxyUtils.getUserClass(targetClass);
// 上面注释,method是从外面传递过来的,要单独处理JDK动态代理
// specificMethod是被调用的类的方法对象,根据上面注释代码,method可能是原始方法,CBLIB方法,也可能是接口的方法对象
// 例如:userClass = B B impl A => B.test() 此时,specificMethod = B.test() , method = A.test()
Method specificMethod = ClassUtils.getMostSpecificMethod(method, userClass);
specificMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
TransactionAttribute txAtt = null;
// specificMethod是被调用的类的方法对象,根据上面注释代码,method可能是原始方法,CBLIB方法,也可能是接口的方法对象
// 例如:userClass = B B impl A => B.test() 此时,specificMethod = B.test() , method = A.test()
// specificMethod != method,表示该方法是继承或者实现过来的,不是自身的方法
if (specificMethod != method) {
// 首先通过method本身来找方法上的注解
// 这里实际上是@Repository接口中的方法上的注解
txAtt = findTransactionAttribute(method);
if (txAtt != null) {
return txAtt;
}
// 再找方法所属类中的注解
// 这里实际上是@Repository接口上的注解
txAtt = findTransactionAttribute(method.getDeclaringClass());
// 如果找到了注解,或者不是启用默认的事务
if (txAtt != null || !enableDefaultTransactions) {
return txAtt;
}
}
// 接着找实现类方法上的注解
txAtt = findTransactionAttribute(specificMethod);
if (txAtt != null) {
return txAtt;
}
// 最后中实现类上的注解
txAtt = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAtt != null) {
return txAtt;
}
// 如果都没找到,并且不启动默认的事务,则返回空
if (!enableDefaultTransactions) {
return null;
}
// 获取@Repository接口实现类中的方法
Method targetClassMethod = repositoryInformation.getTargetClassMethod(method);
// 如果实现类中的方法与当前执行的方法是同一个,直接结束,因为之前已经通过method找过了
if (targetClassMethod.equals(method)) {
return null;
}
// 找所有接口碎片实现类中的该方法的注解
txAtt = findTransactionAttribute(targetClassMethod);
if (txAtt != null) {
return txAtt;
}
// 找所有接口碎片实现类中上的注解
txAtt = findTransactionAttribute(targetClassMethod.getDeclaringClass());
if (txAtt != null) {
return txAtt;
}
// 没有任何地方添加了事务注解
return null;
}
// 找方法中的事务注解
protected TransactionAttribute findTransactionAttribute(Method method) {
return determineTransactionAttribute(method);
}
// 找类上的事务注解
protected TransactionAttribute findTransactionAttribute(Class<?> clazz) {
return determineTransactionAttribute(clazz);
}
// 确定事务属性,解析@Transactional注解,得到对应的TransactionAttribute对象
protected TransactionAttribute determineTransactionAttribute(AnnotatedElement ae) {
// 使用事务解析器进行解析
for (TransactionAnnotationParser annotationParser : this.annotationParsers) {
// 事务解析器解析注解
TransactionAttribute attr = annotationParser.parseTransactionAnnotation(ae) {
// 找Transactional注解
AnnotationAttributes attributes = AnnotatedElementUtils.findMergedAnnotationAttributes(element, Transactional.class, false, false);
// 如果存在Transactional注解,解析Transactional注解
if (attributes != null) {
// 将注解信息封装成RuleBasedTransactionAttribute对象
return parseTransactionAnnotation(attributes);
{
RuleBasedTransactionAttribute rbta = new RuleBasedTransactionAttribute();
Propagation propagation = attributes.getEnum("propagation");
rbta.setPropagationBehavior(propagation.value());
Isolation isolation = attributes.getEnum("isolation");
rbta.setIsolationLevel(isolation.value());
rbta.setTimeout(attributes.getNumber("timeout").intValue());
rbta.setReadOnly(attributes.getBoolean("readOnly"));
rbta.setQualifier(attributes.getString("value"));
List<RollbackRuleAttribute> rollbackRules = new ArrayList<>();
for (Class<?> rbRule : attributes.getClassArray("rollbackFor")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("rollbackForClassName")) {
rollbackRules.add(new RollbackRuleAttribute(rbRule));
}
for (Class<?> rbRule : attributes.getClassArray("noRollbackFor")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
for (String rbRule : attributes.getStringArray("noRollbackForClassName")) {
rollbackRules.add(new NoRollbackRuleAttribute(rbRule));
}
rbta.setRollbackRules(rollbackRules);
return rbta;
}
}
// 不存在,返回null
return null;
}
// 如果解析到注解,返回事务属性信息
if (attr != null) {
return attr;
}
}
return null;
}
}
// 处理事务的后置处理器
class TransactionalRepositoryProxyPostProcessor {
public TransactionalRepositoryProxyPostProcessor(ListableBeanFactory beanFactory, String transactionManagerName, boolean enableDefaultTransaction) {
this.beanFactory = beanFactory;
// 事务管理器beanName,默认为transactionManager
this.transactionManagerName = transactionManagerName;
// 默认为true
this.enableDefaultTransactions = enableDefaultTransaction;
}
// 处理逻辑
public void postProcess(ProxyFactory factory, RepositoryInformation repositoryInformation) {
// 自定义事务注解解析器
CustomAnnotationTransactionAttributeSource transactionAttributeSource = new CustomAnnotationTransactionAttributeSource()[]
// 设置Repository接口信息
transactionAttributeSource.setRepositoryInformation(repositoryInformation);
// 设置是否开启默认事务
transactionAttributeSource.setEnableDefaultTransactions(enableDefaultTransactions);
// 设置事务拦截器
TransactionInterceptor transactionInterceptor = new TransactionInterceptor(null, transactionAttributeSource);
// 设置事务管理器
transactionInterceptor.setTransactionManagerBeanName(transactionManagerName);
transactionInterceptor.setBeanFactory(beanFactory);
transactionInterceptor.afterPropertiesSet();
// 给创建代理的工厂添加拦截器
factory.addAdvice(transactionInterceptor);
}
}
深入理解分布式事务⑧ ---->MySQL 事务的实现原理 之 MySQL 事务流程(MySQL 事务执行流程 和 恢复流程)详解
2024-04-03 04:34:02 10 阅读