diff --git a/.vscode/settings.json b/.vscode/settings.json index 31a9bc8..c225c18 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -5,6 +5,7 @@ "editor.defaultFormatter": "llvm-vs-code-extensions.vscode-clangd" }, "clangd.arguments": [ - "--compile-commands-dir=${workspaceFolder}/build/debug" + "--compile-commands-dir=${workspaceFolder}/build/debug", + "--header-insertion=never" ] } diff --git a/cmake/ros2.cmake b/cmake/ros2.cmake index c10d9b9..5b4e7f7 100644 --- a/cmake/ros2.cmake +++ b/cmake/ros2.cmake @@ -13,5 +13,6 @@ ament_target_dependencies(cactus_rt ) add_subdirectory(examples/ros2/publisher) +add_subdirectory(examples/ros2/subscriber) ament_package() diff --git a/examples/ros2/publisher/CMakeLists.txt b/examples/ros2/publisher/CMakeLists.txt index ba7dce9..0a7a02f 100644 --- a/examples/ros2/publisher/CMakeLists.txt +++ b/examples/ros2/publisher/CMakeLists.txt @@ -1,22 +1,43 @@ find_package(std_msgs REQUIRED) -add_executable(rt_ros2_publisher - main.cc +add_executable(rt_ros2_publisher_complex_data + complex_data.cc ) -target_link_libraries(rt_ros2_publisher +target_link_libraries(rt_ros2_publisher_complex_data PRIVATE cactus_rt ) -setup_cactus_rt_target_options(rt_ros2_publisher) +setup_cactus_rt_target_options(rt_ros2_publisher_complex_data) -ament_target_dependencies(rt_ros2_publisher +ament_target_dependencies(rt_ros2_publisher_complex_data PUBLIC std_msgs ) install( - TARGETS rt_ros2_publisher + TARGETS rt_ros2_publisher_complex_data + DESTINATION lib/${PROJECT_NAME} +) + +add_executable(rt_ros2_publisher_simple_data + simple_data.cc +) + +target_link_libraries(rt_ros2_publisher_simple_data + PRIVATE + cactus_rt +) + +setup_cactus_rt_target_options(rt_ros2_publisher_simple_data) + +ament_target_dependencies(rt_ros2_publisher_simple_data + PUBLIC + std_msgs +) + +install( + TARGETS rt_ros2_publisher_simple_data DESTINATION lib/${PROJECT_NAME} ) diff --git a/examples/ros2/publisher/complex_data.cc b/examples/ros2/publisher/complex_data.cc new file mode 100644 index 0000000..fb40cde --- /dev/null +++ b/examples/ros2/publisher/complex_data.cc @@ -0,0 +1,122 @@ +#include +#include + +#include +#include +#include + +struct MyCoefficientData { + float a; + float b; + float c; + float d; +}; + +using RealtimeType = MyCoefficientData; +using RosType = std_msgs::msg::Float32MultiArray; + +template <> +struct rclcpp::TypeAdapter { + using is_specialized = std::true_type; + using custom_type = RealtimeType; + using ros_message_type = RosType; + + static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) { + destination.data.reserve(4); + destination.data.push_back(source.a); + destination.data.push_back(source.b); + destination.data.push_back(source.c); + destination.data.push_back(source.d); + } + + static void convert_to_custom(const ros_message_type& source, custom_type& destination) { + destination.a = source.data.at(0); + destination.b = source.data.at(1); + destination.c = source.data.at(2); + destination.d = source.data.at(3); + } +}; + +class RTROS2PublisherThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t last_published_at_ = 0; + int64_t run_duration_; + + std::shared_ptr> publisher_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + return thread_config; + } + + public: + explicit RTROS2PublisherThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30)) + : cactus_rt::CyclicThread("RTROS2Publisher", CreateThreadConfig()), + run_duration_(run_duration.count()) {} + + void InitializeForRos2() override { + publisher_ = ros2_adapter_->CreatePublisher("/cactus_rt/complex", rclcpp::QoS(100)); + } + + protected: + bool Loop(int64_t elapsed_ns) noexcept override { + if (elapsed_ns - last_published_at_ > 10'000'000) { + last_published_at_ = elapsed_ns; + + const auto span = Tracer().WithSpan("Publish"); + + const auto elapsed_ns_f = static_cast(elapsed_ns); + + MyCoefficientData msg{ + elapsed_ns_f, + elapsed_ns_f / 10.0F, + elapsed_ns_f / 100.0F, + elapsed_ns_f / 1000.0F + }; + + const auto success = publisher_->Publish(msg); + LOG_INFO( + Logger(), + "{} data {}, {}, {}, {}", + success ? "Published" : "Did not publish", + msg.a, + msg.b, + msg.c, + msg.d + ); + } + + return elapsed_ns > run_duration_; + } +}; + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + + const cactus_rt::AppConfig app_config; + + cactus_rt::ros2::Ros2Adapter::Config ros2_adapter_config; + ros2_adapter_config.timer_interval = std::chrono::milliseconds(50); + + cactus_rt::ros2::App app("ComplexDataROS2Publisher", app_config, ros2_adapter_config); + app.StartTraceSession("build/publisher.perfetto"); + + constexpr std::chrono::seconds time(30); + std::cout << "Testing RT loop for " << time.count() << " seconds.\n"; + + auto thread = app.CreateROS2EnabledThread(time); + app.RegisterThread(thread); + + app.Start(); + + std::this_thread::sleep_for(time); + + app.RequestStop(); + app.Join(); + + std::cout << "Done\n"; + return 0; +} diff --git a/examples/ros2/publisher/main.cc b/examples/ros2/publisher/main.cc deleted file mode 100644 index b933376..0000000 --- a/examples/ros2/publisher/main.cc +++ /dev/null @@ -1,88 +0,0 @@ -#include -#include - -#include -#include -#include -#include -#include - -using cactus_rt::CyclicThread; -using cactus_rt::ros2::App; - -struct RealtimeData { - int64_t data; - - RealtimeData() = default; - explicit RealtimeData(int64_t d) : data(d) {} -}; -using RosData = std_msgs::msg::Int64; - -namespace { -void RealtimeToROS2ConverterFunc(const RealtimeData& rt_data, RosData& ros_data) { - // A bit of a silly example, but gets the point across. - ros_data.data = rt_data.data; -} -} // namespace - -class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { - int64_t loop_counter_ = 0; - std::shared_ptr> publisher_; - - static cactus_rt::CyclicThreadConfig CreateThreadConfig() { - cactus_rt::CyclicThreadConfig thread_config; - thread_config.period_ns = 1'000'000; - thread_config.cpu_affinity = std::vector{2}; - thread_config.SetFifoScheduler(80); - - // thread_config.tracer_config.trace_sleep = true; - thread_config.tracer_config.trace_wakeup_latency = true; - return thread_config; - } - - public: - RealtimeROS2PublisherThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {} - - void InitializeForRos2() override { - publisher_ = ros2_adapter_->CreatePublisher("/hello", rclcpp::QoS(10), RealtimeToROS2ConverterFunc); - } - - int64_t GetLoopCounter() const { - return loop_counter_; - } - - protected: - bool Loop(int64_t /*now*/) noexcept final { - loop_counter_++; - if (loop_counter_ % 1000 == 0) { - LOG_INFO(Logger(), "Loop {}", loop_counter_); - - const auto span = Tracer().WithSpan("Publish"); - publisher_->Publish(loop_counter_); - } - return false; - } -}; - -int main(int argc, char* argv[]) { - rclcpp::init(argc, argv); - - App app; - app.StartTraceSession("build/data.perfetto"); - - auto thread = app.CreateROS2EnabledThread("RTROS2PublisherThread"); - app.RegisterThread(thread); - - constexpr unsigned int time = 30; - std::cout << "Testing RT loop for " << time << " seconds.\n"; - - app.Start(); - - std::this_thread::sleep_for(std::chrono::seconds(time)); - - app.RequestStop(); - app.Join(); - - std::cout << "Number of loops executed: " << thread->GetLoopCounter() << "\n"; - return 0; -} diff --git a/examples/ros2/publisher/simple_data.cc b/examples/ros2/publisher/simple_data.cc new file mode 100644 index 0000000..77853e8 --- /dev/null +++ b/examples/ros2/publisher/simple_data.cc @@ -0,0 +1,82 @@ +#include +#include + +#include +#include +#include + +using RealtimeType = std_msgs::msg::Int64; +using RosType = std_msgs::msg::Int64; + +class RTROS2PublisherThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t last_published_at_ = 0; + int64_t run_duration_; + + // We force turn off the trivial data check, because the ROS message data type + // has a defined constructor with code in it. That said, the code really is + // pretty trivial so it is safe to use in real-time. We thus disable the trivial + // type check manually. + std::shared_ptr> publisher_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + return thread_config; + } + + public: + explicit RTROS2PublisherThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30)) + : cactus_rt::CyclicThread("RTROS2Publisher", CreateThreadConfig()), + run_duration_(run_duration.count()) {} + + void InitializeForRos2() override { + publisher_ = ros2_adapter_->CreatePublisher("/cactus_rt/simple", rclcpp::QoS(100)); + } + + protected: + bool Loop(int64_t elapsed_ns) noexcept override { + if (elapsed_ns - last_published_at_ > 10'000'000) { + last_published_at_ = elapsed_ns; + + const auto span = Tracer().WithSpan("Publish"); + + std_msgs::msg::Int64 msg; + msg.data = elapsed_ns; + const auto success = publisher_->Publish(msg); + LOG_INFO(Logger(), "{} integer {}", success ? "Published" : "Did not publish", msg.data); + } + + return elapsed_ns > run_duration_; + } +}; + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + + const cactus_rt::AppConfig app_config; + + cactus_rt::ros2::Ros2Adapter::Config ros2_adapter_config; + ros2_adapter_config.timer_interval = std::chrono::milliseconds(50); + + cactus_rt::ros2::App app("SimpleDataROS2Publisher", app_config, ros2_adapter_config); + app.StartTraceSession("build/publisher.perfetto"); + + constexpr std::chrono::seconds time(30); + std::cout << "Testing RT loop for " << time.count() << " seconds.\n"; + + auto thread = app.CreateROS2EnabledThread(time); + app.RegisterThread(thread); + + app.Start(); + + std::this_thread::sleep_for(time); + + app.RequestStop(); + app.Join(); + + std::cout << "Done\n"; + return 0; +} diff --git a/examples/ros2/subscriber/CMakeLists.txt b/examples/ros2/subscriber/CMakeLists.txt new file mode 100644 index 0000000..2663257 --- /dev/null +++ b/examples/ros2/subscriber/CMakeLists.txt @@ -0,0 +1,43 @@ +find_package(std_msgs REQUIRED) + +add_executable(rt_ros2_subscriber_simple_data + simple_data.cc +) + +target_link_libraries(rt_ros2_subscriber_simple_data + PRIVATE + cactus_rt +) + +setup_cactus_rt_target_options(rt_ros2_subscriber_simple_data) + +ament_target_dependencies(rt_ros2_subscriber_simple_data + PUBLIC + std_msgs +) + +install( + TARGETS rt_ros2_subscriber_simple_data + DESTINATION lib/${PROJECT_NAME} +) + +add_executable(rt_ros2_subscriber_complex_data + complex_data.cc +) + +target_link_libraries(rt_ros2_subscriber_complex_data + PRIVATE + cactus_rt +) + +setup_cactus_rt_target_options(rt_ros2_subscriber_complex_data) + +ament_target_dependencies(rt_ros2_subscriber_complex_data + PUBLIC + std_msgs +) + +install( + TARGETS rt_ros2_subscriber_complex_data + DESTINATION lib/${PROJECT_NAME} +) diff --git a/examples/ros2/subscriber/complex_data.cc b/examples/ros2/subscriber/complex_data.cc new file mode 100644 index 0000000..a0b16cf --- /dev/null +++ b/examples/ros2/subscriber/complex_data.cc @@ -0,0 +1,102 @@ +#include +#include + +#include +#include +#include + +struct MyCoefficientData { + float a; + float b; + float c; + float d; +}; + +using RealtimeType = MyCoefficientData; +using RosType = std_msgs::msg::Float32MultiArray; + +template <> +struct rclcpp::TypeAdapter { + using is_specialized = std::true_type; + using custom_type = RealtimeType; + using ros_message_type = RosType; + + static void convert_to_ros_message(const custom_type& source, ros_message_type& destination) { + destination.data.reserve(4); + destination.data.push_back(source.a); + destination.data.push_back(source.b); + destination.data.push_back(source.c); + destination.data.push_back(source.d); + } + + static void convert_to_custom(const ros_message_type& source, custom_type& destination) { + destination.a = source.data.at(0); + destination.b = source.data.at(1); + destination.c = source.data.at(2); + destination.d = source.data.at(3); + } +}; + +class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t last_msg_id_ = 0; + int64_t run_duration_; + + std::shared_ptr> subscription_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + return thread_config; + } + + public: + explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30)) + : cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()), + run_duration_(run_duration.count()) {} + + void InitializeForRos2() override { + subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/cactus_rt/complex", rclcpp::QoS(100)); + } + + protected: + bool Loop(int64_t elapsed_ns) noexcept override { + cactus_rt::ros2::StampedValue msg; + { + const auto span = Tracer().WithSpan("Subscription::ReadLatest"); + msg = subscription_->ReadLatest(); + } + + if (msg.id != last_msg_id_) { + LOG_INFO(Logger(), "Got new data at {}: {} [{}, {}, {}, {}]", elapsed_ns, msg.id, msg.value.a, msg.value.b, msg.value.c, msg.value.d); + last_msg_id_ = msg.id; + } + + return elapsed_ns > run_duration_; + } +}; + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + + cactus_rt::ros2::App app("SimpleDataROS2Subscriber"); + app.StartTraceSession("build/subscriber.perfetto"); + + constexpr std::chrono::seconds time(30); + std::cout << "Testing RT loop for " << time.count() << " seconds.\n"; + + auto thread = app.CreateROS2EnabledThread(time); + app.RegisterThread(thread); + + app.Start(); + + std::this_thread::sleep_for(time); + + app.RequestStop(); + app.Join(); + + std::cout << "Done\n"; + return 0; +} diff --git a/examples/ros2/subscriber/simple_data.cc b/examples/ros2/subscriber/simple_data.cc new file mode 100644 index 0000000..a805ac0 --- /dev/null +++ b/examples/ros2/subscriber/simple_data.cc @@ -0,0 +1,77 @@ +#include +#include + +#include +#include +#include + +using RealtimeType = std_msgs::msg::Int64; +using RosType = std_msgs::msg::Int64; + +class RTROS2SubscriberThread : public cactus_rt::CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t last_msg_id_ = 0; + int64_t run_duration_; + + // We force turn off the trivial data check, because the ROS message data type + // has a defined constructor with code in it. That said, the code really is + // pretty trivial so it is safe to use in real-time. We thus disable the trivial + // type check manually. + std::shared_ptr> subscription_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + return thread_config; + } + + public: + explicit RTROS2SubscriberThread(std::chrono::nanoseconds run_duration = std::chrono::seconds(30)) + : cactus_rt::CyclicThread("RTROS2Subscriber", CreateThreadConfig()), + run_duration_(run_duration.count()) {} + + void InitializeForRos2() override { + subscription_ = ros2_adapter_->CreateSubscriptionForLatestValue("/cactus_rt/simple", rclcpp::QoS(100)); + } + + protected: + bool Loop(int64_t elapsed_ns) noexcept override { + cactus_rt::ros2::StampedValue msg; + { + const auto span = Tracer().WithSpan("Subscription::ReadLatest"); + msg = subscription_->ReadLatest(); + } + + if (msg.id != last_msg_id_) { + LOG_INFO(Logger(), "Got new data at {}: {} {}", elapsed_ns, msg.id, msg.value.data); + last_msg_id_ = msg.id; + } + + return elapsed_ns > run_duration_; + } +}; + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + + cactus_rt::ros2::App app("SimpleDataROS2Subscriber"); + app.StartTraceSession("build/subscriber.perfetto"); + + constexpr std::chrono::seconds time(30); + std::cout << "Testing RT loop for " << time.count() << " seconds.\n"; + + auto thread = app.CreateROS2EnabledThread(time); + app.RegisterThread(thread); + + app.Start(); + + std::this_thread::sleep_for(time); + + app.RequestStop(); + app.Join(); + + std::cout << "Done\n"; + return 0; +} diff --git a/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h b/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h index 6f6ecae..8b56576 100644 --- a/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h +++ b/include/cactus_rt/experimental/lockless/spsc/realtime_readable_value.h @@ -36,6 +36,8 @@ class RealtimeReadableValue { * This atomically reads the value. It returns a copy of the data. */ T Read() { + // TODO: static assert that T is a POD. + // TODO: need to figure out the atomic memory order here! T* data_ptr = atomic_ptr_.exchange(nullptr); T data = *data_ptr; @@ -43,11 +45,18 @@ class RealtimeReadableValue { return data; } + // TODO: maybe create a "loan" method that would not cause a copy in read, but + // can hold the data for longer. + + // TODO: possibly create a Write method with a timeout that will simply fail + // if the RT threads holds on too long. + /** * This atomically writes the value. It will copy the value into the storage * and free a previous storage pointer. */ void Write(const T& new_value) { + // TODO: make this perfect forwarding. auto new_ptr = std::make_unique(new_value); T* expected; diff --git a/include/cactus_rt/experimental/lockless/spsc/realtime_writable_value.h b/include/cactus_rt/experimental/lockless/spsc/realtime_writable_value.h index 2b3c580..0af7fa4 100644 --- a/include/cactus_rt/experimental/lockless/spsc/realtime_writable_value.h +++ b/include/cactus_rt/experimental/lockless/spsc/realtime_writable_value.h @@ -47,6 +47,7 @@ class RealtimeWritableValue { } void Write(const T& new_value) { + // TODO: static assert that T is a POD auto i = idx_.fetch_or(kBusyMask) & kIdxMask; buf_[i] = new_value; idx_.store((i & kIdxMask) | kNewDataMask); diff --git a/include/cactus_rt/ros2/app.h b/include/cactus_rt/ros2/app.h index 53bbf13..9e93815 100644 --- a/include/cactus_rt/ros2/app.h +++ b/include/cactus_rt/ros2/app.h @@ -9,15 +9,20 @@ namespace cactus_rt::ros2 { +class App; + class Ros2ThreadMixin { + friend class App; + protected: std::shared_ptr ros2_adapter_; - public: + private: void SetRos2Adapter(std::shared_ptr ros2_adapter) { ros2_adapter_ = ros2_adapter; } + public: virtual void InitializeForRos2() = 0; virtual ~Ros2ThreadMixin() = 0; }; diff --git a/include/cactus_rt/ros2/publisher.h b/include/cactus_rt/ros2/publisher.h index 2855629..506bfd5 100644 --- a/include/cactus_rt/ros2/publisher.h +++ b/include/cactus_rt/ros2/publisher.h @@ -3,68 +3,88 @@ #include -#include -#include +#include #include +#include namespace cactus_rt::ros2 { class Ros2Adapter; -template -using RealtimeToROS2Converter = std::function; - -template -using Ros2ToRealtimeConverter = std::function; - class IPublisher { friend class Ros2Adapter; - virtual bool DequeueAndPublishToRos() = 0; - virtual void FullyDrainAndPublishToRos() = 0; + virtual bool DequeueAndPublishToRos() = 0; + virtual size_t FullyDrainAndPublishToRos() = 0; public: - virtual ~IPublisher() = 0; + virtual ~IPublisher() = default; }; -template +template class Publisher : public IPublisher { - typename rclcpp::Publisher::SharedPtr publisher_; - std::optional> converter_; - moodycamel::ReaderWriterQueue queue_; + friend class Ros2Adapter; + + static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v, "RealtimeT must be a trivial object to be real-time safe"); + + using NoConversion = std::is_same; + using AdaptedRosType = typename std::conditional_t>; + + typename rclcpp::Publisher::SharedPtr publisher_; + moodycamel::ReaderWriterQueue queue_; bool DequeueAndPublishToRos() override { - RealtimeT rt_msg; + if constexpr (NoConversion::value) { + rclcpp::LoanedMessage loaned_msg = publisher_->borrow_loaned_message(); - const bool has_data = queue_.try_dequeue(rt_msg); - if (!has_data) { - return false; - } + RealtimeT& rt_msg = loaned_msg.get(); + // First copy + if (!queue_.try_dequeue(rt_msg)) { + return false; + } - if (converter_) { - auto loaned_msg = publisher_->borrow_loaned_message(); - converter_.value()(rt_msg, loaned_msg.get()); publisher_->publish(std::move(loaned_msg)); + return true; } else { - if constexpr (std::is_same_v) { - const auto* ros_msg = static_cast(rt_msg); - publisher_->publish(*ros_msg); - } else { - throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"}; + RealtimeT rt_msg; + // First copy + if (!queue_.try_dequeue(rt_msg)) { + return false; } - } - return true; + // Possible allocation if middleware requires it. + rclcpp::LoanedMessage loaned_msg = publisher_->borrow_loaned_message(); + RosT& ros_msg = loaned_msg.get(); + + // Second copy + AdaptedRosType::convert_to_ros_message(rt_msg, ros_msg); + + publisher_->publish(std::move(loaned_msg)); + return true; + } } - void FullyDrainAndPublishToRos() override { + size_t FullyDrainAndPublishToRos() override { + size_t count = 0; while (true) { - const auto has_data = DequeueAndPublishToRos(); - if (!has_data) { - break; + if (!DequeueAndPublishToRos()) { + return count; } + count++; } } + static std::shared_ptr> Create( + rclcpp::Node& node, + const std::string& topic_name, + const rclcpp::QoS& qos, + const size_t rt_queue_size = 1000 + ) { + return std::make_shared>( + node.create_publisher(topic_name, qos), + moodycamel::ReaderWriterQueue(rt_queue_size) + ); + } + public: /** * Constructs a publisher. Shouldn't be called directly. Only public to enable make_shared. @@ -72,12 +92,9 @@ class Publisher : public IPublisher { * @private */ Publisher( - typename rclcpp::Publisher::SharedPtr publisher, - std::optional> converter, - moodycamel::ReaderWriterQueue&& queue - ) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {} - - ~Publisher() override = default; + typename rclcpp::Publisher::SharedPtr publisher, + moodycamel::ReaderWriterQueue&& queue + ) : publisher_(publisher), queue_(std::move(queue)) {} template bool Publish(Args&&... args) noexcept { diff --git a/include/cactus_rt/ros2/ros2_adapter.h b/include/cactus_rt/ros2/ros2_adapter.h index 991caec..1899872 100644 --- a/include/cactus_rt/ros2/ros2_adapter.h +++ b/include/cactus_rt/ros2/ros2_adapter.h @@ -7,12 +7,11 @@ #include #include #include -#include #include -#include -#include +#include #include "publisher.h" +#include "subscription.h" namespace cactus_rt::ros2 { @@ -22,7 +21,7 @@ class Ros2Adapter { /** * The time interval for which the adapter node is polling for publisher data. */ - std::chrono::microseconds timer_interval = std::chrono::microseconds(200'000); + std::chrono::microseconds timer_interval = std::chrono::milliseconds(100); }; private: @@ -36,8 +35,9 @@ class Ros2Adapter { // This means that during the timer callback, no subscribers, publishers, services, and etc. can be changed. std::mutex mut_; - // Publisher data - std::vector> publishers_; + // Publishers and subscriptions + std::vector> publishers_; + std::vector> subscriptions_; public: Ros2Adapter(const std::string& name_, const Config& config); @@ -46,31 +46,29 @@ class Ros2Adapter { return ros_node_; } - template - std::shared_ptr> CreatePublisher( - const std::string& topic_name, - const rclcpp::QoS& qos, - std::optional> converter, - size_t rt_queue_size = 1000 + template + std::shared_ptr> CreatePublisher( + const std::string& topic_name, + const rclcpp::QoS& qos, + size_t rt_queue_size = 1000 ) { - if (!converter) { - if constexpr (!(std::is_trivial_v && std::is_standard_layout_v && std::is_same_v)) { - throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified"); - } - } + auto publisher = Publisher::Create(*ros_node_, topic_name, qos, rt_queue_size); - typename rclcpp::Publisher::SharedPtr publisher = this->ros_node_->create_publisher(topic_name, qos); - typename moodycamel::ReaderWriterQueue queue{rt_queue_size}; + const std::scoped_lock lock(mut_); + publishers_.push_back(publisher); + return publisher; + } - auto publisher_bundle = std::make_shared>( - std::move(publisher), - converter, - std::move(queue) - ); + template + std::shared_ptr> CreateSubscriptionForLatestValue( + const std::string& topic_name, + const rclcpp::QoS& qos + ) { + auto subscriber = SubscriptionLatest::Create(*this->ros_node_, topic_name, qos); const std::scoped_lock lock(mut_); - publishers_.push_back(publisher_bundle); - return publisher_bundle; + subscriptions_.push_back(subscriber); + return subscriber; } private: diff --git a/include/cactus_rt/ros2/subscription.h b/include/cactus_rt/ros2/subscription.h new file mode 100644 index 0000000..ccd1508 --- /dev/null +++ b/include/cactus_rt/ros2/subscription.h @@ -0,0 +1,90 @@ +#ifndef CACTUS_RT_ROS2_SUBSCRIPTION_H_ +#define CACTUS_RT_ROS2_SUBSCRIPTION_H_ + +#include +#include +#include + +#include "../experimental/lockless/spsc/realtime_readable_value.h" + +// Note: ROS subscription dispatch is here: https://github.com/ros2/rclcpp/blob/e10728c/rclcpp/include/rclcpp/any_subscription_callback.hpp#L481 +// We are using the TypeAdapter method. + +namespace cactus_rt::ros2 { + +class Ros2Adapter; + +class ISubscription { + public: + virtual ~ISubscription() = default; +}; + +template +struct StampedValue { + int64_t id; + RealtimeT value; + + StampedValue() = default; + StampedValue(const int64_t i, const RealtimeT& v) : id(i), value(v) {} +}; + +template +class SubscriptionLatest : public ISubscription { + friend class Ros2Adapter; + + static_assert(!CheckForTrivialRealtimeT || std::is_trivial_v, "RealtimeT must be a trivial object to be real-time safe"); + + using NoConversion = std::is_same; + using AdaptedRosType = typename std::conditional_t>; + + using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue>; + + typename rclcpp::Subscription::SharedPtr ros_subscription_; + int64_t current_msg_id_ = 0; + RealtimeReadableValue latest_value_; + + void SubscriptionCallback(const RealtimeT& rt_msg) { + current_msg_id_++; // Useful to detect message has changed by the real-time thread. + + // First copy + const StampedValue stamped_value(current_msg_id_, rt_msg); + + // A second copy then an allocation + // TODO: directly write to the value (perfect forwarding required). + latest_value_.Write(stamped_value); + } + + static std::shared_ptr> Create( + rclcpp::Node& node, + const std::string& topic_name, + const rclcpp::QoS& qos + ) { + auto subscription = std::make_shared>(); + + subscription->ros_subscription_ = node.create_subscription( + topic_name, + qos, + [subscription](const RealtimeT& rt_msg) { + // TODO: we are capturing the subscription shared_ptr by value. Will this cause a memory leak? + subscription->SubscriptionCallback(rt_msg); + } + ); + + return subscription; + } + + public: + /** + * @brief Do not use this method. Defined to allow make_shared to work. + * + * @private + */ + SubscriptionLatest() = default; + + StampedValue ReadLatest() noexcept { + return latest_value_.Read(); + } +}; +} // namespace cactus_rt::ros2 + +#endif diff --git a/src/cactus_rt/ros2/ros2_adapter.cc b/src/cactus_rt/ros2/ros2_adapter.cc index bb3fdf6..a1d0fbc 100644 --- a/src/cactus_rt/ros2/ros2_adapter.cc +++ b/src/cactus_rt/ros2/ros2_adapter.cc @@ -4,8 +4,6 @@ namespace cactus_rt::ros2 { -IPublisher::~IPublisher() = default; - Ros2Adapter::Ros2Adapter(const std::string& name, const Ros2Adapter::Config& config) : ros_node_(std::make_shared(name + "_ros_adapter")) { timer_ = this->ros_node_->create_wall_timer(config.timer_interval, [this] { TimerCallback(); });