Skip to content

Commit

Permalink
Ability to publish to ROS2 from a real-time thread
Browse files Browse the repository at this point in the history
  • Loading branch information
shuhaowu committed Aug 2, 2024
1 parent 01c9f0e commit ebb25db
Show file tree
Hide file tree
Showing 12 changed files with 450 additions and 7 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,5 @@ build/
.cache/

.vscode/settings.json
log/
install/
4 changes: 2 additions & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME})
endif()
endif()

setup_cactus_rt_target_options(cactus_rt)

# ROS 2 build support
if (ENABLE_ROS2)
include(cmake/ros2.cmake)
endif()

setup_cactus_rt_target_options(cactus_rt)

# Build tests, examples, docs, only if this project is not embedded in another
# project.
if(${CMAKE_PROJECT_NAME} STREQUAL ${PROJECT_NAME})
Expand Down
6 changes: 6 additions & 0 deletions cmake/ros2.cmake
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
find_package(ament_cmake REQUIRED)
find_package(rclcpp REQUIRED)

target_sources(cactus_rt
PRIVATE
src/cactus_rt/ros2/ros2_adapter.cc
src/cactus_rt/ros2/app.cc
)

ament_target_dependencies(cactus_rt
PUBLIC
rclcpp
Expand Down
7 changes: 7 additions & 0 deletions examples/ros2_example/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
find_package(std_msgs REQUIRED)

add_executable(rt_ros2_example
main.cc
)
Expand All @@ -9,6 +11,11 @@ target_link_libraries(rt_ros2_example

setup_cactus_rt_target_options(rt_ros2_example)

ament_target_dependencies(rt_ros2_example
PUBLIC
std_msgs
)

install(
TARGETS rt_ros2_example
DESTINATION lib/${PROJECT_NAME}
Expand Down
90 changes: 89 additions & 1 deletion examples/ros2_example/main.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,91 @@
int main() {
#include <cactus_rt/ros2/app.h>
#include <cactus_rt/rt.h>

#include <chrono>
#include <iostream>
#include <memory>
#include <std_msgs/msg/int64.hpp>
#include <thread>

using cactus_rt::CyclicThread;
using cactus_rt::ros2::App;

struct RealtimeData {
int64_t data;

RealtimeData() = default;
RealtimeData(int64_t d) : data(d) {}
};
using RosData = std_msgs::msg::Int64;

namespace {
void RealtimeToROS2ConverterFunc(const RealtimeData& rt_data, RosData& ros_data) {
// A bit of a silly example, but gets the point across.
ros_data.data = rt_data.data;
}
} // namespace

/**
* This is a no-op thread that does nothing at 1 kHz.
*/
class ExampleRTThread : public CyclicThread, public cactus_rt::ros2::Ros2ThreadMixin {
int64_t loop_counter_ = 0;
std::shared_ptr<cactus_rt::ros2::Publisher<RealtimeData, RosData>> publisher_;

static cactus_rt::CyclicThreadConfig CreateThreadConfig() {
cactus_rt::CyclicThreadConfig thread_config;
thread_config.period_ns = 1'000'000;
thread_config.cpu_affinity = std::vector<size_t>{2};
thread_config.SetFifoScheduler(80);

// thread_config.tracer_config.trace_sleep = true;
thread_config.tracer_config.trace_wakeup_latency = true;
return thread_config;
}

public:
ExampleRTThread(const char* name) : CyclicThread(name, CreateThreadConfig()) {}

void InitializeForRos2() override {
publisher_ = ros2_adapter_->CreatePublisher<RealtimeData, RosData>("/hello", rclcpp::QoS(10), RealtimeToROS2ConverterFunc);
}

int64_t GetLoopCounter() const {
return loop_counter_;
}

protected:
bool Loop(int64_t /*now*/) noexcept final {
loop_counter_++;
if (loop_counter_ % 1000 == 0) {
LOG_INFO(Logger(), "Loop {}", loop_counter_);

const auto span = Tracer().WithSpan("Publish");
publisher_->Publish(loop_counter_);
}
return false;
}
};

int main(int argc, char* argv[]) {
rclcpp::init(argc, argv);

App app;
app.StartTraceSession("build/data.perfetto");

auto thread = app.CreateThread<ExampleRTThread>("ExampleRTThread");
app.RegisterThread(thread);

constexpr unsigned int time = 30;
std::cout << "Testing RT loop for " << time << " seconds.\n";

app.Start();

std::this_thread::sleep_for(std::chrono::seconds(time));

app.RequestStop();
app.Join();

std::cout << "Number of loops executed: " << thread->GetLoopCounter() << "\n";
return 0;
}
13 changes: 12 additions & 1 deletion include/cactus_rt/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,22 @@ class App {
*/
void RegisterThread(std::shared_ptr<Thread> thread);

/**
* @brief Sets up the trace aggregator. Call this before starting the thread
* if you don't want to call RegisterThread and maintain tracing capabilities.
*/
void SetupTraceAggregator(Thread& thread);

/**
* @brief Starts the app by locking the memory and reserving the memory. Also
* start all the threads in registration order.
*
* @param start_monotonic_time_ns The start time of the monotonic clock, if
* you prefer to set a time. Mostly used for child class of App to keep a
* consistent start time. Leave as default or pass -1 for the function to
* automatically determine a time (i.e. most situations).
*/
virtual void Start();
virtual void Start(int64_t start_monotonic_time_ns = -1);

/**
* @brief sends RequestStop to all threads in registration order.
Expand Down
65 changes: 65 additions & 0 deletions include/cactus_rt/ros2/app.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#ifndef CACTUS_RT_ROS2_APP_H_
#define CACTUS_RT_ROS2_APP_H_
#include <memory>

#include "../app.h"
#include "ros2_adapter.h"

namespace cactus_rt::ros2 {

class Ros2ThreadMixin {
protected:
std::shared_ptr<Ros2Adapter> ros2_adapter_;

public:
void SetRos2Adapter(std::shared_ptr<Ros2Adapter> ros2_adapter) {
ros2_adapter_ = ros2_adapter;
}

virtual void InitializeForRos2() = 0;
virtual ~Ros2ThreadMixin() = 0;
};

class Ros2ExecutorThread : public cactus_rt::Thread, public cactus_rt::ros2::Ros2ThreadMixin {
std::optional<rclcpp::executors::SingleThreadedExecutor> executor_;

static cactus_rt::ThreadConfig CreateThreadConfig();

public:
Ros2ExecutorThread();

void Run() override;

void InitializeForRos2() override {}
};

class App : public cactus_rt::App {
std::shared_ptr<Ros2Adapter> ros2_adapter_;
std::shared_ptr<Ros2ExecutorThread> ros2_executor_thread_;

public:
explicit App(
std::string name = "RTROS2App",
cactus_rt::AppConfig config = cactus_rt::AppConfig(),
Ros2Adapter::Config ros2_adapter_config = Ros2Adapter::Config()
);

template <typename ThreadT, typename... Args>
std::shared_ptr<ThreadT> CreateThread(Args&&... args) {
std::shared_ptr<ThreadT> thread = std::make_shared<ThreadT>(std::forward<Args>(args)...);

thread->SetRos2Adapter(ros2_adapter_);
thread->InitializeForRos2();

return thread;
}

void Start(int64_t start_monotonic_time_ns = -1) override;

void RequestStop() override;

void Join() override;
};

} // namespace cactus_rt::ros2
#endif
154 changes: 154 additions & 0 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,158 @@
#ifndef CACTUS_RT_ROS2_ROS2_ADAPTER_H_
#define CACTUS_RT_ROS2_ROS2_ADAPTER_H_

#include <readerwriterqueue.h>

#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <optional>
#include <rclcpp/rclcpp.hpp>
#include <stdexcept>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>

namespace cactus_rt::ros2 {

class Ros2Adapter;

template <typename RealtimeT, typename RosT>
using RealtimeToROS2Converter = std::function<void(const RealtimeT&, RosT&)>;

template <typename RealtimeT, typename RosT>
using Ros2ToRealtimeConverter = std::function<RealtimeT(const RosT&)>;

class IPublisher {
friend class Ros2Adapter;

// TODO: there are other rclcpp::Publisher::publish methods that can implement.
virtual void PublishToRos(const void* rt_msg) const = 0;
virtual std::pair<bool, const void* const> DequeueFromRT() = 0;

public:
virtual ~IPublisher() = 0;
};

template <typename RealtimeT, typename RosT>
class Publisher : public IPublisher {
typename rclcpp::Publisher<RosT>::SharedPtr publisher_;
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter_;
moodycamel::ReaderWriterQueue<RealtimeT> queue_;

// This is used in dequeue from RT to
RealtimeT data_;

void PublishToRos(const void* rt_msg) const override {
const auto* rt_msg_typed = static_cast<const RealtimeT*>(rt_msg);

if (converter_) {
auto loaned_msg = publisher_->borrow_loaned_message();
converter_.value()(*rt_msg_typed, loaned_msg.get());
publisher_->publish(std::move(loaned_msg));
} else {
if constexpr (std::is_same_v<RealtimeT, RosT>) {
const auto* ros_msg_typed = static_cast<const RosT*>(rt_msg_typed);
publisher_->publish(*ros_msg_typed);
} else {
throw std::invalid_argument{"converter not specified but RealtimeT and RosT are not the same?!"};
}
}
}

std::pair<bool, const void* const> DequeueFromRT() override {
// One copy
bool ok = queue_.try_dequeue(data_);
return std::make_pair(ok, &data_);
}

public:
/**
* Constructs a publisher. Shouldn't be called directly. Only public to enable make_shared.
*
* @private
*/
Publisher(
typename rclcpp::Publisher<RosT>::SharedPtr publisher,
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter,
moodycamel::ReaderWriterQueue<RealtimeT>&& queue
) : publisher_(publisher), converter_(converter), queue_(std::move(queue)) {}

~Publisher() override = default;

template <typename... Args>
bool Publish(Args&&... args) noexcept {
bool success = queue_.try_emplace(std::forward<Args>(args)...);
// TODO: Keep track of success/failed messages and expose that to be queried
return success;
}
};

class Ros2Adapter {
public:
struct Config {
/**
* The time interval for which the adapter node is polling for publisher data.
*/
std::chrono::microseconds timer_interval = std::chrono::microseconds(200'000);
};

private:
std::string node_name_;

std::shared_ptr<rclcpp::Node> ros_node_;

rclcpp::TimerBase::SharedPtr timer_;

// Protects the vector of subscribers, publishers, and services
// This means that during the timer callback, no subscribers, publishers, services, and etc. can be changed.
std::mutex mut_;

// Publisher data
// TODO: the shared_ptr is supposed to give this to the real-time thread, but not sure if this is the best idea.
std::vector<std::shared_ptr<IPublisher>> publishers_;

public:
Ros2Adapter(const std::string& name_, const Config& config);

std::shared_ptr<rclcpp::Node> Node() const {
return ros_node_;
}

template <typename RealtimeT, typename RosT>
std::shared_ptr<Publisher<RealtimeT, RosT>> CreatePublisher(
const std::string& topic_name,
const rclcpp::QoS& qos,
std::optional<RealtimeToROS2Converter<RealtimeT, RosT>> converter,
size_t rt_queue_size = 1000
) {
if (!converter) {
if constexpr (!(std::is_trivial_v<RosT> && std::is_standard_layout_v<RosT> && std::is_same_v<RosT, RealtimeT>)) {
throw std::invalid_argument("RosT and RealtimeT must be the same and must be a plain object for converter not to be specified");
}
}

typename rclcpp::Publisher<RosT>::SharedPtr publisher = this->ros_node_->create_publisher<RosT>(topic_name, qos);
typename moodycamel::ReaderWriterQueue<RealtimeT> queue{rt_queue_size};

auto publisher_bundle = std::make_shared<Publisher<RealtimeT, RosT>>(
std::move(publisher),
converter,
std::move(queue)
);

const std::scoped_lock lock(mut_);
publishers_.push_back(publisher_bundle);
return publisher_bundle;
}

private:
void TimerCallback();
void DrainQueues();
};

} // namespace cactus_rt::ros2

#endif
Loading

0 comments on commit ebb25db

Please sign in to comment.