Seata中AT模式的实现原理01-TM开启全局事务

什么是AT模式

AT模式是一种无侵入的分布式事务解决方案 保证最终一致性 是Seata默认的方式,在AT模式下,用户只需要关注自己的“业务SQL”,用户的“业务SQL”作为一阶段,Seata框架会自动的生成事务的二阶段提交和回滚

AT模式的机制

AT模式其实就是二阶段提交的演变
什么是二阶段提交可以看https://cloud.tencent.com/developer/article/2363585?areaId=106001
AT模式的主要流程是
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
提交异步化,非常快速地完成。
回滚通过一阶段的回滚日志进行反向补偿。

项目启动创建GlobalTransactional的代理类

SpringBoot项目启动的时候会通过自动装配机制将GlobalTransactionScanner装载到Bean中 然后 wrapIfNecessary方法会去做增强 创建针对 GlobalTransactionScanner的代理类 并构建全局的 globalTransactionalInterceptor拦截器
GlobalTransactionScanner

   // 对于扫描到的@GlobalTransactional注解, bean和beanName
// 判断类 或 类的某一个方法是否被@GlobalTransactional注解标注,进而决定当前Class是否需要创建动态代理;存在则创建。
@Override
protected Object wrapIfNecessary(Object bean, String beanName, Object cacheKey) {
   
    // do checkers,做一些检查,不用花过多精力关注
    if (!doCheckers(bean, beanName)) {
   
        return bean;
    }

    try {
   
        synchronized (PROXYED_SET) {
   
            if (PROXYED_SET.contains(beanName)) {
   
                return bean;
            }
            interceptor = null;
            //check TCC proxy TCC的动态代理
            if (TCCBeanParserUtils.isTccAutoProxy(bean, beanName, applicationContext)) {
   
                // init tcc fence clean task if enable useTccFence
                TCCBeanParserUtils.initTccFenceCleanTask(TCCBeanParserUtils.getRemotingDesc(beanName), applicationContext);
                //TCC interceptor, proxy bean of sofa:reference/dubbo:reference, and LocalTCC
                interceptor = new TccActionInterceptor(TCCBeanParserUtils.getRemotingDesc(beanName));
                ConfigurationCache.addConfigListener(ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                        (ConfigurationChangeListener) interceptor);
            } else {
   
                // 先获取目标Class的接口
                Class<?> serviceInterface = SpringProxyUtils.findTargetClass(bean);
                Class<?>[] interfacesIfJdk = SpringProxyUtils.findInterfaces(bean);

                // existsAnnotation()表示类或类方法是否有被@GlobalTransactional注解标注,进而决定类是否需要被动态代理
                if (!existsAnnotation(new Class[]{
   serviceInterface})
                        && !existsAnnotation(interfacesIfJdk)) {
   
                    return bean;
                }

                if (globalTransactionalInterceptor == null) {
   
                    // 构建一个全局拦截器
                    globalTransactionalInterceptor = new GlobalTransactionalInterceptor(failureHandlerHook);
                    ConfigurationCache.addConfigListener(
                            ConfigurationKeys.DISABLE_GLOBAL_TRANSACTION,
                            (ConfigurationChangeListener) globalTransactionalInterceptor);
                }
                interceptor = globalTransactionalInterceptor;
            }

            LOGGER.info("Bean[{}] with name [{}] would use interceptor [{}]", bean.getClass().getName(), beanName, interceptor.getClass().getName());
            // 如果当前Bean没有被AOP代理
            if (!AopUtils.isAopProxy(bean)) {
   
                // 基于Spring AOP的AutoProxyCreator对当前Class创建全局事务动态动态代理类
                bean = super.wrapIfNecessary(bean, beanName, cacheKey);
            } else {
   
                AdvisedSupport advised = SpringProxyUtils.getAdvisedSupport(bean);
                Advisor[] advisor = buildAdvisors(beanName, getAdvicesAndAdvisorsForBean(null, null, null));
                int pos;
                for (Advisor avr : advisor) {
   
                    // Find the position based on the advisor's order, and add to advisors by pos
                    // 找到seata切面的位置
                    pos = findAddSeataAdvisorPosition(advised, avr);
                    advised.addAdvisor(pos, avr);
                }
            }
            PROXYED_SET.add(beanName);
            return bean;
        }
    } catch (Exception exx) {
   
        throw new RuntimeException(exx);
    }
}

全局事务处理拦截器执行

对于一个全局事务来说,开启全局事务的角色,即TM
GlobalTransactionalInterceptor实现了MethodInterceptor接口,所以当每次执行添加了 GlobalTransactionalInterceptor拦截器的Bean的方法时,都会进入到GlobalTransactionalInterceptor类覆写MethodInterceptor接口的invoke()方法:

    @Override
    public Object invoke(final MethodInvocation methodInvocation) throws Throwable {
   
        //获取目标类
        Class<?> targetClass =
            methodInvocation.getThis() != null ? AopUtils.getTargetClass(methodInvocation.getThis()) : null;
       //获取调用的目标方法
        Method specificMethod = ClassUtils.getMostSpecificMethod(methodInvocation.getMethod(), targetClass);
        //只要不是object 的方法
        if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
   
            final Method method = BridgeMethodResolver.findBridgedMethod(specificMethod);
            //拿到注解
            final GlobalTransactional globalTransactionalAnnotation =
                getAnnotation(method, targetClass, GlobalTransactional.class);
           //获取全局锁
            final GlobalLock globalLockAnnotation = getAnnotation(method, targetClass, GlobalLock.class);
            boolean localDisable = disable || (degradeCheck && degradeNum >= degradeCheckAllowTimes);
            if (!localDisable) {
   
                if (globalTransactionalAnnotation != null) {
   
                    //处理全局事务
                    return handleGlobalTransaction(methodInvocation, globalTransactionalAnnotation);
                } else if (globalLockAnnotation != null) {
   
                    return handleGlobalLock(methodInvocation);
                }
            }
        }
        //调用原始方法
        return methodInvocation.proceed();
    }

GlobalTransactionalInterceptor

    private Object handleGlobalTransaction(final MethodInvocation methodInvocation,
        final GlobalTransactional globalTrxAnno) throws Throwable {
   
        boolean succeed = true;
        try {
   
            //调用之后调用重写的方法
            return transactionalTemplate.execute(new TransactionalExecutor() {
   
                @Override
                public Object execute() throws Throwable {
   
                    return methodInvocation.proceed();
                }
               //获取名字  GlobalTransactional中定义的name
                public String name() {
   
                    String name = globalTrxAnno.name();
                    if (!StringUtils.isNullOrEmpty(name)) {
   
                        return name;
                    }
                    return formatMethod(methodInvocation.getMethod());
                }

                //填充属性
                @Override
                public TransactionInfo getTransactionInfo() {
   
                    TransactionInfo transactionInfo = new TransactionInfo();
                    transactionInfo.setTimeOut(globalTrxAnno.timeoutMills());
                    transactionInfo.setName(name());
                    transactionInfo.setPropagation(globalTrxAnno.propagation());
                    Set<RollbackRule> rollbackRules = new LinkedHashSet<>();
                    for (Class<?> rbRule : globalTrxAnno.rollbackFor()) {
   
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.rollbackForClassName()) {
   
                        rollbackRules.add(new RollbackRule(rbRule));
                    }
                    for (Class<?> rbRule : globalTrxAnno.noRollbackFor()) {
   
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    for (String rbRule : globalTrxAnno.noRollbackForClassName()) {
   
                        rollbackRules.add(new NoRollbackRule(rbRule));
                    }
                    transactionInfo.setRollbackRules(rollbackRules);
                    return transactionInfo;
                }
            });
        } catch (TransactionalExecutor.ExecutionException e) {
   
            TransactionalExecutor.Code code = e.getCode();
            switch (code) {
   
                case RollbackDone:
                    throw e.getOriginalException();
                case BeginFailure:
                    succeed = false;
                    failureHandler.onBeginFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case CommitFailure:
                    succeed = false;
                    failureHandler.onCommitFailure(e.getTransaction(), e.getCause());
                    throw e.getCause();
                case RollbackFailure:
                    failureHandler.onRollbackFailure(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                case RollbackRetrying:
                    failureHandler.onRollbackRetrying(e.getTransaction(), e.getOriginalException());
                    throw e.getOriginalException();
                default:
                    throw new ShouldNeverHappenException(String.format("Unknown TransactionalExecutor.Code: %s", code));
            }
        } finally {
   
            if (degradeCheck) {
   
                EVENT_BUS.post(new DegradeCheckEvent(succeed));
            }
        }
    }

处理事务传播机制

TransactionalTemplate

 public Object execute(TransactionalExecutor business) throws Throwable {
   
        // 1 get transactionInfo
        //获取事务信息
        TransactionInfo txInfo = business.getTransactionInfo();
        if (txInfo == null) {
   
            throw new ShouldNeverHappenException("transactionInfo does not exist");
        }
        // 1.1 get or create a transaction
        //获取全局事务
        GlobalTransaction tx = GlobalTransactionContext.getCurrentOrCreate();

        // 1.2 Handle the Transaction propatation and the branchType
        //事务传播行为 默认REQUIRED
        Propagation propagation = txInfo.getPropagation();
        SuspendedResourcesHolder suspendedResourcesHolder = null;
        try {
   
            
            switch (propagation) {
   
                    //非事务方式执行 存在事务挂起
                case NOT_SUPPORTED:
                    suspendedResourcesHolder = tx.suspend(true);
                    return business.execute();
                    // 新建事务  如果当前存在事务把事务挂起
                case REQUIRES_NEW:
                    suspendedResourcesHolder = tx.suspend(true);
                    break;
                    //支持当前事务 如果当前没有事务已非事务的方式运行
                case SUPPORTS:
                    if (!existingTransaction()) {
   
                        return business.execute();
                    }
                    break;
                    //支持当前事务 如果没有则新建
                case REQUIRED:
                    break;
                    // 已非事务的方式执行 如果当前存在事务抛出异常
                case NEVER:
                    if (existingTransaction()) {
   
                        throw new TransactionException(
                                String.format("Existing transaction found for transaction marked with propagation 'never',xid = %s"
                                        ,RootContext.getXID()));
                    } else {
   
                        return business.execute();
                    }
                    // 支持当前事务 如果当前存在事务则抛异常
                case MANDATORY:
                    if (!existingTransaction()) {
   
                        throw new TransactionException("No existing transaction found for transaction marked with propagation 'mandatory'");
                    }
                    break;
                default:
                    throw new TransactionException("Not Supported Propagation:" + propagation);
            }


           ......
        } finally {
   
            tx.resume(suspendedResourcesHolder);
        }

    }

全局事务开启

   ......
   try {
   

                // 2. begin transaction 
                //开启全局事务
                beginTransaction(txInfo, tx);
 
                Object rs = null;
                try {
   
                    //执行业务代码
                    // Do Your Business
                    rs = business.execute();

                } catch (Throwable ex) {
   
                   //出现异常回滚
                    // 3.the needed business exception to rollback.
                    completeTransactionAfterThrowing(txInfo, tx, ex);
                    throw ex;
                }
                //提交
                // 4. everything is fine, commit.
                commitTransaction(tx);

                return rs;
            } finally {
   
                //5. clear
                triggerAfterCompletion();
                cleanUp();
            }
            ......

获取全局事务

    public static GlobalTransaction getCurrentOrCreate() {
   
        GlobalTransaction tx = getCurrent();
        if (tx == null) {
   
            return createNew();
        }
        return tx;
    }

开启全局事务

    private void beginTransaction(TransactionInfo txInfo, GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
   
        try {
   
            //前置钩子扩展
            triggerBeforeBegin();
            tx.begin(txInfo.getTimeOut(), txInfo.getName());
            //后置钩子扩展
            triggerAfterBegin();
        } catch (TransactionException txe) {
   
            throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);

        }
    }

DefaultGlobalTransaction

    @Override
    public void begin(int timeout, String name) throws TransactionException {
   
          
        if (role != GlobalTransactionRole.Launcher) {
   
            assertXIDNotNull();
            if (LOGGER.isDebugEnabled()) {
   
                LOGGER.debug("Ignore Begin(): just involved in global transaction [{}]", xid);
            }
            return;
        }
        assertXIDNull();
        if (RootContext.getXID() != null) {
   
            throw new IllegalStateException();
        }
        //开启事务  xid是server端创建返回的
        xid = transactionManager.begin(null, null, name, timeout);
        //修改状态为Begin
        status = GlobalStatus.Begin;
        //绑定 xid 在被调用方会从Feign 那个拦截器里拿出来用
        RootContext.bind(xid);
        if (LOGGER.isInfoEnabled()) {
   
            LOGGER.info("Begin new global transaction [{}]", xid);
        }

    }
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
   
        //创建请求
        GlobalBeginRequest request = new GlobalBeginRequest();
        request.setTransactionName(name);
        request.setTimeout(timeout);
        //发起请求 通过netty
        GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
        if (response.getResultCode() == ResultCode.Failed) {
   
            throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
        }
        return response.getXid();
    }

TC处理TM的开启全局事务请求

前面那些Netty怎么处理接收请求的就不写了 想看的可以子集dbug看
AbstractTCInboundHandler

    @Override
    public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
   
        GlobalBeginResponse response = new GlobalBeginResponse();
        exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
   
            @Override
            public void execute(GlobalBeginRequest request, GlobalBeginResponse response) throws TransactionException {
   
                try {
   
                    doGlobalBegin(request, response, rpcContext);
                } catch (StoreException e) {
   
                    throw new TransactionException(TransactionExceptionCode.FailedStore,
                        String.format("begin global request failed. xid=%s, msg=%s", response.getXid(), e.getMessage()),
                        e);
                }
            }
        }, request, response);
        return response;
    }

DefaultCoordinator

    @Override
    protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
            throws TransactionException {
   
        //设置全局事务id
        response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
                request.getTransactionName(), request.getTimeout()));
        if (LOGGER.isInfoEnabled()) {
   
            LOGGER.info("Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
                    rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(), request.getTransactionName(), request.getTimeout(), response.getXid());
        }
    }
    @Override
    public String begin(String applicationId, String transactionServiceGroup, String name, int timeout)
        throws TransactionException {
   
        //创建一个全局会话
        GlobalSession session = GlobalSession.createGlobalSession(applicationId, transactionServiceGroup, name,
            timeout);
        //设置xid信息 到map里面 
        MDC.put(RootContext.MDC_KEY_XID, session.getXid());
        session.addSessionLifecycleListener(SessionHolder.getRootSessionManager());
        //开启一个会话  
        session.begin();
        //发送事件
        // transaction start event
        MetricsPublisher.postSessionDoingEvent(session, false);

        return session.getXid();
    }

GlobalSession


    @Override
    public void begin() throws TransactionException {
   
        //更新状态
        this.status = GlobalStatus.Begin;
        //更新变更事件
        this.beginTime = System.currentTimeMillis();
        this.active = true;
        //遍历会话生命周期的监听
        for (SessionLifecycleListener lifecycleListener : lifecycleListeners) {
   
            lifecycleListener.onBegin(this);
        }
    }

AbstractSessionManager

    @Override
    public void onBegin(GlobalSession globalSession) throws TransactionException {
   
        addGlobalSession(globalSession);
    }

AbstractSessionManager

    @Override
    public void addGlobalSession(GlobalSession session) throws TransactionException {
   
        if (LOGGER.isDebugEnabled()) {
   
            LOGGER.debug("MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation.GLOBAL_ADD);
        }
        writeSession(LogOperation.GLOBAL_ADD, session);
    }

将全局事务写到事务表(global_table)中
AbstractSessionManager

    private void writeSession(LogOperation logOperation, SessionStorable sessionStorable) throws TransactionException {
   
        if (!transactionStoreManager.writeSession(logOperation, sessionStorable)) {
   
            if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
   
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to store global session");
            } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
   
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to update global session");
            } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
   
                throw new GlobalTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to remove global session");
            } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
   
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to store branch session");
            } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
   
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to update branch session");
            } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
   
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Fail to remove branch session");
            } else {
   
                throw new BranchTransactionException(TransactionExceptionCode.FailedWriteSession,
                    "Unknown LogOperation:" + logOperation.name());
            }
        }
    }

在这里插入图片描述
transactionStoreManager 分为三种一种是文件的形式 一种是db的形式一种是redis的形式 他们是项目启动的时候在Server的start方法中调用SessionHolder的init方法进行加载 mode是在 配置文件的 store.mode进行配置 我这里是配置的 db 所以会走下面的 DataBaseTransactionStoreManager 的逻辑中
DataBaseTransactionStoreManager

    @Override
    public boolean writeSession(LogOperation logOperation, SessionStorable session) {
   
        if (LogOperation.GLOBAL_ADD.equals(logOperation)) {
   
            return logStore.insertGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_UPDATE.equals(logOperation)) {
   
            return logStore.updateGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.GLOBAL_REMOVE.equals(logOperation)) {
   
            return logStore.deleteGlobalTransactionDO(SessionConverter.convertGlobalTransactionDO(session));
        } else if (LogOperation.BRANCH_ADD.equals(logOperation)) {
   
            return logStore.insertBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_UPDATE.equals(logOperation)) {
   
            return logStore.updateBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else if (LogOperation.BRANCH_REMOVE.equals(logOperation)) {
   
            return logStore.deleteBranchTransactionDO(SessionConverter.convertBranchTransactionDO(session));
        } else {
   
            throw new StoreException("Unknown LogOperation:" + logOperation.name());
        }
    

LogStoreDataBaseDAO

    @Override
    public boolean insertGlobalTransactionDO(GlobalTransactionDO globalTransactionDO) {
   
         //新增global_table信息的sql
        String sql = LogStoreSqlsFactory.getLogStoreSqls(dbType).getInsertGlobalTransactionSQL(globalTable);
        Connection conn = null;
        PreparedStatement ps = null;
        try {
   
            int index = 1;
            conn = logStoreDataSource.getConnection();
            conn.setAutoCommit(true);
            ps = conn.prepareStatement(sql);
            ps.setString(index++, globalTransactionDO.getXid());
            ps.setLong(index++, globalTransactionDO.getTransactionId());
            ps.setInt(index++, globalTransactionDO.getStatus());
            ps.setString(index++, globalTransactionDO.getApplicationId());
            ps.setString(index++, globalTransactionDO.getTransactionServiceGroup());
            String transactionName = globalTransactionDO.getTransactionName();
            transactionName = transactionName.length() > transactionNameColumnSize ?
                transactionName.substring(0, transactionNameColumnSize) :
                transactionName;
            ps.setString(index++, transactionName);
            ps.setInt(index++, globalTransactionDO.getTimeout());
            ps.setLong(index++, globalTransactionDO.getBeginTime());
            ps.setString(index++, globalTransactionDO.getApplicationData());
            return ps.executeUpdate() > 0;
        } catch (SQLException e) {
   
            throw new StoreException(e);
        } finally {
   
            IOUtil.close(ps, conn);
        }
    }

在这里插入图片描述
在这里插入图片描述
自此TM开启全局事务结束

总结

1.项目启动的时候会由SpringBoot自动装配机制把GlobalTransactionScanner装配进spring容器中去做增强并会注入一个全局事务的拦截器
2.当有标准了GlobalTransactional的方法被执行的时候会通过拦截器进行拦截进行全局事务的开启
3.开启全局事务的之前会先根据不同的事务隔离级别去做不同的处理
4.然后TM会通过Netty发送一个开启全局事务的请求到TC
5.TC接收到请求之后会根据不同的数据类型去做不同的处理 我这里用的是db 并且用的是mysql 他会在mysql的global_table中去插入一条全局事务的数据 并更新事务状态为Begin
6.返回XID给TM 作为全局事务的ID
请添加图片描述

相关推荐

  1. seata源码分析(04)_TM开启全局事务

    2023-12-21 17:30:04       5 阅读
  2. spring alibabaseata分布式事务

    2023-12-21 17:30:04       10 阅读

最近更新

  1. TCP协议是安全的吗?

    2023-12-21 17:30:04       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2023-12-21 17:30:04       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2023-12-21 17:30:04       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2023-12-21 17:30:04       20 阅读

热门阅读

  1. DevOps实践指南(目录)

    2023-12-21 17:30:04       44 阅读
  2. 微信小程序ts+less模版引入Rant Weapp

    2023-12-21 17:30:04       49 阅读
  3. C++中sort()排序函数使用方法

    2023-12-21 17:30:04       40 阅读
  4. C语言实现大数的加法

    2023-12-21 17:30:04       36 阅读
  5. obsidian + cloudreve 搭建个人云盘

    2023-12-21 17:30:04       46 阅读
  6. electron兼容统信UOS系统过程中的坑

    2023-12-21 17:30:04       39 阅读
  7. 亲测解决ERROR: Could not build wheels for cryptacular

    2023-12-21 17:30:04       37 阅读
  8. uniapp随记

    2023-12-21 17:30:04       51 阅读
  9. qt 实现 ftp 上传与下载

    2023-12-21 17:30:04       35 阅读
  10. facebook广告投放有哪些需要注意的

    2023-12-21 17:30:04       35 阅读
  11. 在 Docker 上部署 Nacos 并连接到 MySQL

    2023-12-21 17:30:04       30 阅读