diff --git a/code/crates/app-channel/src/connector.rs b/code/crates/app-channel/src/connector.rs index c13b3657b..9f2425674 100644 --- a/code/crates/app-channel/src/connector.rs +++ b/code/crates/app-channel/src/connector.rs @@ -196,6 +196,14 @@ where reply_to.send(rx.await?)?; } + + HostMsg::PeerJoined { peer_id } => { + self.sender.send(AppMsg::PeerJoined { peer_id }).await?; + } + + HostMsg::PeerLeft { peer_id } => { + self.sender.send(AppMsg::PeerLeft { peer_id }).await?; + } }; Ok(()) diff --git a/code/crates/app-channel/src/msgs.rs b/code/crates/app-channel/src/msgs.rs index 4711abcc7..5d140c114 100644 --- a/code/crates/app-channel/src/msgs.rs +++ b/code/crates/app-channel/src/msgs.rs @@ -146,6 +146,18 @@ pub enum AppMsg { /// Channel for sending back the proposed value, if successfully decoded reply: Reply>, }, + + /// Notifies the application that a peer has joined the network. + PeerJoined { + /// The ID of the peer that joined + peer_id: PeerId, + }, + + /// Notifies the application that a peer has left the network. + PeerLeft { + /// The ID of the peer that left + peer_id: PeerId, + }, } /// Messages sent from the application to consensus. diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index dffe11321..5c13efa9d 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -76,7 +76,9 @@ pub enum HostMsg { }, /// Request the earliest block height in the block store - GetHistoryMinHeight { reply_to: RpcReplyPort }, + GetHistoryMinHeight { + reply_to: RpcReplyPort, + }, /// ProposalPart received <-- consensus <-- gossip ReceivedProposalPart { @@ -111,4 +113,12 @@ pub enum HostMsg { value_bytes: Bytes, reply_to: RpcReplyPort>, }, + + PeerJoined { + peer_id: PeerId, + }, + + PeerLeft { + peer_id: PeerId, + }, } diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index 47a3cbb51..c39866e7f 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -180,6 +180,15 @@ impl Host { value_bytes, reply_to, } => on_process_synced_value(value_bytes, height, round, validator_address, reply_to), + HostMsg::PeerJoined { peer_id } => { + debug!(%peer_id, "Peer joined the network"); + Ok(()) + } + + HostMsg::PeerLeft { peer_id } => { + debug!(%peer_id, "Peer left the network"); + Ok(()) + } } } } diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index 021758e76..a6729d8e1 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -219,6 +219,20 @@ pub async fn run( AppMsg::RestreamProposal { .. } => { error!("RestreamProposal not implemented"); } + + AppMsg::PeerJoined { peer_id } => { + info!(%peer_id, "Peer joined the network"); + + // You might want to track connected peers in your state + state.peers.insert(peer_id); + } + + AppMsg::PeerLeft { peer_id } => { + info!(%peer_id, "Peer left the network"); + + // Remove the peer from tracking + state.peers.remove(&peer_id); + } } } diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index b583578da..0be300e94 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -1,7 +1,7 @@ //! Internal state of the application. This is a simplified abstract to keep it simple. //! A regular application would have mempool implemented, a proper database and input methods like RPC. -use std::collections::{BTreeMap, HashMap}; +use std::collections::{BTreeMap, HashMap, HashSet}; use bytes::Bytes; use rand::rngs::StdRng; @@ -41,6 +41,7 @@ pub struct State { streams_map: PartStreamsMap, rng: StdRng, + pub peers: HashSet, } // Make up a seed for the rng based on our address in @@ -70,6 +71,7 @@ impl State { decided_values: BTreeMap::new(), streams_map: PartStreamsMap::new(), rng: StdRng::seed_from_u64(seed_from_address(&address)), + peers: HashSet::new(), } }