Skip to content

Commit

Permalink
Handle a message type for address advertisement/query
Browse files Browse the repository at this point in the history
- Functions for MsgType::AddressAdvertisement (Byte value 14) and
  MsgType::AddressQuery (Byte value: 13) are added.
  • Loading branch information
chanijjani committed Feb 27, 2024
1 parent 1be07d2 commit 9368760
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 20 deletions.
2 changes: 1 addition & 1 deletion rust/rti/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ edition = "2021"

[dependencies]
byteorder = "1"
priority-queue = "1.3.2"
priority-queue = "2.0.2"
38 changes: 29 additions & 9 deletions rust/rti/src/federate_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,16 @@ use crate::message_record::message_record::InTransitMessageRecordQueue;
* @author Chadlia Jerad (chadlia.jerad@ensi-uma.tn)
* @author Chanhee Lee (chanheel@asu.edu)
* @author Hokeun Kim (hokeun@asu.edu)
* @copyright (c) 2020-2023, The University of California at Berkeley
* @copyright (c) 2020-2024, The University of California at Berkeley
* License in [BSD 2-clause](..)
* @brief Declarations for runtime infrastructure (RTI) for distributed Lingua Franca programs.
* This file extends rti_common.h with RTI features that are specific to federations and are not
* used by scheduling enclaves.
*/
use crate::rti_common::*;

use std::net::TcpStream;
use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpStream};

use std::option::Option;

/**
Expand All @@ -31,21 +32,23 @@ pub struct FederateInfo {
requested_stop: bool, // Indicates that the federate has requested stop or has replied
// to a request for stop from the RTI. Used to prevent double-counting
// a federate when handling lf_request_stop().
// TODO: lf_thread_t thread_id; // The ID of the thread handling communication with this federate.
// TODO: lf_thread_t thread_id;
stream: Option<TcpStream>, // The TCP socket descriptor for communicating with this federate.
// TODO: struct sockaddr_in UDP_addr; // The UDP address for the federate.
// TODO: struct sockaddr_in UDP_addr;
clock_synchronization_enabled: bool, // Indicates the status of clock synchronization
// for this federate. Enabled by default.
in_transit_message_tags: InTransitMessageRecordQueue, // Record of in-transit messages to this federate that are not
// yet processed. This record is ordered based on the time
// value of each message for a more efficient access.
server_hostname: String, // Human-readable IP address and
server_port: i32, // port number of the socket server of the federate
// if it has any incoming direct connections from other federates.
// The port number will be -1 if there is no server or if the
// RTI has not been informed of the port number.
// TODO: struct in_addr server_ip_addr; // Information about the IP address of the socket
// server of the federate.
// if it has any incoming direct connections from other federates.
// The port number will be -1 if there is no server or if the
// RTI has not been informed of the port number.
// TODO: struct in_addr server_ip_addr;
// server of the federate.
server_ip_addr: SocketAddr, // Information about the IP address of the socket
// server of the federate.
}

impl FederateInfo {
Expand All @@ -58,6 +61,7 @@ impl FederateInfo {
in_transit_message_tags: InTransitMessageRecordQueue::new(),
server_hostname: String::from("localhost"),
server_port: -1,
server_ip_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080),
}
}

Expand All @@ -81,6 +85,18 @@ impl FederateInfo {
self.clock_synchronization_enabled
}

pub fn server_hostname(&self) -> String {
self.server_hostname.clone()
}

pub fn server_port(&self) -> i32 {
self.server_port
}

pub fn server_ip_addr(&self) -> SocketAddr {
self.server_ip_addr.clone()
}

pub fn set_requested_stop(&mut self, requested_stop: bool) {
self.requested_stop = requested_stop;
}
Expand All @@ -104,4 +120,8 @@ impl FederateInfo {
pub fn set_server_port(&mut self, server_port: i32) {
self.server_port = server_port;
}

pub fn set_server_ip_addr(&mut self, server_ip_addr: SocketAddr) {
self.server_ip_addr = server_ip_addr;
}
}
3 changes: 3 additions & 0 deletions rust/rti/src/net_common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ pub enum MsgType {
StopRequestReply,
StopGranted,
AddressQuery,
AddressAdvertisement,
P2pSendingFedId,
P2pTaggedMessage,
PortAbsent,
Expand All @@ -125,6 +126,7 @@ impl MsgType {
MsgType::StopRequestReply => 11,
MsgType::StopGranted => 12,
MsgType::AddressQuery => 13,
MsgType::AddressAdvertisement => 14,
MsgType::P2pSendingFedId => 15,
MsgType::P2pTaggedMessage => 17,
MsgType::PortAbsent => 23,
Expand All @@ -147,6 +149,7 @@ impl MsgType {
11 => MsgType::StopRequestReply,
12 => MsgType::StopGranted,
13 => MsgType::AddressQuery,
14 => MsgType::AddressAdvertisement,
23 => MsgType::PortAbsent,
_ => MsgType::Ignore,
}
Expand Down
126 changes: 116 additions & 10 deletions rust/rti/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/
use std::io::Write;
use std::mem;
use std::net::{Shutdown, TcpListener, TcpStream};
use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpListener, TcpStream};
use std::sync::{Arc, Condvar, Mutex, RwLock};
use std::thread;
use std::thread::JoinHandle;
Expand Down Expand Up @@ -269,6 +269,16 @@ impl Server {
cloned_stop_granted.clone(),
)
}
MsgType::AddressQuery => Self::handle_address_query(
fed_id.try_into().unwrap(),
&mut stream,
cloned_rti.clone(),
),
MsgType::AddressAdvertisement => Self::handle_address_ad(
fed_id.try_into().unwrap(),
&mut stream,
cloned_rti.clone(),
),
MsgType::PortAbsent => Self::handle_port_absent_message(
&buffer,
fed_id.try_into().unwrap(),
Expand Down Expand Up @@ -365,13 +375,13 @@ impl Server {
return -1;
} else {
// Received federate_info ID.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let u16_size = mem::size_of::<u16>();
fed_id = u16::from_le_bytes(first_buffer[1..(1 + u16_size)].try_into().unwrap());
println!("RTI received federate_info ID: {}.", fed_id);

// Read the federation ID. First read the length, which is one byte.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let federation_id_length = u8::from_le_bytes(
first_buffer[(1 + u16_size)..(1 + u16_size + 1)]
.try_into()
Expand Down Expand Up @@ -443,8 +453,20 @@ impl Server {
"Federation ID matches! \"{}(received)\" <-> \"{}(_f_rti)\"",
federation_id_received, federation_id
);

// TODO: Assign the address information for federate_info.
{
let mut locked_rti = cloned_rti.write().unwrap();
let idx: usize = fed_id.into();
let federate_info: &mut FederateInfo =
&mut locked_rti.base_mut().scheduling_nodes_mut()[idx];
// The MsgType::FedIds message has the right federation ID.
// Assign the address information for federate.
// The IP address is stored here as an in_addr struct (in .server_ip_addr) that can be useful
// to create sockets and can be efficiently sent over the network.
// First, convert the sockaddr structure into a sockaddr_in that contains an internet address.
// Then extract the internet address (which is in IPv4 format) and assign it as the federate's socket server
// TODO: Handle unwrap() in the below line properly.
federate_info.set_server_ip_addr(stream.peer_addr().unwrap());
}

// Set the federate_info's state as pending
// because it is waiting for the start time to be
Expand Down Expand Up @@ -550,7 +572,7 @@ impl Server {
let mut message_head: usize = 0;
// First, read the info about upstream federates
for i in 0..num_upstream {
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let upstream_id = u16::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<u16>())]
.try_into()
Expand All @@ -562,7 +584,7 @@ impl Server {
"upstream_id: {}, message_head: {}",
upstream_id, message_head
);
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let upstream_delay = i64::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<i64>())]
.try_into()
Expand All @@ -578,7 +600,7 @@ impl Server {

// Next, read the info about downstream federates
for i in 0..num_downstream {
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let downstream_id = u16::from_le_bytes(
connection_info_body[message_head..(message_head + mem::size_of::<u16>())]
.try_into()
Expand Down Expand Up @@ -629,7 +651,7 @@ impl Server {

if clock_sync_global_status >= ClockSyncStat::ClockSyncInit {
// If no initial clock sync, no need perform initial clock sync.
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let federate_udp_port_number =
u16::from_le_bytes(response[1..3].try_into().unwrap());

Expand Down Expand Up @@ -1589,6 +1611,90 @@ impl Server {
);
}

fn handle_address_query(fed_id: u16, stream: &mut TcpStream, _f_rti: Arc<RwLock<RTIRemote>>) {
// Use buffer both for reading and constructing the reply.
// The length is what is needed for the reply.
let mut buffer = vec![0 as u8; mem::size_of::<u16>()];
NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, fed_id, "address query.");
// let remote_fed_id =
// u16::from_le_bytes(buffer[0..mem::size_of::<u16>()].try_into().unwrap());
let remote_fed_id = u16::from_le_bytes(buffer.clone().try_into().unwrap());

println!(
"RTI received address query from {} for {}.",
fed_id, remote_fed_id
);

// NOTE: server_port initializes to -1, which means the RTI does not know
// the port number because it has not yet received an MsgType::AddressAdvertisement message
// from this federate. In that case, it will respond by sending -1.

// Response message is also of type MsgType::AddressQuery.
let mut response_message = vec![0 as u8; 1 + mem::size_of::<i32>()];
response_message[0] = MsgType::AddressQuery.to_byte();

// Encode the port number.
let server_hostname;
let server_port;
let server_ip_addr;
{
let locked_rti = _f_rti.read().unwrap();
let idx: usize = remote_fed_id.into();
let remote_fed = &locked_rti.base().scheduling_nodes()[idx];
server_hostname = remote_fed.server_hostname();
server_port = remote_fed.server_port();
server_ip_addr = remote_fed.server_ip_addr();
}
// Send the port number (which could be -1).
NetUtil::encode_int32(server_port, &mut response_message, 1);
NetUtil::write_to_socket_fail_on_error(stream, &response_message, fed_id, "port number");

// Send the server IP address to federate.
match server_ip_addr.ip() {
IpAddr::V4(ip_addr) => {
NetUtil::write_to_socket_fail_on_error(
stream,
&(ip_addr.octets().to_vec()),
fed_id,
"ip address",
);
}
IpAddr::V6(_e) => {
println!("IPv6 is not supported yet. Exit.");
std::process::exit(1);
}
}

println!(
"Replied to address query from federate {} with address {}({}):{}.",
fed_id, server_hostname, server_ip_addr, server_port
);
}

fn handle_address_ad(federate_id: u16, stream: &mut TcpStream, _f_rti: Arc<RwLock<RTIRemote>>) {
// Read the port number of the federate that can be used for physical
// connections to other federates
let mut buffer = vec![0 as u8; mem::size_of::<i32>()];
NetUtil::read_from_socket_fail_on_error(stream, &mut buffer, federate_id, "port data");

// FIXME: Update from_le_bytes if endian types should be considered.
let server_port = i32::from_le_bytes(buffer.try_into().unwrap());

assert!(server_port <= u16::MAX.into());

{
let mut locked_rti = _f_rti.write().unwrap();
let idx: usize = federate_id.into();
let fed: &mut FederateInfo = &mut locked_rti.base_mut().scheduling_nodes_mut()[idx];
fed.set_server_port(server_port);
}

println!(
"Received address advertisement with port {} from federate {}.",
server_port, federate_id
);
}

fn handle_port_absent_message(
buffer: &Vec<u8>,
fed_id: u16,
Expand All @@ -1609,7 +1715,7 @@ impl Server {
);

let u16_size = mem::size_of::<u16>();
// FIXME: Change from_le_bytes properly.
// FIXME: Update from_le_bytes if endian types should be considered.
let reactor_port_id = u16::from_le_bytes(header_buffer[0..(u16_size)].try_into().unwrap());
let federate_id = u16::from_le_bytes(
header_buffer[(u16_size)..(u16_size + u16_size)]
Expand Down

0 comments on commit 9368760

Please sign in to comment.