diff --git a/.github/workflows/build-rti.yml b/.github/workflows/build-rti.yml index bcf714ea7..2561b7da0 100644 --- a/.github/workflows/build-rti.yml +++ b/.github/workflows/build-rti.yml @@ -4,7 +4,7 @@ on: workflow_call: jobs: - run: + native-build: strategy: matrix: platform: [ubuntu-latest, macos-latest, windows-latest] @@ -12,8 +12,24 @@ jobs: steps: - name: Check out reactor-c repository - uses: actions/checkout@v2 + uses: actions/checkout@v4 - name: Build the RTI with AUTH=OFF run: .github/scripts/build-rti.sh -DAUTH=OFF - name: Build the RTI with AUTH=ON run: .github/scripts/build-rti.sh -DAUTH=ON + + docker-build: + runs-on: ubuntu-latest + steps: + - name: Check out reactor-c repository + uses: actions/checkout@v4 + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v3 + - name: Build Docker image + uses: docker/build-push-action@v6 + with: + file: ./core/federated/RTI/rti.Dockerfile + context: . + platforms: linux/amd64, linux/arm64, linux/arm/v7, linux/riscv64 + push: false + tags: lflang/rti:latest diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4ba3c90c3..ae670e1fa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -11,7 +11,7 @@ on: concurrency: group: ci-${{ github.ref }}-${{ github.event_path }} - cancel-in-progress: ${{ github.ref != 'refs/heads/master' }} + cancel-in-progress: ${{ github.ref != 'refs/heads/main' }} jobs: check-labels: @@ -56,12 +56,12 @@ jobs: if: ${{ !github.event.pull_request.draft ||contains( github.event.pull_request.labels.*.name, 'zephyr') }} lf-default-flexpret: - needs: [fetch-lf] - uses: lf-lang/lingua-franca/.github/workflows/c-flexpret-tests.yml@master - with: - runtime-ref: ${{ github.ref }} - compiler-ref: ${{ needs.fetch-lf.outputs.ref }} - if: ${{ !github.event.pull_request.draft ||contains( github.event.pull_request.labels.*.name, 'flexpret') }} + needs: [fetch-lf] + uses: lf-lang/lingua-franca/.github/workflows/c-flexpret-tests.yml@master + with: + runtime-ref: ${{ github.ref }} + compiler-ref: ${{ needs.fetch-lf.outputs.ref }} + if: ${{ !github.event.pull_request.draft ||contains( github.event.pull_request.labels.*.name, 'flexpret') }} lf-default: needs: [fetch-lf] @@ -97,4 +97,4 @@ jobs: compiler-ref: ${{ needs.fetch-lf.outputs.ref }} scheduler: ADAPTIVE all-platforms: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'mac') || contains( github.event.pull_request.labels.*.name, 'windows') }} - if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'schedulers') }} \ No newline at end of file + if: ${{ !github.event.pull_request.draft || contains( github.event.pull_request.labels.*.name, 'schedulers') }} diff --git a/.github/workflows/clang-format.yml b/.github/workflows/clang-format.yml index 11028dd38..2f099df81 100644 --- a/.github/workflows/clang-format.yml +++ b/.github/workflows/clang-format.yml @@ -5,12 +5,15 @@ on: [pull_request] jobs: clang-format: - runs-on: ubuntu-20.04 + runs-on: ubuntu-24.04 steps: - uses: actions/checkout@v2 - - name: Install clang-tidy + - name: Install clang-tidy and clang-format run: | sudo apt-get update sudo apt-get install -y clang-tidy + sudo apt-get install -y pipx + pipx install clang-format + clang-format --version - name: Analyze run: make format-check diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index ca51fd50c..6d938ae0c 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -7,7 +7,7 @@ include(${LF_ROOT}/core/lf_utils.cmake) list(APPEND 
GENERAL_SOURCES tag.c clock.c port.c mixed_radix.c reactor_common.c lf_token.c environment.c) # Add tracing support if requested -if (DEFINED LF_TRACE) +if(DEFINED LF_TRACE) message(STATUS "Including sources specific to tracing.") list(APPEND GENERAL_SOURCES tracepoint.c) endif() @@ -16,7 +16,7 @@ endif() list(APPEND REACTORC_SOURCES ${GENERAL_SOURCES}) # Add sources for either threaded or single-threaded runtime -if (DEFINED FEDERATED) +if(DEFINED FEDERATED) include(federated/CMakeLists.txt) include(federated/network/CMakeLists.txt) endif() @@ -35,14 +35,14 @@ endif() # Add sources for the local RTI if we are using scheduling enclaves if(DEFINED LF_ENCLAVES) -include(federated/RTI/local_rti.cmake) + include(federated/RTI/local_rti.cmake) endif() # Include sources from subdirectories include(utils/CMakeLists.txt) -if (DEFINED MODAL_REACTORS) -include(modal_models/CMakeLists.txt) +if(DEFINED MODAL_REACTORS) + include(modal_models/CMakeLists.txt) endif() # Print sources used for compilation @@ -53,7 +53,7 @@ add_library(reactor-c) target_sources(reactor-c PRIVATE ${REACTORC_SOURCES}) lf_enable_compiler_warnings(reactor-c) -if (DEFINED LF_TRACE) +if(DEFINED LF_TRACE) include(${LF_ROOT}/trace/api/CMakeLists.txt) target_link_libraries(reactor-c PUBLIC lf::trace-api) # If the user specified an external trace plugin. Find it and link with it @@ -106,18 +106,19 @@ target_include_directories(reactor-c PUBLIC ../include/core/threaded) target_include_directories(reactor-c PUBLIC ../include/core/utils) target_include_directories(reactor-c PUBLIC federated/RTI/) -if (APPLE) - SET(CMAKE_C_ARCHIVE_CREATE " Scr ") +if(APPLE) + SET(CMAKE_C_ARCHIVE_CREATE " Scr ") SET(CMAKE_CXX_ARCHIVE_CREATE " Scr ") - SET(CMAKE_C_ARCHIVE_FINISH " -no_warning_for_no_symbols -c ") + SET(CMAKE_C_ARCHIVE_FINISH " -no_warning_for_no_symbols -c ") SET(CMAKE_CXX_ARCHIVE_FINISH " -no_warning_for_no_symbols -c ") endif() # Link with OpenSSL library if(DEFINED FEDERATED_AUTHENTICATED) - if (APPLE) + if(APPLE) set(OPENSSL_ROOT_DIR /usr/local/opt/openssl) endif() + find_package(OpenSSL REQUIRED) target_link_libraries(reactor-c PUBLIC OpenSSL::SSL) endif() @@ -130,10 +131,11 @@ if(DEFINED FEDERATED) endif() # Unless specified otherwise initial event queue and reaction queue to size 10 -if (NOT DEFINED INITIAL_EVENT_QUEUE_SIZE) +if(NOT DEFINED INITIAL_EVENT_QUEUE_SIZE) set(INITIAL_EVENT_QUEUE_SIZE 10) endif() -if (NOT DEFINED INITIAL_REACT_QUEUE_SIZE) + +if(NOT DEFINED INITIAL_REACT_QUEUE_SIZE) set(INITIAL_REACT_QUEUE_SIZE 10) endif() diff --git a/core/environment.c b/core/environment.c index af192c09b..d24de4bfc 100644 --- a/core/environment.c +++ b/core/environment.c @@ -210,10 +210,16 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke const char* trace_file_name) { (void)trace_file_name; // Will be used with future enclave support. - env->name = malloc(strlen(name) + 1); // +1 for the null terminator - LF_ASSERT_NON_NULL(env->name); - strcpy(env->name, name); - + // Space for the name string with the null terminator. + if (name != NULL) { + size_t name_size = strlen(name) + 1; // +1 for the null terminator + env->name = (char*)malloc(name_size); + LF_ASSERT_NON_NULL(env->name); + // Use strncpy rather than strcpy to avoid compiler warnings. 
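+    // Since name_size is strlen(name) + 1, the copy below always fits and includes the null terminator.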
+ strncpy(env->name, name, name_size); + } else { + env->name = NULL; + } env->id = id; env->stop_tag = FOREVER_TAG; @@ -284,3 +290,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke env->initialized = true; return 0; } + +void environment_verify(environment_t* env) { + for (int i = 0; i < env->is_present_fields_size; i++) { + LF_ASSERT_NON_NULL(env->is_present_fields[i]); + } +} \ No newline at end of file diff --git a/core/federated/RTI/.gitignore b/core/federated/RTI/.gitignore new file mode 100644 index 000000000..a1e6bc234 --- /dev/null +++ b/core/federated/RTI/.gitignore @@ -0,0 +1,3 @@ +CMakeFiles +Makefile +cmake_install.cmake \ No newline at end of file diff --git a/core/federated/RTI/rti.Dockerfile b/core/federated/RTI/rti.Dockerfile index e70e34584..bbe4f9d97 100644 --- a/core/federated/RTI/rti.Dockerfile +++ b/core/federated/RTI/rti.Dockerfile @@ -1,5 +1,5 @@ -# Docker file for building the image of the rti -FROM alpine:latest +ARG BASEIMAGE=alpine:latest +FROM ${BASEIMAGE} AS builder COPY . /lingua-franca WORKDIR /lingua-franca/core/federated/RTI RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \ @@ -9,5 +9,14 @@ RUN set -ex && apk add --no-cache gcc musl-dev cmake make && \ make && \ make install -# Use ENTRYPOINT not CMD so that command-line arguments go through -ENTRYPOINT ["RTI"] +WORKDIR /lingua-franca + +# application stage +FROM ${BASEIMAGE} AS app +LABEL maintainer="lf-lang" +LABEL source="https://github.com/lf-lang/reactor-c/tree/main/core/federated/RTI" +COPY --from=builder /usr/local/bin/RTI /usr/local/bin/RTI + +WORKDIR /lingua-franca + +ENTRYPOINT ["/usr/local/bin/RTI"] diff --git a/core/federated/federate.c b/core/federated/federate.c index 395d89b18..c98b5d0bc 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -51,7 +51,6 @@ extern bool _lf_termination_executed; // Global variables references in federate.h lf_mutex_t lf_outbound_socket_mutex; lf_cond_t lf_port_status_changed; -lf_cond_t lf_current_tag_changed; /** * The max level allowed to advance (MLAA) is a variable that tracks how far in the reaction @@ -203,7 +202,7 @@ static lf_action_base_t* action_for_port(int port_id) { * @param tag The tag on which the latest status of all network input * ports is known. */ -static void update_last_known_status_on_input_ports(tag_t tag) { +static void update_last_known_status_on_input_ports(tag_t tag, environment_t* env) { LF_PRINT_DEBUG("In update_last_known_status_on_input ports."); bool notify = false; for (size_t i = 0; i < _lf_action_table_size; i++) { @@ -229,6 +228,8 @@ static void update_last_known_status_on_input_ports(tag_t tag) { if (notify && lf_update_max_level(tag, false)) { // Notify network input reactions lf_cond_broadcast(&lf_port_status_changed); + // Could be blocked waiting for physical time to advance to the STA, so unblock that too. + lf_cond_broadcast(&env->event_q_changed); } } @@ -281,6 +282,7 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag // federate that is far ahead of other upstream federates in logical time. lf_update_max_level(_fed.last_TAG, _fed.is_last_TAG_provisional); lf_cond_broadcast(&lf_port_status_changed); + lf_cond_broadcast(&env->event_q_changed); } else { // Message arrivals should be monotonic, so this should not occur. 
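+    // The last known status tag of a port must never move backwards; an earlier tag
+    // here would mean a message arrived out of order.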
lf_print_warning("Attempt to update the last known status tag " @@ -289,6 +291,34 @@ static void update_last_known_status_on_input_port(environment_t* env, tag_t tag } } +/** + * @brief Mark all the input ports connected to the given federate as known to be absent until FOREVER. + * + * This does nothing if the federate is not using decentralized coordination. + * This function acquires the mutex on the top-level environment. + * @param fed_id The ID of the federate. + */ +static void mark_inputs_known_absent(int fed_id) { +#ifdef FEDERATED_DECENTRALIZED + // Note that when transient federates are supported, this will need to be updated because the + // federate could rejoin. + environment_t* env; + _lf_get_environments(&env); + LF_MUTEX_LOCK(&env->mutex); + + for (size_t i = 0; i < _lf_action_table_size; i++) { + lf_action_base_t* action = _lf_action_table[i]; + if (action->source_id == fed_id) { + update_last_known_status_on_input_port(env, FOREVER_TAG, i); + } + } + LF_MUTEX_UNLOCK(&env->mutex); +#else + // Do nothing, except suppress unused parameter error. + (void)fed_id; +#endif // FEDERATED_DECENTRALIZED +} + /** * Set the status of network port with id portID. * @@ -623,7 +653,7 @@ static int handle_tagged_message(int* socket, int fed_id) { } #endif // FEDERATED_DECENTRALIZED // The following will update the input_port_action->last_known_status_tag. - // For decentralized coordination, this is needed for the thread implementing STAA. + // For decentralized coordination, this is needed to unblock the STAA. update_last_known_status_on_input_port(env, actual_tag, port_id); // If the current time >= stop time, discard the message. @@ -732,46 +762,46 @@ static void* listen_to_federates(void* _args) { bool socket_closed = false; // Read one byte to get the message type. LF_PRINT_DEBUG("Waiting for a P2P message on socket %d.", *socket_id); + bool bad_message = false; if (read_from_socket_close_on_error(socket_id, 1, buffer)) { // Socket has been closed. lf_print("Socket from federate %d is closed.", fed_id); // Stop listening to this federate. socket_closed = true; - break; - } - LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]); - bool bad_message = false; - switch (buffer[0]) { - case MSG_TYPE_P2P_MESSAGE: - LF_PRINT_LOG("Received untimed message from federate %d.", fed_id); - if (handle_message(socket_id, fed_id)) { - // Failed to complete the reading of a message on a physical connection. - lf_print_warning("Failed to complete reading of message on physical connection."); - socket_closed = true; - } - break; - case MSG_TYPE_P2P_TAGGED_MESSAGE: - LF_PRINT_LOG("Received tagged message from federate %d.", fed_id); - if (handle_tagged_message(socket_id, fed_id)) { - // P2P tagged messages are only used in decentralized coordination, and - // it is not a fatal error if the socket is closed before the whole message is read. - // But this thread should exit. - lf_print_warning("Failed to complete reading of tagged message."); - socket_closed = true; - } - break; - case MSG_TYPE_PORT_ABSENT: - LF_PRINT_LOG("Received port absent message from federate %d.", fed_id); - if (handle_port_absent_message(socket_id, fed_id)) { - // P2P tagged messages are only used in decentralized coordination, and - // it is not a fatal error if the socket is closed before the whole message is read. - // But this thread should exit. 
- lf_print_warning("Failed to complete reading of tagged message."); - socket_closed = true; + } else { + LF_PRINT_DEBUG("Received a P2P message on socket %d of type %d.", *socket_id, buffer[0]); + switch (buffer[0]) { + case MSG_TYPE_P2P_MESSAGE: + LF_PRINT_LOG("Received untimed message from federate %d.", fed_id); + if (handle_message(socket_id, fed_id)) { + // Failed to complete the reading of a message on a physical connection. + lf_print_warning("Failed to complete reading of message on physical connection."); + socket_closed = true; + } + break; + case MSG_TYPE_P2P_TAGGED_MESSAGE: + LF_PRINT_LOG("Received tagged message from federate %d.", fed_id); + if (handle_tagged_message(socket_id, fed_id)) { + // P2P tagged messages are only used in decentralized coordination, and + // it is not a fatal error if the socket is closed before the whole message is read. + // But this thread should exit. + lf_print_warning("Failed to complete reading of tagged message."); + socket_closed = true; + } + break; + case MSG_TYPE_PORT_ABSENT: + LF_PRINT_LOG("Received port absent message from federate %d.", fed_id); + if (handle_port_absent_message(socket_id, fed_id)) { + // P2P tagged messages are only used in decentralized coordination, and + // it is not a fatal error if the socket is closed before the whole message is read. + // But this thread should exit. + lf_print_warning("Failed to complete reading of tagged message."); + socket_closed = true; + } + break; + default: + bad_message = true; } - break; - default: - bad_message = true; } if (bad_message) { lf_print_error("Received erroneous message type: %d. Closing the socket.", buffer[0]); @@ -780,12 +810,10 @@ static void* listen_to_federates(void* _args) { break; // while loop } if (socket_closed) { - // NOTE: For decentralized execution, once this socket is closed, we could + // For decentralized execution, once this socket is closed, we // update last known tags of all ports connected to the specified federate to FOREVER_TAG, // which would eliminate the need to wait for STAA to assume an input is absent. - // However, at this time, we don't know which ports correspond to which upstream federates. - // The code generator would have to encode this information. Once that is done, - // we could call update_last_known_status_on_input_port with FOREVER_TAG. + mark_inputs_known_absent(fed_id); break; // while loop } @@ -1020,7 +1048,7 @@ static void handle_tag_advance_grant(void) { // knows the status of network ports up to and including the granted tag, // so by extension, we assume that the federate can safely rely // on the RTI to handle port statuses up until the granted tag. - update_last_known_status_on_input_ports(TAG); + update_last_known_status_on_input_ports(TAG, env); // It is possible for this federate to have received a PTAG // earlier with the same tag as this TAG. @@ -1044,7 +1072,7 @@ static void handle_tag_advance_grant(void) { #ifdef FEDERATED_DECENTRALIZED /** - * @brief Return whether there exists an input port whose status is unknown. + * @brief Return true if there is an input port among those with a given STAA whose status is unknown. * * @param staa_elem A record of all input port actions. */ @@ -1073,9 +1101,28 @@ static int id_of_action(lf_action_base_t* input_port_action) { #endif +#ifdef FEDERATED_DECENTRALIZED +/** + * @brief Return true if all network input ports are known up to the specified tag. + * @param tag The tag. 
+ */
+static bool inputs_known_to(tag_t tag) {
+  for (size_t i = 0; i < _lf_action_table_size; i++) {
+    tag_t known_to = _lf_action_table[i]->trigger->last_known_status_tag;
+    if (lf_tag_compare(known_to, tag) < 0) {
+      // There is a network input port for which it is not known whether a message with tag earlier
+      // than or equal to the specified tag may later arrive.
+      return false;
+    }
+  }
+  return true;
+}
+#endif // FEDERATED_DECENTRALIZED
+
 /**
  * @brief Thread handling setting the known absent status of input ports.
- * For the code-generated array of staa offsets `staa_lst`, which is sorted by STAA offset,
+ *
+ * For the code-generated array of STAA offsets `staa_lst`, which is sorted by STAA offset,
  * wait for physical time to advance to the current time plus the STAA offset,
  * then set the absent status of the input ports associated with the STAA.
  * Then wait for current time to advance and start over.
@@ -1098,26 +1145,30 @@ static void* update_ports_from_staa_offsets(void* args) {
       staa_t* staa_elem = staa_lst[i];
       // The staa_elem is adjusted in the code generator to have subtracted the delay on the connection.
       // The list is sorted in increasing order of adjusted STAA offsets.
-      // The wait_until function automatically adds the lf_fed_STA_offset to the wait time.
-      interval_t wait_until_time = env->current_tag.time + staa_elem->STAA;
-      LF_PRINT_DEBUG("**** (update thread) original wait_until_time: " PRINTF_TIME, wait_until_time - lf_time_start());
+      // We need to add the lf_fed_STA_offset to the wait time and guard against overflow.
+      interval_t wait_time = lf_time_add(staa_elem->STAA, lf_fed_STA_offset);
+      instant_t wait_until_time = lf_time_add(env->current_tag.time, wait_time);
+      LF_PRINT_DEBUG("**** (update thread) wait_until_time: " PRINTF_TIME, wait_until_time - lf_time_start());
       // The wait_until call will release the env->mutex while it is waiting.
       // However, it will not release the env->mutex if the wait time is too small.
       // At the cost of a small additional delay in deciding a port is absent,
-      // we require a minimum wait time here. Otherwise, if both the STAA and STA are
-      // zero, this thread will fail to ever release the environment mutex.
+      // we require a minimum wait time here. Note that zero-valued STAAs are
+      // included, and the STA might be zero or very small. Without a minimum wait,
+      // this thread would fail to ever release the environment mutex.
       // This causes chaos. The MIN_SLEEP_DURATION is the smallest amount of time
       // that wait_until will actually wait. Note that this strategy does not
       // block progress of any execution that is actually processing events.
       // It only slightly delays the decision that an event is absent, and only
       // if the STAA and STA are extremely small.
-      if (lf_fed_STA_offset + staa_elem->STAA < 5 * MIN_SLEEP_DURATION) {
+      if (wait_time < 5 * MIN_SLEEP_DURATION) {
        wait_until_time += 5 * MIN_SLEEP_DURATION;
      }
      while (a_port_is_unknown(staa_elem)) {
        LF_PRINT_DEBUG("**** (update thread) waiting until: " PRINTF_TIME, wait_until_time - lf_time_start());
        if (wait_until(wait_until_time, &lf_port_status_changed)) {
+          // Specified timeout time was reached.
+          // If the current tag has changed, start over.
          if (lf_tag_compare(lf_tag(env), tag_when_started_waiting) != 0) {
            break;
          }
@@ -1125,16 +1176,17 @@
          /* Possibly useful for debugging:
          tag_t current_tag = lf_tag(env);
          LF_PRINT_DEBUG("**** (update thread) Assuming absent!
" PRINTF_TAG, current_tag.time - lf_time_start(), current_tag.microstep); LF_PRINT_DEBUG("**** (update thread) Lag is " PRINTF_TIME, current_tag.time - - lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, wait_until_time - - lf_time_start()); + lf_time_physical()); LF_PRINT_DEBUG("**** (update thread) Wait until time is " PRINTF_TIME, + wait_until_time - lf_time_start()); */ + // Mark input ports absent. for (size_t j = 0; j < staa_elem->num_actions; ++j) { lf_action_base_t* input_port_action = staa_elem->actions[j]; if (input_port_action->trigger->status == unknown) { input_port_action->trigger->status = absent; - LF_PRINT_DEBUG("**** (update thread) Assuming port absent at time " PRINTF_TIME, - lf_tag(env).time - start_time); + LF_PRINT_DEBUG("**** (update thread) Assuming port absent at tag " PRINTF_TAG, + lf_tag(env).time - start_time, lf_tag(env).microstep); update_last_known_status_on_input_port(env, lf_tag(env), id_of_action(input_port_action)); lf_cond_broadcast(&lf_port_status_changed); } @@ -1160,23 +1212,27 @@ static void* update_ports_from_staa_offsets(void* args) { // Some ports may have been reset to uknown during that wait, in which case, // it would be huge mistake to enter the wait for a new tag below because the // program will freeze. First, check whether any ports are unknown: - bool port_unkonwn = false; + bool port_unknown = false; for (size_t i = 0; i < staa_lst_size; ++i) { staa_t* staa_elem = staa_lst[i]; if (a_port_is_unknown(staa_elem)) { - port_unkonwn = true; + port_unknown = true; break; } } - if (!port_unkonwn) { + if (!port_unknown) { // If this occurs, then there is a race condition that can lead to deadlocks. lf_print_error_and_exit("**** (update thread) Inconsistency: All ports are known, but MLAA is blocking."); } // Since max_level_allowed_to_advance will block advancement of time, we cannot follow // through to the next step without deadlocking. Wait some time, then continue. - // The wait is necessary to prevent a busy wait. - lf_sleep(2 * MIN_SLEEP_DURATION); + // The wait is necessary to prevent a busy wait, which will only occur if port + // status are always known inside the while loop + // Be sure to use wait_until() instead of sleep() because sleep() will not release the mutex. + instant_t wait_until_time = lf_time_add(env->current_tag.time, 2 * MIN_SLEEP_DURATION); + wait_until(wait_until_time, &lf_port_status_changed); + continue; } @@ -1189,7 +1245,7 @@ static void* update_ports_from_staa_offsets(void* args) { // Ports are reset to unknown at the start of new tag, so that will wake this up. lf_cond_wait(&lf_port_status_changed); } - LF_PRINT_DEBUG("**** (update thread) Tags after wait: " PRINTF_TAG ", " PRINTF_TAG, + LF_PRINT_DEBUG("**** (update thread) Tags after wait: " PRINTF_TAG ", and before: " PRINTF_TAG, lf_tag(env).time - lf_time_start(), lf_tag(env).microstep, tag_when_started_waiting.time - lf_time_start(), tag_when_started_waiting.microstep); } @@ -1894,9 +1950,9 @@ void lf_connect_to_rti(const char* hostname, int port) { if (result < 0) continue; // Connect failed. - // Have connected to an RTI, but not sure it's the right RTI. - // Send a MSG_TYPE_FED_IDS message and wait for a reply. - // Notify the RTI of the ID of this federate and its federation. + // Have connected to an RTI, but not sure it's the right RTI. + // Send a MSG_TYPE_FED_IDS message and wait for a reply. + // Notify the RTI of the ID of this federate and its federation. 
#ifdef FEDERATED_AUTHENTICATED LF_PRINT_LOG("Connected to an RTI. Performing HMAC-based authentication using federation ID."); @@ -2711,4 +2767,29 @@ bool lf_update_max_level(tag_t tag, bool is_provisional) { return (prev_max_level_allowed_to_advance != max_level_allowed_to_advance); } -#endif +#ifdef FEDERATED_DECENTRALIZED +instant_t lf_wait_until_time(tag_t tag) { + instant_t result = tag.time; // Default. + + // Do not add the STA if the tag is the starting tag. + if (tag.time != start_time || tag.microstep != 0u) { + + // Apply the STA to the logical time, but only if at least one network input port is not known up to this tag. + // Subtract one microstep because it is sufficient to commit to a tag if the input ports are known + // up to one microstep earlier. + if (tag.microstep > 0) { + tag.microstep--; + } else { + tag.microstep = UINT_MAX; + tag.time -= 1; + } + + if (!inputs_known_to(tag)) { + result = lf_time_add(result, lf_fed_STA_offset); + } + } + return result; +} +#endif // FEDERATED_DECENTRALIZED + +#endif // FEDERATED diff --git a/core/lf_token.c b/core/lf_token.c index 48913f85b..b97a3b366 100644 --- a/core/lf_token.c +++ b/core/lf_token.c @@ -26,7 +26,20 @@ int _lf_count_token_allocations; #include "platform.h" // Enter/exit critical sections #include "port.h" // Defines lf_port_base_t. -lf_token_t* _lf_tokens_allocated_in_reactions = NULL; +/** + * @brief List of tokens created within reactions that must be freed. + * + * Tokens created by lf_writable_copy, which is automatically invoked + * when an input is mutable, must have their reference count decremented + * at the end of a tag (or the beginning of the next tag). + * Otherwise, their memory could leak. If they are passed on to + * an output or to a call to lf_schedule during the reaction, then + * those will also result in incremented reference counts, enabling + * the token to live on until used. For example, a new token created + * by lf_writable_copy could become the new template token for an output + * via a call to lf_set. + */ +static lf_token_t* _lf_tokens_allocated_in_reactions = NULL; //////////////////////////////////////////////////////////////////// //// Global variables not visible outside this file. @@ -197,6 +210,8 @@ lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) { if (hashset_iterator_next(iterator) >= 0) { result = hashset_iterator_value(iterator); hashset_remove(_lf_token_recycling_bin, result); + // Make sure there isn't a previous value. + result->value = NULL; LF_PRINT_DEBUG("_lf_new_token: Retrieved token from the recycling bin: %p", (void*)result); } free(iterator); @@ -222,22 +237,20 @@ lf_token_t* _lf_new_token(token_type_t* type, void* value, size_t length) { } lf_token_t* _lf_get_token(token_template_t* tmplt) { - if (tmplt->token != NULL) { - if (tmplt->token->ref_count == 1) { - LF_PRINT_DEBUG("_lf_get_token: Reusing template token: %p with ref_count %zu", (void*)tmplt->token, - tmplt->token->ref_count); - // Free any previous value in the token. - _lf_free_token_value(tmplt->token); - return tmplt->token; - } else { - // Liberate the token. - _lf_done_using(tmplt->token); - } + LF_CRITICAL_SECTION_ENTER(GLOBAL_ENVIRONMENT); + if (tmplt->token != NULL && tmplt->token->ref_count == 1) { + LF_PRINT_DEBUG("_lf_get_token: Reusing template token: %p with ref_count %zu", (void*)tmplt->token, + tmplt->token->ref_count); + // Free any previous value in the token. 
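+    // Reuse is safe here: ref_count == 1 means this template holds the only reference to the token.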
+ _lf_free_token_value(tmplt->token); + LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); + return tmplt->token; } + LF_CRITICAL_SECTION_EXIT(GLOBAL_ENVIRONMENT); // If we get here, we need a new token. - tmplt->token = _lf_new_token((token_type_t*)tmplt, NULL, 0); - tmplt->token->ref_count = 1; - return tmplt->token; + lf_token_t* result = _lf_new_token((token_type_t*)tmplt, NULL, 0); + result->ref_count = 1; + return result; } void _lf_initialize_template(token_template_t* tmplt, size_t element_size) { @@ -352,8 +365,7 @@ token_freed _lf_done_using(lf_token_t* token) { void _lf_free_token_copies() { while (_lf_tokens_allocated_in_reactions != NULL) { - lf_token_t* next = _lf_tokens_allocated_in_reactions->next; _lf_done_using(_lf_tokens_allocated_in_reactions); - _lf_tokens_allocated_in_reactions = next; + _lf_tokens_allocated_in_reactions = _lf_tokens_allocated_in_reactions->next; } } diff --git a/core/reactor_common.c b/core/reactor_common.c index 34f9d68eb..358480eb8 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -217,8 +217,6 @@ void _lf_start_time_step(environment_t* env) { // Reset absent fields on network ports because // their status is unknown lf_reset_status_fields_on_input_port_triggers(); - // Signal the helper thread to reset its progress since the logical time has changed. - lf_cond_signal(&lf_current_tag_changed); } #endif // FEDERATED } @@ -305,13 +303,19 @@ void _lf_pop_events(environment_t* env) { } } - // Mark the trigger present. + // Mark the trigger present event->trigger->status = present; // If the trigger is a periodic timer, create a new event for its next execution. if (event->trigger->is_timer && event->trigger->period > 0LL) { // Reschedule the trigger. lf_schedule_trigger(env, event->trigger, event->trigger->period, NULL); + } else { + // For actions, store a pointer to status field so it is reset later. + int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1); + if (ipfas < env->is_present_fields_size) { + env->is_present_fields_abbreviated[ipfas] = (bool*)&event->trigger->status; + } } // Copy the token pointer into the trigger struct so that the @@ -325,9 +329,6 @@ void _lf_pop_events(environment_t* env) { // freed prematurely. _lf_done_using(token); - // Mark the trigger present. - event->trigger->status = present; - lf_recycle_event(env, event); // Peek at the next event in the event queue. @@ -385,12 +386,16 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) { // Get an event_t struct to put on the event queue. // Recycle event_t structs, if possible. - event_t* e = lf_get_new_event(env); - e->trigger = timer; - e->base.tag = (tag_t){.time = lf_time_logical(env) + delay, .microstep = 0}; - // NOTE: No lock is being held. Assuming this only happens at startup. - pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); - tracepoint_schedule(env, timer, delay); // Trace even though schedule is not called. + tag_t next_tag = (tag_t){.time = lf_time_logical(env) + delay, .microstep = 0}; + // Do not schedule the next event if it is after the timeout. + if (!lf_is_tag_after_stop_tag(env, next_tag)) { + event_t* e = lf_get_new_event(env); + e->trigger = timer; + e->base.tag = next_tag; + // NOTE: No lock is being held. Assuming this only happens at startup. + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); + tracepoint_schedule(env, timer, delay); // Trace even though schedule is not called. 
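+    // Note: lf_is_tag_after_stop_tag is presumably a strict comparison, so a timer event
+    // that lands exactly on the stop tag is still scheduled.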
+ } } void _lf_initialize_timers(environment_t* env) { @@ -605,8 +610,12 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t* // for which we decrement the reference count. _lf_replace_template_token((token_template_t*)trigger, token); - // Mark the trigger present. + // Mark the trigger present and store a pointer to it for marking it as absent later. trigger->status = present; + int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1); + if (ipfas < env->is_present_fields_size) { + env->is_present_fields_abbreviated[ipfas] = (bool*)&trigger->status; + } // Push the corresponding reactions for this trigger // onto the reaction queue. @@ -1098,6 +1107,13 @@ void initialize_global(void) { // Call the code-generated function to initialize all actions, timers, and ports // This is done for all environments/enclaves at the same time. _lf_initialize_trigger_objects(); + +#if !defined(LF_SINGLE_THREADED) && !defined(NDEBUG) + // If we are testing, verify that environment with pointers is correctly set up. + for (int i = 0; i < num_envs; i++) { + environment_verify(&envs[i]); + } +#endif } /** diff --git a/core/tag.c b/core/tag.c index c5f8f88c6..e777eccc1 100644 --- a/core/tag.c +++ b/core/tag.c @@ -58,6 +58,25 @@ instant_t lf_time_add(instant_t a, interval_t b) { return res; } +instant_t lf_time_subtract(instant_t a, interval_t b) { + if (a == NEVER || b == FOREVER) { + return NEVER; + } + if (a == FOREVER || b == NEVER) { + return FOREVER; + } + instant_t res = a - b; + // Check for overflow + if (res < a && b < 0) { + return FOREVER; + } + // Check for underflow + if (res > a && b > 0) { + return NEVER; + } + return res; +} + tag_t lf_tag_add(tag_t a, tag_t b) { instant_t res = lf_time_add(a.time, b.time); if (res == FOREVER) { diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index ce4c463b5..493bd5a3e 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -176,7 +176,7 @@ void lf_set_present(lf_port_base_t* port) { return; environment_t* env = port->source_reactor->environment; bool* is_present_field = &port->is_present; - int ipfas = lf_atomic_fetch_add32(&env->is_present_fields_abbreviated_size, 1); + int ipfas = lf_atomic_fetch_add(&env->is_present_fields_abbreviated_size, 1); if (ipfas < env->is_present_fields_size) { env->is_present_fields_abbreviated[ipfas] = is_present_field; } @@ -184,7 +184,7 @@ void lf_set_present(lf_port_base_t* port) { // Support for sparse destination multiports. if (port->sparse_record && port->destination_channel >= 0 && port->sparse_record->size >= 0) { - size_t next = (size_t)lf_atomic_fetch_add32(&port->sparse_record->size, 1); + size_t next = (size_t)lf_atomic_fetch_add(&port->sparse_record->size, 1); if (next >= port->sparse_record->capacity) { // Buffer is full. Have to revert to the classic iteration. port->sparse_record->size = -1; @@ -194,46 +194,9 @@ void lf_set_present(lf_port_base_t* port) { } } -/** - * Wait until physical time matches or exceeds the specified logical time, - * unless -fast is given. For decentralized coordination, this function will - * add the STA offset to the wait time. - * - * If an event is put on the event queue during the wait, then the wait is - * interrupted and this function returns false. It also returns false if the - * timeout time is reached before the wait has completed. 
Note this this could - * return true even if the a new event was placed on the queue if that event - * time matches or exceeds the specified time. - * - * The mutex lock associated with the condition argument is assumed to be held by - * the calling thread. This mutex is released while waiting. If the wait time is - * too small to actually wait (less than MIN_SLEEP_DURATION), then this function - * immediately returns true and the mutex is not released. - * - * @param env Environment within which we are executing. - * @param logical_time Logical time to wait until physical time matches it. - * @param condition A condition variable that can interrupt the wait. The mutex - * associated with this condition variable will be released during the wait. - * - * @return Return false if the wait is interrupted either because of an event - * queue signal or if the wait time was interrupted early by reaching - * the stop time, if one was specified. Return true if the full wait time - * was reached. - */ -bool wait_until(instant_t logical_time, lf_cond_t* condition) { - LF_PRINT_DEBUG("-------- Waiting until physical time matches logical time " PRINTF_TIME, logical_time); - interval_t wait_until_time = logical_time; -#ifdef FEDERATED_DECENTRALIZED // Only apply the STA if coordination is decentralized - // Apply the STA to the logical time - // Prevent an overflow - if (start_time != logical_time && wait_until_time < FOREVER - lf_fed_STA_offset) { - // If wait_time is not forever - LF_PRINT_DEBUG("Adding STA " PRINTF_TIME " to wait until time " PRINTF_TIME ".", lf_fed_STA_offset, - wait_until_time - start_time); - wait_until_time += lf_fed_STA_offset; - } -#endif - if (!fast) { +bool wait_until(instant_t wait_until_time, lf_cond_t* condition) { + if (!fast || (wait_until_time == FOREVER && keepalive_specified)) { + LF_PRINT_DEBUG("-------- Waiting until physical time " PRINTF_TIME, wait_until_time - start_time); // Check whether we actually need to wait, or if we have already passed the timepoint. interval_t wait_duration = wait_until_time - lf_time_physical(); if (wait_duration < MIN_SLEEP_DURATION) { @@ -252,10 +215,8 @@ bool wait_until(instant_t logical_time, lf_cond_t* condition) { // Wait did not time out, which means that there // may have been an asynchronous call to lf_schedule(). - // Continue waiting. - // Do not adjust logical tag here. If there was an asynchronous - // call to lf_schedule(), it will have put an event on the event queue, - // and logical tag will be set to that time when that event is pulled. + // If there was an asynchronous call to lf_schedule(), it will have put an event on the event queue, + // and the tag will be set to that value when that event is pulled. return false; } else { // Reached timeout. @@ -420,8 +381,18 @@ void _lf_next_locked(environment_t* env) { // Wait for physical time to advance to the next event time (or stop time). // This can be interrupted if a physical action triggers (e.g., a message // arrives from an upstream federate or a local physical action triggers). - LF_PRINT_LOG("Waiting until elapsed time " PRINTF_TIME ".", (next_tag.time - start_time)); - while (!wait_until(next_tag.time, &env->event_q_changed)) { + while (true) { +#ifdef FEDERATED_DECENTRALIZED + // Apply the STA, if needed. 
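+      // lf_wait_until_time() adds lf_fed_STA_offset to next_tag.time unless this is the start
+      // tag or all network input ports are already known up to one microstep before next_tag.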
+ interval_t wait_until_time = lf_wait_until_time(next_tag); +#else // not FEDERATED_DECENTRALIZED + interval_t wait_until_time = next_tag.time; +#endif // FEDERATED_DECENTRALIZED + LF_PRINT_LOG("Waiting until elapsed time " PRINTF_TIME ".", (wait_until_time - start_time)); + if (wait_until(wait_until_time, &env->event_q_changed)) { + // Waited the full time. + break; + } LF_PRINT_DEBUG("_lf_next_locked(): Wait until time interrupted."); // Sleep was interrupted. Check for a new next_event. // The interruption could also have been due to a call to lf_request_stop(). @@ -611,13 +582,11 @@ void _lf_initialize_start_tag(environment_t* env) { _lf_initialize_timers(env); - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; - #if defined FEDERATED_DECENTRALIZED // If we have a non-zero STA offset, then we need to allow messages to arrive // at the start time. To avoid spurious STP violations, we temporarily // set the current time back by the STA offset. - env->current_tag.time -= lf_fed_STA_offset; + env->current_tag.time = lf_time_subtract(env->current_tag.time, lf_fed_STA_offset); #else // For other than federated decentralized execution, there is no lf_fed_STA_offset variable defined. // To use uniform code below, we define it here as a local variable. @@ -625,23 +594,15 @@ void _lf_initialize_start_tag(environment_t* env) { #endif LF_PRINT_LOG("Waiting for start time " PRINTF_TIME ".", start_time); - // Call wait_until if federated. This is required because the startup procedure + // Wait until the start time. This is required for federates because the startup procedure // in lf_synchronize_with_other_federates() can decide on a new start_time that is // larger than the current physical time. - // Therefore, if --fast was not specified, wait until physical time matches - // or exceeds the start time. Microstep is ignored. // This wait_until() is deliberately called after most precursor operations // for tag (0,0) are performed (e.g., injecting startup reactions, etc.). // This has two benefits: First, the startup overheads will reduce // the required waiting time. Second, this call releases the mutex lock and allows // other threads (specifically, federate threads that handle incoming p2p messages - // from other federates) to hold the lock and possibly raise a tag barrier. This is - // especially useful if an STA is set properly because the federate will get - // a chance to process incoming messages while utilizing the STA. - - // Here we wait until the start time and also release the environment mutex. - // this means that the other worker threads will be allowed to start. We need - // this to avoid potential deadlock in federated startup. + // from other federates) to hold the lock and possibly raise a tag barrier. 
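+  // The loop below retries because wait_until() returns false whenever the wait is
+  // interrupted before physical time reaches the start time.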
while (!wait_until(start_time, &env->event_q_changed)) { }; LF_PRINT_DEBUG("Done waiting for start time + STA offset " PRINTF_TIME ".", start_time + lf_fed_STA_offset); @@ -1062,13 +1023,17 @@ int lf_reactor_c_main(int argc, const char* argv[]) { #endif LF_PRINT_DEBUG("Start time: " PRINTF_TIME "ns", start_time); - struct timespec physical_time_timespec = {start_time / BILLION, start_time % BILLION}; #ifdef MINIMAL_STDLIB lf_print("---- Start execution ----"); #else - lf_print("---- Start execution at time %s---- plus %ld nanoseconds", ctime(&physical_time_timespec.tv_sec), - physical_time_timespec.tv_nsec); + struct timespec physical_time_timespec = {start_time / BILLION, start_time % BILLION}; + struct tm* time_info = localtime(&physical_time_timespec.tv_sec); + char buffer[80]; // Long enough to hold the formatted time string. + // Use strftime rather than ctime because as of C23, ctime is deprecated. + strftime(buffer, sizeof(buffer), "%a %b %d %H:%M:%S %Y", time_info); + + lf_print("---- Start execution on %s ---- plus %ld nanoseconds", buffer, physical_time_timespec.tv_nsec); #endif // MINIMAL_STDLIB // Create and initialize the environments for each enclave @@ -1153,6 +1118,8 @@ int lf_reactor_c_main(int argc, const char* argv[]) { } else { int failure = lf_thread_join(env->thread_ids[j], &worker_thread_exit_status); if (failure) { + // Windows warns that strerror is deprecated but doesn't define strerror_r. + // There seems to be no portable replacement. lf_print_error("Failed to join thread listening for incoming messages: %s", strerror(failure)); } } diff --git a/core/threaded/scheduler_GEDF_NP.c b/core/threaded/scheduler_GEDF_NP.c index e77257209..84afee379 100644 --- a/core/threaded/scheduler_GEDF_NP.c +++ b/core/threaded/scheduler_GEDF_NP.c @@ -228,14 +228,14 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) { (void)worker_number; // Suppress unused parameter warning. - if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) { + if (!lf_atomic_bool_compare_and_swap((int*)&done_reaction->status, queued, inactive)) { lf_print_error_and_exit("Unexpected reaction status: %d. Expected %d.", done_reaction->status, queued); } } void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { (void)worker_number; // Suppress unused parameter warning. 
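+  // The compare-and-swap below enqueues a reaction at most once: only an inactive
+  // reaction transitions to queued; a concurrent second attempt simply returns.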
- if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) { + if (reaction == NULL || !lf_atomic_bool_compare_and_swap((int*)&reaction->status, inactive, queued)) { return; } LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index)); diff --git a/core/threaded/scheduler_NP.c b/core/threaded/scheduler_NP.c index 7edd41a81..fd0ccfb04 100644 --- a/core/threaded/scheduler_NP.c +++ b/core/threaded/scheduler_NP.c @@ -77,7 +77,7 @@ static inline void _lf_sched_insert_reaction(lf_scheduler_t* scheduler, reaction scheduler->indexes[reaction_level] = 0; } #endif - int reaction_q_level_index = lf_atomic_fetch_add32((int32_t*)&scheduler->indexes[reaction_level], 1); + int reaction_q_level_index = lf_atomic_fetch_add((int*)&scheduler->indexes[reaction_level], 1); assert(reaction_q_level_index >= 0); LF_PRINT_DEBUG("Scheduler: Accessing triggered reactions at the level %zu with index %d.", reaction_level, reaction_q_level_index); @@ -159,7 +159,7 @@ static void _lf_sched_signal_stop(lf_scheduler_t* scheduler) { * Advance tag if there are no reactions in the array of reaction vectors. If * there are such reactions, distribute them to worker threads. * - * This function assumes the caller does not hold the 'mutex' lock. + * This function assumes the caller does not hold the mutex lock. */ static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* scheduler) { // Reset the index @@ -203,7 +203,7 @@ static void _lf_scheduler_try_advance_tag_and_distribute(lf_scheduler_t* schedul static void _lf_sched_wait_for_work(lf_scheduler_t* scheduler, size_t worker_number) { // Increment the number of idle workers by 1 and check if this is the last // worker thread to become idle. - if (lf_atomic_add_fetch32((int32_t*)&scheduler->number_of_idle_workers, 1) == (int)scheduler->number_of_workers) { + if (lf_atomic_add_fetch((int*)&scheduler->number_of_idle_workers, 1) == (int)scheduler->number_of_workers) { // Last thread to go idle LF_PRINT_DEBUG("Scheduler: Worker %zu is the last idle thread.", worker_number); // Call on the scheduler to distribute work or advance tag. @@ -322,7 +322,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu // the current level (if there is a causality loop) LF_MUTEX_LOCK(&scheduler->custom_data->array_of_mutexes[current_level]); #endif - int current_level_q_index = lf_atomic_add_fetch32((int32_t*)&scheduler->indexes[current_level], -1); + int current_level_q_index = lf_atomic_add_fetch((int*)&scheduler->indexes[current_level], -1); if (current_level_q_index >= 0) { LF_PRINT_DEBUG("Scheduler: Worker %d popping reaction with level %zu, index " "for level: %d.", @@ -361,7 +361,7 @@ reaction_t* lf_sched_get_ready_reaction(lf_scheduler_t* scheduler, int worker_nu */ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction) { (void)worker_number; - if (!lf_atomic_bool_compare_and_swap32((int32_t*)&done_reaction->status, queued, inactive)) { + if (!lf_atomic_bool_compare_and_swap((int*)&done_reaction->status, queued, inactive)) { lf_print_error_and_exit("Unexpected reaction status: %d. 
Expected %d.", done_reaction->status, queued); } } @@ -388,7 +388,7 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { (void)worker_number; - if (reaction == NULL || !lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) { + if (reaction == NULL || !lf_atomic_bool_compare_and_swap((int*)&reaction->status, inactive, queued)) { return; } LF_PRINT_DEBUG("Scheduler: Enqueueing reaction %s, which has level %lld.", reaction->name, LF_LEVEL(reaction->index)); diff --git a/core/threaded/scheduler_adaptive.c b/core/threaded/scheduler_adaptive.c index 1f90c90a6..5a926aba6 100644 --- a/core/threaded/scheduler_adaptive.c +++ b/core/threaded/scheduler_adaptive.c @@ -207,7 +207,7 @@ static void worker_assignments_free(lf_scheduler_t* scheduler) { static reaction_t* get_reaction(lf_scheduler_t* scheduler, size_t worker) { worker_assignments_t* worker_assignments = scheduler->custom_data->worker_assignments; #ifndef FEDERATED - int index = lf_atomic_add_fetch32((int32_t*)(worker_assignments->num_reactions_by_worker + worker), -1); + int index = lf_atomic_add_fetch(worker_assignments->num_reactions_by_worker + worker, -1); if (index >= 0) { return worker_assignments->reactions_by_worker[worker][index]; } @@ -223,9 +223,9 @@ static reaction_t* get_reaction(lf_scheduler_t* scheduler, size_t worker) { old_num_reactions = current_num_reactions; if (old_num_reactions <= 0) return NULL; - } while ((current_num_reactions = lf_atomic_val_compare_and_swap32( - (int32_t*)(worker_assignments->num_reactions_by_worker + worker), old_num_reactions, - (index = old_num_reactions - 1))) != old_num_reactions); + } while ((current_num_reactions = + lf_atomic_val_compare_and_swap(worker_assignments->num_reactions_by_worker + worker, old_num_reactions, + (index = old_num_reactions - 1))) != old_num_reactions); return worker_assignments->reactions_by_worker[worker][index]; #endif } @@ -282,7 +282,7 @@ static void worker_assignments_put(lf_scheduler_t* scheduler, reaction_t* reacti hash = hash ^ (hash >> 31); size_t worker = hash % worker_assignments->num_workers_by_level[level]; size_t num_preceding_reactions = - lf_atomic_fetch_add32((int32_t*)&worker_assignments->num_reactions_by_worker_by_level[level][worker], 1); + lf_atomic_fetch_add(&worker_assignments->num_reactions_by_worker_by_level[level][worker], 1); worker_assignments->reactions_by_worker_by_level[level][worker][num_preceding_reactions] = reaction; } @@ -383,7 +383,7 @@ static bool worker_states_finished_with_level_locked(lf_scheduler_t* scheduler, assert(((int64_t)worker_assignments->num_reactions_by_worker[worker]) <= 0); // Why use an atomic operation when we are supposed to be "as good as locked"? Because I took a // shortcut, and the shortcut was imperfect. 
- size_t ret = lf_atomic_add_fetch32((int32_t*)&worker_states->num_loose_threads, -1); + size_t ret = lf_atomic_add_fetch(&worker_states->num_loose_threads, -1); assert(ret <= worker_assignments->max_num_workers); // Check for underflow return !ret; } @@ -726,7 +726,7 @@ void lf_sched_done_with_reaction(size_t worker_number, reaction_t* done_reaction void lf_scheduler_trigger_reaction(lf_scheduler_t* scheduler, reaction_t* reaction, int worker_number) { LF_ASSERT(worker_number >= -1, "Sched: Invalid worker number"); - if (!lf_atomic_bool_compare_and_swap32((int32_t*)&reaction->status, inactive, queued)) + if (!lf_atomic_bool_compare_and_swap((int*)&reaction->status, inactive, queued)) return; worker_assignments_put(scheduler, reaction); } diff --git a/core/threaded/scheduler_sync_tag_advance.c b/core/threaded/scheduler_sync_tag_advance.c index cc91c88f0..1af8bcb32 100644 --- a/core/threaded/scheduler_sync_tag_advance.c +++ b/core/threaded/scheduler_sync_tag_advance.c @@ -20,16 +20,19 @@ void _lf_next_locked(struct environment_t* env); /** - * @brief Indicator that execution of at least one tag has completed. + * @brief Indicator that execution of the specified tag has completed. */ -static bool _latest_tag_completed = false; +static tag_t _latest_tag_completed = NEVER_TAG_INITIALIZER; bool should_stop_locked(lf_scheduler_t* sched) { // If this is not the very first step, check against the stop tag to see whether this is the last step. - if (_latest_tag_completed) { + // Also, stop only after completing the stop tag. + if (lf_tag_compare(_latest_tag_completed, sched->env->current_tag) == 0) { // If we are at the stop tag, do not call _lf_next_locked() // to prevent advancing the logical time. if (lf_tag_compare(sched->env->current_tag, sched->env->stop_tag) >= 0) { + LF_PRINT_DEBUG("****************** Stopping execution at tag " PRINTF_TAG, + sched->env->current_tag.time - lf_time_start(), sched->env->current_tag.microstep); return true; } } @@ -50,11 +53,10 @@ bool _lf_sched_advance_tag_locked(lf_scheduler_t* sched) { return true; } - _latest_tag_completed = true; + _latest_tag_completed = env->current_tag; // Advance time. - // _lf_next_locked() may block waiting for real time to pass or events to appear. - // to appear on the event queue. Note that we already + // _lf_next_locked() may block waiting for real time to pass or events to appear on the event queue. tracepoint_scheduler_advancing_time_starts(env); _lf_next_locked(env); tracepoint_scheduler_advancing_time_ends(env); diff --git a/core/utils/lf_semaphore.c b/core/utils/lf_semaphore.c index 2d0255ecb..3d79f9e4c 100644 --- a/core/utils/lf_semaphore.c +++ b/core/utils/lf_semaphore.c @@ -41,7 +41,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * @param count The count to start with. * @return lf_semaphore_t* Can be NULL on error. */ -lf_semaphore_t* lf_semaphore_new(int count) { +lf_semaphore_t* lf_semaphore_new(size_t count) { lf_semaphore_t* semaphore = (lf_semaphore_t*)malloc(sizeof(lf_semaphore_t)); LF_MUTEX_INIT(&semaphore->mutex); LF_COND_INIT(&semaphore->cond, &semaphore->mutex); @@ -55,7 +55,7 @@ lf_semaphore_t* lf_semaphore_new(int count) { * @param semaphore Instance of a semaphore * @param i The count to add. 
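+ * Note that the count is now unsigned, so a negative value can no longer be passed here.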
*/ -void lf_semaphore_release(lf_semaphore_t* semaphore, int i) { +void lf_semaphore_release(lf_semaphore_t* semaphore, size_t i) { assert(semaphore != NULL); LF_MUTEX_LOCK(&semaphore->mutex); semaphore->count += i; diff --git a/core/utils/util.c b/core/utils/util.c index cc2529a59..f7c260af8 100644 --- a/core/utils/util.c +++ b/core/utils/util.c @@ -206,6 +206,8 @@ void lf_print_error_system_failure(const char* format, ...) { va_start(args, format); lf_vprint_error(format, args); va_end(args); + // Windows warns that strerror is deprecated but doesn't define strerror_r. + // There seems to be no portable replacement. lf_print_error_and_exit("Error %d: %s", errno, strerror(errno)); exit(EXIT_FAILURE); } diff --git a/include/api/reaction_macros.h b/include/api/reaction_macros.h index 37d90258d..52c39e125 100644 --- a/include/api/reaction_macros.h +++ b/include/api/reaction_macros.h @@ -77,6 +77,7 @@ /* The cast "*((void**) &out->value)" is a hack to make the code */ \ /* compile with non-token types where value is not a pointer. */ \ lf_token_t* token = _lf_initialize_token_with_value((token_template_t*)out, *((void**)&out->value), 1); \ + out->token = token; \ } \ } while (0) @@ -97,6 +98,7 @@ do { \ lf_set_present(out); \ lf_token_t* token = _lf_initialize_token_with_value((token_template_t*)out, val, len); \ + out->token = token; \ out->value = token->value; \ out->length = len; \ } while (0) @@ -105,6 +107,7 @@ do { \ lf_set_present(out); \ lf_token_t* token = _lf_initialize_token_with_value((token_template_t*)out, val, len); \ + out->token = token; \ out->value = static_castvalue)>(token->value); \ out->length = len; \ } while (0) diff --git a/include/core/environment.h b/include/core/environment.h index 8099eed26..9f3960a6f 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -112,6 +112,13 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke int num_is_present_fields, int num_modes, int num_state_resets, int num_watchdogs, const char* trace_file_name); +/** + * @brief Verify that the environment is correctly set up. + * + * @param env + */ +void environment_verify(environment_t* env); + /** * @brief Free the dynamically allocated memory on the environment struct. * @param env The environment in which we are executing. diff --git a/include/core/federated/federate.h b/include/core/federated/federate.h index 8ab5dab2e..50c59daa1 100644 --- a/include/core/federated/federate.h +++ b/include/core/federated/federate.h @@ -215,11 +215,6 @@ extern lf_mutex_t lf_outbound_socket_mutex; */ extern lf_cond_t lf_port_status_changed; -/** - * Condition variable for blocking on tag advance in - */ -extern lf_cond_t lf_current_tag_changed; - ////////////////////////////////////////////////////////////////////////////////// // Public functions (in alphabetical order) @@ -529,4 +524,19 @@ void lf_synchronize_with_other_federates(); */ bool lf_update_max_level(tag_t tag, bool is_provisional); +#ifdef FEDERATED_DECENTRALIZED +/** + * @brief Return the physical time that we should wait until before advancing to the specified tag. + * + * This function adds the STA offset (STP_offset parameter) to the time of the specified tag unless + * the tag is the starting tag (it is always safe to advance to the starting tag). It also avoids + * adding the STA offset if all network input ports are known at least up to one microstep earlier + * than the specified tag. + * + * This function assumes that the caller holds the environment mutex. 
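+ * For example, with an STA of 10 msec and some network input port still unknown near the
+ * specified tag, this returns tag.time + 10 msec; otherwise it returns tag.time.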
+ * @param tag The specified tag.
+ */
+instant_t lf_wait_until_time(tag_t tag);
+#endif // FEDERATED_DECENTRALIZED
+
 #endif // FEDERATE_H
diff --git a/include/core/lf_token.h b/include/core/lf_token.h
index 219538dd3..49069fa95 100644
--- a/include/core/lf_token.h
+++ b/include/core/lf_token.h
@@ -108,9 +108,9 @@ typedef struct lf_token_t {
  * A record of the subset of channels of a multiport that have present inputs.
  */
 typedef struct lf_sparse_io_record_t {
-  int size;                  // -1 if overflowed. 0 if empty.
-  size_t capacity;           // Max number of writes to be considered sparse.
-  size_t present_channels[]; // Array of channel indices that are present.
+  int size;                 // -1 if overflowed. 0 if empty.
+  size_t capacity;          // Max number of writes to be considered sparse.
+  size_t* present_channels; // Array of channel indices that are present.
 } lf_sparse_io_record_t;

 /**
@@ -151,20 +151,6 @@ typedef struct lf_port_base_t {
 //////////////////////////////////////////////////////////
 //// Global variables

-/**
- * @brief List of tokens created within reactions that must be freed.
- * Tokens created by lf_writable_copy, which is automatically invoked
- * when an input is mutable, must have their reference count decremented
- * at the end of a tag (or the beginning of the next tag).
- * Otherwise, their memory could leak. If they are passed on to
- * an output or to a call to lf_schedule during the reaction, then
- * those will also result in incremented reference counts, enabling
- * the token to live on until used. For example, a new token created
- * by lf_writable_copy could become the new template token for an output
- * via a call to lf_set.
- */
-extern lf_token_t* _lf_tokens_allocated_in_reactions;
-
 /**
  * Counter used to issue a warning if memory is
  * allocated for tokens and never freed. Note that
diff --git a/include/core/lf_types.h b/include/core/lf_types.h
index a3a103041..b6d754ca2 100644
--- a/include/core/lf_types.h
+++ b/include/core/lf_types.h
@@ -298,6 +298,7 @@ typedef struct {
   trigger_t* trigger; // THIS HAS TO MATCH lf_action_internal_t
   self_base_t* parent;
   bool has_value;
+  int source_id; // Used only for federated network input actions.
 } lf_action_base_t;

 /**
diff --git a/include/core/threaded/reactor_threaded.h b/include/core/threaded/reactor_threaded.h
index 0d58f7431..2f5463165 100644
--- a/include/core/threaded/reactor_threaded.h
+++ b/include/core/threaded/reactor_threaded.h
@@ -80,7 +80,35 @@ void _lf_decrement_tag_barrier_locked(environment_t* env);
 int _lf_wait_on_tag_barrier(environment_t* env, tag_t proposed_tag);
 void lf_synchronize_with_other_federates(void);
-bool wait_until(instant_t logical_time_ns, lf_cond_t* condition);
+
+/**
+ * @brief Wait until physical time matches or exceeds the specified time.
+ *
+ * If -fast is given, there will be no wait.
+ *
+ * If an event is put on the event queue during the wait, then the wait is
+ * interrupted and this function returns false. It also returns false if the
+ * timeout time is reached before the wait has completed. Note that this could
+ * return true even if a new event was placed on the queue. This will occur
+ * if that event time matches or exceeds the specified time.
+ *
+ * The mutex lock associated with the condition argument is assumed to be held by
+ * the calling thread. This mutex is released while waiting. If the wait time is
+ * too small to actually wait (less than MIN_SLEEP_DURATION), then this function
+ * immediately returns true and the mutex is not released.
+ * + * @param wait_until_time The physical time to wait for. + * @param condition A condition variable that can interrupt the wait. The mutex + * associated with this condition variable will be released during the wait. + * + * @return false if the wait is interrupted, either by a new event on the + * event queue or by reaching the stop time, if one was specified; + * true if the full wait time was reached. + */ +bool wait_until(instant_t wait_until_time, lf_cond_t* condition); + tag_t get_next_event_tag(environment_t* env); tag_t send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply); void _lf_next_locked(environment_t* env); diff --git a/include/core/utils/impl/hashmap.h b/include/core/utils/impl/hashmap.h index e64774887..94d5969a7 100644 --- a/include/core/utils/impl/hashmap.h +++ b/include/core/utils/impl/hashmap.h @@ -19,7 +19,7 @@ #define V void* #endif #ifndef HASH_OF -#define HASH_OF(key) (size_t) key +#define HASH_OF(key) (size_t)key #endif #ifndef HASHMAP #define HASHMAP(token) hashmap##_##token diff --git a/include/core/utils/impl/pointer_hashmap.h b/include/core/utils/impl/pointer_hashmap.h index 2184518b3..c2a60aef1 100644 --- a/include/core/utils/impl/pointer_hashmap.h +++ b/include/core/utils/impl/pointer_hashmap.h @@ -30,7 +30,7 @@ #define HASHMAP(token) hashmap_object2int##_##token #define K void* #define V int -#define HASH_OF(key) (size_t) key +#define HASH_OF(key) (size_t)key #include "hashmap.h" #undef HASHMAP #undef K diff --git a/include/core/utils/lf_semaphore.h b/include/core/utils/lf_semaphore.h index 73d3e4eb4..341c43cc4 100644 --- a/include/core/utils/lf_semaphore.h +++ b/include/core/utils/lf_semaphore.h @@ -41,7 +41,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include typedef struct { - int count; + size_t count; lf_mutex_t mutex; lf_cond_t cond; } lf_semaphore_t; @@ -52,7 +52,7 @@ typedef struct { * @param count The count to start with. * @return lf_semaphore_t* Can be NULL on error. */ -lf_semaphore_t* lf_semaphore_new(int count); +lf_semaphore_t* lf_semaphore_new(size_t count); /** * @brief Release the 'semaphore' and add 'i' to its count. @@ -60,7 +60,7 @@ lf_semaphore_t* lf_semaphore_new(int count); * @param semaphore Instance of a semaphore * @param i The count to add. */ -void lf_semaphore_release(lf_semaphore_t* semaphore, int i); +void lf_semaphore_release(lf_semaphore_t* semaphore, size_t i); /** * @brief Acquire the 'semaphore'. Will block if count is 0. diff --git a/lingua-franca-ref.txt b/lingua-franca-ref.txt index 1f7391f92..8b25206ff 100644 --- a/lingua-franca-ref.txt +++ b/lingua-franca-ref.txt @@ -1 +1 @@ -master +master \ No newline at end of file diff --git a/logging/api/logging_macros.h b/logging/api/logging_macros.h index 6f7ea1eba..3e22950b5 100644 --- a/logging/api/logging_macros.h +++ b/logging/api/logging_macros.h @@ -1,3 +1,5 @@ +#ifndef LOGGING_MACROS_H +#define LOGGING_MACROS_H #include "logging.h" /** @@ -12,6 +14,11 @@ #define LOG_LEVEL LOG_LEVEL_INFO #endif +// To prevent warnings "conditional expression is constant", we define static booleans +// here instead of directly testing LOG_LEVEL in the if statements in the macros below. +static const bool _lf_log_level_is_log = LOG_LEVEL >= LOG_LEVEL_LOG; +static const bool _lf_log_level_is_debug = LOG_LEVEL >= LOG_LEVEL_DEBUG; + /** * A macro used to print useful logging information.
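A usage sketch of the size_t-based semaphore API above; lf_semaphore_acquire, declared in the same header, is the blocking counterpart.

#include "lf_semaphore.h"

// Sketch: a coordinator hands two units to blocked workers. The semaphore is
// assumed to have been created with lf_semaphore_new(0), so its count starts at 0.
static void wake_two_workers(lf_semaphore_t* sem) {
  // Workers elsewhere call lf_semaphore_acquire(sem), which blocks while count is 0.
  lf_semaphore_release(sem, 2); // count += 2; up to two acquires may now proceed.
}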
It can be enabled * by setting the target property 'logging' to 'LOG' or @@ -31,7 +38,7 @@ */ #define LF_PRINT_LOG(format, ...) \ do { \ - if (LOG_LEVEL >= LOG_LEVEL_LOG) { \ + if (_lf_log_level_is_log) { \ lf_print_log(format, ##__VA_ARGS__); \ } \ } while (0) @@ -54,7 +61,7 @@ */ #define LF_PRINT_DEBUG(format, ...) \ do { \ - if (LOG_LEVEL >= LOG_LEVEL_DEBUG) { \ + if (_lf_log_level_is_debug) { \ lf_print_debug(format, ##__VA_ARGS__); \ } \ } while (0) @@ -100,3 +107,4 @@ } \ } while (0) #endif // NDEBUG +#endif // LOGGING_MACROS_H \ No newline at end of file diff --git a/low_level_platform/api/CMakeLists.txt b/low_level_platform/api/CMakeLists.txt index 9f2172bce..599f87e59 100644 --- a/low_level_platform/api/CMakeLists.txt +++ b/low_level_platform/api/CMakeLists.txt @@ -7,6 +7,8 @@ if(${CMAKE_SYSTEM_NAME} STREQUAL "nRF52") target_compile_definitions(lf-low-level-platform-api INTERFACE PLATFORM_NRF52) elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr") target_compile_definitions(lf-low-level-platform-api INTERFACE PLATFORM_ZEPHYR) +elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Patmos") + target_compile_definitions(lf-low-level-platform-api INTERFACE PLATFORM_PATMOS) elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Rp2040") target_compile_definitions(lf-low-level-platform-api INTERFACE PLATFORM_RP2040) target_link_libraries(lf-low-level-platform-api INTERFACE pico_stdlib) diff --git a/low_level_platform/api/low_level_platform.h b/low_level_platform/api/low_level_platform.h index 9611870cc..afffd2a9e 100644 --- a/low_level_platform/api/low_level_platform.h +++ b/low_level_platform/api/low_level_platform.h @@ -48,6 +48,8 @@ int lf_critical_section_exit(environment_t* env); #include "platform/lf_zephyr_support.h" #elif defined(PLATFORM_NRF52) #include "platform/lf_nrf52_support.h" +#elif defined(PLATFORM_PATMOS) +#include "platform/lf_patmos_support.h" #elif defined(PLATFORM_RP2040) #include "platform/lf_rp2040_support.h" #elif defined(PLATFORM_FLEXPRET) diff --git a/low_level_platform/api/platform/lf_atomic.h b/low_level_platform/api/platform/lf_atomic.h index 391678293..e40de9b25 100644 --- a/low_level_platform/api/platform/lf_atomic.h +++ b/low_level_platform/api/platform/lf_atomic.h @@ -11,14 +11,14 @@ #include /** - * @brief Atomically fetch a 32bit integer from memory and add a value to it. + * @brief Atomically fetch an integer from memory and add a value to it. * Return the value that was previously in memory. * * @param ptr A pointer to the memory location. * @param val The value to be added. * @return The value previously in memory. */ -int32_t lf_atomic_fetch_add32(int32_t* ptr, int32_t val); +int lf_atomic_fetch_add(int* ptr, int val); /** * @brief Atomically fetch 64-bit integer from memory and add a value to it. @@ -31,14 +31,14 @@ int32_t lf_atomic_fetch_add32(int32_t* ptr, int32_t val); int64_t lf_atomic_fetch_add64(int64_t* ptr, int64_t val); /** - * @brief Atomically fetch a 32-bit integer from memory and add a value to it. + * @brief Atomically fetch an integer from memory and add a value to it. * Return the new value of the memory. * * @param ptr A pointer to the memory location. * @param val The value to be added. * @return The new value in memory. */ -int32_t lf_atomic_add_fetch32(int32_t* ptr, int32_t val); +int lf_atomic_add_fetch(int* ptr, int val); /** * @brief Atomically fetch a 64-bit integer from memory and add a value to it. @@ -60,7 +60,7 @@ int64_t lf_atomic_add_fetch64(int64_t* ptr, int64_t val); * @param newval The value to swap in. * @return Whether a swap was performed or not. 
*/ -bool lf_atomic_bool_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t newval); +bool lf_atomic_bool_compare_and_swap(int* ptr, int oldval, int newval); /** * @brief Atomically perform a compare-and-swap operation on a 64 bit integer in @@ -75,7 +75,7 @@ bool lf_atomic_bool_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t new bool lf_atomic_bool_compare_and_swap64(int64_t* ptr, int64_t oldval, int64_t newval); /** - * @brief Atomically perform a compare-and-swap operation on a 32 bit integer in + * @brief Atomically perform a compare-and-swap operation on an integer in * memory. If the value in memory is equal to `oldval` replace it with `newval`. * Return the content of the memory before the potential swap operation is * performed. @@ -85,7 +85,7 @@ bool lf_atomic_bool_compare_and_swap64(int64_t* ptr, int64_t oldval, int64_t new * @param newval The value to swap in. * @return The value in memory prior to the swap. */ -int32_t lf_atomic_val_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t newval); +int lf_atomic_val_compare_and_swap(int* ptr, int oldval, int newval); /** * @brief Atomically perform a compare-and-swap operation on a 64 bit integer in diff --git a/low_level_platform/api/platform/lf_patmos_support.h b/low_level_platform/api/platform/lf_patmos_support.h new file mode 100644 index 000000000..afd7e1ce0 --- /dev/null +++ b/low_level_platform/api/platform/lf_patmos_support.h @@ -0,0 +1,52 @@ + +/* Patmos API support for the C target of Lingua Franca. */ + +/************* +Copyright (c) 2024, The University of California at Berkeley. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +***************/ + +/** + * Patmos API support for the C target of Lingua Franca. + * + * This is based on lf_nrf_support.h in icyphy/lf-buckler. 
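A minimal sketch using the word-width compare-and-swap above to claim a one-shot flag; the flag and helper are hypothetical, not part of this patch.

#include <stdbool.h>
#include "platform/lf_atomic.h"

static int claimed = 0; // Hypothetical one-shot flag: 0 = free, 1 = taken.

// Exactly one caller across all threads observes a true return.
static bool try_claim(void) { return lf_atomic_bool_compare_and_swap(&claimed, 0, 1); }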
+ * + * @author{Ehsan Khodadad } + * @author{Luca Pezzarossa } + * @author{Martin Schoeberl } + */ + +#ifndef LF_PATMOS_SUPPORT_H +#define LF_PATMOS_SUPPORT_H + +// This embedded platform has no TTY support +#define NO_TTY + +#include <stdint.h> // For fixed-width integral types +#include <stdbool.h> + +#include <inttypes.h> // Needed to define PRId64 and PRIu32 +#define PRINTF_TIME "%" PRId64 +#define PRINTF_MICROSTEP "%" PRIu32 +#define PRINTF_TAG "(%" PRId64 ", %" PRIu32 ")" + +#endif // LF_PATMOS_SUPPORT_H diff --git a/low_level_platform/api/platform/lf_zephyr_support.h b/low_level_platform/api/platform/lf_zephyr_support.h index 724bbe4e5..44d91bcbd 100644 --- a/low_level_platform/api/platform/lf_zephyr_support.h +++ b/low_level_platform/api/platform/lf_zephyr_support.h @@ -50,6 +50,8 @@ typedef struct { } lf_cond_t; typedef struct k_thread* lf_thread_t; +void _lf_initialize_clock_zephyr_common(); + #endif // !LF_SINGLE_THREADED #endif // LF_ZEPHYR_SUPPORT_H diff --git a/low_level_platform/impl/CMakeLists.txt b/low_level_platform/impl/CMakeLists.txt index c0f2d8bb5..ac035dcf4 100644 --- a/low_level_platform/impl/CMakeLists.txt +++ b/low_level_platform/impl/CMakeLists.txt @@ -44,8 +44,13 @@ elseif(${CMAKE_SYSTEM_NAME} STREQUAL "FlexPRET") ${CMAKE_CURRENT_LIST_DIR}/src/lf_flexpret_support.c ${CMAKE_CURRENT_LIST_DIR}/src/lf_atomic_irq.c ) +elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Patmos") + set(LF_LOW_LEVEL_PLATFORM_FILES + ${CMAKE_CURRENT_LIST_DIR}/src/lf_patmos_support.c + ${CMAKE_CURRENT_LIST_DIR}/src/lf_atomic_irq.c + ) else() - message(FATAL_ERROR "Your platform is not supported! The C target supports FlexPRET, Linux, MacOS, nRF52, RP2040, Windows, and Zephyr.") + message(FATAL_ERROR "Your platform is not supported! The C target supports FlexPRET, Patmos, Linux, MacOS, nRF52, RP2040, Windows, and Zephyr.") endif() list(APPEND LF_LOW_LEVEL_PLATFORM_FILES ${CMAKE_CURRENT_LIST_DIR}/src/lf_platform_util.c) @@ -56,6 +61,7 @@ if(${CMAKE_SYSTEM_NAME} STREQUAL "Zephyr") else() message(STATUS "Building Zephyr library with Kernel clock ") endif() + zephyr_library_named(lf-low-level-platform-impl) zephyr_library_sources(${LF_LOW_LEVEL_PLATFORM_FILES}) zephyr_library_link_libraries(kernel) @@ -93,8 +99,11 @@ elseif(${CMAKE_SYSTEM_NAME} STREQUAL "FlexPRET") ) endif() endif() +elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Patmos") + add_library(lf-low-level-platform-impl STATIC ${LF_LOW_LEVEL_PLATFORM_FILES}) else() add_library(lf-low-level-platform-impl STATIC ${LF_LOW_LEVEL_PLATFORM_FILES}) + # Link the platform to a threading library if(NOT DEFINED LF_SINGLE_THREADED OR DEFINED LF_TRACE) find_package(Threads REQUIRED) @@ -116,6 +125,7 @@ macro(low_level_platform_define X) target_compile_definitions(lf-low-level-platform-impl PUBLIC ${X}=${${X}}) endif(DEFINED ${X}) endmacro() + low_level_platform_define(LF_SINGLE_THREADED) low_level_platform_define(LOG_LEVEL) low_level_platform_define(MODAL_REACTORS) diff --git a/low_level_platform/impl/src/lf_atomic_gcc_clang.c b/low_level_platform/impl/src/lf_atomic_gcc_clang.c index 30d671a8a..bca144459 100644 --- a/low_level_platform/impl/src/lf_atomic_gcc_clang.c +++ b/low_level_platform/impl/src/lf_atomic_gcc_clang.c @@ -11,17 +11,17 @@ #include "platform/lf_atomic.h" #include "low_level_platform.h" -int32_t lf_atomic_fetch_add32(int32_t* ptr, int32_t value) { return __sync_fetch_and_add(ptr, value); } +int lf_atomic_fetch_add(int* ptr, int value) { return __sync_fetch_and_add(ptr, value); } int64_t lf_atomic_fetch_add64(int64_t* ptr, int64_t value) { return __sync_fetch_and_add(ptr, value); } -int32_t
lf_atomic_add_fetch32(int32_t* ptr, int32_t value) { return __sync_add_and_fetch(ptr, value); } +int lf_atomic_add_fetch(int* ptr, int value) { return __sync_add_and_fetch(ptr, value); } int64_t lf_atomic_add_fetch64(int64_t* ptr, int64_t value) { return __sync_add_and_fetch(ptr, value); } -bool lf_atomic_bool_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t newval) { +bool lf_atomic_bool_compare_and_swap(int* ptr, int oldval, int newval) { return __sync_bool_compare_and_swap(ptr, oldval, newval); } bool lf_atomic_bool_compare_and_swap64(int64_t* ptr, int64_t oldval, int64_t newval) { return __sync_bool_compare_and_swap(ptr, oldval, newval); } -int32_t lf_atomic_val_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t newval) { +int lf_atomic_val_compare_and_swap(int* ptr, int oldval, int newval) { return __sync_val_compare_and_swap(ptr, oldval, newval); } int64_t lf_atomic_val_compare_and_swap64(int64_t* ptr, int64_t oldval, int64_t newval) { diff --git a/low_level_platform/impl/src/lf_atomic_irq.c b/low_level_platform/impl/src/lf_atomic_irq.c index 2854a6f11..7d78ab445 100644 --- a/low_level_platform/impl/src/lf_atomic_irq.c +++ b/low_level_platform/impl/src/lf_atomic_irq.c @@ -1,5 +1,5 @@ #if defined(PLATFORM_ARDUINO) || defined(PLATFORM_NRF52) || defined(PLATFORM_ZEPHYR) || defined(PLATFORM_RP2040) || \ - defined(PLATFORM_FLEXPRET) + defined(PLATFORM_FLEXPRET) || defined(PLATFORM_PATMOS) /** * @author Erling Rennemo Jellum * @copyright (c) 2023 @@ -17,9 +17,9 @@ int lf_disable_interrupts_nested(); int lf_enable_interrupts_nested(); -int32_t lf_atomic_fetch_add32(int32_t* ptr, int32_t value) { +int lf_atomic_fetch_add(int* ptr, int value) { lf_disable_interrupts_nested(); - int32_t res = *ptr; + int res = *ptr; *ptr += value; lf_enable_interrupts_nested(); return res; @@ -33,7 +33,7 @@ int64_t lf_atomic_fetch_add64(int64_t* ptr, int64_t value) { return res; } -int32_t lf_atomic_add_fetch32(int32_t* ptr, int32_t value) { +int lf_atomic_add_fetch(int* ptr, int value) { lf_disable_interrupts_nested(); int res = *ptr + value; *ptr = res; @@ -49,7 +49,7 @@ int64_t lf_atomic_add_fetch64(int64_t* ptr, int64_t value) { return res; } -bool lf_atomic_bool_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t newval) { +bool lf_atomic_bool_compare_and_swap(int* ptr, int oldval, int newval) { lf_disable_interrupts_nested(); bool res = false; if ((*ptr) == oldval) { @@ -71,7 +71,7 @@ bool lf_atomic_bool_compare_and_swap64(int64_t* ptr, int64_t oldval, int64_t new return res; } -int32_t lf_atomic_val_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t newval) { +int lf_atomic_val_compare_and_swap(int* ptr, int oldval, int newval) { lf_disable_interrupts_nested(); int res = *ptr; if ((*ptr) == oldval) { diff --git a/low_level_platform/impl/src/lf_atomic_windows.c b/low_level_platform/impl/src/lf_atomic_windows.c index 1db0fa2de..ff5a01750 100644 --- a/low_level_platform/impl/src/lf_atomic_windows.c +++ b/low_level_platform/impl/src/lf_atomic_windows.c @@ -10,18 +10,18 @@ #include "platform/lf_atomic.h" #include -int32_t lf_atomic_fetch_add32(int32_t* ptr, int32_t value) { return InterlockedExchangeAdd(ptr, value); } +int lf_atomic_fetch_add(int* ptr, int value) { return InterlockedExchangeAdd((LONG*)ptr, (LONG)value); } int64_t lf_atomic_fetch_add64(int64_t* ptr, int64_t value) { return InterlockedExchangeAdd64(ptr, value); } -int32_t lf_atomic_add_fetch32(int32_t* ptr, int32_t value) { return InterlockedAdd(ptr, value); } +int lf_atomic_add_fetch(int* ptr, int value) { return 
InterlockedAdd((LONG*)ptr, (LONG)value); } int64_t lf_atomic_add_fetch64(int64_t* ptr, int64_t value) { return InterlockedAdd64(ptr, value); } -bool lf_atomic_bool_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t newval) { - return (InterlockedCompareExchange(ptr, newval, oldval) == oldval); +bool lf_atomic_bool_compare_and_swap(int* ptr, int oldval, int newval) { + return (InterlockedCompareExchange((LONG*)ptr, (LONG)newval, (LONG)oldval) == oldval); } bool lf_atomic_bool_compare_and_swap64(int64_t* ptr, int64_t oldval, int64_t newval) { return (InterlockedCompareExchange64(ptr, newval, oldval) == oldval); } -int32_t lf_atomic_val_compare_and_swap32(int32_t* ptr, int32_t oldval, int32_t newval) { - return InterlockedCompareExchange(ptr, newval, oldval); +int lf_atomic_val_compare_and_swap(int* ptr, int oldval, int newval) { + return InterlockedCompareExchange((LONG*)ptr, (LONG)newval, (LONG)oldval); } int64_t lf_atomic_val_compare_and_swap64(int64_t* ptr, int64_t oldval, int64_t newval) { return InterlockedCompareExchange64(ptr, newval, oldval); diff --git a/low_level_platform/impl/src/lf_flexpret_support.c b/low_level_platform/impl/src/lf_flexpret_support.c index cf37c1b8a..7fb6d2a48 100644 --- a/low_level_platform/impl/src/lf_flexpret_support.c +++ b/low_level_platform/impl/src/lf_flexpret_support.c @@ -178,10 +178,7 @@ int lf_available_cores() { return FP_THREADS - 1; // Return the number of Flexpret HW threads } -lf_thread_t lf_thread_self() { - // Not implemented. - return NULL; -} +lf_thread_t lf_thread_self() { return read_hartid(); } int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* arguments) { /** diff --git a/low_level_platform/impl/src/lf_patmos_support.c b/low_level_platform/impl/src/lf_patmos_support.c new file mode 100644 index 000000000..6529c95bc --- /dev/null +++ b/low_level_platform/impl/src/lf_patmos_support.c @@ -0,0 +1,138 @@ +#if defined(PLATFORM_PATMOS) +/************* +Copyright (c) 2024, The University of California at Berkeley. + +Redistribution and use in source and binary forms, with or without modification, +are permitted provided that the following conditions are met: + +1. Redistributions of source code must retain the above copyright notice, + this list of conditions and the following disclaimer. + +2. Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY +EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL +THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, +STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF +THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+***************/ + +/** + * @author{Ehsan Khodadad } + * @author{Luca Pezzarossa } + * @author{Martin Schoeberl } + */ +#include <time.h> +#include <errno.h> +#include <assert.h> +#include "platform/lf_patmos_support.h" +#include "low_level_platform.h" +#include <machine/rtc.h> +#include <machine/exceptions.h> +#include <stdio.h> + +// Keep track of physical actions being entered into the system +static volatile bool _lf_async_event = false; +// Keep track of whether we are in a critical section or not +static volatile int _lf_num_nested_critical_sections = 0; +/** + * @brief Sleep until an absolute time. + * Since there is no sleep mode in Patmos, and energy saving is not important for real-time systems, + * we just use a busy sleep. + * + * @param wakeup int64_t time of wakeup + * @return int 0 if successful sleep, -1 if awoken by async event + */ + +int _lf_interruptable_sleep_until_locked(environment_t* env, instant_t wakeup) { + instant_t now; + _lf_async_event = false; + lf_enable_interrupts_nested(); + + // Do busy sleep + do { + _lf_clock_gettime(&now); + } while ((now < wakeup) && !_lf_async_event); + + lf_disable_interrupts_nested(); + + if (_lf_async_event) { + _lf_async_event = false; + return -1; + } else { + return 0; + } +} + +int lf_sleep(interval_t sleep_duration) { + instant_t now; + _lf_clock_gettime(&now); + instant_t wakeup = now + sleep_duration; + + // Do busy sleep + do { + _lf_clock_gettime(&now); + } while ((now < wakeup)); + return 0; +} + +/** + * Pause execution for a number of nanoseconds. + * + * @return 0 for success, or -1 for failure. In case of failure, errno will be + * set appropriately (see `man 2 clock_nanosleep`). + */ +int lf_nanosleep(interval_t requested_time) { return lf_sleep(requested_time); } + +/** + * Patmos clock does not need initialization. + */ +void _lf_initialize_clock() {} + +/** + * Write the current time in nanoseconds into the location given by the argument. + * This returns 0 (it never fails, assuming the argument gives a valid memory location). + */ + +int _lf_clock_gettime(instant_t* t) { + + assert(t != NULL); + + *t = get_cpu_usecs() * 1000; + + return 0; +} + +#if defined(LF_SINGLE_THREADED) + +int lf_disable_interrupts_nested() { + if (_lf_num_nested_critical_sections++ == 0) { + intr_disable(); + } + return 0; +} + +int lf_enable_interrupts_nested() { + if (_lf_num_nested_critical_sections <= 0) { + return 1; + } + + if (--_lf_num_nested_critical_sections == 0) { + intr_enable(); + } + return 0; +} + +int _lf_single_threaded_notify_of_event() { + _lf_async_event = true; + return 0; +} +#endif // LF_SINGLE_THREADED + +#endif // PLATFORM_PATMOS diff --git a/low_level_platform/impl/src/lf_platform_util.c b/low_level_platform/impl/src/lf_platform_util.c index 0225aa423..212e6ea83 100644 --- a/low_level_platform/impl/src/lf_platform_util.c +++ b/low_level_platform/impl/src/lf_platform_util.c @@ -21,6 +21,6 @@ static thread_local int lf_thread_id_var = -1; int lf_thread_id() { return lf_thread_id_var; } -void initialize_lf_thread_id() { lf_thread_id_var = lf_atomic_fetch_add32(&_lf_worker_thread_count, 1); } +void initialize_lf_thread_id() { lf_thread_id_var = lf_atomic_fetch_add(&_lf_worker_thread_count, 1); } #endif #endif diff --git a/low_level_platform/impl/src/lf_windows_support.c b/low_level_platform/impl/src/lf_windows_support.c index 61424ac7f..c4524eda4 100644 --- a/low_level_platform/impl/src/lf_windows_support.c +++ b/low_level_platform/impl/src/lf_windows_support.c @@ -39,6 +39,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include #include #include +#include <stdio.h> // For fprintf() #include "platform/lf_windows_support.h" #include "low_level_platform.h" @@ -64,7 +65,7 @@ void _lf_initialize_clock() { if (_lf_use_performance_counter) { _lf_frequency_to_ns = (double)performance_frequency.QuadPart / BILLION; } else { - lf_print_error("High resolution performance counter is not supported on this machine."); + fprintf(stderr, "ERROR: High resolution performance counter is not supported on this machine.\n"); _lf_frequency_to_ns = 0.01; } } @@ -89,9 +90,9 @@ int _lf_clock_gettime(instant_t* t) { } LARGE_INTEGER windows_time; if (_lf_use_performance_counter) { - int result = QueryPerformanceCounter(&windows_time); + result = QueryPerformanceCounter(&windows_time); if (result == 0) { - lf_print_error("_lf_clock_gettime(): Failed to read the value of the physical clock."); + fprintf(stderr, "ERROR: _lf_clock_gettime(): Failed to read the value of the physical clock.\n"); return result; } } else { @@ -140,6 +141,7 @@ int lf_sleep(interval_t sleep_duration) { } int _lf_interruptable_sleep_until_locked(environment_t* env, instant_t wakeup_time) { + (void)env; // Suppress unused variable warning. interval_t sleep_duration = wakeup_time - lf_time_physical(); if (sleep_duration <= 0) { @@ -165,7 +167,11 @@ int lf_available_cores() { lf_thread_t lf_thread_self() { return GetCurrentThread(); } int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* arguments) { - uintptr_t handle = _beginthreadex(NULL, 0, lf_thread, arguments, 0, NULL); + // _beginthreadex requires a function that returns unsigned rather than void*. + // So the following double cast suppresses the warning: + // '_beginthreadex_proc_type' differs in levels of indirection from 'void *(__cdecl *)(void *)' + uintptr_t handle = + _beginthreadex(NULL, 0, (unsigned(__stdcall*)(void*))(uintptr_t(__stdcall*)(void*))lf_thread, arguments, 0, NULL); *thread = (HANDLE)handle; if (handle == 0) { return errno; @@ -183,6 +189,9 @@ int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* argum */ int lf_thread_join(lf_thread_t thread, void** thread_return) { DWORD retvalue = WaitForSingleObject(thread, INFINITE); + if (thread_return != NULL) { + *thread_return = (void*)retvalue; + } if (retvalue == WAIT_FAILED) { return EINVAL; } @@ -192,11 +201,23 @@ int lf_thread_join(lf_thread_t thread, void** thread_return) { /** * Real-time scheduling API not implemented for Windows. */ -int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { return -1; } +int lf_thread_set_cpu(lf_thread_t thread, size_t cpu_number) { + (void)thread; // Suppress unused variable warning. + (void)cpu_number; // Suppress unused variable warning. + return -1; +} -int lf_thread_set_priority(lf_thread_t thread, int priority) { return -1; } +int lf_thread_set_priority(lf_thread_t thread, int priority) { + (void)thread; // Suppress unused variable warning. + (void)priority; // Suppress unused variable warning. + return -1; +} -int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { return -1; } +int lf_thread_set_scheduling_policy(lf_thread_t thread, lf_scheduling_policy_t* policy) { + (void)thread; // Suppress unused variable warning. + (void)policy; // Suppress unused variable warning.
+ return -1; +} int lf_mutex_init(_lf_critical_section_t* critical_section) { // Set up a recursive mutex @@ -278,10 +299,20 @@ int _lf_cond_timedwait(lf_cond_t* cond, instant_t wakeup_time) { } // convert ns to ms and round up to closest full integer - DWORD wait_duration_ms = (wait_duration + 999999LL) / 1000000LL; + interval_t wait_duration_ms = (wait_duration + 999999LL) / 1000000LL; + DWORD wait_duration_saturated; + if (wait_duration_ms > 0xFFFFFFFFLL) { + // Saturate at 0xFFFFFFFFLL + wait_duration_saturated = (DWORD)0xFFFFFFFFLL; + } else if (wait_duration_ms <= 0) { + // No need to wait. Return indicating that the wait is complete. + return LF_TIMEOUT; + } else { + wait_duration_saturated = (DWORD)wait_duration_ms; + } int return_value = (int)SleepConditionVariableCS((PCONDITION_VARIABLE)&cond->condition, - (PCRITICAL_SECTION)cond->critical_section, wait_duration_ms); + (PCRITICAL_SECTION)cond->critical_section, wait_duration_saturated); if (return_value == 0) { // Error if (GetLastError() == ERROR_TIMEOUT) { diff --git a/low_level_platform/impl/src/lf_zephyr_clock_kernel.c b/low_level_platform/impl/src/lf_zephyr_clock_kernel.c index e23332f81..9dc343bc5 100644 --- a/low_level_platform/impl/src/lf_zephyr_clock_kernel.c +++ b/low_level_platform/impl/src/lf_zephyr_clock_kernel.c @@ -113,6 +113,7 @@ int _lf_interruptable_sleep_until_locked(environment_t* env, instant_t wakeup) { return 0; } else { lf_print_error_and_exit("k_sem_take returned %d", res); + return -1; } } diff --git a/low_level_platform/impl/src/lf_zephyr_support.c b/low_level_platform/impl/src/lf_zephyr_support.c index 5e5efb82d..74ae9bf90 100644 --- a/low_level_platform/impl/src/lf_zephyr_support.c +++ b/low_level_platform/impl/src/lf_zephyr_support.c @@ -36,14 +36,21 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. #include "platform/lf_platform_util.h" #include "low_level_platform.h" #include "tag.h" +#include "logging.h" #include +#include // Keep track of nested critical sections static uint32_t num_nested_critical_sections = 0; // Keep track of IRQ mask when entering critical section so we can enable again after static volatile unsigned irq_mask = 0; +// Catch kernel panics from Zephyr +void k_sys_fatal_error_handler(unsigned int reason, const struct arch_esf* esf) { + lf_print_error_and_exit("Zephyr kernel panic reason=%d", reason); +} + int lf_sleep(interval_t sleep_duration) { k_sleep(K_NSEC(sleep_duration)); return 0; @@ -81,8 +88,12 @@ int lf_enable_interrupts_nested() { // If NUMBER_OF_WORKERS is not specified, or set to 0, then we default to 1. #if !defined(NUMBER_OF_WORKERS) || NUMBER_OF_WORKERS == 0 #undef NUMBER_OF_WORKERS +#if defined(LF_REACTION_GRAPH_BREADTH) +#define NUMBER_OF_WORKERS LF_REACTION_GRAPH_BREADTH +#else #define NUMBER_OF_WORKERS 1 #endif +#endif // If USER_THREADS is not specified, then default to 0. 
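The millisecond conversion in _lf_cond_timedwait above rounds up and then clamps; as a worked sketch (the helper name is hypothetical, and DWORD/interval_t are the same types used by the Windows code):

#include <windows.h> // For DWORD.
#include "tag.h"     // For interval_t.

// Ceil division: 1 ns -> 1 ms; 1000000 ns -> 1 ms; 1000001 ns -> 2 ms.
// Clamp: anything above 0xFFFFFFFF ms (about 49.7 days) saturates.
// (The caller has already returned LF_TIMEOUT for non-positive durations.)
static DWORD to_ms_saturated(interval_t wait_duration) {
  interval_t ms = (wait_duration + 999999LL) / 1000000LL;
  return (ms > 0xFFFFFFFFLL) ? (DWORD)0xFFFFFFFFLL : (DWORD)ms;
}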
#if !defined(USER_THREADS) @@ -149,9 +160,9 @@ int lf_thread_create(lf_thread_t* thread, void* (*lf_thread)(void*), void* argum int lf_thread_join(lf_thread_t thread, void** thread_return) { return k_thread_join(thread, K_FOREVER); } void initialize_lf_thread_id() { - static int _lf_worker_thread_count = 0; + static int32_t _lf_worker_thread_count = 0; int* thread_id = (int*)malloc(sizeof(int)); - *thread_id = lf_atomic_fetch_add32(&_lf_worker_thread_count, 1); + *thread_id = lf_atomic_fetch_add(&_lf_worker_thread_count, 1); k_thread_custom_data_set(thread_id); } diff --git a/platform/impl/CMakeLists.txt b/platform/impl/CMakeLists.txt index 32753e7eb..3c86b3817 100644 --- a/platform/impl/CMakeLists.txt +++ b/platform/impl/CMakeLists.txt @@ -9,6 +9,9 @@ elseif(${CMAKE_SYSTEM_NAME} STREQUAL "FlexPRET") add_library(lf-platform-impl STATIC) target_sources(lf-platform-impl PUBLIC ${LF_PLATFORM_FILES}) target_link_libraries(lf-platform-impl PRIVATE fp-sdk) +elseif(${CMAKE_SYSTEM_NAME} STREQUAL "Patmos") + add_library(lf-platform-impl STATIC) + target_sources(lf-platform-impl PUBLIC ${LF_PLATFORM_FILES}) else() add_library(lf-platform-impl STATIC) target_sources(lf-platform-impl PUBLIC ${LF_PLATFORM_FILES}) diff --git a/python/include/python_action.h b/python/include/python_action.h index 880bfe149..f0682f9b5 100644 --- a/python/include/python_action.h +++ b/python/include/python_action.h @@ -61,6 +61,7 @@ typedef struct { lf_action_internal_t _base; self_base_t* parent; bool has_value; + int source_id; PyObject* value; FEDERATED_GENERIC_EXTENSION } generic_action_instance_struct; diff --git a/python/lib/modal_models/impl.c b/python/lib/modal_models/impl.c index c8cdff1f2..f47c7135f 100644 --- a/python/lib/modal_models/impl.c +++ b/python/lib/modal_models/impl.c @@ -46,12 +46,14 @@ static PyObject* py_mode_set(PyObject* mode_capsule, PyObject* args) { lf_print_error("Null pointer received."); exit(1); } + Py_INCREF(m->mode); self_base_t* self = PyCapsule_GetPointer(m->lf_self, "lf_self"); if (self == NULL) { lf_print_error("Null pointer received."); exit(1); } + Py_INCREF(m->lf_self); _LF_SET_MODE_WITH_TYPE(mode, m->change_type); @@ -61,6 +63,24 @@ static PyObject* py_mode_set(PyObject* mode_capsule, PyObject* args) { //////////// Python Struct ///////////// +/** + * Called when a mode in Python is to be created. Note that this is not normally + * used because modes are not created in Python. + * + * To initialize the mode_capsule, this function first calls the tp_alloc + * method of type mode_capsule_struct_t and then assigns default values of NULL, NULL, 0 + * to the members of the generic_mode_capsule_struct. + */ +PyObject* py_mode_capsule_new(PyTypeObject* type, PyObject* args, PyObject* kwds) { + mode_capsule_struct_t* self = (mode_capsule_struct_t*)type->tp_alloc(type, 0); + if (self != NULL) { + self->mode = NULL; + self->lf_self = NULL; + self->change_type = 0; + } + return (PyObject*)self; +} + /* * The function members of mode_capsule. * The set function is used to set a new mode. @@ -69,6 +89,37 @@ static PyMethodDef mode_capsule_methods[] = { {"set", (PyCFunction)py_mode_set, METH_NOARGS, "Set a new mode."}, {NULL} /* Sentinel */ }; +/** + * Initialize the mode capsule "self" with NULL pointers and default change_type.
+ */ +static int py_mode_capsule_init(mode_capsule_struct_t* self, PyObject* args, PyObject* kwds) { + self->mode = NULL; + self->lf_self = NULL; + self->change_type = 0; + return 0; +} + +/** + * Called when a mode capsule in Python is deallocated (generally + * called by the Python garbage collector). + * @param self The mode capsule to deallocate. + */ +void py_mode_capsule_dealloc(mode_capsule_struct_t* self) { + Py_XDECREF(self->mode); + Py_XDECREF(self->lf_self); + Py_TYPE(self)->tp_free((PyObject*)self); +} + +/* + * The members of a mode_capsule that are accessible from a Python program, used to define + * a native Python type. + */ +PyMemberDef py_mode_capsule_members[] = { + {"mode", T_OBJECT, offsetof(mode_capsule_struct_t, mode), 0, "The pointer to the C mode struct"}, + {"lf_self", T_OBJECT, offsetof(mode_capsule_struct_t, lf_self), 0, "Pointer to LF self"}, + {NULL} /* Sentinel */ +}; + /* * The definition of mode_capsule type object, which is * used to describe how mode_capsule behaves. @@ -79,7 +130,10 @@ static PyTypeObject mode_capsule_t = { .tp_basicsize = sizeof(mode_capsule_struct_t), .tp_itemsize = 0, .tp_flags = Py_TPFLAGS_DEFAULT, - .tp_new = PyType_GenericNew, + .tp_new = py_mode_capsule_new, + .tp_init = (initproc)py_mode_capsule_init, + .tp_dealloc = (destructor)py_mode_capsule_dealloc, + .tp_members = py_mode_capsule_members, .tp_methods = mode_capsule_methods, }; @@ -100,6 +154,7 @@ void initialize_mode_capsule_t(PyObject* current_module) { if (PyModule_AddObject(current_module, "mode_capsule", (PyObject*)&mode_capsule_t) < 0) { Py_DECREF(&mode_capsule_t); Py_DECREF(current_module); + lf_print_error_and_exit("Failed to initialize mode_capsule."); return; } } @@ -109,16 +164,19 @@ void initialize_mode_capsule_t(PyObject* current_module) { */ PyObject* convert_C_mode_to_py(reactor_mode_t* mode, self_base_t* lf_self, lf_mode_change_type_t change_type) { // Create the mode struct in Python - mode_capsule_struct_t* cap = (mode_capsule_struct_t*)PyObject_GC_New(mode_capsule_struct_t, &mode_capsule_t); + mode_capsule_struct_t* cap = (mode_capsule_struct_t*)PyObject_New(mode_capsule_struct_t, &mode_capsule_t); + if (cap == NULL) { lf_print_error_and_exit("Failed to convert mode."); } + Py_INCREF(cap); // Create the capsule to hold the reactor_mode_t* mode PyObject* capsule = PyCapsule_New(mode, "mode", NULL); if (capsule == NULL) { lf_print_error_and_exit("Failed to convert mode."); } + Py_INCREF(capsule); // Fill in the Python mode struct. cap->mode = capsule; @@ -127,6 +185,7 @@ PyObject* convert_C_mode_to_py(reactor_mode_t* mode, self_base_t* lf_self, lf_mo if (self_capsule == NULL) { lf_print_error_and_exit("Failed to convert self."); } + Py_INCREF(self_capsule); cap->lf_self = self_capsule; cap->change_type = change_type; diff --git a/python/lib/python_port.c b/python/lib/python_port.c index 62f4b4732..be12861d5 100644 --- a/python/lib/python_port.c +++ b/python/lib/python_port.c @@ -94,14 +94,13 @@ PyObject* py_port_set(PyObject* self, PyObject* args) { } if (val) { - LF_PRINT_DEBUG("Setting value %p with reference count %d.", val, (int)Py_REFCNT(val)); - // Py_INCREF(val); // python_count_decrement(port->value); lf_token_t* token = lf_new_token((void*)port, val, 1); lf_set_destructor(port, python_count_decrement); lf_set_token(port, token); Py_INCREF(val); + LF_PRINT_DEBUG("Setting value %p with reference count %d.", val, (int)Py_REFCNT(val)); // Also set the values for the port capsule.
p->value = val; @@ -117,9 +116,9 @@ PyObject* py_port_set(PyObject* self, PyObject* args) { * garbage collector). * @param self An instance of generic_port_instance_struct* */ -void py_port_capsule_dealloc(generic_port_capsule_struct* self) { - Py_XDECREF(self->port); - Py_XDECREF(self->value); +static void py_port_capsule_dealloc(generic_port_capsule_struct* self) { + Py_CLEAR(self->port); + Py_CLEAR(self->value); Py_TYPE(self)->tp_free((PyObject*)self); } @@ -147,7 +146,8 @@ PyObject* py_port_capsule_new(PyTypeObject* type, PyObject* args, PyObject* kwds generic_port_capsule_struct* self; self = (generic_port_capsule_struct*)type->tp_alloc(type, 0); if (self != NULL) { - self->port = NULL; + Py_INCREF(Py_None); + self->port = Py_None; Py_INCREF(Py_None); self->value = Py_None; self->is_present = false; @@ -325,7 +325,7 @@ PyMappingMethods py_port_as_mapping = {(lenfunc)py_port_length, (binaryfunc)py_p */ int py_port_capsule_init(generic_port_capsule_struct* self, PyObject* args, PyObject* kwds) { static char* kwlist[] = {"port", "value", "is_present", "width", "current_index", NULL}; - PyObject *value = NULL, *tmp, *port = NULL; + PyObject *value = NULL, *port = NULL; if (!PyArg_ParseTupleAndKeywords(args, kwds, "|OOp", kwlist, &port, &value, &self->is_present, &self->width, &self->current_index)) { @@ -333,14 +333,14 @@ int py_port_capsule_init(generic_port_capsule_struct* self, PyObject* args, PyOb } if (value) { - tmp = self->value; + PyObject* tmp = self->value; Py_INCREF(value); self->value = value; Py_XDECREF(tmp); } if (port) { - tmp = self->port; + PyObject* tmp = self->port; Py_INCREF(port); self->port = port; Py_XDECREF(tmp); diff --git a/python/lib/python_tag.c b/python/lib/python_tag.c index 991f94f64..f32eb0e9d 100644 --- a/python/lib/python_tag.c +++ b/python/lib/python_tag.c @@ -191,10 +191,11 @@ PyTypeObject PyTagType = { * @return PyObject* The tag in Python. 
*/ py_tag_t* convert_C_tag_to_py(tag_t c_tag) { - py_tag_t* py_tag = PyObject_GC_New(py_tag_t, &PyTagType); + py_tag_t* py_tag = PyObject_New(py_tag_t, &PyTagType); if (py_tag == NULL) { lf_print_error_and_exit("Failed to convert tag from C to Python."); } + Py_INCREF(py_tag); py_tag->tag = c_tag; return py_tag; } diff --git a/python/lib/pythontarget.c b/python/lib/pythontarget.c index ae43959d3..cc5c51c24 100644 --- a/python/lib/pythontarget.c +++ b/python/lib/pythontarget.c @@ -445,16 +445,18 @@ void destroy_action_capsule(PyObject* capsule) { free(PyCapsule_GetPointer(capsu */ PyObject* convert_C_port_to_py(void* port, int width) { // Create the port struct in Python - PyObject* cap = (PyObject*)PyObject_GC_New(generic_port_capsule_struct, &py_port_capsule_t); + PyObject* cap = (PyObject*)PyObject_New(generic_port_capsule_struct, &py_port_capsule_t); if (cap == NULL) { lf_print_error_and_exit("Failed to convert port."); } + Py_INCREF(cap); // Create the capsule to hold the void* port PyObject* capsule = PyCapsule_New(port, "port", NULL); if (capsule == NULL) { lf_print_error_and_exit("Failed to convert port."); } + Py_INCREF(capsule); // Fill in the Python port struct ((generic_port_capsule_struct*)cap)->port = capsule; @@ -512,16 +514,18 @@ PyObject* convert_C_action_to_py(void* action) { trigger_t* trigger = ((lf_action_base_t*)action)->trigger; // Create the action struct in Python - PyObject* cap = (PyObject*)PyObject_GC_New(generic_action_capsule_struct, &py_action_capsule_t); + PyObject* cap = (PyObject*)PyObject_New(generic_action_capsule_struct, &py_action_capsule_t); if (cap == NULL) { lf_print_error_and_exit("Failed to convert action."); } + Py_INCREF(cap); // Create the capsule to hold the void* action PyObject* capsule = PyCapsule_New(action, "action", NULL); if (capsule == NULL) { lf_print_error_and_exit("Failed to convert action."); } + Py_INCREF(capsule); // Fill in the Python action struct ((generic_action_capsule_struct*)cap)->action = capsule; @@ -542,7 +546,8 @@ PyObject* convert_C_action_to_py(void* action) { } // Actions in Python always use token type - ((generic_action_capsule_struct*)cap)->value = trigger->tmplt.token->value; + if (((generic_action_instance_struct*)action)->token != NULL) + ((generic_action_capsule_struct*)cap)->value = ((generic_action_instance_struct*)action)->token->value; return cap; } @@ -602,7 +607,14 @@ PyObject* get_python_function(string module, string class, int instance_id, stri mbstowcs(wcwd, cwd, PATH_MAX); - Py_SetPath(wcwd); + // Deprecated: Py_SetPath(wcwd); + // Replace with the following more verbose version: + PyConfig config; + PyConfig_InitPythonConfig(&config); + // Add paths to the configuration + PyWideStringList_Append(&config.module_search_paths, wcwd); + // Initialize Python with the custom configuration + Py_InitializeFromConfig(&config); LF_PRINT_DEBUG("Loading module %s in %s.", module, cwd); diff --git a/tag/api/tag.h b/tag/api/tag.h index c40e490f8..97c1aa0d7 100644 --- a/tag/api/tag.h +++ b/tag/api/tag.h @@ -37,15 +37,12 @@ #define NEVER_TAG \ (tag_t) { .time = NEVER, .microstep = NEVER_MICROSTEP } // Need a separate initializer expression to comply with some C compilers -#define NEVER_TAG_INITIALIZER \ - { NEVER, NEVER_MICROSTEP } +#define NEVER_TAG_INITIALIZER {NEVER, NEVER_MICROSTEP} #define FOREVER_TAG \ (tag_t) { .time = FOREVER, .microstep = FOREVER_MICROSTEP } // Need a separate initializer expression to comply with some C compilers -#define FOREVER_TAG_INITIALIZER \ - { FOREVER, FOREVER_MICROSTEP } 
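One detail worth noting about the PyConfig sequence above: the config owns heap memory, so a fuller version would also clear it and check the PyStatus results. These are standard CPython initialization APIs (PyConfig_Clear, PyStatus_Exception, Py_ExitStatusException); the error handling shown here is an assumption, not part of this patch.

#include <Python.h>

// Sketch: initialize Python with a custom module search path and clean up.
static void init_python_with_path(const wchar_t* wcwd) {
  PyConfig config;
  PyConfig_InitPythonConfig(&config);
  PyStatus status = PyWideStringList_Append(&config.module_search_paths, wcwd);
  if (!PyStatus_Exception(status)) {
    config.module_search_paths_set = 1; // Use this list instead of the default path.
    status = Py_InitializeFromConfig(&config);
  }
  PyConfig_Clear(&config); // Release memory owned by the config.
  if (PyStatus_Exception(status)) {
    Py_ExitStatusException(status);
  }
}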
-#define ZERO_TAG \ - (tag_t) { .time = 0LL, .microstep = 0u } +#define FOREVER_TAG_INITIALIZER {FOREVER, FOREVER_MICROSTEP} +#define ZERO_TAG (tag_t){.time = 0LL, .microstep = 0u} // Returns true if timeout has elapsed. #define CHECK_TIMEOUT(start, duration) (lf_time_physical() > ((start) + (duration))) @@ -113,6 +110,15 @@ tag_t lf_tag_add(tag_t a, tag_t b); */ instant_t lf_time_add(instant_t a, interval_t b); +/** + * @brief Return an instant minus an interval, saturating on overflow and underflow. + * + * @param a The instant. + * @param b The interval to subtract. + * @return The instant a minus the interval b, saturated to NEVER or FOREVER on underflow or overflow. + */ +instant_t lf_time_subtract(instant_t a, interval_t b); + /** * Compare two tags. Return -1 if the first is less than * the second, 0 if they are equal, and +1 if the first is diff --git a/trace/impl/src/trace_impl.c b/trace/impl/src/trace_impl.c index 895247e87..f819507c4 100644 --- a/trace/impl/src/trace_impl.c +++ b/trace/impl/src/trace_impl.c @@ -43,46 +43,46 @@ static version_t version = {.build_config = * See trace.h. * @return The number of items written to the object table or -1 for failure. */ -static int write_trace_header(trace_t* trace) { - if (trace->_lf_trace_file != NULL) { - size_t items_written = fwrite(&start_time, sizeof(int64_t), 1, trace->_lf_trace_file); +static int write_trace_header(trace_t* t) { + if (t->_lf_trace_file != NULL) { + size_t items_written = fwrite(&start_time, sizeof(int64_t), 1, t->_lf_trace_file); if (items_written != 1) - _LF_TRACE_FAILURE(trace); + _LF_TRACE_FAILURE(t); // The next item in the header is the size of the // _lf_trace_object_descriptions table. - items_written = fwrite(&trace->_lf_trace_object_descriptions_size, sizeof(int), 1, trace->_lf_trace_file); + items_written = fwrite(&t->_lf_trace_object_descriptions_size, sizeof(int), 1, t->_lf_trace_file); if (items_written != 1) - _LF_TRACE_FAILURE(trace); + _LF_TRACE_FAILURE(t); // Next we write the table. - for (size_t i = 0; i < trace->_lf_trace_object_descriptions_size; i++) { + for (size_t i = 0; i < t->_lf_trace_object_descriptions_size; i++) { // Write the pointer to the self struct. - items_written = fwrite(&trace->_lf_trace_object_descriptions[i].pointer, sizeof(void*), 1, trace->_lf_trace_file); + items_written = fwrite(&t->_lf_trace_object_descriptions[i].pointer, sizeof(void*), 1, t->_lf_trace_file); if (items_written != 1) - _LF_TRACE_FAILURE(trace); + _LF_TRACE_FAILURE(t); // Write the pointer to the trigger_t struct. - items_written = fwrite(&trace->_lf_trace_object_descriptions[i].trigger, sizeof(void*), 1, trace->_lf_trace_file); + items_written = fwrite(&t->_lf_trace_object_descriptions[i].trigger, sizeof(void*), 1, t->_lf_trace_file); if (items_written != 1) - _LF_TRACE_FAILURE(trace); + _LF_TRACE_FAILURE(t); // Write the object type. - items_written = fwrite(&trace->_lf_trace_object_descriptions[i].type, // Write the pointer value. - sizeof(_lf_trace_object_t), 1, trace->_lf_trace_file); + items_written = fwrite(&t->_lf_trace_object_descriptions[i].type, // Write the pointer value. + sizeof(_lf_trace_object_t), 1, t->_lf_trace_file); if (items_written != 1) - _LF_TRACE_FAILURE(trace); + _LF_TRACE_FAILURE(t); // Write the description.
- size_t description_size = strlen(trace->_lf_trace_object_descriptions[i].description); - items_written = fwrite(trace->_lf_trace_object_descriptions[i].description, sizeof(char), + size_t description_size = strlen(t->_lf_trace_object_descriptions[i].description); + items_written = fwrite(t->_lf_trace_object_descriptions[i].description, sizeof(char), description_size + 1, // Include null terminator. - trace->_lf_trace_file); + t->_lf_trace_file); if (items_written != description_size + 1) - _LF_TRACE_FAILURE(trace); + _LF_TRACE_FAILURE(t); } } - return trace->_lf_trace_object_descriptions_size; + return (int)t->_lf_trace_object_descriptions_size; } /** @@ -125,37 +125,38 @@ static void flush_trace_locked(trace_t* trace, int worker) { /** * @brief Flush the specified buffer to a file. + * @param t The trace struct. * @param worker Index specifying the trace to flush. */ -static void flush_trace(trace_t* trace, int worker) { +static void flush_trace(trace_t* t, int worker) { // To avoid having more than one worker writing to the file at the same time, // enter a critical section. lf_platform_mutex_lock(trace_mutex); - flush_trace_locked(trace, worker); + flush_trace_locked(t, worker); lf_platform_mutex_unlock(trace_mutex); } -static void start_trace(trace_t* trace, int max_num_local_threads) { +static void start_trace(trace_t* t, int max_num_local_threads) { // Do not write the trace header information to the file yet - // so that startup reactions can register user-defined trace objects. + // so that startup reactions can register user-defined trace objects. // write_trace_header(); - trace->_lf_trace_header_written = false; + t->_lf_trace_header_written = false; // Allocate an array of arrays of trace records, one per worker thread plus one // for the 0 thread (the main thread, or in a single-threaded program, the only // thread). - trace->_lf_number_of_trace_buffers = max_num_local_threads; - trace->_lf_trace_buffer = - (trace_record_nodeps_t**)malloc(sizeof(trace_record_nodeps_t*) * (trace->_lf_number_of_trace_buffers + 1)); - trace->_lf_trace_buffer++; // the buffer at index -1 is a fallback for user threads. - for (int i = -1; i < (int)trace->_lf_number_of_trace_buffers; i++) { - trace->_lf_trace_buffer[i] = (trace_record_nodeps_t*)malloc(sizeof(trace_record_nodeps_t) * TRACE_BUFFER_CAPACITY); + t->_lf_number_of_trace_buffers = max_num_local_threads; + t->_lf_trace_buffer = - (trace_record_nodeps_t**)malloc(sizeof(trace_record_nodeps_t*) * (t->_lf_number_of_trace_buffers + 1)); + t->_lf_trace_buffer++; // the buffer at index -1 is a fallback for user threads. + for (int i = -1; i < (int)t->_lf_number_of_trace_buffers; i++) { + t->_lf_trace_buffer[i] = (trace_record_nodeps_t*)malloc(sizeof(trace_record_nodeps_t) * TRACE_BUFFER_CAPACITY); } // Array of counters that track the size of each trace record (per thread). - trace->_lf_trace_buffer_size = (size_t*)calloc(sizeof(size_t), trace->_lf_number_of_trace_buffers + 1); - trace->_lf_trace_buffer_size++; + t->_lf_trace_buffer_size = (size_t*)calloc(t->_lf_number_of_trace_buffers + 1, sizeof(size_t)); + t->_lf_trace_buffer_size++; - trace->_lf_trace_stop = 0; + t->_lf_trace_stop = 0; LF_PRINT_DEBUG("Started tracing."); }
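The index -1 arrangement in start_trace above is the usual offset-base-pointer trick. In isolation it looks like this (a sketch with a simpler element type; the helper name is hypothetical):

#include <stdlib.h>

// Allocate n + 1 slots and advance the base pointer by one, so that index -1
// is valid: it aliases the first raw slot and serves as the fallback slot for
// threads that are not registered workers.
static size_t* make_offset_counters(size_t n) {
  size_t* raw = (size_t*)calloc(n + 1, sizeof(size_t));
  if (raw == NULL)
    return NULL;
  size_t* counters = raw + 1; // counters[-1] aliases raw[0].
  return counters;            // Indices -1 .. n-1 are valid; release with free(counters - 1).
}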