1、编译安装
1.1 安装python版本
$ git clone https://github.com/eclipse-cyclonedds/cyclonedds
$ cd cyclonedds && mkdir build install && cd build
$ cmake .. -DCMAKE_INSTALL_PREFIX=../install
$ cmake --build . --config RelWithDebInfo --target install
$ cd ..
$ export CYCLONEDDS_HOME="$(pwd)/install"
$ pip3 install cyclonedds --no-binary cyclonedds
1.2 安装C++版本
2、使用
2.1、编写并编译idl
// Vehicle.idl
struct Vehicle {
string name;
int32 x;
int32 y;
}; // 确保这里有一个换行符
- 编译C++版本:
idlc -l cxx Vehicle.idl
- 编译python版本:
conda activate dp
(切换到安装cyclonedds的环境);执行idlc -l py Vehicle.idl
2.2、编写publisher并编译
// publisher.cpp
#include <iostream>
#include <thread>
#include <chrono>
#include <random> // 添加随机数生成器的头文件
#include <dds/dds.hpp>
#include "Vehicle.hpp" // 假设我们有一个IDL文件生成的Vehicle类型
using namespace org::eclipse::cyclonedds;
int main() {
// 创建域参与者
dds::domain::DomainParticipant participant(domain::default_id());
// 创建主题
dds::topic::Topic<Vehicle> topic(participant, "Vehicle");
// 创建发布者
dds::pub::Publisher publisher(participant);
// 设置DataWriter的QoS策略
// 设置DataWriter的QoS策略
dds::pub::qos::DataWriterQos writer_qos = publisher.default_datawriter_qos();
writer_qos << dds::core::policy::Reliability::Reliable(dds::core::Duration::from_microsecs(60))
<< dds::core::policy::Deadline(dds::core::Duration::from_microsecs(10))
<< dds::core::policy::Durability::TransientLocal()
<< dds::core::policy::History::KeepLast(10);
// 创建数据写入器
dds::pub::DataWriter<Vehicle> writer(publisher, topic,writer_qos);
// 初始化随机数生成器
std::random_device rd;
std::mt19937 gen(rd());
std::uniform_real_distribution<> dis(0.1, 1.0); // 生成0.1到1.0之间的随机数
// 创建一个Vehicle对象并初始化
Vehicle vehicle;
vehicle.name("Dallara IL-15");
vehicle.x(200);
vehicle.y(200);
while (true) {
// 更新车辆位置
vehicle.x(vehicle.x() + (rand() % 3 - 1)); // -1, 0, or 1
vehicle.y(vehicle.y() + (rand() % 3 - 1)); // -1, 0, or 1
// 写入数据
writer.write(vehicle);
std::cout << ">> Published vehicle: " << vehicle.x() << ", " << vehicle.y() << std::endl;
// 随机等待0.1到1.0秒之间的时间,然后继续下一次更新
std::this_thread::sleep_for(std::chrono::duration<double>(dis(gen)));
}
return 0;
}
编译:g++ -I /usr/local/include/ddscxx -o publisher publisher.cpp Vehicle.cpp -lddscxx -lddsc
2.2、编写subscriber
# 从Cyclone DDS核心库中导入Listener、Qos(质量服务)和Policy(策略)模块
from cyclonedds.core import Listener, Qos, Policy
# 从Cyclone DDS领域模块中导入DomainParticipant(域参与者)
from cyclonedds.domain import DomainParticipant
# 从Cyclone DDS主题模块中导入Topic(主题)
from cyclonedds.topic import Topic
# 从Cyclone DDS订阅模块中导入Subscriber(订阅者)和DataReader(数据读取器)
from cyclonedds.sub import Subscriber, DataReader
# 从Cyclone DDS实用模块中导入duration(持续时间)
from cyclonedds.util import duration
# 从vehicles模块中导入Vehicle类,该类是由vehicle.idl文件通过IDL编译器生成的
from vehicles import Vehicle
# 定义一个自定义监听器类,继承自Listener,用于处理数据读取器的事件
class MyListener(Listener):
# 当读取器的生存活跃度发生变化时调用此方法
def on_liveliness_changed(self, reader, status):
print(">> Liveliness event")
# 创建监听器实例
listener = MyListener()
# 创建域参与者
domain_participant = DomainParticipant()
qos = Qos(
Policy.Reliability.BestEffort, # 设置最佳努力策略,不保证可靠传输
Policy.Deadline(duration(microseconds=10)), # 设置截止日期策略,每10微秒要求一次更新
Policy.Durability.TransientLocal, # 设置持久性策略,数据在本地保持有效
Policy.History.KeepLast(10) # 设置历史记录策略,保留最近的10条数据
)
# 创建主题
topic = Topic(domain_participant, 'Vehicle', Vehicle,qos=qos)
# 创建订阅者
subscriber = Subscriber(domain_participant)
# 创建数据读取器,并传递qos对象和监听器
reader = DataReader(domain_participant, topic, listener=listener)
# 从读取器中获取数据,设置超时时间为10秒
for sample in reader.take_iter(timeout=duration(seconds=10)):
print(sample)
注意:策略Qos尽量保持一致
3、执行
./publisher
python subscriber