diff --git a/.gitignore b/.gitignore index 01b9655..2bb4ecf 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ build/ .cache/ .vscode/settings.json +log/ +install/ diff --git a/CMakeLists.txt b/CMakeLists.txt index 313e94e..3ac3273 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -165,13 +165,13 @@ if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME}) endif() endif() -setup_cactus_rt_target_options(cactus_rt) - # ROS 2 build support if (ENABLE_ROS2) include(cmake/ros2.cmake) endif() +setup_cactus_rt_target_options(cactus_rt) + # Build tests, examples, docs, only if this project is not embedded in another # project. if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME}) diff --git a/cmake/ros2.cmake b/cmake/ros2.cmake index 752579b..c10d9b9 100644 --- a/cmake/ros2.cmake +++ b/cmake/ros2.cmake @@ -1,11 +1,17 @@ find_package(ament_cmake REQUIRED) find_package(rclcpp REQUIRED) +target_sources(cactus_rt + PRIVATE + src/cactus_rt/ros2/ros2_adapter.cc + src/cactus_rt/ros2/app.cc +) + ament_target_dependencies(cactus_rt PUBLIC rclcpp ) -add_subdirectory(examples/ros2_example) +add_subdirectory(examples/ros2/publisher) ament_package() diff --git a/docs/imgs/ROS2Ownership.svg b/docs/imgs/ROS2Ownership.svg new file mode 100644 index 0000000..31c3c7a --- /dev/null +++ b/docs/imgs/ROS2Ownership.svg @@ -0,0 +1,4 @@ + + + +cactus_rt::ros2::Appcactus_rt::ros2::Ros2ExecutorThreadrclcpp::SingleThreadedExecutorcactus_rt::ros2::Ros2Adapterrclcpp::Node...Publish<RealtimeT>(...)MyThread : cactus_rt::ThreadCreatePublisher(...)CreateSubscription(...)User accessiblecactus-rt internalsLegendcactus-rt ROS2 ownershipcactus_rt::ros2::Publisher<RealtimeT>rclcpp::Publisher<RosT>RealtimeToRos2Converter<RealtimeT, RosT>Lockless SPSC Queue \ No newline at end of file diff --git a/docs/imgs/ROS2Publisher.svg b/docs/imgs/ROS2Publisher.svg new file mode 100644 index 0000000..b26f2f6 --- /dev/null +++ b/docs/imgs/ROS2Publisher.svg @@ -0,0 +1,4 @@ + + + +cactus-rt ROS2 publisher flowUser's real-time threadcactus_rt::ros2::Publisher::Publish(Args&&...)Lockless SPSC queueIn-place construct the message (type RealtimeT) data directly in the FIFO queueQueue is popped by a timer registered in Ros2Adapter, which periodically drains the queueData is dequeued from the SPSC queuecactus_rt::ros2::Publisher::DequeueAndPublishToROSrclcpp::Publisher<RosT>::borrow_loan_message()User-specified RealtimeToRos2Converterrclcpp::Publisher<RosT>::publish()queue.try_dequeue()The loaned message has a type of RosT which may not equal RealtimeTThis converts RealtimeT (which is a POD) to RosT (which may not be a POD). The conversion is done directly into the loaned message. This is optionally skipped if RealtimeT == RosT and it is a POD.Finally publishes the dataUser's threadRos2 Executor Thread \ No newline at end of file diff --git a/docs/ros2.md b/docs/ros2.md new file mode 100644 index 0000000..ba11785 --- /dev/null +++ b/docs/ros2.md @@ -0,0 +1,6 @@ +ROS2 integration with cactus-rt +=============================== + +![cactus-rt ROS2 integration object ownership diagram](imgs/ROS2Ownership.svg) + +![cactus-rt ROS2 publisher data flow](imgs/ROS2Publisher.svg) diff --git a/examples/ros2/publisher/CMakeLists.txt b/examples/ros2/publisher/CMakeLists.txt new file mode 100644 index 0000000..ba7dce9 --- /dev/null +++ b/examples/ros2/publisher/CMakeLists.txt @@ -0,0 +1,22 @@ +find_package(std_msgs REQUIRED) + +add_executable(rt_ros2_publisher + main.cc +) + +target_link_libraries(rt_ros2_publisher + PRIVATE + cactus_rt +) + +setup_cactus_rt_target_options(rt_ros2_publisher) + +ament_target_dependencies(rt_ros2_publisher + PUBLIC + std_msgs +) + +install( + TARGETS rt_ros2_publisher + DESTINATION lib/${PROJECT_NAME} +) diff --git a/examples/ros2/publisher/main.cc b/examples/ros2/publisher/main.cc new file mode 100644 index 0000000..d38739f --- /dev/null +++ b/examples/ros2/publisher/main.cc @@ -0,0 +1,87 @@ +#include +#include + +#include +#include +#include +#include +#include + +using cactus_rt::CyclicThread; +using cactus_rt::ros2::App; + +struct RealtimeData { + int64_t data; + + explicit RealtimeData(int64_t d) : data(d) {} +}; +using RosData = std_msgs::msg::Int64; + +namespace { +void RealtimeToROS2ConverterFunc(const RealtimeData& rt_data, RosData& ros_data) { + // A bit of a silly example, but gets the point across. + ros_data.data = rt_data.data; +} +} // namespace + +class RealtimeROS2PublisherThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin { + int64_t loop_counter_ = 0; + std::shared_ptr> publisher_; + + static cactus_rt::CyclicThreadConfig CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + thread_config.period_ns = 1'000'000; + thread_config.cpu_affinity = std::vector{2}; + thread_config.SetFifoScheduler(80); + + // thread_config.tracer_config.trace_sleep = true; + thread_config.tracer_config.trace_wakeup_latency = true; + return thread_config; + } + + public: + RealtimeROS2PublisherThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {} + + void InitializeForRos2() override { + publisher_ = ros2_adapter_->CreatePublisher("/hello", rclcpp::QoS(10), RealtimeToROS2ConverterFunc); + } + + int64_t GetLoopCounter() const { + return loop_counter_; + } + + protected: + bool Loop(int64_t /*now*/) noexcept final { + loop_counter_++; + if (loop_counter_ % 1000 == 0) { + LOG_INFO(Logger(), "Loop {}", loop_counter_); + + const auto span = Tracer().WithSpan("Publish"); + publisher_->Publish(loop_counter_); + } + return false; + } +}; + +int main(int argc, char* argv[]) { + rclcpp::init(argc, argv); + + App app; + app.StartTraceSession("build/data.perfetto"); + + auto thread = app.CreateROS2EnabledThread("RTROS2PublisherThread"); + app.RegisterThread(thread); + + constexpr unsigned int time = 30; + std::cout << "Testing RT loop for " << time << " seconds.\n"; + + app.Start(); + + std::this_thread::sleep_for(std::chrono::seconds(time)); + + app.RequestStop(); + app.Join(); + + std::cout << "Number of loops executed: " << thread->GetLoopCounter() << "\n"; + return 0; +} diff --git a/examples/ros2_example/CMakeLists.txt b/examples/ros2_example/CMakeLists.txt deleted file mode 100644 index 8942035..0000000 --- a/examples/ros2_example/CMakeLists.txt +++ /dev/null @@ -1,15 +0,0 @@ -add_executable(rt_ros2_example - main.cc -) - -target_link_libraries(rt_ros2_example - PRIVATE - cactus_rt -) - -setup_cactus_rt_target_options(rt_ros2_example) - -install( - TARGETS rt_ros2_example - DESTINATION lib/${PROJECT_NAME} -) diff --git a/examples/ros2_example/main.cc b/examples/ros2_example/main.cc deleted file mode 100644 index 4cce7f6..0000000 --- a/examples/ros2_example/main.cc +++ /dev/null @@ -1,3 +0,0 @@ -int main() { - return 0; -} diff --git a/include/cactus_rt/app.h b/include/cactus_rt/app.h index 345ebad..c1e66c6 100644 --- a/include/cactus_rt/app.h +++ b/include/cactus_rt/app.h @@ -70,11 +70,22 @@ class App { */ void RegisterThread(std::shared_ptr thread); + /** + * @brief Sets up the trace aggregator. Call this before starting the thread + * if you don't want to call RegisterThread and maintain tracing capabilities. + */ + void SetupTraceAggregator(Thread& thread); + /** * @brief Starts the app by locking the memory and reserving the memory. Also * start all the threads in registration order. + * + * @param start_monotonic_time_ns The start time of the monotonic clock, if + * you prefer to set a time. Mostly used for child class of App to keep a + * consistent start time. Leave as default or pass -1 for the function to + * automatically determine a time (i.e. most situations). */ - virtual void Start(); + virtual void Start(int64_t start_monotonic_time_ns = -1); /** * @brief sends RequestStop to all threads in registration order. diff --git a/include/cactus_rt/ros2/app.h b/include/cactus_rt/ros2/app.h new file mode 100644 index 0000000..53bbf13 --- /dev/null +++ b/include/cactus_rt/ros2/app.h @@ -0,0 +1,68 @@ +#ifndef CACTUS_RT_ROS2_APP_H_ +#define CACTUS_RT_ROS2_APP_H_ +#include +#include +#include + +#include "../app.h" +#include "ros2_adapter.h" + +namespace cactus_rt::ros2 { + +class Ros2ThreadMixin { + protected: + std::shared_ptr ros2_adapter_; + + public: + void SetRos2Adapter(std::shared_ptr ros2_adapter) { + ros2_adapter_ = ros2_adapter; + } + + virtual void InitializeForRos2() = 0; + virtual ~Ros2ThreadMixin() = 0; +}; + +class Ros2ExecutorThread : public cactus_rt::Thread, public cactus_rt::ros2::Ros2ThreadMixin { + std::optional executor_; + + static cactus_rt::ThreadConfig CreateThreadConfig(); + + public: + Ros2ExecutorThread(); + + void Run() override; + + void InitializeForRos2() override {} +}; + +class App : public cactus_rt::App { + std::shared_ptr ros2_adapter_; + std::shared_ptr ros2_executor_thread_; + + public: + explicit App( + std::string name = "RTROS2App", + cactus_rt::AppConfig config = cactus_rt::AppConfig(), + Ros2Adapter::Config ros2_adapter_config = Ros2Adapter::Config() + ); + + template + std::shared_ptr CreateROS2EnabledThread(Args&&... args) { + static_assert(std::is_base_of_v, "Must derive ROS2 thread from Ros2ThreadMixin"); + std::shared_ptr thread = std::make_shared(std::forward(args)...); + + thread->SetRos2Adapter(ros2_adapter_); + thread->InitializeForRos2(); + + return thread; + } + + void Start(int64_t start_monotonic_time_ns = -1) override; + + void RequestStop() override; + + void Join() override; +}; + +} // namespace cactus_rt::ros2 +#endif diff --git a/include/cactus_rt/ros2/publisher.h b/include/cactus_rt/ros2/publisher.h new file mode 100644 index 0000000..2855629 --- /dev/null +++ b/include/cactus_rt/ros2/publisher.h @@ -0,0 +1,92 @@ +#ifndef CACTUS_RT_ROS2_PUBLISHER_H_ +#define CACTUS_RT_ROS2_PUBLISHER_H_ + +#include + +#include +#include +#include + +namespace cactus_rt::ros2 { +class Ros2Adapter; + +template +using RealtimeToROS2Converter = std::function; + +template +using Ros2ToRealtimeConverter = std::function; + +class IPublisher { + friend class Ros2Adapter; + + virtual bool DequeueAndPublishToRos() = 0; + virtual void FullyDrainAndPublishToRos() = 0; + + public: + virtual ~IPublisher() = 0; +}; + +template +class Publisher : public IPublisher { + typename rclcpp::Publisher::SharedPtr publisher_; + std::optional> converter_; + moodycamel::ReaderWriterQueue queue_; + + bool DequeueAndPublishToRos() override { + RealtimeT rt_msg; + + const bool has_data = queue_.try_dequeue(rt_msg); + if (!has_data) { + return false; + } + + if (converter_) { + auto loaned_msg = publisher_->borrow_loaned_message(); + converter_.value()(rt_msg, loaned_msg.get()); + publisher_->publish(std::move(loaned_msg)); + } else { + if constexpr (std::is_same_v) { + const auto* ros_msg = static_cast(rt_msg); + publisher_->publish(*ros_msg); + } else { + throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"}; + } + } + + return true; + } + + void FullyDrainAndPublishToRos() override { + while (true) { + const auto has_data = DequeueAndPublishToRos(); + if (!has_data) { + break; + } + } + } + + public: + /** + * Constructs a publisher. Shouldn't be called directly. Only public to enable make_shared. + * + * @private + */ + Publisher( + typename rclcpp::Publisher::SharedPtr publisher, + std::optional> converter, + moodycamel::ReaderWriterQueue&& queue + ) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {} + + ~Publisher() override = default; + + template + bool Publish(Args&&... args) noexcept { + const bool success = queue_.try_emplace(std::forward(args)...); + // TODO: Keep track of success/failed messages and expose that to be queried + return success; + } +}; + +} // namespace cactus_rt::ros2 + +#endif diff --git a/include/cactus_rt/ros2/ros2_adapter.h b/include/cactus_rt/ros2/ros2_adapter.h index 69149fa..991caec 100644 --- a/include/cactus_rt/ros2/ros2_adapter.h +++ b/include/cactus_rt/ros2/ros2_adapter.h @@ -1,4 +1,83 @@ +#ifndef CACTUS_RT_ROS2_ROS2_ADAPTER_H_ +#define CACTUS_RT_ROS2_ROS2_ADAPTER_H_ + +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "publisher.h" + namespace cactus_rt::ros2 { + class Ros2Adapter { + public: + struct Config { + /** + * The time interval for which the adapter node is polling for publisher data. + */ + std::chrono::microseconds timer_interval = std::chrono::microseconds(200'000); + }; + + private: + std::string node_name_; + + std::shared_ptr ros_node_; + + rclcpp::TimerBase::SharedPtr timer_; + + // Protects the vector of subscribers, publishers, and services + // This means that during the timer callback, no subscribers, publishers, services, and etc. can be changed. + std::mutex mut_; + + // Publisher data + std::vector> publishers_; + + public: + Ros2Adapter(const std::string& name_, const Config& config); + + std::shared_ptr Node() const { + return ros_node_; + } + + template + std::shared_ptr> CreatePublisher( + const std::string& topic_name, + const rclcpp::QoS& qos, + std::optional> converter, + size_t rt_queue_size = 1000 + ) { + if (!converter) { + if constexpr (!(std::is_trivial_v && std::is_standard_layout_v && std::is_same_v)) { + throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified"); + } + } + + typename rclcpp::Publisher::SharedPtr publisher = this->ros_node_->create_publisher(topic_name, qos); + typename moodycamel::ReaderWriterQueue queue{rt_queue_size}; + + auto publisher_bundle = std::make_shared>( + std::move(publisher), + converter, + std::move(queue) + ); + + const std::scoped_lock lock(mut_); + publishers_.push_back(publisher_bundle); + return publisher_bundle; + } + + private: + void TimerCallback(); + void DrainQueues(); }; + } // namespace cactus_rt::ros2 + +#endif diff --git a/package.xml b/package.xml index 6196c81..f25f185 100644 --- a/package.xml +++ b/package.xml @@ -11,6 +11,8 @@ ament_cmake + + std_msgs protobuf-dev protobuf gtest diff --git a/src/cactus_rt/app.cc b/src/cactus_rt/app.cc index 954d032..fb8b2ac 100644 --- a/src/cactus_rt/app.cc +++ b/src/cactus_rt/app.cc @@ -16,8 +16,12 @@ using FileSink = cactus_rt::tracing::FileSink; namespace cactus_rt { +void App::SetupTraceAggregator(Thread& thread) { + thread.SetTraceAggregator(trace_aggregator_); +} + void App::RegisterThread(std::shared_ptr thread) { - thread->SetTraceAggregator(trace_aggregator_); + SetupTraceAggregator(*thread); threads_.push_back(thread); } @@ -40,12 +44,15 @@ App::~App() { quill::flush(); } -void App::Start() { +void App::Start(int64_t start_monotonic_time_ns) { LockMemory(); ReserveHeap(); StartQuill(); - auto start_monotonic_time_ns = NowNs(); + if (start_monotonic_time_ns == -1) { + start_monotonic_time_ns = NowNs(); + } + for (auto& thread : threads_) { thread->Start(start_monotonic_time_ns); } diff --git a/src/cactus_rt/ros2/app.cc b/src/cactus_rt/ros2/app.cc new file mode 100644 index 0000000..db39dd3 --- /dev/null +++ b/src/cactus_rt/ros2/app.cc @@ -0,0 +1,71 @@ +#include "cactus_rt/ros2/app.h" + +#include +#include + +#include "cactus_rt//utils.h" +#include "cactus_rt/app.h" +#include "cactus_rt/config.h" +#include "cactus_rt/ros2/ros2_adapter.h" +#include "cactus_rt/thread.h" + +namespace cactus_rt::ros2 { + +cactus_rt::ThreadConfig Ros2ExecutorThread::CreateThreadConfig() { + cactus_rt::CyclicThreadConfig thread_config; + return thread_config; +} + +Ros2ExecutorThread::Ros2ExecutorThread() : Thread("ROS2ExecutorThread", CreateThreadConfig()) {} + +void Ros2ExecutorThread::Run() { + rclcpp::ExecutorOptions options; + auto node_ptr = ros2_adapter_->Node()->get_node_base_interface(); + options.context = node_ptr->get_context(); + executor_.emplace(options); + executor_->add_node(node_ptr); + while (!StopRequested()) { + executor_->spin_once(); + } + + // Execute one more time to ensure everything is processed. + executor_->spin_once(); + + executor_->remove_node(node_ptr); +} + +App::App(std::string name, cactus_rt::AppConfig config, Ros2Adapter::Config ros2_adapter_config) + : cactus_rt::App(name, config), + ros2_adapter_(std::make_shared(name, ros2_adapter_config)) { + ros2_executor_thread_ = CreateROS2EnabledThread(); + SetupTraceAggregator(*ros2_executor_thread_); +} + +void App::Start(int64_t start_monotonic_time_ns) { + // Start the Ros2ExecutorThread first. Don't think it is 100% necessary but why not get a head start. + if (start_monotonic_time_ns == -1) { + start_monotonic_time_ns = NowNs(); + } + + ros2_executor_thread_->Start(start_monotonic_time_ns); + cactus_rt::App::Start(start_monotonic_time_ns); +} + +void App::RequestStop() { + // Stop all the registered threads first, then stop the Ros2ExecutorThread + // after the normal threads are joined to ensure we flush messages. See App::Join. + cactus_rt::App::RequestStop(); +} + +void App::Join() { + // Join the registered threads first. + cactus_rt::App::Join(); + + // At this point, all threads are stopped. We need to stop the ros2 executor + // thread as well. + ros2_executor_thread_->RequestStop(); + ros2_executor_thread_->Join(); +} + +Ros2ThreadMixin::~Ros2ThreadMixin() = default; +} // namespace cactus_rt::ros2 diff --git a/src/cactus_rt/ros2/ros2_adapter.cc b/src/cactus_rt/ros2/ros2_adapter.cc new file mode 100644 index 0000000..a3bfcdf --- /dev/null +++ b/src/cactus_rt/ros2/ros2_adapter.cc @@ -0,0 +1,31 @@ +#include "cactus_rt/ros2/ros2_adapter.h" + +#include + +namespace cactus_rt::ros2 { + +IPublisher::~IPublisher() = default; + +Ros2Adapter::Ros2Adapter(const std::string& name, const Ros2Adapter::Config& config) + : ros_node_(std::make_shared(name + "_ros_adapter")) { + timer_ = this->ros_node_->create_wall_timer(config.timer_interval, [this] { TimerCallback(); }); +} + +void Ros2Adapter::TimerCallback() { + DrainQueues(); +} + +void Ros2Adapter::DrainQueues() { + std::scoped_lock lock(mut_); + + for (const auto& publisher : publishers_) { + // Hopefully the thread is not publishing so quickly that a single + // publisher monopolizes all resources. That said, if that happens the + // program is likely in bigger trouble anyway. + // + // TODO: make it so we dequeue once. + publisher->FullyDrainAndPublishToRos(); + } +} + +} // namespace cactus_rt::ros2