Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of event queue #390

Merged
merged 20 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
898fa1c
Use pqueue_tag instead of pqueue for event_q and simplfy scehduling n…
byeonggiljun Mar 8, 2024
52b5bbc
Schedule events with appropriate tag
byeonggiljun Mar 12, 2024
8e6226e
Apply clang-format
byeonggiljun Mar 12, 2024
733f81b
Merge branch 'main' into event-queue-refactoring
byeonggiljun Mar 12, 2024
31a5b1b
Let pqueue_tag be able to compare payloads of elements
byeonggiljun Mar 12, 2024
1d4d05c
Allow the priority comparison function to return 0 if received priori…
byeonggiljun Mar 12, 2024
2ccd7cb
Cleanup codes
byeonggiljun Mar 12, 2024
ca62f29
Add a function for finding an element in a priority queue with the pr…
byeonggiljun Mar 13, 2024
64f21c4
Merge branch 'main' into event-queue-refactoring
byeonggiljun Mar 13, 2024
d25282e
Merge branch 'main' into event-queue-refactoring
byeonggiljun Apr 1, 2024
7fa757a
Allow the mac default test
byeonggiljun Apr 1, 2024
a3584f3
Correctly pile up super dense time
byeonggiljun Apr 3, 2024
0200a84
Revert change to CI and minor fix
byeonggiljun Apr 3, 2024
1bd320f
Update comments to align with the code
byeonggiljun Apr 10, 2024
5ec3325
Apply suggestions from code review
byeonggiljun Apr 11, 2024
3b50275
Add an authorship and remove unused functions
byeonggiljun Apr 11, 2024
4b8d42d
Make `pqueue_tag_init_customize` more flexible and remove some callback
byeonggiljun Apr 11, 2024
5c966b5
Minor fix
byeonggiljun Apr 11, 2024
739b696
Remove the `is_dummy` flag from the `event_q` structure
byeonggiljun Apr 11, 2024
d10375d
Move some callback functions for the event queue to environment.c
byeonggiljun Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 2 additions & 6 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -184,9 +184,8 @@ void environment_free(environment_t* env) {
free(env->reset_reactions);
free(env->is_present_fields);
free(env->is_present_fields_abbreviated);
pqueue_free(env->event_q);
pqueue_tag_free(env->event_q);
pqueue_free(env->recycle_q);
pqueue_free(env->next_q);

environment_free_threaded(env);
environment_free_single_threaded(env);
Expand Down Expand Up @@ -261,12 +260,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
env->_lf_handle = 1;

// Initialize our priority queues.
env->event_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_reverse_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->event_q = pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, event_matches);
env->recycle_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->next_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);

// Initialize functionality depending on target properties.
environment_init_threaded(env, num_workers);
Expand Down
28 changes: 11 additions & 17 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1255,9 +1255,6 @@ static void handle_provisional_tag_advance_grant() {
// (which it should be). Do not do this if the federate has not fully
// started yet.

instant_t dummy_event_time = PTAG.time;
microstep_t dummy_event_relative_microstep = PTAG.microstep;

if (lf_tag_compare(env->current_tag, PTAG) == 0) {
// The current tag can equal the PTAG if we are at the start time
// or if this federate has been able to advance time to the current
Expand All @@ -1281,22 +1278,18 @@ static void handle_provisional_tag_advance_grant() {
// Nothing more to do.
LF_MUTEX_UNLOCK(&env->mutex);
return;
} else if (PTAG.time == env->current_tag.time) {
// We now know env->current_tag < PTAG, but the times are equal.
// Adjust the microstep for scheduling the dummy event.
dummy_event_relative_microstep -= env->current_tag.microstep;
}
// We now know env->current_tag < PTAG.

if (dummy_event_time != FOREVER) {
// Schedule a dummy event at the specified time and (relative) microstep.
if (PTAG.time != FOREVER) {
// Schedule a dummy event at the specified tag.
LF_PRINT_DEBUG("At tag " PRINTF_TAG ", inserting into the event queue a dummy event "
"with time " PRINTF_TIME " and (relative) microstep " PRINTF_MICROSTEP ".",
env->current_tag.time - start_time, env->current_tag.microstep, dummy_event_time - start_time,
dummy_event_relative_microstep);
// Dummy event points to a NULL trigger and NULL real event.
event_t* dummy = _lf_create_dummy_events(env, NULL, dummy_event_time, NULL, dummy_event_relative_microstep);
pqueue_insert(env->event_q, dummy);
"with time " PRINTF_TIME " and microstep " PRINTF_MICROSTEP ".",
env->current_tag.time - start_time, env->current_tag.microstep, PTAG.time - start_time,
PTAG.microstep);
// Dummy event points to a NULL trigger.
event_t* dummy = _lf_create_dummy_events(env, NULL, PTAG);
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy);
}

LF_MUTEX_UNLOCK(&env->mutex);
Expand Down Expand Up @@ -2410,8 +2403,9 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// Create a dummy event that will force this federate to advance time and subsequently
// enable progress for downstream federates. Increment the time by ADVANCE_MESSAGE_INTERVAL
// to prevent too frequent dummy events.
event_t* dummy = _lf_create_dummy_events(env, NULL, tag.time + ADVANCE_MESSAGE_INTERVAL, NULL, 0);
pqueue_insert(env->event_q, dummy);
tag_t dummy_event_tag = (tag_t){.time = tag.time + ADVANCE_MESSAGE_INTERVAL, .microstep = tag.microstep};
event_t* dummy = _lf_create_dummy_events(env, NULL, dummy_event_tag);
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy);
}

LF_PRINT_DEBUG("Inserted a dummy event for logical time " PRINTF_TIME ".", tag.time - lf_time_start());
Expand Down
25 changes: 7 additions & 18 deletions core/modal_models/modes.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// Forward declaration of functions and variables supplied by reactor_common.c
void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number);
event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next,
microstep_t offset);
event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, tag_t tag);

// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -400,28 +399,17 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
event->trigger != NULL) { // History transition to a different mode
// Remaining time that the event would have been waiting before mode was left
instant_t local_remaining_delay =
event->time -
event->base.tag.time -
(state->next_mode->deactivation_time != 0 ? state->next_mode->deactivation_time : lf_time_start());
tag_t current_logical_tag = env->current_tag;

// Reschedule event with original local delay
LF_PRINT_DEBUG("Modes: Re-enqueuing event with a suspended delay of " PRINTF_TIME
" (previous TTH: " PRINTF_TIME ", Mode suspended at: " PRINTF_TIME ").",
local_remaining_delay, event->time, state->next_mode->deactivation_time);
local_remaining_delay, event->base.tag.time, state->next_mode->deactivation_time);
tag_t schedule_tag = {.time = current_logical_tag.time + local_remaining_delay,
.microstep = (local_remaining_delay == 0 ? current_logical_tag.microstep + 1 : 0)};
_lf_schedule_at_tag(env, event->trigger, schedule_tag, event->token);

// Also schedule events stacked up in super dense time.
event_t* e = event;
while (e->next != NULL) {
schedule_tag.microstep++;
_lf_schedule_at_tag(env, e->next->trigger, schedule_tag, e->next->token);
event_t* tmp = e->next;
e = tmp->next;
// A fresh event was created by schedule, hence, recycle old one
lf_recycle_event(env, tmp);
}
}
// A fresh event was created by schedule, hence, recycle old one
lf_recycle_event(env, event);
Expand Down Expand Up @@ -490,7 +478,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]

// Retract all events from the event queue that are associated with now inactive modes
if (env->event_q != NULL) {
size_t q_size = pqueue_size(env->event_q);
size_t q_size = pqueue_tag_size(env->event_q);
if (q_size > 0) {
event_t** delayed_removal = (event_t**)calloc(q_size, sizeof(event_t*));
size_t delayed_removal_count = 0;
Expand All @@ -509,7 +497,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
LF_PRINT_DEBUG("Modes: Pulling %zu events from the event queue to suspend them. %d events are now suspended.",
delayed_removal_count, _lf_suspended_events_num);
for (size_t i = 0; i < delayed_removal_count; i++) {
pqueue_remove(env->event_q, delayed_removal[i]);
pqueue_tag_remove(env->event_q, (pqueue_tag_element_t*)(delayed_removal[i]));
}

free(delayed_removal);
Expand All @@ -519,7 +507,8 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
if (env->modes->triggered_reactions_request) {
// Insert a dummy event in the event queue for the next microstep to make
// sure startup/reset reactions (if any) are triggered as soon as possible.
pqueue_insert(env->event_q, _lf_create_dummy_events(env, NULL, env->current_tag.time, NULL, 1));
tag_t dummy_event_tag = (tag_t){.time = env->current_tag.time, .microstep = 1};
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)_lf_create_dummy_events(env, NULL, dummy_event_tag));
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions core/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ int next(environment_t* env) {
// Enter the critical section and do not leave until we have
// determined which tag to commit to and start invoking reactions for.
LF_CRITICAL_SECTION_ENTER(env);
event_t* event = (event_t*)pqueue_peek(env->event_q);
event_t* event = (event_t*)pqueue_tag_peek(env->event_q);
// pqueue_dump(event_q, event_q->prt);
// If there is no next event and -keepalive has been specified
// on the command line, then we will wait the maximum time possible.
Expand All @@ -231,24 +231,18 @@ int next(environment_t* env) {
lf_set_stop_tag(env, (tag_t){.time = env->current_tag.time, .microstep = env->current_tag.microstep + 1});
}
} else {
next_tag.time = event->time;
// Deduce the microstep
if (next_tag.time == env->current_tag.time) {
next_tag.microstep = env->current_tag.microstep + 1;
} else {
next_tag.microstep = 0;
}
next_tag = event->base.tag;
}

if (lf_is_tag_after_stop_tag(env, next_tag)) {
// Cannot process events after the stop tag.
next_tag = env->stop_tag;
}

LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time);
LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep);
// Wait until physical time >= event.time.
int finished_sleep = wait_until(env, next_tag.time);
LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time);
LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep);
if (finished_sleep != 0) {
LF_PRINT_DEBUG("***** wait_until was interrupted.");
// Sleep was interrupted. This could happen when a physical action
Expand All @@ -258,10 +252,10 @@ int next(environment_t* env) {
LF_CRITICAL_SECTION_EXIT(env);
return 1;
}
// Advance current time to match that of the first event on the queue.
// Advance current tag to match that of the first event on the queue.
// We can now leave the critical section. Any events that will be added
// to the queue asynchronously will have a later tag than the current one.
_lf_advance_logical_time(env, next_tag.time);
_lf_advance_tag(env, next_tag);

// Trigger shutdown reactions if appropriate.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
Expand Down
Loading
Loading