rabbitmq 使用SAC队列实现顺序消息

rabbitmq 使用SAC队列实现顺序消息

前提

SAC: single active consumer, 是指如果有多个实例,只允许其中一个实例消费,其他实例为空闲

目的

实现消息顺序消费,操作:

  • 创建4个SAC队列,
  • 消息的路由key 取队列个数模,这里是4
  • 发送消息到每个队列,保证每个队列只有一个消费者!!

实现

定义消息 SeqMessage
@Data
@AllArgsConstructor
public class SeqMessage implements Serializable {

    //消息id
    private String requestNo;
    //消息中顺序,1,2,3,4
    private int order;
}
创建 队列 绑定
@Configuration
public class OrderQueueConfiguration {

    public static final String EXCHANGE = "order-ex";
    public static final String RK_PREFIX = "rk-";
    public static final String ONE_QUEUE = "one-queue";
    public static final String TWO_QUEUE = "two-queue";
    public static final String THREE_QUEUE = "three-queue";
    public static final String FOUR_QUEUE = "four-queue";

    @Bean
    public DirectExchange exchange() { // 使用直连的模式
        return new DirectExchange(EXCHANGE, true, false);
    }

    @Bean
    public Binding oneBinding() {
        return BindingBuilder.bind(oneQueue()).to(exchange()).with(RK_PREFIX + 1);
    }
    @Bean
    public Binding twoBinding() {
        return BindingBuilder.bind(twoQueue()).to(exchange()).with(RK_PREFIX + 2);
    }
    @Bean
    public Binding threeBinding() {
        return BindingBuilder.bind(threeQueue()).to(exchange()).with(RK_PREFIX + 3);
    }
    @Bean
    public Binding fourBinding() {
        return BindingBuilder.bind(fourQueue()).to(exchange()).with(RK_PREFIX + 4);
    }


    @Bean
    public Queue oneQueue() {
        return createSacQueue(ONE_QUEUE);
    }

    @Bean
    public Queue twoQueue() {
        return createSacQueue(TWO_QUEUE);
    }

    @Bean
    public Queue threeQueue() {
        return createSacQueue(THREE_QUEUE);
    }

    @Bean
    public Queue fourQueue() {
        return createSacQueue(FOUR_QUEUE);
    }

    private static Queue createSacQueue(String queueName) {
        Map<String, Object> arguments = new HashMap<>(2);
        arguments.put("x-single-active-consumer", true);
        return new Queue(queueName, true, false, false, arguments);
    }

}

重要的是 x-single-active-consumer 这个属性, 只有一个实例生效

在这里插入图片描述

创建 消费者

为每个队列创建一个监听消费者

@Slf4j
@Component
public class OrderListener {


    @RabbitListener(bindings = @QueueBinding(
                    exchange = @Exchange(value = EXCHANGE,declare = "false"),
                    value = @Queue(value = ONE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 1))
    public void onMessage1(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", ONE_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = TWO_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 2))
    public void onMessage2(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", TWO_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }
    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = THREE_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 3))
    public void onMessage3(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", THREE_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

    @RabbitListener(bindings = @QueueBinding(
            exchange = @Exchange(value = EXCHANGE,declare = "false"),
            value = @Queue(value = FOUR_QUEUE, durable = "true", declare = "false"), key = RK_PREFIX + 4))
    public void onMessage4(Message message, @Headers Channel channel) {
        String messageStr = "";
        try {
            messageStr = new String(message.getBody(), StandardCharsets.UTF_8);
            log.info("{} recv: {}", FOUR_QUEUE, messageStr);
        } catch (Exception e) {
            log.error("######### OrderListener.onMessage: {}-{}", messageStr, e);
        }
    }

}
生产者发送消息
@GetMapping("/send/seq/messqge")
 public String sendSeqMessage() throws JsonProcessingException {
     int cnt = 100;
     int mod = 4;
     int seqSize = 6;
     for (int i = 0; i < cnt; i++) {
         for (int j = 0; j < seqSize; j++) {
             int rk = i % mod + 1;
             SeqMessage seqMessage = new SeqMessage("seq-" + i, j);
             String s = objectMapper.writeValueAsString(seqMessage);
             log.info("routeKey: {}, send msg: {}", rk, s);
             rabbitTemplate.convertAndSend(EXCHANGE, RK_PREFIX + rk, s);
         }
     }
     return "success";
 }

运行结果:

two-queue recv: {"requestNo":"seq-1","order":0}
two-queue recv: {"requestNo":"seq-1","order":1}
two-queue recv: {"requestNo":"seq-1","order":2}
two-queue recv: {"requestNo":"seq-1","order":3}
two-queue recv: {"requestNo":"seq-1","order":4}
two-queue recv: {"requestNo":"seq-1","order":5}
two-queue recv: {"requestNo":"seq-5","order":0}
two-queue recv: {"requestNo":"seq-5","order":1}
two-queue recv: {"requestNo":"seq-5","order":2}
two-queue recv: {"requestNo":"seq-5","order":3}
two-queue recv: {"requestNo":"seq-5","order":4}
two-queue recv: {"requestNo":"seq-5","order":5}

three-queue recv: {"requestNo":"seq-2","order":0}
three-queue recv: {"requestNo":"seq-2","order":1}
three-queue recv: {"requestNo":"seq-2","order":2}
three-queue recv: {"requestNo":"seq-2","order":3}
three-queue recv: {"requestNo":"seq-2","order":4}
three-queue recv: {"requestNo":"seq-2","order":5}
three-queue recv: {"requestNo":"seq-6","order":0}
three-queue recv: {"requestNo":"seq-6","order":1}
three-queue recv: {"requestNo":"seq-6","order":2}
three-queue recv: {"requestNo":"seq-6","order":3}
three-queue recv: {"requestNo":"seq-6","order":4}
three-queue recv: {"requestNo":"seq-6","order":5}

可以发现,消息消费是顺序的

good luck!

相关推荐

  1. Springboot-RabbitMQ 消息队列使用

    2024-04-23 10:38:03       9 阅读
  2. 消息队列 RabbitMQ python实战

    2024-04-23 10:38:03       14 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-23 10:38:03       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-23 10:38:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-23 10:38:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-23 10:38:03       18 阅读

热门阅读

  1. RabbitMQ:消息队列的卓越之选

    2024-04-23 10:38:03       10 阅读
  2. kubernetes中的静态POD

    2024-04-23 10:38:03       21 阅读
  3. kitti2bag,py 报错

    2024-04-23 10:38:03       17 阅读
  4. P8739 [蓝桥杯 2020 国 C] 重复字符串

    2024-04-23 10:38:03       14 阅读
  5. hive通过正则过滤其他字段

    2024-04-23 10:38:03       17 阅读
  6. 数学分析复习:洛必达法则、泰勒公式

    2024-04-23 10:38:03       15 阅读
  7. AntD上传文件 结合Axios 服务端由Spring MVC接收

    2024-04-23 10:38:03       13 阅读
  8. Hive第二篇HQL

    2024-04-23 10:38:03       16 阅读
  9. Hive第一篇简介

    2024-04-23 10:38:03       12 阅读
  10. 7、docker 集群

    2024-04-23 10:38:03       15 阅读
  11. 数仓建模—维度建模之维度表

    2024-04-23 10:38:03       17 阅读