SpringBoot集成RabbitMQ

RabbitMQ详细说及作用

RabbitMq的简介


      RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的 Erlang 写成。RabbitMQ是一个消息中间件,它接收并转发消息,但不处理消息。

工作原理图

Broker:接收和分发消息的应用,指RabbitMQ Server
Connection:生产者producer / 消费者consumer和 broker之间的TCP连接
Channel:在connection 内部建立的逻辑连接,channel 之间是完全隔离的
Exchange: message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue中,消费者再去消费。
类型有:直连,主题,扇形
Routing Key:生产者将消息发送到交换机时会携带一个key,来指定路由规则
Binding Key:在绑定Exchange和Queue时,会指定一个BindingKey,生产者发送消息携带的RoutingKey会和bindingKey对比,若一致就将消息分发至这个队列
vHost 虚拟主机:每一个RabbitMQ服务器可以开设多个虚拟主机每一个vhost本质上是一个mini版的RabbitMQ服务器,拥有自己的 “交换机exchange、绑定Binding、队列Queue”,更重要的是每一个vhost拥有独立的权限机制,这样就能安全地使用一个RabbitMQ服务器来服务多个应用程序,其中每个vhost服务一个应用程序。


应运场景

        工作队列 默认是Round-robin dispatching 即循环调度    即:轮流所有的消费者  发送消息。当消费者稳定运行时,他们接受的消息数量是一样的。体现在工作任务的派发时,他是"公平派遣"。 工作任务的处理因工作者的能力不同 处理的速度和效率也不同。 假设业务场景:一个项目中有十个子任务【十个子任务的工作量一样时】,我们有两个工作人员(即消费者),我们把任务通过 循环调度派发 给 C1 和 C2 , 当10个任务都完成时 我们的项目才算完成。 若C1 工作效率高 ,C1 完成5个任务只需10个小时,而C2 需要 20 个小时,这时项目用时就等待多了10个小时。这时我们为了加快项目的完成就可以叫C1或者其他空闲的人 多做几个任务。 我们在消费者代码中用 channel.basicQos(1)  保证在接收端一个消息没有处理完时不会接收另一个消息,即接收端发送了ack后才会接收下一个消息。在这种情况下发送端会尝试把消息发送给下一个not busy的接收端。这种模式 即工作竞争模式或者又称消费者竞争模式。即谁先拿到消息,谁来负责 消费。常见的业务场景有抢红包,秒杀。

        注意:我们在用这种模式做工作任务的派发时,应尽可能的确保消息能被消费。假设你只有几个工人在做N多个任务 由于工作任务的复杂性,所有的工作任务处理不过来,这时队列就可能会填满,导致RabbitMQ 崩溃。

工作模式

1.simple (简单模式): 一个消费者消费一个生产者生产的信息
2.**Work queues(工作模式):**一个生产者生产信息,多个消费者进行消费,但是一条消息只能消费一次
3.Publish/Subscribe(发布订阅模式): 生产者首先投递消息到交换机,订阅了这个交换机的队列就会收到生产者投递的消息
4.**Routing(路由模式):**生产者生产消息投递到direct交换机中,扇出交换机会根据消息携带的routing Key匹配相应的队列
5.Topics(主题模式): 生产者生产消息投递到topic交换机中,上面是完全匹配路由键,而主题模式是模糊匹配,只要有合适规则的路由就会投递给消费者

安装RabbitMQ(Windows环境下)及访问

安装erlang

otp_win64_25.3.2.exe 下载地址:https://objects.githubusercontent.com/github-production-release-asset-2e65be/374927/f73efcc5-1734-4541-a266-cad9da949408?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240418%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240418T075304Z&X-Amz-Expires=300&X-Amz-Signature=56d8f4bab4726b5b596483d4be03b8ab0d5fe5395e9e15d137838b36da7efc3d&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=374927&response-content-disposition=attachment%3B%20filename%3Dotp_win64_25.3.2.exe&response-content-type=application%2Foctet-stream

    安装简单,一直下一步安装完成,记录安装路径,不能有中文,后面配置环境变量需要用

配置环境变量


      新建系统变量名为:ERLANG_HOME 变量值为erlang安装地址
双击系统变量path,点击“新建”,将%ERLANG_HOME%\bin加入到path中。
win+R键,输入cmd,命令行输入erl,显示版本号就说明erlang安装成功了。

安装RabbitMQ

下载 :rabbitmq-server-3.12.11.exe 

https://objects.githubusercontent.com/github-production-release-asset-2e65be/924551/9a772a35-1ec2-438b-9139-931458f06e26?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240418%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240418T083928Z&X-Amz-Expires=300&X-Amz-Signature=b65b384558739d82e01e63024f1b65469c5f0b19950eed852d6f761a77ff5b99&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=924551&response-content-disposition=attachment%3B%20filename%3Drabbitmq-server-3.12.11.exe&response-content-type=application%2Foctet-stream
 双击下载后的.exe文件,记住安装路径
按win+r,输入cmd进入命令窗口,然后cd到Mq的sbin路径下,输入如下命令进行安装

rabbitmq-plugins enable rabbitmq_management
// 验证是否安装成功
rabbitmqctl status

浏览器访问


浏览器输入http://localhost:15672 ,即可看到RabbitMQ的登录管理界面
用户名和密码默认是:guest

安装成功后浏览界面如下:

安装的问题

   1、有可能找不到erlang环境路径,需要大家在环境变量中进行相应设置即可;

   

SpringBoot集成RabbitMQ代码部分

添加依赖

pom.xml 文件中添加

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

添加配置

application.properties 文件中添加

spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

代码分三块编写:

1.配置类:创建我们的直接交换机和队列,以及直接交换机跟队列的绑定关系
2.消息生产者:生产消息
3.消息消费者:消费消息

直接交换机:

     一个交换机可以绑定一个队列一个消费者,也可以绑定多个队列多个消费者

配置类:

import org.springframework.amqp.core.*;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @Author yb
 * @Date 2024/4/19 14:19
 * @Version 1.0
 */
@Configuration
public class RabbitConfig {

    //定制JSON格式的消息转化器
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    // 1.创建一个交换机
    @Bean
    public Exchange fanoutExchange(){
        return ExchangeBuilder.fanoutExchange("myExchange").build();
    }

    //2.定义消息队列
    @Bean
    public Queue fanoutQueue(){
        return new Queue("fanoutQueue");
    }

    //3.将创建的队列绑定到对应的交换机上
    @Bean
    public Binding bingingQueue(){
        return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange()).with("").noargs();
    }


}

消息生产者:

    @Autowired
    RabbitTemplate rabbitTemplate;

    @PostMapping("/sendMessage")
    @ResponseBody
    public ResultEntity sendMessage(@RequestBody Map params) {
        try {
            String id = UUID.randomUUID().toString();
            String createTime = DateUtils.dateTimeNow();
            params.put("messageId", id);
            params.put("createTime", createTime);
            /**
             * 发给交换机,在发给路由绑定的队列
             */

            rabbitTemplate.convertAndSend("myExchange", "testRoute", params);
            return ResultEntity.success("成功");
        }catch (Exception ex){
           return ResultEntity.fail("失败!!");
        }
    }

     没消费前我们还可以查看一下数据是否生成成功:

消息消费者:多个消费者可以写在同一个类中,分别监听不同的队列即可

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
 * @Author yb
 * @Date 2024/4/19 14:45
 * @Version 1.0
 */
@Component
@RabbitListener(queues = "fanoutQueue")
public class RabbitConsumer {
    @RabbitHandler
    public void process(Map message){
        System.out.println("消费者接收到消息:" + message.toString());
    }
}

其它主题交换机: 还有模糊匹配队列等

	*:星号表示任意一个字符
	#:表示任意一个或者多个字符

       这里就不一一列出代码了, 以上就是使用Spring Boot集成RabbitMQ的基本步骤,可以根据实际需求进行更进一步的配置和扩展。

相关推荐

  1. Springboot集成Rabbitmq

    2024-04-23 00:14:03       16 阅读
  2. SpringBoot集成RabbitMQ

    2024-04-23 00:14:03       18 阅读
  3. SpringBoot集成rabbitMq

    2024-04-23 00:14:03       17 阅读
  4. Springboot 集成Rabbitmq之延时队列

    2024-04-23 00:14:03       15 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-23 00:14:03       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-23 00:14:03       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-23 00:14:03       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-23 00:14:03       20 阅读

热门阅读

  1. 在k8s集群中部署EdgeMesh

    2024-04-23 00:14:03       16 阅读
  2. 【redis】String类型常用命令

    2024-04-23 00:14:03       14 阅读
  3. 【前端】数组的常用处理函数

    2024-04-23 00:14:03       14 阅读
  4. JG/T 396-2012 外墙用非承重纤维增强水泥板检测

    2024-04-23 00:14:03       16 阅读
  5. ipad 连接WiFi无网络

    2024-04-23 00:14:03       16 阅读
  6. Vue 组件通信的几种方式

    2024-04-23 00:14:03       13 阅读
  7. C++:异常处理

    2024-04-23 00:14:03       14 阅读
  8. 计算机网络——应用层(3)电子邮件

    2024-04-23 00:14:03       15 阅读
  9. .net core8 自定义一个中间件

    2024-04-23 00:14:03       15 阅读
  10. node.js 什么是模板引擎?(具体介绍underscore)

    2024-04-23 00:14:03       15 阅读