Skip to content

Commit

Permalink
Initial announcer implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
DanNixon committed May 11, 2024
1 parent d497402 commit 25b044a
Show file tree
Hide file tree
Showing 22 changed files with 738 additions and 37 deletions.
44 changes: 40 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ axum-extra = { version = "0.9.3", features = ["query"] }
chrono = { version = "0.4.38", features = ["serde"] }
clap = { version = "~4.4.0", features = ["derive", "env"] }
clap_complete = "~4.4.10"
derive_builder = "0.20.0"
emfcamp-schedule-api = { path = "./client/" }
metrics = "0.22.3"
metrics-exporter-prometheus = { version = "0.14.0", default-features = false, features = ["http-listener"] }
Expand All @@ -28,6 +29,7 @@ serde = { version = "1.0.200", features = ["derive"] }
serde_json = "1.0.116"
serde_with = "3.8.1"
termcolor = "1.4.1"
thiserror = "1.0.59"
tokio = { version = "1.37.0", features = ["rt-multi-thread", "macros"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
Expand Down
7 changes: 6 additions & 1 deletion client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ edition.workspace = true

[dependencies]
chrono.workspace = true
derive_builder.workspace = true
reqwest.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_with.workspace = true
thiserror.workspace = true
tokio.workspace = true
tracing.workspace = true
url.workspace = true

[dev-dependencies]
anyhow.workspace = true
tokio.workspace = true
axum.workspace = true
tracing-subscriber.workspace = true
26 changes: 26 additions & 0 deletions client/examples/announcer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use anyhow::Result;
use emfcamp_schedule_api::{
announcer::{Announcer, AnnouncerSettingsBuilder},
Client,
};
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt::init();

let url = Url::parse("https://www.emfcamp.org/schedule/2024.json")?;

let client = Client::new(url);

let settings = AnnouncerSettingsBuilder::default()
.schedule_refresh(tokio::time::Duration::from_secs(5))
.build()?;

let mut announcer = Announcer::new(settings, client).await?;

loop {
let event = announcer.poll().await;
println!("{:?}", event);
}
}
3 changes: 2 additions & 1 deletion client/examples/now_and_next.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use anyhow::Result;
use chrono::Local;
use emfcamp_schedule_api::Client;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
let url = Url::parse("https://www.emfcamp.org/schedule/2024.json")?;

let client = emfcamp_schedule_api::Client::new(url);
let client = Client::new(url);

let schedule = client.get_schedule().await?;

Expand Down
3 changes: 2 additions & 1 deletion client/examples/schedule.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use anyhow::Result;
use emfcamp_schedule_api::Client;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
let url = Url::parse("https://www.emfcamp.org/schedule/2024.json")?;

let client = emfcamp_schedule_api::Client::new(url);
let client = Client::new(url);

let schedule = client.get_schedule().await?;

Expand Down
3 changes: 2 additions & 1 deletion client/examples/venues.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use anyhow::Result;
use emfcamp_schedule_api::Client;
use url::Url;

#[tokio::main]
async fn main() -> Result<()> {
let url = Url::parse("https://www.emfcamp.org/schedule/2024.json")?;

let client = emfcamp_schedule_api::Client::new(url);
let client = Client::new(url);

let schedule = client.get_schedule().await?;

Expand Down
184 changes: 184 additions & 0 deletions client/src/announcer/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
#[cfg(test)]
mod test;
mod utils;

use crate::{
schedule::{event::Event, Schedule},
Client,
};
use chrono::{DateTime, Duration as ChronoDuration, FixedOffset, Utc};
use derive_builder::Builder;
use tokio::time::{Duration as TokioDuration, Instant};
use tracing::{debug, info, warn};

#[derive(Debug, Builder)]
#[builder(default)]
pub struct AnnouncerSettings {
schedule_refresh: TokioDuration,
event_start_offset: ChronoDuration,
}

impl Default for AnnouncerSettings {
fn default() -> Self {
Self {
schedule_refresh: TokioDuration::from_secs(60),
event_start_offset: ChronoDuration::zero(),
}
}
}

#[derive(Debug, PartialEq, Eq)]
pub enum AnnouncerScheduleChanges {
Changes,
NoChanges,
}

#[allow(clippy::large_enum_variant)]
#[derive(Debug, PartialEq, Eq)]
pub enum AnnouncerPollResult {
Event(Event),
NoMoreEvents,
ScheduleRefreshed(AnnouncerScheduleChanges),
}

/// A subset of fields from the last event that was notified.
/// Used to select the next event to be notified.
struct LastNotifiedEventMarker {
start: DateTime<FixedOffset>,
id: u32,
}

impl LastNotifiedEventMarker {
fn matches(&self, event: &Event) -> bool {
self.start == event.start && self.id == event.id
}
}

impl From<&Event> for LastNotifiedEventMarker {
fn from(value: &Event) -> Self {
Self {
start: value.start,
id: value.id,
}
}
}

pub struct Announcer {
settings: AnnouncerSettings,

client: Client,

schedule: Schedule,
schedule_timestamp: Instant,

last_notified_event_marker: Option<LastNotifiedEventMarker>,
}

impl Announcer {
pub async fn new(settings: AnnouncerSettings, client: Client) -> crate::Result<Self> {
let (schedule, schedule_timestamp) = self::utils::get_sorted_schedule(&client).await?;

Ok(Self {
settings,
client,
schedule,
schedule_timestamp,
last_notified_event_marker: None,
})
}

async fn update_schedule(&mut self) -> crate::Result<AnnouncerScheduleChanges> {
let (schedule, schedule_timestamp) = self::utils::get_sorted_schedule(&self.client).await?;

let changes = if self.schedule == schedule {
debug!("No changes in new schedule");
AnnouncerScheduleChanges::NoChanges
} else {
info!("New schedule is different from previously loaded");
AnnouncerScheduleChanges::Changes
};

self.schedule = schedule;
self.schedule_timestamp = schedule_timestamp;

Ok(changes)
}

pub async fn poll(&mut self) -> crate::Result<AnnouncerPollResult> {
loop {
// Calculate when the next schedule refresh is due
let next_schedule_update_time =
self.schedule_timestamp + self.settings.schedule_refresh;

// Determine what the next event to announce is and in how much time it is due to be announced
let next_event = self.get_next_event_to_announce();
let event_wait_time = match next_event {
Some(ref event) => self::utils::get_duration_before_event(
Utc::now().into(),
self.settings.event_start_offset,
event,
),
None => TokioDuration::from_secs(60),
};
debug!("Time to wait before next event: {:?}", event_wait_time);

// Wait for one of several things to happen...
tokio::select! {
// 1. The schedule is refreshed at the requested interval
_ = tokio::time::sleep_until(next_schedule_update_time) => {
match self.update_schedule().await {
Ok(changes) => {
return Ok(AnnouncerPollResult::ScheduleRefreshed(changes))
},
Err(e) => {
warn!("Failed to update schedule {e}")
},
}
}
// 2. The next event to be announced needs to be announced
_ = tokio::time::sleep(event_wait_time) => {
if let Some(event) = next_event {
self.update_event_marker(&event);
return Ok(AnnouncerPollResult::Event(event))
}
}
}
}
}

fn get_next_event_to_announce(&self) -> Option<Event> {
match &self.last_notified_event_marker {
Some(marker) => {
match self
.schedule
.events
.iter()
.position(|event| marker.matches(event))
{
Some(idx) => {
debug!("Matched last notified event marker, picking next in schedule as next to announce");
self.schedule.events.get(idx + 1).cloned()
}
None => {
debug!("Last notified event marker matched no events (something's fucky...), picking next chronological event as next to announce");
self.schedule
.events
.iter()
.find(|event| event.start > marker.start)
.cloned()
}
}
}
None => {
debug!(
"No last notified event marker, picking first in schedule as next to announce"
);
self.schedule.events.first().cloned()
}
}
}

fn update_event_marker(&mut self, event: &Event) {
self.last_notified_event_marker = Some(event.into());
}
}
Loading

0 comments on commit 25b044a

Please sign in to comment.