Springboot3.0整合RocketMq消费端,实现消息流控(配置消息拉取频率、拉取数量)

rocketmq官网介绍了生产者和消费者的类型:官网

生产者:

1.DefaultMQProducer

消费者:

1.DefaultMQPushConsumer

2.DefaultLitePullConsumer

rocketmq的消费模式有两种:

1.push消费 (主动推送rocketmq,然后获取数据)使用的是DefaultMQPushConsumer

2.pull消费 (主动拉取数据)使用的是DefaultLitePullConsumer

原本我使用的是

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot</artifactId>
            <version>2.2.3</version>
        </dependency>

rocketmq自动装配,然而这个依赖默认使用的是DefaultLitePullConsumer,也就是pull类型,但是此类型不支持设置mq消费端的限流配置,如消息拉取频率、拉取数量等,所以我本次使用了DefaultMQPushConsumer,支持以上限流,接下来就看代码吧

1.首先引入rocketmq-client依赖,如果不清楚版本对应关系,请看我的这篇帖子:springboot各个组件版本对应关系

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>${rocketmq.version}</version>
        </dependency>

2.新建一个rocketmq启动监听类DefaultMQConsumeListener

package com.tfyt.rocketmq.config;

import com.tfyt.common.constant.rocketmq.RocketMqConstant;
import com.tfyt.rocketmq.consumer.UserFocusConsumer;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * <p>
 * rocketmq配置
 * </p>
 *
 * @author 刘辉
 * @since 2024/7/11 上午11:16
 */
@Configuration
@Slf4j
public class DefaultMQConsumeListener {
    @Value("${rocketmq.name-server}")
    private String nameServerAddr;
    @Value("${rocketmq.pull-interval}")
    private Long pullInterval;
    @Value("${rocketmq.pull-batch-size}")
    private Integer pullBatchSize;

    @Autowired
    private UserFocusConsumer userFocusConsumer;

    @PostConstruct
    public void defaultMQConsumer(){
        run(RocketMqConstant.USER_FOCUS_GROUP,RocketMqConstant.USER_FOCUS_TOPIC,userFocusConsumer);
    }

    private void run(String group, String topic, MessageListener listener){
        try{
            log.info("mq consumer groupName:{},配置 start",group);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer (group);
            consumer.setNamesrvAddr(nameServerAddr);
            //拉取消息速度
            consumer.setPullInterval(pullInterval);
            //一次性拉取的数量
            consumer.setPullBatchSize(pullBatchSize);
            //订阅消息
            consumer.subscribe(topic,"*");
            if(listener instanceof MessageListenerConcurrently){
                consumer.registerMessageListener((MessageListenerConcurrently)listener);
            }else if(listener instanceof MessageListenerOrderly){
                consumer.registerMessageListener((MessageListenerOrderly) listener);
            }else{
                log.error("mq producer groupName:{},不支持的监听器",group);
                return;
            }
            consumer.start();
            log.info("mq producer groupName:{},配置 end",group);
        }
        catch (Exception e){
            log.error("mq consume groupName:{} 启动失败",group);
            log.error(ExceptionUtils.getStackTrace(e));
        }
    }
}

PullInterval配置的是消息的拉取频率,单位是毫秒ms,例如一秒就是1000,默认是0

PullBatchSize配置的是消息拉取数量,也就是单次拉取多少,默认是32,

如果要监听多个topic,调用多次run方法即可,启动多个监听

3.配置具体消费者逻辑:UserFocusConsumer(这是我自己的业务逻辑,实际上你要替换成自己的)

package com.tfyt.rocketmq.consumer;

import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tfyt.common.enums.FocusState;
import com.tfyt.common.vo.UserFocusVo;
import com.tfyt.rocketmq.entity.Focus;
import com.tfyt.rocketmq.mapper.FocusMapper;
import com.tfyt.rocketmq.util.ByteUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Objects;

/**
 * <p>
 * 用户关注消费端
 * </p>
 *
 * @author 刘辉
 * @since 2024/7/11 上午11:54
 */
@Slf4j
@Component
public class UserFocusConsumer implements MessageListenerOrderly {
    @Autowired
    private FocusMapper focusMapper;
    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
        try{
//            log.info("{} Receive New Messages:{} ", Thread.currentThread().getName(), list);
            Message message = list.get(0);
            UserFocusVo userFocusVo = (UserFocusVo)ByteUtil.ByteToObject(message.getBody());
            log.info("message body:{}", JSONObject.toJSONString(userFocusVo));
            return ConsumeOrderlyStatus.SUCCESS;
        }catch (Exception e){
            log.error("接收消息异常{}",e.getMessage(),e);
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}

如果你是顺序消息,就实现MessageListenerOrderly,如果不是顺序消息,就实现MessageListenerConcurrently

实现consumeMessage方法即可,启动springboot,发送消息,就可以看到已经监听成功了。

最后附上一个字节转对象的工具类:

package com.tfyt.rocketmq.util;

import com.tfyt.common.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.exception.ExceptionUtils;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

/**
 * <p>
 *
 * </p>
 *
 * @author 刘辉
 * @since 2024/7/11 下午12:00
 */
@Slf4j
public class ByteUtil {
    /**
     * byte转对象
     * @param bytes
     * @return
     */
    public static Object ByteToObject(byte[] bytes) {
        Object obj = null;
        try {
            // bytearray to object
            ByteArrayInputStream bi = new ByteArrayInputStream(bytes);
            ObjectInputStream oi = new ObjectInputStream(bi);

            obj = oi.readObject();
            bi.close();
            oi.close();
        } catch (Exception e) {
            log.error(ExceptionUtils.getStackTrace(e));
            throw new BusinessException(400,"消息消费失败");
        }
        return obj;
    }
}

相关推荐

  1. 使用 RocketMQ 实现消息的顺序消费

    2024-07-18 20:28:01       26 阅读
  2. RocketMQ笔记(九)SpringBoot整合RocketMQ消息过滤

    2024-07-18 20:28:01       36 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-18 20:28:01       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-18 20:28:01       72 阅读
  3. 在Django里面运行非项目文件

    2024-07-18 20:28:01       58 阅读
  4. Python语言-面向对象

    2024-07-18 20:28:01       69 阅读

热门阅读

  1. QTableView

    2024-07-18 20:28:01       19 阅读
  2. crontab文件只能有一个吗

    2024-07-18 20:28:01       22 阅读
  3. Linux系统如何安装pip pip3

    2024-07-18 20:28:01       21 阅读
  4. 深入解析:conda 与 pip 使用全攻略

    2024-07-18 20:28:01       20 阅读
  5. c字符串转go字符串

    2024-07-18 20:28:01       20 阅读
  6. Maxwell同步mysql binlog日志执行的几条数据库命令

    2024-07-18 20:28:01       20 阅读
  7. Andrey‘s Tree

    2024-07-18 20:28:01       21 阅读