Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use an interval to drive schedule updates in announcer #30

Merged
merged 1 commit into from
May 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 10 additions & 15 deletions client/src/announcer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{
};
use chrono::{DateTime, Duration as ChronoDuration, FixedOffset, Utc};
use derive_builder::Builder;
use tokio::time::{Duration as TokioDuration, Instant};
use tokio::time::{Duration as TokioDuration, Interval};
use tracing::{debug, info, warn};

#[derive(Debug, Builder)]
Expand Down Expand Up @@ -65,30 +65,30 @@ impl From<&Event> for LastNotifiedEventMarker {

pub struct Announcer {
settings: AnnouncerSettings,

client: Client,

schedule: Schedule,
schedule_timestamp: Instant,

schedule_update_interval: Interval,
last_notified_event_marker: Option<LastNotifiedEventMarker>,
}

impl Announcer {
pub async fn new(settings: AnnouncerSettings, client: Client) -> crate::Result<Self> {
let (schedule, schedule_timestamp) = self::utils::get_sorted_schedule(&client).await?;
let schedule = self::utils::get_sorted_schedule(&client).await?;

let mut schedule_update_interval = tokio::time::interval(settings.schedule_refresh);
schedule_update_interval.reset();

Ok(Self {
settings,
client,
schedule,
schedule_timestamp,
schedule_update_interval,
last_notified_event_marker: None,
})
}

async fn update_schedule(&mut self) -> crate::Result<AnnouncerScheduleChanges> {
let (schedule, schedule_timestamp) = self::utils::get_sorted_schedule(&self.client).await?;
let schedule = self::utils::get_sorted_schedule(&self.client).await?;

let changes = if self.schedule == schedule {
debug!("No changes in new schedule");
Expand All @@ -99,17 +99,12 @@ impl Announcer {
};

self.schedule = schedule;
self.schedule_timestamp = schedule_timestamp;

Ok(changes)
}

pub async fn poll(&mut self) -> crate::Result<AnnouncerPollResult> {
loop {
// Calculate when the next schedule refresh is due
let next_schedule_update_time =
self.schedule_timestamp + self.settings.schedule_refresh;

// Determine what the next event to announce is and in how much time it is due to be announced
let next_event = self.get_next_event_to_announce();
let event_wait_time = match next_event {
Expand All @@ -125,13 +120,13 @@ impl Announcer {
// Wait for one of several things to happen...
tokio::select! {
// 1. The schedule is refreshed at the requested interval
_ = tokio::time::sleep_until(next_schedule_update_time) => {
_ = self.schedule_update_interval.tick() => {
match self.update_schedule().await {
Ok(changes) => {
return Ok(AnnouncerPollResult::ScheduleRefreshed(changes))
},
Err(e) => {
warn!("Failed to update schedule {e}")
warn!("Failed to update schedule: {e}")
},
}
}
Expand Down
9 changes: 3 additions & 6 deletions client/src/announcer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,13 @@ use crate::{
Client,
};
use chrono::{DateTime, Duration as ChronoDuration, FixedOffset};
use tokio::time::{Duration as TokioDuration, Instant};
use tokio::time::Duration as TokioDuration;
use tracing::warn;

pub(super) async fn get_sorted_schedule(client: &Client) -> crate::Result<(Schedule, Instant)> {
pub(super) async fn get_sorted_schedule(client: &Client) -> crate::Result<Schedule> {
let mut schedule = client.get_schedule().await?;
schedule.mutate(&Mutators::new_single(Box::new(SortedByStartTime {})));

let now = Instant::now();

Ok((schedule, now))
Ok(schedule)
}

pub(super) fn get_duration_before_event_notification(
Expand Down