Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

Commit

Permalink
fix(p2p): rework ticks of bootstrap query interval (#483)
Browse files Browse the repository at this point in the history
Signed-off-by: Simon Paitrault <simon.paitrault@gmail.com>
  • Loading branch information
Freyskeyd authored Mar 21, 2024
1 parent b2a1af0 commit 5b6ddb8
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 19 deletions.
2 changes: 1 addition & 1 deletion crates/topos-p2p/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub(crate) mod peer_info;
pub(crate) mod topos;

/// Represents the health status of a behaviour inside the p2p layer
#[derive(Default, PartialEq, Eq)]
#[derive(Debug, Default, PartialEq, Eq)]
pub(crate) enum HealthStatus {
#[default]
Initializing,
Expand Down
14 changes: 11 additions & 3 deletions crates/topos-p2p/src/behaviour/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,10 @@ impl DiscoveryBehaviour {
next_bootstrap_query: if known_peers.is_empty() {
None
} else {
Some(Box::pin(tokio::time::interval(config.bootstrap_interval)))
let mut interval = tokio::time::interval(config.bootstrap_interval);
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

Some(Box::pin(interval))
},
health_status: if known_peers.is_empty() {
HealthStatus::Healthy
Expand All @@ -105,9 +108,14 @@ impl DiscoveryBehaviour {
}

/// Change the interval of the next bootstrap queries
pub fn change_interval(&mut self, duration: Duration) -> Result<(), P2PError> {
pub async fn change_interval(&mut self, duration: Duration) -> Result<(), P2PError> {
if let Some(interval) = self.next_bootstrap_query.as_mut() {
interval.set(tokio::time::interval(duration));
let mut new_interval = tokio::time::interval(duration);
// Delay the next tick
new_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
// ignore first tick
_ = new_interval.tick().await;
interval.set(new_interval);
}

Ok(())
Expand Down
1 change: 1 addition & 0 deletions crates/topos-p2p/src/runtime/handle_event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ impl EventHandler<SwarmEvent<ComposedEvent>> for Runtime {
_ = self.event_sender.send(event).await;
}

info!("Healthystatus: {:?}", self.health_status);
Ok(())
}
}
46 changes: 32 additions & 14 deletions crates/topos-p2p/src/runtime/handle_event/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,34 @@ impl EventHandler<Box<Event>> for Runtime {
} if num_remaining == 0
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Initializing =>
{
warn!(
"Bootstrap query finished but unable to connect to bootnode during \
initialization, switching to unhealthy and fast bootstrap mode",
);

let behaviour = self.swarm.behaviour_mut();

behaviour.discovery.health_status = HealthStatus::Unhealthy;
_ = behaviour
.discovery
.change_interval(self.config.discovery.fast_bootstrap_interval);
if self
.health_state
.successfully_connected_to_bootpeer
.is_none()
{
warn!(
"Bootstrap query finished but unable to connect to bootnode during \
initialization, switching from discovery(initializing) -> \
discover(unhealthy) and fast bootstrap mode",
);

let behaviour = self.swarm.behaviour_mut();

behaviour.discovery.health_status = HealthStatus::Unhealthy;
_ = behaviour
.discovery
.change_interval(self.config.discovery.fast_bootstrap_interval)
.await;
} else {
warn!(
"Bootstrap query finished with bootnode, switching from \
discovery(initializing) -> discovery(healthy)",
);

let behaviour = self.swarm.behaviour_mut();

behaviour.discovery.health_status = HealthStatus::Healthy;
}
}

Event::OutboundQueryProgressed {
Expand Down Expand Up @@ -107,15 +124,16 @@ impl EventHandler<Box<Event>> for Runtime {
&& self.swarm.behaviour().discovery.health_status == HealthStatus::Unhealthy =>
{
info!(
"Bootstrap query finished with bootnode, switching to healthy and normal \
bootstrap mode",
"Bootstrap query finished with bootnode, switching discover(unhealthy) -> \
discover(healthy) and normal bootstrap mode",
);

let behaviour = self.swarm.behaviour_mut();
behaviour.discovery.health_status = HealthStatus::Healthy;
_ = behaviour
.discovery
.change_interval(self.config.discovery.bootstrap_interval);
.change_interval(self.config.discovery.bootstrap_interval)
.await;
}

Event::OutboundQueryProgressed {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ async fn network_test() {
.bootstrap(&[cfg.clone(), boot_node.clone()], None)
.await
.unwrap();

use topos_core::api::grpc::shared::v1::Uuid as APIUuid;

let peer = boot_node.keypair.public().to_peer_id();
Expand Down

0 comments on commit 5b6ddb8

Please sign in to comment.