seata源码分析(04)_TM开启全局事务

TM使用GlobalTransactionalInterceptorHandler拦截被@GlobalTransactional和@GlobalLock标注的全局事务方法,本文将详细分析这个类,以及对应的seata-server开启事务的逻辑。

事务传播级别

  • REQUIRED - If transaction is existing, execute with current transaction, else execute with new transaction. The default propagation.
  • REQUIRES_NEW - If transaction is existing, suspend it, and then execute business with new transaction.
  • NOT_SUPPORTED - If transaction is existing, suspend it, and then execute business without transaction.
  • SUPPORTS - If transaction is not existing, execute without global transaction, else execute business with current transaction.
  • NEVER - If transaction is existing, throw exception, else execute business without transaction.
  • MANDATORY - If transaction is not existing, throw exception, else execute business with current transaction.

GlobalTransactionalInterceptorHandler

doInvoke

protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
    Class<?> targetClass = invocation.getTarget().getClass();
    Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
    if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
        // 获取@GlobalTransactional注解
        final GlobalTransactional globalTransactionalAnnotation =
            getAnnotation(specificMethod, targetClass, GlobalTransactional.class);
        // 获取@GlobalLock注解
        final GlobalLock globalLockAnnotation =
            getAnnotation(specificMethod, targetClass, GlobalLock.class);
        boolean localDisable =
            disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
        if (!localDisable) {
            // 此分支处理@GlobalTransactional注解
            if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
                AspectTransactional transactional;
                if (globalTransactionalAnnotation != null) {
                    transactional = new AspectTransactional(
                            globalTransactionalAnnotation.timeoutMills(),
                            globalTransactionalAnnotation.name(),
                            globalTransactionalAnnotation.rollbackFor(),
                            globalTransactionalAnnotation.rollbackForClassName(),
                            globalTransactionalAnnotation.noRollbackFor(),
                            globalTransactionalAnnotation.noRollbackForClassName(),
                            globalTransactionalAnnotation.propagation(),
                            globalTransactionalAnnotation.lockRetryInterval(),
                            globalTransactionalAnnotation.lockRetryTimes(),
                            globalTransactionalAnnotation.lockStrategyMode());
                } else {
                    transactional = this.aspectTransactional;
                }
                return handleGlobalTransaction(invocation, transactional);
            } else if (globalLockAnnotation != null) {
                // 此分支处理@GlobalLock注解
                return handleGlobalLock(invocation, globalLockAnnotation);
            }
        }
    }
    // 如果不是全局事务方法则直接调用目标方法
    return invocation.proceed();
}

处理@GlobalTransactional注解

Object handleGlobalTransaction(final InvocationWrapper methodInvocation,
                               final AspectTransactional aspectTransactional) throws Throwable {
    boolean succeed = true;
    try {
        return transactionalTemplate.execute(new TransactionalExecutor() {
            @Override
            public Object execute() throws Throwable {
                // 调用目标方法
                return methodInvocation.proceed();
            }

            public String name() {
                // 返回事务名 略
                return formatMethod(methodInvocation.getMethod());
            }

            @Override
            public TransactionInfo getTransactionInfo() {
                TransactionInfo transactionInfo = new TransactionInfo();
                // aspectTransactional属性赋值给transactionInfo对象

                // ...

                return transactionInfo;
            }
        });
    } catch (TransactionalExecutor.ExecutionException e) {
        GlobalTransaction globalTransaction = e.getTransaction();

        // 出发failureHandler回调
        // 之后抛出异常 略

    } finally {
        if (ATOMIC_DEGRADE_CHECK.get()) {
            EVENT_BUS.post(new DegradeCheckEvent(succeed));
        }
    }
}

transactionalTemplate.execute

public Object execute(TransactionalExecutor business) throws Throwable {
    TransactionInfo txInfo = business.getTransactionInfo();
    if (txInfo == null) {
        throw new ShouldNeverHappenException("transactionInfo does not exist");
    }
    // 获取xid并创建DefaultGlobalTransaction对象
    // xid使用RootContext.getXID()获取
    GlobalTransaction tx = GlobalTransactionContext.getCurrent();

    // 获取传播级别 默认REQUIRED
    Propagation propagation = txInfo.getPropagation();
    SuspendedResourcesHolder suspendedResourcesHolder = null;
    try {
        switch (propagation) {
            case NOT_SUPPORTED:
                // 挂起当前事务
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend(false);
                }
                // 执行业务方法并执行业务方法
                return business.execute();
            case REQUIRES_NEW:
                // 挂起当前事务并创建新事务
                if (existingTransaction(tx)) {
                    suspendedResourcesHolder = tx.suspend(false);
                }
                tx = GlobalTransactionContext.createNew();
                break;
            case SUPPORTS:
                // 如果当前没有事务则直接执行业务方法
                if (notExistingTransaction(tx)) {
                    return business.execute();
                }
                break;
            case REQUIRED:
                // 创建或直接使用当前事务
                tx = GlobalTransactionContext.getCurrentOrCreate();
                break;
            case NEVER:
                // 如果当前存在事务则抛错
                if (existingTransaction(tx)) {
                    throw new TransactionException(String.format(
                        "Existing transaction found for propagation 'never', xid = %s",
                        tx.getXid()));
                } else {
                    // 执行业务方法
                    return business.execute();
                }
            case MANDATORY:
                // 如果当前没有事务则抛错
                if (notExistingTransaction(tx)) {
                    throw new TransactionException(
                        "No existing transaction found for propagation 'mandatory'");
                }
                break;
            default:
                throw new TransactionException("Not Supported Propagation:" + propagation);
        }

        GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);

        try {
            // 开启全局事务
            beginTransaction(txInfo, tx);

            Object rs;
            try {
                // 执行业务方法
                rs = business.execute();
            } catch (Throwable ex) {
                // 回滚全局事务
                completeTransactionAfterThrowing(txInfo, tx, ex);
                throw ex;
            }

            // 提交全局事务
            commitTransaction(tx, txInfo);

            return rs;
        } finally {
            // 清理线程上的GlobalLockConfig
            resumeGlobalLockConfig(previousConfig);
            // 出发TransactionHook.after
            triggerAfterCompletion(tx);
            // 清理TransactionHook
            cleanUp();
        }
    } finally {
        // 恢复挂起的事务
        if (suspendedResourcesHolder != null) {
            tx.resume(suspendedResourcesHolder);
        }
    }
}

private void beginTransaction(TransactionInfo txInfo,
                  GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
    // role分为两种:
    // 1. Launcher - 发起者
    // 2. Participant - 参与者
    // 对应TM来说 是发起者
    if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
        return;
    }
    try {
        triggerBeforeBegin();
        // 开启事务
        tx.begin(txInfo.getTimeOut(), txInfo.getName());
        triggerAfterBegin();
    } catch (TransactionException txe) {
        throw new TransactionalExecutor.ExecutionException(tx, txe,
                TransactionalExecutor.Code.BeginFailure);
    }
}

tx.begin

public void begin(int timeout, String name) throws TransactionException {
    this.createTime = System.currentTimeMillis();
    if (role != GlobalTransactionRole.Launcher) {
        return;
    }
    String currentXid = RootContext.getXID();
    if (currentXid != null) {
        throw new IllegalStateException("Global transaction already exists," +
            " can't begin a new global transaction, currentXid = " + currentXid);
    }
    // 使用transactionManager开启事务
    xid = transactionManager.begin(null, null, name, timeout);
    status = GlobalStatus.Begin;
    // 绑定xid
    RootContext.bind(xid);
}

transactionManager开启事务

默认使用DefaultTransactionManager实现类对象。

public String begin(String applicationId, String transactionServiceGroup,
                    String name, int timeout) throws TransactionException {
    // 封装transactionName、timeout等
    GlobalBeginRequest request = new GlobalBeginRequest();
    request.setTransactionName(name);
    request.setTimeout(timeout);
    // 返回xid
    GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
    if (response.getResultCode() == ResultCode.Failed) {
        throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
    }
    return response.getXid();
}

private AbstractTransactionResponse syncCall(
        AbstractTransactionRequest request) throws TransactionException {
    try {
        // 使用TM客户端发消息
        return (AbstractTransactionResponse) TmNettyRemotingClient
            .getInstance().sendSyncRequest(request);
    } catch (TimeoutException toe) {
        throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
    }
}

绑定xid

public static void bind(String xid) {
    if (StringUtils.isBlank(xid)) {
        // 解绑xid
        unbind();
    } else {
        // 把xid绑定到MDC上 key为X-TX-XID
        MDC.put(MDC_KEY_XID, xid);
        // 把xid绑定到ThreadLocal上 key为TX_XID
        CONTEXT_HOLDER.put(KEY_XID, xid);
    }
}

处理@GlobalLock注解

private Object handleGlobalLock(final InvocationWrapper methodInvocation,
                                final GlobalLock globalLockAnno) throws Throwable {
    return globalLockTemplate.execute(new GlobalLockExecutor() {
        @Override
        public Object execute() throws Throwable {
            return methodInvocation.proceed();
        }

        @Override
        public GlobalLockConfig getGlobalLockConfig() {
            GlobalLockConfig config = new GlobalLockConfig();
            config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
            config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
            return config;
        }
    });
}

globalLockTemplate.execute

public Object execute(GlobalLockExecutor executor) throws Throwable {
    boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
    if (!alreadyInGlobalLock) {
        RootContext.bindGlobalLockFlag();
    }

    // 获取GlobalLockConfig并绑定到线程上
    GlobalLockConfig myConfig = executor.getGlobalLockConfig();
    GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);

    try {
        // 执行业务方法
        return executor.execute();
    } finally {
        if (!alreadyInGlobalLock) {
            RootContext.unbindGlobalLockFlag();
        }
        if (previousConfig != null) {
            GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
        } else {
            GlobalLockConfigHolder.remove();
        }
    }
}

seata-server开启全局事务

ServerOnRequestProcessor

seata-server使用ServerOnRequestProcessor处理进入的请求。

public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
    if (ChannelManager.isRegistered(ctx.channel())) {
        onRequestMessage(ctx, rpcMessage);
    } else {
        try {
            ctx.disconnect();
            ctx.close();
        } catch (Exception exx) {}
    }
}

private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
    Object message = rpcMessage.getBody();
    RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
    if (!(message instanceof AbstractMessage)) {
        return;
    }
    // the batch send request message
    if (message instanceof MergedWarpMessage) {
        // 略
    } else {
        // 调用DefaultCoordinator的onRequest处理请求
        final AbstractMessage msg = (AbstractMessage) message;
        AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
        remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
    }
}

DefaultCoordinator的onRequest

public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
    AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
    transactionRequest.setTCInboundHandler(this);
    // 此处是GlobalBeginRequest对象
    return transactionRequest.handle(context);
}

GlobalBeginRequest的handle方法:

@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
    // handler是前面传递进来的DefaultCoordinator对象
    return handler.handle(this, rpcContext);
}

handle GlobalBeginRequest逻辑:

@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
    GlobalBeginResponse response = new GlobalBeginResponse();
    exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
        @Override
        public void execute(GlobalBeginRequest request,
                            GlobalBeginResponse response) throws TransactionException {
            try {
                doGlobalBegin(request, response, rpcContext);
            } catch (StoreException e) {
                throw new TransactionException(TransactionExceptionCode.FailedStore,
                    String.format("begin global request failed. xid=%s, msg=%s",
                                  response.getXid(), e.getMessage()), e);
            }
        }
    }, request, response);
    return response;
}

@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response,
                             RpcContext rpcContext) throws TransactionException {
    // 使用DefaultCore的begin开启事务
    response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
            request.getTransactionName(), request.getTimeout()));
}

DefaultCore的begin开启事务:

@Override
public String begin(String applicationId, String transactionServiceGroup,
                    String name, int timeout) throws TransactionException {
    GlobalSession session = GlobalSession
        .createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
    MDC.put(RootContext.MDC_KEY_XID, session.getXid());

    session.begin();

    // transaction start event
    MetricsPublisher.postSessionDoingEvent(session, false);

    return session.getXid();
}

相关推荐

  1. seata分析(04)_TM开启全局事务

    2024-06-16 06:34:01       24 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-16 06:34:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-16 06:34:01       100 阅读
  3. 在Django里面运行非项目文件

    2024-06-16 06:34:01       82 阅读
  4. Python语言-面向对象

    2024-06-16 06:34:01       91 阅读

热门阅读

  1. 738. 单调递增的数字

    2024-06-16 06:34:01       33 阅读
  2. QStateMachine 笔记

    2024-06-16 06:34:01       21 阅读
  3. 笔记-python 中BeautifulSoup入门

    2024-06-16 06:34:01       35 阅读
  4. Selenium 定位编辑框有span

    2024-06-16 06:34:01       30 阅读
  5. 高考毕业季--浅谈自己感想

    2024-06-16 06:34:01       23 阅读
  6. spring boot 多个项目整合,打包成可依赖的包

    2024-06-16 06:34:01       28 阅读
  7. QML Controls模块-标准对话框用法说明

    2024-06-16 06:34:01       24 阅读