Fanout消息模型
* 广播模型:
  * 一个交换机绑定多个队列
  * 每个队列都有一个消费者
  * 每个消费者消费自己队列中的消息,每个队列收到的消息是一样的
生产者
package com.example.demo02.mq.fanout;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
/**
 * Producer for the fanout example: publishes one text message to the
 * {@code fanout.exchange} exchange.
 */
public class FanoutSender {

    /** Name of the fanout exchange this producer declares and publishes to. */
    private static final String EXCHANGE_NAME = "fanout.exchange";

    /**
     * Declares the fanout exchange, publishes a single message to it, and
     * releases the channel and connection.
     *
     * @param args command-line arguments; not used
     * @throws Exception if obtaining the connection, declaring the exchange,
     *                   publishing, or closing a resource fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the channel and then the connection even
        // when declaring or publishing throws, so neither resource is leaked.
        try (Connection connection = ConnectionUtils.getConnection();
             Channel channel = connection.createChannel()) {
            // Third argument is the "durable" flag: the exchange is declared non-durable.
            channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);

            String msg = "fanout message";
            // Empty routing key and no message properties. The payload is encoded
            // as UTF-8 explicitly instead of relying on the platform default charset.
            channel.basicPublish(EXCHANGE_NAME, "", null,
                    msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        }
    }
}
消费者1
package com.example.demo02.mq.fanout;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * First consumer of the fanout example: binds {@code fanout.queue1} to
 * {@code fanout.exchange} and prints every message delivered to that queue.
 */
public class FanoutReceiver1 {

    /** Name of the fanout exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "fanout.exchange";

    /** Name of the queue this consumer reads from. */
    private static final String QUEUE_NAME = "fanout.queue1";

    /**
     * Declares the exchange and queue, binds them, and registers a consumer
     * that prints and manually acknowledges each delivery.
     *
     * @param args command-line arguments; not used
     * @throws Exception if obtaining the connection or any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        // The connection and channel are deliberately left open: this method
        // returns right after registering the consumer, and deliveries are
        // handled on the open channel afterwards.
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
        // Flags are durable, exclusive, autoDelete (all false); no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bound with an empty routing key.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode as UTF-8 explicitly rather than with the platform default charset.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Fanout1接收到的消息是:" + text);
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // autoAck is false, so each delivery is acknowledged manually in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
消费者2
package com.example.demo02.mq.fanout;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Second consumer of the fanout example: binds {@code fanout.queue2} to
 * {@code fanout.exchange} and prints every message delivered to that queue.
 */
public class FanoutReceiver2 {

    /** Name of the fanout exchange the queue is bound to. */
    private static final String EXCHANGE_NAME = "fanout.exchange";

    /** Name of the queue this consumer reads from. */
    private static final String QUEUE_NAME = "fanout.queue2";

    /**
     * Declares the exchange and queue, binds them, and registers a consumer
     * that prints and manually acknowledges each delivery.
     *
     * @param args command-line arguments; not used
     * @throws Exception if obtaining the connection or any broker operation fails
     */
    public static void main(String[] args) throws Exception {
        // The connection and channel are deliberately left open: this method
        // returns right after registering the consumer, and deliveries are
        // handled on the open channel afterwards.
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
        // Flags are durable, exclusive, autoDelete (all false); no extra arguments.
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Bound with an empty routing key.
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // Decode as UTF-8 explicitly rather than with the platform default charset.
                String text = new String(body, java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("Fanout2接收到的消息是:" + text);
                // Acknowledge only this delivery (multiple = false).
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };

        // autoAck is false, so each delivery is acknowledged manually in handleDelivery.
        channel.basicConsume(QUEUE_NAME, false, consumer);
    }
}
结果
![在这里插入图片描述](https://img-blog.csdnimg.cn/direct/1ab4319a1a7b4c14a9c0c3464d422f80.png)