Springboot整合mqtt采用注解进行监听(第二篇)

基础实现请看第一篇SpringBoot整合MQTT最新教程,最详细教程!

  这篇文章是将监听的实现加入了注解,项目的任何地方只用写入注解就可以实现监听,并对监听的消息进行处理。如果文章中有描述不清楚的地方请留言。

第一步创建注解
@Target(ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface MqttTopicListener {
   

    String value(); // 主题名称

    int qos() default 0; // QoS级别,默认为0
    
    //boolean hookFlag() default false; //钩子标记
}
第二步

  创建MqttTopicAnnotationProcessor用于处理带有注解的对象的方法。对于每个被@MqttTopicListener注解的方法,提取注解中的topic和qos值,创建一个IMqttMessageListener并订阅该topic。当消息到达时,调用目标对象的该方法并传入mqttTopic和message参数。

/** 
 * method.invoke(objectWithAnnotations, mqttTopic, message);
 * 上面这种方式绕过了 Spring 的 AOP 代理,也就是说,这个调用并不会触发 Spring AOP 的切面逻辑。
 * 也就是说直接使用 objectWithAnnotations 不会经过 Spring 容器,导致 AOP 切面无法拦截和处理这个调用。
 * 改用手动代理方式
 * Object targetObject = applicationContext.getBean(objectWithAnnotations.getClass());
 * method.invoke(targetObject, mqttTopic, message);
 * 或者采用消息适配器
 */
@Component
public class MqttTopicAnnotationProcessor {
   

    private final MQTTClientUtils mqttClientUtils;

    private final ApplicationContext applicationContext;

    public MqttTopicAnnotationProcessor(MQTTClientUtils mqttClientUtils,ApplicationContext applicationContext) {
   
        this.mqttClientUtils = mqttClientUtils;
        this.applicationContext = applicationContext;
    }

    public void processAnnotations(Object objectWithAnnotations) {
   
        Class<?> clazz = objectWithAnnotations.getClass();
        for (Method method : clazz.getDeclaredMethods()) {
   
            if (method.isAnnotationPresent(MqttTopicListener.class)) {
   
                MqttTopicListener annotation = method.getAnnotation(MqttTopicListener.class);
                String topic = annotation.value();
                int qos = annotation.qos();
                IMqttMessageListener listener = new IMqttMessageListener() {
   
                    @Override
                    public void messageArrived(String mqttTopic, MqttMessage message) throws Exception {
   
                        Object targetObject = applicationContext.getBean(objectWithAnnotations.getClass());
                        method.invoke(targetObject, mqttTopic, message);
//                        method.invoke(objectWithAnnotations, mqttTopic, message);如果采用这种方式targetObject 没有被代理
                    }
                };
                mqttClientUtils.subscribe(topic, qos, listener);
            }
        }
    }
}
第三步

  创建一个模板类,并利用他来对上一步中类在进行初始化,以便让我们的注解在启动时生效。既然叫模板类,那么我们可以在这个类中对于监听的消息做一些异常处理,对象转换将公共方法提取到该类中,简化我们的代码,这里我只做了简单的初始化。

import javax.annotation.PostConstruct;

public abstract class AbstractMqttMessageListenerService {
   

    protected MqttTopicAnnotationProcessor mqttTopicAnnotationProcessor;

    public AbstractMqttMessageListenerService(MqttTopicAnnotationProcessor mqttTopicAnnotationProcessor) {
   
        this.mqttTopicAnnotationProcessor = mqttTopicAnnotationProcessor;
    }

    @PostConstruct
    public void initialize() {
   
        mqttTopicAnnotationProcessor.processAnnotations(this);
    }
}

第四步

  具体的实现,创建一个继承AbstractMqttMessageListenerService 重写构造方法,添加一个testMessage,写上了我们的注解,之后我们发送消息就可以了。(如果其他类想实现就直接继承AbstractMqttMessageListenerService ,在方法上添加注解就完成了)

@Slf4j
@Component
public class MqttMessageListenerService extends AbstractMqttMessageListenerService {
   

    public MqttMessageListenerService(MqttTopicAnnotationProcessor mqttTopicAnnotationProcessor) {
   
        super(mqttTopicAnnotationProcessor);
    }
	
	/**
     * 测试
     *
     * @param topic
     * @param message
     */
    @MqttTopicListener(value = "test", qos = 2)
    public void testMessage(String topic, MqttMessage message) {
   
        String messageBody= new String(message.getPayload(), StandardCharsets.UTF_8);
        log.info("收到{}主题消息:messageBody:{}", topic, messageBody);
    }

}

相关推荐

  1. Springboot整合mqtt采用注解进行监听第二

    2024-01-31 00:16:02       39 阅读
  2. Springboot整合物联网IOT的MQTT协议

    2024-01-31 00:16:02       13 阅读
  3. springboot 整合 actuator监控详情

    2024-01-31 00:16:02       30 阅读
  4. SpringBoot与Prometheus监控整合

    2024-01-31 00:16:02       19 阅读
  5. SpringBoot 注解超全详解(整合超详细版本)

    2024-01-31 00:16:02       23 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-01-31 00:16:02       16 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-01-31 00:16:02       16 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-01-31 00:16:02       15 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-01-31 00:16:02       18 阅读

热门阅读

  1. 2.1写一个梅林dynv6插件(上)

    2024-01-31 00:16:02       49 阅读
  2. 为什么Vue3双向绑定使用Proxy

    2024-01-31 00:16:02       45 阅读
  3. 飞往前端的第二天

    2024-01-31 00:16:02       34 阅读
  4. SpringMVC初始化源码学习

    2024-01-31 00:16:02       34 阅读
  5. Chinese and English names of 45 common character symbols

    2024-01-31 00:16:02       28 阅读
  6. Map和Set

    Map和Set

    2024-01-31 00:16:02      34 阅读
  7. 如何编写.gitignore文件

    2024-01-31 00:16:02       28 阅读