From 0fc7a1ac8a8d3d57bd429ea92e66c1aece6b3648 Mon Sep 17 00:00:00 2001 From: Shuhao Wu Date: Thu, 1 Aug 2024 22:55:31 -0400 Subject: [PATCH] Ability to publish to ROS2 from a real-time thread --- .gitignore | 2 + CMakeLists.txt | 4 +- cmake/ros2.cmake | 8 +- docs/imgs/ROS2Ownership.svg | 4 + docs/imgs/ROS2Publisher.svg | 4 + docs/ros2.md | 6 + examples/ros2/publisher/CMakeLists.txt | 22 ++++ examples/ros2/publisher/main.cc | 91 +++++++++++++++ examples/ros2_example/CMakeLists.txt | 15 --- examples/ros2_example/main.cc | 3 - examples/tracing_example/main.cc | 6 +- include/cactus_rt/app.h | 13 ++- include/cactus_rt/ros2/app.h | 65 +++++++++++ include/cactus_rt/ros2/ros2_adapter.h | 154 +++++++++++++++++++++++++ package.xml | 2 + src/cactus_rt/app.cc | 13 ++- src/cactus_rt/ros2/app.cc | 67 +++++++++++ src/cactus_rt/ros2/ros2_adapter.cc | 34 ++++++ 18 files changed, 485 insertions(+), 28 deletions(-) create mode 100644 docs/imgs/ROS2Ownership.svg create mode 100644 docs/imgs/ROS2Publisher.svg create mode 100644 docs/ros2.md create mode 100644 examples/ros2/publisher/CMakeLists.txt create mode 100644 examples/ros2/publisher/main.cc delete mode 100644 examples/ros2_example/CMakeLists.txt delete mode 100644 examples/ros2_example/main.cc create mode 100644 include/cactus_rt/ros2/app.h create mode 100644 src/cactus_rt/ros2/app.cc create mode 100644 src/cactus_rt/ros2/ros2_adapter.cc diff --git a/.gitignore b/.gitignore index 01b9655..2bb4ecf 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ build/ .cache/ .vscode/settings.json +log/ +install/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 313e94e..3ac3273 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -165,13 +165,13 @@ if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME}) endif() endif() -setup_cactus_rt_target_options(cactus_rt) - # ROS 2 build support if (ENABLE_ROS2) include(cmake/ros2.cmake) endif() +setup_cactus_rt_target_options(cactus_rt) + # Build tests, examples, docs, only if this project is not embedded in another # project. if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME}) diff --git a/cmake/ros2.cmake b/cmake/ros2.cmake index 752579b..c10d9b9 100644 --- a/cmake/ros2.cmake +++ b/cmake/ros2.cmake @@ -1,11 +1,17 @@ find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) +target_sources(cactus_rt + PRIVATE + src/cactus_rt/ros2/ros2_adapter.cc + src/cactus_rt/ros2/app.cc +) + ament_target_dependencies(cactus_rt PUBLIC rclcpp ) -add_subdirectory(examples/ros2_example) +add_subdirectory(examples/ros2/publisher) ament_package() diff --git a/docs/imgs/ROS2Ownership.svg b/docs/imgs/ROS2Ownership.svg new file mode 100644 index 0000000..31c3c7a --- /dev/null +++ b/docs/imgs/ROS2Ownership.svg @@ -0,0 +1,4 @@ + + + +
cactus_rt::ros2::App
cactus_rt::ros2::
Ros2ExecutorThread
rclcpp::
SingleThreadedExecutor
cactus_rt::ros2::
Ros2Adapter
rclcpp::Node
...
Publish<RealtimeT>(...)
MyThread : cactus_rt::Thread
CreatePublisher(...)
CreateSubscription(...)
User accessible
cactus-rt internals
Legend
cactus-rt ROS2 ownership
cactus_rt::ros2::
Publisher<RealtimeT>
rclcpp::Publisher<RosT>
RealtimeToRos2Converter
<RealtimeT, RosT>
Lockless SPSC Queue
\ No newline at end of file diff --git a/docs/imgs/ROS2Publisher.svg b/docs/imgs/ROS2Publisher.svg new file mode 100644 index 0000000..3296f9e --- /dev/null +++ b/docs/imgs/ROS2Publisher.svg @@ -0,0 +1,4 @@ + + + +
cactus-rt ROS2 publisher flow
User's real-time thread
cactus_rt::ros2::
Publisher::
Publish(Args&&...)
Lockless SPSC queue
In-place construct the message (type RealtimeT) data directly in the FIFO queue
cactus_rt::ros2::
Publisher::
DequeueFromRT
Queue is popped by a timer registered in Ros2Adapter, which periodically drains the queue
Data is dequeued as a const void* to erase the types
Data is passed as const void* and casted back to RealtimeT.
cactus_rt::ros2::Publisher::PublishToRos
rclcpp::Publisher<RosT>::borrow_loan_message()
User-specified RealtimeToRos2Converter
rclcpp::Publisher<RosT>::publish()
The loaned message has a type of RosT which may not equal RealtimeT
This converts RealtimeT (which is a POD) to RosT (which may not be a POD). The conversion is done directly into the loaned message.
Finally publishes the data
User's thread
Ros2 Executor Thread
\ No newline at end of file diff --git a/docs/ros2.md b/docs/ros2.md new file mode 100644 index 0000000..ba11785 --- /dev/null +++ b/docs/ros2.md @@ -0,0 +1,6 @@ +ROS2 integration with cactus-rt +=============================== + +![cactus-rt ROS2 integration object ownership diagram](imgs/ROS2Ownership.svg) + +![cactus-rt ROS2 publisher data flow](imgs/ROS2Publisher.svg) diff --git a/examples/ros2/publisher/CMakeLists.txt b/examples/ros2/publisher/CMakeLists.txt new file mode 100644 index 0000000..ba7dce9 --- /dev/null +++ b/examples/ros2/publisher/CMakeLists.txt @@ -0,0 +1,22 @@ +find_package(std_msgs REQUIRED) + +add_executable(rt_ros2_publisher + main.cc +) + +target_link_libraries(rt_ros2_publisher + PRIVATE + cactus_rt +) + +setup_cactus_rt_target_options(rt_ros2_publisher) + +ament_target_dependencies(rt_ros2_publisher + PUBLIC + std_msgs +) + +install( + TARGETS rt_ros2_publisher + DESTINATION lib/${PROJECT_NAME} +) diff --git a/examples/ros2/publisher/main.cc b/examples/ros2/publisher/main.cc new file mode 100644 index 0000000..d4101cd --- /dev/null +++ b/examples/ros2/publisher/main.cc @@ -0,0 +1,91 @@ +#include +#include + +#include +#include +#include +#include +#include + +using cactus_rt::CyclicThread; +using cactus_rt::ros2::App; + +struct RealtimeData { + int64_t data; + + RealtimeData() = default; + RealtimeData(int64_t d) : data(d) {} +}; +using RosData = std_msgs::msg::Int64; + +namespace { +void RealtimeToROS2ConverterFunc(const RealtimeData& rt_data, RosData& ros_data) { + // A bit of a silly example, but gets the point across. + ros_data.data = rt_data.data; +} +} // namespace + +/** + * This is a no-op thread that does nothing at 1 kHz. + */ +class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t loop_counter_ = 0; + std::shared_ptr> publisher_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + // thread_config.tracer_config.trace_sleep = true; + thread_config.tracer_config.trace_wakeup_latency = true; + return thread_config; + } + + public: + RealtimeROS2PublisherThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {} + + void InitializeForRos2() override { + publisher_ = ros2_adapter_->CreatePublisher("/hello", rclcpp::QoS(10), RealtimeToROS2ConverterFunc); + } + + int64_t GetLoopCounter() const { + return loop_counter_; + } + + protected: + bool Loop(int64_t /*now*/) noexcept final { + loop_counter_++; + if (loop_counter_ % 1000 == 0) { + LOG_INFO(Logger(), "Loop {}", loop_counter_); + + const auto span = Tracer().WithSpan("Publish"); + publisher_->Publish(loop_counter_); + } + return false; + } +}; + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + + App app; + app.StartTraceSession("build/data.perfetto"); + + auto thread = app.CreateThread("RTROS2PublisherThread"); + app.RegisterThread(thread); + + constexpr unsigned int time = 30; + std::cout << "Testing RT loop for " << time << " seconds.\n"; + + app.Start(); + + std::this_thread::sleep_for(std::chrono::seconds(time)); + + app.RequestStop(); + app.Join(); + + std::cout << "Number of loops executed: " << thread->GetLoopCounter() << "\n"; + return 0; +} diff --git a/examples/ros2_example/CMakeLists.txt b/examples/ros2_example/CMakeLists.txt deleted file mode 100644 index 8942035..0000000 --- a/examples/ros2_example/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -add_executable(rt_ros2_example - main.cc -) - -target_link_libraries(rt_ros2_example - PRIVATE - cactus_rt -) - -setup_cactus_rt_target_options(rt_ros2_example) - -install( - TARGETS rt_ros2_example - DESTINATION lib/${PROJECT_NAME} -) diff --git a/examples/ros2_example/main.cc b/examples/ros2_example/main.cc deleted file mode 100644 index 4cce7f6..0000000 --- a/examples/ros2_example/main.cc +++ /dev/null @@ -1,3 +0,0 @@ -int main() { - return 0; -} diff --git a/examples/tracing_example/main.cc b/examples/tracing_example/main.cc index 1d8e0d7..4f9064a 100644 --- a/examples/tracing_example/main.cc +++ b/examples/tracing_example/main.cc @@ -13,7 +13,7 @@ void WasteTime(std::chrono::microseconds duration) { } } -class ExampleRTThread : public CyclicThread { +class RealtimeROS2PublisherThread : public CyclicThread { int64_t loop_counter_ = 0; static cactus_rt::CyclicThreadConfig CreateThreadConfig() { @@ -28,7 +28,7 @@ class ExampleRTThread : public CyclicThread { } public: - ExampleRTThread() : CyclicThread("ExampleRTThread", CreateThreadConfig()) {} + RealtimeROS2PublisherThread() : CyclicThread("ExampleRTThread", CreateThreadConfig()) {} int64_t GetLoopCounter() const { return loop_counter_; @@ -98,7 +98,7 @@ int main() { cactus_rt::AppConfig app_config; app_config.tracer_config.trace_aggregator_cpu_affinity = {0}; // doesn't work yet - auto thread1 = std::make_shared(); + auto thread1 = std::make_shared(); auto thread2 = std::make_shared(); App app("TracingExampleApp", app_config); app.RegisterThread(thread1); diff --git a/include/cactus_rt/app.h b/include/cactus_rt/app.h index 345ebad..c1e66c6 100644 --- a/include/cactus_rt/app.h +++ b/include/cactus_rt/app.h @@ -70,11 +70,22 @@ class App { */ void RegisterThread(std::shared_ptr thread); + /** + * @brief Sets up the trace aggregator. Call this before starting the thread + * if you don't want to call RegisterThread and maintain tracing capabilities. + */ + void SetupTraceAggregator(Thread& thread); + /** * @brief Starts the app by locking the memory and reserving the memory. Also * start all the threads in registration order. + * + * @param start_monotonic_time_ns The start time of the monotonic clock, if + * you prefer to set a time. Mostly used for child class of App to keep a + * consistent start time. Leave as default or pass -1 for the function to + * automatically determine a time (i.e. most situations). */ - virtual void Start(); + virtual void Start(int64_t start_monotonic_time_ns = -1); /** * @brief sends RequestStop to all threads in registration order. diff --git a/include/cactus_rt/ros2/app.h b/include/cactus_rt/ros2/app.h new file mode 100644 index 0000000..9094ab4 --- /dev/null +++ b/include/cactus_rt/ros2/app.h @@ -0,0 +1,65 @@ +#ifndef CACTUS_RT_ROS2_APP_H_ +#define CACTUS_RT_ROS2_APP_H_ +#include + +#include "../app.h" +#include "ros2_adapter.h" + +namespace cactus_rt::ros2 { + +class Ros2ThreadMixin { + protected: + std::shared_ptr ros2_adapter_; + + public: + void SetRos2Adapter(std::shared_ptr ros2_adapter) { + ros2_adapter_ = ros2_adapter; + } + + virtual void InitializeForRos2() = 0; + virtual ~Ros2ThreadMixin() = 0; +}; + +class Ros2ExecutorThread : public cactus_rt::Thread, public cactus_rt::ros2::Ros2ThreadMixin { + std::optional executor_; + + static cactus_rt::ThreadConfig CreateThreadConfig(); + + public: + Ros2ExecutorThread(); + + void Run() override; + + void InitializeForRos2() override {} +}; + +class App : public cactus_rt::App { + std::shared_ptr ros2_adapter_; + std::shared_ptr ros2_executor_thread_; + + public: + explicit App( + std::string name = "RTROS2App", + cactus_rt::AppConfig config = cactus_rt::AppConfig(), + Ros2Adapter::Config ros2_adapter_config = Ros2Adapter::Config() + ); + + template + std::shared_ptr CreateThread(Args&&... args) { + std::shared_ptr thread = std::make_shared(std::forward(args)...); + + thread->SetRos2Adapter(ros2_adapter_); + thread->InitializeForRos2(); + + return thread; + } + + void Start(int64_t start_monotonic_time_ns = -1) override; + + void RequestStop() override; + + void Join() override; +}; + +} // namespace cactus_rt::ros2 +#endif diff --git a/include/cactus_rt/ros2/ros2_adapter.h b/include/cactus_rt/ros2/ros2_adapter.h index 69149fa..12167c5 100644 --- a/include/cactus_rt/ros2/ros2_adapter.h +++ b/include/cactus_rt/ros2/ros2_adapter.h @@ -1,4 +1,158 @@ +#ifndef CACTUS_RT_ROS2_ROS2_ADAPTER_H_ +#define CACTUS_RT_ROS2_ROS2_ADAPTER_H_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + namespace cactus_rt::ros2 { + +class Ros2Adapter; + +template +using RealtimeToROS2Converter = std::function; + +template +using Ros2ToRealtimeConverter = std::function; + +class IPublisher { + friend class Ros2Adapter; + + // TODO: there are other rclcpp::Publisher::publish methods that can implement. + virtual void PublishToRos(const void* rt_msg) const = 0; + virtual std::pair DequeueFromRT() = 0; + + public: + virtual ~IPublisher() = 0; +}; + +template +class Publisher : public IPublisher { + typename rclcpp::Publisher::SharedPtr publisher_; + std::optional> converter_; + moodycamel::ReaderWriterQueue queue_; + + // This is used in dequeue from RT to + RealtimeT data_; + + void PublishToRos(const void* rt_msg) const override { + const auto* rt_msg_typed = static_cast(rt_msg); + + if (converter_) { + auto loaned_msg = publisher_->borrow_loaned_message(); + converter_.value()(*rt_msg_typed, loaned_msg.get()); + publisher_->publish(std::move(loaned_msg)); + } else { + if constexpr (std::is_same_v) { + const auto* ros_msg_typed = static_cast(rt_msg_typed); + publisher_->publish(*ros_msg_typed); + } else { + throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"}; + } + } + } + + std::pair DequeueFromRT() override { + // One copy + bool ok = queue_.try_dequeue(data_); + return std::make_pair(ok, &data_); + } + + public: + /** + * Constructs a publisher. Shouldn't be called directly. Only public to enable make_shared. + * + * @private + */ + Publisher( + typename rclcpp::Publisher::SharedPtr publisher, + std::optional> converter, + moodycamel::ReaderWriterQueue&& queue + ) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {} + + ~Publisher() override = default; + + template + bool Publish(Args&&... args) noexcept { + bool success = queue_.try_emplace(std::forward(args)...); + // TODO: Keep track of success/failed messages and expose that to be queried + return success; + } +}; + class Ros2Adapter { + public: + struct Config { + /** + * The time interval for which the adapter node is polling for publisher data. + */ + std::chrono::microseconds timer_interval = std::chrono::microseconds(200'000); + }; + + private: + std::string node_name_; + + std::shared_ptr ros_node_; + + rclcpp::TimerBase::SharedPtr timer_; + + // Protects the vector of subscribers, publishers, and services + // This means that during the timer callback, no subscribers, publishers, services, and etc. can be changed. + std::mutex mut_; + + // Publisher data + // TODO: the shared_ptr is supposed to give this to the real-time thread, but not sure if this is the best idea. + std::vector> publishers_; + + public: + Ros2Adapter(const std::string& name_, const Config& config); + + std::shared_ptr Node() const { + return ros_node_; + } + + template + std::shared_ptr> CreatePublisher( + const std::string& topic_name, + const rclcpp::QoS& qos, + std::optional> converter, + size_t rt_queue_size = 1000 + ) { + if (!converter) { + if constexpr (!(std::is_trivial_v && std::is_standard_layout_v && std::is_same_v)) { + throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified"); + } + } + + typename rclcpp::Publisher::SharedPtr publisher = this->ros_node_->create_publisher(topic_name, qos); + typename moodycamel::ReaderWriterQueue queue{rt_queue_size}; + + auto publisher_bundle = std::make_shared>( + std::move(publisher), + converter, + std::move(queue) + ); + + const std::scoped_lock lock(mut_); + publishers_.push_back(publisher_bundle); + return publisher_bundle; + } + + private: + void TimerCallback(); + void DrainQueues(); }; + } // namespace cactus_rt::ros2 + +#endif diff --git a/package.xml b/package.xml index 6196c81..f25f185 100644 --- a/package.xml +++ b/package.xml @@ -11,6 +11,8 @@ ament_cmake + + std_msgs protobuf-dev protobuf gtest diff --git a/src/cactus_rt/app.cc b/src/cactus_rt/app.cc index 954d032..fb8b2ac 100644 --- a/src/cactus_rt/app.cc +++ b/src/cactus_rt/app.cc @@ -16,8 +16,12 @@ using FileSink = cactus_rt::tracing::FileSink; namespace cactus_rt { +void App::SetupTraceAggregator(Thread& thread) { + thread.SetTraceAggregator(trace_aggregator_); +} + void App::RegisterThread(std::shared_ptr thread) { - thread->SetTraceAggregator(trace_aggregator_); + SetupTraceAggregator(*thread); threads_.push_back(thread); } @@ -40,12 +44,15 @@ App::~App() { quill::flush(); } -void App::Start() { +void App::Start(int64_t start_monotonic_time_ns) { LockMemory(); ReserveHeap(); StartQuill(); - auto start_monotonic_time_ns = NowNs(); + if (start_monotonic_time_ns == -1) { + start_monotonic_time_ns = NowNs(); + } + for (auto& thread : threads_) { thread->Start(start_monotonic_time_ns); } diff --git a/src/cactus_rt/ros2/app.cc b/src/cactus_rt/ros2/app.cc new file mode 100644 index 0000000..0de2d02 --- /dev/null +++ b/src/cactus_rt/ros2/app.cc @@ -0,0 +1,67 @@ +#include "cactus_rt/ros2/app.h" + +#include +#include + +#include "cactus_rt//utils.h" +#include "cactus_rt/app.h" +#include "cactus_rt/config.h" +#include "cactus_rt/ros2/ros2_adapter.h" +#include "cactus_rt/thread.h" + +namespace cactus_rt::ros2 { + +cactus_rt::ThreadConfig Ros2ExecutorThread::CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + return thread_config; +} + +Ros2ExecutorThread::Ros2ExecutorThread() : Thread("ROS2ExecutorThread", CreateThreadConfig()) {} + +void Ros2ExecutorThread::Run() { + rclcpp::ExecutorOptions options; + auto node_ptr = ros2_adapter_->Node()->get_node_base_interface(); + options.context = node_ptr->get_context(); + executor_.emplace(options); + executor_->add_node(node_ptr); + while (!StopRequested()) { + executor_->spin_once(); + } + + // Execute one more time to ensure everything is processed. + executor_->spin_once(); + + executor_->remove_node(node_ptr); +} + +App::App(std::string name, cactus_rt::AppConfig config, Ros2Adapter::Config ros2_adapter_config) + : cactus_rt::App(name, config), + ros2_adapter_(std::make_shared(name, ros2_adapter_config)) { + ros2_executor_thread_ = CreateThread(); + SetupTraceAggregator(*ros2_executor_thread_); +} + +void App::Start(int64_t start_monotonic_time_ns) { + // Start the Ros2ExecutorThread first. Don't think it is 100% necessary but why not get a head start. + if (start_monotonic_time_ns == -1) { + start_monotonic_time_ns = NowNs(); + } + + ros2_executor_thread_->Start(start_monotonic_time_ns); + cactus_rt::App::Start(start_monotonic_time_ns); +} + +void App::RequestStop() { + // Stop all the registered threads first, then stop the Ros2ExecutorThread to ensure we flush messages; + cactus_rt::App::RequestStop(); + ros2_executor_thread_->RequestStop(); +} + +void App::Join() { + // Join the registered threads first, then join the Ros2ExecutorThread last. + cactus_rt::App::Join(); + ros2_executor_thread_->Join(); +} + +Ros2ThreadMixin::~Ros2ThreadMixin() = default; +} // namespace cactus_rt::ros2 diff --git a/src/cactus_rt/ros2/ros2_adapter.cc b/src/cactus_rt/ros2/ros2_adapter.cc new file mode 100644 index 0000000..ac79e13 --- /dev/null +++ b/src/cactus_rt/ros2/ros2_adapter.cc @@ -0,0 +1,34 @@ +#include "cactus_rt/ros2/ros2_adapter.h" + +#include + +namespace cactus_rt::ros2 { + +IPublisher::~IPublisher() = default; + +Ros2Adapter::Ros2Adapter(const std::string& name, const Ros2Adapter::Config& config) + : ros_node_(std::make_shared(name + "_ros_adapter")) { + timer_ = this->ros_node_->create_wall_timer(config.timer_interval, [this] { TimerCallback(); }); +} + +void Ros2Adapter::TimerCallback() { + DrainQueues(); +} + +void Ros2Adapter::DrainQueues() { + for (const auto& publisher : publishers_) { + // TODO: hopefully the thread is not publishing so quickly that a single + // publisher monopolizes all resources. That said, if that happens the + // program is likely in bigger trouble anyway. + while (true) { + const auto [has_data, rt_msg_ptr] = publisher->DequeueFromRT(); + if (!has_data) { + break; + } + + publisher->PublishToRos(rt_msg_ptr); + } + } +} + +} // namespace cactus_rt::ros2