Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(code): Increase timeouts by the configured delta when they elapse #207

Merged
merged 2 commits into from
Jun 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions code/actors/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ where
) -> Result<(), ractor::ActorProcessingErr> {
match &input {
DriverInput::NewRound(_, _, _) => {
state.timers.cast(TimersMsg::Reset)?;
state.timers.cast(TimersMsg::CancelAllTimeouts)?;
}

DriverInput::ProposeValue(round, _) => state
Expand Down Expand Up @@ -608,7 +608,8 @@ where
}

Msg::MoveToHeight(height) => {
state.timers.cast(TimersMsg::Reset)?;
state.timers.cast(TimersMsg::CancelAllTimeouts)?;
state.timers.cast(TimersMsg::ResetTimeouts)?;

let validator_set = self.get_validator_set(height).await?;
state.driver.move_to_height(height, validator_set);
Expand Down
103 changes: 80 additions & 23 deletions code/actors/src/timers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,43 +19,59 @@ impl TimeoutElapsed {
}

pub struct Timers<M> {
config: Config,
listener: ActorRef<M>,
initial_config: Config,
}

impl<M> Timers<M>
where
M: From<TimeoutElapsed> + ractor::Message,
{
pub async fn spawn(
config: Config,
initial_config: Config,
listener: ActorRef<M>,
) -> Result<(ActorRef<Msg>, JoinHandle<()>), ractor::SpawnErr> {
Actor::spawn(None, Self { config, listener }, ()).await
Actor::spawn(
None,
Self {
listener,
initial_config,
},
initial_config,
)
.await
}

pub async fn spawn_linked(
config: Config,
initial_config: Config,
listener: ActorRef<M>,
supervisor: ActorCell,
) -> Result<(ActorRef<Msg>, JoinHandle<()>), ractor::SpawnErr> {
Actor::spawn_linked(None, Self { config, listener }, (), supervisor).await
}

pub fn timeout_duration(&self, step: &TimeoutStep) -> Duration {
match step {
TimeoutStep::Propose => self.config.timeout_propose,
TimeoutStep::Prevote => self.config.timeout_prevote,
TimeoutStep::Precommit => self.config.timeout_precommit,
TimeoutStep::Commit => self.config.timeout_commit,
}
Actor::spawn_linked(
None,
Self {
listener,
initial_config,
},
initial_config,
supervisor,
)
.await
}
}

pub enum Msg {
/// Schedule the given timeout
ScheduleTimeout(Timeout),

/// Cancel the given timeout
CancelTimeout(Timeout),
Reset,

/// Cancel all the timeouts
CancelAllTimeouts,

/// Reset all timeouts values to their original values
ResetTimeouts,

// Internal messages
#[doc(hidden)]
Expand All @@ -66,24 +82,59 @@ type TimerTask = JoinHandle<Result<(), MessagingErr<Msg>>>;

#[derive(Default)]
pub struct State {
config: Config,
timers: HashMap<Timeout, TimerTask>,
}

impl State {
pub fn timeout_elapsed(&mut self, timeout: &Timeout) {
self.timers.remove(timeout);
self.increase_timeout(&timeout.step);
}

pub fn increase_timeout(&mut self, step: &TimeoutStep) {
match step {
TimeoutStep::Propose => {
self.config.timeout_propose += self.config.timeout_propose_delta
}
TimeoutStep::Prevote => {
self.config.timeout_prevote += self.config.timeout_prevote_delta
}
TimeoutStep::Precommit => {
self.config.timeout_precommit += self.config.timeout_precommit_delta
}
TimeoutStep::Commit => (),
}
}

pub fn timeout_duration(&self, step: &TimeoutStep) -> Duration {
match step {
TimeoutStep::Propose => self.config.timeout_propose,
TimeoutStep::Prevote => self.config.timeout_prevote,
TimeoutStep::Precommit => self.config.timeout_precommit,
TimeoutStep::Commit => self.config.timeout_commit,
}
}
}

#[async_trait]
impl<M> Actor for Timers<M>
where
M: From<TimeoutElapsed> + ractor::Message,
{
type Msg = Msg;
type State = State;
type Arguments = ();
type Arguments = Config;

async fn pre_start(
&self,
_myself: ActorRef<Msg>,
_args: (),
config: Config,
) -> Result<State, ActorProcessingErr> {
Ok(State::default())
Ok(State {
config,
..Default::default()
})
}

async fn handle(
Expand All @@ -94,7 +145,7 @@ where
) -> Result<(), ActorProcessingErr> {
match msg {
Msg::ScheduleTimeout(timeout) => {
let duration = self.timeout_duration(&timeout.step);
let duration = state.timeout_duration(&timeout.step);
let task = send_after(duration, myself.get_cell(), move || {
Msg::TimeoutElapsed(timeout)
});
Expand All @@ -108,15 +159,21 @@ where
}
}

Msg::Reset => {
Msg::TimeoutElapsed(timeout) => {
state.timeout_elapsed(&timeout);
self.listener.cast(TimeoutElapsed(timeout).into())?;
}

Msg::CancelAllTimeouts => {
// Cancel all the timers
for (_, task) in state.timers.drain() {
task.abort();
}
}

Msg::TimeoutElapsed(timeout) => {
state.timers.remove(&timeout);
self.listener.cast(TimeoutElapsed(timeout).into())?;
Msg::ResetTimeouts => {
// Reset the timeouts to their original values
state.config = self.initial_config;
}
}

Expand Down
Loading