Springboot整合阿里云ONS RocketMq(4.0 http)

1. 引入依赖

<!--阿里云ons,方便的接入到云服务-->
<dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.8.4.Final</version>
</dependency>

2. 配置

配置注意事项:

  1. nameSrvAddr我这里是用的4.0版本的支持http,5.0不支持http
    image.png
  2. 一个 Group ID 代表一个 Consumer 实例群组。同一个消费者 Group ID 下所有的 Consumer 实例必须保证订阅的 Topic 一致,并且也必须保证订阅 Topic 时设置的过滤规则(Tag)一致。否则您的消息可能会丢失。
  3. 订阅关系参考官方文档: 订阅关系一致
  4. 此处我配置了多个GroupId,Tag,Topic(order,market,vehicle)如果不需要配置一个即可,对应基本配置类需要增减对应属性
aliyun:
  rocketmq:
    accessKey: LTAI5txxxxxxx
    secretKey: Afq06tBxrdBxxxxxxxx
    nameSrvAddr: http://MQ_INST_xxxxxxxxxx_BYkZuJCq.cn-beijing.mq.aliyuncs.com:80
    orderGroupId: GID_xxxxxx_test
    orderTag: 'order'
    orderTopic: vehicle-order-test
    marketGroupId: GID_xxxxxx2_test
    marketTag: 'market'
    marketTopic: vehicle-market-test
    vehicleGroupId: GID_xxxxxx3_test
    vehicleTag: 'vehicle'
    vehicleTopic: vehicle-order-test

3. 配置类

3.1 基本配置类

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * Rocket MQ 配置类
 * @author zr 2024/3/1
 */
@Configuration
@ConfigurationProperties(prefix = "aliyun.rocketmq")
@Data
public class RocketMqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String marketGroupId;
    private String marketTopic;
    private String marketTag;
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;
    private String vehicleTopic;
    private String vehicleGroupId;
    private String vehicleTag;

    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        properties.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        return properties;
    }
}

3.2 生产者配置

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author zr 2024/3/1
 */
@Configuration
public class ProducerConfig {
    @Autowired
    private RocketMqConfig mqConfig;

    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producer = new ProducerBean();
        producer.setProperties(mqConfig.getMqPropertie());
        return producer;
    }
}

3.3 消费者配置

package com.vehicle.manager.core.config;

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.vehicle.manager.core.listener.VehicleListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * RocketMq消费者
 * @author zr 2024/3/1
 */
@Configuration
public class VehicleConsumerConfig {
    @Autowired
    private RocketMqConfig mqConfig;

    @Autowired
    private VehicleListener vehicleListener;


    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildVehicleBuyerConsumer() {
        ConsumerBean consumerBean = new ConsumerBean();
        //配置文件
        Properties properties = mqConfig.getMqPropertie();
        properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getVehicleGroupId());
        //将消费者线程数固定为20个 20为默认值
        properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");
        consumerBean.setProperties(properties);
        //订阅关系
        Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();
        Subscription subscription = new Subscription();
        subscription.setTopic(mqConfig.getVehicleTopic());
        subscription.setExpression(mqConfig.getVehicleTag());
        subscriptionTable.put(subscription, vehicleListener);
        //订阅多个topic如上面设置

        consumerBean.setSubscriptionTable(subscriptionTable);
        return consumerBean;
    }
}

4. 生产者工具类

  • MessageRecord为记录消息发送的对象,可以自行根据字段进行设计调整
  • 参数说明:
    • topic – 消息主题, 最长不超过255个字符; 由a-z, A-Z, 0-9, 以及中划线"-“和下划线”_"构成.
    • tag – 消息标签, 请使用合法标识符, 尽量简短且见名知意
    • key – 业务主键
    • body – 消息体, 消息体长度默认不超过4M, 具体请参阅集群部署文档描述.
package com.vehicle.manager.core.util;

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.vehicle.manager.core.config.RocketMqConfig;
import com.vehicle.manager.core.mapper.MessageRecordMapper;
import com.vehicle.manager.core.model.entity.MessageRecord;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.LocalDateTime;

/**
 * RocketMessageProducer rocketMQ消息生产者
 * @author zr 2024/3/1
 */
@Component
@Slf4j
public class RocketMessageProducer {
    private static ProducerBean producer;
    private static RocketMqConfig mqConfig;

    private  static MessageRecordMapper messageRecordMapper;

    @Autowired
    private  MessageRecordMapper messageRecordMapperInstance;


    @PostConstruct
    public void init() {
        RocketMessageProducer.messageRecordMapper = messageRecordMapperInstance;
    }

    public RocketMessageProducer(ProducerBean producer, RocketMqConfig mqConfig) {
        this.producer = producer;
        this.mqConfig = mqConfig;
    }

    /**
     * 生产车辆服务普通消息
     * @param tag
     * @param key
     * @param body
     */
    public  static void producerVehicleMsg(String tag, String key, String body) {
        Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
        long time = System.currentTimeMillis();
        try {
            SendResult sendResult = producer.send(msg);
            assert sendResult != null;
            log.info(time
                    + " Send mq message success.Topic is:" + msg.getTopic()
                    + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()+" body is:"+new String(msg.getBody())
                    + " msgId is:" + sendResult.getMessageId());

            MessageRecord messageRecord = new MessageRecord();
            messageRecord.setPlatformType("mq");
            messageRecord.setMessageType("order");
            messageRecord.setMqMessageTopic(msg.getTopic());
            messageRecord.setMqMessageTag(msg.getTag());
            messageRecord.setMqMessageKey(msg.getKey());
            messageRecord.setMqMessageId(sendResult.getMessageId());
            messageRecord.setCreatedTime(LocalDateTime.now());
            messageRecord.setMessageContent(new String(msg.getBody()));
            messageRecordMapper.insert(messageRecord);
        } catch (ONSClientException e) {
            e.printStackTrace();
            log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
        }
    }

    /**
     * 生产车辆服务延时普通消息
     * @param tag  order:订单服务   vehicle:主要用于本服务的超时回应
     * @param key
     * @param body
     * @param delay 延迟秒
     */
    public  static void producerVehicleDelayMsg(String tag, String key, String body,Integer delay) {
        Message msg = new Message(mqConfig.getVehicleTopic(), tag, key, body.getBytes());
        long time = System.currentTimeMillis();
        msg.setStartDeliverTime(time+ delay*1000);
        try {
            SendResult sendResult = producer.send(msg);
            assert sendResult != null;
            log.info(time
                    + " 发送消息成功.Topic is:" + msg.getTopic()
                    + " Tag 为:" + msg.getTag() + " Key 为:" + msg.getKey()+" body 为:"+new String(msg.getBody())
                    + " msgId 为:" + sendResult.getMessageId());
        } catch (ONSClientException e) {
            e.printStackTrace();
            log.error(time + " Send mq message failed. Topic is:" + msg.getTopic());
        }
    }
}

5. 消费者监听

package com.vehicle.manager.core.listener;

import com.alibaba.fastjson.JSON;
import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.vehicle.manager.core.model.dto.req.VehicleMQMessageDTO;
import com.vehicle.manager.core.service.HlCarService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


/**
 * @author zr 2024/3/1
 */
@Component
@Slf4j
public class VehicleListener implements MessageListener {
    @Autowired
    private HlCarService hlCarService;


    @Override
    public Action consume(Message message, ConsumeContext context) {

        log.info("VehicleReceive 消息: " + message);

        try {
            byte[] body = message.getBody();
            String s = new String(body);
            log.info(s);
            // VehicleMQMessageDTO需要自行根据业务封装
            VehicleMQMessageDTO vehicleMQMessageDTO = JSON.parseObject(s, VehicleMQMessageDTO.class);
            log.info(vehicleMQMessageDTO.toString());

            // 以下做你的业务处理
            // .........
            
            return Action.CommitMessage;//进行消息的确认
        } catch (Exception e) {
            log.info(e.getMessage());
            //消费失败
            return Action.ReconsumeLater;
        }
    }
}

6. 测试

6.1 发送消息

package com.vehicle.manager.core;

import com.alibaba.fastjson.JSON;
import com.vehicle.manager.api.StartApplication;
import com.vehicle.manager.core.util.RocketMessageProducer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;


/**
 * @author zr 2024/3/1
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = StartApplication.class)
public class MqTest {
    
    @Test
    public void producerMsg() {
        RocketMessageProducer.producerVehicleMsg("vehicle","test", JSON.toJSONString(new String("testBody")));
    }
}

6.2 接收消息

image.png

7. 延时消息

如果需要使用延时消息可以参考RocketMessageProducer中有一个延时消息的方法producerVehicleDelayMsg

相关推荐

  1. SpringBoot】文件上传到阿里

    2024-06-17 20:56:03       36 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-17 20:56:03       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-17 20:56:03       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-17 20:56:03       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-17 20:56:03       18 阅读

热门阅读

  1. 代码随想录打卡第十二天补

    2024-06-17 20:56:03       5 阅读
  2. cocosCreator获取手机剪切板内容

    2024-06-17 20:56:03       7 阅读
  3. python 多线程

    2024-06-17 20:56:03       9 阅读
  4. C++之std::lock_guard和std::unique_lock

    2024-06-17 20:56:03       8 阅读
  5. 异常类型处理 3.0

    2024-06-17 20:56:03       7 阅读
  6. Docker学习

    2024-06-17 20:56:03       6 阅读