基于Linux共享内存的数据分发DDS——C语言实现

基于共享内存的数据分发DDS——C语言实现

一、软件功能介绍

在linux环境下用C语言开发的基于共享内存的数据分发DDS软件。采用了共享内存、多线程、读写锁以及互斥锁实现。

软件支持功能如下:

  • 内部采用共享内存进行数据传输,支持多进程、多线程的数据发布、订阅功能。
  • 支持在数据发布端或订阅端使用队列功能,以适应不同的应用场景
  • 集成类似rostopic、roabag命令行工具
  • 支持采用cjson、protobuf-c、结构体格式数据发布订阅数据。

二、软件接口介绍

/*******************************************************************
* 函 数 名:init
* 功能描述:数据分发功能初始化
* 输入参数:无
* 输出参数:无
* 返 回 值:无
******************************************************************/
void init(void);

/*******************************************************************
* 函 数 名:deinit
* 功能描述:数据分发功能销毁
* 输入参数:无
* 输出参数:无
* 返 回 值:无
******************************************************************/
void deinit(void);

/*******************************************************************
* 函 数 名:create_node
* 功能描述:创建一个节点
* 输入参数:node_name 节点名称
* 输出参数:无
* 返 回 值:节点结构体指针
******************************************************************/
node_struct* create_node(const uint8_t node_name[NODE_MAX_LENGTH]);

/*******************************************************************
* 函 数 名:destroy_node
* 功能描述:销毁节点
* 输入参数:node 节点结构体指针
* 输出参数:无
* 返 回 值:无
******************************************************************/
void destroy_node(node_struct* node);

/*******************************************************************
* 函 数 名:show_topic_list
* 功能描述:获取所有话题
* 输入参数:node 节点结构体指针
* 输出参数:topic_list 话题名数组;topic_count 话题个数
* 返 回 值:无
******************************************************************/
void show_topic_list(node_struct* node,uint8_t topic_list[TOPIC_COUNT_MAX_SUPPORT][TOPIC_MAX_LENGTH],uint32_t* topic_count);

/*******************************************************************
* 函 数 名:create_pub_with_topic
* 功能描述:创建一个发布者,并绑定到一个话题中
* 输入参数:node 节点结构体指针;topic_name 话题名;max_data_size 话题数据的最大长度单位字节;pub_rate 数据发布频率单位hz;
* 输入参数:pub_model 发布模式;queue_max_count 队列元素最大个数
* 输出参数:无
* 返 回 值:发布结构体指针
******************************************************************/
topic_pub_struct* create_pub_with_topic(node_struct* node,const uint8_t topic_name[TOPIC_MAX_LENGTH],const int max_data_size,
                                                           float pub_rate,PUB_DATA_MODEL pub_model,uint32_t queue_max_count);

/*******************************************************************
* 函 数 名:pub_data
* 功能描述:发布数据
* 输入参数:pub_dev 发布结构体指针;data 发布数据;data_size 发布数据大小
* 输出参数:无
* 返 回 值:成功返回0,失败返回-1
******************************************************************/
int pub_data(topic_pub_struct* pub_dev,const uint8_t* data,const size_t data_size);

/*******************************************************************
* 函 数 名:destroy_pub
* 功能描述:销毁发布者
* 输入参数:pub_ptr 发布结构体指针
* 输出参数:无
* 返 回 值:无
******************************************************************/
void destroy_pub(topic_pub_struct* pub_ptr);

/*******************************************************************
* 函 数 名:create_sub_with_topic
* 功能描述:创建一个订阅者,并绑定到一个话题中
* 输入参数:node 节点结构体指针;topic_name 话题名;max_data_size 话题数据的最大长度单位字节;data_process 数据处理回调函数;
* 输入参数:sub_rate 数据订阅频率单位hz;sub_model 订阅模式;queue_max_count 队列元素最大个数
* 输出参数:无
* 返 回 值:订阅结构体指针
******************************************************************/
topic_sub_struct* create_sub_with_topic(node_struct* node,const uint8_t topic_name[TOPIC_MAX_LENGTH],const int max_data_size,
                      sub_callback_func data_process,float sub_rate,SUB_DATA_HANDLE_MODEL sub_model,uint32_t queue_max_count);

/*******************************************************************
* 函 数 名:sub_data_run
* 功能描述:开始启动子线程订阅数据
* 输入参数:sub_ptr 订阅结构体指针
* 输出参数:无
* 返 回 值:成功返回0;失败返回-1
******************************************************************/
int sub_data_run(topic_sub_struct* sub_ptr);

/*******************************************************************
* 函 数 名:destroy_sub
* 功能描述:销毁订阅者
* 输入参数:sub_ptr 订阅结构体指针
* 输出参数:无
* 返 回 值:无
******************************************************************/
void destroy_sub(topic_sub_struct* sub_ptr);

三、软件实现原理

软件架构

首先在共享内存上申请绑定固定key的话题管理内存块,用于存储所有正在使用的话题信息。里面将话题名与共享内存块的ID一一对应。

typedef struct
{
    int32_t reference;                               //话题管理引用计数,当计数为0后表示可以释放该共享内存
    mutex_struct lock;                               //同步锁
    uint32_t topic_count;                            //已经存储的topic个数
    topic_struct topic[TOPIC_COUNT_MAX_SUPPORT];  
}topic_manage_struct;                                //话题管理,所有进程都可以访问

应用程序首先从话题管理共享内存块获取话题名对应的共享内存ID(如果没有则创建,并将id与话题名填入话题管理共享内存块中)。然后开始向话题名对应的共享内存块发布或者订阅数据。

每块共享内存分为数据头块和数据块,数据头块保存当前数据的信息,包括用于同步的读写锁、当前数据的大小以及数据更新标识符。

typedef struct
{
    int32_t reference;                               //话题引用计数,当计数为0后表示可以释放该共享内存
    mutex_struct lock;                               //话题信息同步锁
    size_t max_data_size;                            //话题最大数据大小 pub与sub创建时指定的大小必须一致
    rwlock_struct rwlock;                            //数据同步锁
    size_t data_size;                                //话题当前数据大小 
    uint8_t data_has_flag;                           //数据标识,0 表示没有数据;每更新一次数据增1
    uint32_t data_pub_cycle;                         //数据发布周期
}topic_header_struct;                                //话题数据结构体

四、数据分发命令行工具

cdds-topic

tcdds-topic list          #列出当前运行的话题
cdds-topic hz topicname   #计算当前话题数据的发布的平均频率

cdds-bag

cdds-bag record topicname1 topicname2 topicname3 ...  #对执行的话题进行数据录制
cdds-bag player bag_file     #播放录制的文件

五、测试DEMO

基于结构体数据发布订阅

/****************************************************************
文件名称: main.c
功能描述: 基于共享内存的数据分发测试demo
创建日期: 2024-02-15
作者    : skynet
版本    : V1.0
修订记录:
***************************************************************/
#include "cshmdds.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>

typedef struct
{
    uint64_t timestamp;
    uint64_t sequence_num;

    uint8_t age;
    uint8_t name[9];
    uint8_t phone_num[12];
    uint8_t email[64];
}person_info_struct;

static uint64_t get_time()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000000 + tv.tv_usec;
}

void help_usage(void)
{
    printf("Usage:\n");
    printf("\tcdds-test  pub topicname pubfreq\n");
    printf("\tcdds-test  sub topicname subfreq\n");
}

void test_sub_callback(const uint8_t* data,const size_t data_size)
{
    person_info_struct* person_data=(person_info_struct*)data; LOG_RECORD(LOGINFO,"sequence_num=%ld,timestamp=%ld,age=%d,name=%s,phone_num=%s,email=%s\n",person_data->sequence_num,person_data->timestamp,person_data->age,person_data->name,person_data->phone_num,person_data->email);
}

int main(int argc, char* argv[]) 
{
    uint8_t pub_sub_type[4]={'\0'};
    uint8_t topic_name[TOPIC_MAX_LENGTH]={'\0'};
    float pub_or_sub_freq=0;

    LOG_RECORD(LOGDEBUG,"start...argc=%d\n",argc);
    if(argc!=4)
    {
        help_usage();
        return -1;
    }
    else
    {
        strcpy(pub_sub_type,argv[1]);
        strcpy(topic_name,argv[2]);
        pub_or_sub_freq=atof(argv[3]);
        LOG_RECORD(LOGINFO,"pub_sub_type=%s\ntopic_name=%s\ndata_size=%ld\npub_or_sub_freq=%f\n",pub_sub_type,topic_name,sizeof(person_info_struct),pub_or_sub_freq);

        if(pub_or_sub_freq<=0)
        {
            help_usage();
            return -1;
        }
    }

    init();

    if(strcmp("pub", pub_sub_type)==0)
    {
        LOG_RECORD(LOGDEBUG,"create node\n");
        node_struct* tnode=create_node("pub_data");

        LOG_RECORD(LOGDEBUG,"create pub\n");
        topic_pub_struct* tpub_dev=create_pub_with_topic(tnode, topic_name, sizeof(person_info_struct),pub_or_sub_freq,DIRECT_PUB,5);
        if(tpub_dev==NULL)
        {
            LOG_RECORD(LOGDEBUG,"no topicname create\n");
            deinit();
            return -1;
        }

        LOG_RECORD(LOGDEBUG,"begin pub data\n");
        int i=0;
        while(1)
        {
            i++;
            person_info_struct person_data;
            person_data.timestamp=get_time();
            person_data.sequence_num=i;
            person_data.age=i%62+1;
            memset(person_data.name,'\0',sizeof(person_data.name));
            sprintf(person_data.name,"Bob%d",i%10);
            memset(person_data.phone_num,'\0',sizeof(person_data.phone_num));
            sprintf(person_data.phone_num,"1326874651%d",i%10);
            memset(person_data.email,'\0',sizeof(person_data.email));
            sprintf(person_data.email,"Bob%d_workspace@163.com",i%10);

            pub_data(tpub_dev, (uint8_t*)&person_data, sizeof(person_data));
            LOG_RECORD(LOGINFO,"sequence_num=%ld,timestamp=%ld,age=%d,name=%s,phone_num=%s,email=%s\n",person_data.sequence_num,
                                        person_data.timestamp,person_data.age,person_data.name,person_data.phone_num,person_data.email);
        }

    }
    else if (strcmp("sub", pub_sub_type)==0) 
    {
        LOG_RECORD(LOGDEBUG,"create node\n");
        node_struct* tnode=create_node("pub_data");
        LOG_RECORD(LOGDEBUG,"create sub\n");
        topic_sub_struct* tsub_dev=create_sub_with_topic(tnode, topic_name,  sizeof(person_info_struct), test_sub_callback,pub_or_sub_freq,DIRECT_SUB_HANDLE,5);
        if(tsub_dev==NULL)
        {
            LOG_RECORD(LOGDEBUG,"no topicname create\n");
            deinit();
            return -1;
        }
        if(sub_data_run(tsub_dev)!=0)
        {
            LOG_RECORD(LOGDEBUG,"cannot create sub thread\n");
            deinit();
            return -1;
        }
        LOG_RECORD(LOGDEBUG,"begin sub data\n");
        while(1)
        {
                usleep(5000*1000);
        }
    }

    return 0;
}

基于cjson数据发布订阅

/****************************************************************
文件名称: main.c
功能描述: 基于共享内存的数据分发测试demo
创建日期: 2024-02-15
作者    : skynet
版本    : V1.0
修订记录:
***************************************************************/
#include "cshmdds.h"
#include "cJSON.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>

/* 数据格式
{
"timestamp":123123123,
"sequence_num":1,
"name":"skynet",
"age":20,
"interests":[ "read", "movie", "sports" ],
"phone_num":"13278765213",
"email":"skynet1_workspace@163.com"
}
*/

static uint64_t get_time()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000000 + tv.tv_usec;
}

void help_usage(void)
{
    printf("Usage:\n");
    printf("\tcdds-test  pub topicname pubfreq\n");
    printf("\tcdds-test  sub topicname subfreq\n");
}

void test_sub_callback(const uint8_t* data,const size_t data_size)
{
    cJSON* cjson_test=cJSON_Parse(data);
    char* t_data=cJSON_Print(cjson_test);
    LOG_RECORD(LOGINFO,"\n%s\n",t_data);
    cJSON_Delete(cjson_test);
    cJSON_free(t_data);
}

int main(int argc, char* argv[]) 
{
    uint8_t pub_sub_type[4]={'\0'};
    uint8_t topic_name[TOPIC_MAX_LENGTH]={'\0'};
    float pub_or_sub_freq=0;

    LOG_RECORD(LOGDEBUG,"start...argc=%d\n",argc);
    if(argc!=4)
    {
        help_usage();
        return -1;
    }
    else
    {
        strcpy(pub_sub_type,argv[1]);
        strcpy(topic_name,argv[2]);
        pub_or_sub_freq=atof(argv[3]);
        LOG_RECORD(LOGINFO,"pub_sub_type=%s\ntopic_name=%s\ndata_size=%ld\npub_or_sub_freq=%f\n",pub_sub_type,topic_name,256,pub_or_sub_freq);

        if(pub_or_sub_freq<=0)
        {
            help_usage();
            return -1;
        }
    }

    init();

    if(strcmp("pub", pub_sub_type)==0)
    {
        LOG_RECORD(LOGDEBUG,"create node\n");
        node_struct* tnode=create_node("pub_data");

        LOG_RECORD(LOGDEBUG,"create pub\n");
        topic_pub_struct* tpub_dev=create_pub_with_topic(tnode, topic_name, 256,pub_or_sub_freq,DIRECT_PUB,5);
        if(tpub_dev==NULL)
        {
            LOG_RECORD(LOGDEBUG,"no topicname create\n");
            deinit();
            return -1;
        }

        LOG_RECORD(LOGDEBUG,"begin pub data\n");
        int i=0;
        while(1)
        {
            i++;
            cJSON* cjson_test = cJSON_CreateObject();
            cJSON_AddNumberToObject(cjson_test, "timestamp", get_time());
            cJSON_AddNumberToObject(cjson_test, "sequence_num", i);
            cJSON_AddStringToObject(cjson_test, "name", "skynet");
            cJSON_AddNumberToObject(cjson_test, "age", i%80+1);

            cJSON* cjson_interests=cJSON_CreateArray();
            cJSON_AddItemToArray(cjson_interests, cJSON_CreateString( "read" ));
            cJSON_AddItemToArray(cjson_interests, cJSON_CreateString( "movie" ));
            cJSON_AddItemToArray(cjson_interests, cJSON_CreateString( "sports" ));
            cJSON_AddItemToObject(cjson_test, "interests", cjson_interests);

            cJSON_AddStringToObject(cjson_test, "phone_num", "13278765213");
            cJSON_AddStringToObject(cjson_test, "email", "skynet1_workspace@163.com");

            char* t_pub_data=cJSON_Print(cjson_test);
            size_t pub_data_len=strlen(t_pub_data)+5;

            pub_data(tpub_dev, (uint8_t*)t_pub_data, pub_data_len);
            LOG_RECORD(LOGINFO,"\n%s\n",t_pub_data);
            cJSON_Delete(cjson_test);
            cJSON_free(t_pub_data);

        }

    }
    else if (strcmp("sub", pub_sub_type)==0) 
    {
        LOG_RECORD(LOGDEBUG,"create node\n");
        node_struct* tnode=create_node("pub_data");
        LOG_RECORD(LOGDEBUG,"create sub\n");
        topic_sub_struct* tsub_dev=create_sub_with_topic(tnode, topic_name,  256, test_sub_callback,pub_or_sub_freq,DIRECT_SUB_HANDLE,5);
        if(tsub_dev==NULL)
        {
            LOG_RECORD(LOGDEBUG,"no topicname create\n");
            deinit();
            return -1;
        }
        if(sub_data_run(tsub_dev)!=0)
        {
            LOG_RECORD(LOGDEBUG,"cannot create sub thread\n");
            deinit();
            return -1;
        }
        LOG_RECORD(LOGDEBUG,"begin sub data\n");
        while(1)
        {
                usleep(5000*1000);
        }
    }

    return 0;
}

基于protobuf-c数据发布订阅

/****************************************************************
文件名称: main.c
功能描述: 基于共享内存的数据分发测试demo
创建日期: 2024-02-15
作者    : skynet
版本    : V1.0
修订记录:
***************************************************************/
#include "cshmdds.h"

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <sys/time.h>

#include "message.pb-c.h"

TestMessage test_msg = TEST_MESSAGE__INIT;

static uint64_t get_time()
{
    struct timeval tv;
    gettimeofday(&tv, NULL);
    return tv.tv_sec * 1000000 + tv.tv_usec;
}

void help_usage(void)
{
    printf("Usage:\n");
    printf("\tcdds-test  pub topicname pubfreq\n");
    printf("\tcdds-test  sub topicname subfreq\n");
}

void test_sub_callback(const uint8_t* data,const size_t data_size)
{
    TestMessage *test_unpack = test_message__unpack(NULL, data_size,data);
    LOG_RECORD(LOGINFO,"\nsub size=%d;\ntimestamp=%ld\nsequence_num=%ld\nage=%d\nname=%s\nphone=%s\nemail=%s\n", data_size,test_unpack->timestamp,test_unpack->sequence_num,test_unpack->age,test_unpack->name,test_unpack->phone_num,test_unpack->email);
    test_message__free_unpacked(test_unpack, NULL);
}

uint32_t init_message()
{
    uint32_t total_size=0;
    
    test_msg.timestamp=get_time();
    total_size+=sizeof(test_msg.timestamp);
    
    test_msg.sequence_num=0;
    total_size+=sizeof(test_msg.sequence_num);

    test_msg.age=0;
    total_size+=sizeof(test_msg.age);

    test_msg.name = (char*)calloc(1, 32);
    sprintf(test_msg.name,"helloworld");
    total_size+=32;

    test_msg.phone_num = (char*)calloc(1, 12);
    sprintf(test_msg.phone_num,"11133327651");
    total_size+=12;

    test_msg.email = (char*)calloc(1, 18);
    sprintf(test_msg.email,"skynet@111.com.cn");
    total_size+=18;

    return total_size;
}

int main(int argc, char* argv[]) 
{
    uint8_t pub_sub_type[4]={'\0'};
    uint8_t topic_name[TOPIC_MAX_LENGTH]={'\0'};
    float pub_or_sub_freq=0;

    LOG_RECORD(LOGDEBUG,"start...argc=%d\n",argc);
    if(argc!=4)
    {
        help_usage();
        return -1;
    }
    else
    {
        strcpy(pub_sub_type,argv[1]);
        strcpy(topic_name,argv[2]);
        pub_or_sub_freq=atof(argv[3]);
        LOG_RECORD(LOGINFO,"pub_sub_type=%s\ntopic_name=%s\npub_or_sub_freq=%f\n",pub_sub_type,topic_name,pub_or_sub_freq);

        if(pub_or_sub_freq<=0)
        {
            help_usage();
            return -1;
        }
    }

    int pack_len = init_message()*3/2;
    LOG_RECORD(LOGINFO,"max_pack_len=%d\n",pack_len);

    init();

    if(strcmp("pub", pub_sub_type)==0)
    {
        LOG_RECORD(LOGDEBUG,"create node\n");
        node_struct* tnode=create_node("pub_data");

        LOG_RECORD(LOGDEBUG,"create pub\n");
        topic_pub_struct* tpub_dev=create_pub_with_topic(tnode, topic_name, pack_len,pub_or_sub_freq,DIRECT_PUB,5);
        if(tpub_dev==NULL)
        {
            LOG_RECORD(LOGDEBUG,"no topicname create\n");
            deinit();
            return -1;
        }

        LOG_RECORD(LOGDEBUG,"begin pub data\n");

        char *pack_buf = NULL;
        pack_buf = (char*)calloc(1, pack_len);
        int token=1;
        while(1)
        {

            test_msg.timestamp=get_time();
            test_msg.sequence_num=token;
            test_msg.age=1+(token*23)%100;
            memset(test_msg.name,'\0',32);
            sprintf(test_msg.name,"helloworld%d",token);

            memset(test_msg.phone_num,'\0',12);
            sprintf(test_msg.phone_num,"%011d",token);

            memset(test_msg.email,'\0',18);
            sprintf(test_msg.email,"skynet@%d.com.cn",100+token%100);

            
            memset(pack_buf,'\0',pack_len);
            int unpack_len=test_message__pack(&test_msg, pack_buf);

            pub_data(tpub_dev, (uint8_t*)pack_buf, unpack_len);
            LOG_RECORD(LOGINFO,"\nsize=%d:\ntimestamp=%ld\nsequence_num=%ld\nage=%d\nname=%s\nphone=%s\nemail=%s\n", unpack_len,test_msg.timestamp,test_msg.sequence_num,test_msg.age,test_msg.name,test_msg.phone_num,test_msg.email);

            token++;
        }

    }
    else if (strcmp("sub", pub_sub_type)==0) 
    {
        LOG_RECORD(LOGDEBUG,"create node\n");
        node_struct* tnode=create_node("pub_data");
        LOG_RECORD(LOGDEBUG,"create sub\n");
        topic_sub_struct* tsub_dev=create_sub_with_topic(tnode, topic_name,  pack_len, test_sub_callback,pub_or_sub_freq,DIRECT_SUB_HANDLE,5);
        if(tsub_dev==NULL)
        {
            LOG_RECORD(LOGDEBUG,"no topicname create\n");
            deinit();
            return -1;
        }
        if(sub_data_run(tsub_dev)!=0)
        {
            LOG_RECORD(LOGDEBUG,"cannot create sub thread\n");
            deinit();
            return -1;
        }
        LOG_RECORD(LOGDEBUG,"begin sub data\n");
        while(1)
        {
            usleep(5000*1000);
        }
    }

    return 0;
}

六、源码

目录结构

c-shm-dds
├── build            #cmake 编译目录
├── build.sh       #编译脚本
├── cdds           #cdds源码目录
│   ├── CMakeLists.txt
│   ├── cshmdds.c
│   └── cshmdds.h
├── CMakeLists.txt
├── install        #cdds编译结果目录
│   ├── bin
│   ├── include
│   └── lib
└── tools           #工具目录
    ├── cdds-bag
    ├── cdds-demo1
    ├── cdds-demo2
    ├── cdds-demo3
    ├── cdds-test
    └── cdds-topic

源码地址
基于C语言和linux共享内存的数据分发软件

相关推荐

  1. C语言共享

    2024-04-22 07:06:02       11 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-04-22 07:06:02       19 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-04-22 07:06:02       20 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-04-22 07:06:02       20 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-04-22 07:06:02       20 阅读

热门阅读

  1. 搜索引擎中的倒排索引是什么

    2024-04-22 07:06:02       11 阅读
  2. 【数据结构】冒泡排序

    2024-04-22 07:06:02       14 阅读
  3. docker安装mysql,允许远程连接

    2024-04-22 07:06:02       15 阅读
  4. React Router 6 路由重定向与编程式导航指南

    2024-04-22 07:06:02       12 阅读
  5. 第4章 EC2 - 4.3 密钥对

    2024-04-22 07:06:02       14 阅读
  6. 欧鹏RHCE 第三次作业

    2024-04-22 07:06:02       12 阅读
  7. Pytorch或Tensorflow 深度学习库安装 (简易版)

    2024-04-22 07:06:02       14 阅读
  8. JVM 引用的分类

    2024-04-22 07:06:02       16 阅读
  9. vue3 依赖-组件tablepage-vue3版本1.0.2更新内容

    2024-04-22 07:06:02       15 阅读