From 953a6fc7f568bcc7127227e1edde3d9ab1149b21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20L=C3=B3pez=20Fern=C3=A1ndez?= Date: Wed, 1 Feb 2023 15:02:35 +0100 Subject: [PATCH] Add sequences support MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Juan López Fernández --- .../datastreamer/FastDdsDataStreamer.cpp | 54 ++++++--- .../datastreamer/FastDdsDataStreamer.hpp | 4 + .../fastdds/FastDdsListener.hpp | 5 + .../fastdds/ReaderHandler.cpp | 113 ++++++++++++------ .../fastdds/ReaderHandler.hpp | 13 ++ .../utils/dynamic_types_utils.cpp | 112 +++++++++++++++-- .../utils/dynamic_types_utils.hpp | 6 +- 7 files changed, 241 insertions(+), 66 deletions(-) diff --git a/plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.cpp b/plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.cpp index 5a85887..0939258 100644 --- a/plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.cpp +++ b/plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.cpp @@ -97,23 +97,7 @@ bool FastDdsDataStreamer::start( } // Get all series from topics and create them - // NUMERIC - std::vector numeric_series = fastdds_handler_.numeric_data_series_names(); - for (const auto& series : numeric_series) - { - // Create a series - DEBUG("Creating numeric series: " << series); - dataMap().addNumeric(series); - } - - // STRING - std::vector string_series = fastdds_handler_.string_data_series_names(); - for (const auto& series : string_series) - { - // Create a series - DEBUG("Creating string series: " << series); - dataMap().addStringSeries(series); - } + create_series_(); running_ = true; return true; @@ -161,6 +145,20 @@ bool FastDdsDataStreamer::xmlLoadState( // FASTDDS LISTENER METHODS //////////////////////////////////////////////////// +void FastDdsDataStreamer::on_data_available() +{ + DEBUG("FastDdsDataStreamer on_data_available"); + + // Locking DataStream + std::lock_guard lock(mutex()); + + // Clear data created from previous sample + dataMap().clear(); + + // Create series from new received sample + create_series_(); +} + void FastDdsDataStreamer::on_double_data_read( const std::vector>& data_per_topic_value, double timestamp) @@ -255,6 +253,28 @@ void FastDdsDataStreamer::connect_to_domain_( select_topics_dialog_.connect_to_domain(domain_id); } +void FastDdsDataStreamer::create_series_() +{ + // Get all series from topics and create them + // NUMERIC + std::vector numeric_series = fastdds_handler_.numeric_data_series_names(); + for (const auto& series : numeric_series) + { + // Create a series + DEBUG("Creating numeric series: " << series); + dataMap().addNumeric(series); + } + + // STRING + std::vector string_series = fastdds_handler_.string_data_series_names(); + for (const auto& series : string_series) + { + // Create a series + DEBUG("Creating string series: " << series); + dataMap().addStringSeries(series); + } +} + } /* namespace datastreamer */ } /* namespace plotjuggler */ } /* namespace eprosima */ diff --git a/plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.hpp b/plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.hpp index e222039..795335a 100644 --- a/plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.hpp +++ b/plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.hpp @@ -92,6 +92,8 @@ class FastDdsDataStreamer : // FASTDDS LISTENER METHODS //////////////////////////////////////////////////// + virtual void on_data_available() override; + virtual void on_double_data_read( const std::vector& data_per_topic_value, double timestamp) override; @@ -132,6 +134,8 @@ class FastDdsDataStreamer : void connect_to_domain_( unsigned int domain_id); + void create_series_(); + //////////////////////////////////////////////////// // INTERNAL VALUES diff --git a/plugins/datastreamer_plugin/fastdds/FastDdsListener.hpp b/plugins/datastreamer_plugin/fastdds/FastDdsListener.hpp index 57eacab..e0ff842 100644 --- a/plugins/datastreamer_plugin/fastdds/FastDdsListener.hpp +++ b/plugins/datastreamer_plugin/fastdds/FastDdsListener.hpp @@ -41,6 +41,11 @@ class FastDdsListener { public: + virtual void on_data_available() + { + DEBUG("Calling on_data_available"); + } + virtual void on_double_data_read( const std::vector& numeric_data, double timestamp) diff --git a/plugins/datastreamer_plugin/fastdds/ReaderHandler.cpp b/plugins/datastreamer_plugin/fastdds/ReaderHandler.cpp index 66915c9..c73cba8 100644 --- a/plugins/datastreamer_plugin/fastdds/ReaderHandler.cpp +++ b/plugins/datastreamer_plugin/fastdds/ReaderHandler.cpp @@ -48,36 +48,18 @@ ReaderHandler::ReaderHandler( , type_(type) , listener_(listener) , stop_(false) + , data_type_configuration_(data_type_configuration) { // Create data so it is not required to create it each time and avoid reallocation if possible data_ = eprosima::fastrtps::types::DynamicDataFactory::get_instance()->create_data(type_); - // Create the static structures to store the data introspection information AND the data itself - utils::get_introspection_type_names( - topic_name(), - type_, - data_type_configuration, - numeric_data_info_, - string_data_info_); + // Determine whether the data tree will be recreated for every received sample + static_type_ = utils::is_type_static(type); - // Create the data structures so they are not copied in the future - for (const auto& info : numeric_data_info_) + if (static_type_) { - numeric_data_.push_back({ std::get<0>(info), 0}); - } - for (const auto& info : string_data_info_) - { - string_data_.push_back({ std::get<0>(info), "-"}); - } - - DEBUG("Reader created in topic: " << topic_name() << " with types: "); - for (const auto& info : numeric_data_info_) - { - DEBUG("\tNumeric: " << std::get<0>(info)); - } - for (const auto& info : string_data_info_) - { - DEBUG("\tString: " << std::get<0>(info)); + // Create the static structures to store the data introspection information AND the data itself + create_data_structures_(); } // Set this object as this reader's listener @@ -89,7 +71,7 @@ ReaderHandler::~ReaderHandler() // Stop the reader stop(); - // Delete created data + // Delete created data eprosima::fastrtps::types::DynamicDataFactory::get_instance()->delete_data(data_); } @@ -115,6 +97,13 @@ void ReaderHandler::on_data_available( eprosima::fastrtps::types::ReturnCode_t read_ret = eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK; + // Non-fixed size types require data to be recreated (e.g. to avoid having sequence members from previous samples) + if (!static_type_) + { + eprosima::fastrtps::types::DynamicDataFactory::get_instance()->delete_data(data_); + data_ = eprosima::fastrtps::types::DynamicDataFactory::get_instance()->create_data(type_); + } + // Read Data from reader while there is data available and not should stop while (!stop_ && read_ret == eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK) { @@ -125,6 +114,23 @@ void ReaderHandler::on_data_available( if (read_ret == eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK && info.instance_state == eprosima::fastdds::dds::InstanceStateKind::ALIVE_INSTANCE_STATE) { + if (!static_type_) + { + // Reset stored data info + numeric_data_info_.clear(); + string_data_info_.clear(); + + // Reset stored data + numeric_data_.clear(); + string_data_.clear(); + + // Recreate the structures to store the data introspection information AND the data itself + create_data_structures_(data_); + + // Update previous data view according to new received data structure + listener_->on_data_available(); + } + // Get timestamp double timestamp = utils::get_timestamp_seconds_numeric_value(info.reception_timestamp); @@ -172,6 +178,53 @@ const std::string& ReaderHandler::type_name() const return topic_->get_type_name(); } +std::vector ReaderHandler::numeric_data_series_names() const +{ + return utils::get_introspection_type_names(numeric_data_info_); +} + +std::vector ReaderHandler::string_data_series_names() const +{ + return utils::get_introspection_type_names(string_data_info_); +} + +//////////////////////////////////////////////////// +// AUXILIAR METHODS +//////////////////////////////////////////////////// + +void ReaderHandler::create_data_structures_( + eprosima::fastrtps::types::DynamicData* data /* = nullptr */) +{ + // Create the structures to store the data introspection information AND the data itself + utils::get_introspection_type_names( + topic_name(), + type_, + data_type_configuration_, + numeric_data_info_, + string_data_info_, + data); + + // Create the data structures + for (const auto& info : numeric_data_info_) + { + numeric_data_.push_back({ std::get<0>(info), 0}); + } + for (const auto& info : string_data_info_) + { + string_data_.push_back({ std::get<0>(info), "-"}); + } + + DEBUG("Completed type introspection created in topic: " << topic_name() << " with types: "); + for (const auto& info : numeric_data_info_) + { + DEBUG("\tNumeric: " << std::get<0>(info)); + } + for (const auto& info : string_data_info_) + { + DEBUG("\tString: " << std::get<0>(info)); + } +} + //////////////////////////////////////////////////// // AUXILIAR STATIC METHODS //////////////////////////////////////////////////// @@ -187,16 +240,6 @@ eprosima::fastdds::dds::StatusMask ReaderHandler::default_listener_mask_() return mask; } -std::vector ReaderHandler::numeric_data_series_names() const -{ - return utils::get_introspection_type_names(numeric_data_info_); -} - -std::vector ReaderHandler::string_data_series_names() const -{ - return utils::get_introspection_type_names(string_data_info_); -} - } /* namespace fastdds */ } /* namespace plotjuggler */ } /* namespace eprosima */ diff --git a/plugins/datastreamer_plugin/fastdds/ReaderHandler.hpp b/plugins/datastreamer_plugin/fastdds/ReaderHandler.hpp index fd38421..67f5da1 100644 --- a/plugins/datastreamer_plugin/fastdds/ReaderHandler.hpp +++ b/plugins/datastreamer_plugin/fastdds/ReaderHandler.hpp @@ -92,6 +92,14 @@ struct ReaderHandler : public eprosima::fastdds::dds::DataReaderListener std::vector string_data_series_names() const; + //////////////////////////////////////////////////// + // AUXILIAR METHODS + //////////////////////////////////////////////////// + + void create_data_structures_( + eprosima::fastrtps::types::DynamicData* data = nullptr); + + //////////////////////////////////////////////////// // AUXILIAR STATIC METHODS //////////////////////////////////////////////////// @@ -127,6 +135,9 @@ struct ReaderHandler : public eprosima::fastdds::dds::DataReaderListener //! Type Informantion eprosima::fastrtps::types::DynamicType_ptr type_; + //! Whether it is composed of variable sized types (e.g. sequences) + bool static_type_; + //! Data Type element eprosima::fastrtps::types::DynamicData* data_; @@ -137,6 +148,8 @@ struct ReaderHandler : public eprosima::fastdds::dds::DataReaderListener utils::TypeIntrospectionNumericStruct numeric_data_; utils::TypeIntrospectionStringStruct string_data_; + + DataTypeConfiguration data_type_configuration_; }; } /* namespace fastdds */ diff --git a/plugins/datastreamer_plugin/utils/dynamic_types_utils.cpp b/plugins/datastreamer_plugin/utils/dynamic_types_utils.cpp index 307a47d..0deb712 100644 --- a/plugins/datastreamer_plugin/utils/dynamic_types_utils.cpp +++ b/plugins/datastreamer_plugin/utils/dynamic_types_utils.cpp @@ -54,6 +54,7 @@ void get_introspection_type_names( const DataTypeConfiguration& data_type_configuration, TypeIntrospectionCollection& numeric_type_names, TypeIntrospectionCollection& string_type_names, + DynamicData* data /* = nullptr */, const std::vector& current_members_tree /* = {} */, const std::vector& current_kinds_tree /* = {} */, const std::string& separator /* = "/" */) @@ -91,47 +92,69 @@ void get_introspection_type_names( break; case fastrtps::types::TK_ARRAY: + case fastrtps::types::TK_SEQUENCE: { - DynamicType_ptr internal_type = array_internal_kind(type); - unsigned int this_array_size = array_size(type); + DynamicType_ptr internal_type = type_internal_kind(type); + + std::string kind_str; + unsigned int size; + if (kind == fastrtps::types::TK_ARRAY) + { + kind_str = "array"; + size = array_size(type); + } + else if (kind == fastrtps::types::TK_SEQUENCE) + { + kind_str = "sequence"; + assert(data != nullptr); + size = data->get_item_count(); + } - // Allow this array depending on data type configuration - if (this_array_size >= data_type_configuration.max_array_size) + // Allow this array/sequence depending on data type configuration + if (size >= data_type_configuration.max_array_size) { if (data_type_configuration.discard_large_arrays) { - // Discard array - DEBUG("Discarding array " << base_type_name << " of size " << this_array_size); + // Discard array/sequence + DEBUG("Discarding " << kind_str << " " << base_type_name << " of size " << size); break; } else { // Truncate array DEBUG( - "Truncating array " << base_type_name << - " of size " << this_array_size << + "Truncating " << kind_str << " " << base_type_name << + " of size " << size << " to size " << data_type_configuration.max_array_size); - this_array_size = data_type_configuration.max_array_size; + size = data_type_configuration.max_array_size; } // Could not be neither of them, it would be an inconsistency } - for (MemberId member_id = 0; member_id < this_array_size; member_id++) + for (MemberId member_id = 0; member_id < size; member_id++) { std::vector new_members_tree(current_members_tree); new_members_tree.push_back(member_id); std::vector new_kinds_tree(current_kinds_tree); new_kinds_tree.push_back(kind); + DynamicData* member_data = data ? data->loan_value(member_id) : nullptr; + get_introspection_type_names( base_type_name + "[" + std::to_string(member_id) + "]", internal_type, data_type_configuration, numeric_type_names, string_type_names, + member_data, new_members_tree, new_kinds_tree, separator); + + if (member_data) + { + data->return_loaned_value(member_data); + } } break; } @@ -153,22 +176,29 @@ void get_introspection_type_names( std::vector new_kinds_tree(current_kinds_tree); new_kinds_tree.push_back(kind); + DynamicData* member_data = data ? data->loan_value(members.second->get_id()) : nullptr; + get_introspection_type_names( base_type_name + separator + members.first, members.second->get_descriptor()->get_type(), data_type_configuration, numeric_type_names, string_type_names, + member_data, new_members_tree, new_kinds_tree, separator); + + if (member_data) + { + data->return_loaned_value(member_data); + } } break; } case fastrtps::types::TK_BITSET: case fastrtps::types::TK_UNION: - case fastrtps::types::TK_SEQUENCE: case fastrtps::types::TK_MAP: case fastrtps::types::TK_NONE: default: @@ -281,11 +311,12 @@ DynamicData* get_parent_data_of_member( { case fastrtps::types::TK_STRUCTURE: case fastrtps::types::TK_ARRAY: + case fastrtps::types::TK_SEQUENCE: { // Access to the data inside the structure DynamicData* child_data; // Get data pointer to the child_data - // The loan and return is a workaround to avoid creating a unecessary copy of the data + // The loan and return is a workaround to avoid creating an unnecessary copy of the data child_data = data->loan_value(member_id); data->return_loaned_value(child_data); @@ -424,7 +455,7 @@ bool is_kind_string( } } -DynamicType_ptr array_internal_kind( +DynamicType_ptr type_internal_kind( const DynamicType_ptr& dyn_type) { return dyn_type->get_descriptor()->get_element_type(); @@ -436,6 +467,61 @@ unsigned int array_size( return dyn_type->get_descriptor()->get_total_bounds(); } +bool is_type_static( + const DynamicType_ptr& dyn_type) +{ + // Get type kind and store it as kind tree + TypeKind kind = dyn_type->get_kind(); + + switch (kind) + { + case fastrtps::types::TK_BOOLEAN: + case fastrtps::types::TK_BYTE: + case fastrtps::types::TK_INT16: + case fastrtps::types::TK_INT32: + case fastrtps::types::TK_INT64: + case fastrtps::types::TK_UINT16: + case fastrtps::types::TK_UINT32: + case fastrtps::types::TK_UINT64: + case fastrtps::types::TK_FLOAT32: + case fastrtps::types::TK_FLOAT64: + case fastrtps::types::TK_FLOAT128: + case fastrtps::types::TK_CHAR8: + case fastrtps::types::TK_CHAR16: + case fastrtps::types::TK_STRING8: + case fastrtps::types::TK_STRING16: + case fastrtps::types::TK_ENUM: + case fastrtps::types::TK_BITSET: + case fastrtps::types::TK_BITMASK: + case fastrtps::types::TK_UNION: + case fastrtps::types::TK_ARRAY: + return true; + + case fastrtps::types::TK_SEQUENCE: + case fastrtps::types::TK_MAP: + return false; + + case fastrtps::types::TK_STRUCTURE: + { + // Using the Dynamic type, retrieve the name of the fields and its descriptions + std::map members_by_name; + dyn_type->get_all_members_by_name(members_by_name); + + bool ret = true; + for (const auto& members : members_by_name) + { + ret = ret & is_type_static(members.second->get_descriptor()->get_type()); + } + return ret; + } + + case fastrtps::types::TK_NONE: + default: + WARNING(kind << " DataKind Not supported"); + throw std::runtime_error("Unsupported Dynamic Types kind"); + } +} + } /* namespace utils */ } /* namespace plotjuggler */ } /* namespace eprosima */ diff --git a/plugins/datastreamer_plugin/utils/dynamic_types_utils.hpp b/plugins/datastreamer_plugin/utils/dynamic_types_utils.hpp index 3bd7298..7037fc4 100644 --- a/plugins/datastreamer_plugin/utils/dynamic_types_utils.hpp +++ b/plugins/datastreamer_plugin/utils/dynamic_types_utils.hpp @@ -87,6 +87,7 @@ void get_introspection_type_names( const DataTypeConfiguration& data_type_configuration, TypeIntrospectionCollection& numeric_type_names, TypeIntrospectionCollection& string_type_names, + DynamicData* data = nullptr, const std::vector& current_members_tree = {}, const std::vector& current_kinds_tree = {}, const std::string& separator = "/"); @@ -172,12 +173,15 @@ bool is_kind_numeric( bool is_kind_string( const TypeKind& kind); -DynamicType_ptr array_internal_kind( +DynamicType_ptr type_internal_kind( const DynamicType_ptr& dyn_type); unsigned int array_size( const DynamicType_ptr& dyn_type); +bool is_type_static( + const DynamicType_ptr& dyn_type); + } /* namespace utils */ } /* namespace plotjuggler */ } /* namespace eprosima */