基础实现请看第一篇SpringBoot整合MQTT最新教程,最详细教程!
这篇文章是将监听的实现加入了注解,项目的任何地方只用写入注解就可以实现监听,并对监听的消息进行处理。如果文章中有描述不清楚的地方请留言。
第一步创建注解
@Target(ElementType.METHOD)
@Retention(value = RetentionPolicy.RUNTIME)
public @interface MqttTopicListener {
String value(); // 主题名称
int qos() default 0; // QoS级别,默认为0
//boolean hookFlag() default false; //钩子标记
}
第二步
创建MqttTopicAnnotationProcessor用于处理带有注解的对象的方法。对于每个被@MqttTopicListener注解的方法,提取注解中的topic和qos值,创建一个IMqttMessageListener并订阅该topic。当消息到达时,调用目标对象的该方法并传入mqttTopic和message参数。
/**
* method.invoke(objectWithAnnotations, mqttTopic, message);
* 上面这种方式绕过了 Spring 的 AOP 代理,也就是说,这个调用并不会触发 Spring AOP 的切面逻辑。
* 也就是说直接使用 objectWithAnnotations 不会经过 Spring 容器,导致 AOP 切面无法拦截和处理这个调用。
* 改用手动代理方式
* Object targetObject = applicationContext.getBean(objectWithAnnotations.getClass());
* method.invoke(targetObject, mqttTopic, message);
* 或者采用消息适配器
*/
@Component
public class MqttTopicAnnotationProcessor {
private final MQTTClientUtils mqttClientUtils;
private final ApplicationContext applicationContext;
public MqttTopicAnnotationProcessor(MQTTClientUtils mqttClientUtils,ApplicationContext applicationContext) {
this.mqttClientUtils = mqttClientUtils;
this.applicationContext = applicationContext;
}
public void processAnnotations(Object objectWithAnnotations) {
Class<?> clazz = objectWithAnnotations.getClass();
for (Method method : clazz.getDeclaredMethods()) {
if (method.isAnnotationPresent(MqttTopicListener.class)) {
MqttTopicListener annotation = method.getAnnotation(MqttTopicListener.class);
String topic = annotation.value();
int qos = annotation.qos();
IMqttMessageListener listener = new IMqttMessageListener() {
@Override
public void messageArrived(String mqttTopic, MqttMessage message) throws Exception {
Object targetObject = applicationContext.getBean(objectWithAnnotations.getClass());
method.invoke(targetObject, mqttTopic, message);
// method.invoke(objectWithAnnotations, mqttTopic, message);如果采用这种方式targetObject 没有被代理
}
};
mqttClientUtils.subscribe(topic, qos, listener);
}
}
}
}
第三步
创建一个模板类,并利用他来对上一步中类在进行初始化,以便让我们的注解在启动时生效。既然叫模板类,那么我们可以在这个类中对于监听的消息做一些异常处理,对象转换将公共方法提取到该类中,简化我们的代码,这里我只做了简单的初始化。
import javax.annotation.PostConstruct;
public abstract class AbstractMqttMessageListenerService {
protected MqttTopicAnnotationProcessor mqttTopicAnnotationProcessor;
public AbstractMqttMessageListenerService(MqttTopicAnnotationProcessor mqttTopicAnnotationProcessor) {
this.mqttTopicAnnotationProcessor = mqttTopicAnnotationProcessor;
}
@PostConstruct
public void initialize() {
mqttTopicAnnotationProcessor.processAnnotations(this);
}
}
第四步
具体的实现,创建一个继承AbstractMqttMessageListenerService 重写构造方法,添加一个testMessage,写上了我们的注解,之后我们发送消息就可以了。(如果其他类想实现就直接继承AbstractMqttMessageListenerService ,在方法上添加注解就完成了)
@Slf4j
@Component
public class MqttMessageListenerService extends AbstractMqttMessageListenerService {
public MqttMessageListenerService(MqttTopicAnnotationProcessor mqttTopicAnnotationProcessor) {
super(mqttTopicAnnotationProcessor);
}
/**
* 测试
*
* @param topic
* @param message
*/
@MqttTopicListener(value = "test", qos = 2)
public void testMessage(String topic, MqttMessage message) {
String messageBody= new String(message.getPayload(), StandardCharsets.UTF_8);
log.info("收到{}主题消息:messageBody:{}", topic, messageBody);
}
}