Skip to content

Commit

Permalink
Added publishing messages at a certain rate to the comms bridge. (#775)
Browse files Browse the repository at this point in the history
* Added rate to comms bridge.

* making gnc/ekf the full rate since that what gs users need

---------

Co-authored-by: Marina Moreira <67443181+marinagmoreira@users.noreply.github.com>
  • Loading branch information
kbrowne15 and marinagmoreira authored May 3, 2024
1 parent 98c61b2 commit 6da4ac0
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 40 deletions.
23 changes: 14 additions & 9 deletions astrobee/config/communications/comms_bridge.config
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,20 @@ links = {
-- and relay_both (to be relayed in both directions). Providing all three fields gives the user
-- full directional control while minimizing repetition and copy/paste errors.

-- Each topic entry can contain an input topic and an output topic. The
-- in topic is the topic being published on the robot sending the data and
-- must be specified. The optional out topic is the name of the topic a
-- user wants the data published on in the receiving robot. If the
-- out topic is not specified, the comms bridge will set it to be the name
-- of the robot sending the data combined with the in topic name. For
-- instance, if the from robot was Bumble and the to robot was Honey and
-- one of the in topics in the relay forward list was "mgt/ack", then it
-- would be published on Honey on topic "bumble/mgt/ack".
-- Each topic entry can contain an input topic, output topic, and a rate.
-- In topic: Required. This is the topic being published on the robot
-- sending the data.
-- Out topic: Optional. This is the name of the topic a user wants the data
-- published on in the receiving robot. If the out topic is not specified,
-- the comms bridge will set it to be the name of the robot sending the data
-- combined with the in topic name. For instance, if the from robot was
-- Bumble and the to robot was Honey and one of the in topics in the relay
-- forward list was "mgt/ack", then it would be published on Honey on topic
-- "bumble/mgt/ack".
-- Rate: Optional. The rate in seconds at which a user wants the topic
-- published to the receiving robot. Please omit the rate, if the topic
-- should be published at the same rate it is published on the robot.

-- Please note that only one unique in topic can exist in the relay forward
-- and relay both lists and the relay backward and relay both lists. It is
-- fine to have the same in topic in the relay forward and relay backward
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,58 @@

namespace ff {

class TopicEntry {
public:
TopicEntry() : connecting_robot_(""),
out_topic_(""),
rate_seconds_(0),
last_time_pub_(0),
seq_num_(0),
type_md5_sum_(""),
data_size_(0),
data_(NULL) {}
TopicEntry(std::string robot_in, std::string topic_in, double rate_in) :
connecting_robot_(robot_in),
out_topic_(topic_in),
rate_seconds_(rate_in),
last_time_pub_(0),
seq_num_(0),
type_md5_sum_(""),
data_size_(0),
data_(NULL) {}

void SetDataToSend(const int seq_num,
std::string const& md5_sum,
const size_t data_size,
uint8_t const* data) {
seq_num_ = seq_num;
type_md5_sum_ = md5_sum;
data_size_ = data_size;
data_ = data;
}

std::string connecting_robot_;
std::string out_topic_;
double rate_seconds_;

// Info only needed for rate messages. A little wasted space but quicker
// program execution
double last_time_pub_;
int seq_num_;
std::string type_md5_sum_;
size_t data_size_;
uint8_t const* data_;
};

class GenericROSSubRapidPub : public BridgeSubscriber {
public:
GenericROSSubRapidPub();
explicit GenericROSSubRapidPub(ros::NodeHandle const* nh);
~GenericROSSubRapidPub();

void AddTopics(std::map<std::string,
std::vector<std::pair<std::string, std::string>>> const& link_entries);
std::vector<std::shared_ptr<TopicEntry>>> const& link_entries);

float FloatMod(float x, float y);

void InitializeDDS(std::vector<std::string> const& connections);

Expand All @@ -62,11 +107,16 @@ class GenericROSSubRapidPub : public BridgeSubscriber {
void ConvertRequest(rapid::ext::astrobee::GenericCommsRequest const* data,
std::string const& connecting_robot);

void CheckPubMsg(const ros::TimerEvent& event);

private:
bool dds_initialized_;
bool dds_initialized_, pub_rate_msgs_;

ros::Timer rate_msg_pub_timer_;

std::map<std::string, std::vector<std::pair<std::string, std::string>>> topic_mapping_;
std::map<std::string, std::vector<std::shared_ptr<TopicEntry>>> topic_mapping_;
std::map<std::string, GenericRapidPubPtr> robot_connections_;
std::vector<std::shared_ptr<TopicEntry>> rate_topic_entries_;
};

} // end namespace ff
Expand Down
15 changes: 11 additions & 4 deletions communications/comms_bridge/src/comms_bridge_nodelet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
}
ROS_INFO_STREAM("Comms Bridge Nodelet: agent name " << agent_name_);

ros_sub_ = std::make_shared<ff::GenericROSSubRapidPub>();
ros_sub_ = std::make_shared<ff::GenericROSSubRapidPub>(nh);

int fake_argc = 1;

Expand Down Expand Up @@ -367,6 +367,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
int &num_topics) {
config_reader::ConfigReader::Table relay_table, relay_item;
std::string in_topic, out_topic;
double rate = -1.0;
if (link_table.GetTable(table_name.c_str(), &relay_table)) {
num_topics += relay_table.GetSize();
for (size_t i = 1; i <= relay_table.GetSize(); i++) {
Expand All @@ -380,10 +381,16 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
out_topic = current_robot_ns + in_topic;
}

if (!relay_item.GetReal("rate", &rate)) {
rate = -1.0;
}

// Save all output topics under the same in topic since we don't want
// to subscribe to the same topic multiple times
link_entries_[in_topic].push_back(std::make_pair(connection_robot,
out_topic));
link_entries_[in_topic].push_back(
std::make_shared<ff::TopicEntry>(connection_robot,
out_topic,
rate));
}
}
}
Expand All @@ -401,7 +408,7 @@ class CommsBridgeNodelet : public ff_util::FreeFlyerNodelet {
std::shared_ptr<ff::GenericROSSubRapidPub> ros_sub_;

std::string agent_name_, participant_name_;
std::map<std::string, std::vector<std::pair<std::string, std::string>>> link_entries_;
std::map<std::string, std::vector<std::shared_ptr<ff::TopicEntry>>> link_entries_;
ros::ServiceServer dds_initialize_srv_;
};

Expand Down
130 changes: 107 additions & 23 deletions communications/comms_bridge/src/generic_ros_sub_rapid_pub.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,63 @@

namespace ff {

GenericROSSubRapidPub::GenericROSSubRapidPub() : dds_initialized_(false) {}
GenericROSSubRapidPub::GenericROSSubRapidPub(ros::NodeHandle const* nh) :
dds_initialized_(false), pub_rate_msgs_(false) {
// Setup timer for checking when to publish messages that need to be published
// at a certain rate. The timer will trigger at 10Hz
rate_msg_pub_timer_ = nh->createTimer(ros::Rate(10.0),
&GenericROSSubRapidPub::CheckPubMsg,
this,
false,
false);
}

GenericROSSubRapidPub::~GenericROSSubRapidPub() {}

void GenericROSSubRapidPub::AddTopics(
std::map<std::string, std::vector<std::pair<std::string, std::string>>> const& link_entries) {
void GenericROSSubRapidPub::AddTopics(std::map<std::string,
std::vector<std::shared_ptr<TopicEntry>>> const& link_entries) {
std::string in_topic, primary_out_topic;
// Make sure dds is initialized before adding topics
if (dds_initialized_) {
for (auto it = link_entries.begin(); it != link_entries.end(); ++it) {
in_topic = it->first;
// Use the first out_topic we read in as the out topic the base class uses
primary_out_topic = it->second[0].second;
primary_out_topic = it->second[0]->out_topic_;
// Save all robot/out topic pairs so that the bridge can pass the correct
// advertisement info and content message to each roboot that needs it
topic_mapping_[primary_out_topic] = it->second;

// Check if there is a valid rate that a message needs to be published at
for (size_t i = 0; i < it->second.size(); i++) {
if (it->second[i]->rate_seconds_ > 0) {
pub_rate_msgs_ = true;
rate_topic_entries_.push_back(it->second[i]);
if (FloatMod(it->second[i]->rate_seconds_, 0.1) != 0) {
ROS_ERROR("Rate for %s is not divisible by 0.1 and will not be published at the desired rate.\n",
primary_out_topic.c_str());
}
}
}

// Add topic to base class
ROS_INFO("Adding topic %s to base class.", in_topic.c_str());
addTopic(in_topic, primary_out_topic);
}
} else {
ROS_ERROR("Comms Bridge: Cannot add topics until dds is initialized.\n");
}

// If we have messages that get published at a certain rate, start the timer
// that checks when to publish the rate messages
if (pub_rate_msgs_) {
rate_msg_pub_timer_.start();
}
}

float GenericROSSubRapidPub::FloatMod(float x, float y) {
int n = static_cast<int>(x/y);
float result = x - n * y;
return result;
}

void GenericROSSubRapidPub::InitializeDDS(std::vector<std::string> const& connections) {
Expand Down Expand Up @@ -83,18 +117,20 @@ void GenericROSSubRapidPub::advertiseTopic(const RelayTopicInfo& relay_info) {
}

for (size_t i = 0; i < topic_mapping_[out_topic].size(); ++i) {
robot_name = topic_mapping_[out_topic][i].first;
robot_out_topic = topic_mapping_[out_topic][i].second;
robot_name = topic_mapping_[out_topic][i]->connecting_robot_;
robot_out_topic = topic_mapping_[out_topic][i]->out_topic_;

ROS_INFO("Robot name: %s Robot out topic: %s\n", robot_name.c_str(), robot_out_topic.c_str());
ROS_INFO("Robot name: %s Robot out topic: %s\n",
robot_name.c_str(),
robot_out_topic.c_str());

// Check robot connection exists
if (robot_connections_.find(robot_name) == robot_connections_.end()) {
ROS_ERROR("Comms Bridge: No connection for %s.\n", robot_name.c_str());
continue;
}

robot_connections_[robot_name]->SendAdvertisementInfo(out_topic,
robot_connections_[robot_name]->SendAdvertisementInfo(robot_out_topic,
info.latching,
info.data_type,
info.md5_sum,
Expand All @@ -117,22 +153,33 @@ void GenericROSSubRapidPub::relayMessage(const RelayTopicInfo& topic_info,
}

for (size_t i = 0; i < topic_mapping_[out_topic].size(); ++i) {
robot_name = topic_mapping_[out_topic][i].first;
robot_out_topic = topic_mapping_[out_topic][i].second;

ROS_INFO("Robot name: %s Robot out topic: %s\n", robot_name.c_str(), robot_out_topic.c_str());
robot_name = topic_mapping_[out_topic][i]->connecting_robot_;
robot_out_topic = topic_mapping_[out_topic][i]->out_topic_;

ROS_INFO("Robot name: %s Robot out topic: %s\n",
robot_name.c_str(),
robot_out_topic.c_str());

// Check the message is a rate message. If so, don't publish the message.
// Save the data so it can be sent when it needs to be sent.
if (topic_mapping_[out_topic][i]->rate_seconds_ > 0) {
topic_mapping_[out_topic][i]->SetDataToSend(topic_info.relay_seqnum,
content_info.type_md5_sum,
content_info.data_size,
content_info.data);
} else {
// Check robot connection exists
if (robot_connections_.find(robot_name) == robot_connections_.end()) {
ROS_ERROR("Comms Bridge: No connection for %s.\n", robot_name.c_str());
continue;
}

// Check robot connection exists
if (robot_connections_.find(robot_name) == robot_connections_.end()) {
ROS_ERROR("Comms Bridge: No connection for %s.\n", robot_name.c_str());
continue;
robot_connections_[robot_name]->SendContent(robot_out_topic,
content_info.type_md5_sum,
content_info.data,
content_info.data_size,
topic_info.relay_seqnum);
}

robot_connections_[robot_name]->SendContent(out_topic,
content_info.type_md5_sum,
content_info.data,
content_info.data_size,
topic_info.relay_seqnum);
}
}

Expand All @@ -155,7 +202,7 @@ void GenericROSSubRapidPub::ConvertRequest(
// If it is not the keyed topic, try to find it.
for (auto it = topic_mapping_.begin(); it != topic_mapping_.end() && !found; it++) {
for (size_t i = 0; it->second.size() && !found; i++) {
if (robot_out_topic == it->second[i].second) {
if (robot_out_topic == it->second[i]->out_topic_) {
out_topic = it->first;
found = true;
}
Expand Down Expand Up @@ -199,4 +246,41 @@ void GenericROSSubRapidPub::ConvertRequest(
info.definition);
}

void GenericROSSubRapidPub::CheckPubMsg(const ros::TimerEvent& event) {
double time_now = ros::Time::now().toSec();
std::string robot_name, robot_out_topic;
std::shared_ptr<TopicEntry> topic_entry;
ROS_ERROR_STREAM("Time now in seconds is: " << time_now);
// Go through all rate messages and publish them if it is time to
for (size_t i = 0; i < rate_topic_entries_.size(); i++) {
topic_entry = rate_topic_entries_[i];
double time_diff = time_now - topic_entry->last_time_pub_;
// Check if it is time to publish the rate nmessage.
// Add a little time buffer since this timer isn't tailored specifically to
// the message
if ((time_diff + 0.05) > topic_entry->rate_seconds_) {
robot_name = topic_entry->connecting_robot_;
robot_out_topic = topic_entry->out_topic_;

ROS_ERROR("Rate message for topic %s is being published.",
robot_out_topic.c_str());

// Check robot connection exists
if (robot_connections_.find(robot_name) == robot_connections_.end()) {
ROS_ERROR("Comms bridge: No connection for %s.\n", robot_name.c_str());
continue;
}

robot_connections_[robot_name]->SendContent(robot_out_topic,
topic_entry->type_md5_sum_,
topic_entry->data_,
topic_entry->data_size_,
topic_entry->seq_num_);

// Set time published for message
topic_entry->last_time_pub_ = time_now;
}
}
}

} // end namespace ff

0 comments on commit 6da4ac0

Please sign in to comment.