什么是死信队列
- 消息的存活时间到了
- 队列满了(是哪一个消息会成为死信?)
等待最久的消息会成为死信消息
- 消费被拒绝了,或者给
rabbitmq
返回ack
,并且
requeue
设置为false
默认情况下,rabbitmq 对死信的处理就是直接丢弃
如果在创建某个队列的时候,给队列设置了死信队列,那么此时如果一个消息成为了死信,那么对 rabbitmq
就会吧这个死信存储在死信队列中。
package com.atguigu.rabbit.producer.configuration;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author AlenLin
* @version 1.0
* @DESCRIPTION TODO
* @date 2024/4/8 14:56
*/
@Configuration
public class RabbitmaProducerDLConfiguration {
/**
* 死信交换机
*/
@Bean
public Exchange dlxExchange() {
Exchange dlxExchange = ExchangeBuilder.directExchange("dlx_exchange").durable(true).build();
return dlxExchange;
}
@Bean
public Queue dlxQueue() {
Queue dlxQueue = QueueBuilder.durable("dlx_queue").build();
return dlxQueue;
}
@Bean
public Binding dlxQueueBinding() {
Binding dlxBinding = BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dead").noargs();
return dlxBinding;
}
/**
* 正常的交换机
*/
@Bean
public Exchange orderExchange() {
Exchange exchange = ExchangeBuilder.directExchange("order_exchange").durable(true).build();
return exchange;
}
@Bean
public Queue orderQueue() {
Queue queue = QueueBuilder.durable("order_queue")
.maxLength(10)
.deadLetterExchange("dlx_exchange") //设置死信交换机
.deadLetterRoutingKey("dead")//设置死信 routingKey
.build();
return queue;
}
@Bean
public Binding orderQueueBinding() {
Binding binding = BindingBuilder.bind(orderQueue()).to(orderExchange()).with("info").noargs();
return binding;
}
}
@SpringBootTest(classes = ProducerApplication.class)
public class ProducerTset08 {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test01() {
for (int i = 0; i < 11; i++) {
rabbitTemplate.convertAndSend("order_exchange", "info", "hello confirm order_exchange ..." + (i + 1));
}
}
}
延迟队列
延迟队列的作用
存储延迟消息
例如:微信公众号文章的延迟发布、火车票未支付订单到时间自动取消等。
rabbitmq 的没有提供没有真正意义上的延迟队列,需要通过 ttl + 死信队列来实现
@Configuration
public class RabbitmaProducerDLConfiguration {
/**
* 死信交换机
*/
@Bean
public Exchange dlxExchange() {
Exchange dlxExchange = ExchangeBuilder.directExchange("dlx_exchange").durable(true).build();
return dlxExchange;
}
@Bean
public Queue dlxQueue() {
Queue dlxQueue = QueueBuilder.durable("dlx_queue").build();
return dlxQueue;
}
@Bean
public Binding dlxQueueBinding() {
Binding dlxBinding = BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with("dead").noargs();
return dlxBinding;
}
/**
* 正常的交换机
*/
@Bean
public Exchange orderExchange() {
Exchange exchange = ExchangeBuilder.directExchange("order_exchange").durable(true).build();
return exchange;
}
@Bean
public Queue orderQueue() {
Queue queue = QueueBuilder.durable("order_queue")
// .maxLength(10)
.ttl(20000)//设置消息的存活时间
.deadLetterExchange("dlx_exchange") //设置死信交换机
.deadLetterRoutingKey("dead")//设置死信 routingKey
.build();
return queue;
}
@Bean
public Binding orderQueueBinding() {
Binding binding = BindingBuilder.bind(orderQueue()).to(orderExchange()).with("info").noargs();
return binding;
}
}
@Component //spring 容器进行管理
public class ConsumerListener01 {
@RabbitListener(queues = "dlx_queue")//接收消息
public void listener01(Message message, Channel channel) {
byte[] body = message.getBody();
String str = new String(body);//字节数组 转化为字符串
long deliverryTag = message.getMessageProperties().getDeliveryTag();//获取消息的标签
try {
System.out.println("ConsumerListener01...msg --------->" + str);
channel.basicAck(deliverryTag, true);
} catch (IOException e) {
e.printStackTrace();
try {
channel.basicNack(deliverryTag, true, true);
} catch (IOException ex) {
e.printStackTrace();
}
}
}
}