RabbitMQ模式
简单模式(HellWorld)
RabbitMQ依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
消息生产者
public class Producer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin");
factory.setPassword("123");
//channel实现了自动close接口 自动关闭 不需要显示关闭
try(Connection connection = factory.newConnection(); //创建链接
Channel channel = connection.createChannel() //获取信道) {
/**
queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
参数:
1. queue:队列名称
2. durable:是否持久化,当mq重启之后,还在
3. exclusive:
* 是否独占。只能有一个消费者监听这队列,true可以多个消费者消费
* 当Connection关闭时,是否删除队列
4. autoDelete:是否自动删除。当没有Consumer时,自动删除掉
5. arguments:其他参数。
*/
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
String message="hello world";
/**
basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
参数:
7. exchange:交换机名称。简单模式下交换机会使用默认的 ""
8. routingKey:路由名称
9. props:配置信息
10. body:发送消息数据
*/
channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
System.out.println("消息发送完毕");
}
}
}
消息消费者
public class Consumer {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws Exception {
//创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("182.92.234.71");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
System.out.println("等待接收消息....");
//推送的消息如何进行消费的接口回调
DeliverCallback deliverCallback=(consumerTag,delivery)->{
String message= new String(delivery.getBody());
System.out.println(message);
};
//取消消费的一个回调接口 如在消费的时候队列被删除掉了
CancelCallback cancelCallback=(consumerTag)->{
System.out.println("消息消费被中断");
};
/**
* 消费者消费消息
* 1.消费哪个队列
* 2.消费成功之后是否要自动应答 true代表自动应答 false手动应答
* 3.消费者未成功消费的回调
*/
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
工作队列模式(Work queues)
工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。
实现工作队列
抽取连接工厂的工具类
从简单模式(HelloWorld)中不难发现,生产者与消费者的创建连接工厂是一样的,那么我们就可以将其抽取出来封装成一个工具类,减少重复代买的书写
/*
* 此类为连接工厂创建信道的工具类
* */
public class RabbitMqUtils {
// 得到一个连接的channel
public static Channel getChannel() throws IOException, TimeoutException {
// 创建一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.163.128");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
消费者(工作线程)代码
public class Worker01 {
// 队列名称
public static final String QUEUE_NAME = "hello";
// 接受消息
public static void main(String[] args) throws IOException, TimeoutException {
//使用工具类创建连接工厂并获取信道
Channel channel = RabbitMqUtils.getChannel();
// 接受消息参数
DeliverCallback deliverCallback = (consumerTag,message) -> {
System.out.println("接受到的消息:"+message.getBody());
};
// 取消消费参数
CancelCallback cancelCallback = consumerTag -> {
System.out.println(consumerTag+"消费者取消消费借口回调逻辑");
};
// 消息的接受
channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
}
}
生产者代码
public class Task01 {
// 队列名称
public static final String QUEUE_NAME = "hello";
// 发送大量消息
public static void main(String[] args) throws Exception {
Channel channel = RabbitMqUtils.getChannel();
// 队列的声明
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 从控制台中输入消息
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()){
String message = scanner.next();
channel.basicPublish("",QUEUE_NAME,null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("发送消息完成:"+ message);
}
}
}
主题模式(Topic)
Topic模式与Direct模式相比,他们都可以根据Routing key把消息路由到对应的队列上,但是Topic模式相较于Direct来说,它可以基于多个标准进行路由。也就是在队列绑定Routing key的时候使用通配符。这让Topic模式相较于Direct模式灵活性更大。
Topic的要求
发送到类型是topic交换机的消息的routing_key不能随意写,必须是由"."进行分隔的单词列表,最大限制为255字节
通配符规则
- *(星号)可以代替一个单词
- #(井号)可以替代零个或多个单词
Topic匹配案例
从图中看下表示例结论
routingKey | 结果 |
---|---|
quick.orange.rabbit | 被队列Q1Q2接收到 |
lazy.orange.elephant | 被队列Q1Q2接收到 |
quick.orange.fox | 被队列Q1接收到 |
lazy.brown.fox | 被队列Q2接收到 |
lazy.pink.rabbit | 虽然满足两个绑定但只被队列Q2接收一次 |
quick.brown.fox | 不匹配任何绑定不会被任何队列接收到会被丢弃 |
quick.orange.male.rabbit | 是四个单词不匹配任何绑定会被丢弃 |
lazy.orange.male.rabbit | 是四个单词但匹配Q2 |
注意:
当一个队列绑定键是#,那么这个队列将接收所有数据,就有点像fanout了
如果队列绑定键当中没有#和*出现,那么该队列绑定类型就是direct了
生产者代码
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
try (Channel channel = RabbitUtils.getChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
/**
* Q1-->绑定的是
* 中间带orange带3个单词的字符串(*.orange.*)
* Q2-->绑定的是
* 最后一个单词是rabbit的3个单词(*.*.rabbit)
* 第一个单词是lazy的多个单词(lazy.#)
*
*/
Map<String, String> bindingKeyMap = new HashMap<>();
bindingKeyMap.put("quick.orange.rabbit","被队列Q1Q2接收到");
bindingKeyMap.put("lazy.orange.elephant","被队列Q1Q2接收到");
bindingKeyMap.put("quick.orange.fox","被队列Q1接收到");
bindingKeyMap.put("lazy.brown.fox","被队列Q2接收到");
bindingKeyMap.put("lazy.pink.rabbit","虽然满足两个绑定但只被队列Q2接收一次");
bindingKeyMap.put("quick.brown.fox","不匹配任何绑定不会被任何队列接收到会被丢弃");
bindingKeyMap.put("quick.orange.male.rabbit","是四个单词不匹配任何绑定会被丢弃");
bindingKeyMap.put("lazy.orange.male.rabbit","是四个单词但匹配Q2");
// 从map中获取发送消息的routingKey和消息本体
for (Map.Entry<String, String> bindingKeyEntry: bindingKeyMap.entrySet()){
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME,bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
}
消费者代码
public class ReceiveLogsTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明Q1队列与绑定关系
// String queueName="Q1"; 可以改为以下方式灵活获取队列名称
String queueName = channel.queueDeclare().getQueue();
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.orange.*");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveLogsTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] argv) throws Exception {
Channel channel = RabbitUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//声明Q2队列与绑定关系
String queueName="Q2";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind(queueName, EXCHANGE_NAME, "lazy.#");
System.out.println("等待接收消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}