diff --git a/core/environment.c b/core/environment.c index 40f369c62..4523c4721 100644 --- a/core/environment.c +++ b/core/environment.c @@ -38,6 +38,31 @@ #include "scheduler.h" #endif +////////////////// +// Local functions, not intended for use outside this file. + +/** + * @brief Callback function to determine whether two events have the same trigger. + * This function is used by event queue and recycle. + * Return 1 if the triggers are identical, 0 otherwise. + * @param event1 A pointer to an event. + * @param event2 A pointer to an event. + */ +static int event_matches(void* event1, void* event2) { + return (((event_t*)event1)->trigger == ((event_t*)event2)->trigger); +} + +/** + * @brief Callback function to print information about an event. + * This function is used by event queue and recycle. + * @param element A pointer to an event. + */ +static void print_event(void* event) { + event_t* e = (event_t*)event; + LF_PRINT_DEBUG("tag: " PRINTF_TAG ", trigger: %p, token: %p", e->base.tag.time, e->base.tag.microstep, + (void*)e->trigger, (void*)e->token); +} + /** * @brief Initialize the threaded part of the environment struct. */ @@ -58,6 +83,7 @@ static void environment_init_threaded(environment_t* env, int num_workers) { (void)num_workers; #endif } + /** * @brief Initialize the single-threaded-specific parts of the environment struct. */ @@ -127,18 +153,6 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi #endif } -void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) { - env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; - - tag_t stop_tag = FOREVER_TAG_INITIALIZER; - if (duration >= 0LL) { - // A duration has been specified. Calculate the stop time. - stop_tag.time = env->current_tag.time + duration; - stop_tag.microstep = 0; - } - env->stop_tag = stop_tag; -} - static void environment_free_threaded(environment_t* env) { #if !defined(LF_SINGLE_THREADED) free(env->thread_ids); @@ -176,6 +190,9 @@ static void environment_free_federated(environment_t* env) { #endif } +////////////////// +// Functions defined in environment.h. + void environment_free(environment_t* env) { free(env->name); free(env->timer_triggers); @@ -184,9 +201,8 @@ void environment_free(environment_t* env) { free(env->reset_reactions); free(env->is_present_fields); free(env->is_present_fields_abbreviated); - pqueue_free(env->event_q); - pqueue_free(env->recycle_q); - pqueue_free(env->next_q); + pqueue_tag_free(env->event_q); + pqueue_tag_free(env->recycle_q); environment_free_threaded(env); environment_free_single_threaded(env); @@ -194,6 +210,18 @@ void environment_free(environment_t* env) { environment_free_federated(env); } +void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) { + env->current_tag = (tag_t){.time = start_time, .microstep = 0u}; + + tag_t stop_tag = FOREVER_TAG_INITIALIZER; + if (duration >= 0LL) { + // A duration has been specified. Calculate the stop time. + stop_tag.time = env->current_tag.time + duration; + stop_tag.microstep = 0; + } + env->stop_tag = stop_tag; +} + int environment_init(environment_t* env, const char* name, int id, int num_workers, int num_timers, int num_startup_reactions, int num_shutdown_reactions, int num_reset_reactions, int num_is_present_fields, int num_modes, int num_state_resets, int num_watchdogs, @@ -261,12 +289,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke env->_lf_handle = 1; // Initialize our priority queues. - env->event_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_reverse_order, get_event_time, get_event_position, - set_event_position, event_matches, print_event); - env->recycle_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position, - set_event_position, event_matches, print_event); - env->next_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position, - set_event_position, event_matches, print_event); + env->event_q = pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, pqueue_tag_compare, event_matches, print_event); + env->recycle_q = + pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, event_matches, print_event); // Initialize functionality depending on target properties. environment_init_threaded(env, num_workers); diff --git a/core/federated/federate.c b/core/federated/federate.c index 584e9fc3b..510c50cdd 100644 --- a/core/federated/federate.c +++ b/core/federated/federate.c @@ -1255,9 +1255,6 @@ static void handle_provisional_tag_advance_grant() { // (which it should be). Do not do this if the federate has not fully // started yet. - instant_t dummy_event_time = PTAG.time; - microstep_t dummy_event_relative_microstep = PTAG.microstep; - if (lf_tag_compare(env->current_tag, PTAG) == 0) { // The current tag can equal the PTAG if we are at the start time // or if this federate has been able to advance time to the current @@ -1281,22 +1278,18 @@ static void handle_provisional_tag_advance_grant() { // Nothing more to do. LF_MUTEX_UNLOCK(&env->mutex); return; - } else if (PTAG.time == env->current_tag.time) { - // We now know env->current_tag < PTAG, but the times are equal. - // Adjust the microstep for scheduling the dummy event. - dummy_event_relative_microstep -= env->current_tag.microstep; } // We now know env->current_tag < PTAG. - if (dummy_event_time != FOREVER) { - // Schedule a dummy event at the specified time and (relative) microstep. + if (PTAG.time != FOREVER) { + // Schedule a dummy event at the specified tag. LF_PRINT_DEBUG("At tag " PRINTF_TAG ", inserting into the event queue a dummy event " - "with time " PRINTF_TIME " and (relative) microstep " PRINTF_MICROSTEP ".", - env->current_tag.time - start_time, env->current_tag.microstep, dummy_event_time - start_time, - dummy_event_relative_microstep); - // Dummy event points to a NULL trigger and NULL real event. - event_t* dummy = _lf_create_dummy_events(env, NULL, dummy_event_time, NULL, dummy_event_relative_microstep); - pqueue_insert(env->event_q, dummy); + "with time " PRINTF_TIME " and microstep " PRINTF_MICROSTEP ".", + env->current_tag.time - start_time, env->current_tag.microstep, PTAG.time - start_time, + PTAG.microstep); + // Dummy event points to a NULL trigger. + event_t* dummy = _lf_create_dummy_events(env, PTAG); + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy); } LF_MUTEX_UNLOCK(&env->mutex); @@ -2410,8 +2403,9 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply) // Create a dummy event that will force this federate to advance time and subsequently // enable progress for downstream federates. Increment the time by ADVANCE_MESSAGE_INTERVAL // to prevent too frequent dummy events. - event_t* dummy = _lf_create_dummy_events(env, NULL, tag.time + ADVANCE_MESSAGE_INTERVAL, NULL, 0); - pqueue_insert(env->event_q, dummy); + tag_t dummy_event_tag = (tag_t){.time = tag.time + ADVANCE_MESSAGE_INTERVAL, .microstep = tag.microstep}; + event_t* dummy = _lf_create_dummy_events(env, dummy_event_tag); + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy); } LF_PRINT_DEBUG("Inserted a dummy event for logical time " PRINTF_TIME ".", tag.time - lf_time_start()); diff --git a/core/modal_models/modes.c b/core/modal_models/modes.c index e6e6f5d95..922d02f9e 100644 --- a/core/modal_models/modes.c +++ b/core/modal_models/modes.c @@ -56,8 +56,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // Forward declaration of functions and variables supplied by reactor_common.c void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number); -event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next, - microstep_t offset); +event_t* _lf_create_dummy_events(environment_t* env, tag_t tag); // ---------------------------------------------------------------------------- @@ -400,28 +399,17 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[] event->trigger != NULL) { // History transition to a different mode // Remaining time that the event would have been waiting before mode was left instant_t local_remaining_delay = - event->time - + event->base.tag.time - (state->next_mode->deactivation_time != 0 ? state->next_mode->deactivation_time : lf_time_start()); tag_t current_logical_tag = env->current_tag; // Reschedule event with original local delay LF_PRINT_DEBUG("Modes: Re-enqueuing event with a suspended delay of " PRINTF_TIME " (previous TTH: " PRINTF_TIME ", Mode suspended at: " PRINTF_TIME ").", - local_remaining_delay, event->time, state->next_mode->deactivation_time); + local_remaining_delay, event->base.tag.time, state->next_mode->deactivation_time); tag_t schedule_tag = {.time = current_logical_tag.time + local_remaining_delay, .microstep = (local_remaining_delay == 0 ? current_logical_tag.microstep + 1 : 0)}; _lf_schedule_at_tag(env, event->trigger, schedule_tag, event->token); - - // Also schedule events stacked up in super dense time. - event_t* e = event; - while (e->next != NULL) { - schedule_tag.microstep++; - _lf_schedule_at_tag(env, e->next->trigger, schedule_tag, e->next->token); - event_t* tmp = e->next; - e = tmp->next; - // A fresh event was created by schedule, hence, recycle old one - lf_recycle_event(env, tmp); - } } // A fresh event was created by schedule, hence, recycle old one lf_recycle_event(env, event); @@ -490,7 +478,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[] // Retract all events from the event queue that are associated with now inactive modes if (env->event_q != NULL) { - size_t q_size = pqueue_size(env->event_q); + size_t q_size = pqueue_tag_size(env->event_q); if (q_size > 0) { event_t** delayed_removal = (event_t**)calloc(q_size, sizeof(event_t*)); size_t delayed_removal_count = 0; @@ -509,7 +497,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[] LF_PRINT_DEBUG("Modes: Pulling %zu events from the event queue to suspend them. %d events are now suspended.", delayed_removal_count, _lf_suspended_events_num); for (size_t i = 0; i < delayed_removal_count; i++) { - pqueue_remove(env->event_q, delayed_removal[i]); + pqueue_tag_remove(env->event_q, (pqueue_tag_element_t*)(delayed_removal[i])); } free(delayed_removal); @@ -519,7 +507,8 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[] if (env->modes->triggered_reactions_request) { // Insert a dummy event in the event queue for the next microstep to make // sure startup/reset reactions (if any) are triggered as soon as possible. - pqueue_insert(env->event_q, _lf_create_dummy_events(env, NULL, env->current_tag.time, NULL, 1)); + tag_t dummy_event_tag = (tag_t){.time = env->current_tag.time, .microstep = 1}; + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)_lf_create_dummy_events(env, dummy_event_tag)); } } } diff --git a/core/reactor.c b/core/reactor.c index 235e1d34d..00df9e07f 100644 --- a/core/reactor.c +++ b/core/reactor.c @@ -220,7 +220,7 @@ int next(environment_t* env) { // Enter the critical section and do not leave until we have // determined which tag to commit to and start invoking reactions for. LF_CRITICAL_SECTION_ENTER(env); - event_t* event = (event_t*)pqueue_peek(env->event_q); + event_t* event = (event_t*)pqueue_tag_peek(env->event_q); // pqueue_dump(event_q, event_q->prt); // If there is no next event and -keepalive has been specified // on the command line, then we will wait the maximum time possible. @@ -231,13 +231,7 @@ int next(environment_t* env) { lf_set_stop_tag(env, (tag_t){.time = env->current_tag.time, .microstep = env->current_tag.microstep + 1}); } } else { - next_tag.time = event->time; - // Deduce the microstep - if (next_tag.time == env->current_tag.time) { - next_tag.microstep = env->current_tag.microstep + 1; - } else { - next_tag.microstep = 0; - } + next_tag = event->base.tag; } if (lf_is_tag_after_stop_tag(env, next_tag)) { @@ -245,10 +239,10 @@ int next(environment_t* env) { next_tag = env->stop_tag; } - LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time); + LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep); // Wait until physical time >= event.time. int finished_sleep = wait_until(env, next_tag.time); - LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time); + LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep); if (finished_sleep != 0) { LF_PRINT_DEBUG("***** wait_until was interrupted."); // Sleep was interrupted. This could happen when a physical action @@ -258,10 +252,10 @@ int next(environment_t* env) { LF_CRITICAL_SECTION_EXIT(env); return 1; } - // Advance current time to match that of the first event on the queue. + // Advance current tag to match that of the first event on the queue. // We can now leave the critical section. Any events that will be added // to the queue asynchronously will have a later tag than the current one. - _lf_advance_logical_time(env, next_tag.time); + _lf_advance_tag(env, next_tag); // Trigger shutdown reactions if appropriate. if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) { diff --git a/core/reactor_common.c b/core/reactor_common.c index ba6f9574c..33e5582f5 100644 --- a/core/reactor_common.c +++ b/core/reactor_common.c @@ -230,19 +230,15 @@ void _lf_pop_events(environment_t* env) { _lf_handle_mode_triggered_reactions(env); #endif - event_t* event = (event_t*)pqueue_peek(env->event_q); - while (event != NULL && event->time == env->current_tag.time) { - event = (event_t*)pqueue_pop(env->event_q); + event_t* event = (event_t*)pqueue_tag_peek(env->event_q); + while (event != NULL && lf_tag_compare(event->base.tag, env->current_tag) == 0) { + event = (event_t*)pqueue_tag_pop(env->event_q); - if (event->is_dummy) { + if (event->trigger == NULL) { LF_PRINT_DEBUG("Popped dummy event from the event queue."); - if (event->next != NULL) { - LF_PRINT_DEBUG("Putting event from the event queue for the next microstep."); - pqueue_insert(env->next_q, event->next); - } lf_recycle_event(env, event); // Peek at the next event in the event queue. - event = (event_t*)pqueue_peek(env->event_q); + event = (event_t*)pqueue_tag_peek(env->event_q); continue; } @@ -328,31 +324,17 @@ void _lf_pop_events(environment_t* env) { // Mark the trigger present. event->trigger->status = present; - // If this event points to a next event, insert it into the next queue. - if (event->next != NULL) { - // Insert the next event into the next queue. - pqueue_insert(env->next_q, event->next); - } - lf_recycle_event(env, event); // Peek at the next event in the event queue. - event = (event_t*)pqueue_peek(env->event_q); + event = (event_t*)pqueue_tag_peek(env->event_q); }; - - LF_PRINT_DEBUG("There are %zu events deferred to the next microstep.", pqueue_size(env->next_q)); - - // After populating the reaction queue, see if there are things on the - // next queue to put back into the event queue. - while (pqueue_peek(env->next_q) != NULL) { - pqueue_insert(env->event_q, pqueue_pop(env->next_q)); - } } event_t* lf_get_new_event(environment_t* env) { assert(env != GLOBAL_ENVIRONMENT); // Recycle event_t structs, if possible. - event_t* e = (event_t*)pqueue_pop(env->recycle_q); + event_t* e = (event_t*)pqueue_tag_pop(env->recycle_q); if (e == NULL) { e = (event_t*)calloc(1, sizeof(struct event_t)); if (e == NULL) @@ -376,7 +358,7 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) { // && (timer->offset != 0 || timer->period != 0)) { event_t* e = lf_get_new_event(env); e->trigger = timer; - e->time = lf_time_logical(env) + timer->offset; + e->base.tag = (tag_t){.time = lf_time_logical(env) + timer->offset, .microstep = 0}; _lf_add_suspended_event(e); return; } @@ -401,9 +383,9 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) { // Recycle event_t structs, if possible. event_t* e = lf_get_new_event(env); e->trigger = timer; - e->time = lf_time_logical(env) + delay; + e->base.tag = (tag_t){.time = lf_time_logical(env) + delay, .microstep = 0}; // NOTE: No lock is being held. Assuming this only happens at startup. - pqueue_insert(env->event_q, e); + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); tracepoint_schedule(env, timer, delay); // Trace even though schedule is not called. } @@ -462,38 +444,21 @@ void _lf_trigger_shutdown_reactions(environment_t* env) { void lf_recycle_event(environment_t* env, event_t* e) { assert(env != GLOBAL_ENVIRONMENT); - e->time = 0LL; + e->base.tag = (tag_t){.time = 0LL, .microstep = 0}; e->trigger = NULL; - e->pos = 0; e->token = NULL; - e->is_dummy = false; #ifdef FEDERATED_DECENTRALIZED e->intended_tag = (tag_t){.time = NEVER, .microstep = 0u}; #endif - e->next = NULL; - pqueue_insert(env->recycle_q, e); + pqueue_tag_insert(env->recycle_q, (pqueue_tag_element_t*)e); } -event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next, - microstep_t offset) { - event_t* first_dummy = lf_get_new_event(env); - event_t* dummy = first_dummy; - dummy->time = time; - dummy->is_dummy = true; - dummy->trigger = trigger; - while (offset > 0) { - if (offset == 1) { - dummy->next = next; - break; - } - dummy->next = lf_get_new_event(env); - dummy = dummy->next; - dummy->time = time; - dummy->is_dummy = true; - dummy->trigger = trigger; - offset--; - } - return first_dummy; +event_t* _lf_create_dummy_events(environment_t* env, tag_t tag) { + event_t* dummy = lf_get_new_event(env); + dummy->base.tag = tag; + + dummy->trigger = NULL; + return dummy; } void lf_replace_token(event_t* event, lf_token_t* token) { @@ -531,7 +496,7 @@ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag event_t* e = lf_get_new_event(env); // Set the event time - e->time = tag.time; + e->base.tag = tag; tracepoint_schedule(env, trigger, tag.time - current_logical_tag.time); @@ -547,133 +512,37 @@ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag e->intended_tag = trigger->intended_tag; #endif - event_t* found = (event_t*)pqueue_find_equal_same_priority(env->event_q, e); + event_t* found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)e); if (found != NULL) { - if (tag.microstep == 0u) { - // The microstep is 0, which means that the event is being scheduled - // at a future time and at the beginning of the skip list of events - // at that time. - // In case the event is a dummy event - // convert it to a real event. - found->is_dummy = false; - switch (trigger->policy) { - case drop: - if (found->token != token) { - _lf_done_using(token); - } - lf_recycle_event(env, e); - return (0); - break; - case replace: - // Replace the payload of the event at the head with our - // current payload. - lf_replace_token(found, token); + switch (trigger->policy) { + case drop: + if (found->token != token) { + _lf_done_using(token); + } + lf_recycle_event(env, e); + return (0); + break; + case replace: + // Replace the payload of the event at the head with our + // current payload. + lf_replace_token(found, token); + lf_recycle_event(env, e); + return 0; + break; + default: + // Adding a microstep to the original + // intended tag. + tag.microstep++; + e->base.tag = tag; + if (lf_is_tag_after_stop_tag(env, (tag_t){.time = tag.time, .microstep = tag.microstep})) { + // Scheduling e will incur a microstep after the stop tag, + // which is illegal. lf_recycle_event(env, e); return 0; - break; - default: - // Adding a microstep to the original - // intended tag. - if (lf_is_tag_after_stop_tag(env, (tag_t){.time = found->time, .microstep = 1})) { - // Scheduling e will incur a microstep after the stop tag, - // which is illegal. - lf_recycle_event(env, e); - return 0; - } - if (found->next != NULL) { - lf_print_error("_lf_schedule_at_tag: in-order contract violated."); - return -1; - } - found->next = e; } - } else { - // We are requesting a microstep greater than 0 - // where there is already an event for this trigger on the event queue. - // That event may itself be a dummy event for a real event that is - // also at a microstep greater than 0. - // We have to insert our event into the chain or append it - // to the end of the chain, depending on which microstep is lesser. - microstep_t microstep_of_found = 0; - if (tag.time == current_logical_tag.time) { - // This is a situation where the head of the queue - // is an event with microstep == current_microstep + 1 - // which should be reflected in our steps calculation. - microstep_of_found += current_logical_tag.microstep + 1; // Indicating that - // the found event - // is at this microstep. - } - // Follow the chain of events until the right point - // to insert the new event. - while (microstep_of_found < tag.microstep - 1) { - if (found->next == NULL) { - // The chain stops short of where we want to be. - // If it exactly one microstep short of where we want to be, - // then we don't need a dummy. Otherwise, we do. - microstep_t undershot_by = (tag.microstep - 1) - microstep_of_found; - if (undershot_by > 0) { - found->next = _lf_create_dummy_events(env, trigger, tag.time, e, undershot_by); - } else { - found->next = e; - } - return 1; - } - found = found->next; - microstep_of_found++; - } - // At this point, microstep_of_found == tag.microstep - 1. - if (found->next == NULL) { - found->next = e; - } else { - switch (trigger->policy) { - case drop: - if (found->next->token != token) { - _lf_done_using(token); - } - lf_recycle_event(env, e); - return 0; - break; - case replace: - // Replace the payload of the event at the head with our - // current payload. - lf_replace_token(found->next, token); - lf_recycle_event(env, e); - return 0; - break; - default: - // Adding a microstep to the original - // intended tag. - if (lf_is_tag_after_stop_tag(env, (tag_t){.time = found->time, .microstep = microstep_of_found + 1})) { - // Scheduling e will incur a microstep at timeout, - // which is illegal. - lf_recycle_event(env, e); - return 0; - } - if (found->next->next != NULL) { - lf_print_error("_lf_schedule_at_tag: in-order contract violated."); - return -1; - } - found->next->next = e; - } - } - } - } else { - // No existing event queued. - microstep_t relative_microstep = tag.microstep; - if (tag.time == current_logical_tag.time) { - relative_microstep -= current_logical_tag.microstep; - } - if ((tag.time == current_logical_tag.time && relative_microstep == 1 && env->execution_started) || - tag.microstep == 0) { - // Do not need a dummy event if we are scheduling at 1 microstep - // in the future at current time or at microstep 0 in a future time. - // Note that if execution hasn't started, then we have to insert dummy events. - pqueue_insert(env->event_q, e); - } else { - // Create a dummy event. Insert it into the queue, and let its next - // pointer point to the actual event. - pqueue_insert(env->event_q, _lf_create_dummy_events(env, trigger, tag.time, e, relative_microstep)); } } + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); trigger_handle_t return_value = env->_lf_handle++; if (env->_lf_handle < 0) { env->_lf_handle = 1; @@ -757,36 +626,37 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t* return 1; } -void _lf_advance_logical_time(environment_t* env, instant_t next_time) { +void _lf_advance_tag(environment_t* env, tag_t next_tag) { assert(env != GLOBAL_ENVIRONMENT); -// FIXME: The following checks that _lf_advance_logical_time() +// FIXME: The following checks that _lf_advance_tag() // is being called correctly. Namely, check if logical time // is being pushed past the head of the event queue. This should -// never happen if _lf_advance_logical_time() is called correctly. +// never happen if _lf_advance_tag() is called correctly. // This is commented out because it will add considerable overhead // to the ordinary execution of LF programs. Instead, there might // be a need for a target property that enables these kinds of logic // assertions for development purposes only. #ifndef NDEBUG - event_t* next_event = (event_t*)pqueue_peek(env->event_q); + event_t* next_event = (event_t*)pqueue_tag_peek(env->event_q); if (next_event != NULL) { - if (next_time > next_event->time) { - lf_print_error_and_exit("_lf_advance_logical_time(): Attempted to move time to " PRINTF_TIME ", which is " - "past the head of the event queue, " PRINTF_TIME ".", - next_time - start_time, next_event->time - start_time); + if (lf_tag_compare(next_tag, next_event->base.tag) > 0) { + lf_print_error_and_exit("_lf_advance_tag(): Attempted to move tag to " PRINTF_TAG ", which is " + "past the head of the event queue, " PRINTF_TAG ".", + next_tag.time - start_time, next_tag.microstep, next_event->base.tag.time - start_time, + next_event->base.tag.microstep); } } #endif - if (env->current_tag.time < next_time) { - env->current_tag.time = next_time; - env->current_tag.microstep = 0; - } else if (env->current_tag.time == next_time) { - env->current_tag.microstep++; + if (lf_tag_compare(env->current_tag, next_tag) < 0) { + env->current_tag = next_tag; } else { - lf_print_error_and_exit("_lf_advance_logical_time(): Attempted to move tag back in time."); + lf_print_error_and_exit("_lf_advance_tag(): Attempted to move (elapsed) tag to " PRINTF_TAG ", which is " + "earlier than or equal to the (elapsed) current tag, " PRINTF_TAG ".", + next_tag.time - start_time, next_tag.microstep, env->current_tag.time - start_time, + env->current_tag.microstep); } - LF_PRINT_LOG("Advanced (elapsed) tag to " PRINTF_TAG " at physical time " PRINTF_TIME, next_time - start_time, + LF_PRINT_LOG("Advanced (elapsed) tag to " PRINTF_TAG " at physical time " PRINTF_TIME, next_tag.time - start_time, env->current_tag.microstep, lf_time_physical_elapsed()); } @@ -1269,12 +1139,12 @@ void termination(void) { _lf_terminate_modal_reactors(&env[i]); #endif // If the event queue still has events on it, report that. - if (env[i].event_q != NULL && pqueue_size(env[i].event_q) > 0) { + if (env[i].event_q != NULL && pqueue_tag_size(env[i].event_q) > 0) { lf_print_warning("---- There are %zu unprocessed future events on the event queue.", - pqueue_size(env[i].event_q)); - event_t* event = (event_t*)pqueue_peek(env[i].event_q); - interval_t event_time = event->time - start_time; - lf_print_warning("---- The first future event has timestamp " PRINTF_TIME " after start time.", event_time); + pqueue_tag_size(env[i].event_q)); + event_t* event = (event_t*)pqueue_tag_peek(env[i].event_q); + lf_print_warning("---- The first future event has timestamp " PRINTF_TAG " after start tag.", + event->base.tag.time - start_time, event->base.tag.microstep); } // Print elapsed times. // If these are negative, then the program failed to start up. diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c index e364675d6..57f888fc2 100644 --- a/core/threaded/reactor_threaded.c +++ b/core/threaded/reactor_threaded.c @@ -276,24 +276,18 @@ tag_t get_next_event_tag(environment_t* env) { assert(env != GLOBAL_ENVIRONMENT); // Peek at the earliest event in the event queue. - event_t* event = (event_t*)pqueue_peek(env->event_q); + event_t* event = (event_t*)pqueue_tag_peek(env->event_q); tag_t next_tag = FOREVER_TAG; if (event != NULL) { // There is an event in the event queue. - if (event->time < env->current_tag.time) { - lf_print_error_and_exit("get_next_event_tag(): Earliest event on the event queue (" PRINTF_TIME ") is " - "earlier than the current time (" PRINTF_TIME ").", - event->time - start_time, env->current_tag.time - start_time); + if (lf_tag_compare(event->base.tag, env->current_tag) < 0) { + lf_print_error_and_exit("get_next_event_tag(): Earliest event on the event queue (" PRINTF_TAG ") is " + "earlier than the current tag (" PRINTF_TAG ").", + event->base.tag.time - start_time, event->base.tag.microstep, + env->current_tag.time - start_time, env->current_tag.microstep); } - next_tag.time = event->time; - if (next_tag.time == env->current_tag.time) { - LF_PRINT_DEBUG("Earliest event matches current time. Incrementing microstep. Event is dummy: %d.", - event->is_dummy); - next_tag.microstep = env->current_tag.microstep + 1; - } else { - next_tag.microstep = 0; - } + next_tag = event->base.tag; } // If a timeout tag was given, adjust the next_tag from the @@ -302,7 +296,7 @@ tag_t get_next_event_tag(environment_t* env) { next_tag = env->stop_tag; } LF_PRINT_LOG("Earliest event on the event queue (or stop time if empty) is " PRINTF_TAG ". Event queue has size %zu.", - next_tag.time - start_time, next_tag.microstep, pqueue_size(env->event_q)); + next_tag.time - start_time, next_tag.microstep, pqueue_tag_size(env->event_q)); return next_tag; } @@ -410,7 +404,7 @@ void _lf_next_locked(environment_t* env) { // behavior with centralized coordination as with unfederated execution. #else // not FEDERATED_CENTRALIZED nor LF_ENCLAVES - if (pqueue_peek(env->event_q) == NULL && !keepalive_specified) { + if (pqueue_tag_peek(env->event_q) == NULL && !keepalive_specified) { // There is no event on the event queue and keepalive is false. // No event in the queue // keepalive is not set so we should stop. @@ -478,7 +472,7 @@ void _lf_next_locked(environment_t* env) { } // At this point, finally, we have an event to process. - _lf_advance_logical_time(env, next_tag.time); + _lf_advance_tag(env, next_tag); _lf_start_time_step(env); @@ -488,7 +482,7 @@ void _lf_next_locked(environment_t* env) { _lf_trigger_shutdown_reactions(env); } - // Pop all events from event_q with timestamp equal to env->current_tag.time, + // Pop all events from event_q with timestamp equal to env->current_tag, // extract all the reactions triggered by these events, and // stick them into the reaction queue. _lf_pop_events(env); @@ -975,8 +969,9 @@ void lf_print_snapshot(environment_t* env) { LF_PRINT_DEBUG("Pending:"); // pqueue_dump(reaction_q, print_reaction); FIXME: reaction_q is not // accessible here - LF_PRINT_DEBUG("Event queue size: %zu. Contents:", pqueue_size(env->event_q)); - pqueue_dump(env->event_q, print_reaction); + LF_PRINT_DEBUG("Event queue size: %zu. Contents:", pqueue_tag_size(env->event_q)); + // FIXME: There is no pqueue_tag_dump now + pqueue_tag_dump(env->event_q); LF_PRINT_DEBUG(">>> END Snapshot"); } } diff --git a/core/utils/pqueue.c b/core/utils/pqueue.c index d26d8b2aa..b2bf05090 100644 --- a/core/utils/pqueue.c +++ b/core/utils/pqueue.c @@ -2,10 +2,14 @@ * @file pqueue.c * @author Marten Lohstroh * @author Edward A. Lee + * @author Byeonggil Jun * @copyright (c) 2020-2023, The University of California at Berkeley. * License: BSD 2-clause * - * @brief Priority queue definitions for the event queue and reaction queue. + * @brief Priority queue definitions for queues where the priority is a number that can be compared with ordinary + * numerical comparisons. + * + * This is used for the reaction queue. The event queue uses a `tag_t` struct for its priority, so it cannot use this. */ #include "low_level_platform.h" @@ -13,7 +17,7 @@ #include "util.h" #include "lf_types.h" -int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that) { return (thiz > that); } +int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that) { return (thiz > that) ? 1 : (thiz < that) ? -1 : 0; } int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that) { (void)thiz; @@ -21,28 +25,15 @@ int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that) { return 0; } -int event_matches(void* event1, void* event2) { return (((event_t*)event1)->trigger == ((event_t*)event2)->trigger); } - int reaction_matches(void* a, void* b) { return (a == b); } -pqueue_pri_t get_event_time(void* event) { return (pqueue_pri_t)(((event_t*)event)->time); } - pqueue_pri_t get_reaction_index(void* reaction) { return ((reaction_t*)reaction)->index; } -size_t get_event_position(void* event) { return ((event_t*)event)->pos; } - size_t get_reaction_position(void* reaction) { return ((reaction_t*)reaction)->pos; } -void set_event_position(void* event, size_t pos) { ((event_t*)event)->pos = pos; } - void set_reaction_position(void* reaction, size_t pos) { ((reaction_t*)reaction)->pos = pos; } void print_reaction(void* reaction) { reaction_t* r = (reaction_t*)reaction; LF_PRINT_DEBUG("%s: chain_id: %llu, index: %llx, reaction: %p", r->name, r->chain_id, r->index, reaction); } - -void print_event(void* event) { - event_t* e = (event_t*)event; - LF_PRINT_DEBUG("time: " PRINTF_TIME ", trigger: %p, token: %p", e->time, (void*)e->trigger, (void*)e->token); -} diff --git a/core/utils/pqueue_base.c b/core/utils/pqueue_base.c index 30d84286e..5d602e1de 100644 --- a/core/utils/pqueue_base.c +++ b/core/utils/pqueue_base.c @@ -29,6 +29,15 @@ * - The provided pqueue_eq_elem_f implementation is used to test and * search for equal elements present in the queue; and * - Removed capability to reassign priorities. + * + * Modified by Byeonggil Jun (Apr, 2024). + * Changes: + * - Made the pqueue_cmp_pri_f function return do the three-way comparison + * rather than the two-way comparison. + * - The changed pqueue_cmp_pri_f function is used to check the equality of + * two elements in the pqueue_find_equal_same_priority function. + * - Remove the pqueue_find_equal function. + * */ #include @@ -44,9 +53,9 @@ #define LF_RIGHT(i) (((i) << 1) + 1) #define LF_PARENT(i) ((i) >> 1) -void* find_equal(pqueue_t* q, void* e, int pos, pqueue_pri_t max) { +static void* find_same_priority(pqueue_t* q, void* e, int pos) { if (pos < 0) { - lf_print_error_and_exit("find_equal() called with a negative pos index."); + lf_print_error_and_exit("find_same_priority() called with a negative pos index."); } // Stop the recursion when we've reached the end of the @@ -59,19 +68,20 @@ void* find_equal(pqueue_t* q, void* e, int pos, pqueue_pri_t max) { void* rval; void* curr = q->d[pos]; - // Stop the recursion when we've surpassed the maximum priority. - if (!curr || q->cmppri(q->getpri(curr), max)) { + // Stop the recursion once we've surpassed the priority of the element + // we're looking for. + if (!curr || q->cmppri(q->getpri(curr), q->getpri(e)) == 1) { return NULL; } - if (q->eqelem(curr, e)) { + if (q->cmppri(q->getpri(curr), q->getpri(e)) == 0) { return curr; } else { - rval = find_equal(q, e, LF_LEFT(pos), max); + rval = find_same_priority(q, e, LF_LEFT(pos)); if (rval) return rval; else - return find_equal(q, e, LF_RIGHT(pos), max); + return find_same_priority(q, e, LF_RIGHT(pos)); } return NULL; } @@ -93,11 +103,11 @@ void* find_equal_same_priority(pqueue_t* q, void* e, int pos) { // Stop the recursion once we've surpassed the priority of the element // we're looking for. - if (!curr || q->cmppri(q->getpri(curr), q->getpri(e))) { + if (!curr || q->cmppri(q->getpri(curr), q->getpri(e)) == 1) { return NULL; } - if (q->getpri(curr) == q->getpri(e) && q->eqelem(curr, e)) { + if (q->cmppri(q->getpri(curr), q->getpri(e)) == 0 && q->eqelem(curr, e)) { return curr; } else { rval = find_equal_same_priority(q, e, LF_LEFT(pos)); @@ -157,7 +167,7 @@ static size_t maxchild(pqueue_t* q, size_t i) { if (child_node >= q->size) return 0; - if ((child_node + 1) < q->size && (q->cmppri(q->getpri(q->d[child_node]), q->getpri(q->d[child_node + 1])))) + if ((child_node + 1) < q->size && (q->cmppri(q->getpri(q->d[child_node]), q->getpri(q->d[child_node + 1])) == 1)) child_node++; /* use right child instead of left */ return child_node; @@ -168,7 +178,7 @@ static size_t bubble_up(pqueue_t* q, size_t i) { void* moving_node = q->d[i]; pqueue_pri_t moving_pri = q->getpri(moving_node); - for (parent_node = LF_PARENT(i); ((i > 1) && q->cmppri(q->getpri(q->d[parent_node]), moving_pri)); + for (parent_node = LF_PARENT(i); ((i > 1) && q->cmppri(q->getpri(q->d[parent_node]), moving_pri) == 1); i = parent_node, parent_node = LF_PARENT(i)) { q->d[i] = q->d[parent_node]; q->setpos(q->d[i], i); @@ -184,7 +194,7 @@ static void percolate_down(pqueue_t* q, size_t i) { void* moving_node = q->d[i]; pqueue_pri_t moving_pri = q->getpri(moving_node); - while ((child_node = maxchild(q, i)) && q->cmppri(moving_pri, q->getpri(q->d[child_node]))) { + while ((child_node = maxchild(q, i)) && (q->cmppri(moving_pri, q->getpri(q->d[child_node])) == 1)) { q->d[i] = q->d[child_node]; q->setpos(q->d[i], i); i = child_node; @@ -194,9 +204,9 @@ static void percolate_down(pqueue_t* q, size_t i) { q->setpos(moving_node, i); } -void* pqueue_find_equal_same_priority(pqueue_t* q, void* e) { return find_equal_same_priority(q, e, 1); } +void* pqueue_find_same_priority(pqueue_t* q, void* e) { return find_same_priority(q, e, 1); } -void* pqueue_find_equal(pqueue_t* q, void* e, pqueue_pri_t max) { return find_equal(q, e, 1, max); } +void* pqueue_find_equal_same_priority(pqueue_t* q, void* e) { return find_equal_same_priority(q, e, 1); } int pqueue_insert(pqueue_t* q, void* d) { void** tmp; @@ -227,7 +237,7 @@ int pqueue_remove(pqueue_t* q, void* d) { return 0; // Nothing to remove size_t posn = q->getpos(d); q->d[posn] = q->d[--q->size]; - if (q->cmppri(q->getpri(d), q->getpri(q->d[posn]))) + if (q->cmppri(q->getpri(d), q->getpri(q->d[posn])) == 1) bubble_up(q, posn); else percolate_down(q, posn); @@ -320,7 +330,7 @@ static int subtree_is_valid(pqueue_t* q, int pos) { if ((size_t)left_pos < q->size) { /* has a left child */ - if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_LEFT(pos)]))) + if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_LEFT(pos)])) == 1) return 0; if (!subtree_is_valid(q, LF_LEFT(pos))) return 0; @@ -332,7 +342,7 @@ static int subtree_is_valid(pqueue_t* q, int pos) { } if ((size_t)right_pos < q->size) { /* has a right child */ - if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_RIGHT(pos)]))) + if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_RIGHT(pos)])) == 1) return 0; if (!subtree_is_valid(q, LF_RIGHT(pos))) return 0; diff --git a/core/utils/pqueue_support.h b/core/utils/pqueue_support.h deleted file mode 100644 index c751b67f1..000000000 --- a/core/utils/pqueue_support.h +++ /dev/null @@ -1,114 +0,0 @@ -/************* -Copyright (c) 2022, The University of California at Berkeley. - -Redistribution and use in source and binary forms, with or without modification, -are permitted provided that the following conditions are met: - -1. Redistributions of source code must retain the above copyright notice, - this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY -EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF -MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL -THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, -SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, -PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, -STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF -THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -***************/ - -/** - * @file pqueue_support.h - * @author Edward A. Lee - * @author Marten Lohstroh - * @brief Header-only support functions for pqueue. - */ - -#ifndef PQUEUE_SUPPORT_H -#define PQUEUE_SUPPORT_H - -#include "../reactor.h" - -// ********** Priority Queue Support Start - -/** - * Return whether the first and second argument are given in reverse order. - */ -static int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that) { return (thiz > that); } - -/** - * Return false (0) regardless of reaction order. - */ -static int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that) { return false; } - -/** - * Return whether or not the given events have matching triggers. - */ -static int event_matches(void* next, void* curr) { return (((event_t*)next)->trigger == ((event_t*)curr)->trigger); } - -/** - * Return whether or not the given reaction_t pointers - * point to the same struct. - */ -static int reaction_matches(void* next, void* curr) { return (next == curr); } - -/** - * Report a priority equal to the time of the given event. - * Used for sorting pointers to event_t structs in the event queue. - */ -static pqueue_pri_t get_event_time(void* a) { return (pqueue_pri_t)(((event_t*)a)->time); } - -/** - * Report a priority equal to the index of the given reaction. - * Used for sorting pointers to reaction_t structs in the - * blocked and executing queues. - */ -static pqueue_pri_t get_reaction_index(void* a) { return ((reaction_t*)a)->index; } - -/** - * Return the given event's position in the queue. - */ -static size_t get_event_position(void* a) { return ((event_t*)a)->pos; } - -/** - * Return the given reaction's position in the queue. - */ -static size_t get_reaction_position(void* a) { return ((reaction_t*)a)->pos; } - -/** - * Set the given event's position in the queue. - */ -static void set_event_position(void* a, size_t pos) { ((event_t*)a)->pos = pos; } - -/** - * Return the given reaction's position in the queue. - */ -static void set_reaction_position(void* a, size_t pos) { ((reaction_t*)a)->pos = pos; } - -/** - * Print some information about the given reaction. - * - * DEBUG function only. - */ -static void print_reaction(void* reaction) { - reaction_t* r = (reaction_t*)reaction; - LF_PRINT_DEBUG("%s: chain_id:%llu, index: %llx, reaction: %p", r->name, r->chain_id, r->index, (void*)r); -} - -/** - * Print some information about the given event. - * - * DEBUG function only. - */ -static void print_event(void* event) { - event_t* e = (event_t*)event; - LF_PRINT_DEBUG("time: " PRINTF_TIME ", trigger: %p, token: %p", e->time, (void*)e->trigger, (void*)e->token); -} - -// ********** Priority Queue Support End -#endif diff --git a/core/utils/pqueue_tag.c b/core/utils/pqueue_tag.c index 9fe00653d..24899374b 100644 --- a/core/utils/pqueue_tag.c +++ b/core/utils/pqueue_tag.c @@ -25,17 +25,6 @@ */ static pqueue_pri_t pqueue_tag_get_priority(void* element) { return (pqueue_pri_t)element; } -/** - * @brief Callback comparison function for the tag-based priority queue. - * Return 0 if the first argument is less than second and 1 otherwise. - * This function is of type pqueue_cmp_pri_f. - * @param priority1 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t. - * @param priority2 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t. - */ -static int pqueue_tag_compare(pqueue_pri_t priority1, pqueue_pri_t priority2) { - return (lf_tag_compare(((pqueue_tag_element_t*)priority1)->tag, ((pqueue_tag_element_t*)priority2)->tag) > 0); -} - /** * @brief Callback function to determine whether two elements are equivalent. * Return 1 if the tags contained by given elements are identical, 0 otherwise. @@ -75,11 +64,21 @@ static void pqueue_tag_print_element(void* element) { ////////////////// // Functions defined in pqueue_tag.h. +int pqueue_tag_compare(pqueue_pri_t priority1, pqueue_pri_t priority2) { + return (lf_tag_compare(((pqueue_tag_element_t*)priority1)->tag, ((pqueue_tag_element_t*)priority2)->tag)); +} + pqueue_tag_t* pqueue_tag_init(size_t initial_size) { return (pqueue_tag_t*)pqueue_init(initial_size, pqueue_tag_compare, pqueue_tag_get_priority, pqueue_tag_get_position, pqueue_tag_set_position, pqueue_tag_matches, pqueue_tag_print_element); } +pqueue_tag_t* pqueue_tag_init_customize(size_t initial_size, pqueue_cmp_pri_f cmppri, pqueue_eq_elem_f eqelem, + pqueue_print_entry_f prt) { + return (pqueue_tag_t*)pqueue_init(initial_size, cmppri, pqueue_tag_get_priority, pqueue_tag_get_position, + pqueue_tag_set_position, eqelem, prt); +} + void pqueue_tag_free(pqueue_tag_t* q) { for (size_t i = 1; i < q->size; i++) { if (q->d[i] != NULL && ((pqueue_tag_element_t*)q->d[i])->is_dynamic) { @@ -101,11 +100,14 @@ int pqueue_tag_insert_tag(pqueue_tag_t* q, tag_t t) { } pqueue_tag_element_t* pqueue_tag_find_with_tag(pqueue_tag_t* q, tag_t t) { - // Create elements on the stack. These elements are only needed during - // the duration of this function call, so putting them on the stack is OK. + // Create an element on the stack. This element is only needed during + // the duration of this function call, so putting it on the stack is OK. pqueue_tag_element_t element = {.tag = t, .pos = 0, .is_dynamic = false}; - pqueue_tag_element_t forever = {.tag = FOREVER_TAG, .pos = 0, .is_dynamic = false}; - return pqueue_find_equal((pqueue_t*)q, (void*)&element, (pqueue_pri_t)&forever); + return pqueue_find_same_priority((pqueue_t*)q, (void*)&element); +} + +pqueue_tag_element_t* pqueue_tag_find_equal_same_tag(pqueue_tag_t* q, pqueue_tag_element_t* e) { + return pqueue_find_equal_same_priority((pqueue_t*)q, (void*)e); } int pqueue_tag_insert_if_no_match(pqueue_tag_t* q, tag_t t) { @@ -149,3 +151,5 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t) { head = pqueue_tag_peek_tag(q); } } + +void pqueue_tag_dump(pqueue_tag_t* q) { pqueue_dump((pqueue_t*)q, pqueue_tag_print_element); } diff --git a/include/core/environment.h b/include/core/environment.h index 98753c6fb..a776dee95 100644 --- a/include/core/environment.h +++ b/include/core/environment.h @@ -73,9 +73,8 @@ typedef struct environment_t { int id; tag_t current_tag; tag_t stop_tag; - pqueue_t* event_q; - pqueue_t* recycle_q; - pqueue_t* next_q; + pqueue_tag_t* event_q; + pqueue_tag_t* recycle_q; bool** is_present_fields; int is_present_fields_size; bool** is_present_fields_abbreviated; diff --git a/include/core/lf_types.h b/include/core/lf_types.h index 5598cf820..75a61e405 100644 --- a/include/core/lf_types.h +++ b/include/core/lf_types.h @@ -21,6 +21,7 @@ #include "modal_models/modes.h" // Modal model support #include "utils/pqueue.h" +#include "utils/pqueue_tag.h" #include "lf_token.h" #include "tag.h" #include "vector.h" @@ -195,15 +196,12 @@ typedef struct event_t event_t; /** Event activation record to push onto the event queue. */ struct event_t { - instant_t time; // Time of release. - trigger_t* trigger; // Associated trigger, NULL if this is a dummy event. - size_t pos; // Position in the priority queue. - lf_token_t* token; // Pointer to the token wrapping the value. - bool is_dummy; // Flag to indicate whether this event is merely a placeholder or an actual event. + pqueue_tag_element_t base; // Elements of pqueue_tag. It contains tag of release and position in the priority queue. + trigger_t* trigger; // Associated trigger, NULL if this is a dummy event. + lf_token_t* token; // Pointer to the token wrapping the value. #ifdef FEDERATED tag_t intended_tag; // The intended tag. #endif - event_t* next; // Pointer to the next event lined up in superdense time. }; /** diff --git a/include/core/reactor_common.h b/include/core/reactor_common.h index fc1451a96..8086ed7db 100644 --- a/include/core/reactor_common.h +++ b/include/core/reactor_common.h @@ -195,16 +195,15 @@ void _lf_trigger_startup_reactions(environment_t* env); void _lf_trigger_shutdown_reactions(environment_t* env); /** - * Create dummy events to be used as spacers in the event queue. + * @brief Create a dummy event with the specified tag. + * + * A dummy event is an event with no triggers that can be put on the event queue to trigger a tag advance to the + * specified tag. * @param env Environment in which we are executing. - * @param trigger The eventual event to be triggered. - * @param time The logical time of that event. - * @param next The event to place after the dummy events. - * @param offset The number of dummy events to insert. - * @return A pointer to the first dummy event. + * @param tag The tag of that event. + * @return A pointer to the dummy event. */ -event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next, - microstep_t offset); +event_t* _lf_create_dummy_events(environment_t* env, tag_t tag); /** * @brief Schedule an event at a specific tag (time, microstep). @@ -250,9 +249,9 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t* * the current time, then increase the microstep. Otherwise, update the current * time and set the microstep to zero. * @param env The environment in which we are executing - * @param next_time The time step to advance to. + * @param next_tag The tag step to advance to. */ -void _lf_advance_logical_time(environment_t* env, instant_t next_time); +void _lf_advance_tag(environment_t* env, tag_t next_tag); /** * @brief Pop all events from event_q with tag equal to current tag. diff --git a/include/core/utils/pqueue.h b/include/core/utils/pqueue.h index e317acbcd..448b94462 100644 --- a/include/core/utils/pqueue.h +++ b/include/core/utils/pqueue.h @@ -2,10 +2,14 @@ * @file pqueue.h * @author Marten Lohstroh * @author Edward A. Lee + * @author Byeonggil Jun * @copyright (c) 2020-2023, The University of California at Berkeley. * License: BSD 2-clause * - * @brief Priority queue declarations for the event queue and reaction queue. + * @brief Priority queue definitions for queues where the priority is a number that can be compared with ordinary + * numerical comparisons. + * + * This is used for the reaction queue. The event queue uses a `tag_t` struct for its priority, so it cannot use this. */ #ifndef PQUEUE_H @@ -27,13 +31,6 @@ int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that); */ int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that); -/** - * Return 1 if the two events have the same trigger. - * @param event1 A pointer to an event_t. - * @param event2 A pointer to an event_t. - */ -int event_matches(void* event1, void* event2); - /** * Return 1 if the two arguments are identical pointers. * @param a First argument. @@ -41,13 +38,6 @@ int event_matches(void* event1, void* event2); */ int reaction_matches(void* a, void* b); -/** - * Report a priority equal to the time of the given event. - * This is used for sorting pointers to event_t structs in the event queue. - * @param a A pointer to an event_t. - */ -pqueue_pri_t get_event_time(void* event); - /** * Report a priority equal to the index of the given reaction. * Used for sorting pointers to reaction_t structs in the @@ -56,25 +46,12 @@ pqueue_pri_t get_event_time(void* event); */ pqueue_pri_t get_reaction_index(void* reaction_t); -/** - * Return the given event's position in the queue. - * @param event A pointer to an event_t. - */ -size_t get_event_position(void* event); - /** * Return the given reaction's position in the queue. * @param reaction A pointer to a reaction_t. */ size_t get_reaction_position(void* reaction); -/** - * Set the given event's position in the queue. - * @param event A pointer to an event_t - * @param pos The position. - */ -void set_event_position(void* event, size_t pos); - /** * Set the given reaction's position in the queue. * @param event A pointer to a reaction_t. @@ -89,11 +66,4 @@ void set_reaction_position(void* reaction, size_t pos); */ void print_reaction(void* reaction); -/** - * Print some information about the given event. - * This only prints something if logging is set to DEBUG. - * @param event A pointer to an event_t. - */ -void print_event(void* event); - #endif /* PQUEUE_H */ diff --git a/include/core/utils/pqueue_base.h b/include/core/utils/pqueue_base.h index 8c9fc8f2c..b913ab64f 100644 --- a/include/core/utils/pqueue_base.h +++ b/include/core/utils/pqueue_base.h @@ -30,6 +30,14 @@ * search for equal elements present in the queue; and * - Removed capability to reassign priorities. * + * Modified by Byeonggil Jun (Apr, 2024). + * Changes: + * - Made the pqueue_cmp_pri_f function return do the three-way comparison + * rather than the two-way comparison. + * - The changed pqueue_cmp_pri_f function is used to check the equality of + * two elements in the pqueue_find_equal_same_priority function. + * - Remove the pqueue_find_equal function. + * * @brief Priority Queue function declarations used as a base for Lingua Franca priority queues. * * @{ @@ -81,7 +89,7 @@ typedef struct pqueue_t { * @param n the initial estimate of the number of queue items for which memory * should be preallocated * @param cmppri The callback function to run to compare two elements - * This callback should return 0 for 'lower' and non-zero + * This callback should return -1 for 'lower', 0 for 'same', and 1 * for 'higher', or vice versa if reverse priority is desired * @param getpri the callback function to run to set a score to an element * @param getpos the callback function to get the current element's position @@ -140,23 +148,21 @@ void* pqueue_pop(pqueue_t* q); void pqueue_empty_into(pqueue_t** dest, pqueue_t** src); /** - * Find the highest-ranking item with the same priority that matches the - * supplied entry. + * Return an entry with the same priority as the specified entry or NULL if there is no such entry. * @param q the queue * @param e the entry to compare against * @return NULL if no matching event has been found, otherwise the entry */ -void* pqueue_find_equal_same_priority(pqueue_t* q, void* e); +void* pqueue_find_same_priority(pqueue_t* q, void* e); /** - * Find the highest-ranking item with priority up to and including the given - * maximum priority that matches the supplied entry. + * Return an entry with the same priority (determined by `cmppri`) that matches the supplied entry (determined + * by `eqelem`) or `NULL` if there is no such entry. * @param q the queue * @param e the entry to compare against - * @param max_priority the maximum priority to consider * @return NULL if no matching event has been found, otherwise the entry */ -void* pqueue_find_equal(pqueue_t* q, void* e, pqueue_pri_t max_priority); +void* pqueue_find_equal_same_priority(pqueue_t* q, void* e); /** * Remove an item from the queue. diff --git a/include/core/utils/pqueue_tag.h b/include/core/utils/pqueue_tag.h index d69de5e56..e06e074be 100644 --- a/include/core/utils/pqueue_tag.h +++ b/include/core/utils/pqueue_tag.h @@ -1,5 +1,5 @@ /** - * @file tag_pqueue.h + * @file pqueue_tag.h * @author Byeonggil Jun * @author Edward A. Lee * @copyright (c) 2023, The University of California at Berkeley @@ -60,6 +60,16 @@ typedef struct { */ typedef pqueue_t pqueue_tag_t; +/** + * @brief Callback comparison function for the tag-based priority queue. + * Return -1 if the first argument is less than second, 0 if the two arguments are the same, + * and 1 otherwise. + * This function is of type pqueue_cmp_pri_f. + * @param priority1 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t. + * @param priority2 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t. + */ +int pqueue_tag_compare(pqueue_pri_t priority1, pqueue_pri_t priority2); + /** * @brief Create a priority queue sorted by tags. * @@ -69,6 +79,20 @@ typedef pqueue_t pqueue_tag_t; */ pqueue_tag_t* pqueue_tag_init(size_t initial_size); +/** + * @brief Create a priority queue that stores elements with a particular payload. + * + * @param cmppri the callback function to compare priorities + * @param eqelem the callback function to check equivalence of payloads. + * @param prt the callback function to print elements + * + * The elements of the priority queue will be of type pqueue_tag_element_t. + * The caller should call pqueue_tag_free() when finished with the queue. + * @return A dynamically allocated priority queue or NULL if memory allocation fails. + */ +pqueue_tag_t* pqueue_tag_init_customize(size_t initial_size, pqueue_cmp_pri_f cmppri, pqueue_eq_elem_f eqelem, + pqueue_print_entry_f prt); + /** * @brief Free all memory used by the queue including elements that are marked dynamic. * @@ -124,6 +148,15 @@ int pqueue_tag_insert_if_no_match(pqueue_tag_t* q, tag_t t); */ pqueue_tag_element_t* pqueue_tag_find_with_tag(pqueue_tag_t* q, tag_t t); +/** + * @brief Return an item with the same tag (`cmppri` returns 0) that matches the supplied element + * (`eqelem` returns non-zero) or NULL if there is none. + * @param q The queue. + * @param e The element. + * @return An entry with the specified tag or NULL if there isn't one. + */ +pqueue_tag_element_t* pqueue_tag_find_equal_same_tag(pqueue_tag_t* q, pqueue_tag_element_t* e); + /** * @brief Return highest-ranking item (the one with the least tag) without removing it. * @param q The queue. @@ -175,4 +208,12 @@ void pqueue_tag_remove(pqueue_tag_t* q, pqueue_tag_element_t* e); */ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t); +/** + * Dump the queue and it's internal structure. + * @internal + * debug function only + * @param q the queue + */ +void pqueue_tag_dump(pqueue_tag_t* q); + #endif // PQUEUE_TAG_H diff --git a/lib/schedule.c b/lib/schedule.c index 8d237ce55..5aa9fd528 100644 --- a/lib/schedule.c +++ b/lib/schedule.c @@ -149,17 +149,14 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int if (!trigger->is_timer) { delay += trigger->offset; } - tag_t intended_tag = (tag_t){.time = env->current_tag.time + delay, .microstep = 0}; + tag_t intended_tag = lf_delay_tag(env->current_tag, delay); - LF_PRINT_DEBUG("lf_schedule_trigger: env->current_tag.time = " PRINTF_TIME ". Total logical delay = " PRINTF_TIME "", - env->current_tag.time, delay); + LF_PRINT_DEBUG("lf_schedule_trigger: env->current_tag = " PRINTF_TAG ". Total logical delay = " PRINTF_TIME "", + env->current_tag.time, env->current_tag.microstep, delay); interval_t min_spacing = trigger->period; event_t* e = lf_get_new_event(env); - // Initialize the next pointer. - e->next = NULL; - // Set the payload. e->token = token; @@ -173,6 +170,7 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int if (trigger->is_physical) { // Get the current physical time and assign it as the intended time. intended_tag.time = lf_time_physical() + delay; + intended_tag.microstep = 0; } else { // FIXME: We need to verify that we are executing within a reaction? // See reactor_threaded. @@ -198,18 +196,17 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int e->intended_tag = trigger->intended_tag; #endif - // Check for conflicts (a queued event with the same trigger and time). + // Check for conflicts (a queued event with the same trigger and tag). if (min_spacing <= 0) { // No minimum spacing defined. - e->time = intended_tag.time; - event_t* found = (event_t*)pqueue_find_equal_same_priority(env->event_q, e); + e->base.tag = intended_tag; + event_t* found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)e); // Check for conflicts. Let events pile up in super dense time. if (found != NULL) { - intended_tag.microstep++; - // Skip to the last node in the linked list. - while (found->next != NULL) { - found = found->next; + while (found != NULL) { intended_tag.microstep++; + e->base.tag = intended_tag; + found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)e); } if (lf_is_tag_after_stop_tag(env, intended_tag)) { LF_PRINT_DEBUG("Attempt to schedule an event after stop_tag was rejected."); @@ -218,9 +215,8 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int lf_recycle_event(env, e); return 0; } - // Hook the event into the list. - found->next = e; trigger->last_tag = intended_tag; + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); return (0); // FIXME: return value } // If there are not conflicts, schedule as usual. If intended time is @@ -248,14 +244,13 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int return (0); case replace: LF_PRINT_DEBUG("Policy is replace. Replacing the previous event."); - // If the event with the previous time is still on the event + // If the event with the previous tag is still on the event // queue, then replace the token. To find this event, we have // to construct a dummy event_t struct. event_t* dummy = lf_get_new_event(env); - dummy->next = NULL; dummy->trigger = trigger; - dummy->time = trigger->last_tag.time; - event_t* found = (event_t*)pqueue_find_equal_same_priority(env->event_q, dummy); + dummy->base.tag = trigger->last_tag; + event_t* found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)dummy); if (found != NULL) { // Recycle the existing token and the new event @@ -292,16 +287,20 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int intended_tag.time = env->current_tag.time; } #endif + if (lf_tag_compare(intended_tag, env->current_tag) == 0) { + // Increment microstep. + intended_tag.microstep++; + } // Set the tag of the event. - e->time = intended_tag.time; - - // Do not schedule events if if the event time is past the stop time - // (current microsteps are checked earlier). - LF_PRINT_DEBUG("Comparing event with elapsed time " PRINTF_TIME " against stop time " PRINTF_TIME ".", - e->time - lf_time_start(), env->stop_tag.time - lf_time_start()); - if (e->time > env->stop_tag.time) { - LF_PRINT_DEBUG("lf_schedule_trigger: event time is past the timeout. Discarding event."); + e->base.tag = intended_tag; + + // Do not schedule events if the event time is past the stop tag. + LF_PRINT_DEBUG("Comparing event with elapsed tag " PRINTF_TAG " against stop tag " PRINTF_TAG ".", + e->base.tag.time - lf_time_start(), e->base.tag.microstep, env->stop_tag.time - lf_time_start(), + env->stop_tag.microstep); + if (lf_is_tag_after_stop_tag(env, intended_tag)) { + LF_PRINT_DEBUG("lf_schedule_trigger: event tag is past the timeout. Discarding event."); _lf_done_using(token); lf_recycle_event(env, e); return (0); @@ -312,16 +311,11 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int trigger->last_tag = intended_tag; // Queue the event. - // NOTE: There is no need for an explicit microstep because - // when this is called, all events at the current tag - // (time and microstep) have been pulled from the queue, - // and any new events added at this tag will go into the reaction_q - // rather than the event_q, so anything put in the event_q with this - // same time will automatically be executed at the next microstep. - LF_PRINT_LOG("Inserting event in the event queue with elapsed time " PRINTF_TIME ".", e->time - lf_time_start()); - pqueue_insert(env->event_q, e); - - tracepoint_schedule(env, trigger, e->time - env->current_tag.time); + LF_PRINT_LOG("Inserting event in the event queue with elapsed tag " PRINTF_TAG ".", + e->base.tag.time - lf_time_start(), e->base.tag.microstep); + pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e); + + tracepoint_schedule(env, trigger, e->base.tag.time - env->current_tag.time); // FIXME: make a record of handle and implement unschedule. // NOTE: Rather than wrapping around to get a negative number,