Skip to content

Commit

Permalink
fix: correct trusted peer excemptions (#13801)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattsse authored Jan 15, 2025
1 parent 83b2fb9 commit 4b8714d
Showing 1 changed file with 89 additions and 2 deletions.
91 changes: 89 additions & 2 deletions crates/net/network/src/peers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,10 +252,10 @@ impl PeersManager {
// we still want to limit concurrent pending connections
let max_inbound =
self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound);
if self.connection_info.num_pending_in <= max_inbound {
if self.connection_info.num_pending_in < max_inbound {
self.connection_info.inc_pending_in();
return Ok(())
}
return Ok(())
}

// all trusted peers are either connected or connecting
Expand Down Expand Up @@ -1659,6 +1659,93 @@ mod tests {
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
}

#[tokio::test]
async fn test_reject_incoming_at_pending_capacity_trusted_peers() {
let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2));
let trusted = PeerId::random();
peers.add_trusted_peer_id(trusted);

// connect the trusted peer
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008);
assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
peers.on_incoming_session_established(trusted, addr);

match event!(peers) {
PeerAction::PeerAdded(id) => {
assert_eq!(id, trusted);
}
_ => unreachable!(),
}

// saturate the remaining inbound slots with untrusted peers
let mut connected_untrusted_peer_ids = Vec::new();
for i in 0..(peers.connection_info.config.max_inbound - 1) {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008);
assert!(peers.on_incoming_pending_session(addr.ip()).is_ok());
let peer_id = PeerId::random();
peers.on_incoming_session_established(peer_id, addr);
connected_untrusted_peer_ids.push(peer_id);

match event!(peers) {
PeerAction::PeerAdded(id) => {
assert_eq!(id, peer_id);
}
_ => unreachable!(),
}
}

let mut pending_addrs = Vec::new();

// saturate available slots
for i in 0..2 {
let socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());

pending_addrs.push(socket_addr);
}

assert_eq!(peers.connection_info.num_pending_in, 2);

// try to handle additional incoming connections at capacity
for i in 0..2 {
let socket_addr =
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err());
}

let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError(
P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected(
DisconnectReason::UselessPeer,
)),
));

// Remove all pending peers
for pending_addr in pending_addrs {
peers.on_incoming_pending_session_dropped(pending_addr, &err);
}

println!("num_pending_in: {}", peers.connection_info.num_pending_in);

println!(
"num_inbound: {}, has_in_capacity: {}",
peers.connection_info.num_inbound,
peers.connection_info.has_in_capacity()
);

// disconnect a connected peer
peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]);

println!(
"num_inbound: {}, has_in_capacity: {}",
peers.connection_info.num_inbound,
peers.connection_info.has_in_capacity()
);

let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008);
assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok());
}

#[tokio::test]
async fn test_closed_incoming() {
let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);
Expand Down

0 comments on commit 4b8714d

Please sign in to comment.