Skip to content

Commit

Permalink
Merge pull request #127 from cactusdynamics/publish-loss
Browse files Browse the repository at this point in the history
Add a warning when ROS2 publish fails
  • Loading branch information
shuhaowu authored Aug 22, 2024
2 parents 93254fc + d9bceac commit 3997bd8
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 6 deletions.
13 changes: 12 additions & 1 deletion include/cactus_rt/ros2/publisher.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
#include <rclcpp/rclcpp.hpp>
#include <type_traits>

#include "quill/Quill.h"

namespace cactus_rt::ros2 {
class Ros2Adapter;

Expand All @@ -29,6 +31,7 @@ class Publisher : public IPublisher {
using NoConversion = std::is_same<RealtimeT, RosT>;
using AdaptedRosType = typename std::conditional_t<NoConversion::value, RosT, rclcpp::TypeAdapter<RealtimeT, RosT>>;

quill::Logger* logger_;
typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher_;
moodycamel::ReaderWriterQueue<RealtimeT> queue_;

Expand Down Expand Up @@ -74,12 +77,14 @@ class Publisher : public IPublisher {
}

static std::shared_ptr<Publisher<RealtimeT, RosT, CheckForTrivialRealtimeT>> Create(
quill::Logger* logger,
rclcpp::Node& node,
const std::string& topic_name,
const rclcpp::QoS& qos,
const size_t rt_queue_size = 1000
) {
return std::make_shared<Publisher<RealtimeT, RosT, CheckForTrivialRealtimeT>>(
logger,
node.create_publisher<AdaptedRosType>(topic_name, qos),
moodycamel::ReaderWriterQueue<RealtimeT>(rt_queue_size)
);
Expand All @@ -92,14 +97,20 @@ class Publisher : public IPublisher {
* @private
*/
Publisher(
quill::Logger* logger,
typename rclcpp::Publisher<AdaptedRosType>::SharedPtr publisher,
moodycamel::ReaderWriterQueue<RealtimeT>&& queue
) : publisher_(publisher), queue_(std::move(queue)) {}
) : logger_(logger), publisher_(publisher), queue_(std::move(queue)) {}

template <typename... Args>
bool Publish(Args&&... args) noexcept {
const bool success = queue_.try_emplace(std::forward<Args>(args)...);
// TODO: Keep track of success/failed messages and expose that to be queried

if (!success) {
LOG_WARNING_LIMIT(std::chrono::seconds(5), logger_, "failed to publish to {} due to full queue", publisher_->get_topic_name());
}

return success;
}
};
Expand Down
7 changes: 5 additions & 2 deletions include/cactus_rt/ros2/ros2_adapter.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <vector>

#include "publisher.h"
#include "quill/Quill.h"
#include "subscription.h"

namespace cactus_rt::ros2 {
Expand Down Expand Up @@ -43,6 +44,8 @@ class Ros2Adapter {
std::vector<std::shared_ptr<IPublisher>> publishers_;
std::vector<std::shared_ptr<ISubscription>> subscriptions_;

quill::Logger* logger_;

public:
Ros2Adapter(const std::string& name_, const Config& config);

Expand All @@ -56,7 +59,7 @@ class Ros2Adapter {
const rclcpp::QoS& qos,
size_t rt_queue_size = 1000
) {
auto publisher = Publisher<RealtimeT, RosT, CheckForTrivialRealtimeT>::Create(*ros_node_, topic_name, qos, rt_queue_size);
auto publisher = Publisher<RealtimeT, RosT, CheckForTrivialRealtimeT>::Create(logger_, *ros_node_, topic_name, qos, rt_queue_size);

const std::scoped_lock lock(mut_);
publishers_.push_back(publisher);
Expand All @@ -68,7 +71,7 @@ class Ros2Adapter {
const std::string& topic_name,
const rclcpp::QoS& qos
) {
auto subscriber = SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>::Create(*this->ros_node_, topic_name, qos);
auto subscriber = SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>::Create(logger_, *this->ros_node_, topic_name, qos);

const std::scoped_lock lock(mut_);
subscriptions_.push_back(subscriber);
Expand Down
7 changes: 5 additions & 2 deletions include/cactus_rt/ros2/subscription.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <type_traits>

#include "../experimental/lockless/spsc/realtime_readable_value.h"
#include "quill/Quill.h"

// Note: ROS subscription dispatch is here: https://github.com/ros2/rclcpp/blob/e10728c/rclcpp/include/rclcpp/any_subscription_callback.hpp#L481
// We are using the TypeAdapter method.
Expand Down Expand Up @@ -39,6 +40,7 @@ class SubscriptionLatest : public ISubscription {

using RealtimeReadableValue = cactus_rt::experimental::lockless::spsc::RealtimeReadableValue<StampedValue<RealtimeT>>;

quill::Logger* logger_;
typename rclcpp::Subscription<AdaptedRosType>::SharedPtr ros_subscription_;
int64_t current_msg_id_ = 0;
RealtimeReadableValue latest_value_;
Expand All @@ -55,11 +57,12 @@ class SubscriptionLatest : public ISubscription {
}

static std::shared_ptr<SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>> Create(
quill::Logger* logger,
rclcpp::Node& node,
const std::string& topic_name,
const rclcpp::QoS& qos
) {
auto subscription = std::make_shared<SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>>();
auto subscription = std::make_shared<SubscriptionLatest<RealtimeT, RosT, CheckForTrivialRealtimeT>>(logger);

subscription->ros_subscription_ = node.create_subscription<AdaptedRosType>(
topic_name,
Expand All @@ -79,7 +82,7 @@ class SubscriptionLatest : public ISubscription {
*
* @private
*/
SubscriptionLatest() = default;
explicit SubscriptionLatest(quill::Logger* logger) : logger_(logger) {}

StampedValue<RealtimeT> ReadLatest() noexcept {
return latest_value_.Read();
Expand Down
3 changes: 2 additions & 1 deletion src/cactus_rt/ros2/ros2_adapter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
namespace cactus_rt::ros2 {

Ros2Adapter::Ros2Adapter(const std::string& name, const Ros2Adapter::Config& config)
: ros_node_(std::make_shared<rclcpp::Node>(name + "_ros_adapter")) {
: ros_node_(std::make_shared<rclcpp::Node>(name + "_ros_adapter")),
logger_(quill::create_logger("Ros2Adapter")) {
timer_ = this->ros_node_->create_wall_timer(config.timer_interval, [this] { TimerCallback(); });
}

Expand Down

0 comments on commit 3997bd8

Please sign in to comment.