模拟实现
模拟消费者
package com.example.demo.demo;
import com.example.demo.common.Consumer;
import com.example.demo.common.MqException;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqsever.core.BasicProperties;
import com.example.demo.mqsever.core.ExchangeType;
import java.io.IOException;
public class DemoConsumer {
public static void main(String[] args) throws IOException, MqException, InterruptedException {
System.out.println("启动消费者!");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
channel.queueDeclare("testQueue", true, false, false, null);
channel.basicConsume("testQueue", true, new Consumer() {
@Override
public void handleDelivery(String consumerTag, BasicProperties basicProperties, byte[] body) throws MqException, IOException {
System.out.println("[消费数据] 开始!");
System.out.println("consumerTag=" + consumerTag);
System.out.println("basicProperties=" + basicProperties);
String bodyString = new String(body, 0, body.length);
System.out.println("body=" + bodyString);
System.out.println("[消费数据] 结束!");
}
});
while (true) {
Thread.sleep(500);
}
}
}
模拟生产者
package com.example.demo.demo;
import com.example.demo.mqclient.Channel;
import com.example.demo.mqclient.Connection;
import com.example.demo.mqclient.ConnectionFactory;
import com.example.demo.mqsever.core.ExchangeType;
import java.io.IOException;
public class DemoProducer {
public static void main(String[] args) throws IOException, InterruptedException {
System.out.println("启动生产者");
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1");
factory.setPort(9090);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("testExchange", ExchangeType.DIRECT, true, false, null);
channel.queueDeclare("testQueue", true, false, false, null);
byte[] body = "hello".getBytes();
boolean ok = channel.basicPublish("testExchange", "testQueue", null, body);
System.out.println("消息投递完成! ok=" + ok);
Thread.sleep(500);
channel.close();
connection.close();
}
}
效果展示
启动结果如下: