Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update GraphCache for services & clients along with remaining graph introspection methods #90

Merged
merged 15 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
182 changes: 156 additions & 26 deletions rmw_zenoh_cpp/src/detail/graph_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@ void GraphCache::parse_put(const std::string & keyexpr)
auto add_topic_data =
[this](const Entity & entity, GraphNode & graph_node) -> void
{
if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) {
if (entity.type() != EntityType::Publisher &&
entity.type() != EntityType::Subscription &&
entity.type() != EntityType::Service &&
entity.type() != EntityType::Client)
{
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"add_topic_data() for invalid EntityType. Report this.");
return;
}

Expand All @@ -84,12 +91,31 @@ void GraphCache::parse_put(const std::string & keyexpr)
}

const liveliness::TopicInfo topic_info = entity.topic_info().value();
GraphNode::TopicMap & topic_map = entity.type() ==
EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_;
const std::string entity_desc = entity.type() ==
EntityType::Publisher ? "publisher" : "subscription";
const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0;
std::string entity_desc = "";
GraphNode::TopicMap & topic_map =
[&]() -> GraphNode::TopicMap &
clalancette marked this conversation as resolved.
Show resolved Hide resolved
{
if (entity.type() == EntityType::Publisher) {
entity_desc = "publisher";
return graph_node.pubs_;
} else if (entity.type() == EntityType::Subscription) {
entity_desc = "subscription";
return graph_node.subs_;
} else if (entity.type() == EntityType::Service) {
entity_desc = "service";
return graph_node.services_;

} else {
entity_desc = "client";
return graph_node.clients_;
}
}();
// For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent.
// Similarly, subscriptions and services are equivalent.
clalancette marked this conversation as resolved.
Show resolved Hide resolved
const std::size_t pub_count = entity.type() == EntityType::Publisher ||
entity.type() == EntityType::Client ? 1 : 0;
const std::size_t sub_count = !pub_count;

TopicDataPtr graph_topic_data = std::make_shared<TopicData>(
topic_info,
TopicStats{pub_count, sub_count});
Expand All @@ -115,14 +141,18 @@ void GraphCache::parse_put(const std::string & keyexpr)
}

// Bookkeeping: Update graph_topics_ which keeps track of topics across all nodes in the graph
GraphNode::TopicMap::iterator cache_topic_it = graph_topics_.find(topic_info.name_);
if (cache_topic_it == graph_topics_.end()) {
GraphNode::TopicMap & graph_endpoints =
entity.type() == EntityType::Publisher || entity.type() == EntityType::Subscription ?
graph_topics_ :
graph_services_;
GraphNode::TopicMap::iterator cache_topic_it = graph_endpoints.find(topic_info.name_);
if (cache_topic_it == graph_endpoints.end()) {
// First time this topic name is added to the graph.
std::shared_ptr<TopicData> topic_data_ptr = std::make_shared<TopicData>(
topic_info,
TopicStats{pub_count, sub_count}
);
graph_topics_[topic_info.name_] = GraphNode::TopicDataMap{
graph_endpoints[topic_info.name_] = GraphNode::TopicDataMap{
{topic_info.type_, topic_data_ptr}
};
} else {
Expand Down Expand Up @@ -153,6 +183,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
};

// Helper lambda to convert an Entity into a GraphNode.
// Note: this will update bookkeeping variables in GraphCache.
auto make_graph_node =
[&](const Entity & entity) -> std::shared_ptr<GraphNode>
{
Expand All @@ -166,7 +197,7 @@ void GraphCache::parse_put(const std::string & keyexpr)
// Token was for a node.
return graph_node;
}
// Add pub/sub entries.
// Add endpoint entries.
add_topic_data(entity, *graph_node);

return graph_node;
Expand Down Expand Up @@ -257,15 +288,19 @@ void GraphCache::parse_del(const std::string & keyexpr)

// Helper lambda to update graph_topics_.
auto update_graph_topics =
[&](const liveliness::TopicInfo topic_info, std::size_t pub_count,
[&](const liveliness::TopicInfo topic_info, const EntityType entity_type, std::size_t pub_count,
clalancette marked this conversation as resolved.
Show resolved Hide resolved
std::size_t sub_count) -> void
{
GraphNode::TopicMap & graph_endpoints =
entity_type == EntityType::Publisher || entity_type == EntityType::Subscription ?
graph_topics_ :
graph_services_;
GraphNode::TopicMap::iterator cache_topic_it =
graph_topics_.find(topic_info.name_);
if (cache_topic_it == graph_topics_.end()) {
graph_endpoints.find(topic_info.name_);
if (cache_topic_it == graph_endpoints.end()) {
// This should not happen.
RCUTILS_LOG_ERROR_NAMED(
"rmw_zenoh_cpp", "topic_key %s not found in graph_topics_. Report this.",
"rmw_zenoh_cpp", "topic_key %s not found in graph_endpoints. Report this.",
topic_info.name_.c_str());
} else {
GraphNode::TopicDataMap::iterator cache_topic_data_it =
Expand All @@ -281,7 +316,7 @@ void GraphCache::parse_del(const std::string & keyexpr)
}
// If the topic does not have any TopicData entries, erase the topic from the map.
if (cache_topic_it->second.empty()) {
graph_topics_.erase(cache_topic_it);
graph_endpoints.erase(cache_topic_it);
}
}
}
Expand All @@ -292,7 +327,14 @@ void GraphCache::parse_del(const std::string & keyexpr)
auto remove_topic_data =
[&](const Entity & entity, GraphNode & graph_node) -> void
{
if (entity.type() != EntityType::Publisher && entity.type() != EntityType::Subscription) {
if (entity.type() != EntityType::Publisher &&
entity.type() != EntityType::Subscription &&
entity.type() != EntityType::Service &&
entity.type() != EntityType::Client)
{
RCUTILS_LOG_WARN_NAMED(
"rmw_zenoh_cpp",
"remove_topic_data() for invalid EntityType. Report this.");
return;
}

Expand All @@ -304,13 +346,30 @@ void GraphCache::parse_del(const std::string & keyexpr)
"remove_topic_data() called without valid TopicInfo. Report this.");
return;
}

const liveliness::TopicInfo topic_info = entity.topic_info().value();
GraphNode::TopicMap & topic_map = entity.type() ==
EntityType::Publisher ? graph_node.pubs_ : graph_node.subs_;
const std::string entity_desc = entity.type() ==
EntityType::Publisher ? "publisher" : "subscription";
const std::size_t pub_count = entity.type() == EntityType::Publisher ? 1 : 0;
std::string entity_desc = "";
GraphNode::TopicMap & topic_map =
[&]() -> GraphNode::TopicMap &
{
if (entity.type() == EntityType::Publisher) {
entity_desc = "publisher";
return graph_node.pubs_;
} else if (entity.type() == EntityType::Subscription) {
entity_desc = "subscription";
return graph_node.subs_;
} else if (entity.type() == EntityType::Service) {
entity_desc = "service";
return graph_node.services_;

} else {
entity_desc = "client";
return graph_node.clients_;
}
}();
// For the sake of reusing data structures and lookup functions, we treat publishers and clients are equivalent.
// Similarly, subscriptions and services are equivalent.
const std::size_t pub_count = entity.type() == EntityType::Publisher ||
entity.type() == EntityType::Client ? 1 : 0;
const std::size_t sub_count = !pub_count;

GraphNode::TopicMap::iterator topic_it = topic_map.find(topic_info.name_);
Expand Down Expand Up @@ -346,7 +405,7 @@ void GraphCache::parse_del(const std::string & keyexpr)
}

// Bookkeeping: Update graph_topic_ which keeps track of topics across all nodes in the graph.
update_graph_topics(topic_info, pub_count, sub_count);
update_graph_topics(topic_info, entity.type(), pub_count, sub_count);

RCUTILS_LOG_INFO_NAMED(
"rmw_zenoh_cpp",
Expand Down Expand Up @@ -402,18 +461,21 @@ void GraphCache::parse_del(const std::string & keyexpr)
);
auto remove_topics =
[&](const GraphNode::TopicMap & topic_map, const EntityType & entity_type) -> void {
std::size_t pub_count = entity_type == EntityType::Publisher ? 1 : 0;
std::size_t pub_count = entity_type == EntityType::Publisher ||
entity_type == EntityType::Client ? 1 : 0;
std::size_t sub_count = !pub_count;
for (auto topic_it = topic_map.begin(); topic_it != topic_map.end(); ++topic_it) {
for (auto type_it = topic_it->second.begin(); type_it != topic_it->second.end();
++type_it)
{
update_graph_topics(type_it->second->info_, pub_count, sub_count);
update_graph_topics(type_it->second->info_, entity_type, pub_count, sub_count);
}
}
};
remove_topics(graph_node->pubs_, EntityType::Publisher);
remove_topics(graph_node->subs_, EntityType::Subscription);
remove_topics(graph_node->services_, EntityType::Service);
remove_topics(graph_node->clients_, EntityType::Client);
}
ns_it->second.erase(node_it);
RCUTILS_LOG_WARN_NAMED(
Expand Down Expand Up @@ -645,6 +707,18 @@ rmw_ret_t GraphCache::get_topic_names_and_types(
return fill_names_and_types(graph_topics_, allocator, topic_names_and_types);
}

///=============================================================================
rmw_ret_t GraphCache::get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types) const
{
RCUTILS_CHECK_ALLOCATOR_WITH_MSG(
allocator, "get_node_names allocator is not valid", return RMW_RET_INVALID_ARGUMENT);

std::lock_guard<std::mutex> lock(graph_mutex_);
return fill_names_and_types(graph_services_, allocator, service_names_and_types);
}

///=============================================================================
rmw_ret_t GraphCache::count_publishers(
const char * topic_name,
Expand Down Expand Up @@ -679,6 +753,40 @@ rmw_ret_t GraphCache::count_subscriptions(
return RMW_RET_OK;
}

///=============================================================================
rmw_ret_t GraphCache::count_services(
const char * service_name,
size_t * count) const
{
*count = 0;
std::lock_guard<std::mutex> lock(graph_mutex_);
if (graph_services_.count(service_name) != 0) {
for (const std::pair<const std::string, TopicDataPtr> & it : graph_services_.at(service_name)) {
// Iterate through all the types and increment count.
*count += it.second->stats_.sub_count_;
}
}

return RMW_RET_OK;
}

///=============================================================================
rmw_ret_t GraphCache::count_clients(
const char * service_name,
size_t * count) const
{
*count = 0;
std::lock_guard<std::mutex> lock(graph_mutex_);
if (graph_services_.count(service_name) != 0) {
for (const std::pair<const std::string, TopicDataPtr> & it : graph_services_.at(service_name)) {
// Iterate through all the types and increment count.
*count += it.second->stats_.pub_count_;
}
}

return RMW_RET_OK;
}

///=============================================================================
rmw_ret_t GraphCache::get_entity_names_and_types_by_node(
liveliness::EntityType entity_type,
Expand Down Expand Up @@ -737,6 +845,10 @@ rmw_ret_t GraphCache::get_entity_names_and_types_by_node(
return fill_names_and_types(node_it->second->pubs_, allocator, names_and_types);
} else if (entity_type == EntityType::Subscription) {
return fill_names_and_types(node_it->second->subs_, allocator, names_and_types);
} else if (entity_type == EntityType::Service) {
return fill_names_and_types(node_it->second->services_, allocator, names_and_types);
} else if (entity_type == EntityType::Client) {
return fill_names_and_types(node_it->second->clients_, allocator, names_and_types);
} else {
return RMW_RET_OK;
}
Expand Down Expand Up @@ -787,7 +899,6 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
}
}


rmw_ret_t ret = rmw_topic_endpoint_info_array_init_with_size(
endpoints_info,
nodes.size(),
Expand Down Expand Up @@ -851,3 +962,22 @@ rmw_ret_t GraphCache::get_entities_info_by_topic(
cleanup_endpoints_info.cancel();
return RMW_RET_OK;
}

///=============================================================================
rmw_ret_t GraphCache::service_server_is_available(
const char * service_name,
const char * service_type,
bool * is_available)
{
*is_available = false;
std::lock_guard<std::mutex> lock(graph_mutex_);
GraphNode::TopicMap::iterator service_it = graph_services_.find(service_name);
if (service_it != graph_services_.end()) {
GraphNode::TopicDataMap::iterator type_it = service_it->second.find(service_type);
if (type_it != service_it->second.end()) {
*is_available = true;
}
}

return RMW_RET_OK;
}
33 changes: 32 additions & 1 deletion rmw_zenoh_cpp/src/detail/graph_cache.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,14 @@


///=============================================================================
// TODO(Yadunund): Since we reuse pub_count_ and sub_count_ for pub/sub and
// service/client consider more general names for these fields.
struct TopicStats
{
// The count of publishers or clients.
std::size_t pub_count_;

// The count of subscriptions or services.
std::size_t sub_count_;

// Constructor which initializes counters to 0.
Expand Down Expand Up @@ -69,8 +74,15 @@ struct GraphNode
using TopicDataMap = std::unordered_map<std::string, TopicDataPtr>;
// Map topic name to TopicDataMap
using TopicMap = std::unordered_map<std::string, TopicDataMap>;

// Entries for pub/sub.
TopicMap pubs_ = {};
TopicMap subs_ = {};

// Entires for service/client.
TopicMap clients_ = {};
TopicMap services_ = {};

};
using GraphNodePtr = std::shared_ptr<GraphNode>;

Expand All @@ -94,6 +106,10 @@ class GraphCache final
bool no_demangle,
rmw_names_and_types_t * topic_names_and_types) const;

rmw_ret_t get_service_names_and_types(
rcutils_allocator_t * allocator,
rmw_names_and_types_t * service_names_and_types) const;

rmw_ret_t count_publishers(
const char * topic_name,
size_t * count) const;
Expand All @@ -102,6 +118,14 @@ class GraphCache final
const char * topic_name,
size_t * count) const;

rmw_ret_t count_services(
const char * service_name,
size_t * count) const;

rmw_ret_t count_clients(
const char * service_name,
size_t * count) const;

rmw_ret_t get_entity_names_and_types_by_node(
liveliness::EntityType entity_type,
rcutils_allocator_t * allocator,
Expand All @@ -117,6 +141,11 @@ class GraphCache final
bool no_demangle,
rmw_topic_endpoint_info_array_t * endpoints_info) const;

rmw_ret_t service_server_is_available(
const char * service_name,
const char * service_type,
bool * is_available);

private:
/*
namespace_1:
Expand Down Expand Up @@ -146,8 +175,10 @@ class GraphCache final
// Map namespace to a map of <node_name, GraphNodePtr>.
NamespaceMap graph_ = {};

// Optimize topic lookups across the graph.
// Optimize pub/sub lookups across the graph.
GraphNode::TopicMap graph_topics_ = {};
// Optimize service/client lookups across the graph.
GraphNode::TopicMap graph_services_ = {};

mutable std::mutex graph_mutex_;
};
Expand Down
Loading
Loading