Skip to content

Commit

Permalink
Automatically clean elapsed timeouts
Browse files Browse the repository at this point in the history
  • Loading branch information
romac committed Feb 21, 2024
1 parent f7e4dd4 commit a086a9e
Show file tree
Hide file tree
Showing 2 changed files with 79 additions and 24 deletions.
8 changes: 4 additions & 4 deletions code/common/src/timeout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,22 @@ pub struct Timeout {

impl Timeout {
/// Create a new timeout for the given round and step.
pub fn new(round: Round, step: TimeoutStep) -> Self {
pub const fn new(round: Round, step: TimeoutStep) -> Self {
Self { round, step }
}

/// Create a new timeout for the propose step of the given round.
pub fn propose(round: Round) -> Self {
pub const fn propose(round: Round) -> Self {
Self::new(round, TimeoutStep::Propose)
}

/// Create a new timeout for the prevote step of the given round.
pub fn prevote(round: Round) -> Self {
pub const fn prevote(round: Round) -> Self {
Self::new(round, TimeoutStep::Prevote)
}

/// Create a new timeout for the precommit step of the given round.
pub fn precommit(round: Round) -> Self {
pub const fn precommit(round: Round) -> Self {
Self::new(round, TimeoutStep::Precommit)
}
}
95 changes: 75 additions & 20 deletions code/node/src/timers.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use malachite_common::{Timeout, TimeoutStep};
use tokio::sync::mpsc;
use tokio::sync::Mutex; // TODO: Use parking_lot instead?
use tokio::task::JoinHandle;

#[derive(Copy, Clone, Debug)]
pub struct Config {
pub propose_timeout: Duration,
pub prevote_timeout: Duration,
Expand All @@ -13,8 +16,7 @@ pub struct Config {

pub struct Timers {
config: Config,
timeouts: HashMap<Timeout, JoinHandle<()>>,

timeouts: Arc<Mutex<HashMap<Timeout, JoinHandle<()>>>>,
timeout_elapsed: mpsc::Sender<Timeout>,
}

Expand All @@ -24,33 +26,39 @@ impl Timers {

let timers = Self {
config,
timeouts: HashMap::new(),
timeouts: Arc::new(Mutex::new(HashMap::new())),
timeout_elapsed: tx_timeout_elapsed,
};

(timers, rx_timeout_elapsed)
}

pub fn reset(&mut self) {
for (_, handle) in self.timeouts.drain() {
pub async fn reset(&mut self) {
for (_, handle) in self.timeouts.lock().await.drain() {
handle.abort();
}
}

pub async fn scheduled(&self) -> usize {
self.timeouts.lock().await.len()
}

pub async fn schedule_timeout(&mut self, timeout: Timeout) {
let tx = self.timeout_elapsed.clone();
let duration = self.timeout_duration(&timeout);

let timeouts = self.timeouts.clone();
let handle = tokio::spawn(async move {
tokio::time::sleep(duration).await;
timeouts.lock().await.remove(&timeout);
tx.send(timeout).await.unwrap();
});

self.timeouts.insert(timeout, handle);
self.timeouts.lock().await.insert(timeout, handle);
}

pub async fn cancel_timeout(&mut self, timeout: &Timeout) {
if let Some(handle) = self.timeouts.remove(timeout) {
if let Some(handle) = self.timeouts.lock().await.remove(timeout) {
handle.abort();
}
}
Expand All @@ -65,39 +73,86 @@ impl Timers {
}

#[cfg(test)]
#[allow(non_upper_case_globals)]
mod tests {
use malachite_common::Round;

use super::*;

#[tokio::test]
async fn test_timers() {
let config = Config {
propose_timeout: Duration::from_millis(100),
prevote_timeout: Duration::from_millis(200),
precommit_timeout: Duration::from_millis(300),
};
const config: Config = Config {
propose_timeout: Duration::from_millis(50),
prevote_timeout: Duration::from_millis(100),
precommit_timeout: Duration::from_millis(150),
};

const fn timeouts() -> (Timeout, Timeout, Timeout) {
let (r0, r1, r2) = (Round::new(0), Round::new(1), Round::new(2));
let (t0, t1, t2) = (

(
Timeout::new(r0, TimeoutStep::Propose),
Timeout::new(r1, TimeoutStep::Prevote),
Timeout::new(r2, TimeoutStep::Precommit),
);
)
}

#[tokio::test]
async fn timers_no_cancel() {
let (t0, t1, t2) = timeouts();

let (mut timers, mut rx_timeout_elapsed) = Timers::new(config);

timers.schedule_timeout(t1).await;
timers.schedule_timeout(t0).await;
timers.schedule_timeout(t2).await;
assert_eq!(timers.scheduled().await, 3);

assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t0);
assert_eq!(timers.scheduled().await, 2);
assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t1);
assert_eq!(timers.scheduled().await, 1);
assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t2);
assert_eq!(timers.scheduled().await, 0);
}

#[tokio::test]
async fn timers_cancel_first() {
let (t0, t1, t2) = timeouts();

let (mut timers, mut rx_timeout_elapsed) = Timers::new(config);

timers.schedule_timeout(t0).await;
timers.schedule_timeout(t1).await;
timers.schedule_timeout(t2).await;
assert_eq!(timers.scheduled().await, 3);

timers.cancel_timeout(&t0).await;
assert_eq!(timers.scheduled().await, 2);

assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t1);
assert_eq!(timers.scheduled().await, 1);

assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t2);
assert_eq!(timers.scheduled().await, 0);
}

#[tokio::test]
async fn timers_cancel_middle() {
let (t0, t1, t2) = timeouts();

let (mut timers, mut rx_timeout_elapsed) = Timers::new(config);

timers.schedule_timeout(t2).await;
timers.schedule_timeout(t1).await;
timers.schedule_timeout(t0).await;
assert_eq!(timers.scheduled().await, 3);

assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t0);
assert_eq!(timers.scheduled().await, 2);

timers.cancel_timeout(&t1).await;
assert_eq!(timers.scheduled().await, 1);

assert_eq!(
rx_timeout_elapsed.recv().await.unwrap(),
Timeout::new(r2, TimeoutStep::Precommit)
);
assert_eq!(rx_timeout_elapsed.recv().await.unwrap(), t2);
assert_eq!(timers.scheduled().await, 0);
}
}

0 comments on commit a086a9e

Please sign in to comment.