Skip to content

Commit

Permalink
Simplify the code of message dropping in the remote RTI.
Browse files Browse the repository at this point in the history
  • Loading branch information
ChadliaJerad committed Dec 13, 2024
1 parent 9053feb commit 34d51b3
Showing 1 changed file with 7 additions and 36 deletions.
43 changes: 7 additions & 36 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -663,20 +663,14 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
// issue a TAG before this message has been forwarded.
LF_MUTEX_LOCK(&rti_mutex);

// If the destination federate is no longer connected, issue a warning,
// remove the message from the socket and return.
// If the destination federate is no longer connected, or it is a transient that has not started executing yet
// (the intended tag is less than the effective start tag of the destination), issue a warning, remove the message
// from the socket, and return.
federate_info_t* fed = GET_FED_INFO(federate_id);
if (fed->enclave.state == NOT_CONNECTED) {
lf_print_warning("RTI: Destination federate %d is no longer connected. Dropping message.", federate_id);
LF_PRINT_LOG("Fed status: next_event " PRINTF_TAG ", "
"completed " PRINTF_TAG ", "
"last_granted " PRINTF_TAG ", "
"last_provisionally_granted " PRINTF_TAG ".",
fed->enclave.next_event.time - start_time, fed->enclave.next_event.microstep,
fed->enclave.completed.time - start_time, fed->enclave.completed.microstep,
fed->enclave.last_granted.time - start_time, fed->enclave.last_granted.microstep,
fed->enclave.last_provisionally_granted.time - start_time,
fed->enclave.last_provisionally_granted.microstep);
if (fed->enclave.state == NOT_CONNECTED || lf_tag_compare(intended_tag, fed->effective_start_tag) < 0) {
lf_print_warning("RTI: Destination federate %d is not connected at logical time (" PRINTF_TAG
"). Dropping message.",
federate_id, intended_tag.time - start_time, intended_tag.microstep);
// If the message was larger than the buffer, we must empty out the remainder also.
size_t total_bytes_read = bytes_read;
while (total_bytes_read < total_bytes_to_read) {
Expand All @@ -690,29 +684,6 @@ void handle_timed_message(federate_info_t* sending_federate, unsigned char* buff
}
LF_MUTEX_UNLOCK(&rti_mutex);
return;
} else {
// Do not forward the message if the federate is connected, but its
// start_time is not reached yet
if (lf_tag_compare(intended_tag, fed->effective_start_tag) < 0) {
LF_PRINT_LOG("RTI: Effective start tag of the destination federate %d (" PRINTF_TAG "), "
"is not reached yet, while the received message tag is ()" PRINTF_TAG "). "
"Dropping message.",
federate_id, fed->effective_start_tag.time - start_time, fed->effective_start_tag.microstep,
intended_tag.time - start_time, intended_tag.microstep);
// Similarly, if the message was larger than the buffer, we must empty out the remainder also.
size_t total_bytes_read = bytes_read;
while (total_bytes_read < total_bytes_to_read) {
bytes_to_read = total_bytes_to_read - total_bytes_read;
if (bytes_to_read > FED_COM_BUFFER_SIZE) {
bytes_to_read = FED_COM_BUFFER_SIZE;
}
read_from_socket_fail_on_error(&sending_federate->socket, bytes_to_read, buffer, NULL,
"RTI failed to clear message chunks.");
total_bytes_read += bytes_to_read;
}
LF_MUTEX_UNLOCK(&rti_mutex);
return;
}
}

LF_PRINT_DEBUG("RTI forwarding message to port %d of federate %hu of length %zu.", reactor_port_id, federate_id,
Expand Down

0 comments on commit 34d51b3

Please sign in to comment.