Spring AMQP(3.1.1)设置ConfirmCallback和ReturnsCallback

环境如下

Version
SpringBoot 3.2.1
spring-amqp 3.1.1
RabbitMq 3-management

一、起因

老版本的spring-amqpCorrelationData上设置ConfirmCallback。但是今天却突然发现correlationData.getFuture()没有addCallback函数了。

查询文档和帖子后,发现ConfirmCallbackReturnsCallback都需要在RabbitTemplate中设置,同时ConfirmCallback中默认无法得到消息内容,如果想在ConfirmCallback中把消息内容存到数据库等地方进行记录,怎么办呢?

参考手册

二、代码

1. 定义exchange和queue

@Slf4j
@Configuration
public class PayNotifyConfig{
   

    //交换机
    public static final String PAYNOTIFY_EXCHANGE_FANOUT = "paynotify_exchange_fanout";
    //支付通知队列
    public static final String PAYNOTIFY_QUEUE = "paynotify_queue";
    //支付结果通知消息类型
    public static final String MESSAGE_TYPE = "payresult_notify";


    //声明交换机,且持久化
    @Bean(PAYNOTIFY_EXCHANGE_FANOUT)
    public FanoutExchange paynotify_exchange_fanout() {
   
        // 三个参数:交换机名称、是否持久化、当没有queue与其绑定时是否自动删除
        return new FanoutExchange(PAYNOTIFY_EXCHANGE_FANOUT, true, false);
    }
    //支付通知队列,且持久化
    @Bean(PAYNOTIFY_QUEUE)
    public Queue paynotify_queue() {
   
        return QueueBuilder.durable(PAYNOTIFY_QUEUE).build();
    }

    //交换机和支付通知队列绑定
    @Bean
    public Binding binding_paynotify_queue(@Qualifier(PAYNOTIFY_QUEUE) Queue queue, @Qualifier(PAYNOTIFY_EXCHANGE_FANOUT) FanoutExchange exchange) {
   
        return BindingBuilder.bind(queue).to(exchange);
    }
}

2. RabbitTemplate

在上面的类中继续添加RabbitTemplate ,并设置ConfirmCallbackReturnsCallback

	@Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
   
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        //设置confirm callback
        rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
   
            String body = "1";
            if (correlationData instanceof EnhancedCorrelationData) {
   
                body = ((EnhancedCorrelationData) correlationData).getBody();
            }
            if (ack) {
   
                //消息投递到exchange
                log.debug("消息发送到exchange成功:correlationData={},message_id={} ", correlationData, body);
                System.out.println("消息发送到exchange成功:correlationData={},message_id={}"+correlationData+body);
            } else {
   
                log.debug("消息发送到exchange失败:cause={},message_id={}",cause, body);
                System.out.println("消息发送到exchange失败:cause={},message_id={}"+cause+body);
            }
        });
        
        //设置return callback
        rabbitTemplate.setReturnsCallback(returned -> {
   
            Message message = returned.getMessage();
            int replyCode = returned.getReplyCode();
            String replyText = returned.getReplyText();
            String exchange = returned.getExchange();
            String routingKey = returned.getRoutingKey();
            // 投递失败,记录日志
            log.error("消息发送失败,应答码{},原因{},交换机{},路由键{},消息{}",
                    replyCode, replyText, exchange, routingKey, message.toString());
        });
        return rabbitTemplate;
    }

3. EnhancedCorrelationData

原始的CorrelationData,目前已经无法从中获取消息内容,也就是说现在的ConfirmCallback无法获取到消息的内容,因为设计上只关注是否投递到exchange成功。如果需要在ConfirmCallback中获取消息的内容,需要扩展这个类,并在发消息的时候,放入自定义数据。

public class EnhancedCorrelationData extends CorrelationData {
   
    private final String body;

    public EnhancedCorrelationData(String id, String body) {
   
        super(id);
        this.body = body;
    }

    public String getBody() {
   
        return body;
    }
}

4. 发送消息

EnhancedCorrelationData把消息本身放进去,或者如果你有表记录消息,你可以只放入其id。这样触发ConfirmCallback的时候,就可以获取消息内容。

		public void notifyPayResult() {
   
		String message = "TEST Message";
        Message message1 = MessageBuilder.withBody(message.getBytes(StandardCharsets.UTF_8))
                .setDeliveryMode(MessageDeliveryMode.PERSISTENT)
                .build();
        CorrelationData correlationData = new EnhancedCorrelationData(UUID.randomUUID().toString(), message.toString());
        rabbitTemplate.convertAndSend(PayNotifyConfig.PAYNOTIFY_EXCHANGE_FANOUT,"", message1, correlationData);
    }

在这里插入图片描述

相关推荐

  1. springAMQP(示例)

    2024-02-17 17:14:02       29 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-02-17 17:14:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-02-17 17:14:02       100 阅读
  3. 在Django里面运行非项目文件

    2024-02-17 17:14:02       82 阅读
  4. Python语言-面向对象

    2024-02-17 17:14:02       91 阅读

热门阅读

  1. 求最小生成树相关例题题解

    2024-02-17 17:14:02       45 阅读
  2. 正则表达式

    2024-02-17 17:14:02       40 阅读
  3. LinkedList数据结构链表

    2024-02-17 17:14:02       57 阅读
  4. 牛客 数星星 Stars

    2024-02-17 17:14:02       56 阅读
  5. vue实现多个下拉框联动(一)

    2024-02-17 17:14:02       44 阅读
  6. 深度学习与机器学习的关系

    2024-02-17 17:14:02       52 阅读
  7. Qt 说明Q_PROPERTY的作用

    2024-02-17 17:14:02       49 阅读
  8. python无人医疗战车

    2024-02-17 17:14:02       48 阅读
  9. 【C++搜索】DFS:排列与组合

    2024-02-17 17:14:02       54 阅读