Springboot集成Rabbitmq

Springboot集成Rabbitmq实现请求异步处理

一、Docker部署rabbitmq

1. docker pull rabbitmq:3.7.8

rabbitmq为需要拉取的镜像名称,3.7.8为版本号

2. docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:3.7.8

运行拉取到的镜像文件;-d表示后台运行镜像;-p指将容器的端口映射到主机中;–name为设置容器的名称

3. docker ps -a

查看镜像是否运行成功

4. docker exec -it rabbitmq rabbitmq-plugins enable rabbitmq_management

启动rabbitmq_management,可以在浏览器访问web页面,默认账号密码为guest/guest

5. docker exec -it rabbitmq rabbitmqctl add_user admin xxx

docker exec -it rabbitmq rabbitmqctl set_user_tags admin administrator

为rabbitmq创建用户并设置权限,admin为用户名,xxx为密码

二、Springboot集成rabbitmq

1.maven依赖添加
<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.application.properties配置
#rabbitmq
#常规配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=433430
#特定需求配置
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
spring.rabbitmq.template.mandatory=true
3.rabbitmq配置文件类
@Configuration
public class RabbitConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMandatory(true);
      //设置rabbitTemplate确认机制
        rabbitTemplate.setConfirmCallback(new RabbitConfirm());
        rabbitTemplate.setReturnCallback(new RabbitReturn());
        return rabbitTemplate;
    }
}
4.生产者发送数据
@RestController
public class SendMessageController {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @GetMapping("/send")
    public String sendDirectMessage(){
        String messageId = String.valueOf(UUID.randomUUID());
        String messageData = "test message, hello!";
        String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        Map<String,Object> map=new HashMap<>();
        map.put("messageId",messageId);
        map.put("messageData",messageData);
        map.put("createTime",createTime);
        //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange
        rabbitTemplate.convertAndSend("test", "a", map);
        return "ok";
    }
}
4.消费者消费数据
@Component
@RabbitListener(queues = "test-que")    //配置需要监听的队列名
public class DirectReceiver {
    @RabbitHandler
    public void process(Map test) throws InterruptedException {
        System.out.println("DirectReceiver消费者收到消息  : " + test.toString());
    }
}
5.ConfirmCallback确认机制配置(确认消息是否发送到交换机上)

在application.properties中配置:spring.rabbitmq.publisher-confirm-type=correlated

publisher-confirm-type 新版发布确认属性有三种确认类型:

  1. NONE值是禁用发布确认模式,是默认值
  2. CORRELATED值是发布消息成功到交换器后会触发回调方法,如1示例
  3. SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;
public class RabbitConfirm implements RabbitTemplate.ConfirmCallback {
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (b){
            System.out.println("接收到了来自生产者的消息!");
        }else {
            System.out.println("未接收到");
        }
      //可自行定义业务逻辑
    }
}
6.ReturnCallback确认机制配置(未投递到队列上的数据将退回)

在application.properties中配置:

spring.rabbitmq.publisher-returns=true

spring.rabbitmq.template.mandatory=true(将 mandatory设置为true,如果消息不可路由那么rabbitmq会把完整的消息退回到发布者中)

public class RabbitReturn implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int i, String s, String s1, String s2) {
        System.out.println("消息主体:"+message.toString());
        System.out.println("回复编码:"+i);
        System.out.println("回复内容:"+s);
        System.out.println("交换器:"+s1);
        System.out.println("路由键:"+s2);
      //可自行定义业务逻辑
    }
}

7.发送自定义对象(两种方法)

rabbitTemplate源码默认采用SimpleMessageConverter来序列化消息的,只接受byte数组,string字符串,可序列化对象,如果传入费序列化的对象,会报错:java.lang.IllegalArgumentException: SimpleMessageConverter only supports String, byte[] and Serializable payloads, received: com.rabbitmq.producer.model.xxx

(1).自定义MessageConverter
public static class SelfConverter extends AbstractMessageConverter {
    @Override
    protected Message createMessage(Object object, MessageProperties messageProperties) {
        messageProperties.setContentType("application/json");
        return new Message(JSON.toJSONBytes(object), messageProperties);
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        return JSON.parse(message.getBody());
    }
}

然后在rabbitmq的配置类代码中添加

rabbitTemplate.setMessageConverter(new SelfConverter());
(2).Jackson2JsonMessageConverter

直接在rabbitmq的配置类代码中添加

rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
8.设置消费者应答机制
//在rabbit配置文件类中添加如下代码
@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        //设置手动ack模式 要和yml配置保持一致,不然会覆盖yml文件的配置
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
//消费者消费数据
@Component
public class DirectReceiver {
    @RabbitListener(queues = "test-que")
    @RabbitHandler
    public void process(String test, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("DirectReceiver消费者收到消息  : " + test.toString());
        channel.basicAck(deliveryTag,false);
        System.out.println("消息回退");
    }
}

//应答机制共有三种:
/**
首先需要在SimpleRabbitListenerContainerFactory中配置 “factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);”
AcknowledgeMode共有三种:
1.NONE:默认情况下消息消费者是NONE模式,默认所有消息消费成功,会不断的向消费者推送消息。因为rabbitMq认为所有消息都被消费成功,所以队列中不在存有消息,消息存在丢失的危险
2.AUTO(自动确认):在自动确认模式下,消息发送后即被认为成功投递,不管消费者端是否成功处理本次投递
3.MANUAL(手动确认):消费者收到消息后,手动调用basic.ack/basic.nack/basic.reject后,RabbitMQ收到这些消息后,才认为本次投递成功
	(1)basicAck(long deliveryTag, boolean multiple):表示成功确认,使用此回执方法后,消息会被rabbitmq broker删除。
		deliveryTag:消息投递序号。
		multiple:是否批量确认,true->将一次性拒绝所有小于deliveryTag的消息。
	(2)basicNack(long deliveryTag, boolean multiple, boolean requeue):表示失败确认,一般在消费消息异常时用到此方法,可以将消息重新投递入队列。
		deliveryTag:表示消息投递序号。
		multiple:是否批量;true->将一次性拒绝所有小于deliveryTag的消息。
		requeue: 表示消息是否重新入队列,true表示重新投入队列中。
	(3)basicReject(long deliveryTag, boolean requeue):拒绝消息,与basicNack区别在于不能进行批量操作,其他用法很相似。
		deliveryTag:消息投递序号。
		requeue:值为true表示消息重新入队列。
**/

相关推荐

  1. Springboot集成Rabbitmq

    2024-03-27 21:18:05       17 阅读
  2. SpringBoot集成RabbitMQ

    2024-03-27 21:18:05       18 阅读
  3. SpringBoot集成rabbitMq

    2024-03-27 21:18:05       17 阅读
  4. Springboot 集成Rabbitmq之延时队列

    2024-03-27 21:18:05       15 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-03-27 21:18:05       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-03-27 21:18:05       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-03-27 21:18:05       19 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-03-27 21:18:05       20 阅读

热门阅读

  1. 系统架构师需要掌握的知识体系

    2024-03-27 21:18:05       19 阅读
  2. 分享一些大数据处理算法

    2024-03-27 21:18:05       17 阅读
  3. Dubbo源码解析-Provider服务暴露Export源码解析

    2024-03-27 21:18:05       19 阅读
  4. Go打造REST Server【一】:用标准库来实现

    2024-03-27 21:18:05       19 阅读
  5. 服务器有哪些作用?

    2024-03-27 21:18:05       17 阅读
  6. 如何培养科学思维

    2024-03-27 21:18:05       20 阅读
  7. docker启动rocketmq简洁教程

    2024-03-27 21:18:05       16 阅读
  8. ASR工业化语音模型总结

    2024-03-27 21:18:05       18 阅读
  9. 一些关于网络的笔记

    2024-03-27 21:18:05       17 阅读
  10. C#实现简单同步Echo服务端和客户端

    2024-03-27 21:18:05       19 阅读
  11. day5-QT

    day5-QT

    2024-03-27 21:18:05      16 阅读
  12. reactive和ref的异同、toRef和toRefs的使用

    2024-03-27 21:18:05       19 阅读