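The official RocketMQ site documents the available producer and consumer types (see the official site).
Producers:
1. DefaultMQProducer
Consumers:
1. DefaultMQPushConsumer
2. DefaultLitePullConsumer
RocketMQ has two consumption modes:
1. Push consumption, using DefaultMQPushConsumer: you register a listener and the client delivers messages to it. Under the hood the client still pulls from the broker via long polling, which is why it exposes pull-related settings.
2. Pull consumption, using DefaultLitePullConsumer: your own code actively fetches the messages.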
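To make the difference between the two modes concrete, here is a toy model in plain Java. It does not use any RocketMQ classes; `PushOverPullSketch`, `poll`, and `runPushLoop` are names I made up for illustration. It only sketches the idea that a push-style consumer can be a client-side loop that pulls batches and hands them to your listener.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Toy model only: none of these types are RocketMQ classes.
class PushOverPullSketch {

    /** "Pull" style: the caller decides when to fetch and receives the batch directly. */
    static List<String> poll(Queue<String> broker, int maxBatch) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxBatch && !broker.isEmpty()) {
            batch.add(broker.remove());
        }
        return batch;
    }

    /** "Push" style: a client-owned loop pulls batches and invokes the listener. */
    static int runPushLoop(Queue<String> broker, int maxBatch, Consumer<List<String>> listener) {
        int pulls = 0;
        while (!broker.isEmpty()) {
            listener.accept(poll(broker, maxBatch));
            pulls++;
        }
        return pulls; // number of pull requests the loop issued
    }

    public static void main(String[] args) {
        Queue<String> broker = new ArrayDeque<>(List.of("m1", "m2", "m3", "m4", "m5"));
        // Pull: application code calls poll() itself.
        System.out.println("pulled: " + poll(broker, 2));
        // Push: application code only supplies a callback.
        int pulls = runPushLoop(broker, 2, batch -> System.out.println("listener got: " + batch));
        System.out.println("pull requests issued by the loop: " + pulls);
    }
}
```

In this model the batch size passed to the loop caps how much the listener sees per round, which is the same kind of knob the real push consumer exposes.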
Originally I used the following dependency for RocketMQ auto-configuration:
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot</artifactId>
<version>2.2.3</version>
</dependency>
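That starter auto-configures RocketMQ. However, in my setup the consumer it provided was a DefaultLitePullConsumer (pull type), and I could not find a way to configure consumer-side throttling through it, such as the pull interval or the number of messages per pull. So this time I used DefaultMQPushConsumer directly, which supports both settings. Let's look at the code.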
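1. First, add the rocketmq-client dependency. If you are unsure which version matches your Spring Boot version, see my other post on version compatibility between Spring Boot components.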
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>${rocketmq.version}</version>
</dependency>
2. Create a class, DefaultMQConsumeListener, that configures and starts the RocketMQ consumers at application startup.
package com.tfyt.rocketmq.config;

import com.tfyt.common.constant.rocketmq.RocketMqConstant;
import com.tfyt.rocketmq.consumer.UserFocusConsumer;
import jakarta.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Configuration;

/**
 * <p>
 * RocketMQ configuration
 * </p>
 *
 * @author 刘辉
 * @since 2024/7/11 11:16 AM
 */
@Configuration
@Slf4j
public class DefaultMQConsumeListener {

    @Value("${rocketmq.name-server}")
    private String nameServerAddr;

    @Value("${rocketmq.pull-interval}")
    private Long pullInterval;

    @Value("${rocketmq.pull-batch-size}")
    private Integer pullBatchSize;

    @Autowired
    private UserFocusConsumer userFocusConsumer;

    @PostConstruct
    public void defaultMQConsumer() {
        run(RocketMqConstant.USER_FOCUS_GROUP, RocketMqConstant.USER_FOCUS_TOPIC, userFocusConsumer);
    }

    private void run(String group, String topic, MessageListener listener) {
        try {
            log.info("mq consumer groupName:{}, configuration start", group);
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group);
            consumer.setNamesrvAddr(nameServerAddr);
            // Delay between two pulls, in milliseconds
            consumer.setPullInterval(pullInterval);
            // Maximum number of messages fetched per pull
            consumer.setPullBatchSize(pullBatchSize);
            // Subscribe to every tag of the topic
            consumer.subscribe(topic, "*");
            // Register the listener with the overload that matches its type
            if (listener instanceof MessageListenerConcurrently) {
                consumer.registerMessageListener((MessageListenerConcurrently) listener);
            } else if (listener instanceof MessageListenerOrderly) {
                consumer.registerMessageListener((MessageListenerOrderly) listener);
            } else {
                log.error("mq consumer groupName:{}, unsupported listener type", group);
                return;
            }
            consumer.start();
            log.info("mq consumer groupName:{}, configuration end", group);
        } catch (Exception e) {
            // Passing the exception as the last argument logs the full stack trace
            log.error("mq consumer groupName:{} failed to start", group, e);
        }
    }
}
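PullInterval is the delay between two pulls, in milliseconds (for example, 1000 means one second). The default is 0, meaning the next pull is issued immediately. Here it is read from the rocketmq.pull-interval property.
PullBatchSize is the maximum number of messages fetched in a single pull. The default is 32. Here it is read from the rocketmq.pull-batch-size property.
To listen to several topics, call run once per topic; each call starts its own consumer.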
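To pick values for these two settings, a rough upper bound on throughput helps. The sketch below is plain arithmetic, not part of the RocketMQ API: `PullThrottleEstimate` and `maxMessagesPerSecond` are hypothetical names, and it assumes each assigned queue pulls one batch, waits for the pull interval, then pulls again, ignoring network and processing time. Treat that per-queue assumption as my simplification rather than a documented guarantee.

```java
// Back-of-the-envelope helper; not part of the RocketMQ API.
class PullThrottleEstimate {

    /**
     * Approximate ceiling on messages pulled per second.
     * Assumption: every assigned queue fetches up to pullBatchSize messages
     * once per pullIntervalMs, with zero network and processing time.
     */
    static long maxMessagesPerSecond(long pullIntervalMs, int pullBatchSize, int assignedQueues) {
        if (pullIntervalMs <= 0) {
            // With an interval of 0 the client pulls back to back, so there is no ceiling to compute
            throw new IllegalArgumentException("pullIntervalMs must be > 0 for this estimate");
        }
        return (1000L * pullBatchSize * assignedQueues) / pullIntervalMs;
    }

    public static void main(String[] args) {
        // 1 pull per second, 32 messages per pull, 4 queues
        System.out.println(maxMessagesPerSecond(1000, 32, 4)); // prints 128
        // 2 pulls per second, 10 messages per pull, 1 queue
        System.out.println(maxMessagesPerSecond(500, 10, 1)); // prints 20
    }
}
```

Real throughput will be lower than this bound, since consuming the messages takes time as well.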
3. Implement the actual consumer logic: UserFocusConsumer (this is my own business logic; replace it with yours).
package com.tfyt.rocketmq.consumer;

import com.alibaba.fastjson2.JSONObject;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.tfyt.common.enums.FocusState;
import com.tfyt.common.vo.UserFocusVo;
import com.tfyt.rocketmq.entity.Focus;
import com.tfyt.rocketmq.mapper.FocusMapper;
import com.tfyt.rocketmq.util.ByteUtil;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Objects;

/**
 * <p>
 * Consumer for user-follow messages
 * </p>
 *
 * @author 刘辉
 * @since 2024/7/11 11:54 AM
 */
@Slf4j
@Component
public class UserFocusConsumer implements MessageListenerOrderly {

    @Autowired
    private FocusMapper focusMapper;

    @Override
    public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext context) {
        try {
            // log.info("{} Receive New Messages:{} ", Thread.currentThread().getName(), list);
            // The list holds one message by default (consumeMessageBatchMaxSize = 1);
            // iterate anyway so nothing is skipped if that setting is raised
            for (Message message : list) {
                UserFocusVo userFocusVo = (UserFocusVo) ByteUtil.ByteToObject(message.getBody());
                log.info("message body:{}", JSONObject.toJSONString(userFocusVo));
            }
            return ConsumeOrderlyStatus.SUCCESS;
        } catch (Exception e) {
            log.error("Failed to consume message: {}", e.getMessage(), e);
            // Pause this queue briefly, then redeliver the same messages
            return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}
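If you consume ordered messages, implement MessageListenerOrderly; otherwise implement MessageListenerConcurrently.
Then just implement the consumeMessage method. Start the Spring Boot application, send a message, and you should see the listener receive it.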
Finally, here is the utility class that converts the message bytes back into an object:
package com.tfyt.rocketmq.util;

import com.tfyt.common.exception.BusinessException;
import lombok.extern.slf4j.Slf4j;

import java.io.ByteArrayInputStream;
import java.io.ObjectInputStream;

/**
 * <p>
 * Utility for converting message bytes back into objects
 * </p>
 *
 * @author 刘辉
 * @since 2024/7/11 12:00 PM
 */
@Slf4j
public class ByteUtil {

    /**
     * Deserializes a Java-serialized byte array into an object.
     *
     * @param bytes the serialized object, e.g. a message body
     * @return the deserialized object
     */
    public static Object ByteToObject(byte[] bytes) {
        // try-with-resources closes the streams even if readObject() throws
        try (ObjectInputStream oi = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return oi.readObject();
        } catch (Exception e) {
            log.error("Failed to deserialize message body", e);
            throw new BusinessException(400, "Message consumption failed");
        }
    }
}
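Note that ByteToObject only works if the producer wrote the message body with Java serialization. As a minimal sketch of that round trip, assuming the payload class implements Serializable, the following plain-Java example serializes an object to bytes and reads it back. `SerializationRoundTrip`, `objectToBytes`, and `bytesToObject` are hypothetical names, not classes from my project or from RocketMQ.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Illustrative helper; the names are made up for this sketch.
class SerializationRoundTrip {

    /** Producer side: turn a Serializable object into bytes for the message body. */
    static byte[] objectToBytes(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // The stream is closed (and therefore flushed) before the bytes are read
        return bytes.toByteArray();
    }

    /** Consumer side: turn the message body back into an object. */
    static Object bytesToObject(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize payload", e);
        }
    }

    public static void main(String[] args) {
        byte[] body = objectToBytes("hello");
        System.out.println(bytesToObject(body)); // prints hello
    }
}
```

Both sides need the same class on the classpath, so a shared module for the payload types (like my common module) keeps producer and consumer in sync.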