Skip to content

Commit

Permalink
Add sequences support
Browse files Browse the repository at this point in the history
Signed-off-by: Juan López Fernández <juanlopez@eprosima.com>
  • Loading branch information
juanlofer-eprosima committed Feb 2, 2023
1 parent 7110a50 commit 953a6fc
Show file tree
Hide file tree
Showing 7 changed files with 241 additions and 66 deletions.
54 changes: 37 additions & 17 deletions plugins/datastreamer_plugin/datastreamer/FastDdsDataStreamer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,23 +97,7 @@ bool FastDdsDataStreamer::start(
}

// Get all series from topics and create them
// NUMERIC
std::vector<types::DatumLabel> numeric_series = fastdds_handler_.numeric_data_series_names();
for (const auto& series : numeric_series)
{
// Create a series
DEBUG("Creating numeric series: " << series);
dataMap().addNumeric(series);
}

// STRING
std::vector<types::DatumLabel> string_series = fastdds_handler_.string_data_series_names();
for (const auto& series : string_series)
{
// Create a series
DEBUG("Creating string series: " << series);
dataMap().addStringSeries(series);
}
create_series_();

running_ = true;
return true;
Expand Down Expand Up @@ -161,6 +145,20 @@ bool FastDdsDataStreamer::xmlLoadState(
// FASTDDS LISTENER METHODS
////////////////////////////////////////////////////

void FastDdsDataStreamer::on_data_available()
{
DEBUG("FastDdsDataStreamer on_data_available");

// Locking DataStream
std::lock_guard<std::mutex> lock(mutex());

// Clear data created from previous sample
dataMap().clear();

// Create series from new received sample
create_series_();
}

void FastDdsDataStreamer::on_double_data_read(
const std::vector<std::pair<std::string, double>>& data_per_topic_value,
double timestamp)
Expand Down Expand Up @@ -255,6 +253,28 @@ void FastDdsDataStreamer::connect_to_domain_(
select_topics_dialog_.connect_to_domain(domain_id);
}

void FastDdsDataStreamer::create_series_()
{
// Get all series from topics and create them
// NUMERIC
std::vector<types::DatumLabel> numeric_series = fastdds_handler_.numeric_data_series_names();
for (const auto& series : numeric_series)
{
// Create a series
DEBUG("Creating numeric series: " << series);
dataMap().addNumeric(series);
}

// STRING
std::vector<types::DatumLabel> string_series = fastdds_handler_.string_data_series_names();
for (const auto& series : string_series)
{
// Create a series
DEBUG("Creating string series: " << series);
dataMap().addStringSeries(series);
}
}

} /* namespace datastreamer */
} /* namespace plotjuggler */
} /* namespace eprosima */
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class FastDdsDataStreamer :
// FASTDDS LISTENER METHODS
////////////////////////////////////////////////////

virtual void on_data_available() override;

virtual void on_double_data_read(
const std::vector<types::NumericDatum>& data_per_topic_value,
double timestamp) override;
Expand Down Expand Up @@ -132,6 +134,8 @@ class FastDdsDataStreamer :
void connect_to_domain_(
unsigned int domain_id);

void create_series_();


////////////////////////////////////////////////////
// INTERNAL VALUES
Expand Down
5 changes: 5 additions & 0 deletions plugins/datastreamer_plugin/fastdds/FastDdsListener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ class FastDdsListener
{
public:

virtual void on_data_available()
{
DEBUG("Calling on_data_available");
}

virtual void on_double_data_read(
const std::vector<types::NumericDatum>& numeric_data,
double timestamp)
Expand Down
113 changes: 78 additions & 35 deletions plugins/datastreamer_plugin/fastdds/ReaderHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,36 +48,18 @@ ReaderHandler::ReaderHandler(
, type_(type)
, listener_(listener)
, stop_(false)
, data_type_configuration_(data_type_configuration)
{
// Create data so it is not required to create it each time and avoid reallocation if possible
data_ = eprosima::fastrtps::types::DynamicDataFactory::get_instance()->create_data(type_);

// Create the static structures to store the data introspection information AND the data itself
utils::get_introspection_type_names(
topic_name(),
type_,
data_type_configuration,
numeric_data_info_,
string_data_info_);
// Determine whether the data tree will be recreated for every received sample
static_type_ = utils::is_type_static(type);

// Create the data structures so they are not copied in the future
for (const auto& info : numeric_data_info_)
if (static_type_)
{
numeric_data_.push_back({ std::get<0>(info), 0});
}
for (const auto& info : string_data_info_)
{
string_data_.push_back({ std::get<0>(info), "-"});
}

DEBUG("Reader created in topic: " << topic_name() << " with types: ");
for (const auto& info : numeric_data_info_)
{
DEBUG("\tNumeric: " << std::get<0>(info));
}
for (const auto& info : string_data_info_)
{
DEBUG("\tString: " << std::get<0>(info));
// Create the static structures to store the data introspection information AND the data itself
create_data_structures_();
}

// Set this object as this reader's listener
Expand All @@ -89,7 +71,7 @@ ReaderHandler::~ReaderHandler()
// Stop the reader
stop();

// Delete created data
// Delete created data
eprosima::fastrtps::types::DynamicDataFactory::get_instance()->delete_data(data_);
}

Expand All @@ -115,6 +97,13 @@ void ReaderHandler::on_data_available(
eprosima::fastrtps::types::ReturnCode_t read_ret =
eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK;

// Non-fixed size types require data to be recreated (e.g. to avoid having sequence members from previous samples)
if (!static_type_)
{
eprosima::fastrtps::types::DynamicDataFactory::get_instance()->delete_data(data_);
data_ = eprosima::fastrtps::types::DynamicDataFactory::get_instance()->create_data(type_);
}

// Read Data from reader while there is data available and not should stop
while (!stop_ && read_ret == eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK)
{
Expand All @@ -125,6 +114,23 @@ void ReaderHandler::on_data_available(
if (read_ret == eprosima::fastrtps::types::ReturnCode_t::RETCODE_OK &&
info.instance_state == eprosima::fastdds::dds::InstanceStateKind::ALIVE_INSTANCE_STATE)
{
if (!static_type_)
{
// Reset stored data info
numeric_data_info_.clear();
string_data_info_.clear();

// Reset stored data
numeric_data_.clear();
string_data_.clear();

// Recreate the structures to store the data introspection information AND the data itself
create_data_structures_(data_);

// Update previous data view according to new received data structure
listener_->on_data_available();
}

// Get timestamp
double timestamp = utils::get_timestamp_seconds_numeric_value(info.reception_timestamp);

Expand Down Expand Up @@ -172,6 +178,53 @@ const std::string& ReaderHandler::type_name() const
return topic_->get_type_name();
}

std::vector<std::string> ReaderHandler::numeric_data_series_names() const
{
return utils::get_introspection_type_names(numeric_data_info_);
}

std::vector<std::string> ReaderHandler::string_data_series_names() const
{
return utils::get_introspection_type_names(string_data_info_);
}

////////////////////////////////////////////////////
// AUXILIAR METHODS
////////////////////////////////////////////////////

void ReaderHandler::create_data_structures_(
eprosima::fastrtps::types::DynamicData* data /* = nullptr */)
{
// Create the structures to store the data introspection information AND the data itself
utils::get_introspection_type_names(
topic_name(),
type_,
data_type_configuration_,
numeric_data_info_,
string_data_info_,
data);

// Create the data structures
for (const auto& info : numeric_data_info_)
{
numeric_data_.push_back({ std::get<0>(info), 0});
}
for (const auto& info : string_data_info_)
{
string_data_.push_back({ std::get<0>(info), "-"});
}

DEBUG("Completed type introspection created in topic: " << topic_name() << " with types: ");
for (const auto& info : numeric_data_info_)
{
DEBUG("\tNumeric: " << std::get<0>(info));
}
for (const auto& info : string_data_info_)
{
DEBUG("\tString: " << std::get<0>(info));
}
}

////////////////////////////////////////////////////
// AUXILIAR STATIC METHODS
////////////////////////////////////////////////////
Expand All @@ -187,16 +240,6 @@ eprosima::fastdds::dds::StatusMask ReaderHandler::default_listener_mask_()
return mask;
}

std::vector<std::string> ReaderHandler::numeric_data_series_names() const
{
return utils::get_introspection_type_names(numeric_data_info_);
}

std::vector<std::string> ReaderHandler::string_data_series_names() const
{
return utils::get_introspection_type_names(string_data_info_);
}

} /* namespace fastdds */
} /* namespace plotjuggler */
} /* namespace eprosima */
13 changes: 13 additions & 0 deletions plugins/datastreamer_plugin/fastdds/ReaderHandler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,14 @@ struct ReaderHandler : public eprosima::fastdds::dds::DataReaderListener
std::vector<types::DatumLabel> string_data_series_names() const;


////////////////////////////////////////////////////
// AUXILIAR METHODS
////////////////////////////////////////////////////

void create_data_structures_(
eprosima::fastrtps::types::DynamicData* data = nullptr);


////////////////////////////////////////////////////
// AUXILIAR STATIC METHODS
////////////////////////////////////////////////////
Expand Down Expand Up @@ -127,6 +135,9 @@ struct ReaderHandler : public eprosima::fastdds::dds::DataReaderListener
//! Type Informantion
eprosima::fastrtps::types::DynamicType_ptr type_;

//! Whether it is composed of variable sized types (e.g. sequences)
bool static_type_;

//! Data Type element
eprosima::fastrtps::types::DynamicData* data_;

Expand All @@ -137,6 +148,8 @@ struct ReaderHandler : public eprosima::fastdds::dds::DataReaderListener

utils::TypeIntrospectionNumericStruct numeric_data_;
utils::TypeIntrospectionStringStruct string_data_;

DataTypeConfiguration data_type_configuration_;
};

} /* namespace fastdds */
Expand Down
Loading

0 comments on commit 953a6fc

Please sign in to comment.