Skip to content

Commit

Permalink
feat(app): Add 1PeerJoined and PeerLeft` messages (#733)
Browse files Browse the repository at this point in the history
  • Loading branch information
hoank101 authored Jan 8, 2025
1 parent 9768451 commit 4005f40
Show file tree
Hide file tree
Showing 6 changed files with 57 additions and 2 deletions.
8 changes: 8 additions & 0 deletions code/crates/app-channel/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,14 @@ where

reply_to.send(rx.await?)?;
}

HostMsg::PeerJoined { peer_id } => {
self.sender.send(AppMsg::PeerJoined { peer_id }).await?;
}

HostMsg::PeerLeft { peer_id } => {
self.sender.send(AppMsg::PeerLeft { peer_id }).await?;
}
};

Ok(())
Expand Down
12 changes: 12 additions & 0 deletions code/crates/app-channel/src/msgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,18 @@ pub enum AppMsg<Ctx: Context> {
/// Channel for sending back the proposed value, if successfully decoded
reply: Reply<ProposedValue<Ctx>>,
},

/// Notifies the application that a peer has joined the network.
PeerJoined {
/// The ID of the peer that joined
peer_id: PeerId,
},

/// Notifies the application that a peer has left the network.
PeerLeft {
/// The ID of the peer that left
peer_id: PeerId,
},
}

/// Messages sent from the application to consensus.
Expand Down
12 changes: 11 additions & 1 deletion code/crates/engine/src/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ pub enum HostMsg<Ctx: Context> {
},

/// Request the earliest block height in the block store
GetHistoryMinHeight { reply_to: RpcReplyPort<Ctx::Height> },
GetHistoryMinHeight {
reply_to: RpcReplyPort<Ctx::Height>,
},

/// ProposalPart received <-- consensus <-- gossip
ReceivedProposalPart {
Expand Down Expand Up @@ -111,4 +113,12 @@ pub enum HostMsg<Ctx: Context> {
value_bytes: Bytes,
reply_to: RpcReplyPort<ProposedValue<Ctx>>,
},

PeerJoined {
peer_id: PeerId,
},

PeerLeft {
peer_id: PeerId,
},
}
9 changes: 9 additions & 0 deletions code/crates/starknet/host/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,15 @@ impl Host {
value_bytes,
reply_to,
} => on_process_synced_value(value_bytes, height, round, validator_address, reply_to),
HostMsg::PeerJoined { peer_id } => {
debug!(%peer_id, "Peer joined the network");
Ok(())
}

HostMsg::PeerLeft { peer_id } => {
debug!(%peer_id, "Peer left the network");
Ok(())
}
}
}
}
Expand Down
14 changes: 14 additions & 0 deletions code/examples/channel/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,20 @@ pub async fn run(
AppMsg::RestreamProposal { .. } => {
error!("RestreamProposal not implemented");
}

AppMsg::PeerJoined { peer_id } => {
info!(%peer_id, "Peer joined the network");

// You might want to track connected peers in your state
state.peers.insert(peer_id);
}

AppMsg::PeerLeft { peer_id } => {
info!(%peer_id, "Peer left the network");

// Remove the peer from tracking
state.peers.remove(&peer_id);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion code/examples/channel/src/state.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Internal state of the application. This is a simplified abstract to keep it simple.
//! A regular application would have mempool implemented, a proper database and input methods like RPC.
use std::collections::{BTreeMap, HashMap};
use std::collections::{BTreeMap, HashMap, HashSet};

use bytes::Bytes;
use rand::rngs::StdRng;
Expand Down Expand Up @@ -41,6 +41,7 @@ pub struct State {
streams_map: PartStreamsMap,

rng: StdRng,
pub peers: HashSet<PeerId>,
}

// Make up a seed for the rng based on our address in
Expand Down Expand Up @@ -70,6 +71,7 @@ impl State {
decided_values: BTreeMap::new(),
streams_map: PartStreamsMap::new(),
rng: StdRng::seed_from_u64(seed_from_address(&address)),
peers: HashSet::new(),
}
}

Expand Down

0 comments on commit 4005f40

Please sign in to comment.