Skip to content

Commit

Permalink
refactor(p2p): refactor peer address handling (#1173)
Browse files Browse the repository at this point in the history
  • Loading branch information
glevco authored Nov 7, 2024
1 parent 127c7fe commit 487e731
Show file tree
Hide file tree
Showing 20 changed files with 747 additions and 393 deletions.
4 changes: 2 additions & 2 deletions hathor/builder/cli_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@
from hathor.indexes import IndexesManager, MemoryIndexesManager, RocksDBIndexesManager
from hathor.manager import HathorManager
from hathor.mining.cpu_mining_service import CpuMiningService
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.peer import PrivatePeer
from hathor.p2p.peer_endpoint import PeerEndpoint
from hathor.p2p.utils import discover_hostname, get_genesis_short_hash
from hathor.pubsub import PubSubManager
from hathor.reactor import ReactorProtocol as Reactor
Expand Down Expand Up @@ -420,7 +420,7 @@ def create_manager(self, reactor: Reactor) -> HathorManager:
p2p_manager.add_peer_discovery(DNSPeerDiscovery(dns_hosts))

if self._args.bootstrap:
entrypoints = [Entrypoint.parse(desc) for desc in self._args.bootstrap]
entrypoints = [PeerEndpoint.parse(desc) for desc in self._args.bootstrap]
p2p_manager.add_peer_discovery(BootstrapPeerDiscovery(entrypoints))

if self._args.x_rocksdb_indexes:
Expand Down
215 changes: 0 additions & 215 deletions hathor/p2p/entrypoint.py

This file was deleted.

48 changes: 26 additions & 22 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
from twisted.web.client import Agent

from hathor.conf.settings import HathorSettings
from hathor.p2p.entrypoint import Entrypoint
from hathor.p2p.netfilter.factory import NetfilterFactory
from hathor.p2p.peer import PrivatePeer, PublicPeer, UnverifiedPeer
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint
from hathor.p2p.peer_id import PeerId
from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage
from hathor.p2p.protocol import HathorProtocol
Expand Down Expand Up @@ -60,7 +60,7 @@ class _SyncRotateInfo(NamedTuple):


class _ConnectingPeer(NamedTuple):
entrypoint: Entrypoint
entrypoint: PeerEndpoint
endpoint_deferred: Deferred


Expand Down Expand Up @@ -370,7 +370,7 @@ def on_connection_failure(self, failure: Failure, peer: Optional[UnverifiedPeer
endpoint: IStreamClientEndpoint) -> None:
connecting_peer = self.connecting_peers[endpoint]
entrypoint = connecting_peer.entrypoint
self.log.warn('connection failure', entrypoint=entrypoint, failure=failure.getErrorMessage())
self.log.warn('connection failure', entrypoint=str(entrypoint), failure=failure.getErrorMessage())
self.connecting_peers.pop(endpoint)

self.pubsub.publish(
Expand Down Expand Up @@ -475,7 +475,7 @@ def iter_ready_connections(self) -> Iterable[HathorProtocol]:
for conn in self.connected_peers.values():
yield conn

def iter_not_ready_endpoints(self) -> Iterable[Entrypoint]:
def iter_not_ready_endpoints(self) -> Iterable[PeerEndpoint]:
"""Iterate over not-ready connections."""
for connecting_peer in self.connecting_peers.values():
yield connecting_peer.entrypoint
Expand Down Expand Up @@ -589,27 +589,28 @@ def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: in

assert peer.id is not None
if peer.info.can_retry(now):
self.connect_to(self.rng.choice(peer.info.entrypoints), peer)
addr = self.rng.choice(peer.info.entrypoints)
self.connect_to(addr.with_id(peer.id), peer)

def _connect_to_callback(
self,
protocol: IProtocol,
peer: Optional[UnverifiedPeer | PublicPeer],
peer: UnverifiedPeer | PublicPeer | None,
endpoint: IStreamClientEndpoint,
entrypoint: Entrypoint,
entrypoint: PeerEndpoint,
) -> None:
"""Called when we successfully connect to a peer."""
if isinstance(protocol, HathorProtocol):
protocol.on_outbound_connect(entrypoint)
protocol.on_outbound_connect(entrypoint, peer)
else:
assert isinstance(protocol, TLSMemoryBIOProtocol)
assert isinstance(protocol.wrappedProtocol, HathorProtocol)
protocol.wrappedProtocol.on_outbound_connect(entrypoint)
protocol.wrappedProtocol.on_outbound_connect(entrypoint, peer)
self.connecting_peers.pop(endpoint)

def connect_to(
self,
entrypoint: Entrypoint,
entrypoint: PeerEndpoint,
peer: UnverifiedPeer | PublicPeer | None = None,
use_ssl: bool | None = None,
) -> None:
Expand All @@ -618,24 +619,27 @@ def connect_to(
If `use_ssl` is True, then the connection will be wraped by a TLS.
"""
if entrypoint.peer_id is not None and peer is not None and str(entrypoint.peer_id) != peer.id:
if entrypoint.peer_id is not None and peer is not None and entrypoint.peer_id != peer.id:
self.log.debug('skipping because the entrypoint peer_id does not match the actual peer_id',
entrypoint=entrypoint)
entrypoint=str(entrypoint))
return

for connecting_peer in self.connecting_peers.values():
if connecting_peer.entrypoint.equals_ignore_peer_id(entrypoint):
self.log.debug('skipping because we are already connecting to this endpoint', entrypoint=entrypoint)
if connecting_peer.entrypoint.addr == entrypoint.addr:
self.log.debug(
'skipping because we are already connecting to this endpoint',
entrypoint=str(entrypoint),
)
return

if self.localhost_only and not entrypoint.is_localhost():
self.log.debug('skip because of simple localhost check', entrypoint=entrypoint)
if self.localhost_only and not entrypoint.addr.is_localhost():
self.log.debug('skip because of simple localhost check', entrypoint=str(entrypoint))
return

if use_ssl is None:
use_ssl = self.use_ssl

endpoint = entrypoint.to_client_endpoint(self.reactor)
endpoint = entrypoint.addr.to_client_endpoint(self.reactor)

factory: IProtocolFactory
if use_ssl:
Expand All @@ -650,9 +654,9 @@ def connect_to(
deferred = endpoint.connect(factory)
self.connecting_peers[endpoint] = _ConnectingPeer(entrypoint, deferred)

deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint) # type: ignore
deferred.addErrback(self.on_connection_failure, peer, endpoint) # type: ignore
self.log.info('connect to', entrypoint=str(entrypoint), peer=str(peer))
deferred.addCallback(self._connect_to_callback, peer, endpoint, entrypoint)
deferred.addErrback(self.on_connection_failure, peer, endpoint)
self.log.info('connecting to', entrypoint=str(entrypoint), peer=str(peer))
self.pubsub.publish(
HathorEvents.NETWORK_PEER_CONNECTING,
peer=peer,
Expand Down Expand Up @@ -708,13 +712,13 @@ def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname:
assert self.manager is not None
for address in self._listen_addresses:
if old_hostname is not None:
old_entrypoint = Entrypoint.from_hostname_address(old_hostname, address)
old_entrypoint = PeerAddress.from_hostname_address(old_hostname, address)
if old_entrypoint in self.my_peer.info.entrypoints:
self.my_peer.info.entrypoints.remove(old_entrypoint)
self._add_hostname_entrypoint(new_hostname, address)

def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Address) -> None:
hostname_entrypoint = Entrypoint.from_hostname_address(hostname, address)
hostname_entrypoint = PeerAddress.from_hostname_address(hostname, address)
self.my_peer.info.entrypoints.append(hostname_entrypoint)

def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
Expand Down
Loading

0 comments on commit 487e731

Please sign in to comment.