Consider a simple scenario consisting of Publishers and Subscriptions, all in the same process and with the durability QoS set to volatile. The proposed implementation creates one buffer per Subscription.
(Open questions: what size? What overflow policy? Is there a lock/mutex? See the sketch below.)
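A minimal sketch of what such a per-Subscription buffer could look like (assumptions mine: the depth comes from the QoS history setting, the policy overwrites the oldest entry when full, and there is one mutex per buffer; the actual rclcpp ring buffer shown later in this post behaves essentially like this):

#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical illustration, not rclcpp code: one bounded buffer per
// Subscription, overwrite-oldest on overflow, guarded by its own mutex.
template<typename T>
class SubscriptionBuffer
{
public:
  // depth would come from the Subscription's QoS history setting (>= 1).
  explicit SubscriptionBuffer(std::size_t depth)
  : ring_(depth), write_(depth - 1) {}

  void push(T item)  // called from the publishing side
  {
    std::lock_guard<std::mutex> lock(mutex_);
    write_ = (write_ + 1) % ring_.size();
    ring_[write_] = std::move(item);
    if (size_ == ring_.size()) {
      read_ = (read_ + 1) % ring_.size();  // full: drop the oldest message
    } else {
      ++size_;
    }
  }

  bool pop(T & out)  // called from the executor side
  {
    std::lock_guard<std::mutex> lock(mutex_);
    if (size_ == 0) {
      return false;
    }
    out = std::move(ring_[read_]);
    read_ = (read_ + 1) % ring_.size();
    --size_;
    return true;
  }

private:
  std::vector<T> ring_;
  std::size_t write_;     // most recently written slot
  std::size_t read_ = 0;  // oldest unread slot
  std::size_t size_ = 0;
  std::mutex mutex_;
};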
When a message is published to a topic, its Publisher pushes the message into the buffer of each of the Subscriptions related to that topic and raises a notification, waking up the executor. The executor can then pop the message from the buffer and trigger the callback of the Subscription.
(Open questions, answered step by step later in this post: pushes the message into the buffer — how? Raises a notification — how? Is a thread created? Waking up the executor — who, when, how? Pop the message from the buffer — to where, and when? Trigger the callback — who, how, when?)
The choice of having independent buffers for each Subscription leads to the following advantages:
- It is easy to support different QoS for each Subscription, while, at the same time, simplifying the implementation (see the example below).
- Multiple Subscriptions can extract messages from their own buffer in parallel without blocking each other, thus providing a higher throughput.

The only drawback is that the system does not reuse resources as much as possible, compared to sharing buffers between entities. However, from a practical point of view, the memory overhead caused by the proposed implementation with respect to the current one will always be only a tiny delta compared to the overall memory usage of the application.
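As an illustration of the first advantage (hypothetical snippet; node, topic, and message type are made up), per-Subscription buffers make it straightforward for two subscriptions on the same topic to keep different histories:

auto cb = [](std_msgs::msg::String::SharedPtr) { /* process message */ };
// Each subscription gets its own intra-process buffer sized to its own QoS.
auto sub_shallow = node->create_subscription<std_msgs::msg::String>(
  "chatter", rclcpp::QoS(1), cb);    // keeps only the latest message
auto sub_deep = node->create_subscription<std_msgs::msg::String>(
  "chatter", rclcpp::QoS(100), cb);  // keeps up to 100 messages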
There are three possible data-types that can be stored in the buffer:
- MessageT
- shared_ptr<const MessageT>
- unique_ptr<MessageT>
shared_ptr vs. unique_ptr (background reading): a shared_ptr allows multiple pointers to refer to the same object, while a unique_ptr has exclusive ownership of the object it points to. Unlike shared_ptr, at any given moment only one unique_ptr can point to a given object; when the unique_ptr is destroyed, the object it points to is destroyed as well.
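A minimal standalone illustration of the difference:

#include <memory>
#include <utility>

int main()
{
  auto s1 = std::make_shared<int>(42);
  auto s2 = s1;             // fine: s1 and s2 share ownership (use_count() == 2)

  auto u1 = std::make_unique<int>(42);
  // auto u2 = u1;          // compile error: unique_ptr cannot be copied
  auto u2 = std::move(u1);  // ownership moves to u2; u1 becomes empty
}                           // *u2 destroyed here; *s1 when the last shared_ptr goes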
User calls Node::create_publisher<MessageT>(...).
[FILE: camera_node.hpp]
...
// Create a publisher on the output topic.
pub_ = this->create_publisher<sensor_msgs::msg::Image>(output, rclcpp::SensorDataQoS());
...
This boils down to NodeTopics::create_publisher(...), where a Publisher is created through the factory. Here, if intra-process communication is enabled, any intra-process related variables are initialized through the PublisherBase::setup_intra_process(...) method.
[FILE: publisher.hpp]
virtual void post_init_setup(
  rclcpp::node_interfaces::NodeBaseInterface * node_base,
  const std::string & topic,
  const rclcpp::QoS & qos,
  const rclcpp::PublisherOptionsWithAllocator<AllocatorT> & options)
{
  ...
  // If needed, setup intra process communication.
  if (rclcpp::detail::resolve_use_intra_process(options_, *node_base)) {
    auto context = node_base->get_context();
    // Get the intra process manager instance for this context.
    auto ipm = context->get_sub_context<rclcpp::experimental::IntraProcessManager>();
    // Register the publisher with the intra process manager.
    ...
    uint64_t intra_process_publisher_id = ipm->add_publisher(this->shared_from_this()); // HERE!
    this->setup_intra_process( // HERE!
      intra_process_publisher_id,
      ipm);
  }
}
The rclcpp::detail::resolve_use_intra_process function determines whether intra-process communication is enabled for a given node.

[FILE: publisher_base.hpp]
/// Implementation utility function used to setup intra process publishing after creation.
RCLCPP_PUBLIC
void setup_intra_process(
  uint64_t intra_process_publisher_id,
  IntraProcessManagerSharedPtr ipm);

[FILE: publisher_base.cpp]
void PublisherBase::setup_intra_process(
  uint64_t intra_process_publisher_id,
  IntraProcessManagerSharedPtr ipm)
{
  intra_process_publisher_id_ = intra_process_publisher_id;
  weak_ipm_ = ipm;
  intra_process_is_enabled_ = true;
}
As we can see, a publisher with intra-process communication enabled depends on the IntraProcessManager: the publisher is registered with the IntraProcessManager via add_publisher, which returns a uint64_t identifier; setup_intra_process then associates the newly created publisher object with the IntraProcessManager.
Then the IntraProcessManager is notified about the existence of the new Publisher through the method IntraProcessManager::add_publisher(PublisherBase::SharedPtr publisher, PublisherOptions options).
IntraProcessManager::add_publisher(...) stores the Publisher information in an internal structure of type PublisherInfo. The structure contains information about the Publisher, such as its QoS and its topic name, and a weak pointer to the Publisher object. A uint64_t pub_id, unique within the rclcpp::Context, is assigned to the Publisher. The IntraProcessManager contains a std::map<uint64_t, PublisherInfo> object where it is possible to retrieve the PublisherInfo of a specific Publisher given its id. The function returns the pub_id, which is stored within the Publisher.
[intra_process_manager.cpp]
uint64_t IntraProcessManager::add_publisher(rclcpp::PublisherBase::SharedPtr publisher)
{
  std::unique_lock<std::shared_timed_mutex> lock(mutex_);

  auto id = IntraProcessManager::get_next_unique_id(); // SRLIU_mark1

  publishers_[id].publisher = publisher; // SRLIU_mark2
  publishers_[id].topic_name = publisher->get_topic_name();
  publishers_[id].qos = publisher->get_actual_qos().get_rmw_qos_profile();

  // Initialize the subscriptions storage for this publisher.
  pub_to_subs_[id] = SplittedSubscriptions(); // SRLIU_mark3

  // create an entry for the publisher id and populate with already existing subscriptions
  for (auto & pair : subscriptions_) {
    if (can_communicate(publishers_[id], pair.second)) {
      insert_sub_id_for_pub(pair.first, id, pair.second.use_take_shared_method); // SRLIU_mark4
    }
  }
  return id;
}
SRLIU_mark1: get_next_unique_id(...) increments the atomic variable _next_unique_id to obtain a unique ID; this counter appears to be shared by all publishers and subscriptions in the process.
SRLIU_mark2: publishers_ is an instance of PublisherMap, which stores the relevant information of active publishers. PublisherMap is defined as using PublisherMap = std::unordered_map<uint64_t, PublisherInfo>, i.e. an unordered map.
SRLIU_mark3: pub_to_subs_ is an unordered map recording which subscriptions are subscribed to each publisher. SplittedSubscriptions contains two vectors, one for take-shared subscriptions and one for take-ownership subscriptions.
SRLIU_mark4: updates the mapping in pub_to_subs_; after this, pub_to_subs_[id] references all subscriptions matching this publisher. The implementation is:

void IntraProcessManager::insert_sub_id_for_pub(
  uint64_t sub_id,
  uint64_t pub_id,
  bool use_take_shared_method)
{
  if (use_take_shared_method) {
    pub_to_subs_[pub_id].take_shared_subscriptions.push_back(sub_id);
  } else {
    pub_to_subs_[pub_id].take_ownership_subscriptions.push_back(sub_id);
  }
}
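A sketch of what get_next_unique_id from SRLIU_mark1 amounts to (hedged: the counter name _next_unique_id comes from the note above; the overflow check is an assumption about defensive coding, not a verified quote of the source):

#include <atomic>
#include <cstdint>
#include <stdexcept>

// Process-wide atomic counter shared by publishers and subscriptions.
static std::atomic<uint64_t> _next_unique_id {1};

uint64_t get_next_unique_id()
{
  auto next_id = _next_unique_id.fetch_add(1, std::memory_order_relaxed);
  if (next_id == 0) {
    // The 64-bit counter wrapped around: ids would no longer be unique.
    throw std::overflow_error("exhausted the unique ids for publishers / subscriptions");
  }
  return next_id;
}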
User calls Node::create_subscription<MessageT>(...).
[node_impl.hpp]
std::shared_ptr<SubscriptionT>
Node::create_subscription(
  const std::string & topic_name,
  const rclcpp::QoS & qos,
  CallbackT && callback,
  const SubscriptionOptionsWithAllocator<AllocatorT> & options,
  typename MessageMemoryStrategyT::SharedPtr msg_mem_strat)
{
  return rclcpp::create_subscription<MessageT>( // SRLIU_mark1
    *this,
    extend_name_with_sub_namespace(topic_name, this->get_sub_namespace()),
    qos,
    std::forward<CallbackT>(callback),
    options,
    msg_mem_strat);
}
Node::create_subscription<MessageT>(...) calls rclcpp::create_subscription(...), which uses:
[src/ros2/rclcpp/rclcpp/include/rclcpp/subscription.hpp]
Subscription(
  rclcpp::node_interfaces::NodeBaseInterface * node_base,
  const rosidl_message_type_support_t & type_support_handle,
  const std::string & topic_name,
  const rclcpp::QoS & qos,
  AnySubscriptionCallback<CallbackMessageT, AllocatorT> callback,
  const rclcpp::SubscriptionOptionsWithAllocator<AllocatorT> & options,
  typename MessageMemoryStrategyT::SharedPtr message_memory_strategy,
  SubscriptionTopicStatisticsSharedPtr subscription_topic_statistics = nullptr)
This boils down to NodeTopics::create_subscription(...), where a Subscription is created through the factory.
[node_topics.cpp]
...
return subscription_factory.create_typed_subscription(node_base_, topic_name, qos);
...
Here, if intra-process communication is enabled, intra-process related variables are initialized through the SubscriptionBase::setup_intra_process(...) method, the most relevant ones being the ring buffer and the waitable object.
[rclcpp/rclcpp/src/rclcpp/subscription_base.cpp]
void
SubscriptionBase::setup_intra_process(
  uint64_t intra_process_subscription_id,
  IntraProcessManagerWeakPtr weak_ipm)
{
  intra_process_subscription_id_ = intra_process_subscription_id;
  weak_ipm_ = weak_ipm;
  use_intra_process_ = true;
}
Then the IntraProcessManager is notified about the existence of the new Subscription through the method IntraProcessManager::add_subscription(SubscriptionBase::SharedPtr subscription, SubscriptionOptions options).
IntraProcessManager::add_subscription(...) stores the Subscription information in an internal structure of type SubscriptionInfo. The structure contains information about the Subscription, such as its QoS, its topic name and the type of its callback, and a weak pointer to the Subscription object. A uint64_t sub_id, unique within the rclcpp::Context, is assigned to the Subscription. The IntraProcessManager contains a std::map<uint64_t, SubscriptionInfo> object where it is possible to retrieve the SubscriptionInfo of a specific Subscription given its id. There is also an additional structure, std::map<uint64_t, std::pair<std::set<uint64_t>, std::set<uint64_t>>>. The key of the map is the unique id of a Publisher and the value is a pair of sets of ids. These sets contain the ids of the Subscriptions that can communicate with the Publisher. There are two different sets in order to differentiate the Subscriptions depending on whether they request ownership of the received messages or not (note that this decision is made by looking at their buffer, since the Publisher does not have to interact with the Subscription callback).
[rclcpp/rclcpp/src/rclcpp/intra_process_manager.cpp]
uint64_t
IntraProcessManager::add_subscription(SubscriptionIntraProcessBase::SharedPtr subscription)
{
  std::unique_lock<std::shared_timed_mutex> lock(mutex_); // SRLIU_mark1

  auto id = IntraProcessManager::get_next_unique_id();

  subscriptions_[id].subscription = subscription; // SRLIU_mark2
  subscriptions_[id].topic_name = subscription->get_topic_name();
  subscriptions_[id].qos = subscription->get_actual_qos();
  subscriptions_[id].use_take_shared_method = subscription->use_take_shared_method(); // SRLIU_mark3

  // adds the subscription id to all the matchable publishers
  for (auto & pair : publishers_) {
    if (can_communicate(pair.second, subscriptions_[id])) {
      insert_sub_id_for_pub(id, pair.first, subscriptions_[id].use_take_shared_method);
    }
  }
  return id;
}
SRLIU_mark2: subscriptions_ is an instance of SubscriptionMap, which stores the relevant information of active subscriptions. SubscriptionMap is defined as using SubscriptionMap = std::unordered_map<uint64_t, SubscriptionInfo>;, i.e. an unordered map.

struct SubscriptionInfo
{
  SubscriptionInfo() = default;

  rclcpp::experimental::SubscriptionIntraProcessBase::SharedPtr subscription;
  rmw_qos_profile_t qos;
  const char * topic_name;
  bool use_take_shared_method;
};
SRLIU_mark3: updates the mapping in pub_to_subs_; after this, the entries of all matching publishers in pub_to_subs_ reference this subscription. Each subscription has a use_take_shared_method attribute, which indicates whether the data released by the publisher is shared with the other subscriptions of the same publisher or taken with exclusive ownership.
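Where does use_take_shared_method come from? As the English text above notes, the decision is made by looking at the subscription's buffer. A plausible sketch (method placement and member names are my assumptions, consistent with the classes quoted in this post):

// In SubscriptionIntraProcess: delegate the question to the typed buffer.
bool use_take_shared_method() const
{
  return buffer_->use_take_shared_method();
}

// In the typed intra-process buffer: a buffer of shared_ptr can hand the same
// pointer to every consumer, while a buffer of unique_ptr requires ownership.
bool use_take_shared_method() const
{
  return std::is_same<BufferT, MessageSharedPtr>::value;
}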
The SubscriptionIntraProcessWaitable object is added to the list of Waitable interfaces of the node through node_interfaces::NodeWaitablesInterface::add_waitable(...). It is added to the same callback group used for the standard inter-process communication of that topic.
TODO: the exact mechanics of how this Waitable is registered and monitored (??) — see the sketch below.
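My rough sketch of the contract that SubscriptionIntraProcessWaitable has to fulfil as an rclcpp::Waitable (signatures approximate; they have changed across rclcpp versions, so treat this as an outline, not the exact interface):

// Approximate shape of the intra-process waitable (simplified sketch):
class SubscriptionIntraProcessWaitable : public rclcpp::Waitable
{
public:
  // Register the internal guard condition so that rcl_wait() can sleep on it.
  bool add_to_wait_set(rcl_wait_set_t * wait_set) override;

  // Checked after rcl_wait() returns: ready when the ring buffer is non-empty.
  bool is_ready(rcl_wait_set_t * wait_set) override;

  // Called by the executor: pop one message from the buffer and dispatch
  // the user callback through AnySubscriptionCallback.
  void execute() override;
};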
Publishing a unique_ptr message:
User calls Publisher::publish(std::unique_ptr<MessageT> msg).
[rclcpp/rclcpp/include/rclcpp/publisher.hpp]
virtual void publish(std::unique_ptr<MessageT, MessageDeleter> msg)
{
  if (!intra_process_is_enabled_) {
    this->do_inter_process_publish(*msg);
    return;
  }
  // If an interprocess subscription exists, then the unique_ptr is promoted
  // to a shared_ptr and published.
  // This allows doing the intraprocess publish first and then doing the
  // interprocess publish, resulting in lower publish-to-subscribe latency.
  // It's not possible to do that with a unique_ptr,
  // as do_intra_process_publish takes the ownership of the message.
  bool inter_process_publish_needed =
    get_subscription_count() > get_intra_process_subscription_count();

  if (inter_process_publish_needed) {
    auto shared_msg = this->do_intra_process_publish_and_return_shared(std::move(msg));
    this->do_inter_process_publish(*shared_msg);
  } else {
    this->do_intra_process_publish(std::move(msg));
  }
}
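For reference, a typical call into this overload might look like the following (message type assumed for illustration):

// Publishing a unique_ptr hands the message over without copying it.
auto msg = std::make_unique<sensor_msgs::msg::Image>();
msg->header.frame_id = "camera";
pub_->publish(std::move(msg));  // enters the publish() overload shown above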
Publisher::publish(std::unique_ptr<MessageT> msg) calls IntraProcessManager::do_intra_process_publish(uint64_t pub_id, std::unique_ptr<MessageT> msg).
IntraProcessManager::do_intra_process_publish(...) uses the uint64_t pub_id to call IntraProcessManager::get_subscription_ids_for_pub(uint64_t pub_id). This returns the ids corresponding to Subscriptions that have a QoS compatible for receiving the message. These ids are divided into two sublists, according to the data-type that is stored in the buffer of each Subscription: requesting ownership (unique_ptr<MessageT>) or accepting shared (shared_ptr<MessageT>, but also MessageT, since the data will be copied in any case).
@ First, look at the IntraProcessManager's do_intra_process_publish implementation. The IntraProcessManager first checks whether the publisher has any matching subscriptions:

[rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
...
auto publisher_it = pub_to_subs_.find(intra_process_publisher_id);
if (publisher_it == pub_to_subs_.end()) {
  // Publisher is either invalid or no longer exists.
  RCLCPP_WARN(
    rclcpp::get_logger("rclcpp"),
    "Calling do_intra_process_publish for invalid or no longer existing publisher id");
  return;
}
...
pub_to_subs_ records which subscriptions are subscribed to each publisher; SplittedSubscriptions contains two vectors, one for take-shared subscriptions and one for take-ownership subscriptions. Each publisher and subscription is assigned a unique uint64_t identifier by the IntraProcessManager.

[rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
struct SplittedSubscriptions
{
  std::vector<uint64_t> take_shared_subscriptions;    // ids of take-shared subscriptions
  std::vector<uint64_t> take_ownership_subscriptions; // ids of take-ownership subscriptions
};

using SubscriptionMap =
  std::unordered_map<uint64_t, SubscriptionInfo>;

using PublisherMap =
  std::unordered_map<uint64_t, PublisherInfo>;

using PublisherToSubscriptionIdsMap =
  std::unordered_map<uint64_t, SplittedSubscriptions>;

PublisherToSubscriptionIdsMap pub_to_subs_;
SubscriptionMap subscriptions_;
PublisherMap publishers_;
@ Next, the IntraProcessManager delivers the published message into the buffer_ member of each SubscriptionIntraProcess. As seen above, subscriptions come in two kinds, take-shared and take-ownership, and the IntraProcessManager distinguishes three cases:

1. All of the publisher's subscriptions are take-shared. The message is simply promoted from unique_ptr to shared_ptr and distributed with add_shared_msg_to_buffers:

[rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
if (sub_ids.take_ownership_subscriptions.empty()) {
  // None of the buffers require ownership, so we promote the pointer
  std::shared_ptr<MessageT> msg = std::move(message);

  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
    msg, sub_ids.take_shared_subscriptions);
}
2. All subscriptions take ownership, or at most one is take-shared. This is equivalent to all subscriptions taking ownership, so the data is distributed with add_owned_msg_to_buffers:

[rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
  sub_ids.take_shared_subscriptions.size() <= 1)
{
  // There is at maximum 1 buffer that does not require ownership.
  // So this case is equivalent to all the buffers requiring ownership

  // Merge the two vectors of ids into a unique one
  std::vector<uint64_t> concatenated_vector(sub_ids.take_shared_subscriptions);
  concatenated_vector.insert(
    concatenated_vector.end(),
    sub_ids.take_ownership_subscriptions.begin(),
    sub_ids.take_ownership_subscriptions.end());

  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
    std::move(message),
    concatenated_vector,
    allocator);
}
3. There are both kinds, with more than one take-shared subscription. The message is first copied and distributed to the take-shared subscriptions, then handed over to the take-ownership subscriptions:

[rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
else if (!sub_ids.take_ownership_subscriptions.empty() && // NOLINT
  sub_ids.take_shared_subscriptions.size() > 1)
{
  // Construct a new shared pointer from the message
  // for the buffers that do not require ownership
  auto shared_msg = std::allocate_shared<MessageT, MessageAllocatorT>(*allocator, *message);

  this->template add_shared_msg_to_buffers<MessageT, Alloc, Deleter>(
    shared_msg, sub_ids.take_shared_subscriptions);
  this->template add_owned_msg_to_buffers<MessageT, Alloc, Deleter>(
    std::move(message), sub_ids.take_ownership_subscriptions, allocator);
}
@ add_shared_msg_to_buffers iterates over the subscriptions and calls each subscription's provide_intra_process_message interface. provide_intra_process_message then delivers the data into the corresponding buffer and sets the ready condition that the waitable exposes for the executor to query.

[rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
void add_shared_msg_to_buffers(
  std::shared_ptr<const MessageT> message,
  std::vector<uint64_t> subscription_ids)
{
  for (auto id : subscription_ids) {
    auto subscription_it = subscriptions_.find(id);
    if (subscription_it == subscriptions_.end()) {
      throw std::runtime_error("subscription has unexpectedly gone out of scope");
    }
    auto subscription_base = subscription_it->second.subscription;

    auto subscription = std::dynamic_pointer_cast<
      rclcpp::experimental::SubscriptionIntraProcess<MessageT, Alloc, Deleter>
      >(subscription_base);
    if (nullptr == subscription) {
      throw std::runtime_error("...");
    }
    subscription->provide_intra_process_message(message);
  }
}
@ add_owned_msg_to_buffers is similar to add_shared_msg_to_buffers, except that the message is copied for the first n-1 subscriptions, while the last one receives it without a copy. SRLIU_mark1: each copy allocates memory for the message type through the allocator, copy-constructs the message there, and finally passes the resulting pointer to the provide_intra_process_message function.

[rclcpp/rclcpp/include/rclcpp/experimental/intra_process_manager.hpp]
...
if (std::next(it) == subscription_ids.end()) {
  // If this is the last subscription, give up ownership
  subscription->provide_intra_process_message(std::move(message));
} else {
  // Copy the message since we have additional subscriptions to serve
  MessageUniquePtr copy_message;
  Deleter deleter = message.get_deleter();
  auto ptr = MessageAllocTraits::allocate(*allocator.get(), 1); // SRLIU_mark1
  MessageAllocTraits::construct(*allocator.get(), ptr, *message);
  copy_message = MessageUniquePtr(ptr, deleter);

  subscription->provide_intra_process_message(std::move(copy_message));
}
...
@ provide_intra_process_message is implemented as follows:

[rclcpp/rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp]
void provide_intra_process_message(ConstMessageSharedPtr message)
{
  buffer_->add_shared(std::move(message));
  trigger_guard_condition();
}

void provide_intra_process_message(MessageUniquePtr message)
{
  buffer_->add_unique(std::move(message));
  trigger_guard_condition();
}
add_shared(...)

[rclcpp/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp]
void add_shared(MessageSharedPtr msg) override
{
  add_shared_impl<BufferT>(std::move(msg));
}

// MessageSharedPtr to MessageSharedPtr
template<typename DestinationT>
typename std::enable_if<
  std::is_same<DestinationT, MessageSharedPtr>::value
>::type
add_shared_impl(MessageSharedPtr shared_msg)
{
  buffer_->enqueue(std::move(shared_msg));
}
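The same header must also contain the opposite enable_if branch, for buffers that store MessageUniquePtr: a shared message cannot be promoted to unique ownership, so a copy is unavoidable. A sketch of what that branch must do (member names such as message_allocator_ are my guesses, not verified quotes):

// MessageSharedPtr to MessageUniquePtr: ownership was requested but only a
// shared message is available, so the message has to be copied.
template<typename DestinationT>
typename std::enable_if<
  std::is_same<DestinationT, MessageUniquePtr>::value
>::type
add_shared_impl(MessageSharedPtr shared_msg)
{
  auto ptr = MessageAllocTraits::allocate(*message_allocator_.get(), 1);
  MessageAllocTraits::construct(*message_allocator_.get(), ptr, *shared_msg);
  buffer_->enqueue(MessageUniquePtr(ptr));
}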
add_unique(...)

[rclcpp/rclcpp/include/rclcpp/experimental/buffers/intra_process_buffer.hpp]
void add_unique(MessageUniquePtr msg) override
{
  buffer_->enqueue(std::move(msg));
}
enqueue(...)

[rclcpp/rclcpp/include/rclcpp/experimental/buffers/ring_buffer_implementation.hpp]
void enqueue(BufferT request)
{
  // std::vector<BufferT> ring_buffer_; // SRLIU_mark1
  std::lock_guard<std::mutex> lock(mutex_); // SRLIU_mark2

  write_index_ = next(write_index_);
  ring_buffer_[write_index_] = std::move(request); // SRLIU_mark3

  if (is_full()) {
    read_index_ = next(read_index_);
  } else {
    size_++;
  }
}
SRLIU_mark1: ring_buffer_ is a vector of BufferT, where BufferT is either MessageSharedPtr or MessageUniquePtr.
SRLIU_mark2: std::lock_guard is a simple RAII wrapper: it acquires the lock in its constructor and releases it in its destructor, guaranteeing that the lock is released whenever the function exits. In short, it protects against a careless early return in some branch that skips the unlock, which would otherwise stall all subsequent operations or even cause a deadlock.
SRLIU_mark3: ???? (presumably: the new message is written over the slot, and if the buffer was full the if-branch below advances read_index_ past it, so the oldest message is dropped).
trigger_guard_condition(): after the data has been stored, rcl_trigger_guard_condition(...) notifies the subscription that data is ready — but what exactly does this process look like???? (See the sketch below.)
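My reading of the open questions above, as a sketch (the guard-condition member name gc_ and the exact error handling are assumptions): the waitable registered its guard condition in the node's wait set, so triggering it makes a blocked rcl_wait() return; the executor then finds is_ready() true and calls execute(), which pops from the same ring buffer:

// Producer side: flip the guard condition. Any rcl_wait() that has this
// condition in its wait set wakes up.
void trigger_guard_condition()
{
  rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
  (void)ret;  // error handling elided in this sketch
}

// Consumer side (executor thread): the counterpart of enqueue() above,
// popping the oldest element under the same mutex.
BufferT dequeue()
{
  std::lock_guard<std::mutex> lock(mutex_);
  if (size_ == 0) {
    return BufferT();  // empty buffer: return a null pointer
  }
  auto request = std::move(ring_buffer_[read_index_]);
  read_index_ = next(read_index_);
  size_--;
  return request;
}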
The message is “added” to the ring buffer of all the items in the lists. The rcl_guard_condition_t member of the SubscriptionIntraProcessWaitable of each Subscription is triggered (this wakes up rclcpp::spin).
The Publisher::publish(...) method is overloaded to support different message types:
- unique_ptr<MessageT>
- MessageT &
As previously described, whenever messages are added to the ring buffer of a Subscription, a condition variable specific to the Subscription is triggered. This condition variable has been added to the Node waitset, so it is monitored by rclcpp::spin.
The guard condition linked with the SubscriptionIntraProcessWaitable object awakes rclcpp::spin.
The SubscriptionIntraProcessWaitable::is_ready() condition is checked. This has to ensure that the ring buffer is not empty.
The SubscriptionIntraProcessWaitable::execute() function is triggered. Here the first message is extracted from the buffer and then the SubscriptionIntraProcessWaitable calls the AnySubscriptionCallback::dispatch_intra_process(...) method. There are different implementations for this method, depending on the data-type stored in the buffer.
The AnySubscriptionCallback::dispatch_intra_process(...) method triggers the associated callback. Note that in this step, if the type of the buffer is a smart pointer, no message copies occur, as ownership has already been taken into account when pushing the message into the queue.
The std::unique_ptr<MessageT> msg is passed to the IntraProcessManager, which decides how to add this message to the buffers. The decision is made by looking at the number of Subscriptions and at their type, i.e. whether or not they want ownership of the messages.
- If all Subscriptions want ownership of the message, then a total of N-1 copies of the message are required, where N is the number of Subscriptions. The last one receives ownership of the published message, thus saving a copy.
- If no Subscription wants ownership of the message, 0 copies are required. It is possible to convert the message into a std::shared_ptr<MessageT> msg and to add it to every buffer.
- If there is only 1 Subscription that does not want ownership while the others want it, the situation is equivalent to the case of everyone requesting ownership: N-1 copies of the message are required. As before, the last Subscription receives ownership.
- If there is more than 1 Subscription that does not want ownership while the others want it, a total of M copies of the message are required, where M is the number of Subscriptions that want ownership: 1 copy is shared among all the Subscriptions that do not want ownership, while M-1 copies are for the others. (See the copy-counting sketch below.)

As in the current implementation, if both inter- and intra-process communication are needed, the std::unique_ptr<MessageT> msg will be converted into a std::shared_ptr<MessageT> msg and passed respectively to the do_intra_process_publish and do_inter_process_publish functions.
A copy of the message will be given to all the Subscriptions requesting ownership, while the others can copy the published shared pointer.
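To make the four cases concrete, here is a small hypothetical helper (not part of rclcpp) that computes the number of copies the rules above imply, given how many subscriptions accept shared messages and how many request ownership:

#include <cstddef>

// Hypothetical: number of message copies for one intra-process publish.
std::size_t intra_process_copies(std::size_t take_shared, std::size_t take_ownership)
{
  if (take_ownership == 0) {
    return 0;  // promote the unique_ptr to shared_ptr: zero copies
  }
  if (take_shared <= 1) {
    // Equivalent to everyone requesting ownership: N - 1 copies,
    // the last subscription receives the original message.
    return take_shared + take_ownership - 1;
  }
  // 1 copy shared by all take-shared subscriptions + (M - 1) copies
  // for the owners; the last owner receives the original: M total.
  return take_ownership;
}

// Examples: intra_process_copies(0, 4) == 3, intra_process_copies(5, 0) == 0,
//           intra_process_copies(1, 3) == 3, intra_process_copies(2, 3) == 3.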
The following tables show a recap of when the proposed implementation has to create a new copy of a message. The notation @ indicates a memory address where the message is stored; different memory addresses correspond to different copies of the message.
Based on the executor's handling of subscriptions described in Exploring ROS2 (2): executor, the core is the Executor::execute_any_executable function. An intra-process subscription is a SubscriptionIntraProcess (i.e. a waitable); whenever data is pushed into the SubscriptionIntraProcess's buffer it becomes ready, and the executor calls:

if (any_exec.waitable)
{
  any_exec.waitable->execute();
}
execute() is a member function of SubscriptionIntraProcess:

void execute()
{
  execute_impl<CallbackMessageT>();
}

template<class T>
typename std::enable_if<!std::is_same<T, rcl_serialized_message_t>::value, void>::type
execute_impl()
{
  rmw_message_info_t msg_info;
  msg_info.publisher_gid = {0, {0}};
  msg_info.from_intra_process = true;

  if (any_callback_.use_take_shared_method()) {
    ConstMessageSharedPtr msg = buffer_->consume_shared();
    any_callback_.dispatch_intra_process(msg, msg_info);
  } else {
    MessageUniquePtr msg = buffer_->consume_unique();
    any_callback_.dispatch_intra_process(std::move(msg), msg_info);
  }
}
It then calls the dispatch_intra_process function:
void dispatch_intra_process(
  MessageUniquePtr message, const rclcpp::MessageInfo & message_info)
{
  TRACEPOINT(callback_start, (const void *)this, true);
  if (shared_ptr_callback_)  // case A
  {
    typename std::shared_ptr<MessageT> shared_message = std::move(message);
    shared_ptr_callback_(shared_message);
  }
  else if (shared_ptr_with_info_callback_)
  {
    typename std::shared_ptr<MessageT> shared_message = std::move(message);
    shared_ptr_with_info_callback_(shared_message, message_info);
  }
  else if (unique_ptr_callback_)  // case B
  {
    unique_ptr_callback_(std::move(message));
  }
  else if (unique_ptr_with_info_callback_)
  {
    unique_ptr_with_info_callback_(std::move(message), message_info);
  }
  else if (const_shared_ptr_callback_ || const_shared_ptr_with_info_callback_)
  {
    throw std::runtime_error(
      "unexpected dispatch_intra_process unique message call"
      " with const shared_ptr callback");
  }
  else
  {
    throw std::runtime_error("unexpected message without any callback set");
  }
  TRACEPOINT(callback_end, (const void *)this);
}
In this function we are mainly interested in the first and the third branch, referred to here as case A and case B. In case A the callback accepts a shared_ptr, so the source message is converted via std::move into a std::shared_ptr and passed to the user's callback. In case B the callback takes a unique_ptr, so the message is moved (passed as an rvalue) directly into the callback. Note that AnySubscriptionCallback has six member variables for holding the user callback, but only one of them can be set at a time.
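As a closing illustration (hedged: create_subscription signatures vary across rclcpp versions), which branch dispatch_intra_process takes is decided by the signature of the callback the user registered:

// case A: a shared_ptr callback sets shared_ptr_callback_.
auto shared_cb = [](std::shared_ptr<std_msgs::msg::String> msg) { /* read msg */ };

// case B: a unique_ptr callback sets unique_ptr_callback_; the message is
// moved into the callback without any copy.
auto unique_cb = [](std::unique_ptr<std_msgs::msg::String> msg) { /* own msg */ };

auto sub = node->create_subscription<std_msgs::msg::String>(
  "topic", rclcpp::QoS(10), unique_cb);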