我也是复制的,没有测试。应该能正常工作中。
发布端
/*********************************************************************************
* Copyright: (C) 2022 Ye Xingwei<2929273315@qq.com>
* All rights reserved.
*
* Filename: subscribe.c
* Description: This file MQTT_pub
*
* Version: 1.0.0(2022年01月04日)
* Author: Ye Xingwei <2929273315@qq.com>
* ChangeLog: 1, Release initial version on "2022年01月04日 15时08分27秒"
*
********************************************************************************/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <unistd.h>
#include <mosquitto.h>
#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <dirent.h>
#include <signal.h>
#include <ctype.h>
#include "cJSON.h"
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512
static int g_stop = 0;
void mqtt_connect_callback(struct mosquitto *mosq, void *obj, int rc);
void mqtt_disconnect_callback(struct mosquitto *mosq, void *obj, int rc);
int get_time(char *datetime, int bytes);
int get_temperature(float *temp);
int get_ipaddr(char *interface,char *ipaddr,int ipaddr_size);
void sig_handle(int signum);
int main (int argc, char **argv)
{
int rv;
struct mosquitto *mosq = NULL;
/*安装信号*/
signal(SIGUSR1,sig_handle);
/* MQTT 初始化 */
rv = mosquitto_lib_init();
if(rv != MOSQ_ERR_SUCCESS)
{
printf("mosquitto lib int failure:%s\n", strerror(errno));
goto cleanup;
}
/* 创建新的客户端 */
mosq = mosquitto_new(NULL,true,NULL);
if(!mosq)
{
printf("create client failure:%s\n",strerror(errno));
goto cleanup;
}
/* 回调函数 */
mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
while(!g_stop)
{
/* 连接MQTT服务器,ip,端口,时间 */
if(mosquitto_connect(mosq,HOST,PORT,KEEP_ALIVE) != MOSQ_ERR_SUCCESS)
{
printf("mosquitto_connect() failed: %s\n",strerror(errno));
goto cleanup;
}
printf("connect successfully\n");
/* 无阻塞 断线连接 */
mosquitto_loop_forever(mosq,-1,1);
sleep(10);
}
cleanup:
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
/*确认连接回函数*/
void mqtt_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
char ipaddr[16];
char *interface="eth0";
char datetime[64];
cJSON *root;
cJSON *item;
char *msg;
struct mqtt_user_data *mqtt;
printf("Connection successful cJSON call packaging\n");
float temper = 0.000000;
if(get_temperature(&temper) < 0)
{
printf("get_temperature failed.\n");
return;
}
if(get_time(datetime,sizeof(datetime))<0)
{
printf("get_time failure\n");
return ;
}
memset(ipaddr,0,sizeof(ipaddr));
if(get_ipaddr(interface,ipaddr,sizeof(ipaddr))<0)
{
printf("ERROR:get ip address failure\n");
return ;
}
root = cJSON_CreateObject();
item = cJSON_CreateObject();
/* cJSON打包 */
cJSON_AddItemToObject(root,"id",cJSON_CreateString(ipaddr));
cJSON_AddItemToObject(root,"time",cJSON_CreateString(datetime));
cJSON_AddItemToObject(root,"Temperature",cJSON_CreateNumber(temper));
msg = cJSON_Print(root);
//printf("%s\n",msg);
if(!rc)
{
if(mosquitto_publish(mosq,NULL,"temp",strlen(msg),msg,0,NULL) != MOSQ_ERR_SUCCESS)
{
printf("mosquitto_publish failed: %s\n",strerror(errno));
return;
}
}
mosquitto_disconnect(mosq);
}
/* 获取时间 */
int get_time(char *datetime, int bytes)
{
time_t now;
struct tm *t;
time(&now);
t = localtime(&now);
snprintf(datetime, bytes, "%04d-%02d-%02d %02d:%02d:%02d", t->tm_year + 1900, t->tm_mon + 1, t->tm_mday, (t->tm_hour)+8, t->tm_min, t->tm_sec);
return 0;
}
/* 安装信号 */
void sig_handle(int signum)
{
if(SIGUSR1 == signum)
{
g_stop = 1;
}
}
/* 获取温度 */
int get_temperature(float *temp)
{
int fd = -1;
char buf[128];
char *ptr=NULL;
DIR *dirp = NULL;
struct dirent *direntp = NULL;
char w1_path[64]="/sys/bus/w1/devices/";
char chip_sn[32];
int found = 0;
dirp=opendir(w1_path);
if(!dirp)
{
printf("open foldir %s failure:%s\n",w1_path,strerror(errno));
return -1;
}
while(NULL!=(direntp=readdir(dirp)))
{
if(strstr(direntp->d_name,"28-"))
{
strncpy(chip_sn, direntp->d_name,sizeof(chip_sn));
found = -1;
}
}
closedir(dirp);
if(!found)
{
printf("can not find ds18b20 chipset\n");
return -2;
}
strncat(w1_path,chip_sn,sizeof(w1_path)-strlen(w1_path));
strncat(w1_path,"/w1_slave",sizeof(w1_path)-strlen(w1_path));
if((fd = open(w1_path,O_RDONLY))<0)
{
printf("File opened successfully:%s\n",strerror(errno));
return -3;
}
memset(buf, 0, sizeof(buf));
if(read(fd, buf, sizeof(buf))<0)
{
printf("read data from fd=%d failure:%s\n",fd,strerror(errno));
return -4;
}
ptr = strstr(buf,"t=");
if(!ptr)
{
printf("t=string\n");
return -5;
}
ptr+= 2;
*temp = atof(ptr)/1000;
close(fd);
return 0;
}
/* 获取IP地址 */
int get_ipaddr(char *interface,char *ipaddr,int ipaddr_size)
{
char buf[1024];
char *ptr;
char *ip_start;
char *ip_end;
FILE *fp;
int len;
int rv;
if(!interface || !ipaddr || ipaddr_size <16)
{
printf("Invalid input argument\n");
return -2;
}
memset(buf, 0 , sizeof(buf));
snprintf(buf,sizeof(buf),"ifconfig %s",interface);
fp = popen(buf,"r");
if(NULL==fp)
{
printf("popen() to extern command\"%s\"failure:%s\n",buf,strerror(errno));
return -2;
}
rv = -3;
while(fgets(buf,sizeof(buf),fp))
{
if(strstr(buf,"netmask"))
{
ptr = strstr(buf,"inet");
if(!ptr)
{
break;
}
ptr +=strlen("inet");
while(isblank(*ptr))
ptr++;
ip_start = ptr;
while(!isblank(*ptr))
ptr++;
ip_end = ptr;
memset(ipaddr,0,sizeof(ipaddr));
len = ip_end-ip_start;
len = len>ipaddr_size ? ipaddr_size:len;
memcpy(ipaddr,ip_start,len);
rv = 0;
break;
}
}
pclose(fp);
return rv;
}
订阅端
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <mosquitto.h>
#include "cJSON.h"
#define HOST "localhost"
#define PORT 1883
#define KEEP_ALIVE 60
#define MSG_MAX_SIZE 512
static int running = 1;
/* 确认连接回调函数 */
void mqtt_connect_callback(struct mosquitto *mosq, void *obj, int rc)
{
printf("Confirm the connection to the client\n");
if(rc)
{
printf("on_connect error!\n");
exit(1);
}
else
{
if(mosquitto_subscribe(mosq, NULL, "temp", 2))
{
printf("Set the topic error!\n");
exit(1);
}
}
}
/*获取到订阅的内容*/
void mqtt_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *msg)
{
printf("Obtaining content successfully\n");
printf("\n");
printf("Succeeded in obtaining the time and temperature:%s\n", (char *)msg->payload);
}
int main (int argc, char **argv)
{
int ret;
struct mosquitto *mosq;
/* MQTT 初始化 */
ret = mosquitto_lib_init();
if(ret)
{
printf("Init lib error!\n");
goto cleanup;
return -1;
}
/* 创建新的客户端 */
mosq = mosquitto_new(NULL,true, NULL);
if(mosq == NULL)
{
printf("Create a new client failure\n");
goto cleanup;
return -1;
}
/* 回调函数 */
mosquitto_connect_callback_set(mosq, mqtt_connect_callback);
mosquitto_message_callback_set(mosq, mqtt_message_callback);
/* 连接代理 */
ret = mosquitto_connect(mosq, HOST, PORT, KEEP_ALIVE);
if(ret)
{
printf("Connect server error!\n");
goto cleanup;
return -1;
}
printf("connection client is OK\n");
while(running)
{
mosquitto_loop(mosq, -1, 1);
}
/* 释放 清空 */
cleanup:
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}