From 0ba4effe7431fb7e65b2194074586479ab5ba453 Mon Sep 17 00:00:00 2001 From: Chanhee Lee Date: Mon, 19 Feb 2024 16:56:39 -0700 Subject: [PATCH] Enable to log tracing information for RTI --- rust/rti/Cargo.toml | 1 + rust/rti/src/lib.rs | 18 ++ rust/rti/src/rti_common.rs | 40 ++- rust/rti/src/server.rs | 280 +++++++++++++--- rust/rti/src/tag.rs | 40 +++ rust/rti/src/trace.rs | 632 +++++++++++++++++++++++++++++++++++++ 6 files changed, 969 insertions(+), 42 deletions(-) create mode 100644 rust/rti/src/trace.rs diff --git a/rust/rti/Cargo.toml b/rust/rti/Cargo.toml index 720c8a4..a6b39cf 100644 --- a/rust/rti/Cargo.toml +++ b/rust/rti/Cargo.toml @@ -8,3 +8,4 @@ edition = "2021" [dependencies] byteorder = "1" priority-queue = "1.3.2" +zerocopy = { version = "0.7.32", features = ["derive"] } diff --git a/rust/rti/src/lib.rs b/rust/rti/src/lib.rs index 74eeb27..cf590e4 100644 --- a/rust/rti/src/lib.rs +++ b/rust/rti/src/lib.rs @@ -18,6 +18,7 @@ mod net_common; mod net_util; mod server; mod tag; +mod trace; use std::error::Error; @@ -25,9 +26,12 @@ use crate::constants::*; use crate::federate_info::*; use crate::rti_common::*; use crate::rti_remote::*; +use crate::trace::Trace; use server::Server; +const RTI_TRACE_FILE_NAME: &str = "rti.lft"; + #[derive(PartialEq, PartialOrd, Clone)] pub enum ClockSyncStat { ClockSyncOff, @@ -124,6 +128,8 @@ pub fn process_args(rti: &mut RTIRemote, argv: &[String]) -> Result<(), &'static } idx += 1; // TODO: idx += process_clock_sync_args(); + } else if arg == "-t" || arg == "--tracing" { + rti.base_mut().set_tracing_enabled(true); } else if arg == " " { // Tolerate spaces continue; @@ -162,6 +168,7 @@ fn usage(argc: usize, argv: &[String]) { println!(" (period in nanoseconds, default is 5 msec). Only applies to 'on'."); println!(" - exchanges-per-interval : Controls the number of messages that are exchanged for each"); println!(" clock sync attempt (default is 10). Applies to 'init' and 'on'."); + println!(" -t, --tracing Turn on tracing."); println!("Command given:"); let mut idx = 0; @@ -172,6 +179,17 @@ fn usage(argc: usize, argv: &[String]) { } pub fn initialize_federates(rti: &mut RTIRemote) { + if rti.base().tracing_enabled() { + let _lf_number_of_workers = rti.base().number_of_scheduling_nodes(); + rti.base_mut() + .set_trace(Trace::trace_new(RTI_TRACE_FILE_NAME)); + Trace::start_trace( + rti.base_mut().trace(), + _lf_number_of_workers.try_into().unwrap(), + ); + println!("Tracing the RTI execution in {} file.", RTI_TRACE_FILE_NAME); + } + for i in 0..rti.base().number_of_scheduling_nodes() { let mut federate = FederateInfo::new(); // FIXME: Handle "as u16" properly. diff --git a/rust/rti/src/rti_common.rs b/rust/rti/src/rti_common.rs index d18b984..b274555 100644 --- a/rust/rti/src/rti_common.rs +++ b/rust/rti/src/rti_common.rs @@ -16,6 +16,7 @@ use crate::net_util::NetUtil; use crate::rti_remote::RTIRemote; use crate::tag; use crate::tag::{Instant, Interval, Tag, FOREVER}; +use crate::trace::{Trace, TraceDirection, TraceEvent}; use crate::FederateInfo; use crate::SchedulingNodeState::*; @@ -827,7 +828,7 @@ impl SchedulingNode { { let locked_rti = _f_rti.read().unwrap(); let idx: usize = fed_id.into(); - let fed: &FederateInfo = &locked_rti.base().scheduling_nodes()[idx]; + let fed = &locked_rti.base().scheduling_nodes()[idx]; let e = fed.enclave(); if e.state() == SchedulingNodeState::NotConnected || Tag::lf_tag_compare(&tag, &e.last_granted()) <= 0 @@ -857,6 +858,14 @@ impl SchedulingNode { 1 + mem::size_of::(), ); + Trace::log_trace( + _f_rti.clone(), + TraceEvent::SendTag, + fed_id, + &tag, + start_time, + TraceDirection::To, + ); // This function is called in notify_advance_grant_if_safe(), which is a long // function. During this call, the socket might close, causing the following write_to_socket // to fail. Consider a failure here a soft failure and update the federate's status. @@ -950,6 +959,14 @@ impl SchedulingNode { 1 + mem::size_of::(), ); + Trace::log_trace( + _f_rti.clone(), + TraceEvent::SendPTag, + fed_id, + &tag, + start_time, + TraceDirection::To, + ); // This function is called in notify_advance_grant_if_safe(), which is a long // function. During this call, the socket might close, causing the following write_to_socket // to fail. Consider a failure here a soft failure and update the federate's status. @@ -1279,9 +1296,9 @@ pub struct RTICommon { // Boolean indicating that tracing is enabled. tracing_enabled: bool, - // Pointer to a tracing object - // TODO: trace_t* trace; + // Pointer to a tracing object + trace: Trace, // The RTI mutex for making thread-safe access to the shared state. // TODO: lf_mutex_t* mutex; } @@ -1294,6 +1311,7 @@ impl RTICommon { max_stop_tag: Tag::never_tag(), num_scheduling_nodes_handling_stop: 0, tracing_enabled: false, + trace: Trace::trace_new(""), } } @@ -1317,6 +1335,14 @@ impl RTICommon { self.num_scheduling_nodes_handling_stop } + pub fn tracing_enabled(&self) -> bool { + self.tracing_enabled + } + + pub fn trace(&mut self) -> &mut Trace { + &mut self.trace + } + pub fn set_max_stop_tag(&mut self, max_stop_tag: Tag) { self.max_stop_tag = max_stop_tag.clone(); } @@ -1331,6 +1357,14 @@ impl RTICommon { ) { self.num_scheduling_nodes_handling_stop = num_scheduling_nodes_handling_stop; } + + pub fn set_tracing_enabled(&mut self, tracing_enabled: bool) { + self.tracing_enabled = tracing_enabled; + } + + pub fn set_trace(&mut self, trace: Trace) { + self.trace = trace; + } } struct TagAdvanceGrant { diff --git a/rust/rti/src/server.rs b/rust/rti/src/server.rs index 574e9e1..10ef891 100644 --- a/rust/rti/src/server.rs +++ b/rust/rti/src/server.rs @@ -2,7 +2,7 @@ * @file * @author Hokeun Kim (hokeun@asu.edu) * @author Chanhee Lee (chanheel@asu.edu) - * @copyright (c) 2023, Arizona State University + * @copyright (c) 2023-2024, Arizona State University * License in [BSD 2-clause](..) * @brief .. */ @@ -19,11 +19,14 @@ use crate::net_common::*; use crate::net_util::*; use crate::tag; use crate::tag::*; +use crate::trace::TraceDirection; +use crate::trace::TraceEvent; use crate::ClockSyncStat; use crate::FederateInfo; use crate::RTIRemote; use crate::SchedulingNode; use crate::SchedulingNodeState; +use crate::Trace; struct StopGranted { _lf_rti_stop_granted_already_sent_to_federates: bool, @@ -282,6 +285,14 @@ impl Server { let fed = &locked_rti.base().scheduling_nodes() [fed_id as usize]; println!("RTI received from federate_info {} an unrecognized TCP message type: {}.", fed.enclave().id(), buffer[0]); + Trace::log_trace( + cloned_rti.clone(), + TraceEvent::ReceiveUnidentified, + fed_id.try_into().unwrap(), + &Tag::forever_tag(), + i64::MAX, + TraceDirection::From, + ); } } } @@ -340,10 +351,18 @@ impl Server { NetUtil::read_from_socket_fail_on_error(stream, &mut first_buffer, 0, ""); // Initialize to an invalid value. - let fed_id; + let mut fed_id = u16::MAX; // First byte received is the message type. if first_buffer[0] != MsgType::FedIds.to_byte() { + Trace::log_trace( + _f_rti.clone(), + TraceEvent::SendReject, + fed_id, + &Tag::forever_tag(), + i64::MIN, + TraceDirection::To, + ); if first_buffer[0] == MsgType::P2pSendingFedId.to_byte() || first_buffer[0] == MsgType::P2pTaggedMessage.to_byte() { @@ -401,6 +420,7 @@ impl Server { } println!("RTI received federation ID: {}.", federation_id_received); + let cloned_rti = Arc::clone(&_f_rti); let number_of_enclaves; let federation_id; @@ -409,6 +429,14 @@ impl Server { number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); federation_id = locked_rti.federation_id(); } + Trace::log_trace( + cloned_rti.clone(), + TraceEvent::ReceiveFedId, + fed_id, + &Tag::forever_tag(), + i64::MIN, + TraceDirection::From, + ); // Compare the received federation ID to mine. if federation_id_received != federation_id { // Federation IDs do not match. Send back a MSG_TYPE_Reject message. @@ -416,6 +444,14 @@ impl Server { "WARNING: FederateInfo from another federation {} attempted to connect to RTI in federation {}.", federation_id_received, federation_id ); + Trace::log_trace( + cloned_rti.clone(), + TraceEvent::SendReject, + fed_id, + &Tag::forever_tag(), + 0, + TraceDirection::To, + ); Self::send_reject(stream, ErrType::FederationIdDoesNotMatch.to_byte()); return -1; } else { @@ -425,15 +461,34 @@ impl Server { "RTI received federate_info ID {}, which is out of range.", fed_id ); + Trace::log_trace( + cloned_rti.clone(), + TraceEvent::SendReject, + fed_id, + &Tag::forever_tag(), + i64::MIN, + TraceDirection::To, + ); Self::send_reject(stream, ErrType::FederateIdOutOfRange.to_byte()); return -1; } else { - let locked_rti = cloned_rti.read().unwrap(); - let idx: usize = fed_id.into(); - let federate_info = &locked_rti.base().scheduling_nodes()[idx]; - let enclave = federate_info.enclave(); - if enclave.state() != SchedulingNodeState::NotConnected { + let state; + { + let locked_rti = cloned_rti.read().unwrap(); + let idx: usize = fed_id.into(); + let federate_info = &locked_rti.base().scheduling_nodes()[idx]; + state = federate_info.enclave().state(); + } + if state != SchedulingNodeState::NotConnected { println!("RTI received duplicate federate_info ID: {}.", fed_id); + Trace::log_trace( + cloned_rti.clone(), + TraceEvent::SendReject, + fed_id, + &Tag::forever_tag(), + i64::MIN, + TraceDirection::To, + ); Self::send_reject(stream, ErrType::FederateIdInUse.to_byte()); return -1; } @@ -462,6 +517,14 @@ impl Server { fed_id ); // Send an MsgType::Ack message. + Trace::log_trace( + cloned_rti.clone(), + TraceEvent::SendAck, + fed_id, + &Tag::forever_tag(), + i64::MIN, + TraceDirection::To, + ); let ack_message: Vec = vec![MsgType::Ack.to_byte()]; NetUtil::write_to_socket_fail_on_error( stream, @@ -697,6 +760,14 @@ impl Server { // FIXME: Check whether swap_bytes_if_big_endian_int64() is implemented correctly let timestamp = i64::from_le_bytes(buffer.try_into().unwrap()); + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceiveTimestamp, + fed_id, + &Tag::new(timestamp, 0), + timestamp, + TraceDirection::From, + ); println!("RTI received timestamp message with time: {} .", timestamp); let mut num_feds_proposed_start; @@ -749,8 +820,17 @@ impl Server { let mut locked_start_time = start_time.lock().unwrap(); locked_start_time.set_start_time(max_start_time + net_common::DELAY_START); // TODO: Consider swap_bytes_if_big_endian_int64() - NetUtil::encode_int64(locked_start_time.start_time(), &mut start_time_buffer, 1); + let start_time = locked_start_time.start_time(); + NetUtil::encode_int64(start_time, &mut start_time_buffer, 1); + Trace::log_trace( + _f_rti.clone(), + TraceEvent::SendTimestamp, + fed_id, + &Tag::new(start_time, 0), + start_time, + TraceDirection::To, + ); { let mut locked_rti = _f_rti.write().unwrap(); let idx: usize = fed_id.into(); @@ -799,6 +879,20 @@ impl Server { ) { // Nothing more to do. Close the socket and exit. + let start_time_value; + { + let locked_start_time = start_time.lock().unwrap(); + start_time_value = locked_start_time.start_time(); + } + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceiveResign, + fed_id, + &Tag::forever_tag(), + start_time_value, + TraceDirection::From, + ); + println!("FederateInfo {} has resigned.", fed_id); { @@ -849,11 +943,6 @@ impl Server { let locked_rti = _f_rti.read().unwrap(); number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } - let start_time_value; - { - let locked_start_time = start_time.lock().unwrap(); - start_time_value = locked_start_time.start_time(); - } let mut visited = vec![false as bool; number_of_enclaves as usize]; // Initializes to 0. SchedulingNode::notify_downstream_advance_grant_if_safe( _f_rti.clone(), @@ -863,6 +952,18 @@ impl Server { &mut visited, sent_start_time, ); + + // TODO: Move the below tracing into proper position. + { + let mut locked_rti = _f_rti.write().unwrap(); + if locked_rti.base().tracing_enabled() { + // No need for a mutex lock because all threads have exited. + if Trace::stop_trace_locked(locked_rti.base_mut().trace(), start_time_value) == true + { + println!("RTI trace file saved."); + } + } + } } fn handle_timed_message( @@ -942,6 +1043,15 @@ impl Server { // Following only works for string messages. // println!("Message received by RTI: {}.", buffer + header_size); + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceiveTaggedMsg, + fed_id, + &intended_tag, + start_time_value, + TraceDirection::From, + ); + let completed; let next_event; { @@ -996,7 +1106,21 @@ impl Server { notified = condvar.wait(notified).unwrap(); } } + } + + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceiveTaggedMsg, + fed_id, + &intended_tag, + start_time_value, + TraceDirection::To, + ); + { + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = federate_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; // FIXME: Handle unwrap properly. let destination_stream = fed.stream().as_ref().unwrap(); let mut result_buffer = vec![0 as u8; 1]; @@ -1155,6 +1279,14 @@ impl Server { let locked_start_time = start_time.lock().unwrap(); start_time_value = locked_start_time.start_time(); } + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceiveNet, + fed_id, + &intended_tag, + start_time_value, + TraceDirection::From, + ); println!( "RTI received from federate_info {} the Next Event Tag (NET) ({},{})", enclave_id, @@ -1189,16 +1321,24 @@ impl Server { .try_into() .unwrap(), ); - let number_of_enclaves; - { - let locked_rti = _f_rti.read().unwrap(); - number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); - } let start_time_value; { let locked_start_time = start_time.lock().unwrap(); start_time_value = locked_start_time.start_time(); } + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceiveLtc, + fed_id, + &completed, + start_time_value, + TraceDirection::From, + ); + let number_of_enclaves; + { + let locked_rti = _f_rti.read().unwrap(); + number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); + } SchedulingNode::_logical_tag_complete( _f_rti.clone(), fed_id, @@ -1252,6 +1392,15 @@ impl Server { let locked_start_time = start_time.lock().unwrap(); start_time_value = locked_start_time.start_time(); } + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceiveStopReq, + fed_id, + &proposed_stop_tag, + start_time_value, + TraceDirection::From, + ); + println!( "RTI received from federate_info {} a MsgType::StopRequest message with tag ({},{}).", fed_id, @@ -1340,26 +1489,50 @@ impl Server { number_of_enclaves = locked_rti.base().number_of_scheduling_nodes(); } for i in 0..number_of_enclaves { - let locked_rti = _f_rti.read().unwrap(); - let f = &locked_rti.base().scheduling_nodes()[i as usize]; - if f.enclave().id() != fed_id && f.requested_stop() == false { - if f.enclave().state() == SchedulingNodeState::NotConnected { + let max_stop_tag; + let enc_id; + let requested_stop; + let enc_state; + { + let locked_rti = _f_rti.read().unwrap(); + max_stop_tag = locked_rti.base().max_stop_tag(); + let f = &locked_rti.base().scheduling_nodes()[i as usize]; + enc_id = f.enclave().id(); + requested_stop = f.requested_stop(); + enc_state = f.enclave().state(); + } + if enc_id != fed_id && requested_stop == false { + if enc_state == SchedulingNodeState::NotConnected { Self::mark_federate_requesting_stop( - f.enclave().id(), + enc_id, _f_rti.clone(), stop_granted.clone(), start_time_value, ); continue; } - // FIXME: Handle unwrap properly. - let stream = f.stream().as_ref().unwrap(); - NetUtil::write_to_socket_fail_on_error( - stream, - &stop_request_buffer, - f.enclave().id(), - "MsgType::StopRequest message", + + Trace::log_trace( + _f_rti.clone(), + TraceEvent::SendStopReq, + fed_id, + &max_stop_tag, + start_time_value, + TraceDirection::To, ); + + { + let locked_rti = _f_rti.read().unwrap(); + let f = &locked_rti.base().scheduling_nodes()[i as usize]; + // FIXME: Handle unwrap properly. + let stream = f.stream().as_ref().unwrap(); + NetUtil::write_to_socket_fail_on_error( + stream, + &stop_request_buffer, + f.enclave().id(), + "MsgType::StopRequest message", + ); + } } } { @@ -1563,6 +1736,15 @@ impl Server { let locked_start_time = start_time.lock().unwrap(); start_time_value = locked_start_time.start_time(); } + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceiveStopReqRep, + fed_id, + &federate_stop_tag, + start_time_value, + TraceDirection::From, + ); + println!( "RTI received from federate_info {} STOP reply tag ({}, {}).", fed_id, @@ -1617,19 +1799,26 @@ impl Server { .unwrap(), ); - // TODO: Can be used when tracing_enabled - // let start_idx = u16_size * 2; - // let tag = NetUtil::extract_tag( - // header_buffer[start_idx..(start_idx + mem::size_of::() + mem::size_of::())] - // .try_into() - // .unwrap(), - // ); - + let start_idx = u16_size * 2; + let tag = NetUtil::extract_tag( + header_buffer[start_idx..(start_idx + mem::size_of::() + mem::size_of::())] + .try_into() + .unwrap(), + ); let start_time_value; { let locked_start_time = start_time.lock().unwrap(); start_time_value = locked_start_time.start_time(); } + Trace::log_trace( + _f_rti.clone(), + TraceEvent::ReceivePortAbs, + fed_id, + &tag, + start_time_value, + TraceDirection::From, + ); + // Need to acquire the mutex lock to ensure that the thread handling // messages coming from the socket connected to the destination does not // issue a TAG before this message has been forwarded. @@ -1669,7 +1858,6 @@ impl Server { { let locked_rti = _f_rti.read().unwrap(); let idx: usize = federate_id.into(); - // let fed: &mut FederateInfo = &mut locked_rti.base().scheduling_nodes()[idx]; let fed = &locked_rti.base().scheduling_nodes()[idx]; while fed.enclave().state() == SchedulingNodeState::Pending { // Need to wait here. @@ -1679,7 +1867,21 @@ impl Server { notified = condvar.wait(notified).unwrap(); } } + } + Trace::log_trace( + _f_rti.clone(), + TraceEvent::SendPTag, + fed_id, + &tag, + start_time_value, + TraceDirection::To, + ); + + { + let locked_rti = _f_rti.read().unwrap(); + let idx: usize = federate_id.into(); + let fed = &locked_rti.base().scheduling_nodes()[idx]; // Forward the message. let destination_stream = fed.stream().as_ref().unwrap(); let mut result_buffer = vec![0 as u8]; diff --git a/rust/rti/src/tag.rs b/rust/rti/src/tag.rs index 35440ae..2cdd462 100644 --- a/rust/rti/src/tag.rs +++ b/rust/rti/src/tag.rs @@ -12,6 +12,7 @@ * This file extends enclave.h with RTI features that are specific to federations and are not * used by scheduling enclaves. */ +use std::time::{Duration, SystemTime}; //////////////// Type definitions @@ -106,6 +107,45 @@ impl Tag { self.microstep = microstep; } + /** + * Return the current physical time in nanoseconds. + * On many platforms, this is the number of nanoseconds + * since January 1, 1970, but it is actually platform dependent. + * @return A time instant. + */ + pub fn lf_time_physical() -> Instant { + Tag::_lf_physical_time() + } + + fn _lf_physical_time() -> Instant { + // Get the current clock value + let mut result: i64 = 0; + Self::_lf_clock_now(&mut result); + + if result == 0 { + println!("Failed to read the physical clock."); + return -1; + } + + // TODO: Implement adjustment logic in reactor-c/core/tag.c if needed. + + result + } + + fn _lf_clock_now(t: &mut Instant) { + match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) { + Ok(n) => *t = Self::convert_timespec_to_ns(n), + Err(_) => panic!("SystemTime before UNIX EPOCH!"), + } + } + + fn convert_timespec_to_ns(tp: Duration) -> Instant { + // TODO: Handle unwrap() properly. + return (tp.as_secs() * 1000000000 + u64::from(tp.subsec_nanos())) + .try_into() + .unwrap(); + } + pub fn lf_tag_compare(tag1: &Tag, tag2: &Tag) -> i32 { let tag1_time = tag1.time(); let tag2_time = tag2.time(); diff --git a/rust/rti/src/trace.rs b/rust/rti/src/trace.rs new file mode 100644 index 0000000..59f73d5 --- /dev/null +++ b/rust/rti/src/trace.rs @@ -0,0 +1,632 @@ +/** + * @file + * @author Edward A. Lee + * @author Chanhee Lee + * + * @section LICENSE +Copyright (c) 2020, The University of California at Berkeley and TU Dresden + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + + * @section DESCRIPTION + * Definitions of tracepoint events for use with the C code generator and any other + * code generator that uses the C infrastructure (such as the Python code generator). + * + * See: https://www.lf-lang.org/docs/handbook/tracing?target=c + * + * The trace file is named trace.lft and is a binary file with the following format: + * + * Header: + * * instant_t: The start time. This is both the starting physical time and the starting logical time. + * * int: Size N of the table mapping pointers to descriptions. + * This is followed by N records each of which has: + * * A pointer value (the key). + * * A null-terminated string (the description). + * + * Traces: + * A sequence of traces, each of which begins with an int giving the length of the trace + * followed by binary representations of the trace_record struct written using fwrite(). + */ +use std::fs::{File, OpenOptions}; +use std::io::Write; +use std::sync::{Arc, RwLock}; + +use crate::tag::{Instant, Interval, Microstep, Tag}; +use crate::RTIRemote; + +use zerocopy::AsBytes; + +type Void = i64; + +const TRACE_BUFFER_CAPACITY: u32 = 2048; + +#[derive(Clone, Copy, Debug)] +pub enum TraceEvent { + ReceiveFedId, + ReceiveNet, + ReceiveLtc, + ReceivePortAbs, + ReceiveResign, + ReceiveStopReq, + ReceiveStopReqRep, + ReceiveTaggedMsg, + ReceiveTimestamp, + ReceiveUnidentified, + SendAck, + SendPTag, + SendReject, + SendStopReq, + SendTag, + SendTimestamp, +} + +impl TraceEvent { + pub fn to_value(&self) -> u64 { + match self { + TraceEvent::ReceiveFedId => 40, + TraceEvent::ReceiveNet => 35, + TraceEvent::ReceiveLtc => 36, + TraceEvent::ReceivePortAbs => 45, + TraceEvent::ReceiveResign => 44, + TraceEvent::ReceiveStopReq => 37, + TraceEvent::ReceiveStopReqRep => 38, + TraceEvent::ReceiveTaggedMsg => 47, + TraceEvent::ReceiveTimestamp => 34, + TraceEvent::ReceiveUnidentified => 53, + TraceEvent::SendAck => 11, + TraceEvent::SendPTag => 20, + TraceEvent::SendReject => 22, + TraceEvent::SendStopReq => 16, + TraceEvent::SendTag => 21, + TraceEvent::SendTimestamp => 13, + } + } +} + +#[derive(Debug, Clone, Copy)] +pub struct TraceRecord { + event_type: u64, + pointer: Void, // pointer identifying the record, e.g. to self struct for a reactor. + src_id: i32, // The ID number of the source (e.g. worker or federate) or -1 for no ID number. + dst_id: i32, // The ID number of the destination (e.g. reaction or federate) or -1 for no ID number. + logical_time: Instant, + microstep: Microstep, + physical_time: Instant, + trigger: Void, + extra_delay: Instant, +} + +impl TraceRecord { + pub fn new() -> TraceRecord { + TraceRecord { + event_type: 0, + pointer: 0, + src_id: 0, + dst_id: 0, + logical_time: 0, + microstep: 0, + physical_time: 0, + trigger: 0, + extra_delay: 0, + } + } + + pub fn event_type(&self) -> u64 { + self.event_type + } + + pub fn pointer(&self) -> Void { + self.pointer + } + + pub fn src_id(&self) -> i32 { + self.src_id + } + + pub fn dst_id(&self) -> i32 { + self.dst_id + } + + pub fn logical_time(&self) -> Instant { + self.logical_time + } + + pub fn microstep(&self) -> Microstep { + self.microstep + } + + pub fn physical_time(&self) -> Instant { + self.physical_time + } + + pub fn trigger(&self) -> Void { + self.trigger + } + + pub fn extra_delay(&self) -> Instant { + self.extra_delay + } + + pub fn set_event_type(&mut self, event_type: u64) { + self.event_type = event_type; + } + + pub fn set_pointer(&mut self, pointer: Void) { + self.pointer = pointer; + } + + pub fn set_src_id(&mut self, src_id: i32) { + self.src_id = src_id; + } + + pub fn set_dst_id(&mut self, dst_id: i32) { + self.dst_id = dst_id; + } + + pub fn set_logical_time(&mut self, logical_time: Instant) { + self.logical_time = logical_time; + } + + pub fn set_microstep(&mut self, microstep: Microstep) { + self.microstep = microstep; + } + + pub fn set_physical_time(&mut self, physical_time: Instant) { + self.physical_time = physical_time; + } + + pub fn set_trigger(&mut self, trigger: Void) { + self.trigger = trigger; + } + + pub fn set_extra_delay(&mut self, extra_delay: Instant) { + self.extra_delay = extra_delay; + } +} + +pub struct Trace { + /** + * Array of buffers into which traces are written. + * When a buffer becomes full, the contents is flushed to the file, + * which will create a significant pause in the calling thread. + */ + _lf_trace_buffer: Vec>, + _lf_trace_buffer_size: Vec, + + /** The number of trace buffers allocated when tracing starts. */ + _lf_number_of_trace_buffers: usize, + + /** Marker that tracing is stopping or has stopped. */ + _lf_trace_stop: usize, + + /** The file into which traces are written. */ + _lf_trace_file: Option, + + /** The file name where the traces are written*/ + filename: String, + + /** Table of pointers to a description of the object. */ + // TODO: Enable followings. + // object_description_t _lf_trace_object_descriptions[TRACE_OBJECT_TABLE_SIZE]; + _lf_trace_object_descriptions_size: i32, + + /** Indicator that the trace header information has been written to the file. */ + _lf_trace_header_written: bool, + // Pointer back to the environment which we are tracing within + // TODO: env: Option, +} + +impl Trace { + // TODO: pub fn trace_new(filename: &str, env: Environment) -> Trace { + pub fn trace_new(filename: &str) -> Trace { + Trace { + _lf_trace_buffer: Vec::new(), + _lf_trace_buffer_size: Vec::new(), + _lf_number_of_trace_buffers: 0, + _lf_trace_stop: 1, + _lf_trace_file: None, + filename: String::from(filename), + _lf_trace_object_descriptions_size: 0, + _lf_trace_header_written: false, + // TODO: env: env, + } + } + + pub fn lf_trace_buffer(&self) -> &Vec> { + &self._lf_trace_buffer + } + + pub fn lf_trace_buffer_mut(&mut self) -> &mut Vec> { + &mut self._lf_trace_buffer + } + + pub fn lf_trace_buffer_size(&self) -> &Vec { + &self._lf_trace_buffer_size + } + + pub fn lf_trace_buffer_size_mut(&mut self) -> &mut Vec { + &mut self._lf_trace_buffer_size + } + + pub fn lf_number_of_trace_buffers(&self) -> usize { + self._lf_number_of_trace_buffers + } + + pub fn lf_trace_stop(&self) -> usize { + self._lf_trace_stop + } + + pub fn lf_trace_file(&self) -> &Option { + &self._lf_trace_file + } + + pub fn lf_trace_file_mut(&mut self) -> &mut Option { + &mut self._lf_trace_file + } + + pub fn filename(&self) -> &String { + &self.filename + } + + pub fn lf_trace_object_descriptions_size(&self) -> i32 { + self._lf_trace_object_descriptions_size + } + + pub fn lf_trace_header_written(&self) -> bool { + self._lf_trace_header_written + } + + pub fn set_lf_number_of_trace_buffers(&mut self, _lf_number_of_trace_buffers: usize) { + self._lf_number_of_trace_buffers = _lf_number_of_trace_buffers; + } + + pub fn set_lf_trace_stop(&mut self, _lf_trace_stop: usize) { + self._lf_trace_stop = _lf_trace_stop; + } + + pub fn set_lf_trace_file(&mut self, _lf_trace_file: Option) { + self._lf_trace_file = _lf_trace_file; + } + + pub fn set_lf_trace_header_written(&mut self, _lf_trace_header_written: bool) { + self._lf_trace_header_written = _lf_trace_header_written; + } + + pub fn start_trace(trace: &mut Trace, _lf_number_of_workers: usize) { + // FIXME: location of trace file should be customizable. + match File::create(trace.filename()) { + Ok(trace_file) => { + trace.set_lf_trace_file(Some(trace_file)); + } + Err(e) => { + println!( + "WARNING: Failed to open log file with error code {}. No log will be written.", + e + ); + } + } + // Do not write the trace header information to the file yet + // so that startup reactions can register user-defined trace objects. + // write_trace_header(); + trace.set_lf_trace_header_written(false); + + // Allocate an array of arrays of trace records, one per worker thread plus one + // for the 0 thread (the main thread, or in an single-threaded program, the only + // thread). + trace.set_lf_number_of_trace_buffers(_lf_number_of_workers + 1); + for _i in 0.._lf_number_of_workers { + trace.lf_trace_buffer_mut().push(Vec::new()); + trace.lf_trace_buffer_size_mut().push(0); + } + + trace.set_lf_trace_stop(0); + println!("Started tracing."); + } + + pub fn stop_trace_locked(trace: &mut Trace, start_time: Instant) -> bool { + let trace_stopped = trace.lf_trace_stop(); + if trace_stopped + 1 < trace.lf_number_of_trace_buffers() - 1 { + // Not all federates finish yet. Nothing to do. + trace.set_lf_trace_stop(trace_stopped + 1); + return false; + } + // In multithreaded execution, thread 0 invokes wrapup reactions, so we + // put that trace last. However, it could also include some startup events. + // In any case, the trace file does not guarantee any ordering. + for i in 1..trace.lf_number_of_trace_buffers() - 1 { + // Flush the buffer if it has data. + println!( + "DEBUG: Trace buffer {} has {} records.\n", + i, + trace.lf_trace_buffer_size()[i] + ); + if trace.lf_trace_buffer_size()[i] > 0 { + Self::flush_trace_locked(trace, i, start_time); + } + } + if trace.lf_trace_buffer_size()[0] > 0 { + println!( + "DEBUG: Trace buffer 0 has {} records.\n", + trace.lf_trace_buffer_size()[0] + ); + Self::flush_trace_locked(trace, 0, start_time); + } + println!("Stopped tracing."); + true + } + + pub fn tracepoint_rti_from_federate( + trace: &mut Trace, + event_type: TraceEvent, + fed_id: u16, + tag: Tag, + start_time: Instant, + ) { + Self::tracepoint( + trace, + event_type, + // NULL, // void* pointer, + &Some(tag), // tag_t* tag, + fed_id.into(), // int worker (one thread per federate) + -1, // int src_id + fed_id.into(), // int dst_id + &mut 0, // instant_t* physical_time (will be generated) + // NULL, // trigger_t* trigger, + Some(0), // interval_t extra_delay + true, // is_interval_start + start_time, + ); + } + + pub fn tracepoint_rti_to_federate( + trace: &mut Trace, + event_type: TraceEvent, + fed_id: u16, + tag: Tag, + start_time: Instant, + ) { + Self::tracepoint( + trace, + event_type, + // NULL, // void* pointer, + &Some(tag), // tag_t* tag, + fed_id.into(), // int worker (one thread per federate) + -1, // int src_id + fed_id.into(), // int dst_id + &mut 0, // instant_t* physical_time (will be generated) + // NULL, // trigger_t* trigger, + Some(0), // interval_t extra_delay + true, // is_interval_start + start_time, + ); + } + + fn tracepoint( + trace: &mut Trace, + event_type: TraceEvent, + // void* reactor, + tag: &Option, + worker: usize, + src_id: i32, + dst_id: i32, + physical_time: &mut Instant, + // trigger_t* trigger, + extra_delay: Interval, + is_interval_start: bool, + start_time: Instant, + ) { + let mut time; + if !is_interval_start && *physical_time == 0 { + time = Tag::lf_time_physical(); + *physical_time = time; + } + + // TODO: environment_t *env = trace->env; + // Worker argument determines which buffer to write to. + let index; + if worker >= 0 { + index = worker; + } else { + index = 0; + } + + // Flush the buffer if it is full. + if trace.lf_trace_buffer_size()[index] >= TRACE_BUFFER_CAPACITY { + // No more room in the buffer. Write the buffer to the file. + Self::flush_trace(trace, index, start_time); + } + // The above flush_trace resets the write pointer. + let _lf_trace_buffer_size = trace.lf_trace_buffer_size()[index]; + let i = _lf_trace_buffer_size as usize; + + // Write to memory buffer. + // Get the correct time of the event + let buffer_len = trace.lf_trace_buffer_mut()[index].len(); + if buffer_len <= i { + for _a in 0..(i as usize + 1 - buffer_len) { + trace.lf_trace_buffer_mut()[index].push(TraceRecord::new()); + } + } + let lf_trace_buffer: &mut Vec = &mut trace.lf_trace_buffer_mut()[index]; + lf_trace_buffer[i].set_event_type(event_type.to_value()); + lf_trace_buffer[i].set_pointer(0); + lf_trace_buffer[i].set_src_id(src_id); + lf_trace_buffer[i].set_dst_id(dst_id); + // TODO: Handle unwrap() properly. + let _tag = tag.as_ref().unwrap(); + lf_trace_buffer[i].set_logical_time(_tag.time()); + lf_trace_buffer[i].set_microstep(_tag.microstep()); + // TODO: Enable following code + // else if (env != NULL) { + // trace->_lf_trace_buffer[index][i].logical_time = ((environment_t *)env)->current_tag.time; + // trace->_lf_trace_buffer[index][i].microstep = ((environment_t*)env)->current_tag.microstep; + // } + lf_trace_buffer[i].set_trigger(0); + lf_trace_buffer[i].set_extra_delay(extra_delay.unwrap()); + if is_interval_start && *physical_time == 0 { + time = Tag::lf_time_physical(); + *physical_time = time; + } + lf_trace_buffer[i].set_physical_time(*physical_time); + let lf_trace_buffer_size = _lf_trace_buffer_size + 1; + let _ = std::mem::replace( + &mut trace.lf_trace_buffer_size_mut()[index], + lf_trace_buffer_size, + ); + } + + fn flush_trace(trace: &mut Trace, worker: usize, start_time: Instant) { + Self::flush_trace_locked(trace, worker, start_time); + } + + fn flush_trace_locked(trace: &mut Trace, worker: usize, start_time: Instant) { + let _lf_trace_buffer_size = trace.lf_trace_buffer_size()[worker]; + if + // trace.lf_trace_stop() == 0 + // TODO: && trace.lf_trace_file() != None && + _lf_trace_buffer_size > 0 { + // If the trace header has not been written, write it now. + // This is deferred to here so that user trace objects can be + // registered in startup reactions. + if !trace.lf_trace_header_written() { + if Self::write_trace_header(trace, start_time) < 0 { + println!("Failed to write trace header. Trace file will be incomplete."); + return; + } + trace.set_lf_trace_header_written(true); + } + + // Write first the length of the array. + Self::write_to_trace_file( + trace.filename(), + &_lf_trace_buffer_size.to_le_bytes().to_vec(), + ); + + // Write the contents. + if worker >= trace.lf_trace_buffer().len() { + println!( + "[WARNING] worker({}) >= trace buffer length({})", + worker, + trace.lf_trace_buffer().len() + ); + return; + } + for i in 0..trace.lf_trace_buffer_size_mut()[worker] { + let trace_record = &trace.lf_trace_buffer_mut()[worker][i as usize]; + let encoded_trace_records = vec![ + trace_record.event_type().as_bytes(), + trace_record.pointer().as_bytes(), + trace_record.src_id().as_bytes(), + trace_record.dst_id().as_bytes(), + trace_record.logical_time().as_bytes(), + trace_record.microstep().as_bytes(), + 0.as_bytes(), + trace_record.physical_time().as_bytes(), + trace_record.trigger().as_bytes(), + trace_record.extra_delay().as_bytes(), + ] + .concat(); + Self::write_to_trace_file(trace.filename(), &encoded_trace_records); + } + trace.lf_trace_buffer_size_mut()[worker] = 0; + } + } + + fn write_trace_header(trace: &mut Trace, start_time: Instant) -> i32 { + // The first item in the header is the start time. + // This is both the starting physical time and the starting logical time. + println!( + "[DEBUG]: Start time written to trace file is {}.\n", + start_time + ); + Self::write_to_trace_file(trace.filename(), &start_time.to_le_bytes().to_vec()); + + // The next item in the header is the size of the + // _lf_trace_object_descriptions table. + Self::write_to_trace_file( + trace.filename(), + &trace + .lf_trace_object_descriptions_size() + .to_le_bytes() + .to_vec(), + ); + + // TODO: Implement "Next we write the table". + + trace.lf_trace_object_descriptions_size() + } + + fn write_to_trace_file(filename: &String, buffer: &Vec) { + match OpenOptions::new().write(true).append(true).open(filename) { + Ok(mut file) => match file.write(buffer) { + Ok(bytes_written) => { + if bytes_written != buffer.len() { + println!("WARNING: Access to trace file failed.\n"); + return; + } + } + Err(_e) => { + println!("Fail to write to the RTI trace file."); + return; + } + }, + Err(_e) => { + println!("Fail to open the RTI trace file."); + return; + } + } + } + pub fn log_trace( + rti: Arc>, + trace_event: TraceEvent, + fed_id: u16, + tag: &Tag, + start_time: Instant, + direction: TraceDirection, + ) { + let mut locked_rti = rti.write().unwrap(); + let tracing_enabled = locked_rti.base().tracing_enabled(); + let trace = locked_rti.base_mut().trace(); + if tracing_enabled { + match direction { + TraceDirection::From => Trace::tracepoint_rti_from_federate( + trace, + trace_event, + fed_id, + tag.clone(), + start_time, + ), + TraceDirection::To => Trace::tracepoint_rti_to_federate( + trace, + trace_event, + fed_id, + tag.clone(), + start_time, + ), + } + } + } +} + +pub enum TraceDirection { + From, + To, +}