From 4b8714d61a8dc1ca6d0573ac6f02920891dde68d Mon Sep 17 00:00:00 2001 From: Matthias Seitz Date: Wed, 15 Jan 2025 03:04:06 +0100 Subject: [PATCH] fix: correct trusted peer excemptions (#13801) --- crates/net/network/src/peers.rs | 91 ++++++++++++++++++++++++++++++++- 1 file changed, 89 insertions(+), 2 deletions(-) diff --git a/crates/net/network/src/peers.rs b/crates/net/network/src/peers.rs index f8d18e159946..a7e981b5890d 100644 --- a/crates/net/network/src/peers.rs +++ b/crates/net/network/src/peers.rs @@ -252,10 +252,10 @@ impl PeersManager { // we still want to limit concurrent pending connections let max_inbound = self.trusted_peer_ids.len().max(self.connection_info.config.max_inbound); - if self.connection_info.num_pending_in <= max_inbound { + if self.connection_info.num_pending_in < max_inbound { self.connection_info.inc_pending_in(); + return Ok(()) } - return Ok(()) } // all trusted peers are either connected or connecting @@ -1659,6 +1659,93 @@ mod tests { assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err()); } + #[tokio::test] + async fn test_reject_incoming_at_pending_capacity_trusted_peers() { + let mut peers = PeersManager::new(PeersConfig::test().with_max_inbound(2)); + let trusted = PeerId::random(); + peers.add_trusted_peer_id(trusted); + + // connect the trusted peer + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 0)), 8008); + assert!(peers.on_incoming_pending_session(addr.ip()).is_ok()); + peers.on_incoming_session_established(trusted, addr); + + match event!(peers) { + PeerAction::PeerAdded(id) => { + assert_eq!(id, trusted); + } + _ => unreachable!(), + } + + // saturate the remaining inbound slots with untrusted peers + let mut connected_untrusted_peer_ids = Vec::new(); + for i in 0..(peers.connection_info.config.max_inbound - 1) { + let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 1) as u8)), 8008); + assert!(peers.on_incoming_pending_session(addr.ip()).is_ok()); + let peer_id = PeerId::random(); + peers.on_incoming_session_established(peer_id, addr); + connected_untrusted_peer_ids.push(peer_id); + + match event!(peers) { + PeerAction::PeerAdded(id) => { + assert_eq!(id, peer_id); + } + _ => unreachable!(), + } + } + + let mut pending_addrs = Vec::new(); + + // saturate available slots + for i in 0..2 { + let socket_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 10) as u8)), 8008); + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + + pending_addrs.push(socket_addr); + } + + assert_eq!(peers.connection_info.num_pending_in, 2); + + // try to handle additional incoming connections at capacity + for i in 0..2 { + let socket_addr = + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, (i + 20) as u8)), 8008); + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_err()); + } + + let err = PendingSessionHandshakeError::Eth(EthStreamError::P2PStreamError( + P2PStreamError::HandshakeError(P2PHandshakeError::Disconnected( + DisconnectReason::UselessPeer, + )), + )); + + // Remove all pending peers + for pending_addr in pending_addrs { + peers.on_incoming_pending_session_dropped(pending_addr, &err); + } + + println!("num_pending_in: {}", peers.connection_info.num_pending_in); + + println!( + "num_inbound: {}, has_in_capacity: {}", + peers.connection_info.num_inbound, + peers.connection_info.has_in_capacity() + ); + + // disconnect a connected peer + peers.on_active_session_gracefully_closed(connected_untrusted_peer_ids[0]); + + println!( + "num_inbound: {}, has_in_capacity: {}", + peers.connection_info.num_inbound, + peers.connection_info.has_in_capacity() + ); + + let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 99)), 8008); + assert!(peers.on_incoming_pending_session(socket_addr.ip()).is_ok()); + } + #[tokio::test] async fn test_closed_incoming() { let socket_addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 1, 2)), 8008);