服务发现全流程解析-APOLLO7.0

cyber服务发现完全依赖于fastDDS,下面从底层一步一步看下服务发现的整个过程。

topology_manager.cc

首先从这个类看起,这个类是和dds接壤的,dds发现后,完全由这个类接管,然后整体开始通信。

TopologyManager::TopologyManager()
    : init_(false),
      node_manager_(nullptr),
      channel_manager_(nullptr),
      service_manager_(nullptr),
      participant_(nullptr),
      participant_listener_(nullptr) {
  Init();
}

从上面的构造函数开始分析,除了赋值一些变量,还有就是调用了init函数, 其中node_manager_、channel_manager_、service_manager_是拓扑结构的一些类成员,participant_、participant_listener_很明显就是FAST_RTPS内部的参与者以及监听事件。紧接着进行了init初始化。

init

init函数就是整个topology_manager初始化的过程,下面逐步分析代码。

bool TopologyManager::Init() {
  if (init_.exchange(true)) {
    return true;
  }

  node_manager_ = std::make_shared<NodeManager>();
  channel_manager_ = std::make_shared<ChannelManager>();
  service_manager_ = std::make_shared<ServiceManager>();

  CreateParticipant();

  bool result =
      InitNodeManager() && InitChannelManager() && InitServiceManager();
  if (!result) {
    AERROR << "init manager failed.";
    participant_ = nullptr;
    delete participant_listener_;
    participant_listener_ = nullptr;
    node_manager_ = nullptr;
    channel_manager_ = nullptr;
    service_manager_ = nullptr;
    init_.store(false);
    return false;
  }

  return true;
}

上面首先是创建了三个share_ptr,分别对应NodeManager、ChannelManager、ServiceManager单个类,然后调用CreateParticipant创建participant_。然后对于三个mangger进行了初始化操作,,我们逐步进入分析。

node_manager_ = std::make_shared<NodeManager>();

首先就是上面的代码创建nodemanger,具体要看nodemanger类里面干了啥,所以暂时跳转到nodemanger.cc.

NodeManager、ChannelManager、ServiceManager他们三个都继承了manger类,主要用于记录节点间的关系及节点网络的图结构,紧接着,调用了CreateParticipant函数创建participant,再调用了initNodeMangger去初始化,具体要看函数实现。

CreateParticipant

bool TopologyManager::CreateParticipant() {
  std::string participant_name =
      common::GlobalData::Instance()->HostName() + '+' +
      std::to_string(common::GlobalData::Instance()->ProcessId());
  participant_listener_ = new ParticipantListener(std::bind(
      &TopologyManager::OnParticipantChange, this, std::placeholders::_1));
  participant_ = std::make_shared<transport::Participant>(
      participant_name, 11511, participant_listener_);
  return true;
}

这个函数就是使用fastdds的接口创建一个用于全局服务发现的participant,对于cyber的rtps相关的操作,可以看下participant.cc,里面可以看到和rtps官方demo一样的代码,用于通信。participant.cc中官方又封装了一层,然后来初始化和创建participant。

participant
Participant::Participant(const std::string& name, int send_port,
                         eprosima::fastrtps::ParticipantListener* listener)
    : shutdown_(false),
      name_(name),
      send_port_(send_port),
      listener_(listener),
      fastrtps_participant_(nullptr) {}
      
eprosima::fastrtps::Participant* Participant::fastrtps_participant() {
  if (shutdown_.load()) {
    return nullptr;
  }

  std::lock_guard<std::mutex> lk(mutex_);
  if (fastrtps_participant_ != nullptr) {
    return fastrtps_participant_;
  }

  CreateFastRtpsParticipant(name_, send_port_, listener_);
  return fastrtps_participant_;
}

void Participant::CreateFastRtpsParticipant(
    const std::string& name, int send_port,
    eprosima::fastrtps::ParticipantListener* listener) {
  uint32_t domain_id = 80;

  const char* val = ::getenv("CYBER_DOMAIN_ID");
  if (val != nullptr) {
    try {
      domain_id = std::stoi(val);
    } catch (const std::exception& e) {
      AERROR << "convert domain_id error " << e.what();
      return;
    }
  }

  auto part_attr_conf = std::make_shared<proto::RtpsParticipantAttr>();
  auto& global_conf = common::GlobalData::Instance()->Config();
  if (global_conf.has_transport_conf() &&
      global_conf.transport_conf().has_participant_attr()) {
    part_attr_conf->CopyFrom(global_conf.transport_conf().participant_attr());
  }

  eprosima::fastrtps::ParticipantAttributes attr;
  attr.rtps.port.domainIDGain =
      static_cast<uint16_t>(part_attr_conf->domain_id_gain());
  attr.rtps.port.portBase = static_cast<uint16_t>(part_attr_conf->port_base());

  attr.rtps.builtin.discovery_config.discoveryProtocol =
      eprosima::fastrtps::rtps::DiscoveryProtocol_t::SIMPLE;
  attr.rtps.builtin.discovery_config.m_simpleEDP
      .use_PublicationWriterANDSubscriptionReader = true;
  attr.rtps.builtin.discovery_config.m_simpleEDP
      .use_PublicationReaderANDSubscriptionWriter = true;

  attr.domainId = domain_id;

  /**
   * The user should set the lease_duration and the announcement_period with
   * values that differ in at least 30%. Values too close to each other may
   * cause the failure of the writer liveliness assertion in networks with high
   * latency or with lots of communication errors.
   */

  attr.rtps.builtin.discovery_config.leaseDuration.seconds =
      part_attr_conf->lease_duration();
  attr.rtps.builtin.discovery_config.leaseDuration_anno

相关推荐

  1. 服务发现流程解析-APOLLO7.0

    2024-06-16 11:20:01       9 阅读
  2. 描述Nacos中服务发现流程

    2024-06-16 11:20:01       11 阅读
  3. 深度解析服务发布策略之蓝绿发布

    2024-06-16 11:20:01       11 阅读

最近更新

  1. TCP协议是安全的吗?

    2024-06-16 11:20:01       18 阅读
  2. 阿里云服务器执行yum,一直下载docker-ce-stable失败

    2024-06-16 11:20:01       19 阅读
  3. 【Python教程】压缩PDF文件大小

    2024-06-16 11:20:01       18 阅读
  4. 通过文章id递归查询所有评论(xml)

    2024-06-16 11:20:01       20 阅读

热门阅读

  1. uniapp实现内嵌其他网页的功能

    2024-06-16 11:20:01       7 阅读
  2. [CODE:-5504]没有[SYS.SYSOBJECTS]对象的查询权限

    2024-06-16 11:20:01       10 阅读
  3. KaTex在博客中显示数学公式

    2024-06-16 11:20:01       8 阅读
  4. Linux安装ActiveMQ

    2024-06-16 11:20:01       7 阅读
  5. golang字符串拼接和strings.Builder

    2024-06-16 11:20:01       6 阅读
  6. QT6.3学习技巧,快速入门

    2024-06-16 11:20:01       4 阅读
  7. 图像去重技术:MD5哈希在自动化中的应用

    2024-06-16 11:20:01       5 阅读
  8. QLinkedList使用详解

    2024-06-16 11:20:01       6 阅读
  9. 基于物联网的智能晾衣架研发

    2024-06-16 11:20:01       4 阅读
  10. [程序员] openstack: openvswitch: firewall丢包

    2024-06-16 11:20:01       6 阅读
  11. STM32实现多级菜单界面显示

    2024-06-16 11:20:01       9 阅读
  12. 【Golang】Go 中的生产者-消费者模式

    2024-06-16 11:20:01       9 阅读
  13. HTML中的<img>标签使用指南

    2024-06-16 11:20:01       7 阅读