SpringBoot整合阿里云RocketMQ对接,商业版

1.需要阿里云开通商业版RocketMQ

普通消息需要新建普通主题和普通消息组;延迟消息需要新建延迟消息主题和延迟消息组。

2.结构目录

(此处原为项目结构目录的截图,图片未能显示)

3.引入依赖

<!-- Alibaba Cloud RocketMQ integration: ONS Java client SDK -->
        <dependency>
            <groupId>com.aliyun.openservices</groupId>
            <artifactId>ons-client</artifactId>
            <version>1.8.8.5.Final</version>
        </dependency>

4.延迟消息配置

import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import com.aliyun.openservices.ons.api.bean.BatchConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Spring configuration that registers the batch consumer used for delayed messages.
 */
@Configuration
public class BatchConsumerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private BatchDemoMessageListener messageListener;

    /**
     * Builds the batch consumer bean for the delayed-message topic.
     *
     * <p>Spring calls {@code start} after creation and {@code shutdown} on context close,
     * as declared on the {@code @Bean} annotation.
     *
     * @return a consumer bean configured with the delay group id and a single subscription
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public BatchConsumerBean buildBatchConsumer() {
        // Shared connection settings plus the consumer-specific group id
        Properties consumerProps = mqConfig.getMqPropertie();
        consumerProps.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getDelayGroupId());
        // Pin the consumer thread count to 20 (the original author notes 20 is the default)
        consumerProps.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");

        // Subscription: delayed topic filtered by the configured tag expression
        Subscription delaySubscription = new Subscription();
        delaySubscription.setTopic(mqConfig.getDelayTopic());
        delaySubscription.setExpression(mqConfig.getDelayTag());

        // To subscribe to more topics, add further entries to this map in the same way
        Map<Subscription, BatchMessageListener> listenersBySubscription = new HashMap<>();
        listenersBySubscription.put(delaySubscription, messageListener);

        BatchConsumerBean consumer = new BatchConsumerBean();
        consumer.setProperties(consumerProps);
        consumer.setSubscriptionTable(listenersBySubscription);
        return consumer;
    }

}

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.batch.BatchMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * 延迟消息消费者
 */
@Slf4j
@Component
public class BatchDemoMessageListener implements BatchMessageListener {

    @Override
    public Action consume(final List<Message> messages, final ConsumeContext context) {
        log.info("消费者收到消息大小:"+messages.size());
        for (Message message : messages) {
            byte[] body = message.getBody();
            String s = new String(body);
            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String formatTime = sdf.format(date);
            System.out.println("接收到消息时间:"+formatTime);
            log.info("接收到消息内容:"+s);
        }
        try {
            //do something..
            return Action.CommitMessage;
        } catch (Exception e) {
            //消费失败
            return Action.ReconsumeLater;
        }
    }
}

5.MQ配置类


import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Properties;

/**
 * RocketMQ settings bound from configuration properties under the {@code rocketmq} prefix.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {

    // Connection credentials and name server address
    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    // Normal messages
    private String topic;
    private String groupId;
    private String tag;
    // Ordered messages
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;
    // Delayed messages
    private String delayTopic;
    private String delayGroupId;
    private String delayTag;

    /**
     * Creates a fresh {@link Properties} holding the access key, secret key and
     * name server address. A new instance is returned on every call, so callers
     * may add their own entries to it.
     *
     * @return connection properties for the ONS client
     */
    public Properties getMqPropertie() {
        final Properties connection = new Properties();
        connection.setProperty(PropertyKeyConst.NAMESRV_ADDR, this.nameSrvAddr);
        connection.setProperty(PropertyKeyConst.SecretKey, this.secretKey);
        connection.setProperty(PropertyKeyConst.AccessKey, this.accessKey);
        return connection;
    }

}

6.YML配置

## Alibaba Cloud RocketMQ configuration (bound to MqConfig through the "rocketmq" prefix)
rocketmq:
  # Credentials and name server address; the values shown look like placeholders — replace with your own
  accessKey: laskdfjlaksdjflaksjdflaksdjflakdjf
  secretKey: asdfasdlfkasjdlfkasjdlfkajsdlkfjkalksdfj
  nameSrvAddr: rmq..rmq.acs.com:8080
  # Normal messages: topic, consumer group and tag expression read by ConsumerClient
  topic: topic_lsdjf_test
  groupId: Glskdfjalsdkfjalksdjflaksdfj_push
  tag: "*"
  # Ordered messages: read by OrderConsumerClient
  orderTopic: XXX
  orderGroupId: XXX
  orderTag: "*"
  # Delayed messages: read by BatchConsumerClient
  delayTopic: topic_alskdjfalksdjflksdjfkla_delay
  delayGroupId: GIlaskdjflkasdjflkajsdkf_delay
  delayTag: "*"

7.普通消息配置

import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.ConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;


/**
 * Spring configuration that registers the consumer for normal messages.
 */
@Configuration
public class ConsumerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private DemoMessageListener messageListener;

    /**
     * Builds the consumer bean for the normal-message topic.
     *
     * <p>Spring calls {@code start} after creation and {@code shutdown} on context close,
     * as declared on the {@code @Bean} annotation.
     *
     * @return a consumer bean configured with the normal group id and a single subscription
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ConsumerBean buildConsumer() {
        // Shared connection settings plus the consumer-specific group id
        Properties consumerProps = mqConfig.getMqPropertie();
        consumerProps.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());
        // Pin the consumer thread count to 20 (the original author notes 20 is the default)
        consumerProps.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");

        // Subscription: normal topic filtered by the configured tag expression
        Subscription normalSubscription = new Subscription();
        normalSubscription.setTopic(mqConfig.getTopic());
        normalSubscription.setExpression(mqConfig.getTag());

        // To subscribe to more topics, add further entries to this map in the same way
        Map<Subscription, MessageListener> listenersBySubscription = new HashMap<>();
        listenersBySubscription.put(normalSubscription, messageListener);

        ConsumerBean consumer = new ConsumerBean();
        consumer.setProperties(consumerProps);
        consumer.setSubscriptionTable(listenersBySubscription);
        return consumer;
    }

}


import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * 普通主题消费者
 */
@Component
@Slf4j
public class DemoMessageListener implements MessageListener {

    @Override
    public Action consume(Message message, ConsumeContext context) {

        log.info("接收到消息: " + message);
        try {
            byte[] body = message.getBody();
            String s = new String(body);
            log.info("接收到消息字符串:"+s);
            //Action.CommitMessag 进行消息的确认
            return Action.CommitMessage;
        } catch (Exception e) {
            //消费失败
            return Action.ReconsumeLater;
        }
    }
}

import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the producer for normal messages.
 */
@Configuration
public class ProducerClient {

    @Autowired
    private MqConfig mqConfig;

    /**
     * Builds the producer bean from the shared connection properties.
     *
     * <p>Spring calls {@code start} after creation and {@code shutdown} on context close,
     * as declared on the {@code @Bean} annotation.
     *
     * @return the configured producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        final ProducerBean normalProducer = new ProducerBean();
        normalProducer.setProperties(mqConfig.getMqPropertie());
        return normalProducer;
    }

}

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.SendResult;
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.stereotype.Component;

/**
 * Producer helper for normal messages.
 *
 * <p>The collaborators are kept in static fields so that {@link #producerMsg} can be
 * called statically; they are populated when Spring constructs this component.
 **/
@Component
public class RocketMessageProducer {

    private static ProducerBean producer;
    private static MqConfig mqConfig;

    /**
     * Stores the injected producer and configuration in the static fields.
     *
     * @param producer the started producer bean
     * @param mqConfig the RocketMQ configuration
     */
    public RocketMessageProducer(ProducerBean producer, MqConfig mqConfig) {
        // Assign through the class name: these are static fields, not instance state
        RocketMessageProducer.producer = producer;
        RocketMessageProducer.mqConfig = mqConfig;
    }

    /**
     * Sends a normal message to the configured topic and prints the outcome.
     *
     * @param tag  message tag
     * @param key  message key
     * @param body message payload, encoded as UTF-8
     * @throws IllegalStateException if called before Spring has constructed this component
     */
    public  static void producerMsg(String tag, String key, String body) {
        if (producer == null || mqConfig == null) {
            // Fail with a clear message instead of an anonymous NullPointerException
            throw new IllegalStateException(
                    "RocketMessageProducer is not initialized; it must be created by Spring before use");
        }
        // Encode with an explicit charset instead of the platform default
        Message msg = new Message(mqConfig.getTopic(), tag, key,
                body.getBytes(java.nio.charset.StandardCharsets.UTF_8));
        long time = System.currentTimeMillis();
        try {
            SendResult sendResult = producer.send(msg);
            // Explicit check: an assert is skipped unless the JVM runs with -ea
            if (sendResult == null) {
                System.out.println(time + " Send mq message failed. Topic is:" + msg.getTopic());
                return;
            }
            System.out.println(time
                    + " Send mq message success.Topic is:" + msg.getTopic()
                    + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()
                    + " msgId is:" + sendResult.getMessageId());
        } catch (ONSClientException e) {
            e.printStackTrace();
            System.out.println(time + " Send mq message failed. Topic is:" + msg.getTopic());
        }
    }

}
import com.aliyun.openservices.ons.api.*;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

import java.util.Properties;

/**
 * Alternative consumer for normal messages, built directly with {@code ONSFactory}.
 */
// Has the same effect as DemoMessageListener; left disabled (no @Component) in the original
//@Component
public class RocketMQConsumer {

    @Autowired
    private MqConfig rocketMQConfig;


    /**
     * 1. Normal subscription: creates a consumer for the configured topic and tag,
     * registers a listener that prints each message body, and starts the consumer.
     *
     * <p>The consumer is returned so that the bean has a real value (a {@code void}
     * {@code @Bean} method yields no usable bean) and so that Spring can call
     * {@code shutdown} on it when the context closes.
     *
     * @return the started consumer
     */
    @Bean(destroyMethod = "shutdown") // without @Bean Spring would never invoke this method at startup
    public Consumer normalSubscribe( ) {

        Properties properties = rocketMQConfig.getMqPropertie();

        properties.put(PropertyKeyConst.GROUP_ID,rocketMQConfig.getGroupId());

        Consumer consumer = ONSFactory.createConsumer(properties);
        consumer.subscribe(rocketMQConfig.getTopic(), rocketMQConfig.getTag(), new MessageListener() {
            @Override
            public Action consume(Message message, ConsumeContext context) {
                // Decode with an explicit charset instead of the platform default
                System.out.println("Receive: "
                        + new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8));

                // Example of converting the message into a Java object:
                //JSONObject jsonObject=JSONObject.parseObject(jsonString);
                //Book book= jsonObject.toJavaObject(Book.class);

                return Action.CommitMessage;
            }
        });

        consumer.start();
        return consumer;
    }
}

7.order没用到


import com.aliyun.openservices.ons.api.PropertyKeyConst;
import com.aliyun.openservices.ons.api.bean.OrderConsumerBean;
import com.aliyun.openservices.ons.api.bean.Subscription;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Add the @Configuration annotation in a real project so that the consumer starts together with the service
public class OrderConsumerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private OrderDemoMessageListener messageListener;

    /**
     * Builds the consumer bean for ordered messages.
     *
     * <p>Spring calls {@code start} after creation and {@code shutdown} on context close,
     * as declared on the {@code @Bean} annotation.
     *
     * @return a consumer bean configured with the order group id and a single subscription
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public OrderConsumerBean buildOrderConsumer() {
        // Shared connection settings plus the consumer-specific group id
        Properties consumerProps = mqConfig.getMqPropertie();
        consumerProps.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getOrderGroupId());

        // Subscription: ordered topic filtered by the configured tag expression
        Subscription orderSubscription = new Subscription();
        orderSubscription.setTopic(mqConfig.getOrderTopic());
        orderSubscription.setExpression(mqConfig.getOrderTag());

        // To subscribe to more topics, add further entries to this map in the same way
        Map<Subscription, MessageOrderListener> listenersBySubscription = new HashMap<>();
        listenersBySubscription.put(orderSubscription, messageListener);

        OrderConsumerBean consumer = new OrderConsumerBean();
        consumer.setProperties(consumerProps);
        consumer.setSubscriptionTable(listenersBySubscription);
        return consumer;
    }

}


import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.order.ConsumeOrderContext;
import com.aliyun.openservices.ons.api.order.MessageOrderListener;
import com.aliyun.openservices.ons.api.order.OrderAction;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Listener for ordered messages; currently it only logs the message.
 */
@Slf4j
@Component
public class OrderDemoMessageListener implements MessageOrderListener {

    /**
     * Logs the received message and reports the consumption result.
     *
     * @param message the message delivered by the broker
     * @param context the ordered-consume context supplied by the SDK (unused here)
     * @return {@link OrderAction#Success} normally, {@link OrderAction#Suspend} if the
     *         business logic placed in the try block throws
     */
    @Override
    public OrderAction consume(final Message message, final ConsumeOrderContext context) {
        log.info("接收到消息: " + message);
        OrderAction outcome;
        try {
            //do something..
            outcome = OrderAction.Success;
        } catch (Exception e) {
            // Consumption failed: suspend the current queue
            outcome = OrderAction.Suspend;
        }
        return outcome;
    }
}

import com.aliyun.openservices.ons.api.bean.OrderProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the producer for ordered messages.
 */
@Configuration
public class OrderProducerClient {

    @Autowired
    private MqConfig mqConfig;

    /**
     * Builds the ordered-message producer bean from the shared connection properties.
     *
     * <p>Spring calls {@code start} after creation and {@code shutdown} on context close,
     * as declared on the {@code @Bean} annotation.
     *
     * @return the configured ordered-message producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public OrderProducerBean buildOrderProducer() {
        final OrderProducerBean orderedProducer = new OrderProducerBean();
        orderedProducer.setProperties(mqConfig.getMqPropertie());
        return orderedProducer;
    }

}

8.事务消息没用到

import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker;
import com.aliyun.openservices.ons.api.transaction.TransactionStatus;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

/**
 * Local transaction checker used by the transactional message producer.
 */
@Slf4j
@Component
public class DemoLocalTransactionChecker implements LocalTransactionChecker {

    /**
     * Logs the check-back and reports the local transaction as committed.
     *
     * @param msg the message whose local transaction state is being queried
     * @return always {@link TransactionStatus#CommitTransaction} in this demo
     */
    @Override
    public TransactionStatus check(Message msg) {
        log.info("开始回查本地事务状态");
        // A real implementation returns a different TransactionStatus depending on the local transaction state
        final TransactionStatus status = TransactionStatus.CommitTransaction;
        return status;
    }
}

import com.aliyun.openservices.ons.api.bean.TransactionProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the producer for transactional messages.
 */
@Configuration
public class TransactionProducerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private DemoLocalTransactionChecker localTransactionChecker;

    /**
     * Builds the transactional producer bean with the shared connection properties
     * and the local transaction checker.
     *
     * <p>Spring calls {@code start} after creation and {@code shutdown} on context close,
     * as declared on the {@code @Bean} annotation.
     *
     * @return the configured transactional producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public TransactionProducerBean buildTransactionProducer() {
        final TransactionProducerBean txProducer = new TransactionProducerBean();
        txProducer.setProperties(mqConfig.getMqPropertie());
        txProducer.setLocalTransactionChecker(localTransactionChecker);
        return txProducer;
    }

}

9.测试类


import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import com.atkj.devicewx.config.MqConfig;
import com.atkj.devicewx.normal.RocketMessageProducer;
import com.atkj.devicewx.service.TestService;
import com.atkj.devicewx.vo.MetabolicVo;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;

/**
 * REST endpoints for manually testing RocketMQ message sending.
 *
 * @Author: albc
 * @Date: 2024/07/12/10:22
 * @Description: good good study,day day up
 */
@RequestMapping("/api/v1/mq/test")
@RestController
public class TestController {


    @Autowired
    private TestService testService;


    @Autowired
    private MqConfig mqConfig;

    /**
     * Delegates to {@code TestService.testOne()} and reports the returned count.
     *
     * @return a message containing the count returned by the service
     */
    @RequestMapping("/one")
    public String testOne(){
        Integer count = testService.testOne();
        return "发送成功:"+count;
    }

    /**
     * Normal message test: sends a sample {@code MetabolicVo} serialized as JSON.
     *
     * @return a fixed confirmation string
     */
    @RequestMapping("/useRocketMQ")
    public String useRocketMQ() {


        MetabolicVo metabolicVo = new MetabolicVo();
        metabolicVo.setAge(123);
        metabolicVo.setName("测试名字");
        metabolicVo.setWeight(75);
        RocketMessageProducer.producerMsg("123","666", JSON.toJSONString(metabolicVo));
        return "请求成功!";
    }

    /**
     * Delayed message test: sends one message that becomes consumable 3 seconds later.
     *
     * <p>A producer is created for this request and shut down again before returning,
     * so repeated calls do not accumulate running producers.
     *
     * @return a fixed confirmation string (returned whether or not the send succeeded)
     */
    @RequestMapping("/delayMqMsg")
    public String delayMqMsg() {
        Properties producerProperties = new Properties();
        producerProperties.setProperty(PropertyKeyConst.AccessKey, mqConfig.getAccessKey());
        producerProperties.setProperty(PropertyKeyConst.SecretKey, mqConfig.getSecretKey());
        producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, mqConfig.getNameSrvAddr());
        // Note: when connecting to an Alibaba Cloud RocketMQ 5.0 series instance, do not set
        // PropertyKeyConst.INSTANCE_ID, otherwise sending and receiving will fail
        Producer producer = ONSFactory.createProducer(producerProperties);
        producer.start();
        try {
            System.out.println("生产者启动..........");

            Date date = new Date();
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            String formatTime = sdf.format(date);

            String meg = formatTime + "发送延迟消息测试";
            // Encode with an explicit charset instead of the platform default
            Message message = new Message(mqConfig.getDelayTopic(), mqConfig.getDelayTag(),
                    meg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            // The delay is in milliseconds: the message may only be consumed after the given
            // point in time, here 3 seconds from now
            long delayTime = 3000;
            message.setStartDeliverTime(System.currentTimeMillis() + delayTime);
            try {
                SendResult sendResult = producer.send(message);
                // Explicit check: an assert is skipped unless the JVM runs with -ea
                String messageId = sendResult == null ? null : sendResult.getMessageId();
                System.out.println(new Date() + "发送mq消息主题:" + mqConfig.getDelayTopic() + "消息id: " + messageId);
            } catch (ONSClientException e) {
                // Sending failed: a real system should retry, or persist the data for compensation
                System.out.println(new Date() + "重试发送mq消息主题:" + mqConfig.getDelayTopic());
                e.printStackTrace();
            }
        } finally {
            // Release the per-request producer; otherwise each call leaks a running producer
            producer.shutdown();
        }
        return "请求成功!";

    }

}

优化部分

上面的测试代码每次发送延迟消息都要新建一个生产者,效率低下。
下面改用单例生产者进行优化。

import com.aliyun.openservices.ons.api.ONSFactory;
import com.aliyun.openservices.ons.api.Producer;
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.util.Properties;

/**
 * Holder of a single, lazily created {@link Producer} shared by the whole application.
 *
 * <p>The connection settings are injected by Spring into static fields through the
 * {@code @Value} setters below.
 *
 * @Author: albc
 * @Date: 2024/07/15/15:49
 * @Description: good good study,day day up
 */
@Component
@Slf4j
public class ProducerSingleton {

    private volatile static Producer producer;

    private static String accessKey;
    private static String secretKey;
    private static String nameSrvAddr;

    private ProducerSingleton() {

    }

    @Value("${rocketmq.accessKey}")
    private void setAccessKey(String accessKey) {
        ProducerSingleton.accessKey = accessKey;
    }

    @Value("${rocketmq.secretKey}")
    private void setSecretKey(String secretKey) {
        ProducerSingleton.secretKey = secretKey;
    }

    @Value("${rocketmq.nameSrvAddr}")
    private void setNameSrvAddr(String nameSrvAddr) {
        ProducerSingleton.nameSrvAddr = nameSrvAddr;
    }

    /**
     * Returns the shared producer, creating and starting it on first use.
     *
     * <p>The instance is stored in the static field only after {@code start()} has
     * returned, so no caller can observe an unstarted producer and a failed start
     * is not cached.
     *
     * @return the started shared producer
     * @throws IllegalStateException if the connection settings have not been injected yet
     */
    public static Producer getProducer(){
        Producer current = producer;
        if (current == null){
            synchronized(ProducerSingleton.class){
                current = producer;
                if (current == null){
                    if (accessKey == null || secretKey == null || nameSrvAddr == null) {
                        // Properties rejects null values; fail with a message that names the cause
                        throw new IllegalStateException(
                                "RocketMQ settings are not injected yet; ProducerSingleton must be initialized by Spring first");
                    }
                    Properties producerProperties = new Properties();
                    producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);
                    producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);
                    producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);
                    // Note: when connecting to an Alibaba Cloud RocketMQ 5.0 series instance, do not set
                    // PropertyKeyConst.INSTANCE_ID, otherwise sending and receiving will fail
                    Producer created = ONSFactory.createProducer(producerProperties);
                    created.start();
                    log.info("生产者启动........");
                    // Publish only after a successful start
                    producer = created;
                    current = created;
                }
            }
        }
        return current;
    }

}


import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.ons.api.exception.ONSClientException;
import com.atkj.devicewx.level.config.MqConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


/**
 * 延迟消息生产者
 *
 * @Author: albc
 * @Date: 2024/07/15/14:11
 * @Description: good good study,day day up
 */
@Slf4j
@Component
public class BatchMessageProducer {


    @Autowired
    private MqConfig mqConfig;


    /**
     * 发送消息
     * @param msg 发送消息内容
     * @param delayTime 延迟时间,毫秒
     */
    public void sendDelayMeg(String msg,Long delayTime) {
        Producer producer = ProducerSingleton.getProducer();
        Message message = new Message(mqConfig.getDelayTopic(), mqConfig.getDelayTag(), msg.getBytes());
        message.setStartDeliverTime(System.currentTimeMillis() + delayTime);
        try {
            SendResult sendResult = producer.send(message);
            assert sendResult != null;
            log.info( "发送mq消息主题:" + mqConfig.getDelayTopic() + "消息id: " + sendResult.getMessageId());
        } catch (ONSClientException e) {
            // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理
            log.error("重试发送mq消息主题:" + mqConfig.getDelayTopic());
            e.printStackTrace();
        }finally {
            message = null;
        }
    }


}


其他不变

相关推荐

  1. 对比阿里的SofaMQ与RocketMQ

    2024-07-16 10:10:03       45 阅读
  2. spring boot项目对接阿里RocketMq5

    2024-07-16 10:10:03       38 阅读
  3. SpringBoot整合RocketMQ

    2024-07-16 10:10:03       46 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-16 10:10:03       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-16 10:10:03       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-16 10:10:03       58 阅读
  4. Python语言-面向对象

    2024-07-16 10:10:03       69 阅读

热门阅读

  1. k8s nacos2.0.3 连接 mysql8.0 提示No Datasource set问题

    2024-07-16 10:10:03       22 阅读
  2. 神经系统疾病病人的护理

    2024-07-16 10:10:03       19 阅读
  3. RandomAccessFile详细总结

    2024-07-16 10:10:03       20 阅读
  4. XML 解析异常问题解决

    2024-07-16 10:10:03       27 阅读
  5. GCN、GIN

    2024-07-16 10:10:03       22 阅读
  6. c#中的事件

    2024-07-16 10:10:03       26 阅读
  7. QT下,如何获取控制台输入

    2024-07-16 10:10:03       25 阅读