From 0793781824dead5f78959eaa3e945bc64eeea632 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 4 Jun 2024 16:15:18 +0200 Subject: [PATCH] feat(code): Increase timeouts by the configured delta when they elapse --- code/actors/src/consensus.rs | 5 +- code/actors/src/timers.rs | 103 +++++++++++++++++++++++++++-------- 2 files changed, 83 insertions(+), 25 deletions(-) diff --git a/code/actors/src/consensus.rs b/code/actors/src/consensus.rs index c32c0d0e8..254725287 100644 --- a/code/actors/src/consensus.rs +++ b/code/actors/src/consensus.rs @@ -309,7 +309,7 @@ where ) -> Result<(), ractor::ActorProcessingErr> { match &input { DriverInput::NewRound(_, _, _) => { - state.timers.cast(TimersMsg::Reset)?; + state.timers.cast(TimersMsg::CancelAllTimeouts)?; } DriverInput::ProposeValue(round, _) => state @@ -646,7 +646,8 @@ where } Msg::MoveToHeight(height) => { - state.timers.cast(TimersMsg::Reset)?; + state.timers.cast(TimersMsg::CancelAllTimeouts)?; + state.timers.cast(TimersMsg::ResetTimeouts)?; let validator_set = self.get_validator_set(height).await?; state.driver.move_to_height(height, validator_set); diff --git a/code/actors/src/timers.rs b/code/actors/src/timers.rs index 3cdcb484a..1f4ad607b 100644 --- a/code/actors/src/timers.rs +++ b/code/actors/src/timers.rs @@ -19,8 +19,8 @@ impl TimeoutElapsed { } pub struct Timers { - config: Config, listener: ActorRef, + initial_config: Config, } impl Timers @@ -28,34 +28,50 @@ where M: From + ractor::Message, { pub async fn spawn( - config: Config, + initial_config: Config, listener: ActorRef, ) -> Result<(ActorRef, JoinHandle<()>), ractor::SpawnErr> { - Actor::spawn(None, Self { config, listener }, ()).await + Actor::spawn( + None, + Self { + listener, + initial_config, + }, + initial_config, + ) + .await } pub async fn spawn_linked( - config: Config, + initial_config: Config, listener: ActorRef, supervisor: ActorCell, ) -> Result<(ActorRef, JoinHandle<()>), ractor::SpawnErr> { - Actor::spawn_linked(None, Self { config, listener }, (), supervisor).await - } - - pub fn timeout_duration(&self, step: &TimeoutStep) -> Duration { - match step { - TimeoutStep::Propose => self.config.timeout_propose, - TimeoutStep::Prevote => self.config.timeout_prevote, - TimeoutStep::Precommit => self.config.timeout_precommit, - TimeoutStep::Commit => self.config.timeout_commit, - } + Actor::spawn_linked( + None, + Self { + listener, + initial_config, + }, + initial_config, + supervisor, + ) + .await } } pub enum Msg { + /// Schedule the given timeout ScheduleTimeout(Timeout), + + /// Cancel the given timeout CancelTimeout(Timeout), - Reset, + + /// Cancel all the timeouts + CancelAllTimeouts, + + /// Reset all timeouts values to their original values + ResetTimeouts, // Internal messages #[doc(hidden)] @@ -66,9 +82,41 @@ type TimerTask = JoinHandle>>; #[derive(Default)] pub struct State { + config: Config, timers: HashMap, } +impl State { + pub fn timeout_elapsed(&mut self, timeout: &Timeout) { + self.timers.remove(timeout); + self.increase_timeout(&timeout.step); + } + + pub fn increase_timeout(&mut self, step: &TimeoutStep) { + match step { + TimeoutStep::Propose => { + self.config.timeout_propose += self.config.timeout_propose_delta + } + TimeoutStep::Prevote => { + self.config.timeout_prevote += self.config.timeout_prevote_delta + } + TimeoutStep::Precommit => { + self.config.timeout_precommit += self.config.timeout_precommit_delta + } + TimeoutStep::Commit => (), + } + } + + pub fn timeout_duration(&self, step: &TimeoutStep) -> Duration { + match step { + TimeoutStep::Propose => self.config.timeout_propose, + TimeoutStep::Prevote => self.config.timeout_prevote, + TimeoutStep::Precommit => self.config.timeout_precommit, + TimeoutStep::Commit => self.config.timeout_commit, + } + } +} + #[async_trait] impl Actor for Timers where @@ -76,14 +124,17 @@ where { type Msg = Msg; type State = State; - type Arguments = (); + type Arguments = Config; async fn pre_start( &self, _myself: ActorRef, - _args: (), + config: Config, ) -> Result { - Ok(State::default()) + Ok(State { + config, + ..Default::default() + }) } async fn handle( @@ -94,7 +145,7 @@ where ) -> Result<(), ActorProcessingErr> { match msg { Msg::ScheduleTimeout(timeout) => { - let duration = self.timeout_duration(&timeout.step); + let duration = state.timeout_duration(&timeout.step); let task = send_after(duration, myself.get_cell(), move || { Msg::TimeoutElapsed(timeout) }); @@ -108,15 +159,21 @@ where } } - Msg::Reset => { + Msg::TimeoutElapsed(timeout) => { + state.timeout_elapsed(&timeout); + self.listener.cast(TimeoutElapsed(timeout).into())?; + } + + Msg::CancelAllTimeouts => { + // Cancel all the timers for (_, task) in state.timers.drain() { task.abort(); } } - Msg::TimeoutElapsed(timeout) => { - state.timers.remove(&timeout); - self.listener.cast(TimeoutElapsed(timeout).into())?; + Msg::ResetTimeouts => { + // Reset the timeouts to their original values + state.config = self.initial_config; } }