Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactoring of event queue #390

Merged
merged 20 commits into from
Apr 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
898fa1c
Use pqueue_tag instead of pqueue for event_q and simplfy scehduling n…
byeonggiljun Mar 8, 2024
52b5bbc
Schedule events with appropriate tag
byeonggiljun Mar 12, 2024
8e6226e
Apply clang-format
byeonggiljun Mar 12, 2024
733f81b
Merge branch 'main' into event-queue-refactoring
byeonggiljun Mar 12, 2024
31a5b1b
Let pqueue_tag be able to compare payloads of elements
byeonggiljun Mar 12, 2024
1d4d05c
Allow the priority comparison function to return 0 if received priori…
byeonggiljun Mar 12, 2024
2ccd7cb
Cleanup codes
byeonggiljun Mar 12, 2024
ca62f29
Add a function for finding an element in a priority queue with the pr…
byeonggiljun Mar 13, 2024
64f21c4
Merge branch 'main' into event-queue-refactoring
byeonggiljun Mar 13, 2024
d25282e
Merge branch 'main' into event-queue-refactoring
byeonggiljun Apr 1, 2024
7fa757a
Allow the mac default test
byeonggiljun Apr 1, 2024
a3584f3
Correctly pile up super dense time
byeonggiljun Apr 3, 2024
0200a84
Revert change to CI and minor fix
byeonggiljun Apr 3, 2024
1bd320f
Update comments to align with the code
byeonggiljun Apr 10, 2024
5ec3325
Apply suggestions from code review
byeonggiljun Apr 11, 2024
3b50275
Add an authorship and remove unused functions
byeonggiljun Apr 11, 2024
4b8d42d
Make `pqueue_tag_init_customize` more flexible and remove some callback
byeonggiljun Apr 11, 2024
5c966b5
Minor fix
byeonggiljun Apr 11, 2024
739b696
Remove the `is_dummy` flag from the `event_q` structure
byeonggiljun Apr 11, 2024
d10375d
Move some callback functions for the event queue to environment.c
byeonggiljun Apr 11, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 46 additions & 21 deletions core/environment.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,31 @@
#include "scheduler.h"
#endif

//////////////////
// Local functions, not intended for use outside this file.

/**
* @brief Callback function to determine whether two events have the same trigger.
* This function is used by event queue and recycle.
* Return 1 if the triggers are identical, 0 otherwise.
* @param event1 A pointer to an event.
* @param event2 A pointer to an event.
*/
static int event_matches(void* event1, void* event2) {
return (((event_t*)event1)->trigger == ((event_t*)event2)->trigger);
}

/**
* @brief Callback function to print information about an event.
* This function is used by event queue and recycle.
* @param element A pointer to an event.
*/
static void print_event(void* event) {
event_t* e = (event_t*)event;
LF_PRINT_DEBUG("tag: " PRINTF_TAG ", trigger: %p, token: %p", e->base.tag.time, e->base.tag.microstep,
(void*)e->trigger, (void*)e->token);
}

Comment on lines +41 to +65
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These function definition seem a little out-of-place here in environment.c why not have them in pqueue.c or pqueue_tag.c?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rationale for this is to avoid creating unnecessary dependencies.
Neither pqueue.c nor pqueue_tag.c have any dependence on event_t.
They are more generic. It is in evironment.c that pqueue_tag is adapted to contain items of type event_t. The way it does that is by providing these two functions as arguments to pqueue_tag_init_customize.
These functions are made static, so this design of the event queue is entirely defined in environment.c.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I see, thanks for the explanation.

/**
* @brief Initialize the threaded part of the environment struct.
*/
Expand All @@ -58,6 +83,7 @@ static void environment_init_threaded(environment_t* env, int num_workers) {
(void)num_workers;
#endif
}

/**
* @brief Initialize the single-threaded-specific parts of the environment struct.
*/
Expand Down Expand Up @@ -127,18 +153,6 @@ static void environment_init_federated(environment_t* env, int num_is_present_fi
#endif
}

void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
if (duration >= 0LL) {
// A duration has been specified. Calculate the stop time.
stop_tag.time = env->current_tag.time + duration;
stop_tag.microstep = 0;
}
env->stop_tag = stop_tag;
}

static void environment_free_threaded(environment_t* env) {
#if !defined(LF_SINGLE_THREADED)
free(env->thread_ids);
Expand Down Expand Up @@ -176,6 +190,9 @@ static void environment_free_federated(environment_t* env) {
#endif
}

//////////////////
// Functions defined in environment.h.

void environment_free(environment_t* env) {
free(env->name);
free(env->timer_triggers);
Expand All @@ -184,16 +201,27 @@ void environment_free(environment_t* env) {
free(env->reset_reactions);
free(env->is_present_fields);
free(env->is_present_fields_abbreviated);
pqueue_free(env->event_q);
pqueue_free(env->recycle_q);
pqueue_free(env->next_q);
pqueue_tag_free(env->event_q);
pqueue_tag_free(env->recycle_q);

environment_free_threaded(env);
environment_free_single_threaded(env);
environment_free_modes(env);
environment_free_federated(env);
}

void environment_init_tags(environment_t* env, instant_t start_time, interval_t duration) {
env->current_tag = (tag_t){.time = start_time, .microstep = 0u};

tag_t stop_tag = FOREVER_TAG_INITIALIZER;
if (duration >= 0LL) {
// A duration has been specified. Calculate the stop time.
stop_tag.time = env->current_tag.time + duration;
stop_tag.microstep = 0;
}
env->stop_tag = stop_tag;
}

int environment_init(environment_t* env, const char* name, int id, int num_workers, int num_timers,
int num_startup_reactions, int num_shutdown_reactions, int num_reset_reactions,
int num_is_present_fields, int num_modes, int num_state_resets, int num_watchdogs,
Expand Down Expand Up @@ -261,12 +289,9 @@ int environment_init(environment_t* env, const char* name, int id, int num_worke
env->_lf_handle = 1;

// Initialize our priority queues.
env->event_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_reverse_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->recycle_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->next_q = pqueue_init(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, get_event_time, get_event_position,
set_event_position, event_matches, print_event);
env->event_q = pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, pqueue_tag_compare, event_matches, print_event);
env->recycle_q =
pqueue_tag_init_customize(INITIAL_EVENT_QUEUE_SIZE, in_no_particular_order, event_matches, print_event);

// Initialize functionality depending on target properties.
environment_init_threaded(env, num_workers);
Expand Down
28 changes: 11 additions & 17 deletions core/federated/federate.c
Original file line number Diff line number Diff line change
Expand Up @@ -1255,9 +1255,6 @@ static void handle_provisional_tag_advance_grant() {
// (which it should be). Do not do this if the federate has not fully
// started yet.

instant_t dummy_event_time = PTAG.time;
microstep_t dummy_event_relative_microstep = PTAG.microstep;

if (lf_tag_compare(env->current_tag, PTAG) == 0) {
// The current tag can equal the PTAG if we are at the start time
// or if this federate has been able to advance time to the current
Expand All @@ -1281,22 +1278,18 @@ static void handle_provisional_tag_advance_grant() {
// Nothing more to do.
LF_MUTEX_UNLOCK(&env->mutex);
return;
} else if (PTAG.time == env->current_tag.time) {
// We now know env->current_tag < PTAG, but the times are equal.
// Adjust the microstep for scheduling the dummy event.
dummy_event_relative_microstep -= env->current_tag.microstep;
}
// We now know env->current_tag < PTAG.

if (dummy_event_time != FOREVER) {
// Schedule a dummy event at the specified time and (relative) microstep.
if (PTAG.time != FOREVER) {
// Schedule a dummy event at the specified tag.
LF_PRINT_DEBUG("At tag " PRINTF_TAG ", inserting into the event queue a dummy event "
"with time " PRINTF_TIME " and (relative) microstep " PRINTF_MICROSTEP ".",
env->current_tag.time - start_time, env->current_tag.microstep, dummy_event_time - start_time,
dummy_event_relative_microstep);
// Dummy event points to a NULL trigger and NULL real event.
event_t* dummy = _lf_create_dummy_events(env, NULL, dummy_event_time, NULL, dummy_event_relative_microstep);
pqueue_insert(env->event_q, dummy);
"with time " PRINTF_TIME " and microstep " PRINTF_MICROSTEP ".",
env->current_tag.time - start_time, env->current_tag.microstep, PTAG.time - start_time,
PTAG.microstep);
// Dummy event points to a NULL trigger.
event_t* dummy = _lf_create_dummy_events(env, PTAG);
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy);
}

LF_MUTEX_UNLOCK(&env->mutex);
Expand Down Expand Up @@ -2410,8 +2403,9 @@ tag_t lf_send_next_event_tag(environment_t* env, tag_t tag, bool wait_for_reply)
// Create a dummy event that will force this federate to advance time and subsequently
// enable progress for downstream federates. Increment the time by ADVANCE_MESSAGE_INTERVAL
// to prevent too frequent dummy events.
event_t* dummy = _lf_create_dummy_events(env, NULL, tag.time + ADVANCE_MESSAGE_INTERVAL, NULL, 0);
pqueue_insert(env->event_q, dummy);
tag_t dummy_event_tag = (tag_t){.time = tag.time + ADVANCE_MESSAGE_INTERVAL, .microstep = tag.microstep};
event_t* dummy = _lf_create_dummy_events(env, dummy_event_tag);
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)dummy);
}

LF_PRINT_DEBUG("Inserted a dummy event for logical time " PRINTF_TIME ".", tag.time - lf_time_start());
Expand Down
25 changes: 7 additions & 18 deletions core/modal_models/modes.c
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// Forward declaration of functions and variables supplied by reactor_common.c
void _lf_trigger_reaction(environment_t* env, reaction_t* reaction, int worker_number);
event_t* _lf_create_dummy_events(environment_t* env, trigger_t* trigger, instant_t time, event_t* next,
microstep_t offset);
event_t* _lf_create_dummy_events(environment_t* env, tag_t tag);

// ----------------------------------------------------------------------------

Expand Down Expand Up @@ -400,28 +399,17 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
event->trigger != NULL) { // History transition to a different mode
// Remaining time that the event would have been waiting before mode was left
instant_t local_remaining_delay =
event->time -
event->base.tag.time -
(state->next_mode->deactivation_time != 0 ? state->next_mode->deactivation_time : lf_time_start());
tag_t current_logical_tag = env->current_tag;

// Reschedule event with original local delay
LF_PRINT_DEBUG("Modes: Re-enqueuing event with a suspended delay of " PRINTF_TIME
" (previous TTH: " PRINTF_TIME ", Mode suspended at: " PRINTF_TIME ").",
local_remaining_delay, event->time, state->next_mode->deactivation_time);
local_remaining_delay, event->base.tag.time, state->next_mode->deactivation_time);
tag_t schedule_tag = {.time = current_logical_tag.time + local_remaining_delay,
.microstep = (local_remaining_delay == 0 ? current_logical_tag.microstep + 1 : 0)};
_lf_schedule_at_tag(env, event->trigger, schedule_tag, event->token);

// Also schedule events stacked up in super dense time.
event_t* e = event;
while (e->next != NULL) {
schedule_tag.microstep++;
_lf_schedule_at_tag(env, e->next->trigger, schedule_tag, e->next->token);
event_t* tmp = e->next;
e = tmp->next;
// A fresh event was created by schedule, hence, recycle old one
lf_recycle_event(env, tmp);
}
}
// A fresh event was created by schedule, hence, recycle old one
lf_recycle_event(env, event);
Expand Down Expand Up @@ -490,7 +478,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]

// Retract all events from the event queue that are associated with now inactive modes
if (env->event_q != NULL) {
size_t q_size = pqueue_size(env->event_q);
size_t q_size = pqueue_tag_size(env->event_q);
if (q_size > 0) {
event_t** delayed_removal = (event_t**)calloc(q_size, sizeof(event_t*));
size_t delayed_removal_count = 0;
Expand All @@ -509,7 +497,7 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
LF_PRINT_DEBUG("Modes: Pulling %zu events from the event queue to suspend them. %d events are now suspended.",
delayed_removal_count, _lf_suspended_events_num);
for (size_t i = 0; i < delayed_removal_count; i++) {
pqueue_remove(env->event_q, delayed_removal[i]);
pqueue_tag_remove(env->event_q, (pqueue_tag_element_t*)(delayed_removal[i]));
}

free(delayed_removal);
Expand All @@ -519,7 +507,8 @@ void _lf_process_mode_changes(environment_t* env, reactor_mode_state_t* states[]
if (env->modes->triggered_reactions_request) {
// Insert a dummy event in the event queue for the next microstep to make
// sure startup/reset reactions (if any) are triggered as soon as possible.
pqueue_insert(env->event_q, _lf_create_dummy_events(env, NULL, env->current_tag.time, NULL, 1));
tag_t dummy_event_tag = (tag_t){.time = env->current_tag.time, .microstep = 1};
pqueue_tag_insert(env->event_q, (pqueue_tag_element_t*)_lf_create_dummy_events(env, dummy_event_tag));
}
}
}
Expand Down
18 changes: 6 additions & 12 deletions core/reactor.c
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ int next(environment_t* env) {
// Enter the critical section and do not leave until we have
// determined which tag to commit to and start invoking reactions for.
LF_CRITICAL_SECTION_ENTER(env);
event_t* event = (event_t*)pqueue_peek(env->event_q);
event_t* event = (event_t*)pqueue_tag_peek(env->event_q);
// pqueue_dump(event_q, event_q->prt);
// If there is no next event and -keepalive has been specified
// on the command line, then we will wait the maximum time possible.
Expand All @@ -231,24 +231,18 @@ int next(environment_t* env) {
lf_set_stop_tag(env, (tag_t){.time = env->current_tag.time, .microstep = env->current_tag.microstep + 1});
}
} else {
next_tag.time = event->time;
// Deduce the microstep
if (next_tag.time == env->current_tag.time) {
next_tag.microstep = env->current_tag.microstep + 1;
} else {
next_tag.microstep = 0;
}
next_tag = event->base.tag;
}

if (lf_is_tag_after_stop_tag(env, next_tag)) {
// Cannot process events after the stop tag.
next_tag = env->stop_tag;
}

LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time);
LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep);
// Wait until physical time >= event.time.
int finished_sleep = wait_until(env, next_tag.time);
LF_PRINT_LOG("Next event (elapsed) time is " PRINTF_TIME ".", next_tag.time - start_time);
LF_PRINT_LOG("Next event (elapsed) tag is " PRINTF_TAG ".", next_tag.time - start_time, next_tag.microstep);
if (finished_sleep != 0) {
LF_PRINT_DEBUG("***** wait_until was interrupted.");
// Sleep was interrupted. This could happen when a physical action
Expand All @@ -258,10 +252,10 @@ int next(environment_t* env) {
LF_CRITICAL_SECTION_EXIT(env);
return 1;
}
// Advance current time to match that of the first event on the queue.
// Advance current tag to match that of the first event on the queue.
// We can now leave the critical section. Any events that will be added
// to the queue asynchronously will have a later tag than the current one.
_lf_advance_logical_time(env, next_tag.time);
_lf_advance_tag(env, next_tag);

// Trigger shutdown reactions if appropriate.
if (lf_tag_compare(env->current_tag, env->stop_tag) >= 0) {
Expand Down
Loading
Loading