RabbitMQ基本使用,docker安装RabbitMQ,SpringBoot整合RabbitMQ

1.拉取镜像

docker pull rabbitmq:3.9.15-management

2.运行容器

docker run -d --hostname rabbit1 --name myrabbit1 -p 15672:15672 -p 5672:5672 -e RABBITMQ_ERLANG_COOKIE='rabbitcookie' rabbitmq:3.9.15-management

3.访问地址

安装ip加端口号

http://192.168.123.3:15672/

客户端如下:

在这里插入图片描述
登录账号密码:
username:guest
password:guest

4.新增用户

创建管理员账号:
admin
admin
点击add user保存
在这里插入图片描述

在这里插入图片描述

5.新增虚拟空间

在这里插入图片描述

名字要以/开头
/mqname1
在这里插入图片描述

创建成功
在这里插入图片描述

查看是否授予权限
在这里插入图片描述
授权给guest用户权限,根据自己需要授权
在这里插入图片描述
授权成功
在这里插入图片描述

6.原生RabbitMq代码实现

加入依赖

        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.6.0</version>
        </dependency>
package com.mq.pruducer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author: 简单模式生产者
 * @Date: 2024/01/29/15:16
 * @Description: good good study,day day up
 */
public class SimpleProducer {


    /**
     * 简单模式消息的生产者发送消息
     * @param args
     */
    public static void main(String[] args) throws Exception{
        //创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置RabbitMQ服务主机地址
        connectionFactory.setHost("192.168.3.123");
        //设置RabbitMQ服务端口,默认5672
        connectionFactory.setPort(5672);
        //设置虚拟主机名字,默认/
        connectionFactory.setVirtualHost("/mqname1");
        //设置用户连接名,默认guest
        connectionFactory.setUsername("admin");
        //设置连接密码,默认guest
        connectionFactory.setPassword("admin");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明队列
        /**
         * 1.队列的名字
         * 2.持久化
         * 3.是否独占队列,ture:只有这个对象可以操作这个队列,其他的对象如果要操作,只能等这个队列操作结束,相当于加锁
         * 4.在本次连接释放以后,是否删除队列---类似数据库临时表
         * 5.队列的附加属性
         */
        channel.queueDeclare("simple_queue", true, false, false, null);

        for (int i = 0; i < 10; i++) {
            //创建消息
            String message = "这是RabbitMQ的第" + i + "条消息!";
            //消息发送
            /**
             * 1.交换机
             * 2.routingkey是什么:简单模式下和队列的名字保持一致
             * 3.消息的附加属性是什么
             * 4.消息的内容是什么
             */
            channel.basicPublish("","simple_queue", null, message.getBytes());
            //关闭资源
        }
        channel.close();
        connection.close();
    }
}

查看发送消息
在这里插入图片描述

在这里插入图片描述
发送了10条消息
在这里插入图片描述

消费消息

package com.mq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.io.UnsupportedEncodingException;

/**
 * @Author: 简单模式消息消费者
 * @Date: 2024/01/29/15:31
 * @Description: good good study,day day up
 */
public class SimpleConsumer {


    /**
     * 简单模式消息消费者接受消息
     * @param args
     */
    public static void main(String[] args) throws Exception{
        //创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置RabbitMQ服务主机地址,默认localhost
        connectionFactory.setHost("192.168.3.123");
        //设置RabbitMQ服务端口,默认5672
        connectionFactory.setPort(5672);
        //设置虚拟主机名字,默认/
        connectionFactory.setVirtualHost("/mqname1");
        //设置用户连接名,默认guest
        connectionFactory.setUsername("admin");
        //设置连接密码,默认guest
        connectionFactory.setPassword("admin");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明队列
        /**
         * 1.队列的名字
         * 2.持久化
         * 3.是否独占队列
         * 4.在本次连接释放以后,是否删除队列---临时表
         * 5.队列的附加属性
         */
        channel.queueDeclare("simple_queue", true, false, false, null);
        //创建消费者,并设置消息处理:自定义的操作
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 真实自定义处理消息的逻辑
             * @param consumerTag:消息的标签
             * @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号
             * @param properties
             * @param body:消息的内容
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException, UnsupportedEncodingException {
                String s = new String(body, "UTF-8");
                System.out.println("收到的消息的内容为:" + s);
                long deliveryTag = envelope.getDeliveryTag();//消息的编号
                String exchange = envelope.getExchange();//交换机的信息
                String routingKey = envelope.getRoutingKey();//routingKey的信息
                System.out.println("收到的消息的编号为:" + deliveryTag);
                System.out.println("收到的消息的所属的为:" + exchange);
                System.out.println("收到的消息所属的队列为:" + routingKey);

                //保存消息到数据库
            }
        };

        //消息监听
        /**
         * 1.监听队列的名字
         * 2.是否自动确认消息
         */
        channel.basicConsume("simple_queue", true, defaultConsumer);
        //关闭资源(不建议关闭,建议一直监听消息)
    }
}

在这里插入图片描述

已经消费

在这里插入图片描述

广播模式

package com.mq.pruducer;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author: 广播模式生产者
 * @Date: 2024/01/29/15:58
 * @Description: good good study,day day up
 */
public class FanoutProducer {

    /**
     * 广播模式消息的生产者发送消息
     * @param args
     */
    public static void main(String[] args) throws Exception{
        //创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置RabbitMQ服务主机地址,默认localhost
        connectionFactory.setHost("192.168.3.123");
        //设置RabbitMQ服务端口,默认5672
        connectionFactory.setPort(5672);
        //设置虚拟主机名字,默认/
        connectionFactory.setVirtualHost("/mqname1");
        //设置用户连接名,默认guest
        connectionFactory.setUsername("admin");
        //设置连接密码,默认guest
        connectionFactory.setPassword("admin");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明队列
        /**
         * 1.队列的名字
         * 2.持久化
         * 3.是否独占队列
         * 4.在本次连接释放以后,是否删除队列---临时表
         * 5.队列的附加属性
         */
        channel.queueDeclare("fanout_queue_1", true, false, false, null);
        channel.queueDeclare("fanout_queue_2", true, false, false, null);

        //声明交换机
        /**
         * 1.交换机的名字
         * 2.交换机的类型
         */
        channel.exchangeDeclare("fanout_exchange", BuiltinExchangeType.FANOUT);

        //绑定
        /**
         * 1.队列
         * 2.交换机
         * 3.routingkey
         */
        channel.queueBind("fanout_queue_1", "fanout_exchange", "");
        channel.queueBind("fanout_queue_2", "fanout_exchange", "");

        for (int i = 0; i < 10; i++) {
            //创建消息
            String message = "这是广播模式的第" + i + "条消息!";
            //消息发送
            /**
             * 1.交换机
             * 2.routingkey是什么:简单模式下和队列的名字保持一致
             * 3.消息的附加属性是什么
             * 4.消息的内容是什么
             */
            if(i % 3  == 0){
                channel.basicPublish("fanout_exchange","", null, message.getBytes());
            }else{
                channel.basicPublish("fanout_exchange","", null, message.getBytes());
            }

            //关闭资源
        }
        channel.close();
        connection.close();
    }
}

消费者

package com.mq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author: 广播模式消费者1
 * @Date: 2024/01/29/16:03
 * @Description: good good study,day day up
 */
public class FanoutConsumer1 {


    /**
     * 广播模式消息消费者接受消息
     * @param args
     */
    public static void main(String[] args) throws Exception{
        //创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置RabbitMQ服务主机地址,默认localhost
        connectionFactory.setHost("192.168.3.123");
        //设置RabbitMQ服务端口,默认5672
        connectionFactory.setPort(5672);
        //设置虚拟主机名字,默认/
        connectionFactory.setVirtualHost("/mqname1");
        //设置用户连接名,默认guest
        connectionFactory.setUsername("admin");
        //设置连接密码,默认guest
        connectionFactory.setPassword("admin");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明队列
        /**
         * 1.队列的名字
         * 2.持久化
         * 3.是否独占队列
         * 4.在本次连接释放以后,是否删除队列---临时表
         * 5.队列的附加属性
         */
        channel.queueDeclare("fanout_queue_1", true, false, false, null);
        //创建消费者,并设置消息处理:自定义的操作
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 真实自定义处理消息的逻辑
             * @param consumerTag:消息的标签
             * @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号
             * @param properties
             * @param body:消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("收到的消息的内容为:" + s);
                long deliveryTag = envelope.getDeliveryTag();//消息的编号
                String exchange = envelope.getExchange();//交换机的信息
                String routingKey = envelope.getRoutingKey();//routingKey的信息
                System.out.println("收到的消息的编号为:" + deliveryTag);
                System.out.println("收到的消息的所属的为:" + exchange);
                System.out.println("收到的消息所属的队列为:" + routingKey);

                //保存消息到数据库
            }
        };

        //消息监听
        /**
         * 1.监听队列的名字
         * 2.是否自动确认消息
         */
        channel.basicConsume("fanout_queue_1", true, defaultConsumer);
        //关闭资源(不建议关闭,建议一直监听消息)
    }
}

package com.mq.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author: 广播模式消费者2
 * @Date: 2024/01/29/16:03
 * @Description: good good study,day day up
 */
public class FanoutConsumer2 {


    /**
     * 广播模式消息消费者接受消息
     * @param args
     */
    public static void main(String[] args) throws Exception{
        //创建连接工厂对象
        ConnectionFactory connectionFactory = new ConnectionFactory();
        //设置RabbitMQ服务主机地址,默认localhost
        connectionFactory.setHost("192.168.3.123");
        //设置RabbitMQ服务端口,默认5672
        connectionFactory.setPort(5672);
        //设置虚拟主机名字,默认/
        connectionFactory.setVirtualHost("/mqname1");
        //设置用户连接名,默认guest
        connectionFactory.setUsername("admin");
        //设置连接密码,默认guest
        connectionFactory.setPassword("admin");
        //创建连接
        Connection connection = connectionFactory.newConnection();
        //创建频道
        Channel channel = connection.createChannel();
        //声明队列
        /**
         * 1.队列的名字
         * 2.持久化
         * 3.是否独占队列
         * 4.在本次连接释放以后,是否删除队列---临时表
         * 5.队列的附加属性
         */
        channel.queueDeclare("fanout_queue_2", true, false, false, null);
        //创建消费者,并设置消息处理:自定义的操作
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            /**
             * 真实自定义处理消息的逻辑
             * @param consumerTag:消息的标签
             * @param envelope:消息的属性:消息属于哪个交换机发来的, 消息数据哪个队列=消息routingkey是什么,消息的编号
             * @param properties
             * @param body:消息的内容
             * @throws IOException
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String s = new String(body, "UTF-8");
                System.out.println("收到的消息的内容为:" + s);
                long deliveryTag = envelope.getDeliveryTag();//消息的编号
                String exchange = envelope.getExchange();//交换机的信息
                String routingKey = envelope.getRoutingKey();//routingKey的信息
                System.out.println("收到的消息的编号为:" + deliveryTag);
                System.out.println("收到的消息的所属的为:" + exchange);
                System.out.println("收到的消息所属的队列为:" + routingKey);

                //保存消息到数据库
            }
        };

        //消息监听
        /**
         * 1.监听队列的名字
         * 2.是否自动确认消息
         */
        channel.basicConsume("fanout_queue_2", true, defaultConsumer);
        //关闭资源(不建议关闭,建议一直监听消息)
    }
}

广播模式队列
在这里插入图片描述
广播模式交换机

在这里插入图片描述

7.springboot整合RabbitMQ

       <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
            <version>2.3.4.RELEASE</version>
        </dependency>
server:
  port: 19012
spring:
  rabbitmq:
    host: 192.168.3.123
    port: 5672
    virtual-host: /mqname1
    username: admin
    password: admin

配置类

package com.mq.config;

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author: 配置类
 * @Date: 2024/01/29/16:37
 * @Description: good good study,day day up
 */
@Configuration
public class RabbitMQConfig {
    //创建队列
    @Bean("myQueue")
    public Queue myQueue(){
        return QueueBuilder.durable("springboot_queue").build();
    }

    //创建交换机
    @Bean("myExchange")
    public Exchange myExchange(){
        return ExchangeBuilder.topicExchange("springboot_exchange").build();
    }

    //创建绑定
    @Bean
    public Binding myBinding(@Qualifier("myQueue") Queue myQueue,
                             @Qualifier("myExchange") Exchange myExchange){
        return BindingBuilder.bind(myQueue).to(myExchange).with("user.#").noargs();
    }
}

发送消息测试

package com.mq.controller;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @Author: 测试类
 * @Date: 2024/01/29/13:36
 * @Description: good good study,day day up
 */
@RestController
@RequestMapping("/test")
public class TestController {

    @Autowired
    private RabbitTemplate rabbitTemplate;


    @RequestMapping("/one")
    public String one(){
        rabbitTemplate.convertAndSend("springboot_exchange","user.insert","1新增类型的消息");
        rabbitTemplate.convertAndSend("springboot_exchange","user.update","2修改类型的消息");
        rabbitTemplate.convertAndSend("springboot_exchange","user.delete","3删除类型的消息");
        return "发送成功";
    }

}

新建一个监听服务

package com.listener;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
 * @Author: 监听类mq
 * @Date: 2024/01/29/17:19
 * @Description: good good study,day day up
 */
@Component
public class MessageListener {
    /**
     * 监听某个队列的消息
     * @param message 接收到的消息
     */
    @RabbitListener(queues = "springboot_queue")
    public void myListener1(String message){
        System.out.println("消费者接收到的消息为:" + message);
    }
}

在这里插入图片描述

在这里插入图片描述

相关推荐

  1. linux中基于docker安装RabbitMQ

    2024-01-30 06:58:01       30 阅读
  2. docker 安装rabbitmq

    2024-01-30 06:58:01       44 阅读
  3. docker安装rabbitmq

    2024-01-30 06:58:01       53 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-01-30 06:58:01       98 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-01-30 06:58:01       106 阅读
  3. 在Django里面运行非项目文件

    2024-01-30 06:58:01       87 阅读
  4. Python语言-面向对象

    2024-01-30 06:58:01       96 阅读

热门阅读

  1. LeetCode1504. Count Submatrices With All Ones

    2024-01-30 06:58:01       52 阅读
  2. 基于机器学习的无损缺陷检测技术研究进展

    2024-01-30 06:58:01       68 阅读
  3. 机器学习复习(1)——任务整理流程

    2024-01-30 06:58:01       60 阅读
  4. 怎么创建docker镜像

    2024-01-30 06:58:01       60 阅读
  5. 升级anaconda中python到3.10版本

    2024-01-30 06:58:01       47 阅读
  6. 中间件

    2024-01-30 06:58:01       46 阅读