Skip to content

Commit

Permalink
Added subscription queued
Browse files Browse the repository at this point in the history
Now we can theoretically buffer some data in the queue.
  • Loading branch information
shuhaowu committed Sep 4, 2024
1 parent c950edb commit 9ec1f1b
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 0 deletions.
21 changes: 21 additions & 0 deletions examples/ros2/subscriber/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,24 @@ install(
TARGETS rt_ros2_subscriber_complex_data
DESTINATION lib/${PROJECT_NAME}
)

add_executable(rt_ros2_subscriber_queued_data
queued_data.cc
)

target_link_libraries(rt_ros2_subscriber_queued_data
PRIVATE
cactus_rt
)

setup_cactus_rt_target_options(rt_ros2_subscriber_queued_data)

ament_target_dependencies(rt_ros2_subscriber_queued_data
PUBLIC
std_msgs
)

install(
TARGETS rt_ros2_subscriber_queued_data
DESTINATION lib/${PROJECT_NAME}
)
73 changes: 73 additions & 0 deletions examples/ros2/subscriber/queued_data.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
#include <cactus_rt/ros2/app.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <memory>
#include <std_msgs/msg/int64.hpp>

using RealtimeType = std_msgs::msg::Int64;
using RosType = std_msgs::msg::Int64;

class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t run_duration_;

// We force turn off the trivial data check, because the ROS message data type
// has a defined constructor with code in it. That said, the code really is
// pretty trivial so it is safe to use in real-time. We thus disable the trivial
// type check manually.
std::shared_ptr<cactus_rt::ros2::SubscriptionQueued<RealtimeType, RosType, false>> subscription_;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

return thread_config;
}

public:
explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30))
: cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()),
run_duration_(run_duration.count()) {}

void InitializeForRos2() override {
subscription_ = ros2_adapter_->CreateSubscriptionQueued<RealtimeType, RosType, false>("/cactus_rt/simple", rclcpp::QoS(100));
}

protected:
LoopControl Loop(int64_t elapsed_ns) noexcept override {
while (true) {
auto msg = subscription_->Read();
if (!msg) {
break;
}

LOG_INFO(Logger(), "Got new data at {}: {} {}", elapsed_ns, msg->id, msg->value.data);
}

return elapsed_ns > run_duration_ ? LoopControl::Stop : LoopControl::Continue;
}
};

int main(int argc, const char* argv[]) {
cactus_rt::ros2::App app(argc, argv, "QueuedDataROS2Subscriber");

constexpr std::chrono::seconds time(30);
std::cout << "Testing RT loop for " << time.count() << " seconds.\n";

auto thread = app.CreateROS2EnabledThread<RTROS2SubscriberThread>(time);

app.Start();

std::thread t([&app, &time]() {
std::this_thread::sleep_for(time);
app.RequestStop();
});
t.detach();

app.Join();

std::cout << "Done\n";
return 0;
}
13 changes: 13 additions & 0 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,19 @@ class Ros2Adapter {
return subscriber;
}

template <typename RealtimeT, typename RosT, bool CheckForTrivialRealtimeT = true>
std::shared_ptr<SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>> CreateSubscriptionQueued(
const std::string& topic_name,
const rclcpp::QoS& qos,
size_t rt_queue_size = 1000
) {
auto subscriber = SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>::Create(logger_, *this->ros_node_, topic_name, qos, rt_queue_size);

const std::scoped_lock lock(mut_);
subscriptions_.push_back(subscriber);
return subscriber;
}

private:
void TimerCallback();
void DrainQueues();
Expand Down
70 changes: 70 additions & 0 deletions include/cactus_rt/ros2/subscription.h
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#ifndef CACTUS_RT_ROS2_SUBSCRIPTION_H_
#define CACTUS_RT_ROS2_SUBSCRIPTION_H_

#include <readerwriterqueue.h>

#include <memory>
#include <optional>
#include <rclcpp/rclcpp.hpp>
#include <type_traits>

Expand Down Expand Up @@ -85,6 +88,73 @@ class SubscriptionLatest : public ISubscription {
return latest_value_.Read();
}
};

template <typename RealtimeT, typename RosT, bool CheckForTrivialRealtimeT = true>
class SubscriptionQueued : public ISubscription {
friend class Ros2Adapter;

static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v<RealtimeT>, "RealtimeT must be a trivial object to be real-time safe");

using NoConversion = std::is_same<RealtimeT, RosT>;
using AdaptedRosType = typename std::conditional_t<NoConversion::value, RosT, rclcpp::TypeAdapter<RealtimeT, RosT>>;

using Queue = moodycamel::ReaderWriterQueue<StampedValue<RealtimeT>>;

quill::Logger* logger_;
typename rclcpp::Subscription<AdaptedRosType>::SharedPtr ros_subscription_;
int64_t current_msg_id_ = 0;
Queue queue_;

void SubscriptionCallback(const RealtimeT& rt_msg) {
current_msg_id_++; // Useful to detect message has changed by the real-time thread.

// A copy directly into the queue.
// TODO: warn that we are losing data?
queue_.try_emplace(current_msg_id_, rt_msg);
}

static std::shared_ptr<SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>> Create(
quill::Logger* logger,
rclcpp::Node& node,
const std::string& topic_name,
const rclcpp::QoS& qos,
const size_t rt_queue_size = 1000
) {
std::shared_ptr<SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>> subscription(
new SubscriptionQueued<RealtimeT, RosT, CheckForTrivialRealtimeT>(
logger,
moodycamel::ReaderWriterQueue<StampedValue<RealtimeT>>(rt_queue_size)
)
);

subscription->ros_subscription_ = node.create_subscription<AdaptedRosType>(
topic_name,
qos,
[subscription](const RealtimeT& rt_msg) {
// TODO: we are capturing the subscription shared_ptr by value. Will this cause a memory leak?
subscription->SubscriptionCallback(rt_msg);
}
);

return subscription;
}

SubscriptionQueued(
quill::Logger* logger,
moodycamel::ReaderWriterQueue<StampedValue<RealtimeT>>&& queue
) : logger_(logger), queue_(std::move(queue)) {}

public:
std::optional<StampedValue<RealtimeT>> Read() noexcept {
StampedValue<RealtimeT> v;
if (queue_.try_dequeue(v)) {
return v;
}

return std::nullopt;
}
};

} // namespace cactus_rt::ros2

#endif

0 comments on commit 9ec1f1b

Please sign in to comment.