1.简介
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息发布/订阅网络协议,专为低带宽和不稳定网络环境下的设备通信设计,常用于物联网(IoT)应用。它在1999年由IBM开发,现在是一个开放的标准协议。
1.1.主要特点包括:
- 轻量级和低开销:MQTT 协议的数据包非常小巧,非常适合网络带宽有限的情况。
- 数据传输可靠:支持多种消息服务质量等级,确保数据按需可靠传输。
- 支持离线消息:通过保留消息和持久会话,支持设备离线时的消息保存和转发。
- 简单易用:协议结构简单,易于实现和维护。
1.2.工作原理:
MQTT基于发布/订阅模式,其中消息通过主题进行分类。客户端可以订阅一个或多个主题,而发布者发送消息到特定的主题,所有订阅该主题的客户端都将接收到这些消息。
1.3.服务质量(QoS)等级:
- QoS 0:最多发送一次,不保证消息到达,也就是"最多一次"。
- QoS 1:至少发送一次,确保消息到达,但可能会有重复,也就是"至少一次"。
- QoS 2:确保消息只有一次到达,也就是"只有一次"。
MQTT被广泛应用于智能家居、工业互联网、车联网等场景,由于其简单高效的特性,特别适合需要快速、可靠通信的物联网设备。
2.MQTT和MQ消息队列之间的区别是什么
MQTT 和传统的消息队列(如 IBM 的 MQ 系列)都用于在应用程序之间传输消息,但它们在设计目的、架构和适用场景上有一些显著的区别:
2.1. 设计目的和应用场景:
- MQTT:专为低带宽、高延迟或不可靠的网络环境设计,主要用于物联网设备间的轻量级通信。
- MQ 消息队列(如 IBM MQ):设计用于企业级应用,支持复杂的消息传递模式,确保消息传输的安全性和可靠性,适用于金融、零售等行业的应用。
2.2. 通信模型:
- MQTT:基于发布/订阅模式,消息通过主题进行分类。发布者发送消息到主题,订阅该主题的客户端接收消息。
- MQ 消息队列:基于点对点的消息队列模型。消息被发送到一个队列,然后一个或多个消费者可以从队列中取出消息进行处理。
2.3. 网络开销和性能:
- MQTT:非常轻量级,网络开销小,适用于带宽受限的环境。
- MQ 消息队列:相对于 MQTT,可能有更高的网络开销和更复杂的客户端实现,但提供更高的数据完整性和安全性。
2.4. 可靠性和服务质量:
- MQTT:提供三种服务质量等级,适应不同的网络可靠性需求。
- MQ 消息队列:通常提供更高级的消息持久性、事务支持和恢复能力,确保在系统故障时消息不会丢失。
2.5. 使用环境:
- MQTT:常用于移动设备、传感器网络、小型嵌入式设备等。
- MQ 消息队列:常用于服务器与应用之间、大型分布式系统的核心组件。
总的来说,MQTT 更适合物联网和移动环境,而 MQ 消息队列则更适合需要高度可靠和安全的企业级应用。
3. MQTT主要用于物联网设备间的轻量级通信
MQTT协议之所以能够在低带宽、高延迟或不可靠的网络环境中表现出色,主要得益于其设计哲学和技术特点。以下是一些关键因素,使得MQTT特别适合物联网(IoT)和类似场景:
3.1. 轻量级协议设计
- 小型消息头:MQTT 的消息头非常小(固定头部只有2字节),这减少了网络传输的数据量,使其在带宽有限的环境中更有效。
- 简化的消息交换:MQTT 的交互模式简单,仅需少量的指令和消息类型来进行通信,降低了处理复杂性和资源消耗。
3.2. 发布/订阅模型
- 解耦生产者和消费者:在这种模型中,消息的发送者(发布者)和接收者(订阅者)不直接通信,而是通过主题来交换消息。这种解耦方式减少了客户端之间的连接数,适用于设备数量庞大的网络。
- 灵活的消息分发:发布者可以发送消息到一个主题,而所有订阅该主题的客户端都会接收到消息,支持一对多的消息分发,有效地减少了网络流量。
3.3. 服务质量等级
- 可配置的消息交付保证:MQTT 提供三种服务质量(QoS)等级,允许开发者根据网络环境和应用需求选择最合适的交付方式:
- QoS 0(最多一次):消息可能丢失,适用于对可靠性要求不高的应用。
- QoS 1(至少一次):确保消息到达,但可能重复。
- QoS 2(只有一次):确保每条消息只被接收一次,适用于对重复或丢失非常敏感的应用。
3.4. 网络不可靠性容忍
- 会话持久性:在连接断开后,可以通过持久会话重新建立连接而不丢失订阅信息和消息。
- 离线消息:支持存储消息直至订阅者上线后再发送,保证即使在连接不稳定时也不会丢失重要信息。
3.5. 心跳和保持连接机制
- 定期心跳:客户端定期发送心跳包以保持与服务器的连接,这有助于检测和恢复死链,同时避免频繁的网络连接和断开。
这些特性使得MQTT非常适合在网络条件不佳的环境中使用,如远程监控、智能设备、移动通信等场景,特别是在物联网设备间需要高效、节省资源的通信时。
4.MQTT代码Demo
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.5</version>
</dependency>
import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class Subscriber {
public static void main(String[] args) {
String broker = "tcp://124.70.4.116:9003";
String clientId = MqttClient.generateClientId();
try {
MqttClient client = new MqttClient(broker, clientId);
client.setCallback(new MqttCallback() {
@Override
public void connectionLost(Throwable cause) {
System.out.println("Connection lost!");
}
@Override
public void messageArrived(String topic, MqttMessage message) {
System.out.println("Message received:\n\t" + new String(message.getPayload()));
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
}
});
client.connect();
client.subscribe("test/topic");
System.out.println("Subscriber is now listening to 'test/topic'");
} catch (MqttException e) {
e.printStackTrace();
}
}
}
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
public class Publisher {
public static void main(String[] args) {
String broker = "tcp://124.70.4.116:9003";
String clientId = MqttClient.generateClientId();
try {
MqttClient client = new MqttClient(broker, clientId);
client.connect();
MqttMessage message = new MqttMessage("Hello MQTT xiaocai".getBytes());
message.setQos(2);
client.publish("test/topic", message);
System.out.println("Message published");
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
}
}
}
5.MQTT客户端
具体可借鉴[分享]7 个 MQTT 客户端工具-腾讯云开发者社区-腾讯云 (tencent.com)
5.1. MQTTX
MQTTX 是 EMQ 开源的一款跨平台 MQTT 5.0 客户端工具,它支持 macOS, Linux, Windows,并且支持 MQTT 消息格式转换。
MQTTX 的用户界面借助聊天软件的形式简化了页面的操作逻辑,用户可以快速创建连接保存并同时建立多个连接客户端,方便用户快速测试 MQTT/TCP、MQTT/TLS、MQTT/WebSocket 的 连接/发布/订阅 功能及其他特性。
MQTTX 致力于打造优雅、易用的全平台 MQTT 客户端,并在最近发布了 MQTTX CLI 及 MQTTX Web 两个版本,目前在 GitHub Star 数已达到 2K,已成为使用场景最完整的 MQTT 测试客户端。
5.1.1.特性
- 跨平台,支持 Windows,macOS 和 Linux
- 支持 MQTT v3.1.1 以及 MQTT v5.0 协议
- 单/双向 SSL 认证:支持 CA、自签名证书,以及单、双向 SSL 认证
- 支持 Light、Dark、Night 三种主题模式切换
- 支持 WebSocket 连接至 MQTT 服务器
- 支持 Hex, Base64, JSON, Plaintext
- 支持简体中文、英文、日文、土耳其文及匈牙利文
- 订阅 Topic 支持自定义颜色标记
- 支持 $SYS 主题自动订阅,查看流量统计
- 自定义编辑脚本测试和模拟收发数据
- 完整的日志记录
5.1.2.下载
- 官网下载:https://mqttx.app/zh
- GitHub 下载:https://github.com/emqx/MQTTX/releases
6.MQTT的Docker安装
version: '3.0'
services:
base_software_emqx1:
image: emqx:5.7.1
container_name: base_software_emqx1
environment:
- "EMQX_NODE_NAME=emqx@node1.emqx.io"
- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
interval: 5s
timeout: 25s
retries: 5
networks:
emqx-bridge:
aliases:
- node1.emqx.io
ports:
- 9003:1883
- 8083:8083
- 8084:8084
- 8883:8883
- 18083:18083
volumes:
- /data/docker/dms/emqx1_data:/opt/emqx/data
base_software_emqx2:
image: emqx:5.7.1
container_name: base_software_emqx2
environment:
- "EMQX_NODE_NAME=emqx@node2.emqx.io"
- "EMQX_CLUSTER__DISCOVERY_STRATEGY=static"
- "EMQX_CLUSTER__STATIC__SEEDS=[emqx@node1.emqx.io,emqx@node2.emqx.io]"
healthcheck:
test: ["CMD", "/opt/emqx/bin/emqx", "ctl", "status"]
interval: 5s
timeout: 25s
retries: 5
networks:
emqx-bridge:
aliases:
- node2.emqx.io
volumes:
- /data/docker/dms/emqx2_data:/opt/emqx/data
networks:
emqx-bridge:
driver: bridge