Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve handling of timeouts when federates connect to RTI/each other #407

Merged
merged 2 commits into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 12 additions & 14 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1678,15 +1678,14 @@ void lf_terminate_execution(environment_t* env) {

void lf_connect_to_federate(uint16_t remote_federate_id) {
int result = -1;
int count_retries = 0;

// Ask the RTI for port number of the remote federate.
// The buffer is used for both sending and receiving replies.
// The size is what is needed for receiving replies.
unsigned char buffer[sizeof(int32_t) + INET_ADDRSTRLEN + 1];
int port = -1;
struct in_addr host_ip_addr;
int count_tries = 0;
instant_t start_connect = lf_time_physical();
while (port == -1 && !_lf_termination_executed) {
buffer[0] = MSG_TYPE_ADDRESS_QUERY;
// NOTE: Sending messages in little endian.
Expand Down Expand Up @@ -1724,7 +1723,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
// remote federate has not yet sent an MSG_TYPE_ADDRESS_ADVERTISEMENT message to the RTI.
// Sleep for some time before retrying.
if (port == -1) {
if (count_tries++ >= CONNECT_MAX_RETRIES) {
if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
lf_print_error_and_exit("TIMEOUT obtaining IP/port for federate %d from the RTI.", remote_federate_id);
}
// Wait ADDRESS_QUERY_RETRY_INTERVAL nanoseconds.
Expand All @@ -1745,8 +1744,8 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
LF_PRINT_LOG("Received address %s port %d for federate %d from RTI.", hostname, uport, remote_federate_id);
#endif

// Iterate until we either successfully connect or exceed the number of
// attempts given by CONNECT_MAX_RETRIES.
// Iterate until we either successfully connect or we exceed the CONNECT_TIMEOUT
start_connect = lf_time_physical();
int socket_id = -1;
while (result < 0 && !_lf_termination_executed) {
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
Expand All @@ -1772,12 +1771,11 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
// Note that this should not really happen since the remote federate should be
// accepting socket connections. But possibly it will be busy (in process of accepting
// another socket connection?). Hence, we retry.
count_retries++;
if (count_retries > CONNECT_MAX_RETRIES) {
// If the remote federate is not accepting the connection after CONNECT_MAX_RETRIES
if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
// If the remote federate is not accepting the connection after CONNECT_TIMEOUT
// treat it as a soft error condition and return.
lf_print_error("Failed to connect to federate %d after %d retries. Giving up.", remote_federate_id,
CONNECT_MAX_RETRIES);
lf_print_error("Failed to connect to federate %d with timeout: " PRINTF_TIME ". Giving up.", remote_federate_id,
CONNECT_TIMEOUT);
return;
}
lf_print_warning("Could not connect to federate %d. Will try again every" PRINTF_TIME "nanoseconds.\n",
Expand Down Expand Up @@ -1859,10 +1857,10 @@ void lf_connect_to_rti(const char* hostname, int port) {
_fed.socket_TCP_RTI = create_real_time_tcp_socket_errexit();

int result = -1;
int count_retries = 0;
struct addrinfo* res = NULL;

while (count_retries++ < CONNECT_MAX_RETRIES && !_lf_termination_executed) {
instant_t start_connect = lf_time_physical();
while (!CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT) && !_lf_termination_executed) {
if (res != NULL) {
// This is a repeated attempt.
if (_fed.socket_TCP_RTI >= 0)
Expand All @@ -1884,7 +1882,7 @@ void lf_connect_to_rti(const char* hostname, int port) {
// Reconstruct the address info.
rti_address(hostname, uport, &res);
}
lf_print("Trying RTI again on port %d (attempt %d).", uport, count_retries);
lf_print("Trying RTI again on port %d.", uport);
} else {
// This is the first attempt.
rti_address(hostname, uport, &res);
Expand Down Expand Up @@ -1975,7 +1973,7 @@ void lf_connect_to_rti(const char* hostname, int port) {
}
}
if (result < 0) {
lf_print_error_and_exit("Failed to connect to RTI after %d tries.", CONNECT_MAX_RETRIES);
lf_print_error_and_exit("Failed to connect to RTI with timeout: " PRINTF_TIME, CONNECT_TIMEOUT);
}

freeaddrinfo(res); /* No longer needed */
Expand Down
2 changes: 1 addition & 1 deletion include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void lf_connect_to_federate(uint16_t);
* This will return the socket descriptor for the connection.
* If port_number is 0, then start at DEFAULT_PORT and increment
* the port number on each attempt. If an attempt fails, wait CONNECT_RETRY_INTERVAL
* and try again. If it fails after CONNECT_MAX_RETRIES, the program exits.
* and try again. If it fails after CONNECT_TIMEOUT, the program exits.
* If it succeeds, it sets the _fed.socket_TCP_RTI global variable to refer to
* the socket for communicating with the RTI.
* @param hostname A hostname, such as "localhost".
Expand Down
6 changes: 3 additions & 3 deletions include/core/federated/network/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

/**
* Bound on the number of retries to connect to the RTI.
* A federate will retry every CONNECT_RETRY_INTERVAL seconds
* this many times before giving up.
* A federate will retry every CONNECT_RETRY_INTERVAL seconds until
* CONNECTION_TIMEOUT expires.
*/
#define CONNECT_MAX_RETRIES 100
#define CONNECT_TIMEOUT MINUTES(1)

/**
* Maximum number of port addresses that a federate will try to connect to the RTI on.
Expand Down
3 changes: 3 additions & 0 deletions tag/api/tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
#define ZERO_TAG \
(tag_t) { .time = 0LL, .microstep = 0u }

// Returns true if timeout has elapsed.
#define CHECK_TIMEOUT(start, duration) (lf_time_physical() > ((start) + (duration)))

// Convenience for converting times
#define BILLION ((instant_t)1000000000LL)

Expand Down
Loading