Skip to content

Commit

Permalink
Merge branch 'main' into event-queue-refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
byeonggiljun committed Apr 1, 2024
2 parents 64f21c4 + ec06c93 commit d25282e
Show file tree
Hide file tree
Showing 53 changed files with 557 additions and 402 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
/docs/api
**/.vscode/
**/build/
**/lib/
**/.DS_Store
/core/federated/RTI/build/
/cmake-build-debug/
Expand Down
54 changes: 17 additions & 37 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,17 +1,7 @@
set(CORE_ROOT ${CMAKE_CURRENT_SOURCE_DIR})
set(LF_ROOT ${CMAKE_CURRENT_LIST_DIR}/..)

if(${CMAKE_SYSTEM_NAME} STREQUAL "Windows")
set(CMAKE_SYSTEM_VERSION 10.0)
message("Using Windows SDK version ${CMAKE_VS_WINDOWS_TARGET_PLATFORM_VERSION}")
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Nrf52")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_NRF52)
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_ZEPHYR)
set(PLATFORM_ZEPHYR true)
elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Rp2040")
list(APPEND REACTORC_COMPILE_DEFS PLATFORM_RP2040)
endif()
include(${LF_ROOT}/core/lf_utils.cmake)

# Get the general common sources for reactor-c
list(APPEND GENERAL_SOURCES tag.c clock.c port.c mixed_radix.c reactor_common.c lf_token.c environment.c)
Expand Down Expand Up @@ -50,23 +40,18 @@ endif()

# Include sources from subdirectories
include(utils/CMakeLists.txt)

if (DEFINED MODAL_REACTORS)
include(modal_models/CMakeLists.txt)
endif()

# Print sources used for compilation
list(JOIN REACTORC_SOURCES ", " PRINTABLE_SOURCE_LIST)
message(STATUS "Including the following sources: " ${PRINTABLE_SOURCE_LIST})

# Create the reactor-c library. If we are targeting Zephyr we have to use the
# Zephyr Cmake extension to create the library and add the sources.
if(PLATFORM_ZEPHYR)
message("--- Building Zephyr library")
zephyr_library_named(reactor-c)
zephyr_library_sources(${REACTORC_SOURCES})
zephyr_library_link_libraries(kernel)
else()
add_library(reactor-c)
target_sources(reactor-c PRIVATE ${REACTORC_SOURCES})
endif()
add_library(reactor-c)
target_sources(reactor-c PRIVATE ${REACTORC_SOURCES})
lf_enable_compiler_warnings(reactor-c)

if (DEFINED LF_TRACE)
include(${LF_ROOT}/trace/api/CMakeLists.txt)
Expand Down Expand Up @@ -98,9 +83,6 @@ include(${LF_ROOT}/platform/impl/CMakeLists.txt)
target_link_libraries(reactor-c PUBLIC lf::platform-api)
target_link_libraries(reactor-c PRIVATE lf::platform-impl)

# Apply compile definitions to the reactor-c library.
target_compile_definitions(reactor-c PUBLIC ${REACTORC_COMPILE_DEFS})

target_include_directories(reactor-c PUBLIC ../include)
target_include_directories(reactor-c PUBLIC ../include/core)
target_include_directories(reactor-c PUBLIC ../include/core/federated)
Expand Down Expand Up @@ -134,14 +116,18 @@ if(DEFINED _LF_CLOCK_SYNC_ON)
endif()
endif()

# Link with thread library, unless we are on the Zephyr platform.
if(NOT DEFINED LF_SINGLE_THREADED OR DEFINED LF_TRACE)
if (NOT PLATFORM_ZEPHYR)
find_package(Threads REQUIRED)
target_link_libraries(reactor-c PUBLIC Threads::Threads)
endif()
# Unless specified otherwise initial event queue and reaction queue to size 10
if (NOT DEFINED INITIAL_EVENT_QUEUE_SIZE)
set(INITIAL_EVENT_QUEUE_SIZE 10)
endif()
if (NOT DEFINED INITIAL_REACT_QUEUE_SIZE)
set(INITIAL_REACT_QUEUE_SIZE 10)
endif()

target_compile_definitions(reactor-c PRIVATE INITIAL_EVENT_QUEUE_SIZE=${INITIAL_EVENT_QUEUE_SIZE})
target_compile_definitions(reactor-c PRIVATE INITIAL_REACT_QUEUE_SIZE=${INITIAL_REACT_QUEUE_SIZE})
target_compile_definitions(reactor-c PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})

# Macro for translating a command-line argument into compile definition for
# reactor-c lib
macro(define X)
Expand All @@ -151,12 +137,6 @@ macro(define X)
endif(DEFINED ${X})
endmacro()

# FIXME: May want these to be application dependent, hence passed as
# parameters to Cmake.
target_compile_definitions(reactor-c PRIVATE INITIAL_EVENT_QUEUE_SIZE=10)
target_compile_definitions(reactor-c PRIVATE INITIAL_REACT_QUEUE_SIZE=10)
target_compile_definitions(reactor-c PUBLIC PLATFORM_${CMAKE_SYSTEM_NAME})

# Search and apply all possible compile definitions
message(STATUS "Applying preprocessor definitions...")
define(_LF_CLOCK_SYNC_ATTENUATION)
Expand Down
22 changes: 21 additions & 1 deletion core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
LF_MUTEX_INIT(&env->mutex);
LF_COND_INIT(&env->event_q_changed, &env->mutex);
LF_COND_INIT(&env->global_tag_barrier_requestors_reached_zero, &env->mutex);

#else
(void)env;
(void)num_workers;
#endif
}
/**
Expand All @@ -67,6 +69,8 @@ static void environment_init_single_threaded(environment_t* env) {
env->reaction_q = pqueue_init(INITIAL_REACT_QUEUE_SIZE, in_reverse_order, get_reaction_index, get_reaction_position,
set_reaction_position, reaction_matches, print_reaction);

#else
(void)env;
#endif
}

Expand Down Expand Up @@ -97,6 +101,10 @@ static void environment_init_modes(environment_t* env, int num_modes, int num_st
} else {
env->modes = NULL;
}
#else
(void)env;
(void)num_modes;
(void)num_state_resets;
#endif
}

Expand All @@ -113,6 +121,9 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi
env->_lf_intended_tag_fields = NULL;
env->_lf_intended_tag_fields_size = 0;
}
#else
(void)env;
(void)num_is_present_fields;
#endif
}

Expand All @@ -132,12 +143,16 @@ static void environment_free_threaded(environment_t* env) {
#if !defined(LF_SINGLE_THREADED)
free(env->thread_ids);
lf_sched_free(env->scheduler);
#else
(void)env;
#endif
}

static void environment_free_single_threaded(environment_t* env) {
#ifdef LF_SINGLE_THREADED
pqueue_free(env->reaction_q);
#else
(void)env;
#endif
}

Expand All @@ -148,12 +163,16 @@ static void environment_free_modes(environment_t* env) {
free(env->modes->state_resets);
free(env->modes);
}
#else
(void)env;
#endif
}

static void environment_free_federated(environment_t* env) {
#ifdef FEDERATED_DECENTRALIZED
free(env->_lf_intended_tag_fields);
#else
(void)env;
#endif
}

Expand All @@ -178,6 +197,7 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
int num_startup_reactions, int num_shutdown_reactions, int num_reset_reactions,
int num_is_present_fields, int num_modes, int num_state_resets, int num_watchdogs,
const char* trace_file_name) {
(void)trace_file_name; // Will be used with future enclave support.

env->name = malloc(strlen(name) + 1); // +1 for the null terminator
LF_ASSERT_NON_NULL(env->name);
Expand Down
3 changes: 1 addition & 2 deletions core/federated/RTI/rti.Dockerfile
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
# Docker file for building the image of the rti
FROM alpine:latest
COPY core /lingua-franca/core
COPY include /lingua-franca/include
COPY . /lingua-franca
WORKDIR /lingua-franca/core/federated/RTI
RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \
mkdir container && \
Expand Down
4 changes: 2 additions & 2 deletions core/federated/RTI/rti_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,11 +54,11 @@ typedef struct scheduling_node_t {
tag_t last_provisionally_granted; // The maximum PTAG that has been provisionally granted (or NEVER if none granted)
tag_t next_event; // Most recent NET received from the scheduling node (or NEVER if none received).
scheduling_node_state_t state; // State of the scheduling node.
int* upstream; // Array of upstream scheduling node ids.
uint16_t* upstream; // Array of upstream scheduling node ids.
interval_t* upstream_delay; // Minimum delay on connections from upstream scheduling nodes.
// Here, NEVER encodes no delay. 0LL is a microstep delay.
int num_upstream; // Size of the array of upstream scheduling nodes and delays.
int* downstream; // Array of downstream scheduling node ids.
uint16_t* downstream; // Array of downstream scheduling node ids.
int num_downstream; // Size of the array of downstream scheduling nodes.
execution_mode_t mode; // FAST or REALTIME.
minimum_delay_t* min_delays; // Array of minimum delays from upstream nodes, not including this node.
Expand Down
17 changes: 8 additions & 9 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -891,11 +891,6 @@ void* clock_synchronization_thread(void* noargs) {
}

// Initiate a clock synchronization every rti->clock_sync_period_ns
// Initiate a clock synchronization every rti->clock_sync_period_ns
struct timespec sleep_time = {(time_t)rti_remote->clock_sync_period_ns / BILLION,
rti_remote->clock_sync_period_ns % BILLION};
struct timespec remaining_time;

bool any_federates_connected = true;
while (any_federates_connected) {
// Sleep
Expand Down Expand Up @@ -1323,24 +1318,28 @@ static int receive_connection_information(int* socket_id, uint16_t fed_id) {

// Allocate memory for the upstream and downstream pointers
if (fed->enclave.num_upstream > 0) {
fed->enclave.upstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream);
fed->enclave.upstream = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_upstream);
LF_ASSERT_NON_NULL(fed->enclave.upstream);
// Allocate memory for the upstream delay pointers
fed->enclave.upstream_delay = (interval_t*)malloc(sizeof(interval_t) * fed->enclave.num_upstream);
LF_ASSERT_NON_NULL(fed->enclave.upstream_delay);
} else {
fed->enclave.upstream = (int*)NULL;
fed->enclave.upstream = (uint16_t*)NULL;
fed->enclave.upstream_delay = (interval_t*)NULL;
}
if (fed->enclave.num_downstream > 0) {
fed->enclave.downstream = (int*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream);
fed->enclave.downstream = (uint16_t*)malloc(sizeof(uint16_t) * fed->enclave.num_downstream);
LF_ASSERT_NON_NULL(fed->enclave.downstream);
} else {
fed->enclave.downstream = (int*)NULL;
fed->enclave.downstream = (uint16_t*)NULL;
}

size_t connections_info_body_size = ((sizeof(uint16_t) + sizeof(int64_t)) * fed->enclave.num_upstream) +
(sizeof(uint16_t) * fed->enclave.num_downstream);
unsigned char* connections_info_body = NULL;
if (connections_info_body_size > 0) {
connections_info_body = (unsigned char*)malloc(connections_info_body_size);
LF_ASSERT_NON_NULL(connections_info_body);
read_from_socket_fail_on_error(socket_id, connections_info_body_size, connections_info_body, NULL,
"RTI failed to read MSG_TYPE_NEIGHBOR_STRUCTURE message body from federate %d.",
fed_id);
Expand Down
13 changes: 8 additions & 5 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ void handle_T4_clock_sync_message(unsigned char* buffer, int socket, instant_t r
* Thread that listens for UDP inputs from the RTI.
*/
void* listen_to_rti_UDP_thread(void* args) {
(void)args;
initialize_lf_thread_id();
// Listen for UDP messages from the RTI.
// The only expected messages are T1 and T4, which have
Expand Down Expand Up @@ -468,12 +469,12 @@ void* listen_to_rti_UDP_thread(void* args) {
if (bytes > 0) {
bytes_read += bytes;
}
} while ((errno == EAGAIN || errno == EWOULDBLOCK) && bytes_read < message_size);
} while ((errno == EAGAIN || errno == EWOULDBLOCK) && bytes_read < (ssize_t)message_size);

// Get local physical time before doing anything else.
instant_t receive_time = lf_time_physical();

if (bytes_read < message_size) {
if (bytes_read < (ssize_t)message_size) {
// Either the socket has closed or the RTI has sent EOF.
// Exit the thread to halt clock synchronization.
lf_print_error("Clock sync: UDP socket to RTI is broken: %s. Clock sync is now disabled.", strerror(errno));
Expand Down Expand Up @@ -533,9 +534,9 @@ void clock_sync_remove_offset(instant_t* t) { *t -= (_lf_clock_sync_offset + _lf

void clock_sync_set_constant_bias(interval_t offset) { _lf_clock_sync_constant_bias = offset; }
#else
void clock_sync_apply_offset(instant_t* t) {}
void clock_sync_remove_offset(instant_t* t) {}
void clock_sync_set_constant_bias(interval_t offset) {}
void clock_sync_apply_offset(instant_t* t) { (void)t; }
void clock_sync_remove_offset(instant_t* t) { (void)t; }
void clock_sync_set_constant_bias(interval_t offset) { (void)offset; }
#endif

/**
Expand All @@ -550,6 +551,8 @@ int create_clock_sync_thread(lf_thread_t* thread_id) {
#ifdef _LF_CLOCK_SYNC_ON
// One for UDP messages if clock synchronization is enabled for this federate
return lf_thread_create(thread_id, listen_to_rti_UDP_thread, NULL);
#else
(void)thread_id;
#endif // _LF_CLOCK_SYNC_ON
return 0;
}
Expand Down
Loading

0 comments on commit d25282e

Please sign in to comment.