Skip to content

Commit

Permalink
Fix bugs due to rebase + Tracepoint lf_stop()
Browse files Browse the repository at this point in the history
  • Loading branch information
ChadliaJerad committed Mar 14, 2024
1 parent d0596d1 commit 5975c39
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 73 deletions.
23 changes: 9 additions & 14 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ static void* pending_grant_thread(void* federate) {
* @param fed The federate.
* @param tag The tag to grant.
*/
static void notify_tag_advance_grant_delayed(scheduling_node_t* e, tag_t tag) {
federate_info_t* fed = (federate_info_t*)GET_FED_INFO(e->id);

static void notify_tag_advance_grant_delayed(federate_info_t* fed, tag_t tag) {
// Check wether there is already a pending grant
// And check the pending provisional grant as well
lf_mutex_lock(&rti_mutex);
Expand Down Expand Up @@ -418,9 +416,7 @@ static void* pending_provisional_grant_thread(void* federate) {
* @param fed The federate.
* @param tag The provisional tag to grant.
*/
static void notify_provisional_tag_advance_grant_delayed(scheduling_node_t* e, tag_t tag) {
federate_info_t* fed = (federate_info_t*)e;

static void notify_provisional_tag_advance_grant_delayed(federate_info_t* fed, tag_t tag) {
// Proceed with the delayed provisional tag grant notification only if
// there is no pending grant and no provisional pending grant
LF_MUTEX_LOCK(&rti_mutex);
Expand Down Expand Up @@ -981,9 +977,10 @@ void send_start_tag(federate_info_t* my_fed, instant_t federation_start_time, ta
// message.
// In the startup phase, federates will receive identical start_time and
// effective_start_tag
unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_LENGTH];
start_time_buffer[0] = MSG_TYPE_TIMESTAMP;
unsigned char start_time_buffer[MSG_TYPE_TIMESTAMP_START_LENGTH];
start_time_buffer[0] = MSG_TYPE_TIMESTAMP_START;
encode_int64(swap_bytes_if_big_endian_int64(start_time), &start_time_buffer[1]);
encode_tag(&(start_time_buffer[1 + sizeof(instant_t)]), federate_start_tag);

if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(send_TIMESTAMP, my_fed->enclave.id, &federate_start_tag);
Expand Down Expand Up @@ -1509,7 +1506,7 @@ static int32_t receive_and_check_fed_id_message(int* socket_id, struct sockaddr_
// First byte received is the message type.
if (buffer[0] != MSG_TYPE_FED_IDS) {
if (rti_remote->base.tracing_enabled) {
tracepoint_rti_to_federate(rti_remote->base.trace, send_REJECT, fed_id, NULL);
tracepoint_rti_to_federate(send_REJECT, fed_id, NULL);
}
if (buffer[0] == MSG_TYPE_P2P_SENDING_FED_ID || buffer[0] == MSG_TYPE_P2P_TAGGED_MESSAGE) {
// The federate is trying to connect to a peer, not to the RTI.
Expand Down Expand Up @@ -2022,15 +2019,13 @@ void* lf_connect_to_transient_federates_thread(void* nothing) {
// The following blocks until a federate connects.
int socket_id = -1;
while (1) {
if (!rti_remote->all_persistent_federates_exited) {
return NULL;
}
// if (!rti_remote->all_persistent_federates_exited) {
// return NULL;
// }
socket_id = accept(rti_remote->socket_descriptor_TCP, &client_fd, &client_length);
if (socket_id >= 0) {
// Got a socket
break;
} else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK)) {
lf_print_error_system_failure("RTI failed to accept the socket.");
} else {
// Try again
lf_print_warning("RTI failed to accept the socket. %s. Trying again.", strerror(errno));
Expand Down
7 changes: 3 additions & 4 deletions core/threaded/reactor_threaded.c
Original file line number Diff line number Diff line change
Expand Up @@ -615,14 +615,13 @@ void _lf_initialize_start_tag(environment_t* env) {
}

_lf_initialize_timers(env);
env->current_tag = effective_start_tag;

#if defined FEDERATED_DECENTRALIZED
// If we have a non-zero STA offset, then we need to allow messages to arrive
// prior to the start time. To avoid spurious STP violations, we temporarily
// set the current time back by the STA offset.
env->current_tag =
(tag_t){.time = effective_start_tag.time - lf_fed_STA_offset, .microstep = effective_start_tag.microstep};

env->current_tag.time -= lf_fed_STA_offset;
LF_PRINT_LOG("Waiting for start time " PRINTF_TIME " plus STA " PRINTF_TIME ".", start_time, lf_fed_STA_offset);
#else
// For other than federated decentralized execution, there is no lf_fed_STA_offset variable defined.
Expand Down Expand Up @@ -674,7 +673,7 @@ void _lf_initialize_start_tag(environment_t* env) {
// from exceeding the timestamp of the message. It will remove that barrier
// once the complete message has been read. Here, we wait for that barrier
// to be removed, if appropriate before proceeding to executing tag (0,0).
_lf_wait_on_tag_barrier(env, (tag_t){.time = start_time, .microstep = 0});
_lf_wait_on_tag_barrier(env, effective_start_tag);
lf_spawn_staa_thread();

#else // NOT FEDERATED_DECENTRALIZED
Expand Down
69 changes: 14 additions & 55 deletions include/core/tracepoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ typedef enum {
receive_ADR_AD,
receive_ADR_QR,
receive_UNIDENTIFIED,
send_STOP,
receive_STOP,
NUM_EVENT_TYPES
} trace_event_t;

Expand All @@ -110,63 +112,20 @@ typedef enum {
* String description of event types.
*/
static const char* trace_event_names[] = {
"Reaction starts",
"Reaction ends",
"Reaction deadline missed",
"Schedule called",
"User-defined event",
"User-defined valued event",
"Worker wait starts",
"Worker wait ends",
"Scheduler advancing time starts",
"Scheduler advancing time ends",
"Federated marker",
"Reaction starts", "Reaction ends", "Reaction deadline missed", "Schedule called", "User-defined event",
"User-defined valued event", "Worker wait starts", "Worker wait ends", "Scheduler advancing time starts",
"Scheduler advancing time ends", "Federated marker",
// Sending messages
"Sending ACK",
"Sending FAILED",
"Sending TIMESTAMP",
"Sending NET",
"Sending LTC",
"Sending STOP_REQ",
"Sending STOP_REQ_REP",
"Sending STOP_GRN",
"Sending FED_ID",
"Sending PTAG",
"Sending TAG",
"Sending REJECT",
"Sending RESIGN",
"Sending PORT_ABS",
"Sending CLOSE_RQ",
"Sending TAGGED_MSG",
"Sending P2P_TAGGED_MSG",
"Sending MSG",
"Sending P2P_MSG",
"Sending ADR_AD",
"Sending ADR_QR",
"Sending ACK", "Sending FAILED", "Sending TIMESTAMP", "Sending NET", "Sending LTC", "Sending STOP_REQ",
"Sending STOP_REQ_REP", "Sending STOP_GRN", "Sending FED_ID", "Sending PTAG", "Sending TAG", "Sending REJECT",
"Sending RESIGN", "Sending PORT_ABS", "Sending CLOSE_RQ", "Sending TAGGED_MSG", "Sending P2P_TAGGED_MSG",
"Sending MSG", "Sending P2P_MSG", "Sending ADR_AD", "Sending ADR_QR",
// Receiving messages
"Receiving ACK",
"Receiving FAILED",
"Receiving TIMESTAMP",
"Receiving NET",
"Receiving LTC",
"Receiving STOP_REQ",
"Receiving STOP_REQ_REP",
"Receiving STOP_GRN",
"Receiving FED_ID",
"Receiving PTAG",
"Receiving TAG",
"Receiving REJECT",
"Receiving RESIGN",
"Receiving PORT_ABS",
"Receiving CLOSE_RQ",
"Receiving TAGGED_MSG",
"Receiving P2P_TAGGED_MSG",
"Receiving MSG",
"Receiving P2P_MSG",
"Receiving ADR_AD",
"Receiving ADR_QR",
"Receiving UNIDENTIFIED",
};
"Receiving ACK", "Receiving FAILED", "Receiving TIMESTAMP", "Receiving NET", "Receiving LTC", "Receiving STOP_REQ",
"Receiving STOP_REQ_REP", "Receiving STOP_GRN", "Receiving FED_ID", "Receiving PTAG", "Receiving TAG",
"Receiving REJECT", "Receiving RESIGN", "Receiving PORT_ABS", "Receiving CLOSE_RQ", "Receiving TAGGED_MSG",
"Receiving P2P_TAGGED_MSG", "Receiving MSG", "Receiving P2P_MSG", "Receiving ADR_AD", "Receiving ADR_QR",
"Receiving UNIDENTIFIED", "Sending STOP", "Receiving STOP"};

/**
* @brief A trace record that gets written in binary to the trace file in the default implementation.
Expand Down

0 comments on commit 5975c39

Please sign in to comment.