mqtt-emqx:设置遗嘱消息

【pom.xml】

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.3.12.RELEASE</version>
</dependency>
<!-- Eclipse Paho MQTT v3 client used by all demo classes. -->
<dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.2.2</version>
</dependency>
<dependency>
    <groupId>com.alibaba.fastjson2</groupId>
    <artifactId>fastjson2</artifactId>
    <version>2.0.49</version>
</dependency>
<!-- Lombok is required at compile time: the demo classes use @Slf4j and @SneakyThrows. -->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.30</version>
    <scope>provided</scope>
</dependency>

【MyDemo6MqttCallback.java】

package com.chz.myMqttV3.demo6;

@Slf4j
public class MyDemo6MqttCallback implements MqttCallbackExtended {

    private MqttClient client;
    private MqttConnectOptions options;
    private String[] topics;

    public MyDemo6MqttCallback(MqttClient client, MqttConnectOptions options, String[] topics)
    {
        this.client = client;
        this.options = options;
        this.topics = topics;
    }

    @SneakyThrows
    @Override
    public void connectionLost(Throwable throwable) {
        log.error("connectionLost", throwable);
        while (!client.isConnected()) {
            log.info("emqx重新连接....................................................");
            client.connect(options);
            Thread.sleep(1000);
        }
    }

    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        log.info("messageArrived: topic={}, message={}", topic, new String(message.getPayload()));
    }

    @SneakyThrows
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        if( token!=null ){
            MqttMessage message = token.getMessage();
            String topic = token.getTopics()==null ? null : Arrays.asList(token.getTopics()).toString();
            String str = message==null ? null : new String(message.getPayload());
            log.info("deliveryComplete: topic={}, message={}", topic, str);
        } else {
            log.info("deliveryComplete: null");
        }
    }

    @SneakyThrows
    @Override
    public void connectComplete(boolean reconnect, String serverURI) {
        log.info("connectComplete: reconnect={}, serverURI={}", reconnect, serverURI);

        if( topics.length > 0 ){
            int[] qosArr = new int[topics.length];
            Arrays.fill(qosArr, 2);

            MyDemo6MqttMessageListener[] listeners = new MyDemo6MqttMessageListener[topics.length];
            Arrays.fill(listeners, new MyDemo6MqttMessageListener());

            client.subscribe(topics, qosArr, listeners);
        }
    }
}

【MyDemo6MqttMessageListener.java】

package com.chz.myMqttV3.demo6;

/**
 * Per-subscription message listener that logs the topic and payload of every
 * message it receives.
 */
@Slf4j
public class MyDemo6MqttMessageListener implements IMqttMessageListener
{
    /**
     * Logs the received message.
     *
     * @param topic   topic the message was published to
     * @param message received message; its payload is decoded as UTF-8 for logging
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) {
        // Decode with an explicit charset so the logged text does not depend on the JVM's default encoding.
        String payload = new String(message.getPayload(), java.nio.charset.StandardCharsets.UTF_8);
        log.info("messageArrived: topic={}, message={}", topic, payload);
    }
}

【MyDemo6MqttClient1Test.java】

package com.chz.myMqttV3.demo6;

/**
 * Subscriber side of the demo: connects to the broker and subscribes to
 * {@code device/#} through {@link MyDemo6MqttCallback}.
 */
public class MyDemo6MqttClient1Test
{
    /**
     * Creates the client, registers the callback and connects.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the connect fails
     */
    public static void main(String[] args) throws MqttException {
        MqttClient mqttClient = new MqttClient(
                "tcp://192.168.44.228:1883",
                "MyDemo6MqttClient1Test",
                new MemoryPersistence());

        MqttConnectOptions connectOptions = new MqttConnectOptions();
        // Session and reconnect behaviour.
        connectOptions.setCleanSession(true);
        connectOptions.setAutomaticReconnect(true);
        // Timing, in seconds.
        connectOptions.setConnectionTimeout(20);
        connectOptions.setKeepAliveInterval(10);
        // Credentials.
        connectOptions.setUserName("admin");
        connectOptions.setPassword("public".toCharArray());

        String[] subscribedTopics = {"device/#"};
        MyDemo6MqttCallback callback = new MyDemo6MqttCallback(mqttClient, connectOptions, subscribedTopics);
        mqttClient.setCallback(callback);

        mqttClient.connect(connectOptions);
    }
}

【MyDemo6MqttSenderTest.java】

package com.chz.myMqttV3.demo6;

/**
 * Sender side of the demo: connects to the broker with a will message
 * registered on topic {@code device/1} and subscribes to nothing.
 */
public class MyDemo6MqttSenderTest
{
    /**
     * Configures the connect options (including the will message) and connects.
     *
     * @param args unused
     * @throws MqttException if the client cannot be created or the connect fails
     */
    public static void main(String[] args) throws UnknownHostException, MqttException, InterruptedException {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("admin");
        options.setPassword("public".toCharArray());
        options.setCleanSession(true);
        options.setAutomaticReconnect(true);
        options.setConnectionTimeout(20);
        options.setKeepAliveInterval(10);
        // Register the will message here: topic "device/1", QoS 1, not retained.
        // The payload is encoded explicitly as UTF-8 so the bytes sent do not depend
        // on the JVM's default charset.
        options.setWill(
                "device/1",
                "I am MyDemo6MqttSenderTest, I am dead!!!".getBytes(java.nio.charset.StandardCharsets.UTF_8),
                1,
                false);

        MqttClient client = new MqttClient("tcp://192.168.44.228:1883", "MyDemo6MqttSenderTest", new MemoryPersistence());
        // Empty topic list: this client only exists to leave a will behind.
        client.setCallback(new MyDemo6MqttCallback(client, options, new String[]{}));
        client.connect(options);
    }
}

启动【MyDemo6MqttSenderTest】和【MyDemo6MqttClient1Test】,等两个进程都正常启动完之后,将【MyDemo6MqttSenderTest】进程强制杀掉(不要让它正常调用 disconnect())。由于该客户端没有发送 DISCONNECT 报文就断开了连接,broker 会替它发布遗嘱消息,于是订阅了 device/# 的【MyDemo6MqttClient1Test】会自动收到消息【I am MyDemo6MqttSenderTest, I am dead!!!】。
在这里插入图片描述

相关推荐

  1. MQTT保留消息遗嘱消息理解和应用

    2024-06-09 00:04:02       62 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-06-09 00:04:02       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-06-09 00:04:02       101 阅读
  3. 在Django里面运行非项目文件

    2024-06-09 00:04:02       82 阅读
  4. Python语言-面向对象

    2024-06-09 00:04:02       91 阅读

热门阅读

  1. 基于SpringBoot的装饰工程管理系统源码数据库

    2024-06-09 00:04:02       25 阅读
  2. TCP为什么握手是三次,而挥手是四次

    2024-06-09 00:04:02       25 阅读
  3. [力扣题解] 617. 合并二叉树

    2024-06-09 00:04:02       28 阅读
  4. Android13 调试,解锁bootloader

    2024-06-09 00:04:02       26 阅读
  5. 发送TCP reset (RST) 包打断TCP连接

    2024-06-09 00:04:02       28 阅读