死信队列和延迟队列
死信队列
死信的概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。
死信消息是RabbitMQ为我们做的一层保证,其实我们也可以不使用死信队列,而是在消息消费异常时,将消息主动投递到另一个交换机中,或者将消息重新投递到一个队列然后设置过期时间,来进行延时消费。
死信的来源
- 消息在队列的存活时间超过设置的生存时间(TTL)时间。
- 消息队列的消息数量已经超过最大队列长度。
- 消息被拒绝,使用
channel.basicNack
或channel.basicReject
,并且此时requeue
属性被设置为false
。
如何配置死信队列
配置死信队列,大概可以分为以下步骤:
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
注意:
一般情况下会为每一个业务队列都配置一个相对应的死信队列,也可以多个业务队列共用一个死信队列。要注意的是业务队列和死信队列的交换机要区分开。
并不是直接声明一个公共的死信队列,然后所有死信消息就自己跑到死信队列里去了,而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key
重要的业务一般都是建议单独配置一条死信队列
创建config类
@Configuration
public class RabbitMQConfig {
public static final String BUSINESS_EXCHANGE_NAME = "business.exchange";
public static final String BUSINESS_QUEUEA_NAME = "business.queuea";
public static final String BUSINESS_QUEUEB_NAME = "business.queueb";
public static final String DEAD_LETTER_EXCHANGE = "deadletter.exchange";
public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "deadletter.queuea.routingkey";
public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "deadletter.queueb.routingkey";
public static final String DEAD_LETTER_QUEUEA_NAME = "deadletter.queuea";
public static final String DEAD_LETTER_QUEUEB_NAME = "deadletter.queueb";
// 声明业务Exchange
@Bean("businessExchange")
public FanoutExchange businessExchange(){
return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
}
// 声明死信Exchange
@Bean("deadLetterExchange")
public DirectExchange deadLetterExchange(){
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 声明业务队列A
@Bean("businessQueueA")
public Queue businessQueueA(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
}
// 声明业务队列B
@Bean("businessQueueB")
public Queue businessQueueB(){
Map<String, Object> args = new HashMap<>(2);
// x-dead-letter-exchange 这里声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// x-dead-letter-routing-key 这里声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
}
// 声明死信队列A
@Bean("deadLetterQueueA")
public Queue deadLetterQueueA(){
return new Queue(DEAD_LETTER_QUEUEA_NAME);
}
// 声明死信队列B
@Bean("deadLetterQueueB")
public Queue deadLetterQueueB(){
return new Queue(DEAD_LETTER_QUEUEB_NAME);
}
// 声明业务队列A绑定关系
@Bean
public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明业务队列B绑定关系
@Bean
public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
@Qualifier("businessExchange") FanoutExchange exchange){
return BindingBuilder.bind(queue).to(exchange);
}
// 声明死信队列A绑定关系
@Bean
public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
}
// 声明死信队列B绑定关系
@Bean
public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
@Qualifier("deadLetterExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
}
}
配置文件
pom文件引入spring-boot-starter-amqp
和 spring-boot-starter-web
的依赖
spring:
rabbitmq:
host: localhost
password: guest
username: guest
listener:
type: simple
simple:
# 监听器抛出异常而拒绝的消息是否被重新放回队列。默认值为true(重新放回队列)。
default-requeue-rejected: false
# 消息确认方式
# none意味着没有任何的应答会被发送。
# manual意味着监听者必须通过调用Channel.basicAck()来告知所有的消息。
# auto意味着容器会自动应答,除非MessageListener抛出异常,这是默认配置方式。
acknowledge-mode: manual
业务队列消费者代码
@Slf4j
@Component
public class BusinessMessageReceiver {
@RabbitListener(queues = BUSINESS_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
String msg = new String(message.getBody());
log.info("收到业务消息A:{}", msg);
boolean ack = true;
Exception exception = null;
try {
if (msg.contains("deadletter")){
throw new RuntimeException("dead letter exception");
}
} catch (Exception e){
ack = false;
exception = e;
}
if (!ack){
log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
} else {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
@RabbitListener(queues = BUSINESS_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到业务消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
死信队列消费者代码
@Component
public class DeadLetterMessageReceiver {
@RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
public void receiveA(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息A:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
@RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
public void receiveB(Message message, Channel channel) throws IOException {
System.out.println("收到死信消息B:" + new String(message.getBody()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
延迟队列
延迟队列概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。 延迟队列机制和死信 TTL过期相似 , 但延迟队列对时间的控制较灵活, 应用应用场景广泛。
延迟队列使用场景
- 订单在十分钟之内未支付则自动取消
- 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
- 用户注册成功后,如果三天内没有登陆则进行短信提醒。
- 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
- 预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议
RabbitMQ 中的TTL
TTL 是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间, 单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用,有两种方式设置TTL。
RabbitMQ TTL 配置
- RabbitMQ可以针对Queue和Message设置
x-message-ttl
,来控制消息的生存时间,如果超时则消息变为dead letter - RabbitMQ的Queue可以配置
x-dead-letter-exchange
和x-dead-letter-routing-key
(可选)两个参数,用来控制队列内出现的dead letter,dead letter到达死信队列交换机后则按照这两个参数重新路由。
1、队列设置TTL
在创建队列的时候设置队列的x-message-ttl
属性
@Bean
public Queue queue() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-message-ttl", 30000);// 设置超时时间30s
return QueueBuilder.durable(TAXI_OVER_QUEUE).withArguments(args).build();
}
2、消息设置TTL
另一种方式便是针对每条消息设置TTL,代码如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");// 当前消息TTL为60s
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
3、两者区别
如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队 列中),而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外,还需 要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以 直接投递该消息到消费者,否则该消息将会被丢弃。
实现延时队列
我们现在知道了死信队列、TTL,只要把二者相融合,就能实现延时队列了。
延时队列,不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。
配置类文件代码
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String DEAD_LETTER_QUEUE = "QD";
// 声明xExchange
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
// 声明yExchange
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
//声明队列A ttl为10s并绑定到对应的死信交换机
@Bean("queueA")
public Queue queueA(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的TTL
args.put("x-message-ttl", 10000); // 如果需要生产者来声明TTL则注释这行
return QueueBuilder.durable(QUEUE_A).withArguments(args).build();
}
// 声明队列A绑定X交换机
@Bean
public Binding queueaBindingX(@Qualifier("queueA") Queue queueA,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
//声明队列B ttl为40s并绑定到对应的死信交换机
@Bean("queueB")
public Queue queueB(){
Map<String, Object> args = new HashMap<>(3);
//声明当前队列绑定的死信交换机
args.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
//声明当前队列的死信路由key
args.put("x-dead-letter-routing-key", "YD");
//声明队列的TTL
args.put("x-message-ttl", 40000);
return QueueBuilder.durable(QUEUE_B).withArguments(args).build();
}
//声明队列B绑定X交换机
@Bean
public Binding queuebBindingX(@Qualifier("queueB") Queue queue1B,
@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queue1B).to(xExchange).with("XB");
}
//声明死信队列QD
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
//声明死信队列QD绑定关系
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
生产者代码
@GetMapping("sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime) {
// 生产者设置延时时间方式,需要注释配置代码中第30行
rabbitTemplate.convertAndSend("X", "XC", message, correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
log.info("当前时间:{},发送一条时长{}毫秒TTL信息给队列C:{}", new Date(),ttlTime, message);
}
以上代码看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置TTL的方式,消息可能并不会按时“死亡“,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行。
Rabbitmq 插件实现延迟队列
上述问题的解决方式是使用Rabbitmq 插件实现延迟队列(https://www.rabbitmq.com/community-plugins.html)。下载rabbitmq_delayed_message_exchange
插件,然后解压放置到RabbitMQ的插件目录。
安装插件后只要将配置文件中的交换机改为延迟交换机
配置类文件代码
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME = "delayed.queue";
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@Bean
public Queue delayedQueue() {
return new Queue(DELAYED_QUEUE_NAME);
}
//自定义交换机 我们在这里定义的是一个延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
//自定义交换机的类型
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false,
args);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
@Qualifier("delayedExchange") CustomExchange
delayedExchange) {
return
BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
生产者代码
public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingkey";
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message,@PathVariable Integer delayTime) {
rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, message,
correlationData ->{
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info(" 当 前 时 间 : {}, 发送一条延迟{}毫秒的信息给队列 delayed.queue:{}", new
Date(),delayTime, message);
}