diff --git a/core/environment.c b/core/environment.c
index 40f369c62..4523c4721 100644
--- a/core/environment.c
+++ b/core/environment.c
@@ -38,6 +38,31 @@
#include "scheduler.h"
#endif
+//////////////////
+// Local functions, not intended for use outside this file.
+
+/**
+ * @brief Callback function to determine whether two events have the same trigger.
+ * This function is used by event queue and recycle.
+ * Return 1 if the triggers are identical, 0 otherwise.
+ * @param event1 A pointer to an event.
+ * @param event2 A pointer to an event.
+ */
+static int event_matches(void* event1, void* event2) {
+ return (((event_t*)event1)->trigger == ((event_t*)event2)->trigger);
+}
+
+/**
+ * @brief Callback function to print information about an event.
+ * This function is used by event queue and recycle.
+ * @param element A pointer to an event.
+ */
+static void print_event(void* event) {
+ event_t* e = (event_t*)event;
+ LF_PRINT_DEBUG("tag: " PRINTF_TAG ", trigger: %p, token: %p", e->base.tag.time, e->base.tag.microstep,
+ (void*)e->trigger, (void*)e->token);
+}
+
/**
* @brief Initialize the threaded part of the environment struct.
*/
@@ -58,6 +83,7 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
(void)num_workers;
#endif
}
+
/**
* @brief Initialize the single-threaded-specific parts of the environment struct.
*/
@@ -127,18 +153,6 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi
#endif
}
-void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) {
- env->current_tag = (tag_t){.time = start_time, .microstep = 0u};
-
- tag_t stop_tag = FOREVER_TAG_INITIALIZER;
- if (duration >= 0LL) {
- // A duration has been specified. Calculate the stop time.
- stop_tag.time = env->current_tag.time + duration;
- stop_tag.microstep = 0;
- }
- env->stop_tag = stop_tag;
-}
-
static void environment_free_threaded(environment_t* env) {
#if !defined(LF_SINGLE_THREADED)
free(env->thread_ids);
@@ -176,6 +190,9 @@ static void environment_free_federated(environment_t* env) {
#endif
}
+//////////////////
+// Functions defined in environment.h.
+
void environment_free(environment_t* env) {
free(env->name);
free(env->timer_triggers);
@@ -184,9 +201,8 @@ void environment_free(environment_t* env) {
free(env->reset_reactions);
free(env->is_present_fields);
free(env->is_present_fields_abbreviated);
- pqueue_free(env->event_q);
- pqueue_free(env->recycle_q);
- pqueue_free(env->next_q);
+ pqueue_tag_free(env->event_q);
+ pqueue_tag_free(env->recycle_q);
environment_free_threaded(env);
environment_free_single_threaded(env);
@@ -194,6 +210,18 @@ void environment_free(environment_t* env) {
environment_free_federated(env);
}
+void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) {
+ env->current_tag = (tag_t){.time = start_time, .microstep = 0u};
+
+ tag_t stop_tag = FOREVER_TAG_INITIALIZER;
+ if (duration >= 0LL) {
+ // A duration has been specified. Calculate the stop time.
+ stop_tag.time = env->current_tag.time + duration;
+ stop_tag.microstep = 0;
+ }
+ env->stop_tag = stop_tag;
+}
+
int environment_init(environment_t* env, const char* name, int id, int num_workers, int num_timers,
int num_startup_reactions, int num_shutdown_reactions, int num_reset_reactions,
int num_is_present_fields, int num_modes, int num_state_resets, int num_watchdogs,
@@ -261,12 +289,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
env->_lf_handle = 1;
// Initialize our priority queues.
- env->event_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_reverse_order, get_event_time, get_event_position,
- set_event_position, event_matches, print_event);
- env->recycle_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
- set_event_position, event_matches, print_event);
- env->next_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
- set_event_position, event_matches, print_event);
+ env->event_q = pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, pqueue_tag_compare, event_matches, print_event);
+ env->recycle_q =
+ pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, event_matches, print_event);
// Initialize functionality depending on target properties.
environment_init_threaded(env, num_workers);
diff --git a/core/federated/federate.c b/core/federated/federate.c
index 584e9fc3b..510c50cdd 100644
--- a/core/federated/federate.c
+++ b/core/federated/federate.c
@@ -1255,9 +1255,6 @@ static void handle_provisional_tag_advance_grant() {
// (which it should be). Do not do this if the federate has not fully
// started yet.
- instant_t dummy_event_time = PTAG.time;
- microstep_t dummy_event_relative_microstep = PTAG.microstep;
-
if (lf_tag_compare(env->current_tag, PTAG) == 0) {
// The current tag can equal the PTAG if we are at the start time
// or if this federate has been able to advance time to the current
@@ -1281,22 +1278,18 @@ static void handle_provisional_tag_advance_grant() {
// Nothing more to do.
LF_MUTEX_UNLOCK(&env->mutex);
return;
- } else if (PTAG.time == env->current_tag.time) {
- // We now know env->current_tag < PTAG, but the times are equal.
- // Adjust the microstep for scheduling the dummy event.
- dummy_event_relative_microstep -= env->current_tag.microstep;
}
// We now know env->current_tag < PTAG.
- if (dummy_event_time != FOREVER) {
- // Schedule a dummy event at the specified time and (relative) microstep.
+ if (PTAG.time != FOREVER) {
+ // Schedule a dummy event at the specified tag.
LF_PRINT_DEBUG("At tag " PRINTF_TAG ", inserting into the event queue a dummy event "
- "with time " PRINTF_TIME " and (relative) microstep " PRINTF_MICROSTEP ".",
- env->current_tag.time - start_time, env->current_tag.microstep, dummy_event_time - start_time,
- dummy_event_relative_microstep);
- // Dummy event points to a NULL trigger and NULL real event.
- event_t* dummy = _lf_create_dummy_events(env, NULL, dummy_event_time, NULL, dummy_event_relative_microstep);
- pqueue_insert(env->event_q, dummy);
+ "with time " PRINTF_TIME " and microstep " PRINTF_MICROSTEP ".",
+ env->current_tag.time - start_time, env->current_tag.microstep, PTAG.time - start_time,
+ PTAG.microstep);
+ // Dummy event points to a NULL trigger.
+ event_t* dummy = _lf_create_dummy_events(env, PTAG);
+ pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy);
}
LF_MUTEX_UNLOCK(&env->mutex);
@@ -2410,8 +2403,9 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// Create a dummy event that will force this federate to advance time and subsequently
// enable progress for downstream federates. Increment the time by ADVANCE_MESSAGE_INTERVAL
// to prevent too frequent dummy events.
- event_t* dummy = _lf_create_dummy_events(env, NULL, tag.time + ADVANCE_MESSAGE_INTERVAL, NULL, 0);
- pqueue_insert(env->event_q, dummy);
+ tag_t dummy_event_tag = (tag_t){.time = tag.time + ADVANCE_MESSAGE_INTERVAL, .microstep = tag.microstep};
+ event_t* dummy = _lf_create_dummy_events(env, dummy_event_tag);
+ pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy);
}
LF_PRINT_DEBUG("Inserted a dummy event for logical time " PRINTF_TIME ".", tag.time - lf_time_start());
diff --git a/core/modal_models/modes.c b/core/modal_models/modes.c
index e6e6f5d95..922d02f9e 100644
--- a/core/modal_models/modes.c
+++ b/core/modal_models/modes.c
@@ -56,8 +56,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Forward declaration of functions and variables supplied by reactor_common.c
void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number);
-event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next,
- microstep_t offset);
+event_t* _lf_create_dummy_events(environment_t* env, tag_t tag);
// ----------------------------------------------------------------------------
@@ -400,28 +399,17 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
event->trigger != NULL) { // History transition to a different mode
// Remaining time that the event would have been waiting before mode was left
instant_t local_remaining_delay =
- event->time -
+ event->base.tag.time -
(state->next_mode->deactivation_time != 0 ? state->next_mode->deactivation_time : lf_time_start());
tag_t current_logical_tag = env->current_tag;
// Reschedule event with original local delay
LF_PRINT_DEBUG("Modes: Re-enqueuing event with a suspended delay of " PRINTF_TIME
" (previous TTH: " PRINTF_TIME ", Mode suspended at: " PRINTF_TIME ").",
- local_remaining_delay, event->time, state->next_mode->deactivation_time);
+ local_remaining_delay, event->base.tag.time, state->next_mode->deactivation_time);
tag_t schedule_tag = {.time = current_logical_tag.time + local_remaining_delay,
.microstep = (local_remaining_delay == 0 ? current_logical_tag.microstep + 1 : 0)};
_lf_schedule_at_tag(env, event->trigger, schedule_tag, event->token);
-
- // Also schedule events stacked up in super dense time.
- event_t* e = event;
- while (e->next != NULL) {
- schedule_tag.microstep++;
- _lf_schedule_at_tag(env, e->next->trigger, schedule_tag, e->next->token);
- event_t* tmp = e->next;
- e = tmp->next;
- // A fresh event was created by schedule, hence, recycle old one
- lf_recycle_event(env, tmp);
- }
}
// A fresh event was created by schedule, hence, recycle old one
lf_recycle_event(env, event);
@@ -490,7 +478,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
// Retract all events from the event queue that are associated with now inactive modes
if (env->event_q != NULL) {
- size_t q_size = pqueue_size(env->event_q);
+ size_t q_size = pqueue_tag_size(env->event_q);
if (q_size > 0) {
event_t** delayed_removal = (event_t**)calloc(q_size, sizeof(event_t*));
size_t delayed_removal_count = 0;
@@ -509,7 +497,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
LF_PRINT_DEBUG("Modes: Pulling %zu events from the event queue to suspend them. %d events are now suspended.",
delayed_removal_count, _lf_suspended_events_num);
for (size_t i = 0; i < delayed_removal_count; i++) {
- pqueue_remove(env->event_q, delayed_removal[i]);
+ pqueue_tag_remove(env->event_q, (pqueue_tag_element_t*)(delayed_removal[i]));
}
free(delayed_removal);
@@ -519,7 +507,8 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
if (env->modes->triggered_reactions_request) {
// Insert a dummy event in the event queue for the next microstep to make
// sure startup/reset reactions (if any) are triggered as soon as possible.
- pqueue_insert(env->event_q, _lf_create_dummy_events(env, NULL, env->current_tag.time, NULL, 1));
+ tag_t dummy_event_tag = (tag_t){.time = env->current_tag.time, .microstep = 1};
+ pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)_lf_create_dummy_events(env, dummy_event_tag));
}
}
}
diff --git a/core/reactor.c b/core/reactor.c
index 235e1d34d..00df9e07f 100644
--- a/core/reactor.c
+++ b/core/reactor.c
@@ -220,7 +220,7 @@ int next(environment_t* env) {
// Enter the critical section and do not leave until we have
// determined which tag to commit to and start invoking reactions for.
LF_CRITICAL_SECTION_ENTER(env);
- event_t* event = (event_t*)pqueue_peek(env->event_q);
+ event_t* event = (event_t*)pqueue_tag_peek(env->event_q);
// pqueue_dump(event_q, event_q->prt);
// If there is no next event and -keepalive has been specified
// on the command line, then we will wait the maximum time possible.
@@ -231,13 +231,7 @@ int next(environment_t* env) {
lf_set_stop_tag(env, (tag_t){.time = env->current_tag.time, .microstep = env->current_tag.microstep + 1});
}
} else {
- next_tag.time = event->time;
- // Deduce the microstep
- if (next_tag.time == env->current_tag.time) {
- next_tag.microstep = env->current_tag.microstep + 1;
- } else {
- next_tag.microstep = 0;
- }
+ next_tag = event->base.tag;
}
if (lf_is_tag_after_stop_tag(env, next_tag)) {
@@ -245,10 +239,10 @@ int next(environment_t* env) {
next_tag = env->stop_tag;
}
- LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time);
+ LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep);
// Wait until physical time >= event.time.
int finished_sleep = wait_until(env, next_tag.time);
- LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time);
+ LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep);
if (finished_sleep != 0) {
LF_PRINT_DEBUG("***** wait_until was interrupted.");
// Sleep was interrupted. This could happen when a physical action
@@ -258,10 +252,10 @@ int next(environment_t* env) {
LF_CRITICAL_SECTION_EXIT(env);
return 1;
}
- // Advance current time to match that of the first event on the queue.
+ // Advance current tag to match that of the first event on the queue.
// We can now leave the critical section. Any events that will be added
// to the queue asynchronously will have a later tag than the current one.
- _lf_advance_logical_time(env, next_tag.time);
+ _lf_advance_tag(env, next_tag);
// Trigger shutdown reactions if appropriate.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
diff --git a/core/reactor_common.c b/core/reactor_common.c
index ba6f9574c..33e5582f5 100644
--- a/core/reactor_common.c
+++ b/core/reactor_common.c
@@ -230,19 +230,15 @@ void _lf_pop_events(environment_t* env) {
_lf_handle_mode_triggered_reactions(env);
#endif
- event_t* event = (event_t*)pqueue_peek(env->event_q);
- while (event != NULL && event->time == env->current_tag.time) {
- event = (event_t*)pqueue_pop(env->event_q);
+ event_t* event = (event_t*)pqueue_tag_peek(env->event_q);
+ while (event != NULL && lf_tag_compare(event->base.tag, env->current_tag) == 0) {
+ event = (event_t*)pqueue_tag_pop(env->event_q);
- if (event->is_dummy) {
+ if (event->trigger == NULL) {
LF_PRINT_DEBUG("Popped dummy event from the event queue.");
- if (event->next != NULL) {
- LF_PRINT_DEBUG("Putting event from the event queue for the next microstep.");
- pqueue_insert(env->next_q, event->next);
- }
lf_recycle_event(env, event);
// Peek at the next event in the event queue.
- event = (event_t*)pqueue_peek(env->event_q);
+ event = (event_t*)pqueue_tag_peek(env->event_q);
continue;
}
@@ -328,31 +324,17 @@ void _lf_pop_events(environment_t* env) {
// Mark the trigger present.
event->trigger->status = present;
- // If this event points to a next event, insert it into the next queue.
- if (event->next != NULL) {
- // Insert the next event into the next queue.
- pqueue_insert(env->next_q, event->next);
- }
-
lf_recycle_event(env, event);
// Peek at the next event in the event queue.
- event = (event_t*)pqueue_peek(env->event_q);
+ event = (event_t*)pqueue_tag_peek(env->event_q);
};
-
- LF_PRINT_DEBUG("There are %zu events deferred to the next microstep.", pqueue_size(env->next_q));
-
- // After populating the reaction queue, see if there are things on the
- // next queue to put back into the event queue.
- while (pqueue_peek(env->next_q) != NULL) {
- pqueue_insert(env->event_q, pqueue_pop(env->next_q));
- }
}
event_t* lf_get_new_event(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);
// Recycle event_t structs, if possible.
- event_t* e = (event_t*)pqueue_pop(env->recycle_q);
+ event_t* e = (event_t*)pqueue_tag_pop(env->recycle_q);
if (e == NULL) {
e = (event_t*)calloc(1, sizeof(struct event_t));
if (e == NULL)
@@ -376,7 +358,7 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) {
// && (timer->offset != 0 || timer->period != 0)) {
event_t* e = lf_get_new_event(env);
e->trigger = timer;
- e->time = lf_time_logical(env) + timer->offset;
+ e->base.tag = (tag_t){.time = lf_time_logical(env) + timer->offset, .microstep = 0};
_lf_add_suspended_event(e);
return;
}
@@ -401,9 +383,9 @@ void _lf_initialize_timer(environment_t* env, trigger_t* timer) {
// Recycle event_t structs, if possible.
event_t* e = lf_get_new_event(env);
e->trigger = timer;
- e->time = lf_time_logical(env) + delay;
+ e->base.tag = (tag_t){.time = lf_time_logical(env) + delay, .microstep = 0};
// NOTE: No lock is being held. Assuming this only happens at startup.
- pqueue_insert(env->event_q, e);
+ pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e);
tracepoint_schedule(env, timer, delay); // Trace even though schedule is not called.
}
@@ -462,38 +444,21 @@ void _lf_trigger_shutdown_reactions(environment_t* env) {
void lf_recycle_event(environment_t* env, event_t* e) {
assert(env != GLOBAL_ENVIRONMENT);
- e->time = 0LL;
+ e->base.tag = (tag_t){.time = 0LL, .microstep = 0};
e->trigger = NULL;
- e->pos = 0;
e->token = NULL;
- e->is_dummy = false;
#ifdef FEDERATED_DECENTRALIZED
e->intended_tag = (tag_t){.time = NEVER, .microstep = 0u};
#endif
- e->next = NULL;
- pqueue_insert(env->recycle_q, e);
+ pqueue_tag_insert(env->recycle_q, (pqueue_tag_element_t*)e);
}
-event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next,
- microstep_t offset) {
- event_t* first_dummy = lf_get_new_event(env);
- event_t* dummy = first_dummy;
- dummy->time = time;
- dummy->is_dummy = true;
- dummy->trigger = trigger;
- while (offset > 0) {
- if (offset == 1) {
- dummy->next = next;
- break;
- }
- dummy->next = lf_get_new_event(env);
- dummy = dummy->next;
- dummy->time = time;
- dummy->is_dummy = true;
- dummy->trigger = trigger;
- offset--;
- }
- return first_dummy;
+event_t* _lf_create_dummy_events(environment_t* env, tag_t tag) {
+ event_t* dummy = lf_get_new_event(env);
+ dummy->base.tag = tag;
+
+ dummy->trigger = NULL;
+ return dummy;
}
void lf_replace_token(event_t* event, lf_token_t* token) {
@@ -531,7 +496,7 @@ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag
event_t* e = lf_get_new_event(env);
// Set the event time
- e->time = tag.time;
+ e->base.tag = tag;
tracepoint_schedule(env, trigger, tag.time - current_logical_tag.time);
@@ -547,133 +512,37 @@ trigger_handle_t _lf_schedule_at_tag(environment_t* env, trigger_t* trigger, tag
e->intended_tag = trigger->intended_tag;
#endif
- event_t* found = (event_t*)pqueue_find_equal_same_priority(env->event_q, e);
+ event_t* found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)e);
if (found != NULL) {
- if (tag.microstep == 0u) {
- // The microstep is 0, which means that the event is being scheduled
- // at a future time and at the beginning of the skip list of events
- // at that time.
- // In case the event is a dummy event
- // convert it to a real event.
- found->is_dummy = false;
- switch (trigger->policy) {
- case drop:
- if (found->token != token) {
- _lf_done_using(token);
- }
- lf_recycle_event(env, e);
- return (0);
- break;
- case replace:
- // Replace the payload of the event at the head with our
- // current payload.
- lf_replace_token(found, token);
+ switch (trigger->policy) {
+ case drop:
+ if (found->token != token) {
+ _lf_done_using(token);
+ }
+ lf_recycle_event(env, e);
+ return (0);
+ break;
+ case replace:
+ // Replace the payload of the event at the head with our
+ // current payload.
+ lf_replace_token(found, token);
+ lf_recycle_event(env, e);
+ return 0;
+ break;
+ default:
+ // Adding a microstep to the original
+ // intended tag.
+ tag.microstep++;
+ e->base.tag = tag;
+ if (lf_is_tag_after_stop_tag(env, (tag_t){.time = tag.time, .microstep = tag.microstep})) {
+ // Scheduling e will incur a microstep after the stop tag,
+ // which is illegal.
lf_recycle_event(env, e);
return 0;
- break;
- default:
- // Adding a microstep to the original
- // intended tag.
- if (lf_is_tag_after_stop_tag(env, (tag_t){.time = found->time, .microstep = 1})) {
- // Scheduling e will incur a microstep after the stop tag,
- // which is illegal.
- lf_recycle_event(env, e);
- return 0;
- }
- if (found->next != NULL) {
- lf_print_error("_lf_schedule_at_tag: in-order contract violated.");
- return -1;
- }
- found->next = e;
}
- } else {
- // We are requesting a microstep greater than 0
- // where there is already an event for this trigger on the event queue.
- // That event may itself be a dummy event for a real event that is
- // also at a microstep greater than 0.
- // We have to insert our event into the chain or append it
- // to the end of the chain, depending on which microstep is lesser.
- microstep_t microstep_of_found = 0;
- if (tag.time == current_logical_tag.time) {
- // This is a situation where the head of the queue
- // is an event with microstep == current_microstep + 1
- // which should be reflected in our steps calculation.
- microstep_of_found += current_logical_tag.microstep + 1; // Indicating that
- // the found event
- // is at this microstep.
- }
- // Follow the chain of events until the right point
- // to insert the new event.
- while (microstep_of_found < tag.microstep - 1) {
- if (found->next == NULL) {
- // The chain stops short of where we want to be.
- // If it exactly one microstep short of where we want to be,
- // then we don't need a dummy. Otherwise, we do.
- microstep_t undershot_by = (tag.microstep - 1) - microstep_of_found;
- if (undershot_by > 0) {
- found->next = _lf_create_dummy_events(env, trigger, tag.time, e, undershot_by);
- } else {
- found->next = e;
- }
- return 1;
- }
- found = found->next;
- microstep_of_found++;
- }
- // At this point, microstep_of_found == tag.microstep - 1.
- if (found->next == NULL) {
- found->next = e;
- } else {
- switch (trigger->policy) {
- case drop:
- if (found->next->token != token) {
- _lf_done_using(token);
- }
- lf_recycle_event(env, e);
- return 0;
- break;
- case replace:
- // Replace the payload of the event at the head with our
- // current payload.
- lf_replace_token(found->next, token);
- lf_recycle_event(env, e);
- return 0;
- break;
- default:
- // Adding a microstep to the original
- // intended tag.
- if (lf_is_tag_after_stop_tag(env, (tag_t){.time = found->time, .microstep = microstep_of_found + 1})) {
- // Scheduling e will incur a microstep at timeout,
- // which is illegal.
- lf_recycle_event(env, e);
- return 0;
- }
- if (found->next->next != NULL) {
- lf_print_error("_lf_schedule_at_tag: in-order contract violated.");
- return -1;
- }
- found->next->next = e;
- }
- }
- }
- } else {
- // No existing event queued.
- microstep_t relative_microstep = tag.microstep;
- if (tag.time == current_logical_tag.time) {
- relative_microstep -= current_logical_tag.microstep;
- }
- if ((tag.time == current_logical_tag.time && relative_microstep == 1 && env->execution_started) ||
- tag.microstep == 0) {
- // Do not need a dummy event if we are scheduling at 1 microstep
- // in the future at current time or at microstep 0 in a future time.
- // Note that if execution hasn't started, then we have to insert dummy events.
- pqueue_insert(env->event_q, e);
- } else {
- // Create a dummy event. Insert it into the queue, and let its next
- // pointer point to the actual event.
- pqueue_insert(env->event_q, _lf_create_dummy_events(env, trigger, tag.time, e, relative_microstep));
}
}
+ pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e);
trigger_handle_t return_value = env->_lf_handle++;
if (env->_lf_handle < 0) {
env->_lf_handle = 1;
@@ -757,36 +626,37 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t*
return 1;
}
-void _lf_advance_logical_time(environment_t* env, instant_t next_time) {
+void _lf_advance_tag(environment_t* env, tag_t next_tag) {
assert(env != GLOBAL_ENVIRONMENT);
-// FIXME: The following checks that _lf_advance_logical_time()
+// FIXME: The following checks that _lf_advance_tag()
// is being called correctly. Namely, check if logical time
// is being pushed past the head of the event queue. This should
-// never happen if _lf_advance_logical_time() is called correctly.
+// never happen if _lf_advance_tag() is called correctly.
// This is commented out because it will add considerable overhead
// to the ordinary execution of LF programs. Instead, there might
// be a need for a target property that enables these kinds of logic
// assertions for development purposes only.
#ifndef NDEBUG
- event_t* next_event = (event_t*)pqueue_peek(env->event_q);
+ event_t* next_event = (event_t*)pqueue_tag_peek(env->event_q);
if (next_event != NULL) {
- if (next_time > next_event->time) {
- lf_print_error_and_exit("_lf_advance_logical_time(): Attempted to move time to " PRINTF_TIME ", which is "
- "past the head of the event queue, " PRINTF_TIME ".",
- next_time - start_time, next_event->time - start_time);
+ if (lf_tag_compare(next_tag, next_event->base.tag) > 0) {
+ lf_print_error_and_exit("_lf_advance_tag(): Attempted to move tag to " PRINTF_TAG ", which is "
+ "past the head of the event queue, " PRINTF_TAG ".",
+ next_tag.time - start_time, next_tag.microstep, next_event->base.tag.time - start_time,
+ next_event->base.tag.microstep);
}
}
#endif
- if (env->current_tag.time < next_time) {
- env->current_tag.time = next_time;
- env->current_tag.microstep = 0;
- } else if (env->current_tag.time == next_time) {
- env->current_tag.microstep++;
+ if (lf_tag_compare(env->current_tag, next_tag) < 0) {
+ env->current_tag = next_tag;
} else {
- lf_print_error_and_exit("_lf_advance_logical_time(): Attempted to move tag back in time.");
+ lf_print_error_and_exit("_lf_advance_tag(): Attempted to move (elapsed) tag to " PRINTF_TAG ", which is "
+ "earlier than or equal to the (elapsed) current tag, " PRINTF_TAG ".",
+ next_tag.time - start_time, next_tag.microstep, env->current_tag.time - start_time,
+ env->current_tag.microstep);
}
- LF_PRINT_LOG("Advanced (elapsed) tag to " PRINTF_TAG " at physical time " PRINTF_TIME, next_time - start_time,
+ LF_PRINT_LOG("Advanced (elapsed) tag to " PRINTF_TAG " at physical time " PRINTF_TIME, next_tag.time - start_time,
env->current_tag.microstep, lf_time_physical_elapsed());
}
@@ -1269,12 +1139,12 @@ void termination(void) {
_lf_terminate_modal_reactors(&env[i]);
#endif
// If the event queue still has events on it, report that.
- if (env[i].event_q != NULL && pqueue_size(env[i].event_q) > 0) {
+ if (env[i].event_q != NULL && pqueue_tag_size(env[i].event_q) > 0) {
lf_print_warning("---- There are %zu unprocessed future events on the event queue.",
- pqueue_size(env[i].event_q));
- event_t* event = (event_t*)pqueue_peek(env[i].event_q);
- interval_t event_time = event->time - start_time;
- lf_print_warning("---- The first future event has timestamp " PRINTF_TIME " after start time.", event_time);
+ pqueue_tag_size(env[i].event_q));
+ event_t* event = (event_t*)pqueue_tag_peek(env[i].event_q);
+ lf_print_warning("---- The first future event has timestamp " PRINTF_TAG " after start tag.",
+ event->base.tag.time - start_time, event->base.tag.microstep);
}
// Print elapsed times.
// If these are negative, then the program failed to start up.
diff --git a/core/threaded/reactor_threaded.c b/core/threaded/reactor_threaded.c
index e364675d6..57f888fc2 100644
--- a/core/threaded/reactor_threaded.c
+++ b/core/threaded/reactor_threaded.c
@@ -276,24 +276,18 @@ tag_t get_next_event_tag(environment_t* env) {
assert(env != GLOBAL_ENVIRONMENT);
// Peek at the earliest event in the event queue.
- event_t* event = (event_t*)pqueue_peek(env->event_q);
+ event_t* event = (event_t*)pqueue_tag_peek(env->event_q);
tag_t next_tag = FOREVER_TAG;
if (event != NULL) {
// There is an event in the event queue.
- if (event->time < env->current_tag.time) {
- lf_print_error_and_exit("get_next_event_tag(): Earliest event on the event queue (" PRINTF_TIME ") is "
- "earlier than the current time (" PRINTF_TIME ").",
- event->time - start_time, env->current_tag.time - start_time);
+ if (lf_tag_compare(event->base.tag, env->current_tag) < 0) {
+ lf_print_error_and_exit("get_next_event_tag(): Earliest event on the event queue (" PRINTF_TAG ") is "
+ "earlier than the current tag (" PRINTF_TAG ").",
+ event->base.tag.time - start_time, event->base.tag.microstep,
+ env->current_tag.time - start_time, env->current_tag.microstep);
}
- next_tag.time = event->time;
- if (next_tag.time == env->current_tag.time) {
- LF_PRINT_DEBUG("Earliest event matches current time. Incrementing microstep. Event is dummy: %d.",
- event->is_dummy);
- next_tag.microstep = env->current_tag.microstep + 1;
- } else {
- next_tag.microstep = 0;
- }
+ next_tag = event->base.tag;
}
// If a timeout tag was given, adjust the next_tag from the
@@ -302,7 +296,7 @@ tag_t get_next_event_tag(environment_t* env) {
next_tag = env->stop_tag;
}
LF_PRINT_LOG("Earliest event on the event queue (or stop time if empty) is " PRINTF_TAG ". Event queue has size %zu.",
- next_tag.time - start_time, next_tag.microstep, pqueue_size(env->event_q));
+ next_tag.time - start_time, next_tag.microstep, pqueue_tag_size(env->event_q));
return next_tag;
}
@@ -410,7 +404,7 @@ void _lf_next_locked(environment_t* env) {
// behavior with centralized coordination as with unfederated execution.
#else // not FEDERATED_CENTRALIZED nor LF_ENCLAVES
- if (pqueue_peek(env->event_q) == NULL && !keepalive_specified) {
+ if (pqueue_tag_peek(env->event_q) == NULL && !keepalive_specified) {
// There is no event on the event queue and keepalive is false.
// No event in the queue
// keepalive is not set so we should stop.
@@ -478,7 +472,7 @@ void _lf_next_locked(environment_t* env) {
}
// At this point, finally, we have an event to process.
- _lf_advance_logical_time(env, next_tag.time);
+ _lf_advance_tag(env, next_tag);
_lf_start_time_step(env);
@@ -488,7 +482,7 @@ void _lf_next_locked(environment_t* env) {
_lf_trigger_shutdown_reactions(env);
}
- // Pop all events from event_q with timestamp equal to env->current_tag.time,
+ // Pop all events from event_q with timestamp equal to env->current_tag,
// extract all the reactions triggered by these events, and
// stick them into the reaction queue.
_lf_pop_events(env);
@@ -975,8 +969,9 @@ void lf_print_snapshot(environment_t* env) {
LF_PRINT_DEBUG("Pending:");
// pqueue_dump(reaction_q, print_reaction); FIXME: reaction_q is not
// accessible here
- LF_PRINT_DEBUG("Event queue size: %zu. Contents:", pqueue_size(env->event_q));
- pqueue_dump(env->event_q, print_reaction);
+ LF_PRINT_DEBUG("Event queue size: %zu. Contents:", pqueue_tag_size(env->event_q));
+ // FIXME: There is no pqueue_tag_dump now
+ pqueue_tag_dump(env->event_q);
LF_PRINT_DEBUG(">>> END Snapshot");
}
}
diff --git a/core/utils/pqueue.c b/core/utils/pqueue.c
index d26d8b2aa..b2bf05090 100644
--- a/core/utils/pqueue.c
+++ b/core/utils/pqueue.c
@@ -2,10 +2,14 @@
* @file pqueue.c
* @author Marten Lohstroh
* @author Edward A. Lee
+ * @author Byeonggil Jun
* @copyright (c) 2020-2023, The University of California at Berkeley.
* License: BSD 2-clause
*
- * @brief Priority queue definitions for the event queue and reaction queue.
+ * @brief Priority queue definitions for queues where the priority is a number that can be compared with ordinary
+ * numerical comparisons.
+ *
+ * This is used for the reaction queue. The event queue uses a `tag_t` struct for its priority, so it cannot use this.
*/
#include "low_level_platform.h"
@@ -13,7 +17,7 @@
#include "util.h"
#include "lf_types.h"
-int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that) { return (thiz > that); }
+int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that) { return (thiz > that) ? 1 : (thiz < that) ? -1 : 0; }
int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that) {
(void)thiz;
@@ -21,28 +25,15 @@ int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that) {
return 0;
}
-int event_matches(void* event1, void* event2) { return (((event_t*)event1)->trigger == ((event_t*)event2)->trigger); }
-
int reaction_matches(void* a, void* b) { return (a == b); }
-pqueue_pri_t get_event_time(void* event) { return (pqueue_pri_t)(((event_t*)event)->time); }
-
pqueue_pri_t get_reaction_index(void* reaction) { return ((reaction_t*)reaction)->index; }
-size_t get_event_position(void* event) { return ((event_t*)event)->pos; }
-
size_t get_reaction_position(void* reaction) { return ((reaction_t*)reaction)->pos; }
-void set_event_position(void* event, size_t pos) { ((event_t*)event)->pos = pos; }
-
void set_reaction_position(void* reaction, size_t pos) { ((reaction_t*)reaction)->pos = pos; }
void print_reaction(void* reaction) {
reaction_t* r = (reaction_t*)reaction;
LF_PRINT_DEBUG("%s: chain_id: %llu, index: %llx, reaction: %p", r->name, r->chain_id, r->index, reaction);
}
-
-void print_event(void* event) {
- event_t* e = (event_t*)event;
- LF_PRINT_DEBUG("time: " PRINTF_TIME ", trigger: %p, token: %p", e->time, (void*)e->trigger, (void*)e->token);
-}
diff --git a/core/utils/pqueue_base.c b/core/utils/pqueue_base.c
index 30d84286e..5d602e1de 100644
--- a/core/utils/pqueue_base.c
+++ b/core/utils/pqueue_base.c
@@ -29,6 +29,15 @@
* - The provided pqueue_eq_elem_f implementation is used to test and
* search for equal elements present in the queue; and
* - Removed capability to reassign priorities.
+ *
+ * Modified by Byeonggil Jun (Apr, 2024).
+ * Changes:
+ * - Made the pqueue_cmp_pri_f function return do the three-way comparison
+ * rather than the two-way comparison.
+ * - The changed pqueue_cmp_pri_f function is used to check the equality of
+ * two elements in the pqueue_find_equal_same_priority function.
+ * - Remove the pqueue_find_equal function.
+ *
*/
#include
@@ -44,9 +53,9 @@
#define LF_RIGHT(i) (((i) << 1) + 1)
#define LF_PARENT(i) ((i) >> 1)
-void* find_equal(pqueue_t* q, void* e, int pos, pqueue_pri_t max) {
+static void* find_same_priority(pqueue_t* q, void* e, int pos) {
if (pos < 0) {
- lf_print_error_and_exit("find_equal() called with a negative pos index.");
+ lf_print_error_and_exit("find_same_priority() called with a negative pos index.");
}
// Stop the recursion when we've reached the end of the
@@ -59,19 +68,20 @@ void* find_equal(pqueue_t* q, void* e, int pos, pqueue_pri_t max) {
void* rval;
void* curr = q->d[pos];
- // Stop the recursion when we've surpassed the maximum priority.
- if (!curr || q->cmppri(q->getpri(curr), max)) {
+ // Stop the recursion once we've surpassed the priority of the element
+ // we're looking for.
+ if (!curr || q->cmppri(q->getpri(curr), q->getpri(e)) == 1) {
return NULL;
}
- if (q->eqelem(curr, e)) {
+ if (q->cmppri(q->getpri(curr), q->getpri(e)) == 0) {
return curr;
} else {
- rval = find_equal(q, e, LF_LEFT(pos), max);
+ rval = find_same_priority(q, e, LF_LEFT(pos));
if (rval)
return rval;
else
- return find_equal(q, e, LF_RIGHT(pos), max);
+ return find_same_priority(q, e, LF_RIGHT(pos));
}
return NULL;
}
@@ -93,11 +103,11 @@ void* find_equal_same_priority(pqueue_t* q, void* e, int pos) {
// Stop the recursion once we've surpassed the priority of the element
// we're looking for.
- if (!curr || q->cmppri(q->getpri(curr), q->getpri(e))) {
+ if (!curr || q->cmppri(q->getpri(curr), q->getpri(e)) == 1) {
return NULL;
}
- if (q->getpri(curr) == q->getpri(e) && q->eqelem(curr, e)) {
+ if (q->cmppri(q->getpri(curr), q->getpri(e)) == 0 && q->eqelem(curr, e)) {
return curr;
} else {
rval = find_equal_same_priority(q, e, LF_LEFT(pos));
@@ -157,7 +167,7 @@ static size_t maxchild(pqueue_t* q, size_t i) {
if (child_node >= q->size)
return 0;
- if ((child_node + 1) < q->size && (q->cmppri(q->getpri(q->d[child_node]), q->getpri(q->d[child_node + 1]))))
+ if ((child_node + 1) < q->size && (q->cmppri(q->getpri(q->d[child_node]), q->getpri(q->d[child_node + 1])) == 1))
child_node++; /* use right child instead of left */
return child_node;
@@ -168,7 +178,7 @@ static size_t bubble_up(pqueue_t* q, size_t i) {
void* moving_node = q->d[i];
pqueue_pri_t moving_pri = q->getpri(moving_node);
- for (parent_node = LF_PARENT(i); ((i > 1) && q->cmppri(q->getpri(q->d[parent_node]), moving_pri));
+ for (parent_node = LF_PARENT(i); ((i > 1) && q->cmppri(q->getpri(q->d[parent_node]), moving_pri) == 1);
i = parent_node, parent_node = LF_PARENT(i)) {
q->d[i] = q->d[parent_node];
q->setpos(q->d[i], i);
@@ -184,7 +194,7 @@ static void percolate_down(pqueue_t* q, size_t i) {
void* moving_node = q->d[i];
pqueue_pri_t moving_pri = q->getpri(moving_node);
- while ((child_node = maxchild(q, i)) && q->cmppri(moving_pri, q->getpri(q->d[child_node]))) {
+ while ((child_node = maxchild(q, i)) && (q->cmppri(moving_pri, q->getpri(q->d[child_node])) == 1)) {
q->d[i] = q->d[child_node];
q->setpos(q->d[i], i);
i = child_node;
@@ -194,9 +204,9 @@ static void percolate_down(pqueue_t* q, size_t i) {
q->setpos(moving_node, i);
}
-void* pqueue_find_equal_same_priority(pqueue_t* q, void* e) { return find_equal_same_priority(q, e, 1); }
+void* pqueue_find_same_priority(pqueue_t* q, void* e) { return find_same_priority(q, e, 1); }
-void* pqueue_find_equal(pqueue_t* q, void* e, pqueue_pri_t max) { return find_equal(q, e, 1, max); }
+void* pqueue_find_equal_same_priority(pqueue_t* q, void* e) { return find_equal_same_priority(q, e, 1); }
int pqueue_insert(pqueue_t* q, void* d) {
void** tmp;
@@ -227,7 +237,7 @@ int pqueue_remove(pqueue_t* q, void* d) {
return 0; // Nothing to remove
size_t posn = q->getpos(d);
q->d[posn] = q->d[--q->size];
- if (q->cmppri(q->getpri(d), q->getpri(q->d[posn])))
+ if (q->cmppri(q->getpri(d), q->getpri(q->d[posn])) == 1)
bubble_up(q, posn);
else
percolate_down(q, posn);
@@ -320,7 +330,7 @@ static int subtree_is_valid(pqueue_t* q, int pos) {
if ((size_t)left_pos < q->size) {
/* has a left child */
- if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_LEFT(pos)])))
+ if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_LEFT(pos)])) == 1)
return 0;
if (!subtree_is_valid(q, LF_LEFT(pos)))
return 0;
@@ -332,7 +342,7 @@ static int subtree_is_valid(pqueue_t* q, int pos) {
}
if ((size_t)right_pos < q->size) {
/* has a right child */
- if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_RIGHT(pos)])))
+ if (q->cmppri(q->getpri(q->d[pos]), q->getpri(q->d[LF_RIGHT(pos)])) == 1)
return 0;
if (!subtree_is_valid(q, LF_RIGHT(pos)))
return 0;
diff --git a/core/utils/pqueue_support.h b/core/utils/pqueue_support.h
deleted file mode 100644
index c751b67f1..000000000
--- a/core/utils/pqueue_support.h
+++ /dev/null
@@ -1,114 +0,0 @@
-/*************
-Copyright (c) 2022, The University of California at Berkeley.
-
-Redistribution and use in source and binary forms, with or without modification,
-are permitted provided that the following conditions are met:
-
-1. Redistributions of source code must retain the above copyright notice,
- this list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
-EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
-MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
-THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
-SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
-PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
-STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF
-THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-***************/
-
-/**
- * @file pqueue_support.h
- * @author Edward A. Lee
- * @author Marten Lohstroh
- * @brief Header-only support functions for pqueue.
- */
-
-#ifndef PQUEUE_SUPPORT_H
-#define PQUEUE_SUPPORT_H
-
-#include "../reactor.h"
-
-// ********** Priority Queue Support Start
-
-/**
- * Return whether the first and second argument are given in reverse order.
- */
-static int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that) { return (thiz > that); }
-
-/**
- * Return false (0) regardless of reaction order.
- */
-static int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that) { return false; }
-
-/**
- * Return whether or not the given events have matching triggers.
- */
-static int event_matches(void* next, void* curr) { return (((event_t*)next)->trigger == ((event_t*)curr)->trigger); }
-
-/**
- * Return whether or not the given reaction_t pointers
- * point to the same struct.
- */
-static int reaction_matches(void* next, void* curr) { return (next == curr); }
-
-/**
- * Report a priority equal to the time of the given event.
- * Used for sorting pointers to event_t structs in the event queue.
- */
-static pqueue_pri_t get_event_time(void* a) { return (pqueue_pri_t)(((event_t*)a)->time); }
-
-/**
- * Report a priority equal to the index of the given reaction.
- * Used for sorting pointers to reaction_t structs in the
- * blocked and executing queues.
- */
-static pqueue_pri_t get_reaction_index(void* a) { return ((reaction_t*)a)->index; }
-
-/**
- * Return the given event's position in the queue.
- */
-static size_t get_event_position(void* a) { return ((event_t*)a)->pos; }
-
-/**
- * Return the given reaction's position in the queue.
- */
-static size_t get_reaction_position(void* a) { return ((reaction_t*)a)->pos; }
-
-/**
- * Set the given event's position in the queue.
- */
-static void set_event_position(void* a, size_t pos) { ((event_t*)a)->pos = pos; }
-
-/**
- * Return the given reaction's position in the queue.
- */
-static void set_reaction_position(void* a, size_t pos) { ((reaction_t*)a)->pos = pos; }
-
-/**
- * Print some information about the given reaction.
- *
- * DEBUG function only.
- */
-static void print_reaction(void* reaction) {
- reaction_t* r = (reaction_t*)reaction;
- LF_PRINT_DEBUG("%s: chain_id:%llu, index: %llx, reaction: %p", r->name, r->chain_id, r->index, (void*)r);
-}
-
-/**
- * Print some information about the given event.
- *
- * DEBUG function only.
- */
-static void print_event(void* event) {
- event_t* e = (event_t*)event;
- LF_PRINT_DEBUG("time: " PRINTF_TIME ", trigger: %p, token: %p", e->time, (void*)e->trigger, (void*)e->token);
-}
-
-// ********** Priority Queue Support End
-#endif
diff --git a/core/utils/pqueue_tag.c b/core/utils/pqueue_tag.c
index 9fe00653d..24899374b 100644
--- a/core/utils/pqueue_tag.c
+++ b/core/utils/pqueue_tag.c
@@ -25,17 +25,6 @@
*/
static pqueue_pri_t pqueue_tag_get_priority(void* element) { return (pqueue_pri_t)element; }
-/**
- * @brief Callback comparison function for the tag-based priority queue.
- * Return 0 if the first argument is less than second and 1 otherwise.
- * This function is of type pqueue_cmp_pri_f.
- * @param priority1 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t.
- * @param priority2 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t.
- */
-static int pqueue_tag_compare(pqueue_pri_t priority1, pqueue_pri_t priority2) {
- return (lf_tag_compare(((pqueue_tag_element_t*)priority1)->tag, ((pqueue_tag_element_t*)priority2)->tag) > 0);
-}
-
/**
* @brief Callback function to determine whether two elements are equivalent.
* Return 1 if the tags contained by given elements are identical, 0 otherwise.
@@ -75,11 +64,21 @@ static void pqueue_tag_print_element(void* element) {
//////////////////
// Functions defined in pqueue_tag.h.
+int pqueue_tag_compare(pqueue_pri_t priority1, pqueue_pri_t priority2) {
+ return (lf_tag_compare(((pqueue_tag_element_t*)priority1)->tag, ((pqueue_tag_element_t*)priority2)->tag));
+}
+
pqueue_tag_t* pqueue_tag_init(size_t initial_size) {
return (pqueue_tag_t*)pqueue_init(initial_size, pqueue_tag_compare, pqueue_tag_get_priority, pqueue_tag_get_position,
pqueue_tag_set_position, pqueue_tag_matches, pqueue_tag_print_element);
}
+pqueue_tag_t* pqueue_tag_init_customize(size_t initial_size, pqueue_cmp_pri_f cmppri, pqueue_eq_elem_f eqelem,
+ pqueue_print_entry_f prt) {
+ return (pqueue_tag_t*)pqueue_init(initial_size, cmppri, pqueue_tag_get_priority, pqueue_tag_get_position,
+ pqueue_tag_set_position, eqelem, prt);
+}
+
void pqueue_tag_free(pqueue_tag_t* q) {
for (size_t i = 1; i < q->size; i++) {
if (q->d[i] != NULL && ((pqueue_tag_element_t*)q->d[i])->is_dynamic) {
@@ -101,11 +100,14 @@ int pqueue_tag_insert_tag(pqueue_tag_t* q, tag_t t) {
}
pqueue_tag_element_t* pqueue_tag_find_with_tag(pqueue_tag_t* q, tag_t t) {
- // Create elements on the stack. These elements are only needed during
- // the duration of this function call, so putting them on the stack is OK.
+ // Create an element on the stack. This element is only needed during
+ // the duration of this function call, so putting it on the stack is OK.
pqueue_tag_element_t element = {.tag = t, .pos = 0, .is_dynamic = false};
- pqueue_tag_element_t forever = {.tag = FOREVER_TAG, .pos = 0, .is_dynamic = false};
- return pqueue_find_equal((pqueue_t*)q, (void*)&element, (pqueue_pri_t)&forever);
+ return pqueue_find_same_priority((pqueue_t*)q, (void*)&element);
+}
+
+pqueue_tag_element_t* pqueue_tag_find_equal_same_tag(pqueue_tag_t* q, pqueue_tag_element_t* e) {
+ return pqueue_find_equal_same_priority((pqueue_t*)q, (void*)e);
}
int pqueue_tag_insert_if_no_match(pqueue_tag_t* q, tag_t t) {
@@ -149,3 +151,5 @@ void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t) {
head = pqueue_tag_peek_tag(q);
}
}
+
+void pqueue_tag_dump(pqueue_tag_t* q) { pqueue_dump((pqueue_t*)q, pqueue_tag_print_element); }
diff --git a/include/core/environment.h b/include/core/environment.h
index 98753c6fb..a776dee95 100644
--- a/include/core/environment.h
+++ b/include/core/environment.h
@@ -73,9 +73,8 @@ typedef struct environment_t {
int id;
tag_t current_tag;
tag_t stop_tag;
- pqueue_t* event_q;
- pqueue_t* recycle_q;
- pqueue_t* next_q;
+ pqueue_tag_t* event_q;
+ pqueue_tag_t* recycle_q;
bool** is_present_fields;
int is_present_fields_size;
bool** is_present_fields_abbreviated;
diff --git a/include/core/lf_types.h b/include/core/lf_types.h
index 5598cf820..75a61e405 100644
--- a/include/core/lf_types.h
+++ b/include/core/lf_types.h
@@ -21,6 +21,7 @@
#include "modal_models/modes.h" // Modal model support
#include "utils/pqueue.h"
+#include "utils/pqueue_tag.h"
#include "lf_token.h"
#include "tag.h"
#include "vector.h"
@@ -195,15 +196,12 @@ typedef struct event_t event_t;
/** Event activation record to push onto the event queue. */
struct event_t {
- instant_t time; // Time of release.
- trigger_t* trigger; // Associated trigger, NULL if this is a dummy event.
- size_t pos; // Position in the priority queue.
- lf_token_t* token; // Pointer to the token wrapping the value.
- bool is_dummy; // Flag to indicate whether this event is merely a placeholder or an actual event.
+ pqueue_tag_element_t base; // Elements of pqueue_tag. It contains tag of release and position in the priority queue.
+ trigger_t* trigger; // Associated trigger, NULL if this is a dummy event.
+ lf_token_t* token; // Pointer to the token wrapping the value.
#ifdef FEDERATED
tag_t intended_tag; // The intended tag.
#endif
- event_t* next; // Pointer to the next event lined up in superdense time.
};
/**
diff --git a/include/core/reactor_common.h b/include/core/reactor_common.h
index fc1451a96..8086ed7db 100644
--- a/include/core/reactor_common.h
+++ b/include/core/reactor_common.h
@@ -195,16 +195,15 @@ void _lf_trigger_startup_reactions(environment_t* env);
void _lf_trigger_shutdown_reactions(environment_t* env);
/**
- * Create dummy events to be used as spacers in the event queue.
+ * @brief Create a dummy event with the specified tag.
+ *
+ * A dummy event is an event with no triggers that can be put on the event queue to trigger a tag advance to the
+ * specified tag.
* @param env Environment in which we are executing.
- * @param trigger The eventual event to be triggered.
- * @param time The logical time of that event.
- * @param next The event to place after the dummy events.
- * @param offset The number of dummy events to insert.
- * @return A pointer to the first dummy event.
+ * @param tag The tag of that event.
+ * @return A pointer to the dummy event.
*/
-event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next,
- microstep_t offset);
+event_t* _lf_create_dummy_events(environment_t* env, tag_t tag);
/**
* @brief Schedule an event at a specific tag (time, microstep).
@@ -250,9 +249,9 @@ trigger_handle_t _lf_insert_reactions_for_trigger(environment_t* env, trigger_t*
* the current time, then increase the microstep. Otherwise, update the current
* time and set the microstep to zero.
* @param env The environment in which we are executing
- * @param next_time The time step to advance to.
+ * @param next_tag The tag step to advance to.
*/
-void _lf_advance_logical_time(environment_t* env, instant_t next_time);
+void _lf_advance_tag(environment_t* env, tag_t next_tag);
/**
* @brief Pop all events from event_q with tag equal to current tag.
diff --git a/include/core/utils/pqueue.h b/include/core/utils/pqueue.h
index e317acbcd..448b94462 100644
--- a/include/core/utils/pqueue.h
+++ b/include/core/utils/pqueue.h
@@ -2,10 +2,14 @@
* @file pqueue.h
* @author Marten Lohstroh
* @author Edward A. Lee
+ * @author Byeonggil Jun
* @copyright (c) 2020-2023, The University of California at Berkeley.
* License: BSD 2-clause
*
- * @brief Priority queue declarations for the event queue and reaction queue.
+ * @brief Priority queue definitions for queues where the priority is a number that can be compared with ordinary
+ * numerical comparisons.
+ *
+ * This is used for the reaction queue. The event queue uses a `tag_t` struct for its priority, so it cannot use this.
*/
#ifndef PQUEUE_H
@@ -27,13 +31,6 @@ int in_reverse_order(pqueue_pri_t thiz, pqueue_pri_t that);
*/
int in_no_particular_order(pqueue_pri_t thiz, pqueue_pri_t that);
-/**
- * Return 1 if the two events have the same trigger.
- * @param event1 A pointer to an event_t.
- * @param event2 A pointer to an event_t.
- */
-int event_matches(void* event1, void* event2);
-
/**
* Return 1 if the two arguments are identical pointers.
* @param a First argument.
@@ -41,13 +38,6 @@ int event_matches(void* event1, void* event2);
*/
int reaction_matches(void* a, void* b);
-/**
- * Report a priority equal to the time of the given event.
- * This is used for sorting pointers to event_t structs in the event queue.
- * @param a A pointer to an event_t.
- */
-pqueue_pri_t get_event_time(void* event);
-
/**
* Report a priority equal to the index of the given reaction.
* Used for sorting pointers to reaction_t structs in the
@@ -56,25 +46,12 @@ pqueue_pri_t get_event_time(void* event);
*/
pqueue_pri_t get_reaction_index(void* reaction_t);
-/**
- * Return the given event's position in the queue.
- * @param event A pointer to an event_t.
- */
-size_t get_event_position(void* event);
-
/**
* Return the given reaction's position in the queue.
* @param reaction A pointer to a reaction_t.
*/
size_t get_reaction_position(void* reaction);
-/**
- * Set the given event's position in the queue.
- * @param event A pointer to an event_t
- * @param pos The position.
- */
-void set_event_position(void* event, size_t pos);
-
/**
* Set the given reaction's position in the queue.
* @param event A pointer to a reaction_t.
@@ -89,11 +66,4 @@ void set_reaction_position(void* reaction, size_t pos);
*/
void print_reaction(void* reaction);
-/**
- * Print some information about the given event.
- * This only prints something if logging is set to DEBUG.
- * @param event A pointer to an event_t.
- */
-void print_event(void* event);
-
#endif /* PQUEUE_H */
diff --git a/include/core/utils/pqueue_base.h b/include/core/utils/pqueue_base.h
index 8c9fc8f2c..b913ab64f 100644
--- a/include/core/utils/pqueue_base.h
+++ b/include/core/utils/pqueue_base.h
@@ -30,6 +30,14 @@
* search for equal elements present in the queue; and
* - Removed capability to reassign priorities.
*
+ * Modified by Byeonggil Jun (Apr, 2024).
+ * Changes:
+ * - Made the pqueue_cmp_pri_f function return do the three-way comparison
+ * rather than the two-way comparison.
+ * - The changed pqueue_cmp_pri_f function is used to check the equality of
+ * two elements in the pqueue_find_equal_same_priority function.
+ * - Remove the pqueue_find_equal function.
+ *
* @brief Priority Queue function declarations used as a base for Lingua Franca priority queues.
*
* @{
@@ -81,7 +89,7 @@ typedef struct pqueue_t {
* @param n the initial estimate of the number of queue items for which memory
* should be preallocated
* @param cmppri The callback function to run to compare two elements
- * This callback should return 0 for 'lower' and non-zero
+ * This callback should return -1 for 'lower', 0 for 'same', and 1
* for 'higher', or vice versa if reverse priority is desired
* @param getpri the callback function to run to set a score to an element
* @param getpos the callback function to get the current element's position
@@ -140,23 +148,21 @@ void* pqueue_pop(pqueue_t* q);
void pqueue_empty_into(pqueue_t** dest, pqueue_t** src);
/**
- * Find the highest-ranking item with the same priority that matches the
- * supplied entry.
+ * Return an entry with the same priority as the specified entry or NULL if there is no such entry.
* @param q the queue
* @param e the entry to compare against
* @return NULL if no matching event has been found, otherwise the entry
*/
-void* pqueue_find_equal_same_priority(pqueue_t* q, void* e);
+void* pqueue_find_same_priority(pqueue_t* q, void* e);
/**
- * Find the highest-ranking item with priority up to and including the given
- * maximum priority that matches the supplied entry.
+ * Return an entry with the same priority (determined by `cmppri`) that matches the supplied entry (determined
+ * by `eqelem`) or `NULL` if there is no such entry.
* @param q the queue
* @param e the entry to compare against
- * @param max_priority the maximum priority to consider
* @return NULL if no matching event has been found, otherwise the entry
*/
-void* pqueue_find_equal(pqueue_t* q, void* e, pqueue_pri_t max_priority);
+void* pqueue_find_equal_same_priority(pqueue_t* q, void* e);
/**
* Remove an item from the queue.
diff --git a/include/core/utils/pqueue_tag.h b/include/core/utils/pqueue_tag.h
index d69de5e56..e06e074be 100644
--- a/include/core/utils/pqueue_tag.h
+++ b/include/core/utils/pqueue_tag.h
@@ -1,5 +1,5 @@
/**
- * @file tag_pqueue.h
+ * @file pqueue_tag.h
* @author Byeonggil Jun
* @author Edward A. Lee
* @copyright (c) 2023, The University of California at Berkeley
@@ -60,6 +60,16 @@ typedef struct {
*/
typedef pqueue_t pqueue_tag_t;
+/**
+ * @brief Callback comparison function for the tag-based priority queue.
+ * Return -1 if the first argument is less than second, 0 if the two arguments are the same,
+ * and 1 otherwise.
+ * This function is of type pqueue_cmp_pri_f.
+ * @param priority1 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t.
+ * @param priority2 A pointer to a pqueue_tag_element_t, cast to pqueue_pri_t.
+ */
+int pqueue_tag_compare(pqueue_pri_t priority1, pqueue_pri_t priority2);
+
/**
* @brief Create a priority queue sorted by tags.
*
@@ -69,6 +79,20 @@ typedef pqueue_t pqueue_tag_t;
*/
pqueue_tag_t* pqueue_tag_init(size_t initial_size);
+/**
+ * @brief Create a priority queue that stores elements with a particular payload.
+ *
+ * @param cmppri the callback function to compare priorities
+ * @param eqelem the callback function to check equivalence of payloads.
+ * @param prt the callback function to print elements
+ *
+ * The elements of the priority queue will be of type pqueue_tag_element_t.
+ * The caller should call pqueue_tag_free() when finished with the queue.
+ * @return A dynamically allocated priority queue or NULL if memory allocation fails.
+ */
+pqueue_tag_t* pqueue_tag_init_customize(size_t initial_size, pqueue_cmp_pri_f cmppri, pqueue_eq_elem_f eqelem,
+ pqueue_print_entry_f prt);
+
/**
* @brief Free all memory used by the queue including elements that are marked dynamic.
*
@@ -124,6 +148,15 @@ int pqueue_tag_insert_if_no_match(pqueue_tag_t* q, tag_t t);
*/
pqueue_tag_element_t* pqueue_tag_find_with_tag(pqueue_tag_t* q, tag_t t);
+/**
+ * @brief Return an item with the same tag (`cmppri` returns 0) that matches the supplied element
+ * (`eqelem` returns non-zero) or NULL if there is none.
+ * @param q The queue.
+ * @param e The element.
+ * @return An entry with the specified tag or NULL if there isn't one.
+ */
+pqueue_tag_element_t* pqueue_tag_find_equal_same_tag(pqueue_tag_t* q, pqueue_tag_element_t* e);
+
/**
* @brief Return highest-ranking item (the one with the least tag) without removing it.
* @param q The queue.
@@ -175,4 +208,12 @@ void pqueue_tag_remove(pqueue_tag_t* q, pqueue_tag_element_t* e);
*/
void pqueue_tag_remove_up_to(pqueue_tag_t* q, tag_t t);
+/**
+ * Dump the queue and it's internal structure.
+ * @internal
+ * debug function only
+ * @param q the queue
+ */
+void pqueue_tag_dump(pqueue_tag_t* q);
+
#endif // PQUEUE_TAG_H
diff --git a/lib/schedule.c b/lib/schedule.c
index 8d237ce55..5aa9fd528 100644
--- a/lib/schedule.c
+++ b/lib/schedule.c
@@ -149,17 +149,14 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int
if (!trigger->is_timer) {
delay += trigger->offset;
}
- tag_t intended_tag = (tag_t){.time = env->current_tag.time + delay, .microstep = 0};
+ tag_t intended_tag = lf_delay_tag(env->current_tag, delay);
- LF_PRINT_DEBUG("lf_schedule_trigger: env->current_tag.time = " PRINTF_TIME ". Total logical delay = " PRINTF_TIME "",
- env->current_tag.time, delay);
+ LF_PRINT_DEBUG("lf_schedule_trigger: env->current_tag = " PRINTF_TAG ". Total logical delay = " PRINTF_TIME "",
+ env->current_tag.time, env->current_tag.microstep, delay);
interval_t min_spacing = trigger->period;
event_t* e = lf_get_new_event(env);
- // Initialize the next pointer.
- e->next = NULL;
-
// Set the payload.
e->token = token;
@@ -173,6 +170,7 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int
if (trigger->is_physical) {
// Get the current physical time and assign it as the intended time.
intended_tag.time = lf_time_physical() + delay;
+ intended_tag.microstep = 0;
} else {
// FIXME: We need to verify that we are executing within a reaction?
// See reactor_threaded.
@@ -198,18 +196,17 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int
e->intended_tag = trigger->intended_tag;
#endif
- // Check for conflicts (a queued event with the same trigger and time).
+ // Check for conflicts (a queued event with the same trigger and tag).
if (min_spacing <= 0) {
// No minimum spacing defined.
- e->time = intended_tag.time;
- event_t* found = (event_t*)pqueue_find_equal_same_priority(env->event_q, e);
+ e->base.tag = intended_tag;
+ event_t* found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)e);
// Check for conflicts. Let events pile up in super dense time.
if (found != NULL) {
- intended_tag.microstep++;
- // Skip to the last node in the linked list.
- while (found->next != NULL) {
- found = found->next;
+ while (found != NULL) {
intended_tag.microstep++;
+ e->base.tag = intended_tag;
+ found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)e);
}
if (lf_is_tag_after_stop_tag(env, intended_tag)) {
LF_PRINT_DEBUG("Attempt to schedule an event after stop_tag was rejected.");
@@ -218,9 +215,8 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int
lf_recycle_event(env, e);
return 0;
}
- // Hook the event into the list.
- found->next = e;
trigger->last_tag = intended_tag;
+ pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e);
return (0); // FIXME: return value
}
// If there are not conflicts, schedule as usual. If intended time is
@@ -248,14 +244,13 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int
return (0);
case replace:
LF_PRINT_DEBUG("Policy is replace. Replacing the previous event.");
- // If the event with the previous time is still on the event
+ // If the event with the previous tag is still on the event
// queue, then replace the token. To find this event, we have
// to construct a dummy event_t struct.
event_t* dummy = lf_get_new_event(env);
- dummy->next = NULL;
dummy->trigger = trigger;
- dummy->time = trigger->last_tag.time;
- event_t* found = (event_t*)pqueue_find_equal_same_priority(env->event_q, dummy);
+ dummy->base.tag = trigger->last_tag;
+ event_t* found = (event_t*)pqueue_tag_find_equal_same_tag(env->event_q, (pqueue_tag_element_t*)dummy);
if (found != NULL) {
// Recycle the existing token and the new event
@@ -292,16 +287,20 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int
intended_tag.time = env->current_tag.time;
}
#endif
+ if (lf_tag_compare(intended_tag, env->current_tag) == 0) {
+ // Increment microstep.
+ intended_tag.microstep++;
+ }
// Set the tag of the event.
- e->time = intended_tag.time;
-
- // Do not schedule events if if the event time is past the stop time
- // (current microsteps are checked earlier).
- LF_PRINT_DEBUG("Comparing event with elapsed time " PRINTF_TIME " against stop time " PRINTF_TIME ".",
- e->time - lf_time_start(), env->stop_tag.time - lf_time_start());
- if (e->time > env->stop_tag.time) {
- LF_PRINT_DEBUG("lf_schedule_trigger: event time is past the timeout. Discarding event.");
+ e->base.tag = intended_tag;
+
+ // Do not schedule events if the event time is past the stop tag.
+ LF_PRINT_DEBUG("Comparing event with elapsed tag " PRINTF_TAG " against stop tag " PRINTF_TAG ".",
+ e->base.tag.time - lf_time_start(), e->base.tag.microstep, env->stop_tag.time - lf_time_start(),
+ env->stop_tag.microstep);
+ if (lf_is_tag_after_stop_tag(env, intended_tag)) {
+ LF_PRINT_DEBUG("lf_schedule_trigger: event tag is past the timeout. Discarding event.");
_lf_done_using(token);
lf_recycle_event(env, e);
return (0);
@@ -312,16 +311,11 @@ trigger_handle_t lf_schedule_trigger(environment_t* env, trigger_t* trigger, int
trigger->last_tag = intended_tag;
// Queue the event.
- // NOTE: There is no need for an explicit microstep because
- // when this is called, all events at the current tag
- // (time and microstep) have been pulled from the queue,
- // and any new events added at this tag will go into the reaction_q
- // rather than the event_q, so anything put in the event_q with this
- // same time will automatically be executed at the next microstep.
- LF_PRINT_LOG("Inserting event in the event queue with elapsed time " PRINTF_TIME ".", e->time - lf_time_start());
- pqueue_insert(env->event_q, e);
-
- tracepoint_schedule(env, trigger, e->time - env->current_tag.time);
+ LF_PRINT_LOG("Inserting event in the event queue with elapsed tag " PRINTF_TAG ".",
+ e->base.tag.time - lf_time_start(), e->base.tag.microstep);
+ pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)e);
+
+ tracepoint_schedule(env, trigger, e->base.tag.time - env->current_tag.time);
// FIXME: make a record of handle and implement unschedule.
// NOTE: Rather than wrapping around to get a negative number,