文章目录
Producer
这段代码定义了一个名为Producer的RESTful Web服务Controller,它提供了一个HTTP GET接口/direct/sendMsg,用于发送消息到RabbitMQ的交换机。当该接口被调用时,它会发送一个固定的消息字符串到名为myDirectExchangeAAA的交换机,并使用keyAAA作为RoutingKey。发送成功后,它会返回一个表示成功的字符串。
// 声明一个包名,用于组织和管理Java类。
package com.example.direct;
// 导入Spring框架中RabbitMQ相关的RabbitTemplate类,它提供了发送和接收消息的方法。
import org.springframework.amqp.rabbit.core.RabbitTemplate;
// 导入Spring框架的自动装配注解,用于自动注入依赖。
import org.springframework.beans.factory.annotation.Autowired;
// 导入Spring Web模块的注解,用于映射HTTP GET请求到特定的处理方法。
import org.springframework.web.bind.annotation.GetMapping;
// 导入Spring Web模块的注解,用于定义Controller类的请求映射路径。
import org.springframework.web.bind.annotation.RequestMapping;
// 导入Spring Web模块的注解,用于标识一个类为RESTful Web服务的Controller。
import org.springframework.web.bind.annotation.RestController;
// 使用@RestController注解标识该类为RESTful Web服务的Controller,
// 意味着此类将处理HTTP请求并返回数据。
@RestController
// 使用@RequestMapping注解定义该Controller的基础路径为"direct"。
@RequestMapping("direct")
// 声明一个名为Producer的公共类。
public class Producer {
// 使用@Autowired注解自动注入RabbitTemplate的实例,
// 以便在类中使用RabbitMQ的功能。
@Autowired
private RabbitTemplate rabbitTemplate;
// 使用@GetMapping注解映射HTTP GET请求到sendMsg方法,
// 当访问"/direct/sendMsg"路径时,将调用此方法。
@GetMapping("sendMsg")
// 声明一个公共的sendMsg方法,该方法不接收任何参数,并返回一个字符串。
public String sendMsg(){
// 定义一个要发送的消息字符串。
String msg = "已经生成了订单,需要减去库存1个";
// 使用RabbitTemplate的convertAndSend方法发送消息到RabbitMQ交换机。
// 第一个参数是交换机的名称("myDirectExchangeAAA"),
// 第二个参数是RoutingKey("keyAAA"),用于确定消息应该路由到哪个队列,
// 第三个参数是要发送的消息内容(msg)。
rabbitTemplate.convertAndSend("myDirectExchangeAAA","keyAAA",msg);
// 返回一个表示消息发送成功的字符串。
return "send msg ok";
}
}
Consumer
package com.example.direct; // 声明包名为com.example.direct。
import com.rabbitmq.client.Channel; // 导入RabbitMQ的Channel类,它代表了一个通信信道。
import org.springframework.amqp.core.Message; // 导入Spring AMQP的Message类,表示一条消息。
import org.springframework.amqp.rabbit.annotation.RabbitHandler; // 导入RabbitHandler注解,标识处理RabbitMQ消息的方法。
import org.springframework.amqp.rabbit.annotation.RabbitListener; // 导入RabbitListener注解,用于监听RabbitMQ队列。
import org.springframework.stereotype.Component; // 导入Spring的Component注解,标识该类为Spring的一个组件。
import java.io.IOException; // 导入Java的IOException类,处理可能的输入输出异常。
// 使用@Component注解将该类声明为Spring的一个组件,这样Spring会自动扫描并管理它。
@Component
public class Consumer { // 声明一个公共类Consumer。
// 使用@RabbitHandler注解标识该方法为处理RabbitMQ消息的方法。
// 使用@RabbitListener注解监听名为"queueAAA"的队列。
@RabbitHandler
@RabbitListener(queues = "queueAAA")
public void getMSg1(Message message, Channel channel){ // 定义一个公共方法getMSg1,接收一个Message和一个Channel作为参数。
try {
System.out.println("模拟库存业务处理减库存:" + message); // 打印接收到的消息。
Integer stock = 3;
int number = 10;
number -= stock;
// 消息确认:立马删除 消息。这是RabbitMQ的消息确认机制,确保消息被正确处理后可以安全删除。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
System.out.println("减库存业务执行结束,队列消息已删除"); // 打印消息表示业务处理完毕且消息已被删除。
} catch (Exception e) { // 捕获所有异常。
try {
System.out.println("减库存业务有异常,消息重入队列"); // 打印异常信息。
// 当处理消息时发生异常,可以选择将消息重新放回队列以供后续处理。
// 这里basicReject的第二个参数为true,表示消息将重新入队。
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException ioException) { // 捕获可能的IO异常。
ioException.printStackTrace(); // 打印IO异常的堆栈信息。
}
e.printStackTrace(); // 打印原始异常的堆栈信息。
}
}
}
RabbitMQDirectConfig.java
package com.example.direct;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQDirectConfig {
// 1. 创建交换机
@Bean
public DirectExchange newDirectExchange(){
return new DirectExchange("myDirectExchangeAAA",true,false);
}
//2. 创建队列
@Bean
public Queue newQueueA(){
return new Queue("queueAAA",true);
}
//3. 绑定队列到交换机中
@Bean
public Binding bindingA(){
return BindingBuilder.bind(newQueueA()).to(newDirectExchange()).with("keyAAA");
}
}
application.yaml
server:
servlet:
context-path: /app
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated # 确认交换机已经接收到生产者的消息了
publisher-returns: true # 消息已经到了队列(交换机与队列绑定成功的)
listener:
simple:
acknowledge-mode: manual # 手动消息确认
ServletInitializer.java
package com.example;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
public class ServletInitializer extends SpringBootServletInitializer {
@Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
return application.sources(RabbitmqApplication.class);
}
}
RabbitmqApplication.java
package com.example;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
}
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>war</packaging>
<name>rabbitmq</name>
<description>rabbitmq</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<!-- AMQP客户端 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.4.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
扣库存业务处理成功,消息确认 ACK
当扣库存业务处理成功后,消费者(即处理扣库存业务的系统或服务)会向消息队列发送一个确认消息(ACK)。
这个ACK消息是告知消息队列,该条消息已经被成功处理,可以从队列中删除。
RabbitMQ等消息队列在收到ACK后,会将该条消息从队列中移除,确保它不会被再次分发。
扣库存业务处理成功,没有消息确认 uAC
如果扣库存业务处理成功,但没有发送ACK消息进行确认,这种情况通常是不正常的。
在RabbitMQ的默认设置中,如果消费者没有显式地发送ACK,消息队列会认为该消息未被成功处理。
这可能导致消息被重新放入队列(requeue),以供其他消费者尝试处理,或者在某些配置下,可能会导致消息被丢弃或转入死信队列。
则意味着系统需要处理这种未确认的情况,可能通过重试、日志记录或警报来确保消息不会丢失。
扣库存失败,消息拒绝,重入队列 ready 1
当扣库存业务处理失败时,消费者可以选择拒绝该消息,并指示消息队列重新将该消息放入队列以供后续处理。
在RabbitMQ中,这通常通过调用basicReject方法实现,并设置requeue参数为true。
“ready 1”表示队列已经准备好重新分发该消息。具体含义可能依赖于您的系统实现和上下文。
重新入队列的消息将有机会被其他消费者或同一消费者(在后续轮询中)再次尝试处理。
“该消息已经准备好再次被消费”和“队列已经准备好重新分发该消息”
在RabbitMQ中,当说到“该消息已经准备好再次被消费”和“队列已经准备好重新分发该消息”时,虽然两者在某种程度上是相关的,但它们指的是不同的状态或情况。以下是对这两个表述的详细解释:
- 该消息已经准备好再次被消费
含义:这通常意味着某个特定的消息在被消费者拒绝后,已经被重新放回到队列中,并处于可消费状态。换句话说,这个消息现在可以被任何连接到该队列的消费者再次获取和处理。
背景:在RabbitMQ中,当消费者处理消息失败时,它可以选择拒绝该消息,并通过设置requeue参数来决定是否将消息重新放回队列。如果requeue设置为true,则消息会被重新放回队列的尾部,等待下一次的消费。
影响:这个消息会再次出现在队列中,供其他消费者或同一消费者在未来的某个时间点再次消费。这有助于实现消息的可靠传递,确保即使处理失败,消息也不会丢失。 - 队列已经准备好重新分发该消息
含义:这通常指的是队列本身的状态。当队列中有消息等待被消费时,我们可以说这个队列已经准备好分发或重新分发消息。这并不一定意味着某个特定的消息已经被重新放回队列,而是指队列整体处于可以分发消息的状态。
背景:在RabbitMQ中,队列是存储和分发消息的中间件。当生产者发送消息到队列时,这些消息会被存储在队列中,等待消费者来消费。如果消费者处理消息失败并将其重新放回队列,队列会再次将这个消息添加到其分发列表中。
影响:当队列处于这种状态时,任何连接到该队列的消费者都有可能接收到这个消息(或其他等待中的消息)。队列会按照其配置(如轮询、优先级等)来决定下一个接收消息的消费者。
总结
“该消息已经准备好再次被消费”关注的是某个特定消息的状态,即该消息已被重新放回队列并可供再次消费。
“队列已经准备好重新分发该消息”则关注的是队列整体的状态,即队列中有消息等待被分发,这些消息可能包括之前被拒绝并重新放回的消息,也可能包括新进入队列的消息。
在RabbitMQ的实际应用中,理解和区分这两种状态对于确保消息的可靠传递和系统的稳定运行至关重要。