RabbitMq多数据源配置

第一步:配置连接参数

spring:
  rabbitmq:
    master:
      addresses: xxx.xxx.xx.xx:5672
      username: xxxxx
      password: xxxxxx
      virtual-host: /
    slave:
      addresses: xxx.xxx.xx.xx:5672
      username: xxxxx
      password: xxxxxx
      virtual-host: /test
@Component
public class MyRabbitProperties {

    /**
     * @primary注解必须加,不然rabbitmq内部自动处理时会发现多个配置源不知道用哪个导致报错
     */
    @Bean
    @Primary
    @ConfigurationProperties(prefix = "spring.rabbitmq.master")
    public RabbitProperties master() {
        return new RabbitProperties();
    }

    @Bean
    @ConfigurationProperties(prefix = "spring.rabbitmq.slave")
    public RabbitProperties slave() {
        return new RabbitProperties();
    }
}

第二步:编写多数据源配置类

多数据源配置类主要分为四小部分:

1.创建连接工厂。注意:主数据源必须加@Primary注解
2.创建rabbitMq可移植管理器(RabbitAdmin)。
3.创建rabbitMq模板。(不写生产者可省略)
4.创建rabbitMq监听容器。(不写消费者可省略)

注:代码示例将四部分的多个数据源内容都写在了一个配置类中,如果嫌乱可以创建多个配置类,每个配置类写一个数据源。

@Configuration
public class RabbitMqDataSourceConfig {

    @Autowired
    private RabbitProperties master;

    @Autowired
    private RabbitProperties slave;

	// 定义bean名称,防止后面多个地方注入的时候混乱
    public static final String MASTER_FACTORY="masterConnectionFactory";
    public static final String SLAVE_FACTORY="slaveConnectionFactory";
    public static final String MASTER_ADMIN="masterRabbitMqAdmin";
    public static final String SLAVE_ADMIN="salveRabbitMqAdmin";
    public static final String MASTER_TEMPLATE="masterRabbitTemplate";
    public static final String SLAVE_TEMPLATE="slaveRabbitTemplate";
    public static final String MASTER_LISTENER="masterListenerContainerFactory";
    public static final String SLAVE_LISTENER="slaveListenerContainerFactory";

	// 1.创建连接工厂。
    @Bean(MASTER_FACTORY)
    @Primary
    public ConnectionFactory masterConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(master.getAddresses());
        connectionFactory.setUsername(master.getUsername());
        connectionFactory.setPassword(master.getPassword());
        connectionFactory.setVirtualHost(master.getVirtualHost());
        return connectionFactory;
    }

    @Bean(SLAVE_FACTORY)
    public ConnectionFactory salveConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(slave.getAddresses());
        connectionFactory.setUsername(slave.getUsername());
        connectionFactory.setPassword(slave.getPassword());
        connectionFactory.setVirtualHost(slave.getVirtualHost());
        return connectionFactory;
    }

	// 2.创建rabbitMq可移植管理器(RabbitAdmin)。
    @Bean(MASTER_ADMIN)
    public RabbitAdmin masterRabbitMqAdmin(@Qualifier(MASTER_FACTORY) ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 如果只有消费者可不设置,且如果每个数据源都要生成相同的交换机和队列也可以不设置
        rabbitAdmin.setAutoStartup(false);
        return rabbitAdmin;
    }

    @Bean(SLAVE_ADMIN)
    public RabbitAdmin salveRabbitMqAdmin(@Qualifier(SLAVE_FACTORY) ConnectionFactory connectionFactory){
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        // 如果只有消费者可不设置,且如果每个数据源都要生成相同的交换机和队列也可以不设置
        rabbitAdmin.setAutoStartup(false);
        return rabbitAdmin;
    }

	// 3.创建rabbitMq模板。(不写生产者可省略)
    @Bean(MASTER_TEMPLATE)
    public RabbitMessagingTemplate masterRabbitTemplate(@Qualifier(MASTER_ADMIN) RabbitAdmin masterRabbitMqAdmin) {
        RabbitTemplate rabbitTemplate = masterRabbitMqAdmin.getRabbitTemplate();
        return new RabbitMessagingTemplate(rabbitTemplate);
    }

    @Bean(SLAVE_TEMPLATE)
    public RabbitMessagingTemplate slaveRabbitTemplate(@Qualifier(SLAVE_ADMIN) RabbitAdmin salveRabbitMqAdmin) {
        RabbitTemplate rabbitTemplate = salveRabbitMqAdmin.getRabbitTemplate();
        return new RabbitMessagingTemplate(rabbitTemplate);
    }

	// 4.创建rabbitMq监听容器。(不写消费者可省略)
    @Bean(MASTER_LISTENER)
    public SimpleRabbitListenerContainerFactory masterListenerContainerFactory(
            @Qualifier(MASTER_FACTORY) ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(5);
        factory.setPrefetchCount(1);
        factory.setAutoStartup(true);
        return factory;
    }

    @Bean(SLAVE_LISTENER)
    public SimpleRabbitListenerContainerFactory slaveListenerContainerFactory(
            @Qualifier(SLAVE_FACTORY) ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setConcurrentConsumers(1);
        factory.setMaxConcurrentConsumers(5);
        factory.setPrefetchCount(1);
        factory.setAutoStartup(true);
        return factory;
    }
}

第三步:编写初始化配置(无生产者可省略)

单数据源在配置类中声明交换机、队列并进行绑定。
多数据源需要编写初始化操作,给数据源单独创建交换机、队列并进行绑定。

在第二步创建可移植管理器时可以看到设置了rabbitAdmin.setAutoStartup(false); 该属性默认是true,单数据源提供一个rabbitmqConfig配置交换机队列时,rabbitmq会自动进行交换机,队列绑定等操作,但是多数据源时如果还按照自动创建只提供一个配置文件,则会给每个数据源都创建相同的交换机和队列,如果需求是这样那有没问题。
如果每个数据源的交换机、队列要求是不同的,则需要禁止自动初始化,手动初始化每个管理器的交换机和队列,防止重复创建。
以生产者视角,消费者视角无需关心

@Component
public class RabbitMqInitializer {

    @Autowired
    @Qualifier(RabbitMqDataSourceConfig.MASTER_ADMIN)
    private RabbitAdmin masterRabbitAdmin;

    @Autowired
    @Qualifier(RabbitMqDataSourceConfig.SLAVE_ADMIN)
    private RabbitAdmin slaveRabbitAdmin;

    @PostConstruct
    public void initializeRabbitMq() {
        // 在这里调用声明交换机和队列的方法
        // 简单举例,没做封装,交换机队列多的话这样会很乱,自行代码优化
        // declare方法为新添加一个,多个交换机或队列照着写即可
        masterRabbitAdmin.declareExchange(new DirectExchange("masterExchange"));
        masterRabbitAdmin.declareQueue(new Queue("masterQueue", true, false, false));
        masterRabbitAdmin.declareBinding(new Binding("masterQueue", Binding.DestinationType.QUEUE, "masterExchange", "masterRoutingKey", null));

        slaveRabbitAdmin.declareExchange(new DirectExchange("slaveExchange"));
        slaveRabbitAdmin.declareQueue(new Queue("slaveQueue", true, false, false));
        slaveRabbitAdmin.declareBinding(new Binding("slaveQueue", Binding.DestinationType.QUEUE, "slaveExchange", "slaveRoutingKey", null));
    }
}

第四步:编写消费者监听(无消费者可省略)

通过containerFactory来指定数据源,containerFactory对应第二步最后创建的监听容器。我这里多数据源都分开创建了不同的监听类。

@Component
public class RabbitMqMasterListenter {
    @RabbitListener(containerFactory = RabbitMqDataSourceConfig.MASTER_LISTENER,queues = {"masterQueue"})
    public void process(String message, Channel channel){
        System.out.println(channel.toString()+"消费到数据:" + message);
    }
}
@Component
public class RabbitMqSlaveListenter {
    @RabbitListener(containerFactory = RabbitMqDataSourceConfig.SLAVE_LISTENER,queues = {"slaveQueue"})
    public void process(String message, Channel channel){
        System.out.println(channel.toString()+"消费到数据:" + message);
    }
}

相关推荐

  1. RabbitMq数据配置

    2024-03-16 11:24:01       19 阅读
  2. Springboot实现配置数据

    2024-03-16 11:24:01       39 阅读
  3. 数据配置H2 Mysql

    2024-03-16 11:24:01       34 阅读
  4. Redis - 集群数据配置

    2024-03-16 11:24:01       29 阅读
  5. Springboot JPA实现数据配置

    2024-03-16 11:24:01       32 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-16 11:24:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-16 11:24:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-16 11:24:01       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-16 11:24:01       20 阅读

热门阅读

  1. 开发K8S Operator

    2024-03-16 11:24:01       18 阅读
  2. LeetCode 174.地下城游戏 Python题解

    2024-03-16 11:24:01       20 阅读
  3. 探索机器学习:智能时代的魔法

    2024-03-16 11:24:01       23 阅读
  4. Github 2024-03-13 C开源项目日报 Top10

    2024-03-16 11:24:01       17 阅读
  5. Python 算法交易实验68 回测对象重构

    2024-03-16 11:24:01       24 阅读
  6. HTML前置基础

    2024-03-16 11:24:01       19 阅读
  7. 00342第一章 概述 思考题和练习题(C语言)

    2024-03-16 11:24:01       18 阅读
  8. ES清理索引镜像

    2024-03-16 11:24:01       18 阅读