Skip to content

Commit

Permalink
Enable clock synchronization with federates
Browse files Browse the repository at this point in the history
  • Loading branch information
chanijjani committed Mar 15, 2024
1 parent da9bf64 commit c5a00ba
Show file tree
Hide file tree
Showing 6 changed files with 379 additions and 16 deletions.
4 changes: 4 additions & 0 deletions rust/rti/src/federate_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ impl FederateInfo {
&self.stream
}

pub fn stream_mut(&mut self) -> &mut Option<TcpStream> {
&mut self.stream
}

pub fn clock_synchronization_enabled(&self) -> bool {
self.clock_synchronization_enabled
}
Expand Down
71 changes: 69 additions & 2 deletions rust/rti/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use server::Server;

const RTI_TRACE_FILE_NAME: &str = "rti.lft";

#[derive(PartialEq, PartialOrd, Clone)]
#[derive(PartialEq, PartialOrd, Clone, Debug)]
pub enum ClockSyncStat {
ClockSyncOff,
ClockSyncInit,
Expand Down Expand Up @@ -125,7 +125,7 @@ pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static
return Err("Fail to handle clock_sync option");
}
idx += 1;
// TODO: idx += process_clock_sync_args();
idx += process_clock_sync_args(rti, argc - idx, argv);
} else if arg == "-t" || arg == "--tracing" {
rti.base_mut().set_tracing_enabled(true);
} else if arg == " " {
Expand Down Expand Up @@ -175,6 +175,73 @@ fn usage(argc: usize, argv: &[String]) {
}
}

fn process_clock_sync_args(rti: &mut RTIRemote, argc: usize, argv: &[String]) -> usize {
for mut i in 0..argc {
let arg = argv[i].as_str();
if arg == "off" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOff);
println!("RTI: Clock sync: off");
} else if arg == "init" || arg == "initial" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncInit);
println!("RTI: Clock sync: init");
} else if arg == "on" {
rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOn);
println!("RTI: Clock sync: on");
} else if arg == "period" {
if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn {
println!("[ERROR] clock sync period can only be set if --clock-sync is set to on.");
usage(argc, argv);
i += 1;
continue; // Try to parse the rest of the arguments as clock sync args.
} else if argc < i + 2 {
println!("[ERROR] clock sync period needs a time (in nanoseconds) argument.");
usage(argc, argv);
continue;
}
i += 1;
let period_ns: u64 = 10;
if period_ns == 0 || period_ns == u64::MAX {
println!("[ERROR] clock sync period value is invalid.");
continue; // Try to parse the rest of the arguments as clock sync args.
}
rti.set_clock_sync_period_ns(period_ns);
println!("RTI: Clock sync period: {}", rti.clock_sync_period_ns());
} else if argv[i] == "exchanges-per-interval" {
if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn
&& rti.clock_sync_global_status() != ClockSyncStat::ClockSyncInit
{
println!("[ERROR] clock sync exchanges-per-interval can only be set if\n--clock-sync is set to on or init.");
usage(argc, argv);
continue; // Try to parse the rest of the arguments as clock sync args.
} else if argc < i + 2 {
println!("[ERROR] clock sync exchanges-per-interval needs an integer argument.");
usage(argc, argv);
continue; // Try to parse the rest of the arguments as clock sync args.
}
i += 1;
let exchanges: u32 = 10;
if exchanges == 0 || exchanges == u32::MAX || exchanges == u32::MIN {
println!("[ERROR] clock sync exchanges-per-interval value is invalid.");
continue; // Try to parse the rest of the arguments as clock sync args.
}
rti.set_clock_sync_exchanges_per_interval(exchanges); // FIXME: Loses numbers on 64-bit machines
println!(
"RTI: Clock sync exchanges per interval: {}",
rti.clock_sync_exchanges_per_interval()
);
} else if arg == " " {
// Tolerate spaces
continue;
} else {
// Either done with the clock sync args or there is an invalid
// character. In either case, let the parent function deal with
// the rest of the characters;
return i;
}
}
argc
}

pub fn initialize_federates(rti: &mut RTIRemote) {
if rti.base().tracing_enabled() {
let _lf_number_of_workers = rti.base().number_of_scheduling_nodes();
Expand Down
18 changes: 18 additions & 0 deletions rust/rti/src/net_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ pub enum MsgType {
AddressAdvertisement,
P2pSendingFedId,
P2pTaggedMessage,
ClockSyncT1,
ClockSyncT3,
ClockSyncT4,
ClockSyncCodedProbe,
PortAbsent,
NeighborStructure,
Ignore,
Expand All @@ -129,6 +133,10 @@ impl MsgType {
MsgType::AddressAdvertisement => 14,
MsgType::P2pSendingFedId => 15,
MsgType::P2pTaggedMessage => 17,
MsgType::ClockSyncT1 => 19,
MsgType::ClockSyncT3 => 20,
MsgType::ClockSyncT4 => 21,
MsgType::ClockSyncCodedProbe => 22,
MsgType::PortAbsent => 23,
MsgType::NeighborStructure => 24,
MsgType::Ignore => 250,
Expand All @@ -150,6 +158,10 @@ impl MsgType {
12 => MsgType::StopGranted,
13 => MsgType::AddressQuery,
14 => MsgType::AddressAdvertisement,
19 => MsgType::ClockSyncT1,
20 => MsgType::ClockSyncT3,
21 => MsgType::ClockSyncT4,
22 => MsgType::ClockSyncCodedProbe,
23 => MsgType::PortAbsent,
_ => MsgType::Ignore,
}
Expand Down Expand Up @@ -183,6 +195,12 @@ impl ErrType {
}
}

#[derive(PartialEq, Clone)]
pub enum SocketType {
TCP,
UDP,
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
2 changes: 1 addition & 1 deletion rust/rti/src/net_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ impl NetUtil {
} {}
}

pub fn read_from_stream(stream: &mut TcpStream, buffer: &mut Vec<u8>, fed_id: u16) -> usize {
pub fn read_from_socket(stream: &mut TcpStream, buffer: &mut Vec<u8>, fed_id: u16) -> usize {
let mut bytes_read = 0;
while match stream.read(buffer) {
Ok(msg_size) => {
Expand Down
25 changes: 24 additions & 1 deletion rust/rti/src/rti_remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ pub struct RTIRemote {
/**
* Number of messages exchanged for each clock sync attempt.
*/
clock_sync_exchanges_per_interval: i32,
clock_sync_exchanges_per_interval: u32,

/**
* Boolean indicating that authentication is enabled.
Expand Down Expand Up @@ -146,6 +146,14 @@ impl RTIRemote {
self.clock_sync_global_status.clone()
}

pub fn clock_sync_period_ns(&self) -> u64 {
self.clock_sync_period_ns
}

pub fn clock_sync_exchanges_per_interval(&self) -> u32 {
self.clock_sync_exchanges_per_interval
}

pub fn stop_in_progress(&self) -> bool {
self.stop_in_progress
}
Expand All @@ -167,6 +175,21 @@ impl RTIRemote {
self.user_specified_port = user_specified_port;
}

pub fn set_clock_sync_global_status(&mut self, clock_sync_global_status: ClockSyncStat) {
self.clock_sync_global_status = clock_sync_global_status;
}

pub fn set_clock_sync_period_ns(&mut self, clock_sync_period_ns: u64) {
self.clock_sync_period_ns = clock_sync_period_ns;
}

pub fn set_clock_sync_exchanges_per_interval(
&mut self,
clock_sync_exchanges_per_interval: u32,
) {
self.clock_sync_exchanges_per_interval = clock_sync_exchanges_per_interval;
}

pub fn set_stop_in_progress(&mut self, stop_in_progress: bool) {
self.stop_in_progress = stop_in_progress;
}
Expand Down
Loading

0 comments on commit c5a00ba

Please sign in to comment.