Intra-process Communications in ROS 2
A. Overview
I. Motivation
II. The general idea
The core idea is that every time a Subscription that uses intra-process communication is created, an object of type SubscriptionIntraProcess is created as well.
This new object is stored in the IntraProcessManager and it contains a buffer where messages can be inserted from intra-process Publishers.
The object itself is an rclcpp::Waitable, so it can be added to a callback group.
When a Publisher that uses intra-process communication publishes a message, this is passed to the IntraProcessManager.
This class knows how many Subscriptions require this message and can deliver the message to all of them, by adding it to their buffers.
III. Simplified workflow
Consider a simple scenario, consisting of Publishers and Subscriptions all in the same process and with the durability QoS set to volatile.
- The proposed implementation creates one buffer per Subscription.
  Open questions: what is the buffer size? Which policy does it use? Is it protected by a lock/mutex?
- When a message is published to a topic, its Publisher pushes the message into the buffer of each of the Subscriptions related to that topic and raises a notification, waking up the executor.
  Open questions: how is the message pushed into the buffer? How is the notification raised (is a thread created)? Who wakes up the executor, when, and how?
- The executor can then pop the message from the buffer and trigger the callback of the Subscription.
  Open questions: where is the message popped to, and when? Who triggers the callback, how, and when?

The choice of having independent buffers for each Subscription leads to the following advantages:
- It is easy to support different QoS for each Subscription, while, at the same time, simplifying the implementation.
- Multiple Subscriptions can extract messages from their own buffer in parallel without blocking each other, thus providing a higher throughput.

The only drawback is that the system does not reuse as many resources as it could, compared to sharing buffers between entities. However, from a practical point of view, the memory overhead of the proposed implementation with respect to the current one will always be only a tiny delta compared to the overall memory usage of the application.
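There are three possible data-types that can be stored in the buffer:
- MessageT
- shared_ptr<const MessageT>
- unique_ptr<MessageT>

Further reading: how shared_ptr and unique_ptr differ and relate
- shared_ptr: allows multiple pointers to point to the same object.
- unique_ptr: exclusively owns the object it points to. Unlike shared_ptr, only one unique_ptr can point to a given object at any moment. When the unique_ptr is destroyed, the object it points to is destroyed as well.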
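The ownership difference between the two smart pointers can be illustrated with a short standard-library sketch. The `Message` struct and both function names are hypothetical and exist only for this illustration; nothing here is rclcpp code.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical message type used only for this illustration.
struct Message
{
  std::string data;
};

// Copying a shared_ptr adds an owner; both pointers refer to one object.
long shared_owner_count_after_copy()
{
  auto first = std::make_shared<const Message>(Message{"hello"});
  std::shared_ptr<const Message> second = first;
  assert(first.get() == second.get());
  return first.use_count();
}

// Moving a unique_ptr transfers ownership and leaves the source empty.
bool unique_ptr_is_empty_after_move()
{
  auto owner = std::make_unique<Message>(Message{"hello"});
  Message * raw = owner.get();
  std::unique_ptr<Message> new_owner = std::move(owner);
  return owner == nullptr && new_owner.get() == raw;
}
```

This is why a buffer holding `shared_ptr<const MessageT>` can receive the same message as other buffers, while a buffer holding `unique_ptr<MessageT>` needs a message of its own.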
B. Details
I. Creating a publisher
- User calls Node::create_publisher<MessageT>(...).

  [FILE: camera_node.hpp]
    ...
    // Create a publisher on the output topic.
    pub_ = this->create_publisher<sensor_msgs::msg::Image>(output, rclcpp::SensorDataQoS());
    ...

- This boils down to NodeTopics::create_publisher(...), where a Publisher is created through the factory.

- Here, if intra-process communication is enabled, any intra-process related variables are initialized through the PublisherBase::setup_intra_process(...) method.

  [FILE: publisher.hpp]
    virtual void
    post_init_setup(
      rclcpp::node_interfaces::NodeBaseInterface * node_base,
      const std::string & topic,
      const rclcpp::QoS & qos,
      const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
    {
      ...
      // If needed, setup intra process communication.
      if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
        auto context = node_base->get_context();
        // Get the intra process manager instance for this context.
        auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
        // Register the publisher with the intra process manager.
        ...
        uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this());  // HERE!
        this->setup_intra_process(  // HERE!
          intra_process_publisher_id,
          ipm);
      }
    }

  The rclcpp::detail::resolve_use_intra_process function determines whether intra-process communication is enabled for a given node.

  [FILE: publisher_base.hpp]
    /// Implementation utility function used to setup intra process publishing after creation.
    RCLCPP_PUBLIC
    void
    setup_intra_process(
      uint64_t intra_process_publisher_id,
      IntraProcessManagerSharedPtr ipm);

  [FILE: publisher_base.cpp]
    void
    PublisherBase::setup_intra_process(
      uint64_t intra_process_publisher_id,
      IntraProcessManagerSharedPtr ipm)
    {
      intra_process_publisher_id_ = intra_process_publisher_id;
      weak_ipm_ = ipm;
      intra_process_is_enabled_ = true;
    }

  As shown, a publisher with intra-process enabled needs an IntraProcessManager. The IntraProcessManager registers the publisher through add_publisher and returns an identifier of type uint64_t; setup_intra_process then associates the newly created publisher object with the IntraProcessManager.

- Then the IntraProcessManager is notified about the existence of the new Publisher through the method IntraProcessManager::add_publisher(PublisherBase::SharedPtr publisher, PublisherOptions options).

- IntraProcessManager::add_publisher(...) stores the Publisher information in an internal structure of type PublisherInfo. The structure contains information about the Publisher, such as its QoS and its topic name, and a weak pointer for the Publisher object. A uint64_t pub_id unique within the rclcpp::Context is assigned to the Publisher. The IntraProcessManager contains a std::map<uint64_t, PublisherInfo> object where it is possible to retrieve the PublisherInfo of a specific Publisher given its id. The function returns the pub_id, which is stored within the Publisher.

  [intra_process_manager.cpp]
    uint64_t
    IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
    {
      std::unique_lock<std::shared_timed_mutex> lock(mutex_);

      auto id = IntraProcessManager::get_next_unique_id();  // SRLIU_mark1

      publishers_[id].publisher = publisher;  // SRLIU_mark2
      publishers_[id].topic_name = publisher->get_topic_name();
      publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile();

      // Initialize the subscriptions storage for this publisher.
      pub_to_subs_[id] = SplittedSubscriptions();  // SRLIU_mark3

      // create an entry for the publisher id and populate with already existing subscriptions
      for (auto & pair : subscriptions_) {
        if (can_communicate(publishers_[id], pair.second)) {
          insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method);  // SRLIU_mark4
        }
      }

      return id;
    }

  SRLIU_mark1: get_next_unique_id(...) modifies the atomic variable _next_unique_id to obtain a unique ID value; this value appears to be shared by all publishers and subscriptions in the process.

  SRLIU_mark2: publishers_ is an instance of PublisherMap and stores part of the information about each valid publisher. PublisherMap is defined as using PublisherMap = std::unordered_map<uint64_t, PublisherInfo>, i.e. an unordered map.

  SRLIU_mark3: pub_to_subs_ is an unordered map that records which subscriptions subscribe to each publisher. SplittedSubscriptions contains two vectors, holding the subscriptions that accept shared messages and those that take ownership, respectively.

  SRLIU_mark4: updates the mapping in pub_to_subs_; after this, pub_to_subs_[id] should be associated with every subscription related to this publisher. The implementation is as follows:

    void
    IntraProcessManager::insert_sub_id_for_pub(
      uint64_t sub_id,
      uint64_t pub_id,
      bool use_take_shared_method)
    {
      if (use_take_shared_method) {
        pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id);
      } else {
        pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id);
      }
    }
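The bookkeeping described above can be reproduced in a small, self-contained sketch. Everything in it is a simplified assumption: `ToyManager`, `PubInfo`, `SubInfo` and `SplitSubscriptions` are hypothetical stand-ins, matching is reduced to topic-name equality (the real can_communicate also considers QoS), and there is no locking.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical, reduced versions of PublisherInfo / SubscriptionInfo.
struct PubInfo { std::string topic_name; };
struct SubInfo { std::string topic_name; bool use_take_shared_method; };

struct SplitSubscriptions
{
  std::vector<uint64_t> take_shared_subscriptions;
  std::vector<uint64_t> take_ownership_subscriptions;
};

class ToyManager
{
public:
  // Registers a publisher and links it with already existing subscriptions.
  uint64_t add_publisher(const std::string & topic)
  {
    uint64_t id = next_id_++;
    publishers_[id] = PubInfo{topic};
    pub_to_subs_[id] = SplitSubscriptions();
    for (const auto & pair : subscriptions_) {
      if (can_communicate(publishers_[id], pair.second)) {
        insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method);
      }
    }
    return id;
  }

  // Registers a subscription and adds its id to every matching publisher.
  uint64_t add_subscription(const std::string & topic, bool take_shared)
  {
    uint64_t id = next_id_++;
    subscriptions_[id] = SubInfo{topic, take_shared};
    for (const auto & pair : publishers_) {
      if (can_communicate(pair.second, subscriptions_[id])) {
        insert_sub_id_for_pub(id, pair.first, take_shared);
      }
    }
    return id;
  }

  const SplitSubscriptions & subscriptions_for(uint64_t pub_id) const
  {
    return pub_to_subs_.at(pub_id);
  }

private:
  // Assumption: only the topic name is compared in this sketch.
  static bool can_communicate(const PubInfo & pub, const SubInfo & sub)
  {
    return pub.topic_name == sub.topic_name;
  }

  void insert_sub_id_for_pub(uint64_t sub_id, uint64_t pub_id, bool take_shared)
  {
    if (take_shared) {
      pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id);
    } else {
      pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id);
    }
  }

  std::atomic<uint64_t> next_id_{1};  // one counter shared by pubs and subs
  std::unordered_map<uint64_t, PubInfo> publishers_;
  std::unordered_map<uint64_t, SubInfo> subscriptions_;
  std::unordered_map<uint64_t, SplitSubscriptions> pub_to_subs_;
};
```

The sketch shows why registration order does not matter: a subscription created before the publisher is picked up in add_publisher, and one created afterwards is picked up in add_subscription.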
II. Creating a subscription
sequenceDiagram
participant A as USER NODE
participant B as subscription.hpp
participant C as intra_process_manager.hpp
participant D as sub_intra_process.hpp
participant E as create_intra_process_buffer.hpp
participant F as ring_buffer_impl.hpp
participant G as guard_condition.hpp
A->>B: rclcpp::create_subscription()
rect rgb(175, 255, 212)
B->>D: SubscriptionIntraProcess()
D->>E: create_intra_process_buffer()
E->>F: RingBufferImplementation()
E->>F: TypedIntraProcessBuffer()
D->>G: rcl_guard_condition_init()
G-->>B: RETURN
end
rect rgb(187, 213, 259)
B->>+C: IPM::add_subscription()
loop add sub_id to all matchable pubs
C->>C: insert_sub_id_for_pub()
end
C-->>B: RETURN
end
rect rgb(255, 251, 221)
B->>D: SubscriptionBase::setup_intra_process()
D-->>B: RETURN
end
%%Note over B: [DIR: rclcpp]
%%Note over D: [DIR: rclcpp/experimental]
%%Note over E: [DIR: rclcpp/experimental]
%%Note over F: [DIR: rclcpp/experimental/buffers]
- User calls Node::create_subscription<MessageT>(...).

  [node_impl.hpp]
    std::shared_ptr<SubscriptionT>
    Node::create_subscription(
      const std::string & topic_name,
      const rclcpp::QoS & qos,
      CallbackT && callback,
      const SubscriptionOptionsWithAllocator<AllocatorT> & options,
      typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
    {
      return rclcpp::create_subscription<MessageT>(  // SRLIU_mark1
        *this,
        extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
        qos,
        std::forward<CallbackT>(callback),
        options,
        msg_mem_strat);
    }

- Node::create_subscription<MessageT>(...) calls rclcpp::create_subscription(...), which uses:

  [src/ros2/rclcpp/rclcpp/include/rclcpp/subscription.hpp]
    Subscription(
      rclcpp::node_interfaces::NodeBaseInterface * node_base,
      const rosidl_message_type_support_t & type_support_handle,
      const std::string & topic_name,
      const rclcpp::QoS & qos,
      AnySubscriptionCallback<CallbackMessageT, AllocatorT> callback,
      const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
      typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
      SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)

- This boils down to NodeTopics::create_subscription(...), where a Subscription is created through the factory.

  [node_topics.cpp]
    ...
    return subscription_factory.create_typed_subscription(node_base_, topic_name, qos);
    ...

- Here, if intra-process communication is enabled, intra-process related variables are initialized through the SubscriptionBase::setup_intra_process(...) method. The most relevant ones are the ring buffer and the waitable object.

  [rclcpp/rclcpp/src/rclcpp/subscription_base.cpp]
    void
    SubscriptionBase::setup_intra_process(
      uint64_t intra_process_subscription_id,
      IntraProcessManagerWeakPtr weak_ipm)
    {
      intra_process_subscription_id_ = intra_process_subscription_id;
      weak_ipm_ = weak_ipm;
      use_intra_process_ = true;
    }

- Then the IntraProcessManager is notified about the existence of the new Subscription through the method IntraProcessManager::add_subscription(SubscriptionBase::SharedPtr subscription, SubscriptionOptions options).

- IntraProcessManager::add_subscription(...) stores the Subscription information in an internal structure of type SubscriptionInfo. The structure contains information about the Subscription, such as its QoS, its topic name and the type of its callback, and a weak pointer for the Subscription object. A uint64_t sub_id unique within the rclcpp::Context is assigned to the Subscription. The IntraProcessManager contains a std::map<uint64_t, SubscriptionInfo> object where it is possible to retrieve the SubscriptionInfo of a specific Subscription given its id. There is also an additional structure std::map<uint64_t, std::pair<std::set<uint64_t>, std::set<uint64_t>>>. The key of the map is the unique id of a Publisher and the value is a pair of sets of ids. These sets contain the ids of the Subscriptions that can communicate with the Publisher. We have two different sets because we want to differentiate the Subscriptions depending on whether they request ownership of the received messages or not (note that this decision is made by looking at their buffer, since the Publisher does not have to interact with the Subscription callback).

  [rclcpp/rclcpp/src/rclcpp/intra_process_manager.cpp]
    uint64_t
    IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription)
    {
      std::unique_lock<std::shared_timed_mutex> lock(mutex_);  // SRLIU_mark1

      auto id = IntraProcessManager::get_next_unique_id();

      subscriptions_[id].subscription = subscription;  // SRLIU_mark2
      subscriptions_[id].topic_name = subscription->get_topic_name();
      subscriptions_[id].qos = subscription->get_actual_qos();
      subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method();  // SRLIU_mark3

      // adds the subscription id to all the matchable publishers
      for (auto & pair : publishers_) {
        if (can_communicate(pair.second, subscriptions_[id])) {
          insert_sub_id_for_pub(id, pair.first, subscriptions_[id].use_take_shared_method);
        }
      }

      return id;
    }

  SRLIU_mark2: subscriptions_ is an instance of SubscriptionMap and stores part of the information about each valid subscription. SubscriptionMap is defined as using SubscriptionMap = std::unordered_map<uint64_t, SubscriptionInfo>;, i.e. an unordered map.

    struct SubscriptionInfo
    {
      SubscriptionInfo() = default;

      rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription;
      rmw_qos_profile_t qos;
      const char * topic_name;
      bool use_take_shared_method;
    };

  SRLIU_mark3: the mapping in pub_to_subs_ is then updated, so that the entry of every publisher that matches this subscription contains its id. Each subscription has a use_take_shared_method attribute, which indicates whether it shares the data released by the publisher with the publisher's other subscriptions or takes exclusive ownership of it.

- The SubscriptionIntraProcessWaitable object is added to the list of Waitable interfaces of the node through node_interfaces::NodeWaitablesInterface::add_waitable(...). It is added to the same callback group used for the standard inter-process communication of that topic.

  TODO: this step is still unclear.
III. Publishing only intra-process
sequenceDiagram
participant A as USER NODE
participant B as publisher.hpp
participant C as intra_process_manager.hpp-1
Note right of C: std::shared_lock<std::shared_timed_mutex> lock(mutex_);
participant D as intra_process_manager.hpp-2
participant E as sub_intra_process.hpp
participant F as intra_process_buffer.hpp
participant G as ring_buffer_implementation.hpp
A->>B: Publisher::publish (unique_ptr* msg)
B->>+C: IPM::do_intra_process_publish()
C->>C: pub_to_subs_.find()
alt None of the sub_buffers require ownership
rect rgb(175, 255, 212)
C->>D: IPM::add_shared_msg_to_buffers()
loop for (auto id : subscription_ids)
D->>E: provide_intra_process_message()
E->>F: add_shared()
F->>F: add_shared_impl()
F->>+G: enqueue()
G-->>-E: RETURN
E->>E: trigger_guard_condition()
end
E-->>C: return
end
else Each sub_buffer requires an ownership
rect rgb(255, 251, 221)
C->>D: IPM::add_owned_msg_to_buffers()
loop for (auto id : subscription_ids)
D->>D: copy message by Malloc dynamic mem.
D->>E: provide_intra_process_message()
E->>F: buffer_->add_unique()
F->>+G: enqueue()
G-->>-E: RETURN
E->>E: trigger_guard_condition()
end
E-->>C: return
end
else Some sub_buffers require ownership
rect rgb(187, 213, 259)
C->>D: IPM::add_shared_msg_to_buffers()
D->>G:
G-->>C: return
C->>D: IPM::add_owned_msg_to_buffers()
D->>G:
G-->>C: return
end
end
C-->>-B: RETURN
%%Note over D: [DIR: rclcpp/experimental]
%%Note over E: [DIR: rclcpp/experimental]
%%Note over F: [DIR: rclcpp/experimental/buffers]
- Publishing unique_ptr

- User calls Publisher::publish(std::unique_ptr<MessageT> msg).

  [rclcpp/rclcpp/include/rclcpp/publisher.hpp]
    virtual void
    publish(std::unique_ptr<MessageT, MessageDeleter> msg)
    {
      if (!intra_process_is_enabled_) {
        this->do_inter_process_publish(*msg);
        return;
      }
      // If an interprocess subscription exist, then the unique_ptr is promoted
      // to a shared_ptr and published.
      // This allows doing the intraprocess publish first and then doing the
      // interprocess publish, resulting in lower publish-to-subscribe latency.
      // It's not possible to do that with an unique_ptr,
      // as do_intra_process_publish takes the ownership of the message.
      bool inter_process_publish_needed =
        get_subscription_count() > get_intra_process_subscription_count();

      if (inter_process_publish_needed) {
        auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
        this->do_inter_process_publish(*shared_msg);
      } else {
        this->do_intra_process_publish(std::move(msg));
      }
    }

- Publisher::publish(std::unique_ptr<MessageT> msg) calls IntraProcessManager::do_intra_process_publish(uint64_t pub_id, std::unique_ptr<MessageT> msg).

- IntraProcessManager::do_intra_process_publish(...) uses the uint64_t pub_id to call IntraProcessManager::get_subscription_ids_for_pub(uint64_t pub_id). This returns the ids corresponding to Subscriptions that have a QoS compatible for receiving the message. These ids are divided into two sublists, according to the data-type that is stored in the buffer of each Subscription: requesting ownership (unique_ptr<MessageT>) or accepting shared (shared_ptr<MessageT>, but also MessageT since it will copy data in any case).

@ First, look at the IntraProcessManager implementation of do_intra_process_publish. The IntraProcessManager first checks whether the publisher has an entry in the publisher-to-subscriptions map:

  [rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
    ...
    auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
    if (publisher_it == pub_to_subs_.end()) {
      // Publisher is either invalid or no longer exists.
      RCLCPP_WARN(
        rclcpp::get_logger("rclcpp"),
        "Calling do_intra_process_publish for invalid or no longer existing publisher id");
      return;
    }
    ...

- pub_to_subs_ records which subscriptions subscribe to each publisher. SplittedSubscriptions contains two vectors, holding the subscriptions that accept shared messages and those that take ownership, respectively. Every publisher and subscription is assigned a unique uint64_t identifier by the IntraProcessManager.

  [rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
    struct SplittedSubscriptions
    {
      std::vector<uint64_t> take_shared_subscriptions;     // ids of subscriptions that accept shared messages
      std::vector<uint64_t> take_ownership_subscriptions;  // ids of subscriptions that take ownership
    };

    using SubscriptionMap = std::unordered_map<uint64_t, SubscriptionInfo>;
    using PublisherMap = std::unordered_map<uint64_t, PublisherInfo>;
    using PublisherToSubscriptionIdsMap = std::unordered_map<uint64_t, SplittedSubscriptions>;

    PublisherToSubscriptionIdsMap pub_to_subs_;
    SubscriptionMap subscriptions_;
    PublisherMap publishers_;
@ Next, the IntraProcessManager delivers the message published by the publisher to the buffer_ member of each SubscriptionIntraProcess. As seen above, a subscription either accepts shared messages or takes ownership, and the IntraProcessManager handles three cases:

- All subscriptions of the publisher accept shared messages. The message is promoted directly from unique_ptr to shared_ptr and distributed with add_shared_msg_to_buffers.

  [rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
    if (sub_ids.take_ownership_subscriptions.empty()) {
      // None of the buffers require ownership, so we promote the pointer
      std::shared_ptr<MessageT> msg = std::move(message);

      this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
        msg, sub_ids.take_shared_subscriptions);
    }

- At least one subscription takes ownership and at most one accepts shared messages. This is equivalent to all subscriptions taking ownership, so the message is distributed with add_owned_msg_to_buffers.

  [rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
    else if (!sub_ids.take_ownership_subscriptions.empty() &&  // NOLINT
      sub_ids.take_shared_subscriptions.size() <= 1)
    {
      // There is at maximum 1 buffer that does not require ownership.
      // So this case is equivalent to all the buffers requiring ownership

      // Merge the two vector of ids into a unique one
      std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
      concatenated_vector.insert(
        concatenated_vector.end(),
        sub_ids.take_ownership_subscriptions.begin(),
        sub_ids.take_ownership_subscriptions.end());

      this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
        std::move(message),
        concatenated_vector,
        allocator);
    }

- At least one subscription takes ownership and more than one accepts shared messages. The message is first copied and the copy is distributed to the shared subscriptions; the original is then distributed to the owning subscriptions.

  [rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
    else if (!sub_ids.take_ownership_subscriptions.empty() &&  // NOLINT
      sub_ids.take_shared_subscriptions.size() > 1)
    {
      // Construct a new shared pointer from the message
      // for the buffers that do not require ownership
      auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);

      this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
        shared_msg, sub_ids.take_shared_subscriptions);
      this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
        std::move(message), sub_ids.take_ownership_subscriptions, allocator);
    }
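The three branches can be exercised in isolation with a standard-library sketch. It is only an approximation under stated assumptions: `Msg`, `ToySub`, `add_shared`, `add_owned` and `publish` are hypothetical names, each subscription holds a single slot instead of a ring buffer, and `std::make_unique` / `std::make_shared` replace the allocator-based copies of the real code. `publish` returns the number of message copies it made.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

using Msg = std::string;  // hypothetical message type

// Hypothetical subscription with a one-element "buffer" of each kind.
struct ToySub
{
  bool take_shared;
  std::shared_ptr<const Msg> shared_slot;
  std::unique_ptr<Msg> owned_slot;

  // A shared-type buffer can also absorb an owned message by converting it.
  void receive_owned(std::unique_ptr<Msg> msg)
  {
    if (take_shared) {
      shared_slot = std::move(msg);
    } else {
      owned_slot = std::move(msg);
    }
  }
};

void add_shared(const std::shared_ptr<const Msg> & msg, const std::vector<ToySub *> & subs)
{
  for (ToySub * sub : subs) {
    sub->shared_slot = msg;  // every subscription sees the same object
  }
}

// Copies the message for all but the last subscription; returns the copy count.
std::size_t add_owned(std::unique_ptr<Msg> msg, const std::vector<ToySub *> & subs)
{
  std::size_t copies = 0;
  for (std::size_t i = 0; i < subs.size(); ++i) {
    if (i + 1 == subs.size()) {
      subs[i]->receive_owned(std::move(msg));  // last one gets the original
    } else {
      subs[i]->receive_owned(std::make_unique<Msg>(*msg));
      ++copies;
    }
  }
  return copies;
}

std::size_t publish(
  std::unique_ptr<Msg> msg,
  const std::vector<ToySub *> & shared_subs,
  const std::vector<ToySub *> & owned_subs)
{
  if (owned_subs.empty()) {
    // Nobody needs ownership: promote the pointer, no copy.
    std::shared_ptr<const Msg> shared = std::move(msg);
    add_shared(shared, shared_subs);
    return 0;
  }
  if (shared_subs.size() <= 1) {
    // Treat the single shared subscription like an owning one.
    std::vector<ToySub *> all(shared_subs);
    all.insert(all.end(), owned_subs.begin(), owned_subs.end());
    return add_owned(std::move(msg), all);
  }
  // One copy is shared among the shared subscriptions.
  auto shared = std::make_shared<const Msg>(*msg);
  add_shared(shared, shared_subs);
  return 1 + add_owned(std::move(msg), owned_subs);
}
```

Calling `publish` with two shared and two owning subscriptions makes two copies in total: one for the shared group and one for the first owning subscription.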
@ add_shared_msg_to_buffers simply iterates over the subscriptions and calls the provide_intra_process_message interface of each one. provide_intra_process_message puts the data into the corresponding buffer and sets the ready condition that the waitable exposes for the executor to query.

  [rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
    void
    add_shared_msg_to_buffers(
      std::shared_ptr<const MessageT> message,
      std::vector<uint64_t> subscription_ids)
    {
      for (auto id : subscription_ids) {
        auto subscription_it = subscriptions_.find(id);
        if (subscription_it == subscriptions_.end()) {
          throw std::runtime_error("subscription has unexpectedly gone out of scope");
        }
        auto subscription_base = subscription_it->second.subscription;

        auto subscription = std::dynamic_pointer_cast<
          rclcpp::experimental::SubscriptionIntraProcess<MessageT, Alloc, Deleter>
          >(subscription_base);
        if (nullptr == subscription) {
          throw std::runtime_error("...");
        }

        subscription->provide_intra_process_message(message);
      }
    }

@ add_owned_msg_to_buffers is similar to add_shared_msg_to_buffers, except that the message is copied for the first n-1 subscriptions, while the last one receives the original without a copy.

  SRLIU_mark1: for each copy, memory for the message type has to be dynamically allocated (malloc-style, through the allocator) and the message copied into it; the resulting pointer is finally passed to provide_intra_process_message.

  [rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
    ...
    if (std::next(it) == subscription_ids.end()) {
      // If this is the last subscription, give up ownership
      subscription->provide_intra_process_message(std::move(message));
    } else {
      // Copy the message since we have additional subscriptions to serve
      MessageUniquePtr copy_message;
      Deleter deleter = message.get_deleter();
      auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1);  // SRLIU_mark1
      MessageAllocTraits::construct(*allocator.get(), ptr, *message);
      copy_message = MessageUniquePtr(ptr, deleter);

      subscription->provide_intra_process_message(std::move(copy_message));
    }
    ...

@ provide_intra_process_message is implemented as follows:

  [rclcpp/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp]
    void
    provide_intra_process_message(ConstMessageSharedPtr message)
    {
      buffer_->add_shared(std::move(message));
      trigger_guard_condition();
    }

    void
    provide_intra_process_message(MessageUniquePtr message)
    {
      buffer_->add_unique(std::move(message));
      trigger_guard_condition();
    }

- add_shared(...)

  [rclcpp/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp]
    void add_shared(MessageSharedPtr msg) override
    {
      add_shared_impl<BufferT>(std::move(msg));
    }

    // MessageSharedPtr to MessageSharedPtr
    template<typename DestinationT>
    typename std::enable_if<
      std::is_same<DestinationT, MessageSharedPtr>::value
    >::type
    add_shared_impl(MessageSharedPtr shared_msg)
    {
      buffer_->enqueue(std::move(shared_msg));
    }

- add_unique(...)

  [rclcpp/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp]
    void add_unique(MessageUniquePtr msg) override
    {
      buffer_->enqueue(std::move(msg));
    }

- void enqueue(BufferT request)

  [rclcpp/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp]
    void enqueue(BufferT request)
    {
      // std::vector<BufferT> ring_buffer_;  // SRLIU_mark1
      std::lock_guard<std::mutex> lock(mutex_);  // SRLIU_mark2

      write_index_ = next(write_index_);
      ring_buffer_[write_index_] = std::move(request);  // SRLIU_mark3

      if (is_full()) {
        read_index_ = next(read_index_);
      } else {
        size_++;
      }
    }

  SRLIU_mark1: ring_buffer_ is a vector of BufferT, where BufferT is either MessageSharedPtr or MessageUniquePtr.

  SRLIU_mark2: std::lock_guard is a simple RAII wrapper: it locks in its constructor and unlocks in its destructor, which guarantees that the lock is released when the function exits. Put simply, it protects against a developer forgetting to unlock on an early return in some branch, which would block all subsequent operations or even cause a deadlock.

  SRLIU_mark3: still unclear.

- trigger_guard_condition(): once the data has been stored, rcl_trigger_guard_condition(...) is used to notify the subscription that data is ready. How exactly this works is still an open question.

- The message is "added" to the ring buffer of all the items in the lists. The rcl_guard_condition_t member of SubscriptionIntraProcessWaitable of each Subscription is triggered (this wakes up rclcpp::spin).
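The ring-buffer behaviour of enqueue, in particular what happens when the buffer is already full, is easier to see in a compact stand-alone sketch. `ToyRingBuffer` is a hypothetical class that follows the index handling of the listing above using only the standard library; the `dequeue` and `has_data` members are assumptions added so the buffer can be read back.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical fixed-capacity ring buffer that overwrites its oldest element.
template<typename T>
class ToyRingBuffer
{
public:
  explicit ToyRingBuffer(std::size_t capacity)
  : capacity_(capacity),
    ring_(capacity),
    write_index_(capacity - 1),
    read_index_(0),
    size_(0)
  {
    assert(capacity > 0);
  }

  void enqueue(T item)
  {
    std::lock_guard<std::mutex> lock(mutex_);  // released on every return path
    write_index_ = next(write_index_);
    ring_[write_index_] = std::move(item);
    if (size_ == capacity_) {
      read_index_ = next(read_index_);  // the oldest element was overwritten
    } else {
      ++size_;
    }
  }

  T dequeue()
  {
    std::lock_guard<std::mutex> lock(mutex_);
    assert(size_ > 0);
    T item = std::move(ring_[read_index_]);
    read_index_ = next(read_index_);
    --size_;
    return item;
  }

  bool has_data() const
  {
    std::lock_guard<std::mutex> lock(mutex_);
    return size_ != 0;
  }

private:
  std::size_t next(std::size_t index) const { return (index + 1) % capacity_; }

  std::size_t capacity_;
  std::vector<T> ring_;
  std::size_t write_index_;
  std::size_t read_index_;
  std::size_t size_;
  mutable std::mutex mutex_;
};
```

With a capacity of 2, enqueueing 1, 2 and 3 drops the oldest element, so the buffer then yields 2 followed by 3. Because elements are moved in and out, the same template works for both shared and unique pointers.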
- Publishing other message types
The Publisher::publish(...) method is overloaded to support different message types:
- unique_ptr<MessageT>
- MessageT &
IV. Receiving intra-process messages
As previously described, whenever messages are added to the ring buffer of a Subscription, a guard condition specific to the Subscription is triggered. This guard condition has been added to the Node wait set, so it is monitored by rclcpp::spin.
- The guard condition linked with the SubscriptionIntraProcessWaitable object wakes up rclcpp::spin.
- The SubscriptionIntraProcessWaitable::is_ready() condition is checked. This has to ensure that the ring buffer is not empty.
- The SubscriptionIntraProcessWaitable::execute() function is triggered. Here the first message is extracted from the buffer and then the SubscriptionIntraProcessWaitable calls the AnySubscriptionCallback::dispatch_intra_process(...) method. There are different implementations for this method, depending on the data-type stored in the buffer. (This step is still unclear.)
- The AnySubscriptionCallback::dispatch_intra_process(...) method triggers the associated callback. Note that in this step, if the buffer stores a smart pointer type, no message copies occur, as ownership has already been taken into account when pushing the message into the queue.
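The is_ready / execute sequence above can be mimicked without any ROS machinery. The sketch below is a single-threaded analogy and not the rcl wait-set mechanism: `ToyWaitable` and `spin_some` are hypothetical names, a `std::deque` stands in for the ring buffer, and the guard condition is omitted because nothing ever blocks in this toy loop.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for the intra-process waitable of one subscription.
class ToyWaitable
{
public:
  using Callback = std::function<void(std::unique_ptr<std::string>)>;

  explicit ToyWaitable(Callback callback)
  : callback_(std::move(callback)) {}

  // Publisher side: store the message. The real code would also trigger the
  // guard condition here so that a blocked executor wakes up.
  void provide_message(std::unique_ptr<std::string> msg)
  {
    buffer_.push_back(std::move(msg));
  }

  // Executor side: ready means "the buffer is not empty".
  bool is_ready() const { return !buffer_.empty(); }

  // Executor side: take the first message and hand it to the callback.
  void execute()
  {
    std::unique_ptr<std::string> msg = std::move(buffer_.front());
    buffer_.pop_front();
    callback_(std::move(msg));
  }

private:
  std::deque<std::unique_ptr<std::string>> buffer_;
  Callback callback_;
};

// One pass of a toy executor: each ready waitable is executed once.
std::size_t spin_some(const std::vector<ToyWaitable *> & waitables)
{
  std::size_t executed = 0;
  for (ToyWaitable * waitable : waitables) {
    if (waitable->is_ready()) {
      waitable->execute();
      ++executed;
    }
  }
  return executed;
}
```

Since the message is moved from the buffer into the callback, no copy happens at this stage, which matches the note in the last step of the list.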
V. Number of message copies
The std::unique_ptr<MessageT> msg is passed to the IntraProcessManager, which decides how to add this message to the buffers. The decision depends on the number of Subscriptions and on their type, i.e. whether or not they want ownership of messages.
- Case 1: If all the Subscriptions want ownership of the message, then a total of N-1 copies of the message are required, where N is the number of Subscriptions. The last one will receive ownership of the published message, thus saving a copy.
- Case 2: If none of the Subscriptions want ownership of the message, 0 copies are required. It is possible to convert the message into a std::shared_ptr<MessageT> msg and to add it to every buffer.
- Case 3: If there is 1 Subscription that does not want ownership while the others want it, the situation is equivalent to the case of everyone requesting ownership: N-1 copies of the message are required. As before, the last Subscription will receive ownership.
- Case 4: If more than 1 Subscription does not want ownership while the others want it, a total of M copies of the message are required, where M is the number of Subscriptions that want ownership. 1 copy will be shared among all the Subscriptions that do not want ownership, while M-1 copies are for the others.
As in the current implementation, if both inter and intra-process communication are needed, the std::unique_ptr<MessageT> msg will be converted into a std::shared_ptr<MessageT> msg and passed respectively to the do_intra_process_publish and do_inter_process_publish functions.
A copy of the message will be given to all the Subscriptions requesting ownership, while the others can copy the published shared pointer.
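For the intra-process-only cases listed above, the number of copies can be written as one small function. The name `copies_required` and its parameters are hypothetical; `owning` is the number of Subscriptions that want ownership and `shared` the number that do not.

```cpp
#include <cassert>
#include <cstddef>

// Number of message copies needed when publishing a unique_ptr intra-process.
std::size_t copies_required(std::size_t owning, std::size_t shared)
{
  if (owning == 0) {
    return 0;  // Case 2: the pointer is promoted and shared by everyone
  }
  if (shared <= 1) {
    return owning + shared - 1;  // Cases 1 and 3: N-1 copies, N = owning + shared
  }
  return owning;  // Case 4: 1 shared copy plus M-1 owned copies, M = owning
}
```

For example, three owning Subscriptions need 2 copies, while two owning plus three sharing Subscriptions also need 2.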
The following tables show a recap of when the proposed implementation has to create a new copy of a message. The notation @ indicates a memory address where the message is stored, different memory addresses correspond to different copies of the message.
sequenceDiagram
participant A as publish(unique_msg)
Note over A: [FILE: publisher.hpp]
participant B as do_intra_process_publish(unique_msg)
Note over B: [FILE: intra_process_manager.hpp]
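participant C as IntraProcessManager
- Subscription execution process
Building on the way the executor handles subscriptions, as described in the post "ROS2探索(二)executor" (ROS2 Exploration, part 2: executor), the core is the Executor::execute_any_executable function. An intra-process subscription is a SubscriptionIntraProcess (i.e. a waitable); whenever data is pushed into the buffer of a SubscriptionIntraProcess, it becomes ready and the executor calls: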
if (any_exec.waitable)
{
any_exec.waitable->execute();
}
- execute is a member function of SubscriptionIntraProcess:

    void execute()
    {
      execute_impl<CallbackMessageT>();
    }

    template<class T>
    typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
    execute_impl()
    {
      rmw_message_info_t msg_info;
      msg_info.publisher_gid = {0, {0}};
      msg_info.from_intra_process = true;

      if (any_callback_.use_take_shared_method()) {
        ConstMessageSharedPtr msg = buffer_->consume_shared();
        any_callback_.dispatch_intra_process(msg, msg_info);
      } else {
        MessageUniquePtr msg = buffer_->consume_unique();
        any_callback_.dispatch_intra_process(std::move(msg), msg_info);
      }
    }

- Next, the dispatch_intra_process function is called:

    void dispatch_intra_process(
      MessageUniquePtr message, const rclcpp::MessageInfo & message_info)
    {
      TRACEPOINT(callback_start, (const void *)this, true);
      if (shared_ptr_callback_) {  // case A
        typename std::shared_ptr<MessageT> shared_message = std::move(message);
        shared_ptr_callback_(shared_message);
      } else if (shared_ptr_with_info_callback_) {
        typename std::shared_ptr<MessageT> shared_message = std::move(message);
        shared_ptr_with_info_callback_(shared_message, message_info);
      } else if (unique_ptr_callback_) {  // case B
        unique_ptr_callback_(std::move(message));
      } else if (unique_ptr_with_info_callback_) {
        unique_ptr_with_info_callback_(std::move(message), message_info);
      } else if (const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_) {
        throw std::runtime_error(
          "unexpected dispatch_intra_process unique message call"
          " with const shared_ptr callback");
      } else {
        throw std::runtime_error("unexpected message without any callback set");
      }
      TRACEPOINT(callback_end, (const void *)this);
    }

  In this function we mainly care about the first and third branches, called case A and case B here. In case A the callback takes a shared_ptr, so the source message is converted into a std::shared_ptr via std::move and passed to the user's callback. In case B, the third branch, the callback takes a unique_ptr, so the message is simply moved (cast to an rvalue) into the callback. Note that AnySubscriptionCallback has 6 member variables for storing user callbacks, but only one of them can be set at a time.
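Case A and case B can be tried out with a reduced stand-in for AnySubscriptionCallback. `ToyAnyCallback`, `set_shared` and `set_unique` are hypothetical names, only two of the six callback kinds are modelled, and the message info argument and tracepoints are left out.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

using Msg = std::string;  // hypothetical message type

// Hypothetical two-callback version of AnySubscriptionCallback.
class ToyAnyCallback
{
public:
  // Setting one callback clears the other, so at most one is ever set.
  void set_shared(std::function<void(std::shared_ptr<Msg>)> callback)
  {
    shared_ptr_callback_ = std::move(callback);
    unique_ptr_callback_ = nullptr;
  }

  void set_unique(std::function<void(std::unique_ptr<Msg>)> callback)
  {
    unique_ptr_callback_ = std::move(callback);
    shared_ptr_callback_ = nullptr;
  }

  void dispatch_intra_process(std::unique_ptr<Msg> message)
  {
    if (shared_ptr_callback_) {
      // Case A: promote the unique_ptr to a shared_ptr; no message copy.
      std::shared_ptr<Msg> shared_message = std::move(message);
      shared_ptr_callback_(shared_message);
    } else if (unique_ptr_callback_) {
      // Case B: hand ownership straight to the callback.
      unique_ptr_callback_(std::move(message));
    } else {
      throw std::runtime_error("unexpected message without any callback set");
    }
  }

private:
  std::function<void(std::shared_ptr<Msg>)> shared_ptr_callback_;
  std::function<void(std::unique_ptr<Msg>)> unique_ptr_callback_;
};
```

In both cases the callback receives the very object that was taken out of the buffer: the conversion to shared_ptr only changes who owns it.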