From 2a12952bbd7658009e551781ed3d12807afc960b Mon Sep 17 00:00:00 2001
From: Jan Segre
Date: Tue, 20 Aug 2024 16:22:30 +0200
Subject: [PATCH] refactor(p2p): move received peers storage to protocol

---
 hathor/conf/settings.py                     |   9 ++
 hathor/p2p/manager.py                       | 140 ++++++++++++++------
 hathor/p2p/peer.py                          |  35 ++---
 hathor/p2p/peer_discovery/bootstrap.py      |   4 +-
 hathor/p2p/peer_discovery/dns.py            |   4 +-
 hathor/p2p/peer_discovery/peer_discovery.py |   8 +-
 hathor/p2p/peer_endpoint.py                 |  13 ++
 hathor/p2p/peer_storage.py                  |  24 +++-
 hathor/p2p/protocol.py                      |  22 +++
 hathor/p2p/resources/add_peers.py           |   2 +-
 hathor/p2p/states/ready.py                  |   2 +-
 hathor/simulator/fake_connection.py         |   5 +-
 tests/others/test_metrics.py                |   2 +-
 tests/p2p/test_bootstrap.py                 |   4 +-
 tests/p2p/test_connections.py               |  10 +-
 tests/p2p/test_connectivity.py              |  56 ++++++++
 tests/p2p/test_peer_id.py                   |  22 +--
 tests/p2p/test_peer_storage.py              |  30 +++++
 tests/p2p/test_protocol.py                  |  22 +--
 tests/poa/test_poa_simulation.py            |   5 +-
 tests/resources/p2p/test_status.py          |   4 +-
 21 files changed, 321 insertions(+), 102 deletions(-)
 create mode 100644 tests/p2p/test_connectivity.py
 create mode 100644 tests/p2p/test_peer_storage.py

diff --git a/hathor/conf/settings.py b/hathor/conf/settings.py
index 3edb664a7..9fd45bc3f 100644
--- a/hathor/conf/settings.py
+++ b/hathor/conf/settings.py
@@ -288,6 +288,9 @@ def GENESIS_TX2_TIMESTAMP(self) -> int:
     # Maximum period without receiving any messages from ther peer (in seconds).
     PEER_IDLE_TIMEOUT: int = 60
 
+    # Maximum number of entrypoints that we accept from a peer's broadcast
+    PEER_MAX_ENTRYPOINTS: int = 30
+
     # Filepath of ca certificate file to generate connection certificates
     CA_FILEPATH: str = os.path.join(os.path.dirname(__file__), '../p2p/ca.crt')
 
@@ -431,6 +434,12 @@ def GENESIS_TX2_TIMESTAMP(self) -> int:
     # more than enough for the forseeable future
     MAX_MEMPOOL_RECEIVING_TIPS: int = 1000
 
+    # Max number of peers simultaneously stored in the node
+    MAX_VERIFIED_PEERS: int = 10_000
+
+    # Max number of peers simultaneously stored per connection
+    MAX_UNVERIFIED_PEERS_PER_CONN: int = 100
+
     # Used to enable nano contracts.
     #
     # This should NEVER be enabled for mainnet and testnet, since both networks will
diff --git a/hathor/p2p/manager.py b/hathor/p2p/manager.py
index d7e7045c9..56371da69 100644
--- a/hathor/p2p/manager.py
+++ b/hathor/p2p/manager.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from collections import deque
 from typing import TYPE_CHECKING, Any, Iterable, NamedTuple, Optional
 
 from structlog import get_logger
@@ -30,7 +31,7 @@
 from hathor.p2p.peer_discovery import PeerDiscovery
 from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint
 from hathor.p2p.peer_id import PeerId
-from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage
+from hathor.p2p.peer_storage import VerifiedPeerStorage
 from hathor.p2p.protocol import HathorProtocol
 from hathor.p2p.rate_limiter import RateLimiter
 from hathor.p2p.states.ready import ReadyState
@@ -81,10 +82,10 @@ class GlobalRateLimiter:
     manager: Optional['HathorManager']
     connections: set[HathorProtocol]
     connected_peers: dict[PeerId, HathorProtocol]
+    new_connection_from_queue: deque[PeerId]
     connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer]
     handshaking_peers: set[HathorProtocol]
     whitelist_only: bool
-    unverified_peer_storage: UnverifiedPeerStorage
     verified_peer_storage: VerifiedPeerStorage
     _sync_factories: dict[SyncVersion, SyncAgentFactory]
     _enabled_sync_versions: set[SyncVersion]
@@ -156,12 +157,12 @@ def __init__(
         # List of peers connected and ready to communicate.
         self.connected_peers = {}
 
-        # List of peers received from the network.
-        # We cannot trust their identity before we connect to them.
-        self.unverified_peer_storage = UnverifiedPeerStorage()
+        # Queue of ready peer ids used by connect_to_peer_from_connection_queue to choose the next peer to pull a
+        # random new connection from
+        self.new_connection_from_queue = deque()
 
         # List of known peers.
-        self.verified_peer_storage = VerifiedPeerStorage()  # dict[string (peer.id), PublicPeer]
+        self.verified_peer_storage = VerifiedPeerStorage(rng=self.rng, max_size=self._settings.MAX_VERIFIED_PEERS)
 
         # Maximum unseen time before removing a peer (seconds).
         self.max_peer_unseen_dt: float = 30 * 60  # 30-minutes
@@ -181,6 +182,11 @@ def __init__(
         # Timestamp of the last time sync was updated.
         self._last_sync_rotate: float = 0.
 
+        # Connect to new peers in a timed loop, instead of as soon as possible
+        self.lc_connect = LoopingCall(self.connect_to_peer_from_connection_queue)
+        self.lc_connect.clock = self.reactor
+        self.lc_connect_interval = 0.2  # seconds
+
         # A timer to try to reconnect to the disconnect known peers.
         if self._settings.ENABLE_PEER_WHITELIST:
             self.wl_reconnect = LoopingCall(self.update_whitelist)
@@ -272,7 +278,7 @@ def do_discovery(self) -> None:
         Do a discovery and connect on all discovery strategies.
         """
         for peer_discovery in self.peer_discoveries:
-            coro = peer_discovery.discover_and_connect(self.connect_to)
+            coro = peer_discovery.discover_and_connect(self.connect_to_endpoint)
             Deferred.fromCoroutine(coro)
 
     def disable_rate_limiter(self) -> None:
@@ -293,6 +299,7 @@ def start(self) -> None:
         if self.manager is None:
             raise TypeError('Class was built incorrectly without a HathorManager.')
 
+        self._start_peer_connect_loop()
         self.lc_reconnect.start(5, now=False)
         self.lc_sync_update.start(self.lc_sync_update_interval, now=False)
 
@@ -319,7 +326,28 @@ def _handle_whitelist_reconnect_err(self, *args: Any, **kwargs: Any) -> None:
         self.log.error('whitelist reconnect had an exception. Start looping call again.', args=args, kwargs=kwargs)
         self.reactor.callLater(30, self._start_whitelist_reconnect)
 
+    def _start_peer_connect_loop(self) -> None:
+        # The deferred returned by the LoopingCall start method
+        # executes when the looping call stops running
+        # https://docs.twistedmatrix.com/en/stable/api/twisted.internet.task.LoopingCall.html
+        d = self.lc_connect.start(self.lc_connect_interval, now=True)
+        d.addErrback(self._handle_peer_connect_err)
+
+    def _handle_peer_connect_err(self, *args: Any, **kwargs: Any) -> None:
+        # This method will be called when an exception happens inside the peer connect loop
+        # and ends up stopping the looping call.
+        # We log the error and start the looping call again.
+        self.log.error(
+            'connect_to_peer_from_connection_queue had an exception. Start looping call again.',
+            args=args,
+            kwargs=kwargs,
+        )
+        self.reactor.callLater(self.lc_connect_interval, self._start_peer_connect_loop)
+
     def stop(self) -> None:
+        if self.lc_connect.running:
+            self.lc_connect.stop()
+
         if self.lc_reconnect.running:
             self.lc_reconnect.stop()
 
@@ -406,10 +434,10 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
         """Called when a peer is ready."""
         assert protocol.peer is not None
         self.verified_peer_storage.add_or_replace(protocol.peer)
-        assert protocol.peer.id is not None
 
         self.handshaking_peers.remove(protocol)
-        self.unverified_peer_storage.pop(protocol.peer.id, None)
+        for conn in self.iter_all_connections():
+            conn.unverified_peer_storage.remove(protocol.peer)
 
         # we emit the event even if it's a duplicate peer as a matching
         # NETWORK_PEER_DISCONNECTED will be emitted regardless
@@ -419,7 +447,8 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
             peers_count=self._get_peers_count()
         )
 
-        if protocol.peer.id in self.connected_peers:
+        peer_id = protocol.peer.id
+        if peer_id in self.connected_peers:
             # connected twice to same peer
             self.log.warn('duplicate connection to peer', protocol=protocol)
             conn = self.get_connection_to_drop(protocol)
@@ -428,7 +457,11 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
             # the new connection is being dropped, so don't save it to connected_peers
             return
 
-        self.connected_peers[protocol.peer.id] = protocol
+        self.connected_peers[peer_id] = protocol
+        if peer_id not in self.new_connection_from_queue:
+            self.new_connection_from_queue.append(peer_id)
+        else:
+            self.log.warn('peer already in queue', peer=str(peer_id))
 
         # In case it was a retry, we must reset the data only here, after it gets ready
         protocol.peer.info.reset_retry_timestamp()
@@ -436,7 +469,7 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
         if len(self.connected_peers) <= self.MAX_ENABLED_SYNC:
             protocol.enable_sync()
 
-        if protocol.peer.id in self.always_enable_sync:
+        if peer_id in self.always_enable_sync:
             protocol.enable_sync()
 
         # Notify other peers about this new peer connection.
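The `lc_connect` plumbing above leans on a standard Twisted idiom: `LoopingCall.start()` returns a deferred that fires only when the loop stops, so the errback is what notices a crashed loop and schedules a restart. A minimal, self-contained sketch of the idiom (illustrative `tick` body and interval, not hathor code):

```python
from twisted.internet import reactor
from twisted.internet.task import LoopingCall

INTERVAL = 0.2  # seconds, mirrors lc_connect_interval

def tick() -> None:
    """Loop body; any uncaught exception here stops the LoopingCall."""

lc = LoopingCall(tick)

def start_loop() -> None:
    # start() returns a Deferred that fires once the loop stops; it errbacks
    # if the loop died from an exception, which is where we schedule a restart.
    d = lc.start(INTERVAL, now=True)
    d.addErrback(lambda failure: reactor.callLater(INTERVAL, start_loop))

start_loop()
reactor.callLater(1, reactor.stop)  # run the demo for one second, then exit
reactor.run()
```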
@@ -456,7 +489,8 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
         if protocol in self.handshaking_peers:
             self.handshaking_peers.remove(protocol)
         if protocol._peer is not None:
-            existing_protocol = self.connected_peers.pop(protocol.peer.id, None)
+            peer_id = protocol.peer.id
+            existing_protocol = self.connected_peers.pop(peer_id, None)
             if existing_protocol is None:
                 # in this case, the connection was closed before it got to READY state
                 return
@@ -466,7 +500,10 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
                 # A check for duplicate connections is done during PEER_ID state, but there's still a
                 # chance it can happen if both connections start at the same time and none of them has
                 # reached READY state while the other is on PEER_ID state
-                self.connected_peers[protocol.peer.id] = existing_protocol
+                self.connected_peers[peer_id] = existing_protocol
+            elif peer_id in self.new_connection_from_queue:
+                # now we're sure it can be removed from new_connection_from_queue
+                self.new_connection_from_queue.remove(peer_id)
         self.pubsub.publish(
             HathorEvents.NETWORK_PEER_DISCONNECTED,
             protocol=protocol,
@@ -499,15 +536,6 @@ def is_peer_connected(self, peer_id: PeerId) -> bool:
         """
         return peer_id in self.connected_peers
 
-    def on_receive_peer(self, peer: UnverifiedPeer, origin: Optional[ReadyState] = None) -> None:
-        """ Update a peer information in our storage, and instantly attempt to connect
-        to it if it is not connected yet.
-        """
-        if peer.id == self.my_peer.id:
-            return
-        peer = self.unverified_peer_storage.add_or_merge(peer)
-        self.connect_to_if_not_connected(peer, int(self.reactor.seconds()))
-
     def peers_cleanup(self) -> None:
         """Clean up aged peers."""
         now = self.reactor.seconds()
@@ -523,11 +551,45 @@ def peers_cleanup(self) -> None:
         for remove_peer in to_be_removed:
             self.verified_peer_storage.remove(remove_peer)
 
-    def reconnect_to_all(self) -> None:
-        """ It is called by the `lc_reconnect` timer and tries to connect to all known
-        peers.
+    def connect_to_peer_from_connection_queue(self) -> None:
+        """ It is called by the `lc_connect` looping call and tries to connect to a new peer.
+        """
+        if not self.new_connection_from_queue:
+            self.log.debug('connection queue is empty')
+            return
+        assert self.manager is not None
+        self.log.debug('connect to peer from connection queue')
+        candidate_new_peers: list[UnverifiedPeer]
+        # we don't know if we will find a candidate, so we can't do `while True:`
+        for _ in range(len(self.new_connection_from_queue)):
+            # for a deque([1, 2, 3, 4]) this will get 1 and modify it to deque([2, 3, 4, 1])
+            next_from_peer_id = self.new_connection_from_queue[0]
+            self.new_connection_from_queue.rotate(-1)
+
+            protocol = self.connected_peers.get(next_from_peer_id)
+            if protocol is None:
+                self.log.error('expected protocol not found', peer_id=str(next_from_peer_id))
+                assert self.new_connection_from_queue.pop() == next_from_peer_id
+                continue
+            candidate_new_peers = [
+                candidate_peer
+                for candidate_peer_id, candidate_peer in protocol.unverified_peer_storage.items()
+                if candidate_peer_id not in self.connected_peers and candidate_peer_id not in self.connecting_peers
+            ]
+            if candidate_new_peers:
+                break
+        else:
+            self.log.debug('no new peers in the connection queue')
+            # this means we rotated through the whole queue and did not find any candidate
+            return
 
-        TODO(epnichols): Should we always connect to *all*? Should there be a max #?
+        peer = self.rng.choice(candidate_new_peers)
+        self.log.debug('random peer chosen', peer=str(peer.id), entrypoints=peer.info.entrypoints_as_str())
+        now = self.reactor.seconds()
+        self.connect_to_peer(peer, int(now))
+
+    def reconnect_to_all(self) -> None:
+        """ It is called by the `lc_reconnect` timer and tries to connect to all known peers.
         """
         self.peers_cleanup()
         # when we have no connected peers left, run the discovery process again
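The inline comment about `deque([1, 2, 3, 4])` is the heart of the queue traversal: peek the head, rotate it to the back, and stop at the first connection that yields a candidate, so every ready peer gets one turn per scan. A self-contained sketch of that traversal, with plain strings standing in for peer ids and per-connection storages:

```python
from collections import deque

# Round-robin scan: peek the head, rotate it to the back, stop at the first
# connection whose received-peers storage yields a new candidate. The bounded
# range() gives every ready peer exactly one turn, matching the for/else above.
received = {'peer-a': [], 'peer-b': ['peer-x'], 'peer-c': []}
connected = set(received)
queue = deque(received)  # deque(['peer-a', 'peer-b', 'peer-c'])

candidate = None
for _ in range(len(queue)):
    head = queue[0]
    queue.rotate(-1)  # first pass: deque(['peer-b', 'peer-c', 'peer-a'])
    candidates = [p for p in received[head] if p not in connected]
    if candidates:
        candidate = candidates[0]  # the real code picks one with rng.choice()
        break

assert candidate == 'peer-x'
assert queue == deque(['peer-c', 'peer-a', 'peer-b'])  # rotated twice
```

A full rotation leaves the deque in its original order, which is what makes the bounded `for`/`else` equivalent to "try everyone once, then give up".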
""" @@ -747,7 +813,7 @@ def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname: def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Address) -> None: hostname_entrypoint = PeerAddress.from_hostname_address(hostname, address) - self.my_peer.info.entrypoints.append(hostname_entrypoint) + self.my_peer.info.entrypoints.add(hostname_entrypoint) def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol: """ When there are duplicate connections, determine which one should be dropped. diff --git a/hathor/p2p/peer.py b/hathor/p2p/peer.py index 8bc963b93..f3e0dfa05 100644 --- a/hathor/p2p/peer.py +++ b/hathor/p2p/peer.py @@ -106,11 +106,11 @@ class PeerInfo: """ Stores entrypoint and connection attempts information. """ - entrypoints: list[PeerAddress] = field(default_factory=list) - retry_timestamp: int = 0 # should only try connecting to this peer after this timestamp - retry_interval: int = 5 # how long to wait for next connection retry. It will double for each failure - retry_attempts: int = 0 # how many retries were made - last_seen: float = inf # last time this peer was seen + entrypoints: set[PeerAddress] = field(default_factory=set) + retry_timestamp: int = 0 # should only try connecting to this peer after this timestamp + retry_interval: int = 5 # how long to wait for next connection retry. It will double for each failure + retry_attempts: int = 0 # how many retries were made + last_seen: float = inf # last time this peer was seen flags: set[str] = field(default_factory=set) _settings: HathorSettings = field(default_factory=get_global_settings, repr=False) @@ -121,21 +121,18 @@ def get_ipv6_only_entrypoints(self) -> list[PeerAddress]: return list(filter(lambda e: e.is_ipv6(), self.entrypoints)) def ipv4_entrypoints_as_str(self) -> list[str]: - return list(map(str, self.get_ipv4_only_entrypoints())) + return sorted(map(str, self.get_ipv4_only_entrypoints())) def ipv6_entrypoints_as_str(self) -> list[str]: - return list(map(str, self.get_ipv6_only_entrypoints())) + return sorted(map(str, self.get_ipv6_only_entrypoints())) def entrypoints_as_str(self) -> list[str]: """Return a list of entrypoints serialized as str""" - return list(map(str, self.entrypoints)) + return sorted(map(str, self.entrypoints)) def _merge(self, other: PeerInfo) -> None: """Actual merge execution, must only be made after verifications.""" - # Merge entrypoints. - for ep in other.entrypoints: - if ep not in self.entrypoints: - self.entrypoints.append(ep) + self.entrypoints.update(other.entrypoints) async def validate_entrypoint(self, protocol: HathorProtocol) -> bool: """ Validates if connection entrypoint is one of the peer entrypoints @@ -237,7 +234,7 @@ def create_from_json(cls, data: dict[str, Any]) -> Self: It is to create an UnverifiedPeer from a peer connection. 
""" peer_id = PeerId(data['id']) - endpoints = [] + endpoints = set() for endpoint_str in data.get('entrypoints', []): # We have to parse using PeerEndpoint to be able to support older peers that still @@ -245,12 +242,14 @@ def create_from_json(cls, data: dict[str, Any]) -> Self: endpoint = PeerEndpoint.parse(endpoint_str) if endpoint.peer_id is not None and endpoint.peer_id != peer_id: raise ValueError(f'conflicting peer_id: {endpoint.peer_id} != {peer_id}') - endpoints.append(endpoint.addr) + endpoints.add(endpoint.addr) - return cls( + obj = cls( id=peer_id, info=PeerInfo(entrypoints=endpoints), ) + obj.validate() + return obj def merge(self, other: UnverifiedPeer) -> None: """ Merge two UnverifiedPeer objects, checking that they have the same @@ -259,6 +258,12 @@ def merge(self, other: UnverifiedPeer) -> None: """ assert self.id == other.id self.info._merge(other.info) + self.validate() + + def validate(self) -> None: + """Check if there are too many entrypoints.""" + if len(self.info.entrypoints) > self.info._settings.PEER_MAX_ENTRYPOINTS: + raise InvalidPeerIdException('too many entrypoints') @dataclass(slots=True) diff --git a/hathor/p2p/peer_discovery/bootstrap.py b/hathor/p2p/peer_discovery/bootstrap.py index 55b5e9f16..23399e2ed 100644 --- a/hathor/p2p/peer_discovery/bootstrap.py +++ b/hathor/p2p/peer_discovery/bootstrap.py @@ -37,6 +37,6 @@ def __init__(self, entrypoints: list[PeerEndpoint]): self.entrypoints = entrypoints @override - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: for entrypoint in self.entrypoints: - connect_to(entrypoint) + connect_to_endpoint(entrypoint) diff --git a/hathor/p2p/peer_discovery/dns.py b/hathor/p2p/peer_discovery/dns.py index c5dfe74d6..9ef792a96 100644 --- a/hathor/p2p/peer_discovery/dns.py +++ b/hathor/p2p/peer_discovery/dns.py @@ -53,13 +53,13 @@ def do_lookup_text(self, host: str) -> Deferred[LookupResult]: return lookupText(host) @override - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: """ Run DNS lookup for host and connect to it This is executed when starting the DNS Peer Discovery and first connecting to the network """ for host in self.hosts: for entrypoint in (await self.dns_seed_lookup(host)): - connect_to(entrypoint) + connect_to_endpoint(entrypoint) async def dns_seed_lookup(self, host: str) -> set[PeerEndpoint]: """ Run a DNS lookup for TXT, A, and AAAA records and return a list of connection strings. diff --git a/hathor/p2p/peer_discovery/peer_discovery.py b/hathor/p2p/peer_discovery/peer_discovery.py index 7d040fae2..ae8ee626b 100644 --- a/hathor/p2p/peer_discovery/peer_discovery.py +++ b/hathor/p2p/peer_discovery/peer_discovery.py @@ -23,10 +23,10 @@ class PeerDiscovery(ABC): """ @abstractmethod - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: - """ This method must discover the peers and call `connect_to` for each of them. + async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: + """ This method must discover the peers and call `connect_to_endpoint` for each of them. - :param connect_to: Function which will be called for each discovered peer. - :type connect_to: function + :param connect_to_endpoint: Function which will be called for each discovered peer. 
diff --git a/hathor/p2p/peer_discovery/bootstrap.py b/hathor/p2p/peer_discovery/bootstrap.py
index 55b5e9f16..23399e2ed 100644
--- a/hathor/p2p/peer_discovery/bootstrap.py
+++ b/hathor/p2p/peer_discovery/bootstrap.py
@@ -37,6 +37,6 @@ def __init__(self, entrypoints: list[PeerEndpoint]):
         self.entrypoints = entrypoints
 
     @override
-    async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None:
+    async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None:
         for entrypoint in self.entrypoints:
-            connect_to(entrypoint)
+            connect_to_endpoint(entrypoint)
diff --git a/hathor/p2p/peer_discovery/dns.py b/hathor/p2p/peer_discovery/dns.py
index c5dfe74d6..9ef792a96 100644
--- a/hathor/p2p/peer_discovery/dns.py
+++ b/hathor/p2p/peer_discovery/dns.py
@@ -53,13 +53,13 @@ def do_lookup_text(self, host: str) -> Deferred[LookupResult]:
         return lookupText(host)
 
     @override
-    async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None:
+    async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None:
         """ Run DNS lookup for host and connect to it
         This is executed when starting the DNS Peer Discovery and first connecting to the network
         """
         for host in self.hosts:
             for entrypoint in (await self.dns_seed_lookup(host)):
-                connect_to(entrypoint)
+                connect_to_endpoint(entrypoint)
 
     async def dns_seed_lookup(self, host: str) -> set[PeerEndpoint]:
         """ Run a DNS lookup for TXT, A, and AAAA records and return a list of connection strings.
diff --git a/hathor/p2p/peer_discovery/peer_discovery.py b/hathor/p2p/peer_discovery/peer_discovery.py
index 7d040fae2..ae8ee626b 100644
--- a/hathor/p2p/peer_discovery/peer_discovery.py
+++ b/hathor/p2p/peer_discovery/peer_discovery.py
@@ -23,10 +23,10 @@ class PeerDiscovery(ABC):
     """
 
     @abstractmethod
-    async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None:
-        """ This method must discover the peers and call `connect_to` for each of them.
+    async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None:
+        """ This method must discover the peers and call `connect_to_endpoint` for each of them.
 
-        :param connect_to: Function which will be called for each discovered peer.
-        :type connect_to: function
+        :param connect_to_endpoint: Function which will be called for each discovered peer.
+        :type connect_to_endpoint: function
         """
         raise NotImplementedError
diff --git a/hathor/p2p/peer_endpoint.py b/hathor/p2p/peer_endpoint.py
index 62e4624a2..b98ec28fc 100644
--- a/hathor/p2p/peer_endpoint.py
+++ b/hathor/p2p/peer_endpoint.py
@@ -64,6 +64,9 @@
 """
 IPV6_REGEX = re.compile(r'''^(([0-9a-fA-F]{1,4}:){7}([0-9a-fA-F]{1,4}|:)|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:))$''')  # noqa: E501
 
+# A host with length 64 and over would be rejected later by Twisted
+MAX_HOST_LEN = 63
+
 
 class Protocol(Enum):
     TCP = 'tcp'
@@ -261,6 +264,14 @@ def parse(cls, description: str) -> PeerEndpoint:
         >>> str(PeerEndpoint.parse('tcp://foo.bar.baz:40403/'))
         'tcp://foo.bar.baz:40403'
 
+        >>> str(PeerEndpoint.parse('tcp://foooooooooooooooooooo.baaaaaaaaaaaaaaaaaar.baaaaaaaaaaaaaaaaaaz:40403/'))
+        'tcp://foooooooooooooooooooo.baaaaaaaaaaaaaaaaaar.baaaaaaaaaaaaaaaaaaz:40403'
+
+        >>> PeerEndpoint.parse('tcp://foooooooooooooooooooo.baaaaaaaaaaaaaaaaaar.baaaaaaaaaaaaaaaaaazz:40403/')
+        Traceback (most recent call last):
+        ...
+        ValueError: hostname too long
+
         >>> PeerEndpoint.parse('tcp://127.0.0.1:40403/?id=123')
         Traceback (most recent call last):
         ...
@@ -317,6 +328,8 @@ def _parse_address_parts(description: str) -> tuple[Protocol, str, int, str]:
     host = url.hostname
     if host is None:
         raise ValueError(f'expected a host: "{description}"')
+    if len(host) > MAX_HOST_LEN:
+        raise ValueError('hostname too long')
     port = url.port
     if port is None:
         raise ValueError(f'expected a port: "{description}"')
""" diff --git a/hathor/p2p/resources/add_peers.py b/hathor/p2p/resources/add_peers.py index c8faeb5dc..fcfe9732d 100644 --- a/hathor/p2p/resources/add_peers.py +++ b/hathor/p2p/resources/add_peers.py @@ -86,7 +86,7 @@ def already_connected(endpoint: PeerEndpoint) -> bool: pd = BootstrapPeerDiscovery(filtered_peers) # this fires and forget the coroutine, which is compatible with the original behavior - coro = pd.discover_and_connect(self.manager.connections.connect_to) + coro = pd.discover_and_connect(self.manager.connections.connect_to_endpoint) Deferred.fromCoroutine(coro) ret = {'success': True, 'peers': [str(p) for p in filtered_peers]} diff --git a/hathor/p2p/states/ready.py b/hathor/p2p/states/ready.py index fe1924347..7d10dcc98 100644 --- a/hathor/p2p/states/ready.py +++ b/hathor/p2p/states/ready.py @@ -187,7 +187,7 @@ def handle_peers(self, payload: str) -> None: for data in received_peers: peer = UnverifiedPeer.create_from_json(data) if self.protocol.connections: - self.protocol.connections.on_receive_peer(peer, origin=self) + self.protocol.on_receive_peer(peer) self.log.debug('received peers', payload=payload) def send_ping_if_necessary(self) -> None: diff --git a/hathor/simulator/fake_connection.py b/hathor/simulator/fake_connection.py index 3c030c901..4f569e818 100644 --- a/hathor/simulator/fake_connection.py +++ b/hathor/simulator/fake_connection.py @@ -275,9 +275,10 @@ def reconnect(self) -> None: self._proto1 = self.manager1.connections.server_factory.buildProtocol(self.addr2) self._proto2 = self.manager2.connections.client_factory.buildProtocol(self.addr1) - # When _fake_bootstrap_id is set we don't pass the peer because that's how bootstrap calls connect_to() + # When _fake_bootstrap_id is set we don't pass the peer because that's how bootstrap calls + # connect_to_endpoint() peer = self._proto1.my_peer.to_unverified_peer() if self._fake_bootstrap_id is False else None - self.manager2.connections.connect_to(self.entrypoint, peer) + self.manager2.connections.connect_to_endpoint(self.entrypoint, peer) connecting_peers = list(self.manager2.connections.connecting_peers.values()) for connecting_peer in connecting_peers: diff --git a/tests/others/test_metrics.py b/tests/others/test_metrics.py index b46f6985b..0fb201377 100644 --- a/tests/others/test_metrics.py +++ b/tests/others/test_metrics.py @@ -72,7 +72,7 @@ def test_connections_manager_integration(self): # Execution endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') # This will trigger sending to the pubsub one of the network events - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to_endpoint(endpoint, use_ssl=True) self.run_to_completion() diff --git a/tests/p2p/test_bootstrap.py b/tests/p2p/test_bootstrap.py index 9855a0fda..7ae668057 100644 --- a/tests/p2p/test_bootstrap.py +++ b/tests/p2p/test_bootstrap.py @@ -19,10 +19,10 @@ def __init__(self, mocked_host_ports: list[tuple[str, int]]): self.mocked_host_ports = mocked_host_ports @override - async def discover_and_connect(self, connect_to: Callable[[PeerEndpoint], None]) -> None: + async def discover_and_connect(self, connect_to_endpoint: Callable[[PeerEndpoint], None]) -> None: for host, port in self.mocked_host_ports: addr = PeerAddress(Protocol.TCP, host, port) - connect_to(addr.with_id()) + connect_to_endpoint(addr.with_id()) class MockDNSPeerDiscovery(DNSPeerDiscovery): diff --git a/tests/p2p/test_connections.py b/tests/p2p/test_connections.py index db5a85f1e..a0c910dda 100644 --- a/tests/p2p/test_connections.py +++ 
b/tests/p2p/test_connections.py @@ -18,7 +18,7 @@ def test_manager_connections(self) -> None: manager: HathorManager = self.create_peer('testnet', enable_sync_v1=True, enable_sync_v2=False) endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to_endpoint(endpoint, use_ssl=True) self.assertIn(endpoint, manager.connections.iter_not_ready_endpoints()) self.assertNotIn(endpoint, manager.connections.iter_ready_connections()) @@ -36,7 +36,7 @@ def test_manager_disabled_ipv6(self) -> None: ) endpoint = PeerEndpoint.parse('tcp://[::1]:8005') - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to_endpoint(endpoint, use_ssl=True) self.assertNotIn(endpoint, manager.connections.iter_not_ready_endpoints()) self.assertNotIn(endpoint, manager.connections.iter_ready_connections()) @@ -54,10 +54,10 @@ def test_manager_enabled_ipv6_and_ipv4(self) -> None: ) endpoint_ipv6 = PeerEndpoint.parse('tcp://[::3:2:1]:8005') - manager.connections.connect_to(endpoint_ipv6, use_ssl=True) + manager.connections.connect_to_endpoint(endpoint_ipv6, use_ssl=True) endpoint_ipv4 = PeerEndpoint.parse('tcp://1.2.3.4:8005') - manager.connections.connect_to(endpoint_ipv4, use_ssl=True) + manager.connections.connect_to_endpoint(endpoint_ipv4, use_ssl=True) self.assertIn( endpoint_ipv4.addr.host, @@ -84,7 +84,7 @@ def test_manager_disabled_ipv4(self) -> None: ) endpoint = PeerEndpoint.parse('tcp://127.0.0.1:8005') - manager.connections.connect_to(endpoint, use_ssl=True) + manager.connections.connect_to_endpoint(endpoint, use_ssl=True) self.assertEqual(0, len(list(manager.connections.iter_not_ready_endpoints()))) self.assertEqual(0, len(list(manager.connections.iter_ready_connections()))) diff --git a/tests/p2p/test_connectivity.py b/tests/p2p/test_connectivity.py new file mode 100644 index 000000000..328a17050 --- /dev/null +++ b/tests/p2p/test_connectivity.py @@ -0,0 +1,56 @@ +import time +import urllib +from contextlib import contextmanager +from typing import Generator + +import requests + +from tests.utils import run_server + + +@contextmanager +def _run_servers_context(count: int) -> Generator[list[tuple[str, str]], None, None]: + """ Runs `count` number of `test.utils.run_server` that bootstrap in chain, yields a (endpoint, status_url) list. + """ + if count > 80: + raise ValueError('cannot start more than 80 processes at once') + start_port = 8005 + endpoint_and_status_urls = [] + processes = [] + try: + previous_endpoint: None | str = None + for listen_port in range(start_port, start_port + count): + status_port = listen_port + 80 + endpoint = f'tcp://127.0.0.1:{listen_port}' + status_url = f'http://127.0.0.1:{status_port}' + # XXX: it's important for run_server to be inside the try because if it fails it will still terminate the + # ones that were previously started because they would have made it into the processes list + processes.append(run_server(listen=listen_port, status=status_port, bootstrap=previous_endpoint)) + endpoint_and_status_urls.append((endpoint, status_url)) + previous_endpoint = endpoint + yield endpoint_and_status_urls + finally: + for process in processes: + # XXX: this assumes process.terminate() will not fail + process.terminate() + + +def test_manager_connection_transitivity() -> None: + """ Creates a chain of 4 peers that bootstrap to the previous one, they should all connect to each other. 
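The cap is enforced by evicting random entries whenever an insert pushes the dict past `max_size`. A self-contained sketch of the same policy (illustrative `CappedDict`, not the actual `_BasePeerStorage`; assumes an rng compatible with `random.Random`):

```python
import random

class CappedDict(dict[str, str]):
    """Dict that randomly evicts entries once max_size is exceeded."""

    def __init__(self, *, rng: random.Random, max_size: int) -> None:
        super().__init__()
        self.rng = rng
        self.max_size = max_size

    def add(self, key: str, value: str) -> None:
        self[key] = value
        overflow = len(self) - self.max_size
        if overflow > 0:
            # sample() returns distinct keys, so exactly `overflow` entries go
            for k in self.rng.sample(list(self.keys()), k=overflow):
                self.pop(k)

storage = CappedDict(rng=random.Random(42), max_size=5)
for i in range(10):
    storage.add(f'peer-{i}', f'tcp://10.0.0.{i}:40403')
assert len(storage) == 5  # same bound the new max-size tests assert
```

Because the caps are per connection (`MAX_UNVERIFIED_PEERS_PER_CONN`), a peer that floods PEERS messages can only churn its own connection's storage, not the node-wide view.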
+ """ + with _run_servers_context(4) as endpoint_status_pairs: + assert len(endpoint_status_pairs) == 4 + time.sleep(1) # 1 sec should be more than enough for the peers to connect to themselves + + statuses = [ + requests.get(urllib.parse.urljoin(status_url, '/v1a/status/')).json() + for _, status_url in endpoint_status_pairs + ] + + all_peer_ids = set(status['server']['id'] for status in statuses) + + for status in statuses: + peer_id = status['server']['id'] + all_other_peer_ids = all_peer_ids - {peer_id} + connected_peer_ids = {i['id'] for i in status['connections']['connected_peers']} + assert all_other_peer_ids == connected_peer_ids diff --git a/tests/p2p/test_peer_id.py b/tests/p2p/test_peer_id.py index 1f95cbd12..75030843b 100644 --- a/tests/p2p/test_peer_id.py +++ b/tests/p2p/test_peer_id.py @@ -72,7 +72,7 @@ def test_sign_verify_fail(self) -> None: def test_merge_peer(self) -> None: # Testing peer storage with merge of peers - peer_storage = VerifiedPeerStorage() + peer_storage = VerifiedPeerStorage(rng=self.rng, max_size=100) p1 = PrivatePeer.auto_generated() p2 = PrivatePeer.auto_generated() @@ -86,19 +86,19 @@ def test_merge_peer(self) -> None: peer = peer_storage[p1.id] self.assertEqual(peer.id, p1.id) self.assertEqual(peer.public_key, p1.public_key) - self.assertEqual(peer.info.entrypoints, []) + self.assertEqual(peer.info.entrypoints, set()) ep1 = PeerAddress.parse('tcp://127.0.0.1:1001') ep2 = PeerAddress.parse('tcp://127.0.0.1:1002') ep3 = PeerAddress.parse('tcp://127.0.0.1:1003') p3 = PrivatePeer.auto_generated().to_public_peer() - p3.info.entrypoints.append(ep1) - p3.info.entrypoints.append(ep2) + p3.info.entrypoints.add(ep1) + p3.info.entrypoints.add(ep2) p4 = PublicPeer(UnverifiedPeer(id=p3.id), public_key=p3.public_key) - p4.info.entrypoints.append(ep2) - p4.info.entrypoints.append(ep3) + p4.info.entrypoints.add(ep2) + p4.info.entrypoints.add(ep3) peer_storage.add_or_merge(p4) self.assertEqual(len(peer_storage), 2) @@ -213,16 +213,16 @@ def test_unverified_peer_to_json_roundtrip(self) -> None: peer_json_simple = dict( id=str(peer_id), - entrypoints=[addr1, addr2, addr3] + entrypoints=sorted({addr1, addr2, addr3}) ) result = UnverifiedPeer.create_from_json(peer_json_simple) assert result.id == peer_id - assert result.info.entrypoints == [ + assert result.info.entrypoints == { PeerAddress.parse(addr1), PeerAddress.parse(addr2), PeerAddress.parse(addr3), - ] + } assert result.to_json() == peer_json_simple # We support this for compatibility with old peers that may send ids in the URLs @@ -237,11 +237,11 @@ def test_unverified_peer_to_json_roundtrip(self) -> None: result = UnverifiedPeer.create_from_json(peer_json_with_ids) assert result.id == peer_id - assert result.info.entrypoints == [ + assert result.info.entrypoints == { PeerAddress.parse(addr1), PeerAddress.parse(addr2), PeerAddress.parse(addr3), - ] + } assert result.to_json() == peer_json_simple # the roundtrip erases the ids from the URLs other_peer_id = PrivatePeer.auto_generated().id diff --git a/tests/p2p/test_peer_storage.py b/tests/p2p/test_peer_storage.py new file mode 100644 index 000000000..8fbbad4f6 --- /dev/null +++ b/tests/p2p/test_peer_storage.py @@ -0,0 +1,30 @@ +import pytest + +from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage +from hathor.util import Random +from tests.unittest import PEER_ID_POOL + + +@pytest.fixture +def rng() -> Random: + import secrets + seed = secrets.randbits(64) + return Random(seed) + + +def test_unverified_peer_storage_max_size(rng: Random) -> 
None: + max_size = 5 + peer_storage = UnverifiedPeerStorage(rng=rng, max_size=max_size) + for i in range(2 * max_size): + peer = PEER_ID_POOL[i].to_unverified_peer() + peer_storage.add(peer) + assert len(peer_storage) == max_size + + +def test_verified_peer_storage_max_size(rng: Random) -> None: + max_size = 5 + peer_storage = VerifiedPeerStorage(rng=rng, max_size=max_size) + for i in range(2 * max_size): + peer = PEER_ID_POOL[i].to_public_peer() + peer_storage.add(peer) + assert len(peer_storage) == max_size diff --git a/tests/p2p/test_protocol.py b/tests/p2p/test_protocol.py index 708af1f0d..7a054b578 100644 --- a/tests/p2p/test_protocol.py +++ b/tests/p2p/test_protocol.py @@ -78,8 +78,8 @@ def test_on_connect(self) -> None: def test_peer_with_entrypoint(self) -> None: entrypoint_str = 'tcp://192.168.1.1:54321' entrypoint = PeerAddress.parse(entrypoint_str) - self.peer1.info.entrypoints.append(entrypoint) - self.peer2.info.entrypoints.append(entrypoint) + self.peer1.info.entrypoints.add(entrypoint) + self.peer2.info.entrypoints.add(entrypoint) self.conn.run_one_step() # HELLO msg1 = self.conn.peek_tr1_value() @@ -228,9 +228,9 @@ def test_hello_without_ipv6_capability(self) -> None: entrypoint_1_ipv4 = PeerEndpoint.parse(f'tcp://192.168.1.1:{port1}') entrypoint_2_ipv4 = PeerEndpoint.parse(f'tcp://192.168.1.1:{port2}') - self.peer1.info.entrypoints.append(entrypoint_1_ipv6.addr) - self.peer1.info.entrypoints.append(entrypoint_1_ipv4.addr) - self.peer2.info.entrypoints.append(entrypoint_2_ipv4.addr) + self.peer1.info.entrypoints.add(entrypoint_1_ipv6.addr) + self.peer1.info.entrypoints.add(entrypoint_1_ipv4.addr) + self.peer2.info.entrypoints.add(entrypoint_2_ipv4.addr) conn = FakeConnection(manager1, manager2, addr1=addr1, addr2=addr2) @@ -239,8 +239,8 @@ def test_hello_without_ipv6_capability(self) -> None: self.assertEqual(len(conn.proto1.peer.info.entrypoints), 1) self.assertEqual(len(conn.proto2.peer.info.entrypoints), 1) - self.assertEqual(conn.proto1.peer.info.entrypoints[0].host, '192.168.1.1') - self.assertEqual(conn.proto2.peer.info.entrypoints[0].host, '192.168.1.1') + self.assertEqual(next(iter(conn.proto1.peer.info.entrypoints)).host, '192.168.1.1') + self.assertEqual(next(iter(conn.proto2.peer.info.entrypoints)).host, '192.168.1.1') def test_hello_with_ipv6_capability(self) -> None: """Tests the connection between peers with the IPV6 capability. 
@@ -268,9 +268,9 @@ def test_hello_with_ipv6_capability(self) -> None: entrypoint_1_ipv4 = PeerEndpoint.parse(f'tcp://192.168.1.1:{port1}') entrypoint_2_ipv4 = PeerEndpoint.parse(f'tcp://192.168.1.1:{port2}') - self.peer1.info.entrypoints.append(entrypoint_1_ipv6.addr) - self.peer1.info.entrypoints.append(entrypoint_1_ipv4.addr) - self.peer2.info.entrypoints.append(entrypoint_2_ipv4.addr) + self.peer1.info.entrypoints.add(entrypoint_1_ipv6.addr) + self.peer1.info.entrypoints.add(entrypoint_1_ipv4.addr) + self.peer2.info.entrypoints.add(entrypoint_2_ipv4.addr) conn = FakeConnection(manager1, manager2, addr1=addr1, addr2=addr2) @@ -281,7 +281,7 @@ def test_hello_with_ipv6_capability(self) -> None: self.assertEqual(len(conn.proto2.peer.info.entrypoints), 2) self.assertTrue('::1' in map(lambda x: x.host, conn.proto2.peer.info.entrypoints)) self.assertTrue('192.168.1.1' in map(lambda x: x.host, conn.proto2.peer.info.entrypoints)) - self.assertEqual(conn.proto1.peer.info.entrypoints[0].host, '192.168.1.1') + self.assertEqual(next(iter(conn.proto1.peer.info.entrypoints)).host, '192.168.1.1') def test_invalid_same_peer_id(self) -> None: manager3 = self.create_peer(self.network, peer=self.peer1) diff --git a/tests/poa/test_poa_simulation.py b/tests/poa/test_poa_simulation.py index b0b787f6e..096a93fd1 100644 --- a/tests/poa/test_poa_simulation.py +++ b/tests/poa/test_poa_simulation.py @@ -30,6 +30,7 @@ from hathor.crypto.util import get_address_b58_from_public_key_bytes, get_public_key_bytes_compressed from hathor.manager import HathorManager from hathor.simulator import FakeConnection +from hathor.simulator.trigger import StopWhenTrue from hathor.transaction import BaseTransaction, Block, TxInput, TxOutput from hathor.transaction.genesis import generate_new_genesis from hathor.transaction.poa import PoaBlock @@ -156,8 +157,8 @@ def test_two_producers(self) -> None: connection = FakeConnection(manager1, manager2) self.simulator.add_connection(connection) - # both managers are producing blocks - self.simulator.run(100) + trigger = StopWhenTrue(lambda: manager2.tx_storage.get_block_count() == 12) + assert self.simulator.run(200, trigger=trigger) assert manager1.tx_storage.get_block_count() == 12 assert manager2.tx_storage.get_block_count() == 12 assert manager1.tx_storage.get_best_block_tips() == manager2.tx_storage.get_best_block_tips() diff --git a/tests/resources/p2p/test_status.py b/tests/resources/p2p/test_status.py index 646ba6903..e7c322f74 100644 --- a/tests/resources/p2p/test_status.py +++ b/tests/resources/p2p/test_status.py @@ -18,13 +18,13 @@ def setUp(self): super().setUp() self.web = StubSite(StatusResource(self.manager)) address1 = IPv4Address('TCP', '192.168.1.1', 54321) - self.manager.connections.my_peer.info.entrypoints.append(PeerAddress.from_address(address1)) + self.manager.connections.my_peer.info.entrypoints.add(PeerAddress.from_address(address1)) self.manager.peers_whitelist.append(self.get_random_peer_from_pool().id) self.manager.peers_whitelist.append(self.get_random_peer_from_pool().id) self.manager2 = self.create_peer('testnet') address2 = IPv4Address('TCP', '192.168.1.1', 54322) - self.manager2.connections.my_peer.info.entrypoints.append(PeerAddress.from_address(address2)) + self.manager2.connections.my_peer.info.entrypoints.add(PeerAddress.from_address(address2)) self.conn1 = FakeConnection(self.manager, self.manager2, addr1=address1, addr2=address2) @inlineCallbacks