Skip to content

Commit

Permalink
Merge pull request #407 from lf-lang/improve-fed-retry
Browse files Browse the repository at this point in the history
Improve handling of timeouts when federates connect to RTI/each other
  • Loading branch information
lhstrh authored Apr 12, 2024
2 parents 18d272d + 7ffb1fe commit b64a805
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 18 deletions.
26 changes: 12 additions & 14 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1678,15 +1678,14 @@ void lf_terminate_execution(environment_t* env) {

void lf_connect_to_federate(uint16_t remote_federate_id) {
int result = -1;
int count_retries = 0;

// Ask the RTI for port number of the remote federate.
// The buffer is used for both sending and receiving replies.
// The size is what is needed for receiving replies.
unsigned char buffer[sizeof(int32_t) + INET_ADDRSTRLEN + 1];
int port = -1;
struct in_addr host_ip_addr;
int count_tries = 0;
instant_t start_connect = lf_time_physical();
while (port == -1 && !_lf_termination_executed) {
buffer[0] = MSG_TYPE_ADDRESS_QUERY;
// NOTE: Sending messages in little endian.
Expand Down Expand Up @@ -1724,7 +1723,7 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
// remote federate has not yet sent an MSG_TYPE_ADDRESS_ADVERTISEMENT message to the RTI.
// Sleep for some time before retrying.
if (port == -1) {
if (count_tries++ >= CONNECT_MAX_RETRIES) {
if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
lf_print_error_and_exit("TIMEOUT obtaining IP/port for federate %d from the RTI.", remote_federate_id);
}
// Wait ADDRESS_QUERY_RETRY_INTERVAL nanoseconds.
Expand All @@ -1745,8 +1744,8 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
LF_PRINT_LOG("Received address %s port %d for federate %d from RTI.", hostname, uport, remote_federate_id);
#endif

// Iterate until we either successfully connect or exceed the number of
// attempts given by CONNECT_MAX_RETRIES.
// Iterate until we either successfully connect or we exceed the CONNECT_TIMEOUT
start_connect = lf_time_physical();
int socket_id = -1;
while (result < 0 && !_lf_termination_executed) {
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
Expand All @@ -1772,12 +1771,11 @@ void lf_connect_to_federate(uint16_t remote_federate_id) {
// Note that this should not really happen since the remote federate should be
// accepting socket connections. But possibly it will be busy (in process of accepting
// another socket connection?). Hence, we retry.
count_retries++;
if (count_retries > CONNECT_MAX_RETRIES) {
// If the remote federate is not accepting the connection after CONNECT_MAX_RETRIES
if (CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT)) {
// If the remote federate is not accepting the connection after CONNECT_TIMEOUT
// treat it as a soft error condition and return.
lf_print_error("Failed to connect to federate %d after %d retries. Giving up.", remote_federate_id,
CONNECT_MAX_RETRIES);
lf_print_error("Failed to connect to federate %d with timeout: " PRINTF_TIME ". Giving up.", remote_federate_id,
CONNECT_TIMEOUT);
return;
}
lf_print_warning("Could not connect to federate %d. Will try again every" PRINTF_TIME "nanoseconds.\n",
Expand Down Expand Up @@ -1859,10 +1857,10 @@ void lf_connect_to_rti(const char* hostname, int port) {
_fed.socket_TCP_RTI = create_real_time_tcp_socket_errexit();

int result = -1;
int count_retries = 0;
struct addrinfo* res = NULL;

while (count_retries++ < CONNECT_MAX_RETRIES && !_lf_termination_executed) {
instant_t start_connect = lf_time_physical();
while (!CHECK_TIMEOUT(start_connect, CONNECT_TIMEOUT) && !_lf_termination_executed) {
if (res != NULL) {
// This is a repeated attempt.
if (_fed.socket_TCP_RTI >= 0)
Expand All @@ -1884,7 +1882,7 @@ void lf_connect_to_rti(const char* hostname, int port) {
// Reconstruct the address info.
rti_address(hostname, uport, &res);
}
lf_print("Trying RTI again on port %d (attempt %d).", uport, count_retries);
lf_print("Trying RTI again on port %d.", uport);
} else {
// This is the first attempt.
rti_address(hostname, uport, &res);
Expand Down Expand Up @@ -1975,7 +1973,7 @@ void lf_connect_to_rti(const char* hostname, int port) {
}
}
if (result < 0) {
lf_print_error_and_exit("Failed to connect to RTI after %d tries.", CONNECT_MAX_RETRIES);
lf_print_error_and_exit("Failed to connect to RTI with timeout: " PRINTF_TIME, CONNECT_TIMEOUT);
}

freeaddrinfo(res); /* No longer needed */
Expand Down
2 changes: 1 addition & 1 deletion include/core/federated/federate.h
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,7 @@ void lf_connect_to_federate(uint16_t);
* This will return the socket descriptor for the connection.
* If port_number is 0, then start at DEFAULT_PORT and increment
* the port number on each attempt. If an attempt fails, wait CONNECT_RETRY_INTERVAL
* and try again. If it fails after CONNECT_MAX_RETRIES, the program exits.
* and try again. If it fails after CONNECT_TIMEOUT, the program exits.
* If it succeeds, it sets the _fed.socket_TCP_RTI global variable to refer to
* the socket for communicating with the RTI.
* @param hostname A hostname, such as "localhost".
Expand Down
6 changes: 3 additions & 3 deletions include/core/federated/network/net_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,10 +205,10 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

/**
* Bound on the number of retries to connect to the RTI.
* A federate will retry every CONNECT_RETRY_INTERVAL seconds
* this many times before giving up.
* A federate will retry every CONNECT_RETRY_INTERVAL seconds until
* CONNECT_TIMEOUT expires.
*/
#define CONNECT_MAX_RETRIES 100
#define CONNECT_TIMEOUT MINUTES(1)

/**
* Maximum number of port addresses that a federate will try to connect to the RTI on.
Expand Down
3 changes: 3 additions & 0 deletions tag/api/tag.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@
// A tag_t compound literal whose time field is 0 and whose microstep field is 0.
#define ZERO_TAG \
(tag_t) { .time = 0LL, .microstep = 0u }

// Returns true if timeout has elapsed, i.e., if the current physical time
// reported by lf_time_physical() is strictly greater than (start) + (duration).
// Each argument is evaluated exactly once, and lf_time_physical() is called
// every time the macro is evaluated. Because the values are compared directly,
// `start` and `duration` must be in the same unit that lf_time_physical() returns.
#define CHECK_TIMEOUT(start, duration) (lf_time_physical() > ((start) + (duration)))

// Convenience for converting times
#define BILLION ((instant_t)1000000000LL)

Expand Down

0 comments on commit b64a805

Please sign in to comment.