TM使用GlobalTransactionalInterceptorHandler拦截被@GlobalTransactional和@GlobalLock标注的全局事务方法,本文将详细分析这个类,以及对应的seata-server开启事务的逻辑。
事务传播级别
REQUIRED
- If transaction is existing, execute with current transaction, else execute with new transaction. The default propagation.REQUIRES_NEW
- If transaction is existing, suspend it, and then execute business with new transaction.NOT_SUPPORTED
- If transaction is existing, suspend it, and then execute business without transaction.SUPPORTS
- If transaction is not existing, execute without global transaction, else execute business with current transaction.NEVER
- If transaction is existing, throw exception, else execute business without transaction.MANDATORY
- If transaction is not existing, throw exception, else execute business with current transaction.
GlobalTransactionalInterceptorHandler
doInvoke
protected Object doInvoke(InvocationWrapper invocation) throws Throwable {
Class<?> targetClass = invocation.getTarget().getClass();
Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
if (specificMethod != null && !specificMethod.getDeclaringClass().equals(Object.class)) {
// 获取@GlobalTransactional注解
final GlobalTransactional globalTransactionalAnnotation =
getAnnotation(specificMethod, targetClass, GlobalTransactional.class);
// 获取@GlobalLock注解
final GlobalLock globalLockAnnotation =
getAnnotation(specificMethod, targetClass, GlobalLock.class);
boolean localDisable =
disable || (ATOMIC_DEGRADE_CHECK.get() && degradeNum >= degradeCheckAllowTimes);
if (!localDisable) {
// 此分支处理@GlobalTransactional注解
if (globalTransactionalAnnotation != null || this.aspectTransactional != null) {
AspectTransactional transactional;
if (globalTransactionalAnnotation != null) {
transactional = new AspectTransactional(
globalTransactionalAnnotation.timeoutMills(),
globalTransactionalAnnotation.name(),
globalTransactionalAnnotation.rollbackFor(),
globalTransactionalAnnotation.rollbackForClassName(),
globalTransactionalAnnotation.noRollbackFor(),
globalTransactionalAnnotation.noRollbackForClassName(),
globalTransactionalAnnotation.propagation(),
globalTransactionalAnnotation.lockRetryInterval(),
globalTransactionalAnnotation.lockRetryTimes(),
globalTransactionalAnnotation.lockStrategyMode());
} else {
transactional = this.aspectTransactional;
}
return handleGlobalTransaction(invocation, transactional);
} else if (globalLockAnnotation != null) {
// 此分支处理@GlobalLock注解
return handleGlobalLock(invocation, globalLockAnnotation);
}
}
}
// 如果不是全局事务方法则直接调用目标方法
return invocation.proceed();
}
处理@GlobalTransactional注解
Object handleGlobalTransaction(final InvocationWrapper methodInvocation,
final AspectTransactional aspectTransactional) throws Throwable {
boolean succeed = true;
try {
return transactionalTemplate.execute(new TransactionalExecutor() {
@Override
public Object execute() throws Throwable {
// 调用目标方法
return methodInvocation.proceed();
}
public String name() {
// 返回事务名 略
return formatMethod(methodInvocation.getMethod());
}
@Override
public TransactionInfo getTransactionInfo() {
TransactionInfo transactionInfo = new TransactionInfo();
// aspectTransactional属性赋值给transactionInfo对象
// ...
return transactionInfo;
}
});
} catch (TransactionalExecutor.ExecutionException e) {
GlobalTransaction globalTransaction = e.getTransaction();
// 出发failureHandler回调
// 之后抛出异常 略
} finally {
if (ATOMIC_DEGRADE_CHECK.get()) {
EVENT_BUS.post(new DegradeCheckEvent(succeed));
}
}
}
transactionalTemplate.execute
public Object execute(TransactionalExecutor business) throws Throwable {
TransactionInfo txInfo = business.getTransactionInfo();
if (txInfo == null) {
throw new ShouldNeverHappenException("transactionInfo does not exist");
}
// 获取xid并创建DefaultGlobalTransaction对象
// xid使用RootContext.getXID()获取
GlobalTransaction tx = GlobalTransactionContext.getCurrent();
// 获取传播级别 默认REQUIRED
Propagation propagation = txInfo.getPropagation();
SuspendedResourcesHolder suspendedResourcesHolder = null;
try {
switch (propagation) {
case NOT_SUPPORTED:
// 挂起当前事务
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend(false);
}
// 执行业务方法并执行业务方法
return business.execute();
case REQUIRES_NEW:
// 挂起当前事务并创建新事务
if (existingTransaction(tx)) {
suspendedResourcesHolder = tx.suspend(false);
}
tx = GlobalTransactionContext.createNew();
break;
case SUPPORTS:
// 如果当前没有事务则直接执行业务方法
if (notExistingTransaction(tx)) {
return business.execute();
}
break;
case REQUIRED:
// 创建或直接使用当前事务
tx = GlobalTransactionContext.getCurrentOrCreate();
break;
case NEVER:
// 如果当前存在事务则抛错
if (existingTransaction(tx)) {
throw new TransactionException(String.format(
"Existing transaction found for propagation 'never', xid = %s",
tx.getXid()));
} else {
// 执行业务方法
return business.execute();
}
case MANDATORY:
// 如果当前没有事务则抛错
if (notExistingTransaction(tx)) {
throw new TransactionException(
"No existing transaction found for propagation 'mandatory'");
}
break;
default:
throw new TransactionException("Not Supported Propagation:" + propagation);
}
GlobalLockConfig previousConfig = replaceGlobalLockConfig(txInfo);
try {
// 开启全局事务
beginTransaction(txInfo, tx);
Object rs;
try {
// 执行业务方法
rs = business.execute();
} catch (Throwable ex) {
// 回滚全局事务
completeTransactionAfterThrowing(txInfo, tx, ex);
throw ex;
}
// 提交全局事务
commitTransaction(tx, txInfo);
return rs;
} finally {
// 清理线程上的GlobalLockConfig
resumeGlobalLockConfig(previousConfig);
// 出发TransactionHook.after
triggerAfterCompletion(tx);
// 清理TransactionHook
cleanUp();
}
} finally {
// 恢复挂起的事务
if (suspendedResourcesHolder != null) {
tx.resume(suspendedResourcesHolder);
}
}
}
private void beginTransaction(TransactionInfo txInfo,
GlobalTransaction tx) throws TransactionalExecutor.ExecutionException {
// role分为两种:
// 1. Launcher - 发起者
// 2. Participant - 参与者
// 对应TM来说 是发起者
if (tx.getGlobalTransactionRole() != GlobalTransactionRole.Launcher) {
return;
}
try {
triggerBeforeBegin();
// 开启事务
tx.begin(txInfo.getTimeOut(), txInfo.getName());
triggerAfterBegin();
} catch (TransactionException txe) {
throw new TransactionalExecutor.ExecutionException(tx, txe,
TransactionalExecutor.Code.BeginFailure);
}
}
tx.begin
public void begin(int timeout, String name) throws TransactionException {
this.createTime = System.currentTimeMillis();
if (role != GlobalTransactionRole.Launcher) {
return;
}
String currentXid = RootContext.getXID();
if (currentXid != null) {
throw new IllegalStateException("Global transaction already exists," +
" can't begin a new global transaction, currentXid = " + currentXid);
}
// 使用transactionManager开启事务
xid = transactionManager.begin(null, null, name, timeout);
status = GlobalStatus.Begin;
// 绑定xid
RootContext.bind(xid);
}
transactionManager开启事务
默认使用DefaultTransactionManager实现类对象。
public String begin(String applicationId, String transactionServiceGroup,
String name, int timeout) throws TransactionException {
// 封装transactionName、timeout等
GlobalBeginRequest request = new GlobalBeginRequest();
request.setTransactionName(name);
request.setTimeout(timeout);
// 返回xid
GlobalBeginResponse response = (GlobalBeginResponse) syncCall(request);
if (response.getResultCode() == ResultCode.Failed) {
throw new TmTransactionException(TransactionExceptionCode.BeginFailed, response.getMsg());
}
return response.getXid();
}
private AbstractTransactionResponse syncCall(
AbstractTransactionRequest request) throws TransactionException {
try {
// 使用TM客户端发消息
return (AbstractTransactionResponse) TmNettyRemotingClient
.getInstance().sendSyncRequest(request);
} catch (TimeoutException toe) {
throw new TmTransactionException(TransactionExceptionCode.IO, "RPC timeout", toe);
}
}
绑定xid
public static void bind(String xid) {
if (StringUtils.isBlank(xid)) {
// 解绑xid
unbind();
} else {
// 把xid绑定到MDC上 key为X-TX-XID
MDC.put(MDC_KEY_XID, xid);
// 把xid绑定到ThreadLocal上 key为TX_XID
CONTEXT_HOLDER.put(KEY_XID, xid);
}
}
处理@GlobalLock注解
private Object handleGlobalLock(final InvocationWrapper methodInvocation,
final GlobalLock globalLockAnno) throws Throwable {
return globalLockTemplate.execute(new GlobalLockExecutor() {
@Override
public Object execute() throws Throwable {
return methodInvocation.proceed();
}
@Override
public GlobalLockConfig getGlobalLockConfig() {
GlobalLockConfig config = new GlobalLockConfig();
config.setLockRetryInterval(globalLockAnno.lockRetryInterval());
config.setLockRetryTimes(globalLockAnno.lockRetryTimes());
return config;
}
});
}
globalLockTemplate.execute
public Object execute(GlobalLockExecutor executor) throws Throwable {
boolean alreadyInGlobalLock = RootContext.requireGlobalLock();
if (!alreadyInGlobalLock) {
RootContext.bindGlobalLockFlag();
}
// 获取GlobalLockConfig并绑定到线程上
GlobalLockConfig myConfig = executor.getGlobalLockConfig();
GlobalLockConfig previousConfig = GlobalLockConfigHolder.setAndReturnPrevious(myConfig);
try {
// 执行业务方法
return executor.execute();
} finally {
if (!alreadyInGlobalLock) {
RootContext.unbindGlobalLockFlag();
}
if (previousConfig != null) {
GlobalLockConfigHolder.setAndReturnPrevious(previousConfig);
} else {
GlobalLockConfigHolder.remove();
}
}
}
seata-server开启全局事务
ServerOnRequestProcessor
seata-server使用ServerOnRequestProcessor处理进入的请求。
public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
if (ChannelManager.isRegistered(ctx.channel())) {
onRequestMessage(ctx, rpcMessage);
} else {
try {
ctx.disconnect();
ctx.close();
} catch (Exception exx) {}
}
}
private void onRequestMessage(ChannelHandlerContext ctx, RpcMessage rpcMessage) {
Object message = rpcMessage.getBody();
RpcContext rpcContext = ChannelManager.getContextFromIdentified(ctx.channel());
if (!(message instanceof AbstractMessage)) {
return;
}
// the batch send request message
if (message instanceof MergedWarpMessage) {
// 略
} else {
// 调用DefaultCoordinator的onRequest处理请求
final AbstractMessage msg = (AbstractMessage) message;
AbstractResultMessage result = transactionMessageHandler.onRequest(msg, rpcContext);
remotingServer.sendAsyncResponse(rpcMessage, ctx.channel(), result);
}
}
DefaultCoordinator的onRequest
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
AbstractTransactionRequestToTC transactionRequest = (AbstractTransactionRequestToTC) request;
transactionRequest.setTCInboundHandler(this);
// 此处是GlobalBeginRequest对象
return transactionRequest.handle(context);
}
GlobalBeginRequest的handle方法:
@Override
public AbstractTransactionResponse handle(RpcContext rpcContext) {
// handler是前面传递进来的DefaultCoordinator对象
return handler.handle(this, rpcContext);
}
handle GlobalBeginRequest逻辑:
@Override
public GlobalBeginResponse handle(GlobalBeginRequest request, final RpcContext rpcContext) {
GlobalBeginResponse response = new GlobalBeginResponse();
exceptionHandleTemplate(new AbstractCallback<GlobalBeginRequest, GlobalBeginResponse>() {
@Override
public void execute(GlobalBeginRequest request,
GlobalBeginResponse response) throws TransactionException {
try {
doGlobalBegin(request, response, rpcContext);
} catch (StoreException e) {
throw new TransactionException(TransactionExceptionCode.FailedStore,
String.format("begin global request failed. xid=%s, msg=%s",
response.getXid(), e.getMessage()), e);
}
}
}, request, response);
return response;
}
@Override
protected void doGlobalBegin(GlobalBeginRequest request, GlobalBeginResponse response,
RpcContext rpcContext) throws TransactionException {
// 使用DefaultCore的begin开启事务
response.setXid(core.begin(rpcContext.getApplicationId(), rpcContext.getTransactionServiceGroup(),
request.getTransactionName(), request.getTimeout()));
}
DefaultCore的begin开启事务:
@Override
public String begin(String applicationId, String transactionServiceGroup,
String name, int timeout) throws TransactionException {
GlobalSession session = GlobalSession
.createGlobalSession(applicationId, transactionServiceGroup, name, timeout);
MDC.put(RootContext.MDC_KEY_XID, session.getXid());
session.begin();
// transaction start event
MetricsPublisher.postSessionDoingEvent(session, false);
return session.getXid();
}