diff --git a/code/actors/src/consensus.rs b/code/actors/src/consensus.rs index a9a52132c..59549a214 100644 --- a/code/actors/src/consensus.rs +++ b/code/actors/src/consensus.rs @@ -647,7 +647,7 @@ where if state.connected_peers.len() == state.validator_set.count() - 1 { info!( - "Enough peers ({}) ready to start consensus", + "Enough peers ({}) connected to start consensus", state.connected_peers.len() ); @@ -663,7 +663,7 @@ where // TODO: pause/stop consensus, if necessary } - GossipEvent::Message(_from, _, data) => { + GossipEvent::Message(_, _, data) => { let msg = NetworkMsg::from_network_bytes(data).unwrap(); // FIXME let Some(msg_height) = msg.msg_height() else { diff --git a/code/actors/src/gossip.rs b/code/actors/src/gossip.rs index ee15d3fe5..f82d05f5f 100644 --- a/code/actors/src/gossip.rs +++ b/code/actors/src/gossip.rs @@ -1,3 +1,4 @@ +use std::collections::BTreeSet; use std::sync::Arc; use async_trait::async_trait; @@ -51,8 +52,7 @@ pub struct Args { pub enum State { Stopped, Running { - expected_peers: Vec, - peers: Vec, + peers: BTreeSet, subscribers: Vec>>, ctrl_handle: CtrlHandle, recv_task: JoinHandle<()>, @@ -95,8 +95,7 @@ impl Actor for Gossip { }); Ok(State::Running { - expected_peers: args.peer_ids, - peers: Vec::new(), + peers: BTreeSet::new(), subscribers: Vec::new(), ctrl_handle, recv_task, @@ -118,7 +117,6 @@ impl Actor for Gossip { state: &mut State, ) -> Result<(), ActorProcessingErr> { let State::Running { - expected_peers, peers, subscribers, ctrl_handle, @@ -132,11 +130,16 @@ impl Actor for Gossip { Msg::Subscribe(subscriber) => subscribers.push(subscriber), Msg::Broadcast(channel, data) => ctrl_handle.broadcast(channel, data).await?, Msg::NewEvent(event) => { - if let Event::PeerConnected(peer_id) = event { - if expected_peers.contains(&peer_id) { - peers.push(peer_id); + match &event { + Event::PeerConnected(peer_id) => { + peers.insert(*peer_id); } + Event::PeerDisconnected(peer_id) => { + peers.remove(peer_id); + } + _ => {} } + let event = Arc::new(event); for subscriber in subscribers { subscriber.cast(Arc::clone(&event))?; diff --git a/code/actors/src/node.rs b/code/actors/src/node.rs index 7fa3639ea..6ede0d1dd 100644 --- a/code/actors/src/node.rs +++ b/code/actors/src/node.rs @@ -86,15 +86,15 @@ where params.gossip_mempool, params.mempool, proposal_builder, - // params.start_height, + params.start_height, ); let actor = node.spawn().await?; Ok(actor) } +#[allow(dead_code)] pub struct Node { - #[allow(dead_code)] ctx: Ctx, cal: ActorRef>, gossip: ActorRef, @@ -102,7 +102,7 @@ pub struct Node { gossip_mempool: ActorRef, mempool: ActorRef, proposal_builder: ActorRef>, - // start_height: Ctx::Height, + start_height: Ctx::Height, } impl Node @@ -120,7 +120,7 @@ where gossip_mempool: ActorRef, mempool: ActorRef, proposal_builder: ActorRef>, - // start_height: Ctx::Height, + start_height: Ctx::Height, ) -> Self { Self { ctx, @@ -130,7 +130,7 @@ where gossip_mempool, mempool, proposal_builder, - // start_height, + start_height, } } diff --git a/code/common/src/height.rs b/code/common/src/height.rs index 2c9bef84e..c4722604e 100644 --- a/code/common/src/height.rs +++ b/code/common/src/height.rs @@ -7,18 +7,8 @@ use core::fmt::{Debug, Display}; /// A height of 0 represents a chain which has not yet produced a block. pub trait Height where - Self: Default - + Copy - + Clone - + Debug - + Display - + PartialEq - + Eq - + PartialOrd - + Ord - + Send - + Sync - + From, + Self: + Default + Copy + Clone + Debug + Display + PartialEq + Eq + PartialOrd + Ord + Send + Sync, { /// Increment the height by one. fn increment(&self) -> Self; diff --git a/code/test/src/height.rs b/code/test/src/height.rs index b8cb81bd5..6ff5b507c 100644 --- a/code/test/src/height.rs +++ b/code/test/src/height.rs @@ -28,12 +28,6 @@ impl fmt::Debug for Height { } } -impl From for Height { - fn from(height: u64) -> Self { - Self::new(height) - } -} - impl malachite_common::Height for Height { fn increment(&self) -> Self { Self(self.0 + 1)