RabbitMQ的部分模式

1发布订阅模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
        factory.setHost("192.168.74.75");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            // 创建一个通道
            channel = connection.createChannel();
            //创建交换机
            channel.exchangeDeclare("qy172-fanout-exchange", BuiltinExchangeType.FANOUT, true);
            //创建队列,如果存在则不会创建
            channel.queueDeclare("qy172-publish-queue01", true, false, false, null);
            channel.queueDeclare("qy172-publish-queue02", true, false, false, null);
            //交互机和队列绑定
            channel.queueBind("qy172-publish-queue01", "qy172-fanout-exchange", "");
            channel.queueBind("qy172-publish-queue02", "qy172-fanout-exchange", "");
            // 创建消息内容
            HashMap<String, Object> map = new HashMap<>();
            map.put("name", "张三");
            map.put("age", "22");
//把数据给交换机,让他分发给队列
            channel.basicPublish("qy172-fanout-exchange", "", null, JSON.toJSONBytes(map));
            System.out.println("发送成功");
        } catch (IOException e) {
            // 发生 IO 异常时抛出运行时异常
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            // 发生超时异常时抛出运行时异常
            throw new RuntimeException(e);
        } finally {
            if (channel != null) {
                try {
                    // 关闭通道
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    // 发生 IO 或超时异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
            if (connection != null) {
                try {
                    // 关闭连接
                    connection.close();
                } catch (IOException e) {
                    // 发生 IO 异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

2订阅个订阅者

订阅者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
        factory.setHost("192.168.74.75");
        Connection connection = factory.newConnection();
        // 创建一个 RabbitMQ 连接
        Channel channel = connection.createChannel();
        // 创建一个通道,用于与 RabbitMQ 之间的通信
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            // 创建一个消费者对象,并重写其方法
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               // 消费消息的处理方法
               String json = new String(body);
               // 将消息内容转换为字符串
               Map map = JSON.parseObject(json, Map.class);
               // 使用 JSON 解析成 Map 对象
               System.out.println("消息内容Consumer01"+map);
               // 输出消息内容
           }
       };
        channel.basicConsume("qy172-publish-queue01",true,consumer);
    }
}

订阅者2

package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.74.75");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String json = new String(body);
                    Map map = JSON.parseObject(json, Map.class);
                    System.out.println("消息内容Consumer02" + map);
                }
            };
            //订阅者2
            channel.basicConsume("qy172-publish-queue02",true,consumer);
        } catch (IOException | TimeoutException e) {
            // 处理连接、通道创建或消费消息时可能抛出的异常
            e.printStackTrace();
        }
    }
}

2路由模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
        factory.setHost("192.168.74.75");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            // 创建一个通道
            channel = connection.createChannel();
            //创建交换机,
            channel.exchangeDeclare("qy172-router-exchange", BuiltinExchangeType.DIRECT, true);
            //创建队列,如果存在则不会创建
            channel.queueDeclare("qy172-router-queue01", true, false, false, null);
            channel.queueDeclare("qy172-router-queue02", true, false, false, null);
            //交互机和队列绑定
            channel.queueBind("qy172-router-queue01", "qy172-router-exchange", "error");
            channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "error");
            channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "info");
            channel.queueBind("qy172-router-queue02", "qy172-router-exchange", "warning");
            // 创建消息内容
            HashMap<String, Object> map = new HashMap<>();
            map.put("name", "张三");
            map.put("age", "22");
            //把数据给交换机,让他分发给队列
            channel.basicPublish("qy172-router-exchange","error",null,JSON.toJSONBytes(map));
//            channel.basicPublish("qy172-router-exchange","info",null,JSON.toJSONBytes(map));
            System.out.println("发送成功");
        } catch (IOException e) {
            // 发生 IO 异常时抛出运行时异常
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            // 发生超时异常时抛出运行时异常
            throw new RuntimeException(e);
        } finally {
            if (channel != null) {
                try {
                    // 关闭通道
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    // 发生 IO 或超时异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
            if (connection != null) {
                try {
                    // 关闭连接
                    connection.close();
                } catch (IOException e) {
                    // 发生 IO 异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

接收者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
        factory.setHost("192.168.74.75");
        Connection connection = factory.newConnection();
        // 创建一个 RabbitMQ 连接
        Channel channel = connection.createChannel();
        // 创建一个通道,用于与 RabbitMQ 之间的通信
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            // 创建一个消费者对象,并重写其方法
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               // 消费消息的处理方法
               String json = new String(body);
               // 将消息内容转换为字符串
               Map map = JSON.parseObject(json, Map.class);
               // 使用 JSON 解析成 Map 对象
               System.out.println("消息内容Consumer01"+map);
               // 输出消息内容
           }
       };
        channel.basicConsume("qy172-router-queue01",true,consumer);
    }
}

接收者2

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
        factory.setHost("192.168.74.75");
        Connection connection = factory.newConnection();
        // 创建一个 RabbitMQ 连接
        Channel channel = connection.createChannel();
        // 创建一个通道,用于与 RabbitMQ 之间的通信
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            // 创建一个消费者对象,并重写其方法
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               // 消费消息的处理方法
               String json = new String(body);
               // 将消息内容转换为字符串
               Map map = JSON.parseObject(json, Map.class);
               // 使用 JSON 解析成 Map 对象
               System.out.println("消息内容Consumer01"+map);
               // 输出消息内容
           }
       };
        channel.basicConsume("qy172-router-queue01",true,consumer);
    }
}

3主题模式

发送者

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.HashMap;
import java.util.concurrent.TimeoutException;
public class PublishProduct {
    public static void main(String[] args) {
        // 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的地址,我用的redis,RabbitMQ,现在redis里安装RabbitMQ
        factory.setHost("192.168.74.75");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            // 创建一个通道
            channel = connection.createChannel();
            //创建交换机,
            channel.exchangeDeclare("qy172-topic-exchange", BuiltinExchangeType.TOPIC, true);
            //创建队列,如果存在则不会创建
            channel.queueDeclare("qy172-topic-queue01", true, false, false, null);
            channel.queueDeclare("qy172-topic-queue02", true, false, false, null);
            //交互机和队列绑定
            //主题匹配给这个
            channel.queueBind("qy172-topic-queue01", "qy172-topic-exchange", "*.orange.*");
            //主题,也匹配给这个
            channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "*.*.rabbit");
            channel.queueBind("qy172-topic-queue02", "qy172-topic-exchange", "lazy.#");
            // 创建消息内容
            HashMap<String, Object> map = new HashMap<>();
            map.put("name", "张三");
            map.put("age", "22");
            //把数据给交换机,让他分发给队列
            channel.basicPublish("qy172-topic-exchange","lazy.orange.rabbit",null,JSON.toJSONBytes(map));
            System.out.println("发送成功");
        } catch (IOException e) {
            // 发生 IO 异常时抛出运行时异常
            throw new RuntimeException(e);
        } catch (TimeoutException e) {
            // 发生超时异常时抛出运行时异常
            throw new RuntimeException(e);
        } finally {
            if (channel != null) {
                try {
                    // 关闭通道
                    channel.close();
                } catch (IOException | TimeoutException e) {
                    // 发生 IO 或超时异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
            if (connection != null) {
                try {
                    // 关闭连接
                    connection.close();
                } catch (IOException e) {
                    // 发生 IO 异常时抛出运行时异常
                    throw new RuntimeException(e);
                }
            }
        }
    }
}

接收者1

package org.example;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
public class Consumer01 {
    public static void main(String[] args) throws Exception {
        // 创建连接工厂对象
        ConnectionFactory factory = new ConnectionFactory();
        // 设置 RabbitMQ 服务器的主机地址为 "192.168.74.75"
        factory.setHost("192.168.74.75");
        Connection connection = factory.newConnection();
        // 创建一个 RabbitMQ 连接
        Channel channel = connection.createChannel();
        // 创建一个通道,用于与 RabbitMQ 之间的通信
        com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
            // 创建一个消费者对象,并重写其方法
           @Override
           public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
               // 消费消息的处理方法
               String json = new String(body);
               // 将消息内容转换为字符串
               Map map = JSON.parseObject(json, Map.class);
               // 使用 JSON 解析成 Map 对象
               System.out.println("消息内容Consumer01"+map);
               // 输出消息内容
           }
       };
        channel.basicConsume("qy172-topic-queue01",true,consumer);
    }
}

接收者2

package com.aaa;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeoutException;
public class Consumer02 {
    public static void main(String[] args) {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.74.75");
        try {
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            com.rabbitmq.client.Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String json = new String(body);
                    Map map = JSON.parseObject(json, Map.class);
                    System.out.println("消息内容Consumer02" + map);
                }
            };
            //订阅者2
            channel.basicConsume("qy172-topic-queue02",true,consumer);
        } catch (IOException | TimeoutException e) {
            // 处理连接、通道创建或消费消息时可能抛出的异常
            e.printStackTrace();
        }
    }
}

相关推荐

  1. RabbitMQ部分模式

    2024-03-30 12:54:02       17 阅读
  2. Rabbitmq几种模式总结

    2024-03-30 12:54:02       30 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-30 12:54:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-30 12:54:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-30 12:54:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-30 12:54:02       18 阅读

热门阅读

  1. 关于学习编程和技术的自述

    2024-03-30 12:54:02       16 阅读
  2. 每日更新5个Python小技能 | 第六期

    2024-03-30 12:54:02       16 阅读
  3. Hive窗口函数笔试题(面试题)

    2024-03-30 12:54:02       19 阅读
  4. Android studio 老旧版本下载地址

    2024-03-30 12:54:02       20 阅读
  5. leetcode 62.不同路径

    2024-03-30 12:54:02       12 阅读
  6. 电子元器件商城模式的安全与风险管理

    2024-03-30 12:54:02       22 阅读
  7. 算法——图论:路径,回溯

    2024-03-30 12:54:02       17 阅读
  8. 了解监控易(13):数据库监控-功能模块解析

    2024-03-30 12:54:02       13 阅读