Linux环境下使用Eclipse Paho C 实现(MQTT Client)同步模式发布和订阅Message

目录

概述

1 同步模式和异步模式

1.1 同步模式

1.2 异步模式

2 下载和安装paho.mqtt.c

3 同步方式发布和订阅消息功能实现

3.1 MQTT Client参数配置

3.2 初始化MQTT Client

3.3 发布消息功能

3.4 订阅消息功能

3.5 解析订阅的信息

4 编译和测试

4.1 编译代码

4.2 运行

5 验证MQTT Client功能

5.1 EMQX服务器上查看MQTT Client

5.2 MQTT.fx发布Topic

5.3 MQTT.fx订阅的主题

6 完整代码


概述

本文主要介绍在linux环境(ubuntu)环境下,下载和安装Eclipse Paho C MQTT 软件包,还编写一个范例实现同步发布Message的功能,并使用基于EMQX的服务验证其功能,还是用MQTT.fx订阅消息,已验证发布消息功能的可靠性。

1 同步模式和异步模式

1.1 同步模式

在同步模式下,客户机应用程序在单个线程上运行。使用MQTTClient_publish()和MQTTClient_publishMessage()函数发布消息。要确定QoS1或QoS2(请参阅服务质量)消息已成功交付,应用程必须调用MQTTClient_waitForCompletion()函数。同步发布示例中显示了显示同步发布的示例。在同步模式下接消息使用MQTTClient_receive()函数。客户机应用程序必须相对频繁地调用 MQTTClient_receive()MQTTClient_yield(),以便允许处理确认和MQTT“ping”,从而保持与服务器的网络连接处于活动状态。

总结同步模式应用方法

1)客户机应用程序在单个线程上运行

2)使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息

3)使用MQTTClient_waitForCompletion()确认消息是否发布成功

4)使用MQTTClient_receive()接收消息

5)必须频繁调用MQTTClient_receive()和MQTTClient_yield(),以确认消息

1.2 异步模式

在异步模式下,客户机应用程序在多个线程上运行。主程序调用客户端库中的函数来发布和订阅,就像同步模式一样。但是,握手和维护网络连接的处理是在后台执行的。使用调用MQTTClient_setCallbacks()(参见MQTTClient_messageArrived()、MQTTClient_connectionLost()和MQTTClient_deliveryComplete())向库注册的回调,向客户端应用程序提供状态通知和消息接收。然而,这个API不是线程安全的——在没有同步的情况下,不可能从多个线程调用它。可以为此使用MQTTAsync API来实现这些功能。

总结异步模式应用方法

1)客户机应用程序在多个线程上运行

2)主程序调用客户端库中的函数来发布和订阅,使用MQTTClient_publish()或者MQTTClient_publishMessage()发布消息;使用MQTTClient_publishMessage订阅消息

3)使用调用MQTTClient_setCallbacks(),向客户端应用程序提供状态通知和消息接收

异步模式的详细使用范例,参看文章:

Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步订阅Message-CSDN博客

Linux环境下使用Eclipse Paho C 实现(MQTT Client)异步方式发布Message-CSDN博客

2 下载和安装paho.mqtt.c

登录mqtt官网,点击Software,可以看见如下页面,选择Eclipse Paho C进入下载页面

https://mqtt.org/

下载paho.mqtt.c

笔者选择使用命令直接安装该软件包,具体操作步骤如下:

Step -1: 下载软件包执行命令:

git clone https://github.com/eclipse/paho.mqtt.c.git

step-2: 进入paho.mqtt.c目录,执行make

cd paho.mqtt.c
make

系统会自动编译代码,等待编译结果。

编译完成后,会自动生成build文件,这时可以安装

step-3 : 执行如下命令就可以安装软件

sudo make install

3 同步方式发布和订阅消息功能实现

3.1 MQTT Client参数配置

初始化MQTT Client,必须配置一些参数,包括broker的IP地址,订阅的topic等,具体参数如下表所示:

参数功能介绍:

参数名称 参数值 描述
ADDRESS tcp://192.168.1.11:1883 mqtt broker的IP地址
CLIENTID mqtt_ubuntu_asys 设备ID
TOPIC MQTTAsync 发布的Topic
SUBTOPIC switch 订阅的Topic
PAYLOAD 12.56 Topic下的payload
QOS 1 服务质量等级=1
TIMEOUT 10000L 超时计数
USERNAME mqtt_ubuntu_user 终端认证username
PASSWORD 123456 终端认证username对应的password

在代码中定义这些参数的位置:

3.2 初始化MQTT Client

初始化MQTT终端需要完成以下2个步骤:

step-1: 创建MQTT Client

step-2: 连接服务器

具体实现代码如下:

代码66行:创建MQTT Client,需要传入服务器IP和Client ID信息

代码73行: 心跳包时间间隔设置为20s

代码74行: 清除会话 标记设置为1,不接受离线消息

代码75行: 配置设备终端用户

代码76行: 配置设备终端用户password

代码78行: MQTT连接Broker

3.3 发布消息功能

要实现发布消息功能,需要将payload及其相关参数填到MQTTClient_message定义的数据结构中,下面介绍整个public message 函数的功能。

代码39行: 装载payload

代码40行:payload的字符长度

代码41行:消息服务等级参数

代码42行:配置为保留消息

代码44行:使用MQTTClient_publishMessage函数发布消息

代码53行:使用MQTTClient_waitForCompletion等待消息发布完毕

3.4 订阅消息功能

要取消订阅的Topic,调用MQTTClient_unsubscribe函数可以实现该功能。实现范例如下:

3.5 解析订阅的信息

代码69行: 使用MQTTClient_receive接收订阅的消息

代码70行: 通过检测rc的值,并判断topic的值是否有效,以确定是否要解析消息

4 编译和测试

4.1 编译代码

使用如下命令编译代码

 gcc test_03_Synchronous.c -lpaho-mqtt3c -lpthread

4.2 运行

执行.out文件后,可以看见,MQTT Client订阅和发布消息成功了,服务器端收到消息后,token值会自动加1

5 验证MQTT Client功能

5.1 EMQX服务器上查看MQTT Client

在ubuntu上运行MQTT Client后,EMQX服务器会显示MQTT Client的运行状态,登录EMQX服务器可以看见

在订阅管理面板上,也可以看见mqtt_ubuntu_asys订阅了Topic为"switch"

5.2 MQTT.fx发布Topic

使用MQTT.fx发布Topic为switch的消息,Client ID 为mqtt_ubuntu_asys的客户端订阅了该消息,那么当MQTT.fx发布消息之后,mqtt_ubuntu_asys会收到该消息,并在终端上打印出来。

要使用MQTT.fx MQTT Client工具订阅MQTTsync,首先保证MQTT.fx能正常连接至EMQX服务器

使用MQTT.fx发布Topic为switch的消息

{ 
   "switch": false 
}

使用MQTT.fx发布Topic为switch的消息

{ 
    "switch": false 
}

在EMQX的保留信息页面,查看MQTT.fx发布Topic为switch的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端

5.3 MQTT.fx订阅的主题

在EMQX的订阅管理页面,查看MQTT.fx订阅Topic为MQTTsync的消息,该信息和Client ID 为mqtt_ubuntu_asys的客户端发布的消息

在MQTT.fx上查看Topic为MQTTsync的消息

6 完整代码

创建test_03_Synchronous.c,编写如下代码:

/***************************************************************
Copyright  2024-2029. All rights reserved.
文件名  : test_03_Synchronous.c
作者    : tangmingfei2013@126.com
版本    : V1.0
描述    : mqtt同步发布和订阅消息
日志    : 初版V1.0 2024/03/13
​
***************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
​
#include "MQTTClient.h"
 
#define ADDRESS     "tcp://192.168.1.11:1883"
#define CLIENTID    "mqtt_ubuntu_asys"
#define TOPIC       "MQTTsync"
#define SUBTOPIC    "switch"
​
#define PAYLOAD     "12.56"
#define QOS         1
#define TIMEOUT     10000L
​
#define USERNAME    "mqtt_ubuntu_user"
#define PASSWORD    "123456" 
​
static MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
static MQTTClient_message pubmsg = MQTTClient_message_initializer;
static MQTTClient client;
static MQTTClient_deliveryToken deliveredtoken, temptoken;
int count;
​
​
MQTTClient_deliveryToken user_publicMsg( void )
{
    MQTTClient_deliveryToken token;
    int rc;
        
    pubmsg.payload = PAYLOAD;
    pubmsg.payloadlen = (int)strlen(PAYLOAD);
    pubmsg.qos = QOS;
    pubmsg.retained = 1;
    
    if ((rc = MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token)) != MQTTCLIENT_SUCCESS)
    {
         printf("Failed to publish message, return code %d\n", rc);
         exit(EXIT_FAILURE);
    }
 
    printf("Waiting for up to %d seconds for publication of %s\n"
            "on topic %s for client with ClientID: %s\n",
            (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    printf("Message with delivery token %d delivered\n", token);
    
    return token;
}
​
int receive_subMessage( void )
{
    int topicLen;
    int rc;
    char *topic = SUBTOPIC;
    MQTTClient_message *message = NULL;
    
    rc = MQTTClient_receive( client, &topic, &topicLen, &message, TIMEOUT);
    if( rc == MQTTCLIENT_SUCCESS && topic!= NULL ){
        printf("Message arrived \n");
        printf("     topic: %s\n", topic);
        printf("   message: %.*s\n", message->payloadlen, (char*)message->payload);
    }
    printf("MQTTClient receive message, return code %d\n", rc);
    
    return rc;
}
​
​
/* 线程  */
void thread_subMsg(void)
{
    usleep(1000000L);
    count++;
}
​
​
int main(int argc, char* argv[])
{
    int current_cnt;
    pthread_t id;
    int ret;
    int rc;
 
    deliveredtoken = 0;
    if ((rc = MQTTClient_create(&client, ADDRESS, CLIENTID,
        MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
    {
         printf("Failed to create client, return code %d\n", rc);
         exit(EXIT_FAILURE);
    }
 
    conn_opts.keepAliveInterval = 20;
    conn_opts.cleansession = 1;
    conn_opts.username = USERNAME;   //用户名
    conn_opts.password = PASSWORD;   //密码
    
    if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to connect, return code %d\n", rc);
        exit(EXIT_FAILURE);
    }
    
    printf("Subscribing to topic %s\nfor client %s using QoS%d\n\n", 
            SUBTOPIC, CLIENTID, QOS);
            
    if ((rc = MQTTClient_subscribe(client, SUBTOPIC, QOS)) != MQTTCLIENT_SUCCESS)
    {
        printf("Failed to subscribe, return code %d\n", rc);
        rc = EXIT_FAILURE;
    }
    
    ret = pthread_create(&id,NULL,(void *) thread_subMsg,NULL);
    if(ret!=0){
        printf ("Create pthread error!\n");
        exit (1);
    }
​
    while(1)
    {
        receive_subMessage();
        temptoken = user_publicMsg(); 
        
        if(count != current_cnt )
        {
            current_cnt = count;
            if(temptoken != deliveredtoken){
                deliveredtoken = temptoken;
            }
        }
    }
 
    if ((rc = MQTTClient_disconnect(client, 10000)) != MQTTCLIENT_SUCCESS)
        printf("Failed to disconnect, return code %d\n", rc);
    MQTTClient_destroy(&client);
    
    return rc;
}
​

相关推荐

  1. 如何实现观察者模式发布-订阅模式

    2024-03-25 11:04:01       37 阅读
  2. 【DevOps云实践】Azure Function中使用发布/订阅模式

    2024-03-25 11:04:01       42 阅读
  3. Linux限定网络工具环境时间同步

    2024-03-25 11:04:01       54 阅读
  4. 【根据消息类型实现订阅发布模型

    2024-03-25 11:04:01       36 阅读
  5. 发布订阅模式观察者模式详解

    2024-03-25 11:04:01       56 阅读

最近更新

  1. docker php8.1+nginx base 镜像 dockerfile 配置

    2024-03-25 11:04:01       94 阅读
  2. Could not load dynamic library ‘cudart64_100.dll‘

    2024-03-25 11:04:01       100 阅读
  3. 在Django里面运行非项目文件

    2024-03-25 11:04:01       82 阅读
  4. Python语言-面向对象

    2024-03-25 11:04:01       91 阅读

热门阅读

  1. 【编程向导】代码管理-Git二期期讲解

    2024-03-25 11:04:01       29 阅读
  2. leetcode - 284. Peeking Iterator

    2024-03-25 11:04:01       38 阅读
  3. 天猫开店怎么发布产品

    2024-03-25 11:04:01       43 阅读
  4. 蓝桥杯刷题_day3

    2024-03-25 11:04:01       40 阅读
  5. vue v-for指令

    2024-03-25 11:04:01       36 阅读
  6. linux系统Kubernetes工具ingress暴露服务

    2024-03-25 11:04:01       35 阅读
  7. video/pdf文件预览与进度上传

    2024-03-25 11:04:01       40 阅读
  8. 代码审计与web安全-第四章作业

    2024-03-25 11:04:01       40 阅读
  9. vue3.0-monaco组件封装

    2024-03-25 11:04:01       36 阅读
  10. 1. 一起学习机器学习 -- Data_exploration

    2024-03-25 11:04:01       35 阅读
  11. QT GUI常用函数介绍

    2024-03-25 11:04:01       41 阅读
  12. React-创建虚拟Dom四种方法

    2024-03-25 11:04:01       36 阅读
  13. 网络安全实训Day12

    2024-03-25 11:04:01       37 阅读