如何保证消息不丢失?——使用rabbitmq的死信队列!

如何保证消息不丢失?——使用rabbitmq的死信队列!

1、什么是死信

在 RabbitMQ 中充当主角的就是消息,在不同场景下,消息会有不同地表现。
死信就是消息在特定场景下的一种表现形式,这些场景包括:

    1. 消息被拒绝访问,即 RabbitMQ返回 basicNack 的信号时 或者拒绝basicReject
    1. 消费者发生异常,超过重试次数 。 其实spring框架调用的就是 basicNack
    1. 消息的Expiration 过期时长或队列TTL过期时间。
    1. 消息队列达到最大容量

上述场景经常产生死信,即消息在这些场景中时,被称为死信。

2、什么是死信队列

死信队列就是用于储存死信的消息队列,在死信队列中,有且只有死信构成,不会存在其余类型的消息。
死信队列在 RabbitMQ 中并不会单独存在,往往死信队列都会绑定这一个普通的业务消息队列,当所绑定的消息队列中,有消息变成死信了,那么这个消息就会重新被交换机路由到指定的死信队列中去,我们可以通过对这个死信队列进行监听,从而手动的去对这一消息进行补偿。 人工干预
在这里插入图片描述

3、那么,我们到底如何来使用死信队列呢?

死信队列基本使用,只需要在声明业务队列的时候,绑定指定的死信交换机和RoutingKey即可。

生产者

/*
 * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
 *
 */
package com.fpl.provider;

import com.fpl.model.OrderingOk;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * <p>Project: spring-rabbitmq - DeadProvider</p>
 * <p>Powered by fpl1116 On 2024-04-09 11:35:12</p>
 * <p>描述:<p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
@Service
public class DeadProvider {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(OrderingOk orderingOk) {
        rabbitTemplate.convertAndSend("Direct_E01", "RK01", orderingOk,new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                int id  = orderingOk.getId();
                int expiration = 0;
                if(id == 1){
                    expiration = 50*1000;
                }else if(id == 2){
                    expiration = 40*1000;
                }else if(id ==3){
                    expiration = 30*1000;
                }else if(id ==4){
                    expiration = 20*1000;
                }else if(id ==5){
                    expiration = 10*1000;
                }
                //为每个消息设置过期时长,但是有可能造成最前面的一个消息未过期一直阻塞后面的消息不能被消费
                message.getMessageProperties().setExpiration(String.valueOf(expiration));
                return message;
            }
        });
    }
}

消费者

/*
 * Copyright (c) 2020, 2024, fpl1116.cn All rights reserved.
 *
 */
package com.fpl.consumers;

import com.fpl.model.OrderingOk;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;

/**
 * <p>Project: spring-rabbitmq - DeadConsumer</p>
 * <p>Powered by fpl1116 On 2024-04-09 11:32:59</p>
 * <p>描述:<p>
 *
 * @author penglei
 * @version 1.0
 * @since 1.8
 */
//@Configuration
@Slf4j
public class DeadConsumer {
    //死信交换机
    @Bean
    public DirectExchange deadExchange(){
        return  ExchangeBuilder.directExchange("Dead_E01").build();
    }
    //死信队列
    @Bean
    public Queue deadQueue1(){
        return   QueueBuilder.durable("Dead_Q01").build();
    }
    //死信交换机与死信队列的绑定
    @Bean
    public Binding deadBinding1(Queue deadQueue1,DirectExchange deadExchange){
        return BindingBuilder.bind(deadQueue1).to(deadExchange).with("RK_DEAD");
    }
    //业务队列
    @Bean
    public Queue queue1(){
        return   QueueBuilder
                .durable("Direct_Q01")
                .deadLetterExchange("Dead_E01")
                .deadLetterRoutingKey("RK_DEAD")
                //.ttl(10*1000) //该属性是队列的属性,设置消息的过期时间,消息在队列里面停留时间n毫秒后,就会把这个消息投递到死信交换机,针对的是所有的消息
                //.maxLength(20) //设置队列存放消息的最大个数,x-max-length属性值,当队列里面消息超过20,会把队列之前的消息依次放进死信队列
                .build();
    }

    //业务交换机
    @Bean
    public DirectExchange exchange(){
        return  ExchangeBuilder.directExchange("Direct_E01").build();
    }
    //业务交换机与队列的绑定
    @Bean
    public Binding binding1(Queue queue1,DirectExchange exchange){
        return BindingBuilder.bind(queue1).to(exchange).with("RK01");
    }


//    @RabbitListener(queues = "Direct_Q01")
//    public void receiveMessage(OrderingOk msg,Message message, Channel channel) throws IOException {
//
//        long deliveryTag = message.getMessageProperties().getDeliveryTag();
//
//        System.out.println("消费者1 收到消息:"+ msg +" tag:"+deliveryTag);
//
//        channel.basicReject(deliveryTag, false);
//        try {
//            // 处理消息...
//            int  i= 5/0;
//            // 如果处理成功,手动发送ack确认 ,Yes
//            channel.basicAck(deliveryTag, false);
//        } catch (Exception e) {
//            // 处理失败,可以选择重试或拒绝消息(basicNack或basicReject)  NO
//            channel.basicNack(deliveryTag, false, false); // 并重新入队
//
//        }
}


//}

测试

@Test
    void test4() throws IOException {

        for (int i = 1; i <=5;i++){
            OrderingOk orderingOk = OrderingOk.builder().id(i).name("fpl " + i).build();
            deadProvider.send(orderingOk);
            System.out.println("发送成功:"+i);
        }
        System.in.read();
    }

在这里插入图片描述

相关推荐

  1. RabbitMQ如何保证消息丢失

    2024-04-11 09:58:03       51 阅读
  2. RabbitMQ 如何保证消息丢失

    2024-04-11 09:58:03       35 阅读
  3. 如何保证消息队列丢失消息(以kafka为例)

    2024-04-11 09:58:03       42 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-04-11 09:58:03       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-04-11 09:58:03       100 阅读
  3. 在Django里面运行非项目文件

    2024-04-11 09:58:03       82 阅读
  4. Python语言-面向对象

    2024-04-11 09:58:03       91 阅读

热门阅读

  1. SpringCloudAlibaba-整合sentinel(四)

    2024-04-11 09:58:03       36 阅读
  2. .NET 设计模式—桥接模式(Design pattern)

    2024-04-11 09:58:03       37 阅读
  3. Css3梳理篇——animation(动画)

    2024-04-11 09:58:03       40 阅读
  4. Mac环境简化RSA密钥生成命令

    2024-04-11 09:58:03       33 阅读
  5. 项目成本管理写作思路

    2024-04-11 09:58:03       37 阅读
  6. Python的re模块

    2024-04-11 09:58:03       39 阅读
  7. 软件测试常见面试题目合集【测试面试】

    2024-04-11 09:58:03       29 阅读
  8. myweb项目资料集

    2024-04-11 09:58:03       34 阅读
  9. 深入理解与实践:npm常用命令全面解析

    2024-04-11 09:58:03       35 阅读