MQTT协议

1.简介

MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息发布/订阅网络协议,专为低带宽和不稳定网络环境下的设备通信设计,常用于物联网(IoT)应用。它在1999年由IBM开发,现在是一个开放的标准协议。

1.1.主要特点包括:

  1. 轻量级和低开销:MQTT 协议的数据包非常小巧,非常适合网络带宽有限的情况。
  2. 数据传输可靠:支持多种消息服务质量等级,确保数据按需可靠传输。
  3. 支持离线消息:通过保留消息和持久会话,支持设备离线时的消息保存和转发。
  4. 简单易用:协议结构简单,易于实现和维护。

1.2.工作原理:

MQTT基于发布/订阅模式,其中消息通过主题进行分类。客户端可以订阅一个或多个主题,而发布者发送消息到特定的主题,所有订阅该主题的客户端都将接收到这些消息。

1.3.服务质量(QoS)等级:

  • QoS 0:最多发送一次,不保证消息到达,也就是"最多一次"。
  • QoS 1:至少发送一次,确保消息到达,但可能会有重复,也就是"至少一次"。
  • QoS 2:确保消息只有一次到达,也就是"只有一次"。

MQTT被广泛应用于智能家居、工业互联网、车联网等场景,由于其简单高效的特性,特别适合需要快速、可靠通信的物联网设备。

2.MQTT和MQ消息队列之间的区别是什么

MQTT 和传统的消息队列(如 IBM 的 MQ 系列)都用于在应用程序之间传输消息,但它们在设计目的、架构和适用场景上有一些显著的区别:

2.1. 设计目的和应用场景:

  • MQTT:专为低带宽、高延迟或不可靠的网络环境设计,主要用于物联网设备间的轻量级通信。
  • MQ 消息队列(如 IBM MQ):设计用于企业级应用,支持复杂的消息传递模式,确保消息传输的安全性和可靠性,适用于金融、零售等行业的应用。

2.2. 通信模型:

  • MQTT:基于发布/订阅模式,消息通过主题进行分类。发布者发送消息到主题,订阅该主题的客户端接收消息。
  • MQ 消息队列:基于点对点的消息队列模型。消息被发送到一个队列,然后一个或多个消费者可以从队列中取出消息进行处理。

2.3. 网络开销和性能:

  • MQTT:非常轻量级,网络开销小,适用于带宽受限的环境。
  • MQ 消息队列:相对于 MQTT,可能有更高的网络开销和更复杂的客户端实现,但提供更高的数据完整性和安全性。

2.4. 可靠性和服务质量:

  • MQTT:提供三种服务质量等级,适应不同的网络可靠性需求。
  • MQ 消息队列:通常提供更高级的消息持久性、事务支持和恢复能力,确保在系统故障时消息不会丢失。

2.5. 使用环境:

  • MQTT:常用于移动设备、传感器网络、小型嵌入式设备等。
  • MQ 消息队列:常用于服务器与应用之间、大型分布式系统的核心组件。

总的来说,MQTT 更适合物联网和移动环境,而 MQ 消息队列则更适合需要高度可靠和安全的企业级应用。

3. MQTT主要用于物联网设备间的轻量级通信

MQTT协议之所以能够在低带宽、高延迟或不可靠的网络环境中表现出色,主要得益于其设计哲学和技术特点。以下是一些关键因素,使得MQTT特别适合物联网(IoT)和类似场景:

3.1. 轻量级协议设计

  • 小型消息头:MQTT 的消息头非常小(固定头部只有2字节),这减少了网络传输的数据量,使其在带宽有限的环境中更有效。
  • 简化的消息交换:MQTT 的交互模式简单,仅需少量的指令和消息类型来进行通信,降低了处理复杂性和资源消耗。

3.2. 发布/订阅模型

  • 解耦生产者和消费者:在这种模型中,消息的发送者(发布者)和接收者(订阅者)不直接通信,而是通过主题来交换消息。这种解耦方式减少了客户端之间的连接数,适用于设备数量庞大的网络。
  • 灵活的消息分发:发布者可以发送消息到一个主题,而所有订阅该主题的客户端都会接收到消息,支持一对多的消息分发,有效地减少了网络流量。

3.3. 服务质量等级

  • 可配置的消息交付保证:MQTT 提供三种服务质量(QoS)等级,允许开发者根据网络环境和应用需求选择最合适的交付方式:
    • QoS 0(最多一次):消息可能丢失,适用于对可靠性要求不高的应用。
    • QoS 1(至少一次):确保消息到达,但可能重复。
    • QoS 2(只有一次):确保每条消息只被接收一次,适用于对重复或丢失非常敏感的应用。

3.4. 网络不可靠性容忍

  • 会话持久性:在连接断开后,可以通过持久会话重新建立连接而不丢失订阅信息和消息。
  • 离线消息:支持存储消息直至订阅者上线后再发送,保证即使在连接不稳定时也不会丢失重要信息。

3.5. 心跳和保持连接机制

  • 定期心跳:客户端定期发送心跳包以保持与服务器的连接,这有助于检测和恢复死链,同时避免频繁的网络连接和断开。

这些特性使得MQTT非常适合在网络条件不佳的环境中使用,如远程监控、智能设备、移动通信等场景,特别是在物联网设备间需要高效、节省资源的通信时。

4.MQTT代码Demo

    <dependency>  
        <groupId>org.eclipse.paho</groupId>  
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId>  
        <version>1.2.5</version>  
    </dependency>
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;  
import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttException;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
  
public class Subscriber {  
  
        public static void main(String[] args) {  
                String broker = "tcp://124.70.4.116:9003";  
                String clientId = MqttClient.generateClientId();  
                try {  
                        MqttClient client = new MqttClient(broker, clientId);  
                        client.setCallback(new MqttCallback() {  
                                @Override  
                                public void connectionLost(Throwable cause) {  
                                        System.out.println("Connection lost!");  
                                }  
  
                                @Override  
                                public void messageArrived(String topic, MqttMessage message) {  
                                        System.out.println("Message received:\n\t" + new String(message.getPayload()));  
                                }  
  
                                @Override  
                                public void deliveryComplete(IMqttDeliveryToken token) {  
                                }  
                        });  
                        client.connect();  
                        client.subscribe("test/topic");  
                        System.out.println("Subscriber is now listening to 'test/topic'");  
                } catch (MqttException e) {  
                        e.printStackTrace();  
                }  
        }  
}
import org.eclipse.paho.client.mqttv3.MqttClient;  
import org.eclipse.paho.client.mqttv3.MqttException;  
import org.eclipse.paho.client.mqttv3.MqttMessage;  
  
public class Publisher {  
  
    public static void main(String[] args) {  
        String broker = "tcp://124.70.4.116:9003";  
        String clientId = MqttClient.generateClientId();  
        try {  
            MqttClient client = new MqttClient(broker, clientId);  
            client.connect();  
            MqttMessage message = new MqttMessage("Hello MQTT xiaocai".getBytes());  
            message.setQos(2);  
            client.publish("test/topic", message);  
            System.out.println("Message published");  
            client.disconnect();  
        } catch (MqttException e) {  
            e.printStackTrace();  
        }  
    }  
}

5.MQTT客户端

具体可借鉴[分享]7 个 MQTT 客户端工具-腾讯云开发者社区-腾讯云 (tencent.com)

5.1. MQTTX

MQTTX 是 EMQ 开源的一款跨平台 MQTT 5.0 客户端工具,它支持 macOS, Linux, Windows,并且支持 MQTT 消息格式转换。

MQTTX 的用户界面借助聊天软件的形式简化了页面的操作逻辑,用户可以快速创建连接保存并同时建立多个连接客户端,方便用户快速测试 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的 连接/发布/订阅 功能及其他特性。

MQTTX 致力于打造优雅、易用的全平台 MQTT 客户端,并在最近发布了 MQTTX CLI 及 MQTTX Web 两个版本,目前在 GitHub Star 数已达到 2K,已成为使用场景最完整的 MQTT 测试客户端。

在这里插入图片描述

5.1.1.特性

  • 跨平台,支持 Windows,macOS 和 Linux
  • 支持 MQTT v3.1.1 以及 MQTT v5.0 协议
  • 单/双向 SSL 认证:支持 CA、自签名证书,以及单、双向 SSL 认证
  • 支持 Light、Dark、Night 三种主题模式切换
  • 支持 WebSocket 连接至 MQTT 服务器
  • 支持 Hex, Base64, JSON, Plaintext
  • 支持简体中文、英文、日文、土耳其文及匈牙利文
  • 订阅 Topic 支持自定义颜色标记
  • 支持 $SYS 主题自动订阅,查看流量统计
  • 自定义编辑脚本测试和模拟收发数据
  • 完整的日志记录

5.1.2.下载

  • 官网下载:https://mqttx.app/zh
  • GitHub 下载:https://github.com/emqx/MQTTX/releases

6.MQTT的Docker安装

version: '3.0'
services:             
  base_software_emqx1:
    image: emqx:5.7.1
    container_name: base_software_emqx1
    environment:
    - "EMQX_NODE_NAME=emqx@node1.emqx.io"
    - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
    - "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
    networks:
      emqx-bridge:
        aliases:
        - node1.emqx.io
    ports:
      - 9003:1883
      - 8083:8083
      - 8084:8084
      - 8883:8883
      - 18083:18083 
    volumes:
      - /data/docker/dms/emqx1_data:/opt/emqx/data

  base_software_emqx2:
    image: emqx:5.7.1
    container_name: base_software_emqx2
    environment:
    - "EMQX_NODE_NAME=emqx@node2.emqx.io"
    - "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
    - "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
    healthcheck:
      test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
      interval: 5s
      timeout: 25s
      retries: 5
    networks:
      emqx-bridge:
        aliases:
        - node2.emqx.io
    volumes:
      - /data/docker/dms/emqx2_data:/opt/emqx/data  

networks:
  emqx-bridge:
    driver: bridge

相关推荐

  1. MQTT 协议的优势

    2024-07-14 23:56:04       18 阅读
  2. MQTTMQTT协议与指令下发;MQTT与Kafka比较

    2024-07-14 23:56:04       45 阅读
  3. mqtt协议有哪些机制

    2024-07-14 23:56:04       20 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-07-14 23:56:04       67 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-07-14 23:56:04       71 阅读
  3. 在Django里面运行非项目文件

    2024-07-14 23:56:04       58 阅读
  4. Python语言-面向对象

    2024-07-14 23:56:04       69 阅读

热门阅读

  1. 分析 Android 应用中的日志信息应遵循的原则

    2024-07-14 23:56:04       20 阅读
  2. 牛客周赛51 F(静态区间最大连续子段和)

    2024-07-14 23:56:04       21 阅读
  3. 解锁Postman的API参数化:动态请求的秘诀

    2024-07-14 23:56:04       21 阅读
  4. 如何理解electron 的预加载脚本

    2024-07-14 23:56:04       20 阅读
  5. 力扣题解(回文子串)

    2024-07-14 23:56:04       21 阅读
  6. 题解:P9999 [Ynoi2000] tmostnrq

    2024-07-14 23:56:04       20 阅读
  7. Vue项目中禁用ESLint的几种常见方法

    2024-07-14 23:56:04       17 阅读