From eb42a82031d49531af7c44b57b7bc2a649bf39f8 Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Sun, 18 Feb 2024 08:10:55 -0700 Subject: [PATCH] Improve shared rti accesses by replacing mutex with RwLock - Multiple read accesses without any write access are now possible and only one write access is allowed for shared rti variables. --- .github/workflows/rust.yml | 17 +-- rust/rti/src/federate_info.rs | 4 +- rust/rti/src/lib.rs | 8 +- rust/rti/src/rti_common.rs | 213 ++++++++++++++++--------------- rust/rti/src/rti_remote.rs | 6 +- rust/rti/src/server.rs | 231 +++++++++++++++++----------------- 6 files changed, 246 insertions(+), 233 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index bcc8be5..5a5cd91 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -26,11 +26,12 @@ jobs: - name: Unit tests run: cd rust/rti; cargo test - # fetch-lf: - # uses: chanijjani/lingua-franca/.github/workflows/extract-ref.yml@master - # with: - # file: 'lingua-franca-ref.txt' - - lf-default: - # TODO(chanijjani): Change the pointer to point to the main lingua-franca repo. - uses: chanijjani/lingua-franca/.github/workflows/c-tests-with-rust-rti.yml@integration_tests_with_rust_rti + lf-rust-rti: + runs-on: ubuntu-latest + + steps: + - name: Check out lingua-franca repository + uses: actions/checkout@v3 + - name: Execute C federated tests from lf-lang/lingua-franca repository + # TODO(chanijjani): Change the pointer to point to the main lingua-franca repo. + uses: chanijjani/lingua-franca/.github/workflows/c-tests-with-rust-rti.yml@integration_tests_with_rust_rti diff --git a/rust/rti/src/federate_info.rs b/rust/rti/src/federate_info.rs index 2ed3cea..29dbf55 100644 --- a/rust/rti/src/federate_info.rs +++ b/rust/rti/src/federate_info.rs @@ -61,11 +61,11 @@ impl FederateInfo { } } - pub fn e(&self) -> &SchedulingNode { + pub fn enclave(&self) -> &SchedulingNode { &self.enclave } - pub fn enclave(&mut self) -> &mut SchedulingNode { + pub fn enclave_mut(&mut self) -> &mut SchedulingNode { &mut self.enclave } diff --git a/rust/rti/src/lib.rs b/rust/rti/src/lib.rs index 7279f3b..337ad58 100644 --- a/rust/rti/src/lib.rs +++ b/rust/rti/src/lib.rs @@ -82,7 +82,7 @@ pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static return Err("Fail to parse a string to i64"); } }; - rti.base() + rti.base_mut() .set_number_of_scheduling_nodes(num_federates.try_into().unwrap()); // FIXME: panic if the converted value doesn't fit println!( "RTI: Number of federates: {}", @@ -176,13 +176,13 @@ pub fn initialize_federates(rti: &mut RTIRemote) { let mut federate = FederateInfo::new(); // FIXME: Handle "as u16" properly. initialize_federate(&mut federate, i as u16); - let scheduling_nodes: &mut Vec = rti.base().scheduling_nodes(); + let scheduling_nodes: &mut Vec = rti.base_mut().scheduling_nodes_mut(); scheduling_nodes.insert(i as usize, federate); } } fn initialize_federate(fed: &mut FederateInfo, id: u16) { - let enclave = fed.enclave(); + let enclave = fed.enclave_mut(); enclave.initialize_scheduling_node(id); // TODO: fed.set_in_transit_message_tags(); // TODO: fed.set_server_ip_addr(); @@ -213,5 +213,5 @@ pub fn start_rti_server(_f_rti: &mut RTIRemote) -> Result * Initialize the _RTI instance. */ pub fn initialize_rti() -> RTIRemote { - RTIRemote::new() + RTIRemote::new( } diff --git a/rust/rti/src/rti_common.rs b/rust/rti/src/rti_common.rs index 5f68b88..d18b984 100644 --- a/rust/rti/src/rti_common.rs +++ b/rust/rti/src/rti_common.rs @@ -21,7 +21,7 @@ use crate::SchedulingNodeState::*; use std::io::Write; use std::mem; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; const IS_IN_ZERO_DELAY_CYCLE: i32 = 1; const IS_IN_CYCLE: i32 = 2; @@ -158,7 +158,11 @@ impl SchedulingNode { self.num_downstream } - pub fn min_delays(&mut self) -> &mut Vec { + pub fn min_delays(&self) -> &Vec { + &self.min_delays + } + + pub fn min_delays_mut(&mut self) -> &mut Vec { &mut self.min_delays } @@ -230,7 +234,7 @@ impl SchedulingNode { * This function assumes that the caller is holding the RTI mutex. */ pub fn update_scheduling_node_next_event_tag_locked( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, next_event_tag: Tag, start_time: Instant, @@ -239,11 +243,11 @@ impl SchedulingNode { let num_upstream; let number_of_scheduling_nodes; { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); number_of_scheduling_nodes = locked_rti.base().number_of_scheduling_nodes(); let idx: usize = fed_id.into(); - let fed = &mut locked_rti.base().scheduling_nodes()[idx]; - let e = fed.enclave(); + let fed = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + let e = fed.enclave_mut(); e.set_next_event(next_event_tag.clone()); num_upstream = e.num_upstream(); } @@ -266,10 +270,10 @@ impl SchedulingNode { sent_start_time.clone(), ); } else { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let fed = &mut locked_rti.base().scheduling_nodes()[idx]; - let e = fed.enclave(); + let fed = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + let e = fed.enclave_mut(); e.set_last_granted(next_event_tag.clone()); } // Check downstream enclaves to see whether they should now be granted a TAG. @@ -294,7 +298,7 @@ impl SchedulingNode { * This assumes the caller holds the RTI mutex. */ fn notify_advance_grant_if_safe( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, number_of_enclaves: i32, start_time: Instant, @@ -350,7 +354,7 @@ impl SchedulingNode { * This function assumes that the caller holds the RTI mutex. */ fn tag_advance_grant_if_safe( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, // number_of_enclaves: i32, start_time: Instant, @@ -361,15 +365,15 @@ impl SchedulingNode { { let mut min_upstream_completed = Tag::forever_tag(); - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let scheduling_nodes = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); - let e = scheduling_nodes[idx].e(); + let e = scheduling_nodes[idx].enclave(); let upstreams = e.upstream(); let upstream_delay = e.upstream_delay(); for j in 0..upstreams.len() { let delay = upstream_delay[j]; - let upstream = &scheduling_nodes[upstreams[j] as usize].e(); + let upstream = &scheduling_nodes[upstreams[j] as usize].enclave(); // Ignore this enclave if it no longer connected. if upstream.state() == SchedulingNodeState::NotConnected { continue; @@ -446,10 +450,10 @@ impl SchedulingNode { let last_provisionally_granted; let last_granted; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let scheduling_nodes = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); - let e = scheduling_nodes[idx].e(); + let e = scheduling_nodes[idx].enclave(); next_event = e.next_event(); last_provisionally_granted = e.last_provisionally_granted(); last_granted = e.last_granted(); @@ -517,7 +521,7 @@ impl SchedulingNode { * upstream node. */ fn earliest_future_incoming_message_tag( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, start_time: Instant, ) -> Tag { @@ -526,7 +530,7 @@ impl SchedulingNode { // Update the shortest paths, if necessary. let is_first_time; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let scheduling_nodes = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); is_first_time = scheduling_nodes[idx].enclave().min_delays().len() == 0; @@ -541,30 +545,30 @@ impl SchedulingNode { let mut t_d = Tag::forever_tag(); let num_min_delays; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let enclaves = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); let fed: &FederateInfo = &enclaves[idx]; - let e = fed.e(); + let e = fed.enclave(); num_min_delays = e.num_min_delays(); } for i in 0..num_min_delays { let upstream_id; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let enclaves = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); let fed: &FederateInfo = &enclaves[idx]; - let e = fed.e(); + let e = fed.enclave(); upstream_id = e.min_delays[i as usize].id() as usize; } let upstream_next_event; { // Node e->min_delays[i].id is upstream of e with min delay e->min_delays[i].min_delay. - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); - let fed: &mut FederateInfo = &mut enclaves[upstream_id]; - let upstream = fed.enclave(); + let mut locked_rti = _f_rti.write().unwrap(); + let fed: &mut FederateInfo = + &mut locked_rti.base_mut().scheduling_nodes_mut()[upstream_id]; + let upstream = fed.enclave_mut(); // If we haven't heard from the upstream node, then assume it can send an event at the start time. upstream_next_event = upstream.next_event(); if Tag::lf_tag_compare(&upstream_next_event, &Tag::never_tag()) == 0 { @@ -580,10 +584,9 @@ impl SchedulingNode { let min_delay; let earliest_tag_from_upstream; { - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut enclaves[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; let e = fed.enclave(); min_delay = e.min_delays()[i as usize].min_delay(); earliest_tag_from_upstream = Tag::lf_tag_add(&upstream_next_event, &min_delay); @@ -616,11 +619,11 @@ impl SchedulingNode { * updated only if they have not been previously updated or if invalidate_min_delays_upstream * has been called since they were last updated. */ - fn update_min_delays_upstream(_f_rti: Arc>, node_idx: u16) { + fn update_min_delays_upstream(_f_rti: Arc>, node_idx: u16) { let num_min_delays; let number_of_scheduling_nodes; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let scheduling_nodes = locked_rti.base().scheduling_nodes(); let idx: usize = node_idx.into(); num_min_delays = scheduling_nodes[idx].enclave().min_delays().len(); @@ -650,10 +653,10 @@ impl SchedulingNode { // Put the results onto the node's struct. { - let mut locked_rti = _f_rti.lock().unwrap(); - let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let mut locked_rti = _f_rti.write().unwrap(); + let scheduling_nodes = locked_rti.base_mut().scheduling_nodes_mut(); let idx: usize = node_idx.into(); - let node = scheduling_nodes[idx].enclave(); + let node = scheduling_nodes[idx].enclave_mut(); node.set_num_min_delays(count); node.set_min_delays(Vec::new()); println!( @@ -674,10 +677,12 @@ impl SchedulingNode { } let min_delay = MinimumDelay::new(i, path_delays[i as usize].clone()); if node.min_delays().len() > k as usize { - let _ = - std::mem::replace(&mut node.min_delays()[k as usize], min_delay); + let _ = std::mem::replace( + &mut node.min_delays_mut()[k as usize], + min_delay, + ); } else { - node.min_delays().insert(k as usize, min_delay); + node.min_delays_mut().insert(k as usize, min_delay); } k = k + 1; // N^2 debug statement could be a problem with large benchmarks. @@ -694,10 +699,10 @@ impl SchedulingNode { } } - fn is_in_zero_delay_cycle(_f_rti: Arc>, fed_id: u16) -> bool { + fn is_in_zero_delay_cycle(_f_rti: Arc>, fed_id: u16) -> bool { let is_first_time; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let scheduling_nodes = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); is_first_time = scheduling_nodes[idx].enclave().min_delays().len() == 0; @@ -707,10 +712,10 @@ impl SchedulingNode { } let flags; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let scheduling_nodes = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); - let node = scheduling_nodes[idx].e(); + let node = scheduling_nodes[idx].enclave(); flags = node.flags() } (flags & IS_IN_ZERO_DELAY_CYCLE) != 0 @@ -725,17 +730,16 @@ impl SchedulingNode { * introducing a deadlock. This will return FOREVER_TAG if there are no non-ZDC upstream nodes. * @return The earliest possible incoming message tag from a non-ZDC upstream node. */ - fn eimt_strict(_f_rti: Arc>, fed_id: u16, start_time: Instant) -> Tag { + fn eimt_strict(_f_rti: Arc>, fed_id: u16, start_time: Instant) -> Tag { // Find the tag of the earliest possible incoming message from immediately upstream // enclaves or federates that are not part of a zero-delay cycle. // This will be the smallest upstream NET plus the least delay. // This could be NEVER_TAG if the RTI has not seen a NET from some upstream node. let num_upstream; { - let mut locked_rti = _f_rti.lock().unwrap(); - let scheduling_nodes = locked_rti.base().scheduling_nodes(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let e = scheduling_nodes[idx].e(); + let e = locked_rti.base().scheduling_nodes()[idx].enclave(); num_upstream = e.num_upstream(); } let mut t_d = Tag::forever_tag(); @@ -744,10 +748,10 @@ impl SchedulingNode { let upstream_delay; let next_event; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let scheduling_nodes = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); - let e = scheduling_nodes[idx].e(); + let e = scheduling_nodes[idx].enclave(); // let upstreams = e.upstream(); // let upstream_id = upstreams[i] as usize; upstream_id = e.upstream()[i as usize] as usize; @@ -760,9 +764,9 @@ impl SchedulingNode { } // If we haven't heard from the upstream node, then assume it can send an event at the start time. if Tag::lf_tag_compare(&next_event, &Tag::never_tag()) == 0 { - let mut locked_rti = _f_rti.lock().unwrap(); - let scheduling_nodes = locked_rti.base().scheduling_nodes(); - let upstream = scheduling_nodes[upstream_id].enclave(); + let mut locked_rti = _f_rti.write().unwrap(); + let scheduling_nodes = locked_rti.base_mut().scheduling_nodes_mut(); + let upstream = scheduling_nodes[upstream_id].enclave_mut(); let start_tag = Tag::new(start_time, 0); upstream.set_next_event(start_tag); } @@ -814,18 +818,17 @@ impl SchedulingNode { * This function assumes that the caller holds the RTI mutex. */ fn notify_tag_advance_grant( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, tag: Tag, start_time: Instant, sent_start_time: Arc<(Mutex, Condvar)>, ) { { - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let fed: &FederateInfo = &enclaves[idx]; - let e = fed.e(); + let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; + let e = fed.enclave(); if e.state() == SchedulingNodeState::NotConnected || Tag::lf_tag_compare(&tag, &e.last_granted()) <= 0 || Tag::lf_tag_compare(&tag, &e.last_provisionally_granted()) <= 0 @@ -859,10 +862,9 @@ impl SchedulingNode { // to fail. Consider a failure here a soft failure and update the federate's status. let mut error_occurred = false; { - let mut locked_rti = _f_rti.lock().unwrap(); - let scheduling_nodes = locked_rti.base().scheduling_nodes(); - let fed: &FederateInfo = &scheduling_nodes[fed_id as usize]; - let e = fed.e(); + let locked_rti = _f_rti.read().unwrap(); + let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[fed_id as usize]; + let e = fed.enclave(); let mut stream = fed.stream().as_ref().unwrap(); match stream.write(&buffer) { Ok(bytes_written) => { @@ -879,10 +881,10 @@ impl SchedulingNode { } } { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let mut_fed: &mut FederateInfo = - &mut locked_rti.base().scheduling_nodes()[fed_id as usize]; - let enclave = mut_fed.enclave(); + &mut locked_rti.base_mut().scheduling_nodes_mut()[fed_id as usize]; + let enclave = mut_fed.enclave_mut(); if error_occurred { enclave.set_state(SchedulingNodeState::NotConnected); // FIXME: We need better error handling, but don't stop other execution here. @@ -908,7 +910,7 @@ impl SchedulingNode { * This function assumes that the caller holds the RTI mutex. */ fn notify_provisional_tag_advance_grant( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, number_of_enclaves: i32, tag: Tag, @@ -916,11 +918,10 @@ impl SchedulingNode { sent_start_time: Arc<(Mutex, Condvar)>, ) { { - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let fed: &FederateInfo = &enclaves[idx]; - let e = fed.e(); + let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; + let e = fed.enclave(); if e.state() == SchedulingNodeState::NotConnected || Tag::lf_tag_compare(&tag, &e.last_granted()) <= 0 || Tag::lf_tag_compare(&tag, &e.last_provisionally_granted()) <= 0 @@ -954,10 +955,10 @@ impl SchedulingNode { // to fail. Consider a failure here a soft failure and update the federate's status. let mut error_occurred = false; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let enclaves = locked_rti.base().scheduling_nodes(); let fed: &FederateInfo = &enclaves[fed_id as usize]; - let e = fed.e(); + let e = fed.enclave(); let mut stream = fed.stream().as_ref().unwrap(); match stream.write(&buffer) { Ok(bytes_written) => { @@ -975,10 +976,10 @@ impl SchedulingNode { } } { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let mut_fed: &mut FederateInfo = - &mut locked_rti.base().scheduling_nodes()[fed_id as usize]; - let enclave = mut_fed.enclave(); + &mut locked_rti.base_mut().scheduling_nodes_mut()[fed_id as usize]; + let enclave = mut_fed.enclave_mut(); if error_occurred { enclave.set_state(SchedulingNodeState::NotConnected); // FIXME: We need better error handling, but don't stop other execution here. @@ -1003,25 +1004,25 @@ impl SchedulingNode { // It's only needed for federates, which is why this is implemented here. let num_upstream; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let enclaves = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); let fed: &FederateInfo = &enclaves[idx]; - let e = fed.e(); + let e = fed.enclave(); num_upstream = e.num_upstream(); } for j in 0..num_upstream { let e_id; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let enclaves = locked_rti.base().scheduling_nodes(); let idx: usize = fed_id.into(); let fed: &FederateInfo = &enclaves[idx]; - e_id = fed.e().upstream()[j as usize]; + e_id = fed.enclave().upstream()[j as usize]; let upstream: &FederateInfo = &enclaves[e_id as usize]; // Ignore this federate if it has resigned. - if upstream.e().state() == NotConnected { + if upstream.enclave().state() == NotConnected { continue; } } @@ -1048,7 +1049,7 @@ impl SchedulingNode { // Local function used recursively to find minimum delays upstream. // Return in count the number of non-FOREVER_TAG entries in path_delays[]. fn _update_min_delays_upstream( - _f_rti: Arc>, + _f_rti: Arc>, end_idx: i32, mut intermediate_idx: i32, path_delays: &mut Vec, @@ -1063,10 +1064,10 @@ impl SchedulingNode { delay_from_intermediate_so_far = path_delays[intermediate_idx as usize].clone(); } { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[intermediate_idx as usize]; - let intermediate = fed.e(); + let intermediate = fed.enclave(); if intermediate.state() == SchedulingNodeState::NotConnected { // Enclave or federate is not connected. // No point in checking upstream scheduling_nodes. @@ -1079,19 +1080,19 @@ impl SchedulingNode { // upstream nodes. let num_upstream; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[intermediate_idx as usize]; - let e = fed.e(); + let e = fed.enclave(); num_upstream = e.num_upstream(); } for i in 0..num_upstream { let upstream_id; let upstream_delay; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let scheduling_nodes = locked_rti.base().scheduling_nodes(); - let e = scheduling_nodes[intermediate_idx as usize].e(); + let e = scheduling_nodes[intermediate_idx as usize].enclave(); upstream_id = e.upstream[i as usize]; upstream_delay = e.upstream_delay[i as usize]; } @@ -1123,9 +1124,10 @@ impl SchedulingNode { count, ); } else { - let mut locked_rti = _f_rti.lock().unwrap(); - let scheduling_nodes = locked_rti.base().scheduling_nodes(); - let end: &mut SchedulingNode = scheduling_nodes[end_idx as usize].enclave(); + let mut locked_rti = _f_rti.write().unwrap(); + let end: &mut SchedulingNode = locked_rti.base_mut().scheduling_nodes_mut() + [end_idx as usize] + .enclave_mut(); // Found a cycle. end.set_flags(end.flags() | IS_IN_CYCLE); // Is it a zero-delay cycle? @@ -1149,7 +1151,7 @@ impl SchedulingNode { * This assumes the caller holds the RTI mutex. */ pub fn notify_downstream_advance_grant_if_safe( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, number_of_enclaves: i32, start_time: Instant, @@ -1159,20 +1161,19 @@ impl SchedulingNode { visited[fed_id as usize] = true; let num_downstream; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; - let e = fed.e(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; + let e = fed.enclave(); num_downstream = e.num_downstream(); } for i in 0..num_downstream { let e_id; { - let mut locked_rti = _f_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let fed: &FederateInfo = &enclaves[idx]; - let downstreams = fed.e().downstream(); + let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; + let downstreams = fed.enclave().downstream(); // FIXME: Replace "as u16" properly. e_id = downstreams[i as usize] as u16; if visited[e_id as usize] { @@ -1198,7 +1199,7 @@ impl SchedulingNode { } pub fn _logical_tag_complete( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, number_of_enclaves: i32, start_time: Instant, @@ -1208,10 +1209,10 @@ impl SchedulingNode { // FIXME: Consolidate this message with NET to get NMR (Next Message Request). // Careful with handling startup and shutdown. { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; - let enclave = fed.enclave(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + let enclave = fed.enclave_mut(); enclave.set_completed(completed); println!( @@ -1225,19 +1226,19 @@ impl SchedulingNode { // Check downstream enclaves to see whether they should now be granted a TAG. let num_downstream; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; - let e = fed.e(); + let e = fed.enclave(); num_downstream = e.num_downstream(); } for i in 0..num_downstream { let e_id; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; - let downstreams = fed.e().downstream(); + let downstreams = fed.enclave().downstream(); // FIXME: Replace "as u16" properly. e_id = downstreams[i as usize] as u16; } @@ -1296,7 +1297,11 @@ impl RTICommon { } } - pub fn scheduling_nodes(&mut self) -> &mut Vec { + pub fn scheduling_nodes(&self) -> &Vec { + &self.scheduling_nodes + } + + pub fn scheduling_nodes_mut(&mut self) -> &mut Vec { &mut self.scheduling_nodes } diff --git a/rust/rti/src/rti_remote.rs b/rust/rti/src/rti_remote.rs index e884d27..e883923 100644 --- a/rust/rti/src/rti_remote.rs +++ b/rust/rti/src/rti_remote.rs @@ -114,7 +114,11 @@ impl RTIRemote { } } - pub fn base(&mut self) -> &mut RTICommon { + pub fn base(&self) -> &RTICommon { + &self.base + } + + pub fn base_mut(&mut self) -> &mut RTICommon { &mut self.base } diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 283e906..574e9e1 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -9,7 +9,7 @@ use std::io::Write; use std::mem; use std::net::{Shutdown, TcpListener, TcpStream}; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Condvar, Mutex, RwLock}; use std::thread; use std::thread::JoinHandle; @@ -122,7 +122,7 @@ impl Server { .number_of_scheduling_nodes() .try_into() .unwrap(); - let arc_rti = Arc::new(Mutex::new(_f_rti)); + let arc_rti = Arc::new(RwLock::new(_f_rti)); let mut handle_list: Vec> = vec![]; for _i in 0..number_of_enclaves { let cloned_rti = Arc::clone(&arc_rti); @@ -160,9 +160,10 @@ impl Server { let _handle = thread::spawn(move || { // This closure is the implementation of federate_thread_TCP in rti_lib.c { - let mut locked_rti = cloned_rti.lock().unwrap(); + let mut locked_rti = cloned_rti.write().unwrap(); let fed: &mut FederateInfo = - &mut locked_rti.base().scheduling_nodes()[fed_id as usize]; + &mut locked_rti.base_mut().scheduling_nodes_mut() + [fed_id as usize]; fed.set_stream(stream.try_clone().unwrap()); } @@ -174,9 +175,9 @@ impl Server { // Listen for messages from the federate_info. loop { { - let mut locked_rti = cloned_rti.lock().unwrap(); + let locked_rti = cloned_rti.read().unwrap(); let enclaves = locked_rti.base().scheduling_nodes(); - let fed: &mut FederateInfo = &mut enclaves[fed_id as usize]; + let fed = &enclaves[fed_id as usize]; let enclave = fed.enclave(); if enclave.state() == SchedulingNodeState::NotConnected { break; @@ -193,10 +194,11 @@ impl Server { // Socket is closed println!("RTI: Socket to federate_info {} is closed. Exiting the thread.", fed_id); - let mut locked_rti = cloned_rti.lock().unwrap(); - let enclaves = locked_rti.base().scheduling_nodes(); + let mut locked_rti = cloned_rti.write().unwrap(); + let enclaves = locked_rti.base_mut().scheduling_nodes_mut(); let fed: &mut FederateInfo = &mut enclaves[fed_id as usize]; - fed.enclave().set_state(SchedulingNodeState::NotConnected); + fed.enclave_mut() + .set_state(SchedulingNodeState::NotConnected); // FIXME: We need better error handling here, but do not stop execution here. break; } @@ -276,10 +278,9 @@ impl Server { cloned_sent_start_time.clone(), ), _ => { - let mut locked_rti = cloned_rti.lock().unwrap(); - let fed: &mut FederateInfo = - &mut locked_rti.base().scheduling_nodes() - [fed_id as usize]; + let locked_rti = cloned_rti.read().unwrap(); + let fed = &locked_rti.base().scheduling_nodes() + [fed_id as usize]; println!("RTI received from federate_info {} an unrecognized TCP message type: {}.", fed.enclave().id(), buffer[0]); } } @@ -301,7 +302,7 @@ impl Server { println!("All federates have connected to RTI."); let cloned_rti = Arc::clone(&arc_rti); - let mut locked_rti = cloned_rti.lock().unwrap(); + let locked_rti = cloned_rti.read().unwrap(); let clock_sync_global_status = locked_rti.clock_sync_global_status(); if clock_sync_global_status >= ClockSyncStat::ClockSyncOn { // Create the thread that performs periodic PTP clock synchronization sessions @@ -328,7 +329,7 @@ impl Server { fn receive_and_check_fed_id_message( &mut self, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, ) -> i32 { // Buffer for message ID, federate_info ID, and federation ID length. let length = 1 + mem::size_of::() + 1; @@ -404,7 +405,7 @@ impl Server { let number_of_enclaves; let federation_id; { - let mut locked_rti = cloned_rti.lock().unwrap(); + let locked_rti = cloned_rti.read().unwrap(); number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); federation_id = locked_rti.federation_id(); } @@ -427,10 +428,9 @@ impl Server { Self::send_reject(stream, ErrType::FederateIdOutOfRange.to_byte()); return -1; } else { - let mut locked_rti = cloned_rti.lock().unwrap(); + let locked_rti = cloned_rti.read().unwrap(); let idx: usize = fed_id.into(); - let federate_info: &mut FederateInfo = - &mut locked_rti.base().scheduling_nodes()[idx]; + let federate_info = &locked_rti.base().scheduling_nodes()[idx]; let enclave = federate_info.enclave(); if enclave.state() != SchedulingNodeState::NotConnected { println!("RTI received duplicate federate_info ID: {}.", fed_id); @@ -450,11 +450,11 @@ impl Server { // because it is waiting for the start time to be // sent by the RTI before beginning its execution. { - let mut locked_rti = cloned_rti.lock().unwrap(); + let mut locked_rti = cloned_rti.write().unwrap(); let idx: usize = fed_id.into(); let federate_info: &mut FederateInfo = - &mut locked_rti.base().scheduling_nodes()[idx]; - let enclave: &mut SchedulingNode = federate_info.enclave(); + &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + let enclave: &mut SchedulingNode = federate_info.enclave_mut(); enclave.set_state(SchedulingNodeState::Pending); } println!( @@ -498,14 +498,13 @@ impl Server { &mut self, fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, ) -> bool { println!( "RTI waiting for MsgType::NeighborStructure from federate_info {}.", fed_id ); let cloned_rti = Arc::clone(&_f_rti); - let mut locked_rti = cloned_rti.lock().unwrap(); let mut connection_info_header = vec![0 as u8; MSG_TYPE_NEIGHBOR_STRUCTURE_HEADER_SIZE.try_into().unwrap()]; NetUtil::read_from_socket_fail_on_error( @@ -521,8 +520,9 @@ impl Server { return false; } else { let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; - let enclave: &mut SchedulingNode = fed.enclave(); + let mut locked_rti = cloned_rti.write().unwrap(); + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + let enclave: &mut SchedulingNode = fed.enclave_mut(); // Read the number of upstream and downstream connections enclave.set_num_upstream(connection_info_header[1].into()); enclave.set_num_downstream(connection_info_header[1 + mem::size_of::()].into()); @@ -599,7 +599,7 @@ impl Server { &mut self, fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, ) -> bool { // Read the MsgType::UdpPort message from the federate_info regardless of the status of // clock synchronization. This message will tell the RTI whether the federate_info @@ -623,7 +623,7 @@ impl Server { let cloned_rti = Arc::clone(&_f_rti); let clock_sync_global_status; { - let locked_rti = cloned_rti.lock().unwrap(); + let locked_rti = cloned_rti.read().unwrap(); clock_sync_global_status = locked_rti.clock_sync_global_status(); } @@ -656,9 +656,10 @@ impl Server { } } else { // Disable clock sync after initial round. - let mut locked_rti = cloned_rti.lock().unwrap(); + let mut locked_rti = cloned_rti.write().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed: &mut FederateInfo = + &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; fed.set_clock_synchronization_enabled(false); } } else { @@ -666,9 +667,9 @@ impl Server { // Clock synchronization is universally disabled via the clock-sync command-line parameter // (-c off was passed to the RTI). // Note that the federates are still going to send a MSG_TYPE_UdpPort message but with a payload (port) of -1. - let mut locked_rti = cloned_rti.lock().unwrap(); + let mut locked_rti = cloned_rti.write().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; fed.set_clock_synchronization_enabled(false); } } @@ -682,7 +683,7 @@ impl Server { fn handle_timestamp( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, received_start_times: Arc<(Mutex, Condvar)>, sent_start_time: Arc<(Mutex, Condvar)>, @@ -701,7 +702,7 @@ impl Server { let mut num_feds_proposed_start; let number_of_enclaves; { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); let max_start_time = locked_rti.max_start_time(); num_feds_proposed_start = locked_rti.num_feds_proposed_start(); @@ -729,7 +730,7 @@ impl Server { notified = condvar.wait(notified).unwrap(); } { - let locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); num_feds_proposed_start = locked_rti.num_feds_proposed_start(); } } @@ -742,7 +743,7 @@ impl Server { // Add an offset to this start time to get everyone starting together. let max_start_time; { - let locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); max_start_time = locked_rti.max_start_time(); } let mut locked_start_time = start_time.lock().unwrap(); @@ -751,9 +752,9 @@ impl Server { NetUtil::encode_int64(locked_start_time.start_time(), &mut start_time_buffer, 1); { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let my_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let my_fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; let stream = my_fed.stream().as_ref().unwrap(); let bytes_written = NetUtil::write_to_stream(stream, &start_time_buffer, fed_id); if bytes_written < MSG_TYPE_TIMESTAMP_LENGTH { @@ -766,7 +767,7 @@ impl Server { // Update state for the federate_info to indicate that the MSG_TYPE_Timestamp // message has been sent. That MSG_TYPE_Timestamp message grants time advance to // the federate_info to the start time. - my_fed.enclave().set_state(SchedulingNodeState::Granted); + my_fed.enclave_mut().set_state(SchedulingNodeState::Granted); let sent_start_time_notifier = Arc::clone(&sent_start_time); let (lock, condvar) = &*sent_start_time_notifier; let mut notified = lock.lock().unwrap(); @@ -792,7 +793,7 @@ impl Server { */ fn handle_federate_resign( fed_id: u16, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, sent_start_time: Arc<(Mutex, Condvar)>, ) { @@ -801,20 +802,20 @@ impl Server { println!("FederateInfo {} has resigned.", fed_id); { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let my_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let my_fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; my_fed - .enclave() + .enclave_mut() .set_state(SchedulingNodeState::NotConnected); } // Indicate that there will no further events from this federate_info. { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let my_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; - my_fed.enclave().set_next_event(Tag::forever_tag()); + let my_fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; + my_fed.enclave_mut().set_next_event(Tag::forever_tag()); } // According to this: https://stackoverflow.com/questions/4160347/close-vs-shutdown-socket, @@ -822,9 +823,9 @@ impl Server { // Here, we just signal the other side that no further writes to the socket are // forthcoming, which should result in the other end getting a zero-length reception. { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let my_fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let my_fed = &locked_rti.base().scheduling_nodes()[idx]; my_fed .stream() .as_ref() @@ -845,7 +846,7 @@ impl Server { // track of which upstream federates have been visited. let number_of_enclaves; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } let start_time_value; @@ -868,7 +869,7 @@ impl Server { message_type: u8, fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, sent_start_time: Arc<(Mutex, Condvar)>, ) { @@ -947,12 +948,12 @@ impl Server { // Need to acquire the mutex lock to ensure that the thread handling // messages coming from the socket connected to the destination does not // issue a TAG before this message has been forwarded. - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); // If the destination federate_info is no longer connected, issue a warning // and return. let idx: usize = federate_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; let enclave = fed.enclave(); next_event = enclave.next_event(); if enclave.state() == SchedulingNodeState::NotConnected { @@ -984,9 +985,9 @@ impl Server { // Need to make sure that the destination federate_info's thread has already // sent the starting MsgType::Timestamp message. { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = federate_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; while fed.enclave().state() == SchedulingNodeState::Pending { // Need to wait here. let (lock, condvar) = &*sent_start_time; @@ -1034,9 +1035,9 @@ impl Server { // holding the rti_mutex might be very expensive. Instead, each outgoing // socket should probably have its own mutex. { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = federate_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; // FIXME: Handle unwrap properly. let destination_stream = fed.stream().as_ref().unwrap(); NetUtil::write_to_socket_fail_on_error( @@ -1051,9 +1052,9 @@ impl Server { // Record this in-transit message in federate_info's in-transit message queue. if Tag::lf_tag_compare(&completed, &intended_tag) < 0 { // Add a record of this message to the list of in-transit messages to this federate_info. - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = federate_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; // TODO: Replace 'MessageRecord::add_in_transit_message_record()' into 'pqueue_tag_insert_if_no_match()'. MessageRecord::add_in_transit_message_record( fed.in_transit_message_tags(), @@ -1092,7 +1093,7 @@ impl Server { } fn update_federate_next_event_tag_locked( - _f_rti: Arc>, + _f_rti: Arc>, fed_id: u16, mut next_event_tag: Tag, start_time: Instant, @@ -1100,9 +1101,9 @@ impl Server { ) { let min_in_transit_tag; { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; min_in_transit_tag = MessageRecord::get_minimum_in_transit_message_tag( fed.in_transit_message_tags(), start_time, @@ -1123,7 +1124,7 @@ impl Server { fn handle_next_event_tag( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, sent_start_time: Arc<(Mutex, Condvar)>, ) { @@ -1139,9 +1140,9 @@ impl Server { // message is in transport or being used to determine a TAG. let enclave_id; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; enclave_id = fed.enclave().id(); } let intended_tag = NetUtil::extract_tag( @@ -1172,7 +1173,7 @@ impl Server { fn handle_latest_tag_complete( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, sent_start_time: Arc<(Mutex, Condvar)>, ) { @@ -1190,7 +1191,7 @@ impl Server { ); let number_of_enclaves; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } let start_time_value; @@ -1209,9 +1210,9 @@ impl Server { // See if we can remove any of the recorded in-transit messages for this. { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; let in_transit_message_tags = fed.in_transit_message_tags(); // TODO: Replace 'MessageRecord::clean_in_transit_message_record_up_to_tag()' into 'pqueue_tag_remove_up_to()'. MessageRecord::clean_in_transit_message_record_up_to_tag( @@ -1225,7 +1226,7 @@ impl Server { fn handle_stop_request_message( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, stop_granted: Arc>, ) { @@ -1262,9 +1263,9 @@ impl Server { // message is in transport or being used to determine a TAG. let requested_stop; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; requested_stop = fed.requested_stop(); } // Check whether we have already received a stop_tag @@ -1273,7 +1274,7 @@ impl Server { // If stop request messages have already been broadcast, treat this as if it were a reply. let stop_in_progress; { - let locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); stop_in_progress = locked_rti.stop_in_progress(); } if stop_in_progress { @@ -1289,10 +1290,10 @@ impl Server { // Update the maximum stop tag received from federates { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); if Tag::lf_tag_compare(&proposed_stop_tag, &locked_rti.base().max_stop_tag()) > 0 { locked_rti - .base() + .base_mut() .set_max_stop_tag(proposed_stop_tag.clone()); } } @@ -1312,7 +1313,7 @@ impl Server { // also issued a stop request. let mut stop_request_buffer = vec![0 as u8; MSG_TYPE_STOP_REQUEST_LENGTH]; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); Self::encode_stop_request( &mut stop_request_buffer, locked_rti.base().max_stop_tag().time(), @@ -1323,7 +1324,7 @@ impl Server { // Iterate over federates and send each the MSG_TYPE_StopRequest message // if we do not have a stop_time already for them. Do not do this more than once. { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); if locked_rti.stop_in_progress() { return; } @@ -1335,16 +1336,16 @@ impl Server { let number_of_enclaves; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } for i in 0..number_of_enclaves { - let mut locked_rti = _f_rti.lock().unwrap(); - let f: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[i as usize]; - if f.e().id() != fed_id && f.requested_stop() == false { - if f.e().state() == SchedulingNodeState::NotConnected { + let locked_rti = _f_rti.read().unwrap(); + let f = &locked_rti.base().scheduling_nodes()[i as usize]; + if f.enclave().id() != fed_id && f.requested_stop() == false { + if f.enclave().state() == SchedulingNodeState::NotConnected { Self::mark_federate_requesting_stop( - f.e().id(), + f.enclave().id(), _f_rti.clone(), stop_granted.clone(), start_time_value, @@ -1356,13 +1357,13 @@ impl Server { NetUtil::write_to_socket_fail_on_error( stream, &stop_request_buffer, - f.e().id(), + f.enclave().id(), "MsgType::StopRequest message", ); } } { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); println!( "RTI forwarded to federates MsgType::StopRequest with tag ({}, {}).", locked_rti.base().max_stop_tag().time() - start_time_value, @@ -1378,31 +1379,31 @@ impl Server { */ fn mark_federate_requesting_stop( fed_id: u16, - _f_rti: Arc>, + _f_rti: Arc>, stop_granted: Arc>, start_time_value: Instant, ) -> bool { let mut num_enclaves_handling_stop; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); num_enclaves_handling_stop = locked_rti.base().num_scheduling_nodes_handling_stop(); } { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; if !fed.requested_stop() { // Assume that the federate_info // has requested stop locked_rti - .base() + .base_mut() .set_num_scheduling_nodes_handling_stop(num_enclaves_handling_stop + 1); } } { - let mut locked_rti = _f_rti.lock().unwrap(); + let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx]; if !fed.requested_stop() { // Assume that the federate_info // has requested stop @@ -1411,7 +1412,7 @@ impl Server { } let number_of_enclaves; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); num_enclaves_handling_stop = locked_rti.base().num_scheduling_nodes_handling_stop(); number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } @@ -1437,7 +1438,7 @@ impl Server { * This function assumes the caller holds the _RTI.rti_mutex lock. */ fn _lf_rti_broadcast_stop_time_to_federates_locked( - _f_rti: Arc>, + _f_rti: Arc>, stop_granted: Arc>, start_time_value: Instant, ) { @@ -1450,7 +1451,7 @@ impl Server { // Reply with a stop granted to all federates let mut outgoing_buffer = vec![0 as u8; MSG_TYPE_STOP_GRANTED_LENGTH]; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); Self::encode_stop_granted( &mut outgoing_buffer, locked_rti.base().max_stop_tag().time(), @@ -1460,7 +1461,7 @@ impl Server { let number_of_enclaves; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } // Iterate over federates and send each the message. @@ -1468,39 +1469,40 @@ impl Server { let next_event; let max_stop_tag; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); max_stop_tag = locked_rti.base().max_stop_tag(); // FIXME: Handle usize properly. let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[i as usize]; - next_event = fed.e().next_event(); - if fed.e().state() == SchedulingNodeState::NotConnected { + next_event = fed.enclave().next_event(); + if fed.enclave().state() == SchedulingNodeState::NotConnected { continue; } } { - let mut locked_rti = _f_rti.lock().unwrap(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[i as usize]; + let mut locked_rti = _f_rti.write().unwrap(); + let fed: &mut FederateInfo = + &mut locked_rti.base_mut().scheduling_nodes_mut()[i as usize]; if Tag::lf_tag_compare(&next_event, &max_stop_tag) >= 0 { // Need the next_event to be no greater than the stop tag. - fed.enclave().set_next_event(max_stop_tag); + fed.enclave_mut().set_next_event(max_stop_tag); } } { - let mut locked_rti = _f_rti.lock().unwrap(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[i as usize]; + let locked_rti = _f_rti.read().unwrap(); + let fed = &locked_rti.base().scheduling_nodes()[i as usize]; // FIXME: Handle unwrap properly. let stream = fed.stream().as_ref().unwrap(); NetUtil::write_to_socket_fail_on_error( stream, &outgoing_buffer, - fed.e().id(), + fed.enclave().id(), "MsgType::StopGranted message", ); } } { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); println!( "RTI sent to federates MsgType::StopGranted with tag ({}, {}).", locked_rti.base().max_stop_tag().time() - start_time_value, @@ -1538,7 +1540,7 @@ impl Server { fn handle_stop_request_reply( fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, stop_granted: Arc>, ) { @@ -1572,12 +1574,12 @@ impl Server { // If the federate_info has not requested stop before, count the reply let max_stop_tag; { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); max_stop_tag = locked_rti.base().max_stop_tag(); } if Tag::lf_tag_compare(&federate_stop_tag, &max_stop_tag) > 0 { - let mut locked_rti = _f_rti.lock().unwrap(); - locked_rti.base().set_max_stop_tag(federate_stop_tag); + let mut locked_rti = _f_rti.write().unwrap(); + locked_rti.base_mut().set_max_stop_tag(federate_stop_tag); } Self::mark_federate_requesting_stop( fed_id, @@ -1591,7 +1593,7 @@ impl Server { buffer: &Vec, fed_id: u16, stream: &mut TcpStream, - _f_rti: Arc>, + _f_rti: Arc>, start_time: Arc>, sent_start_time: Arc<(Mutex, Condvar)>, ) { @@ -1632,12 +1634,12 @@ impl Server { // messages coming from the socket connected to the destination does not // issue a TAG before this message has been forwarded. { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); // If the destination federate_info is no longer connected, issue a warning // and return. let idx: usize = federate_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; let enclave = fed.enclave(); if enclave.state() == SchedulingNodeState::NotConnected { println!( @@ -1665,9 +1667,10 @@ impl Server { // Need to make sure that the destination federate_info's thread has already // sent the starting MsgType::Timestamp message. { - let mut locked_rti = _f_rti.lock().unwrap(); + let locked_rti = _f_rti.read().unwrap(); let idx: usize = federate_id.into(); - let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + // let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; while fed.enclave().state() == SchedulingNodeState::Pending { // Need to wait here. let (lock, condvar) = &*sent_start_time;