From c5a00baba90ff00589ad2fe4122eb76c0a92c915 Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sun, 10 Mar 2024 20:48:20 -0700 Subject: [PATCH] Enable clock synchronization with federates --- rust/rti/src/federate_info.rs | 4 + rust/rti/src/lib.rs | 71 ++++++++- rust/rti/src/net_common.rs | 18 +++ rust/rti/src/net_util.rs | 2 +- rust/rti/src/rti_remote.rs | 25 +++- rust/rti/src/server.rs | 275 ++++++++++++++++++++++++++++++++-- 6 files changed, 379 insertions(+), 16 deletions(-) diff --git a/rust/rti/src/federate_info.rs b/rust/rti/src/federate_info.rs index daf5b48..eb0f00f 100644 --- a/rust/rti/src/federate_info.rs +++ b/rust/rti/src/federate_info.rs @@ -78,6 +78,10 @@ impl FederateInfo { &self.stream } + pub fn stream_mut(&mut self) -> &mut Option { + &mut self.stream + } + pub fn clock_synchronization_enabled(&self) -> bool { self.clock_synchronization_enabled } diff --git a/rust/rti/src/lib.rs b/rust/rti/src/lib.rs index 3f7ab20..85799a4 100644 --- a/rust/rti/src/lib.rs +++ b/rust/rti/src/lib.rs @@ -29,7 +29,7 @@ use server::Server; const RTI_TRACE_FILE_NAME: &str = "rti.lft"; -#[derive(PartialEq, PartialOrd, Clone)] +#[derive(PartialEq, PartialOrd, Clone, Debug)] pub enum ClockSyncStat { ClockSyncOff, ClockSyncInit, @@ -125,7 +125,7 @@ pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static return Err("Fail to handle clock_sync option"); } idx += 1; - // TODO: idx += process_clock_sync_args(); + idx += process_clock_sync_args(rti, argc - idx, argv); } else if arg == "-t" || arg == "--tracing" { rti.base_mut().set_tracing_enabled(true); } else if arg == " " { @@ -175,6 +175,73 @@ fn usage(argc: usize, argv: &[String]) { } } +fn process_clock_sync_args(rti: &mut RTIRemote, argc: usize, argv: &[String]) -> usize { + for mut i in 0..argc { + let arg = argv[i].as_str(); + if arg == "off" { + rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOff); + println!("RTI: Clock sync: off"); + } else if arg == "init" || arg == "initial" { + rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncInit); + println!("RTI: Clock sync: init"); + } else if arg == "on" { + rti.set_clock_sync_global_status(ClockSyncStat::ClockSyncOn); + println!("RTI: Clock sync: on"); + } else if arg == "period" { + if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn { + println!("[ERROR] clock sync period can only be set if --clock-sync is set to on."); + usage(argc, argv); + i += 1; + continue; // Try to parse the rest of the arguments as clock sync args. + } else if argc < i + 2 { + println!("[ERROR] clock sync period needs a time (in nanoseconds) argument."); + usage(argc, argv); + continue; + } + i += 1; + let period_ns: u64 = 10; + if period_ns == 0 || period_ns == u64::MAX { + println!("[ERROR] clock sync period value is invalid."); + continue; // Try to parse the rest of the arguments as clock sync args. + } + rti.set_clock_sync_period_ns(period_ns); + println!("RTI: Clock sync period: {}", rti.clock_sync_period_ns()); + } else if argv[i] == "exchanges-per-interval" { + if rti.clock_sync_global_status() != ClockSyncStat::ClockSyncOn + && rti.clock_sync_global_status() != ClockSyncStat::ClockSyncInit + { + println!("[ERROR] clock sync exchanges-per-interval can only be set if\n--clock-sync is set to on or init."); + usage(argc, argv); + continue; // Try to parse the rest of the arguments as clock sync args. + } else if argc < i + 2 { + println!("[ERROR] clock sync exchanges-per-interval needs an integer argument."); + usage(argc, argv); + continue; // Try to parse the rest of the arguments as clock sync args. + } + i += 1; + let exchanges: u32 = 10; + if exchanges == 0 || exchanges == u32::MAX || exchanges == u32::MIN { + println!("[ERROR] clock sync exchanges-per-interval value is invalid."); + continue; // Try to parse the rest of the arguments as clock sync args. + } + rti.set_clock_sync_exchanges_per_interval(exchanges); // FIXME: Loses numbers on 64-bit machines + println!( + "RTI: Clock sync exchanges per interval: {}", + rti.clock_sync_exchanges_per_interval() + ); + } else if arg == " " { + // Tolerate spaces + continue; + } else { + // Either done with the clock sync args or there is an invalid + // character. In either case, let the parent function deal with + // the rest of the characters; + return i; + } + } + argc +} + pub fn initialize_federates(rti: &mut RTIRemote) { if rti.base().tracing_enabled() { let _lf_number_of_workers = rti.base().number_of_scheduling_nodes(); diff --git a/rust/rti/src/net_common.rs b/rust/rti/src/net_common.rs index 8dfe47d..d33c8d1 100644 --- a/rust/rti/src/net_common.rs +++ b/rust/rti/src/net_common.rs @@ -103,6 +103,10 @@ pub enum MsgType { AddressAdvertisement, P2pSendingFedId, P2pTaggedMessage, + ClockSyncT1, + ClockSyncT3, + ClockSyncT4, + ClockSyncCodedProbe, PortAbsent, NeighborStructure, Ignore, @@ -129,6 +133,10 @@ impl MsgType { MsgType::AddressAdvertisement => 14, MsgType::P2pSendingFedId => 15, MsgType::P2pTaggedMessage => 17, + MsgType::ClockSyncT1 => 19, + MsgType::ClockSyncT3 => 20, + MsgType::ClockSyncT4 => 21, + MsgType::ClockSyncCodedProbe => 22, MsgType::PortAbsent => 23, MsgType::NeighborStructure => 24, MsgType::Ignore => 250, @@ -150,6 +158,10 @@ impl MsgType { 12 => MsgType::StopGranted, 13 => MsgType::AddressQuery, 14 => MsgType::AddressAdvertisement, + 19 => MsgType::ClockSyncT1, + 20 => MsgType::ClockSyncT3, + 21 => MsgType::ClockSyncT4, + 22 => MsgType::ClockSyncCodedProbe, 23 => MsgType::PortAbsent, _ => MsgType::Ignore, } @@ -183,6 +195,12 @@ impl ErrType { } } +#[derive(PartialEq, Clone)] +pub enum SocketType { + TCP, + UDP, +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/rti/src/net_util.rs b/rust/rti/src/net_util.rs index bd53636..e4a2cb0 100644 --- a/rust/rti/src/net_util.rs +++ b/rust/rti/src/net_util.rs @@ -31,7 +31,7 @@ impl NetUtil { } {} } - pub fn read_from_stream(stream: &mut TcpStream, buffer: &mut Vec, fed_id: u16) -> usize { + pub fn read_from_socket(stream: &mut TcpStream, buffer: &mut Vec, fed_id: u16) -> usize { let mut bytes_read = 0; while match stream.read(buffer) { Ok(msg_size) => { diff --git a/rust/rti/src/rti_remote.rs b/rust/rti/src/rti_remote.rs index 724537f..c1bc27f 100644 --- a/rust/rti/src/rti_remote.rs +++ b/rust/rti/src/rti_remote.rs @@ -80,7 +80,7 @@ pub struct RTIRemote { /** * Number of messages exchanged for each clock sync attempt. */ - clock_sync_exchanges_per_interval: i32, + clock_sync_exchanges_per_interval: u32, /** * Boolean indicating that authentication is enabled. @@ -146,6 +146,14 @@ impl RTIRemote { self.clock_sync_global_status.clone() } + pub fn clock_sync_period_ns(&self) -> u64 { + self.clock_sync_period_ns + } + + pub fn clock_sync_exchanges_per_interval(&self) -> u32 { + self.clock_sync_exchanges_per_interval + } + pub fn stop_in_progress(&self) -> bool { self.stop_in_progress } @@ -167,6 +175,21 @@ impl RTIRemote { self.user_specified_port = user_specified_port; } + pub fn set_clock_sync_global_status(&mut self, clock_sync_global_status: ClockSyncStat) { + self.clock_sync_global_status = clock_sync_global_status; + } + + pub fn set_clock_sync_period_ns(&mut self, clock_sync_period_ns: u64) { + self.clock_sync_period_ns = clock_sync_period_ns; + } + + pub fn set_clock_sync_exchanges_per_interval( + &mut self, + clock_sync_exchanges_per_interval: u32, + ) { + self.clock_sync_exchanges_per_interval = clock_sync_exchanges_per_interval; + } + pub fn set_stop_in_progress(&mut self, stop_in_progress: bool) { self.stop_in_progress = stop_in_progress; } diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 5e1c16f..4147c6a 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -12,6 +12,7 @@ use std::net::{IpAddr, Shutdown, TcpListener, TcpStream}; use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; use std::thread::JoinHandle; +use std::time::Duration; use crate::in_transit_message_queue::InTransitMessageQueue; use crate::net_common; @@ -187,7 +188,7 @@ impl Server { } // Read no more than one byte to get the message type. // FIXME: Handle unwrap properly. - let bytes_read = NetUtil::read_from_stream( + let bytes_read = NetUtil::read_from_socket( &mut stream, &mut buffer, fed_id.try_into().unwrap(), @@ -324,24 +325,196 @@ impl Server { println!("All federates have connected to RTI."); let cloned_rti = Arc::clone(&arc_rti); - let locked_rti = cloned_rti.read().unwrap(); - let clock_sync_global_status = locked_rti.clock_sync_global_status(); + let clock_sync_global_status; + let number_of_scheduling_nodes; + let final_port_udp; + { + let locked_rti = cloned_rti.read().unwrap(); + clock_sync_global_status = locked_rti.clock_sync_global_status(); + number_of_scheduling_nodes = locked_rti.base().number_of_scheduling_nodes(); + final_port_udp = locked_rti.final_port_udp(); + } + println!("clock_sync_global_status = {:?}", clock_sync_global_status); if clock_sync_global_status >= ClockSyncStat::ClockSyncOn { + println!("clock_sync_global_status >= ClockSyncStat::ClockSyncOn"); // Create the thread that performs periodic PTP clock synchronization sessions // over the UDP channel, but only if the UDP channel is open and at least one // federate_info is performing runtime clock synchronization. let mut clock_sync_enabled = false; - for i in 0..locked_rti.base().number_of_scheduling_nodes() { - if locked_rti.base().scheduling_nodes()[i as usize].clock_synchronization_enabled() + for i in 0..number_of_scheduling_nodes { { - clock_sync_enabled = true; - break; + let locked_rti = cloned_rti.read().unwrap(); + if locked_rti.base().scheduling_nodes()[i as usize] + .clock_synchronization_enabled() + { + clock_sync_enabled = true; + break; + } } } - if locked_rti.final_port_udp() != u16::MAX && clock_sync_enabled { - println!("\tNEED to create clock_synchronization_thread thread.."); - // TODO: Implement the following. - // lf_thread_create(&_f_rti->clock_thread, clock_synchronization_thread, NULL); + println!("After inspecting federates..."); + // let cloned_start_time = Arc::clone(&start_time); + // let cloned_received_start_times = Arc::clone(&received_start_times); + + if final_port_udp != u16::MAX && clock_sync_enabled { + let handle = thread::spawn(move || { + println!("Thread is spanwed!!!"); + // Wait until all federates have been notified of the start time. + // FIXME: Use lf_ version of this when merged with master. + { + let locked_rti = cloned_rti.read().unwrap(); + while locked_rti.num_feds_proposed_start() + < locked_rti.base().number_of_scheduling_nodes() + { + // Need to wait here. + let received_start_times_notifier = Arc::clone(&received_start_times); + let (lock, condvar) = &*received_start_times_notifier; + let mut notified = lock.lock().unwrap(); + while !*notified { + notified = condvar.wait(notified).unwrap(); + } + } + } + + // Wait until the start time before starting clock synchronization. + // The above wait ensures that start_time has been set. + println!("After cond wait"); + let start_time_value; + { + let locked_start_time = start_time.lock().unwrap(); + start_time_value = locked_start_time.start_time(); + } + let ns_to_wait = start_time_value - Tag::lf_time_physical(); + + if ns_to_wait > 0 { + // TODO: Handle unwrap() properly. + let ns = Duration::from_nanos(ns_to_wait.try_into().unwrap()); + thread::sleep(ns); + } + println!("After sleep for {} ns", ns_to_wait); + // Initiate a clock synchronization every rti->clock_sync_period_ns + // Initiate a clock synchronization every rti->clock_sync_period_ns + // let sleep_time = {(time_t)rti_remote->clock_sync_period_ns / BILLION, + // rti_remote->clock_sync_period_ns % BILLION}; + // let remaining_time; + + let mut any_federates_connected = true; + while any_federates_connected { + // Sleep + let clock_sync_period_ns; + let number_of_scheduling_nodes; + { + let locked_rti = cloned_rti.read().unwrap(); + clock_sync_period_ns = locked_rti.clock_sync_period_ns(); + number_of_scheduling_nodes = + locked_rti.base().number_of_scheduling_nodes(); + } + let ns = Duration::from_nanos(clock_sync_period_ns); // Can be interrupted + thread::sleep(ns); + any_federates_connected = false; + for fed_id in 0..number_of_scheduling_nodes { + let state; + let clock_synchronization_enabled; + { + let locked_rti = cloned_rti.read().unwrap(); + let idx: usize = fed_id as usize; + let fed = &locked_rti.base().scheduling_nodes()[idx]; + state = fed.enclave().state(); + clock_synchronization_enabled = fed.clock_synchronization_enabled(); + } + if state == SchedulingNodeState::NotConnected { + // FIXME: We need better error handling here, but clock sync failure + // should not stop execution. + println!( + "[ERROR] Clock sync failed with federate {}. Not connected.", + fed_id + ); + continue; + } else if !clock_synchronization_enabled { + continue; + } + // Send the RTI's current physical time to the federate + // Send on UDP. + println!( + "[DEBUG] RTI sending T1 message to initiate clock sync round." + ); + // TODO: Handle unwrap() properly. + Self::send_physical_clock( + fed_id.try_into().unwrap(), + cloned_rti.clone(), + MsgType::ClockSyncT1.to_byte(), + SocketType::UDP, + ); + + // Listen for reply message, which should be T3. + let message_size = 1 + std::mem::size_of::(); + let mut buffer = vec![0 as u8; message_size]; + // Maximum number of messages that we discard before giving up on this cycle. + // If the T3 message from this federate does not arrive and we keep receiving + // other messages, then give up on this federate and move to the next federate. + let mut remaining_attempts = 5; + while remaining_attempts > 0 { + remaining_attempts -= 1; + let read_failed; + { + let mut locked_rti = cloned_rti.write().unwrap(); + let idx: usize = fed_id as usize; + let fed: &mut FederateInfo = + &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + let stream: &mut TcpStream = + &mut fed.stream_mut().as_mut().unwrap(); + // TODO: Handle unwrap() properly. + read_failed = NetUtil::read_from_socket( + stream, + &mut buffer, + fed_id.try_into().unwrap(), + ); + } + // If any errors occur, either discard the message or the clock sync round. + if read_failed == 0 { + if buffer[0] == MsgType::ClockSyncT3.to_byte() { + // TODO: Change from_le_bytes properly. + let fed_id_2 = i32::from_le_bytes( + buffer[1..1 + std::mem::size_of::()] + .try_into() + .unwrap(), + ); + // Check that this message came from the correct federate. + if fed_id_2 != fed_id { + // Message is from the wrong federate. Discard the message. + println!("[WARNING] Clock sync: Received T3 message from federate {}, but expected one from {}. Discarding message.", + fed_id_2, fed_id); + continue; + } + println!("[EDBUG] Clock sync: RTI received T3 message from federate {}.", fed_id_2); + // TODO: Handle unwrap() properly. + Self::handle_physical_clock_sync_message( + fed_id_2.try_into().unwrap(), + cloned_rti.clone(), + SocketType::UDP, + ); + break; + } else { + // The message is not a T3 message. Discard the message and + // continue waiting for the T3 message. This is possibly a message + // from a previous cycle that was discarded. + println!("[WARNING] Clock sync: Unexpected UDP message {}. Expected MsgType::ClockSyncT3 from federate {}. Discarding message.", + buffer[0], fed_id); + continue; + } + } else { + println!("[WARNING] Clock sync: Read from UDP socket failed: Skipping clock sync round for federate {}.", + fed_id); + remaining_attempts -= 1; + } + } + if remaining_attempts > 0 { + any_federates_connected = true; + } + } + } + }); + handle_list.push(handle); } } @@ -591,6 +764,7 @@ impl Server { fed_id ); let cloned_rti = Arc::clone(&_f_rti); + // TODO: Handle unwrap() properly. let mut connection_info_header = vec![0 as u8; MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE.try_into().unwrap()]; NetUtil::read_from_socket_fail_on_error( @@ -776,7 +950,7 @@ impl Server { ) { let mut buffer = vec![0 as u8; mem::size_of::()]; // Read bytes from the socket. We need 8 bytes. - let bytes_read = NetUtil::read_from_stream(stream, &mut buffer, fed_id); + let bytes_read = NetUtil::read_from_socket(stream, &mut buffer, fed_id); if bytes_read < mem::size_of::() { println!("ERROR reading timestamp from federate_info {}.", fed_id); } @@ -2020,4 +2194,81 @@ impl Server { ); } } + + fn send_physical_clock( + fed_id: u16, + _f_rti: Arc>, + message_type: u8, + socket_type: SocketType, + ) { + let state; + { + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = fed_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; + state = fed.enclave().state(); + } + if state == SchedulingNodeState::NotConnected { + println!("[WARNING] Clock sync: RTI failed to send physical time to federate {}. Socket not connected.\n", + fed_id); + return; + } + let mut buffer = vec![0 as u8; std::mem::size_of::() + 1]; + buffer[0] = message_type; + let current_physical_time = Tag::lf_time_physical(); + NetUtil::encode_int64(current_physical_time, &mut buffer, 1); + + // Send the message + if socket_type == SocketType::UDP { + println!("Sending messages through UDP is not supported yet."); + // TODO: Enable the following code. + // FIXME: UDP_addr is never initialized. + // println!("[Debug] Clock sync: RTI sending UDP message type %u.", buffer[0]); + // let bytes_written = sendto(rti_remote->socket_descriptor_UDP, buffer, 1 + sizeof(int64_t), 0, + // (struct sockaddr*)&fed->UDP_addr, sizeof(fed->UDP_addr)); + // if bytes_written < std::mem::size_of() + 1 { + // println!("[WARNING] Clock sync: RTI failed to send physical time to federate {}: \n", fed_id); + // return; + // } + } else if socket_type == SocketType::TCP { + println!( + "[DEBUG] Clock sync: RTI sending TCP message type {}.", + buffer[0] + ); + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = fed_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; + let stream = fed.stream().as_ref().unwrap(); + NetUtil::write_to_socket_fail_on_error(stream, &buffer, fed_id, "physical time"); + } + println!("[DEBUG] Clock sync: RTI sent PHYSICAL_TIME_SYNC_MESSAGE with timestamp ({}) to federate {}.", + current_physical_time, fed_id); + } + + fn handle_physical_clock_sync_message( + fed_id: u16, + _f_rti: Arc>, + socket_type: SocketType, + ) { + // Lock the mutex to prevent interference between sending the two + // coded probe messages. + let _locked_rti = _f_rti.write().unwrap(); + // Reply with a T4 type message + Self::send_physical_clock( + fed_id, + _f_rti.clone(), + MsgType::ClockSyncT4.to_byte(), + socket_type.clone(), + ); + // Send the corresponding coded probe immediately after, + // but only if this is a UDP channel. + if socket_type == SocketType::UDP { + Self::send_physical_clock( + fed_id, + _f_rti.clone(), + MsgType::ClockSyncCodedProbe.to_byte(), + socket_type, + ); + } + } }