AbstractRoutingDataSource实现多数据源切换以及事务中无法切换问题

一、AbstractRoutingDataSource实现多数据源切换

        为了实现数据源的动态切换,我们采用了AbstractRoutingDataSource结合AOP+反射来自定义注解。通过这种机制,我们可以在运行时根据自定义注解来选择不同的数据源,从而实现灵活高效的数据访问策略。

        具体来说,我们首先创建了一个继承自AbstractRoutingDataSource的动态数据源类DynamicDataSource,该类能够管理多个数据源并根据线程上下文中的特定键值来选择使用哪一个数据源。接着,我们定义了一个自定义注解,用于标记需要切换数据源的方法。然后,利用AOP技术,我们在方法执行之前和之后添加了环绕通知,在这个通知中,我们通过反射获取到当前方法的自定义注解信息,并据此设置线程上下文中的数据源键值,从而引导数据源切换逻辑。

       这样,当一个被标注了自定义注解的方法被调用时,系统会自动根据注解指定的数据源名称切换到对应的数据源,完成对数据库的操作。这种方式不仅提高了代码的复用性和可维护性,而且通过解耦业务逻辑与数据源选择,增强了系统的可扩展性和灵活性。

1、自定义注解DataSource
import java.lang.annotation.*;

@Target({ElementType.METHOD})//注解方法
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface DataSource {
    String name() default "";
}
2、切面类DataSourceAspect
import com.zcloud.sunshine.common.constant.DatabaseType;
import com.zcloud.sunshine.config.DynamicDataSource;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;

@Aspect
@Component
@Slf4j
public class DataSourceAspect {

    @Pointcut("@annotation(com.zcloud.sunshine.annotation.DataSource)")
    public void dataSourcePointCut() {

    }

    @Around("dataSourcePointCut()")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        
       //获取被拦截方法的签名,该签名包含了方法的名称、参数类型等信息
        MethodSignature signature = (MethodSignature) point.getSignature();
        //从签名中获取了实际的方法对象
        Method method = signature.getMethod();
        //从方法上获取名为DataSource的注解。如果这个方法上有DataSource注解,那么dataSource将不为null,否则为null
        DataSource dataSource = method.getAnnotation(DataSource.class);
        if (dataSource == null) {
            log.info("默认数据源 dataSource1");
            DynamicDataSource.setDataSource(DatabaseType.dataSource1.toString());
        } else {
            log.info("切换数据源: " + dataSource.name());
            DynamicDataSource.setDataSource(dataSource.name());
        }

        try {
            return point.proceed();
        } finally {
        //最后一定要清除,这里使用的ThreadLocal来存储的数据源key,所以为了防止内存泄露一定要清除
        //而且该清除操作也是为了防止该切换操作对后续的切换操作造成影响
            DynamicDataSource.clearDataSource();
        }
    }
}

上面两步主要就是通过aop+反射完成自定义切换数据源注解的功能,实现在该注解修饰的方法上,设置当前方法的数据源。如果未设置数据源,则使用默认的数据源。

3、自定义动态数据源DynamicDataSource类
import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;

import javax.sql.DataSource;
import java.util.Map;

public class DynamicDataSource extends AbstractRoutingDataSource {
//使用ThreadLocal来存储当前线程的数据源名称,保证多线程情况下,各自的数据源互不影响
    private static final ThreadLocal<String> contextHolder = new ThreadLocal<>();

    public DynamicDataSource(DataSource defaultTargetDataSource, Map<Object, Object> targetDataSources) {
        //将注册的数据源以及设置的默认数据源设置到父类对应的成员变量中
        super.setDefaultTargetDataSource(defaultTargetDataSource);
        super.setTargetDataSources(targetDataSources);
        super.afterPropertiesSet();
    }

    //返回当前数据源
    @Override
    protected Object determineCurrentLookupKey() {
        return getDataSource();
    }

    public static void setDataSource(String dataSource) {
        contextHolder.set(dataSource);
    }

    public static String getDataSource() {
        return contextHolder.get();
    }

    public static void clearDataSource() {
        contextHolder.remove();
    }
}
4、配置多数据源DynamicDataSourceConfig 
import com.alibaba.druid.spring.boot.autoconfigure.DruidDataSourceBuilder;
import com.zcloud.sunshine.common.constant.DatabaseType;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.stereotype.Component;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

/**
 * @author zy
 * @desc 配置多数据源
 */
@Configuration
@Component
public class DynamicDataSourceConfig {

    @Bean(value = "dataSource1")
    @ConfigurationProperties(prefix = "spring.datasource.druid.dataSource1")
    public DataSource dataSource1() {  //此处的返回类型DataSource不是我们自定义的注解DataSource,而是java.sql包下的
        return DruidDataSourceBuilder.create().build();
    }

    @Bean(value = "dataSource2")
    @ConfigurationProperties(prefix = "spring.datasource.druid.dataSource2")
    public DataSource dataSource2() {
        return DruidDataSourceBuilder.create().build();
    }

    @Bean
    @Primary
    public DynamicDataSource dataSource(@Qualifier(value = "dataSource1") DataSource dataSource1,
                                        @Qualifier(value = "dataSource2") DataSource dataSource2
                                        ) {
        Map<Object, Object> targetDataSources = new HashMap<>();
        targetDataSources.put(DatabaseType.dataSource1.toString(), dataSource1);
        targetDataSources.put(DatabaseType.dataSource2.toString(), dataSource2);
        //注册所有的数据源,并且将数据源1设置为默认数据源
        return new DynamicDataSource(dataSource1, targetDataSources);
    }
}

5、多数据源枚举类
public enum DatabaseType {

    dataSource1,//数据库1
    dataSource2//数据库2
}
6、配置多数据源数据库连接信息
#数据源1
spring.datasource.druid.dataSource1.url=jdbc:mysql://127.0.0.1:3306/zcloud_user?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useTimezone=true&serverTimezone=GMT%2B8
spring.datasource.druid.dataSource1.username=root
spring.datasource.druid.dataSource1.password=zy521
spring.datasource.druid.dataSource1.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.druid.dataSource1.type=com.alibaba.druid.pool.DruidDataSource

#数据源2
spring.datasource.druid.dataSource2.url=jdbc:mysql://127.0.0.1:3306/zcloud_order?useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true&useTimezone=true&serverTimezone=GMT%2B8
spring.datasource.druid.dataSource2.username=root
spring.datasource.druid.dataSource2.password=zy521
spring.datasource.druid.dataSource2.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.druid.dataSource2.type=com.alibaba.druid.pool.DruidDataSource

3-6步骤中的3-4这两个步骤就是实现动态切换数据源的核心代码。我们在配置类DynamicDataSourceConfig中配置我们需要切换的数据源信息,并且设置默认的数据源。配置类中设置动态数据源的信息实际就是调用我们定义的动态数据源类DynamicDataSource的有参构造方法DynamicDataSource。

从该构造方法中,我们可以得知,我们设置的动态数据源信息也就是设置给了我们继承的父类AbstractRoutingDataSource中的成员变量,并且调用了父类的成员方法afterPropertiesSet()。afterPropertiesSet方法大家是不是感觉很熟悉?是的,看过springbean的生命周期的应该对该方法都不会陌生。afterPropertiesSet方法就是在一个类实现InitializingBean接口必须要重写的方法。 该方法是在bean属性设置完成后执行。在我们这个场景,其实就是在我们设置完AbstractRoutingDataSource中的成员变量后去执行。在这个方法里面我们可以看到,其实他就是对我们之前设置的成员变量,进行了一个处理,再赋值给对应的成员变量。

    public void afterPropertiesSet() {
        if (this.targetDataSources == null) {
            throw new IllegalArgumentException("Property 'targetDataSources' is required");
        } else {
            this.resolvedDataSources = CollectionUtils.newHashMap(this.targetDataSources.size());
            this.targetDataSources.forEach((key, value) -> {
                Object lookupKey = this.resolveSpecifiedLookupKey(key);
                DataSource dataSource = this.resolveSpecifiedDataSource(value);
                this.resolvedDataSources.put(lookupKey, dataSource);
            });
            if (this.defaultTargetDataSource != null) {
                this.resolvedDefaultDataSource = this.resolveSpecifiedDataSource(this.defaultTargetDataSource);
            }

        }
    }

 这些都是在项目启动的时候执行的,目的说白了就是将所有的动态数据源信息存储到一个map集合resolvedDataSources中,并设置默认的数据源resolvedDefaultDataSource。

而真正的设置数据源是使用我们自定义的动态数据源类DynamicDataSource的setDataSource()方法实现的。我们设置的数据源能够生效的原因就是,我们在继承AbstractRoutingDataSource类的时候,实现了该抽象方法determineCurrentLookupKey。该方法其实就是我们能够动态设置数据源的核心方法,说白了也就是AbstractRoutingDataSource给我们留的一个口子。

我们实现该方法的逻辑并不难,就是返回我们设置的数据源。 然后,我们看AbstractRoutingDataSource使用该方法的返回值做了什么。追溯到AbstractRoutingDataSource类,找到这个方法determineTargetDataSource中调用了我们实现的determineCurrentLookupKey方法(其实这里也就是使用了模板方法设计模式)。

protected DataSource determineTargetDataSource() {
        Assert.notNull(this.resolvedDataSources, "DataSource router not initialized");
        //调用我们设置的数据源名称
        Object lookupKey = this.determineCurrentLookupKey();
        //从我们在项目启动的时候注册到map集合resolvedDataSources中的数据源信息中get
        //该名称的数据源
        DataSource dataSource = (DataSource)this.resolvedDataSources.get(lookupKey);
        //没有的话就使用默认数据源
        if (dataSource == null && (this.lenientFallback || lookupKey == null)) {
            dataSource = this.resolvedDefaultDataSource;
        }
        //为空抛出异常
        if (dataSource == null) {
            throw new IllegalStateException("Cannot determine target DataSource for lookup key [" + lookupKey + "]");
        } else { 
            return dataSource;
        }
    }

再追溯determineTargetDataSource()方法,可以看到我们再使用getConnection()方法获取数据库连接的时候,就是调用了该方法determineTargetDataSource来获取数据源,然后根据数据源来获取数据库连接的。

至此,我们就应该明白为什么我们能够通过 AbstractRoutingDataSource来实现动态切换数据源了。

二、事务问题

1、问题描述

       在使用spring编程式事务的时候,切换数据源dataSource2会出现无法切换的情况。导致在事务中进行数据库操作的时候使用的默认数据源dataSource1,这样就会导致sql报错。因为默认数据源中并没有sql语句中进行操作的表。

事务代码:

 //开启事务
TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
            try {
                if (!CollectionUtils.isEmpty(list)) {
                    //插入运行记录
                    traceRouteResultMapper.addDialRunRecord2(dialRunRecord);
                    //插入执行结果
                    traceRouteResultMapper.insertBatch(list);
                    //插入告警
                    if(!CollectionUtils.isEmpty(alarmListAdd)){
                        dialTestAlarmMapper.insertBatch(alarmListAdd);
                    }
                    //更新告警最新触发时间
                    if(!CollectionUtils.isEmpty(alarmUpdateTime)){
                        dialTestAlarmMapper.updateLastAlarmTime(alarmUpdateTime);
                    }
                }
                //提交事务
                transactionManager.commit(transaction);
            } catch (Exception e) {
                //回滚
                transactionManager.rollback(transaction);
                log.error("TraceRouteServiceImpl.getIndexInfo traceroute失败!", e);
            }

 map层代码:

@Mapper
public interface DialTestAlarmMapper {

    @DataSource(name = "dataSource2")
    List<DialTestAlarmPartInfoDto> queryAllTraceRouteAlarm();

    @DataSource(name = "dataSource2")
    void insertBatch(@Param("dtoList") List<DialTestAlarmDto> alarmListAdd);

    @DataSource(name = "dataSource2")
    void updateLastAlarmTime(@Param("dtoList")  List<String> alarmUpdateTimeIds);
}
2、问题原因

     为什么在事务内无法切换数据源呢?想要弄清这个原因,我们就要看看开启事务的时候,做了什么。追溯开始事务的源码,可以发现一个关键方法doBegin()。

protected void doBegin(Object transaction, TransactionDefinition definition) {
        DataSourceTransactionManager.DataSourceTransactionObject txObject = (DataSourceTransactionManager.DataSourceTransactionObject)transaction;
        Connection con = null;

        try {
            if (!txObject.hasConnectionHolder() || txObject.getConnectionHolder().isSynchronizedWithTransaction()) {
                Connection newCon = this.obtainDataSource().getConnection();
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction");
                }

                txObject.setConnectionHolder(new ConnectionHolder(newCon), true);
            }
            //获取数据库连接
            txObject.getConnectionHolder().setSynchronizedWithTransaction(true);
            con = txObject.getConnectionHolder().getConnection();
            Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition);
            txObject.setPreviousIsolationLevel(previousIsolationLevel);
            txObject.setReadOnly(definition.isReadOnly());
            if (con.getAutoCommit()) {
                txObject.setMustRestoreAutoCommit(true);
                if (this.logger.isDebugEnabled()) {
                    this.logger.debug("Switching JDBC Connection [" + con + "] to manual commit");
                }

                con.setAutoCommit(false);
            }

            this.prepareTransactionalConnection(con, definition);
            txObject.getConnectionHolder().setTransactionActive(true);
            int timeout = this.determineTimeout(definition);
            if (timeout != -1) {
                txObject.getConnectionHolder().setTimeoutInSeconds(timeout);
            }
             // 把当前的数据源的Connection与线程进行绑定
            if (txObject.isNewConnectionHolder()) {
                TransactionSynchronizationManager.bindResource(this.obtainDataSource(), txObject.getConnectionHolder());
            }

        } catch (Throwable var7) {
            if (txObject.isNewConnectionHolder()) {
                DataSourceUtils.releaseConnection(con, this.obtainDataSource());
                txObject.setConnectionHolder((ConnectionHolder)null, false);
            }

            throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", var7);
        }
    }

从该方法中,我们可以看到,在我们开启事务的时候,就完成了数据库连接connection和线程的绑定。此时,由于还未进行数据源的切换,所绑定的连接自然来自于默认的数据源。这意味着,在随后的事务处理过程中,只要事务尚未提交,所有针对数据库的操作都将通过这个已经建立好的连接来进行。因此,如果尝试在事务进行中切换数据源,则不会生效。

3、解决办法

      既然我们已经知道了问题的根本原因,解决办法也就变得相对直接明了。我们可以在开启事务之前,先手动的将数据源切换到dataSource2。这样,在随后打开事务时,将会使用来dataSource2的连接与当前线程进行绑定。通过这种预先设置的方式,我们确保了事务中使用的连接来源于我们指定的数据源,从而解决了在事务中切换数据源的问题。

            //手动设置数据源为dataSource2
            DynamicDataSource.setDataSource(DatabaseType.dataSource2.toString());
            TransactionStatus transaction = transactionManager.getTransaction(transactionDefinition);
            try {
                if (!CollectionUtils.isEmpty(list)) {
                    //插入运行记录
                    traceRouteResultMapper.addDialRunRecord2(dialRunRecord);
                    //插入执行结果
                    traceRouteResultMapper.insertBatch(list);
                    //插入告警
                    if(!CollectionUtils.isEmpty(alarmListAdd)){
                        dialTestAlarmMapper.insertBatch(alarmListAdd);
                    }
                    //更新告警最新触发时间
                    if(!CollectionUtils.isEmpty(alarmUpdateTime)){
                        dialTestAlarmMapper.updateLastAlarmTime(alarmUpdateTime);
                    }
                }
                //提交事务
                transactionManager.commit(transaction);
            } catch (Exception e) {
                //回滚
                transactionManager.rollback(transaction);
                log.error("TraceRouteServiceImpl.getIndexInfo traceroute失败!", e);
            }finally {
                //最后清除设置的数据源
                DynamicDataSource.clearDataSource();
            }

最近更新

  1. TCP协议是安全的吗?

    2024-04-26 14:18:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-26 14:18:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-26 14:18:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-26 14:18:03       20 阅读

热门阅读

  1. 市政行业乙级资质改革对公共交通工程的影响

    2024-04-26 14:18:03       13 阅读
  2. 商业认证项目表

    2024-04-26 14:18:03       13 阅读
  3. Leetcode 5.最长回文子串

    2024-04-26 14:18:03       17 阅读
  4. 自动驾驶---OpenSpace之Hybrid A*规划算法

    2024-04-26 14:18:03       15 阅读
  5. word 第十四课

    2024-04-26 14:18:03       14 阅读
  6. IOS恢复

    IOS恢复

    2024-04-26 14:18:03      12 阅读