Intra-process Communications in ROS 2

A. Overview

Consider a simple scenario, consisting of Publishers and Subscriptions all in the same process and with the durability QoS set to volatile.

  1. The proposed implementation creates one buffer per Subscription.

    Size ?

    Policy ?

    lock/mutex ?

  2. When a message is published to a topic, its Publisher pushes the message into the buffer of each of the Subscriptions related to that topic and raises a notification, waking up the executor.

    pushes the message into: how??

    raises a notification: How?? thread creation??

    waking up the executor: Who?? When?? How??

  3. The executor can then pop the message from the buffer and trigger the callback of the Subscription.

    pop the message from the buffer: to where?? When??

    trigger the callback: Who?? How?? When??
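The three steps above can be sketched with a toy per-Subscription buffer. This is a minimal illustration under assumed semantics (bounded queue, overwrite-oldest policy, one mutex per buffer); the class name SubBuffer is hypothetical, and the condition variable merely stands in for the rcl guard condition — it is not rclcpp's actual implementation.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <memory>
#include <mutex>
#include <string>

// Hypothetical sketch: one bounded buffer per Subscription, guarded by a
// mutex; the publisher pushes into every matching buffer and notifies a
// condition variable standing in for triggering the guard condition.
template <typename MessageT>
class SubBuffer {
public:
  explicit SubBuffer(size_t capacity) : capacity_(capacity) {}

  // Called from the publisher thread.
  void push(std::shared_ptr<const MessageT> msg) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.size() == capacity_) {
      queue_.pop_front();  // overwrite-oldest policy, like a ring buffer
    }
    queue_.push_back(std::move(msg));
    cv_.notify_one();  // stands in for waking up the executor
  }

  // Called from the executor thread.
  std::shared_ptr<const MessageT> pop() {
    std::lock_guard<std::mutex> lock(mutex_);
    if (queue_.empty()) return nullptr;
    auto msg = queue_.front();
    queue_.pop_front();
    return msg;
  }

  bool has_data() {
    std::lock_guard<std::mutex> lock(mutex_);
    return !queue_.empty();
  }

private:
  size_t capacity_;
  std::deque<std::shared_ptr<const MessageT>> queue_;
  std::mutex mutex_;
  std::condition_variable cv_;
};
```

The sketch answers the questions above in the simplest possible way: a fixed size, an overwrite-oldest policy, and one mutex per buffer, so Subscriptions never contend with each other.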

Proposed IPC Block Diagram

The choice of having independent buffers for each Subscription leads to the following advantages:

  • It is easy to support different QoS for each Subscription, while at the same time simplifying the implementation.
  • Multiple Subscriptions can extract messages from their own buffer in parallel without blocking each other, thus providing a higher throughput.

The only drawback is that the system does not reuse resources as much as it could, compared to sharing buffers between entities. However, from a practical point of view, the memory overhead of the proposed implementation with respect to the current one will always be only a tiny delta compared to the overall memory usage of the application.

There are three possible data-types that can be stored in the buffer: unique_ptr<MessageT>, shared_ptr<MessageT>, and MessageT.

B. Details

I. Creating a publisher

  1. User calls Node::create_publisher<MessageT>(...).

  2. This boils down to NodeTopics::create_publisher(...), where a Publisher is created through the factory.

  3. Here, if intra-process communication is enabled, any intra-process-related variables are initialized through the PublisherBase::setup_intra_process(...) method.

    The rclcpp::detail::resolve_use_intra_process function determines whether intra-process communication is enabled for a given node.

    A Publisher with intra-process enabled needs the IntraProcessManager: the Publisher is registered through add_publisher, after which the IntraProcessManager returns a uint64_t identifier; setup_intra_process then associates the newly created Publisher object with the IntraProcessManager.

  4. Then the IntraProcessManager is notified about the existence of the new Publisher through the method IntraProcessManager::add_publisher(PublisherBase::SharedPtr publisher, PublisherOptions options).

  5. IntraProcessManager::add_publisher(...) stores the Publisher information in an internal structure of type PublisherInfo. The structure contains information about the Publisher, such as its QoS and its topic name, and a weak pointer to the Publisher object. A uint64_t pub_id, unique within the rclcpp::Context, is assigned to the Publisher. The IntraProcessManager contains a std::map<uint64_t, PublisherInfo> object from which the PublisherInfo of a specific Publisher can be retrieved given its id. The function returns the pub_id, which is stored within the Publisher.

    SRLIU_mark1: get_next_unique_id(...) increments the atomic variable _next_unique_id to obtain a unique id; this counter appears to be shared by all Publishers and Subscriptions on the machine.

    SRLIU_mark2: publishers_ is an instance of PublisherMap and stores the relevant information of the active Publishers. PublisherMap is defined as using PublisherMap = std::unordered_map<uint64_t, PublisherInfo>, i.e. an unordered map.

    SRLIU_mark3: pub_to_subs_ is an unordered map recording which Subscriptions are subscribed to each Publisher; SplittedSubscriptions contains two arrays, one for take-shared Subscriptions and one for take-ownership Subscriptions.

    SRLIU_mark4: add_publisher then updates the mapping in pub_to_subs_; afterwards, pub_to_subs_[id] references all Subscriptions related to this Publisher.
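The bookkeeping described in steps 4-5 can be sketched as follows. MiniIpm, SubInfo and SplitSubs are hypothetical names; the sketch only assumes the behavior stated above (a shared unique-id counter and a pub_to_subs_ map whose entries are split into take-shared and take-ownership groups), not rclcpp's real data structures.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical sketch: each new Publisher or Subscription gets an id from a
// shared counter, and pub_to_subs_ is kept up to date so that the entry for
// a Publisher lists every matchable Subscription, split into take-shared
// and take-ownership groups (mirroring SplittedSubscriptions).
struct SubInfo {
  std::string topic;
  bool use_take_shared;
};

struct SplitSubs {
  std::vector<uint64_t> take_shared;
  std::vector<uint64_t> take_ownership;
};

class MiniIpm {
public:
  uint64_t add_publisher(const std::string & topic) {
    uint64_t id = next_id_++;
    pub_topics_[id] = topic;
    // Collect every already-registered Subscription on the same topic.
    for (const auto & [sub_id, info] : subs_) {
      if (info.topic == topic) insert_sub_id_for_pub(sub_id, id, info.use_take_shared);
    }
    return id;
  }

  uint64_t add_subscription(const std::string & topic, bool take_shared) {
    uint64_t id = next_id_++;
    subs_[id] = SubInfo{topic, take_shared};
    // Register this Subscription under every matching Publisher.
    for (const auto & [pub_id, pub_topic] : pub_topics_) {
      if (pub_topic == topic) insert_sub_id_for_pub(id, pub_id, take_shared);
    }
    return id;
  }

  const SplitSubs & subs_for_pub(uint64_t pub_id) { return pub_to_subs_[pub_id]; }

private:
  void insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool take_shared) {
    auto & entry = pub_to_subs_[pub_id];
    (take_shared ? entry.take_shared : entry.take_ownership).push_back(sub_id);
  }

  uint64_t next_id_ = 1;  // stands in for the shared atomic _next_unique_id
  std::unordered_map<uint64_t, std::string> pub_topics_;
  std::unordered_map<uint64_t, SubInfo> subs_;
  std::unordered_map<uint64_t, SplitSubs> pub_to_subs_;
};
```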


II. Creating a subscription

Sequence diagram (subscription creation): rclcpp::create_subscription() -> SubscriptionIntraProcess() -> create_intra_process_buffer() -> RingBufferImplementation() -> TypedIntraProcessBuffer() -> rcl_guard_condition_init(); then IPM::add_subscription() -> insert_sub_id_for_pub() (loop: add sub_id to all matchable pubs); finally SubscriptionBase::setup_intra_process(). Files involved: subscription.hpp, intra_process_manager.hpp, sub_intra_process.hpp, create_intra_process_buffer.hpp, ring_buffer_impl.hpp, guard_condition.hpp.
  1. User calls Node::create_subscription<MessageT>(...).

  2. Node::create_subscription<MessageT>(...) calls rclcpp::create_subscription(...).

  3. This boils down to NodeTopics::create_subscription(...), where a Subscription is created through the factory.

  4. Here, if intra-process communication is enabled, intra-process related variables are initialized through the SubscriptionBase::setup_intra_process(...) method. The most relevant of these are the ring buffer and the waitable object.

  5. Then the IntraProcessManager is notified about the existence of the new Subscription through the method IntraProcessManager::add_subscription(SubscriptionBase::SharedPtr subscription, SubscriptionOptions options).

  6. IntraProcessManager::add_subscription(...) stores the Subscription information in an internal structure of type SubscriptionInfo. The structure contains information about the Subscription, such as its QoS, its topic name and the type of its callback, and a weak pointer to the Subscription object. A uint64_t sub_id, unique within the rclcpp::Context, is assigned to the Subscription. The IntraProcessManager contains a std::map<uint64_t, SubscriptionInfo> object from which the SubscriptionInfo of a specific Subscription can be retrieved given its id. There is also an additional structure std::map<uint64_t, std::pair<std::set<uint64_t>, std::set<uint64_t>>>. The key of the map is the unique id of a Publisher and the value is a pair of sets of ids. These sets contain the ids of the Subscriptions that can communicate with the Publisher. There are two different sets in order to differentiate the Subscriptions depending on whether they request ownership of the received messages or not (note that this decision is made by looking at their buffer, since the Publisher does not have to interact with the Subscription callback).

    SRLIU_mark2: subscriptions_ is an instance of SubscriptionMap and stores the relevant information of the active Subscriptions. SubscriptionMap is defined as using SubscriptionMap = std::unordered_map<uint64_t, SubscriptionInfo>;, i.e. an unordered map.

    SRLIU_mark3: the mapping in pub_to_subs_ is updated; afterwards, the new sub_id appears under every Publisher related to this Subscription. Each Subscription has a use_take_shared_method property indicating whether it shares the data released by its Publisher with the Publisher's other Subscriptions or takes exclusive ownership of it.

  7. The SubscriptionIntraProcessWaitable object is added to the list of Waitable interfaces of the node through node_interfaces::NodeWaitablesInterface::add_waitable(...). It is added to the same callback group used for the standard inter-process communication of that topic.

    TODO:
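Step 6 states that only Subscriptions with a compatible QoS are associated with a Publisher. A minimal sketch of such a compatibility check, reduced to the standard DDS request/offered matching rules for reliability and durability (the function name can_communicate and the trimmed-down Qos struct are hypothetical; rclcpp's real check covers more policies):

```cpp
#include <cassert>

// Hypothetical sketch of the QoS-compatibility check implied in step 6:
// a Subscription id is only inserted under Publishers whose offered QoS
// can satisfy what the Subscription requests.
enum class Reliability { BestEffort, Reliable };
enum class Durability { Volatile, TransientLocal };

struct Qos {
  Reliability reliability;
  Durability durability;
};

// A reliable or transient-local subscription cannot be served by a
// best-effort or volatile publisher; the reverse pairings are fine.
bool can_communicate(const Qos & pub, const Qos & sub) {
  if (sub.reliability == Reliability::Reliable &&
      pub.reliability == Reliability::BestEffort) {
    return false;
  }
  if (sub.durability == Durability::TransientLocal &&
      pub.durability == Durability::Volatile) {
    return false;
  }
  return true;
}
```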


III. Publishing only intra-process

Sequence diagram (intra-process publish): Publisher::publish(unique_ptr* msg) -> IPM::do_intra_process_publish() -> pub_to_subs_.find(), then, depending on ownership (alt: none of the sub buffers require ownership / each sub buffer requires ownership / some sub buffers require ownership): IPM::add_shared_msg_to_buffers() -> provide_intra_process_message() -> add_shared() -> add_shared_impl() -> enqueue() -> trigger_guard_condition(), looping over subscription_ids; or IPM::add_owned_msg_to_buffers(), which copies the message (dynamic allocation), then provide_intra_process_message() -> buffer_->add_unique() -> enqueue() -> trigger_guard_condition(). Buffer access is protected by std::shared_lock<std::shared_timed_mutex> lock(mutex_). Files involved: publisher.hpp, intra_process_manager.hpp, sub_intra_process.hpp, intra_process_buffer.hpp, ring_buffer_implementation.hpp.

- Publishing unique_ptr

  1. User calls Publisher::publish(std::unique_ptr<MessageT> msg).

  2. Publisher::publish(std::unique_ptr<MessageT> msg) calls IntraProcessManager::do_intra_process_publish(uint64_t pub_id, std::unique_ptr<MessageT> msg).

  3. IntraProcessManager::do_intra_process_publish(...) uses the uint64_t pub_id to call IntraProcessManager::get_subscription_ids_for_pub(uint64_t pub_id). This returns the ids of the Subscriptions whose QoS is compatible for receiving the message. These ids are divided into two sublists, according to the data-type stored in the buffer of each Subscription: requesting ownership (unique_ptr<MessageT>) or accepting shared access (shared_ptr<MessageT>, but also MessageT, since the data will be copied in any case).

    @ First, look at the implementation of IntraProcessManager::do_intra_process_publish. The IntraProcessManager first checks whether the Publisher has any matching Subscriptions:

    • pub_to_subs_ records which Subscriptions are subscribed to each Publisher; SplittedSubscriptions contains two arrays, one for take-shared Subscriptions and one for take-ownership Subscriptions. Each Publisher and Subscription is assigned a unique uint64_t identifier by the IntraProcessManager.

    @ Next, the IntraProcessManager delivers the published message to the buffer_ member of each SubscriptionIntraProcess. As seen above, Subscriptions are either shared or exclusive, and the IntraProcessManager handles three cases:

    • All of the Publisher's Subscriptions are shared: the message is promoted from unique_ptr to shared_ptr and distributed with add_shared_msg_to_buffers.

    • All Subscriptions are exclusive, or only one is shared: this is equivalent to all Subscriptions being exclusive, so the data is distributed with add_owned_msg_to_buffers.

    • There are both exclusive and shared Subscriptions, with more than one of each: the message is first copied and delivered to the shared Subscriptions, then delivered to the exclusive ones.

    @ add_shared_msg_to_buffers simply iterates over the Subscriptions and calls each Subscription's provide_intra_process_message interface. provide_intra_process_message stores the data in the corresponding buffer and sets the ready condition monitored by the waitable, which the executor can then query.

    @ add_owned_msg_to_buffers is similar to add_shared_msg_to_buffers, except that the first n-1 Subscriptions receive a copy of the data while the last receives the original without a copy.

    SRLIU_mark1: every copy requires a malloc of the message type's size followed by a copy of the data; the resulting pointer is then passed to the provide_intra_process_message function.

    @ provide_intra_process_message is implemented in terms of:

    • add_shared(...)

    • add_unique(...)

    • void enqueue(BufferT request)

      SRLIU_mark1: ring_buffer_ is a vector of BufferT, where BufferT is either MessageSharedPtr or MessageUniquePtr.

      SRLIU_mark2: std::lock_guard is a simple RAII wrapper: it locks the mutex in its constructor and unlocks it in its destructor, guaranteeing that the lock is released when the function exits. In short, it protects against a careless early return in some branch without an unlock, which would otherwise leave subsequent operations hanging or even cause a deadlock.

      SRLIU_mark3:????

    • trigger_guard_condition(): once the data has been stored, rcl_trigger_guard_condition(...) notifies the Subscription that the data is ready.

      What exactly does this process look like????

  4. The message is “added” to the ring buffer of all the items in the lists. The rcl_guard_condition_t member of SubscriptionIntraProcessWaitable of each Subscription is triggered (this wakes up rclcpp::spin).
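The three dispatch cases above can be sketched with plain vectors standing in for the typed ring buffers. The names OwnedBuffer, SharedBuffer and the simplified do_intra_process_publish below are hypothetical; the sketch implements only the behavior stated in the notes (zero copies when nobody wants ownership, n-1 copies when everyone does, and one extra copy for the shared group in the mixed case), not rclcpp's real code.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

using Msg = std::string;

// Hypothetical stand-ins for the per-Subscription typed buffers.
struct OwnedBuffer { std::vector<std::unique_ptr<Msg>> q; };
struct SharedBuffer { std::vector<std::shared_ptr<const Msg>> q; };

void add_shared_msg_to_buffers(std::shared_ptr<const Msg> msg,
                               std::vector<SharedBuffer *> & subs) {
  for (auto * s : subs) s->q.push_back(msg);  // everyone stores the same pointer
}

void add_owned_msg_to_buffers(std::unique_ptr<Msg> msg,
                              std::vector<OwnedBuffer *> & subs) {
  for (size_t i = 0; i + 1 < subs.size(); ++i) {
    subs[i]->q.push_back(std::make_unique<Msg>(*msg));  // copy for the first n-1
  }
  if (!subs.empty()) subs.back()->q.push_back(std::move(msg));  // original to the last
}

void do_intra_process_publish(std::unique_ptr<Msg> msg,
                              std::vector<SharedBuffer *> & shared_subs,
                              std::vector<OwnedBuffer *> & owned_subs) {
  if (owned_subs.empty()) {
    // Case 1: nobody requests ownership; promote unique_ptr to shared_ptr, zero copies.
    add_shared_msg_to_buffers(std::shared_ptr<const Msg>(std::move(msg)), shared_subs);
  } else if (shared_subs.empty()) {
    // Case 2: everyone requests ownership; n-1 copies, original to the last one.
    add_owned_msg_to_buffers(std::move(msg), owned_subs);
  } else {
    // Case 3: mixed; one copy shared by the take-shared group, then as in case 2.
    add_shared_msg_to_buffers(std::make_shared<const Msg>(*msg), shared_subs);
    add_owned_msg_to_buffers(std::move(msg), owned_subs);
  }
}
```

Note how in the mixed case the last ownership-taking Subscription still receives the original allocation without any copy.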

- Publishing other message types

The Publisher::publish(...) method is overloaded to support different message types:

 


IV. Receiving intra-process messages

As previously described, whenever messages are added to the ring buffer of a Subscription, a condition variable specific to that Subscription is triggered. This condition variable has been added to the Node waitset, so it is monitored by rclcpp::spin.

  1. The guard condition linked with the SubscriptionIntraProcessWaitable object awakes rclcpp::spin.

  2. The SubscriptionIntraProcessWaitable::is_ready() condition is checked. This has to ensure that the ring buffer is not empty.

  3. The SubscriptionIntraProcessWaitable::execute() function is triggered. Here the first message is extracted from the buffer and then the SubscriptionIntraProcessWaitable calls the AnySubscriptionCallback::dispatch_intra_process(...) method. There are different implementations for this method, depending on the data-type stored in the buffer.

    ??

  4. The AnySubscriptionCallback::dispatch_intra_process(...) method triggers the associated callback. Note that in this step, if the buffer stores a smart pointer type, no message copies occur, as ownership has already been taken into account when pushing the message into the queue.
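Steps 1-4 follow the waitable pattern, which can be sketched as below. MiniWaitable is a hypothetical name; the sketch assumes only what the steps state (is_ready() reports a non-empty buffer, execute() pops one message and dispatches it to the user callback) and omits the guard condition and waitset plumbing.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <memory>
#include <string>

// Hypothetical sketch of the waitable pattern in steps 1-4: is_ready()
// reports whether the buffer holds data; execute() pops one message and
// dispatches it to the user callback.
template <typename MessageT>
class MiniWaitable {
public:
  explicit MiniWaitable(std::function<void(std::shared_ptr<const MessageT>)> cb)
  : callback_(std::move(cb)) {}

  // Called on the publish path; the real code also triggers the guard
  // condition here to wake up rclcpp::spin.
  void provide_intra_process_message(std::shared_ptr<const MessageT> msg) {
    buffer_.push_back(std::move(msg));
  }

  bool is_ready() const { return !buffer_.empty(); }

  // Called by the executor once is_ready() is true.
  void execute() {
    auto msg = buffer_.front();
    buffer_.pop_front();
    callback_(std::move(msg));  // stands in for dispatch_intra_process
  }

private:
  std::deque<std::shared_ptr<const MessageT>> buffer_;
  std::function<void(std::shared_ptr<const MessageT>)> callback_;
};
```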


V. Number of message copies

The std::unique_ptr<MessageT> msg is passed to the IntraProcessManager, which decides how to add this message to the buffers. The decision is based on the number and the type of the Subscriptions, i.e. whether or not they request ownership of the messages.

As in the current implementation, if both inter and intra-process communication are needed, the std::unique_ptr<MessageT> msg will be converted into a std::shared_ptr<MessageT> msg and passed respectively to the do_intra_process_publish and do_inter_process_publish functions.

A copy of the message will be given to all the Subscriptions requesting ownership, while the others can copy the published shared pointer.
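The copy count implied by the three publishing cases can be written down as a small function. This is a sketch for the intra-process-only unique_ptr path, derived from the rules stated earlier in this document; the function name count_copies is hypothetical.

```cpp
#include <cassert>

// Number of message copies needed when publishing a unique_ptr
// intra-process only, given how many Subscriptions accept shared access
// and how many request ownership (hypothetical helper, per the three
// cases described in this document).
int count_copies(int n_shared, int n_owned) {
  if (n_owned == 0) {
    // All shared: promote unique_ptr to shared_ptr, zero copies.
    return 0;
  }
  if (n_shared <= 1) {
    // All exclusive, or only one shared: treat everyone as an ownership
    // taker; the last one receives the original without a copy.
    return (n_shared + n_owned) - 1;
  }
  // Mixed: one copy shared by the whole take-shared group, then n_owned-1
  // copies for the ownership takers, with the original going to the last.
  return 1 + (n_owned - 1);
}
```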

The following tables show a recap of when the proposed implementation has to create a new copy of a message. The notation @ indicates a memory address where the message is stored, different memory addresses correspond to different copies of the message.


Sequence diagram: publish(unique_msg) in publisher.hpp -> do_intra_process_publish(unique_msg) in intra_process_manager.hpp.

IntraProcessManager: the Subscription execution flow

Building on the executor's handling of subscriptions described in "Exploring ROS 2 (2): executor", the core is the Executor::execute_any_executable function. An intra-process subscription is a SubscriptionIntraProcess (i.e. a waitable); whenever data is pushed into the SubscriptionIntraProcess's buffer it becomes ready, and the executor then calls:

  1. execute invokes the SubscriptionIntraProcess member function.

  2. This in turn calls the dispatch_intra_process function.

    We are mainly interested in the first and third branches of this function, referred to here as caseA and caseB. In caseA the callback takes a shared_ptr parameter, so the source message is converted via std::move into a std::shared_ptr and passed to the user callback. In caseB (the third branch) the callback takes a unique_ptr parameter, so the message is moved directly into the callback as an rvalue. Note that AnySubscriptionCallback has six member variables holding user callbacks, but only one of them can be set at a time.

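The two dispatch branches (caseA and caseB) can be sketched as follows. MiniAnyCallback is a hypothetical, two-slot reduction of AnySubscriptionCallback's six slots; as in the real class, only one callback is set, and dispatch converts the stored unique_ptr to match that callback's signature.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>

// Hypothetical two-slot reduction of AnySubscriptionCallback: only one
// callback is set at a time, and dispatch_intra_process converts the
// stored unique_ptr to the form the callback expects.
template <typename MessageT>
class MiniAnyCallback {
public:
  void set(std::function<void(std::shared_ptr<const MessageT>)> cb) {
    shared_cb_ = std::move(cb);
  }
  void set(std::function<void(std::unique_ptr<MessageT>)> cb) {
    unique_cb_ = std::move(cb);
  }

  void dispatch_intra_process(std::unique_ptr<MessageT> msg) {
    if (shared_cb_) {
      // caseA: promote the unique_ptr to a shared_ptr; no copy is made.
      shared_cb_(std::shared_ptr<const MessageT>(std::move(msg)));
    } else if (unique_cb_) {
      // caseB: hand over ownership directly via a move.
      unique_cb_(std::move(msg));
    }
  }

private:
  std::function<void(std::shared_ptr<const MessageT>)> shared_cb_;
  std::function<void(std::unique_ptr<MessageT>)> unique_cb_;
};
```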


Further reading

  1. Exploring ROS 2 (1): the internals of Publisher-Subscriber
  2. Exploring ROS 2 (2): executor
  3. Exploring ROS 2 (3): service
  4. Exploring ROS 2 (4): DDS and RTPS
  5. Exploring ROS 2 (5): the internals of intra-process communication
  6. Three uses of using in C++