目录
MQTT源码分析
分析源码:mqttclient\test\emqx\test.c
参考资料:
kawaii-mqtt源码:
博客
APP
1. MQTT客户端功能
MQTT通信模型示意图如下:
以"记者-电视台-观众"的模式来理解,客户端具体的流程是这样的:
客户端1:观众打电话到电视台:connect
客户端1:观众向电视台订阅"财经新闻": Subscribe 某个 Topic
客户端2:记者打电话到电视台:connect
客户端2:记者向电视台发布"财经新闻":Public某个Topic的某个Playload
服务器:电视台向"订阅了财经新闻的观众"发布"某条消息":Public某个Playload给Subscriber
整个过程中,电视台和记者、电视台和观众直接的电话要保存连接状态,还要时不时确认一下:
记者要时不时给电视台喊一声"喂":确保电视台还正常
观众要时不时给电视台喊一声"喂":确保电视台还正常
2. 客户端软件如何实现
连接服务器
订阅:
发布订阅请求,等待回应
循环:读取Publish信息(得到订阅的信息),处理
发布:
发送数据包即可
PING
循环:确保自己、对方还活着
mqtt_packet_handle > mqtt_keep_alive
需要一个循环!
3. 程序分层
至少可以分为3层:
最上层:APP
中间层:MQTT
平台层:实现多线程、定时器、网卡收发数据
4. 情景分析
4.1 连接服务器
函数调用过程:
main
client = mqtt_lease();
mqtt_set_port(client, "1883");
mqtt_set_host(client, "www.jiejie01.top");
mqtt_connect(client);
mqtt_connect_with_results(c);
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
rc = network_connect(c->mqtt_network);
nettype_tcp_connect(n);
platform_net_socket_connect
4.2 创建线程
调用过程:
main
mqtt_connect(client);
mqtt_connect_with_results(c);
rc = network_init(c->mqtt_network, c->mqtt_host, c->mqtt_port, NULL);
rc = network_connect(c->mqtt_network);
/* send connect packet */
if ((rc = mqtt_send_packet(c, len, &connect_timer)) != MQTT_SUCCESS_ERROR)
goto exit;
if (mqtt_wait_packet(c, CONNACK, &connect_timer) == CONNACK) {
}
/* connect success, and need init mqtt thread */
c->mqtt_thread= platform_thread_init("mqtt_yield_thread", mqtt_yield_thread,c, ...);
4.3 发布消息
调用过程:
main
res = pthread_create(&thread1, NULL, mqtt_publish_thread, client);
mqtt_publish_thread
mqtt_publish(client, "topic1", &msg);
// 1. 构造消息
mqtt_message_t msg;
memset(&msg, 0, sizeof(msg));
msg.payload = (void *) buf;
msg.payloadlen = xxx;
mqtt_publish(client, "topic1", &msg);
// 1.1 根据MQTT协议构造数据包
// 1.2 根据平台相关的函数发送数据包
mqtt_send_packet
network_write
nettype_tcp_write
platform_net_socket_write_timeout
4.4 最复杂:订阅消息
消息何时到来?不知道!
所以,必定是某个内核线程不断查询网卡:
读网卡数据
得到数据的话就判断、处理