Skip to content

Commit

Permalink
refactor(p2p): move received peers storage to protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jansegre committed Dec 6, 2024
1 parent 52a8fcb commit 2a12952
Show file tree
Hide file tree
Showing 21 changed files with 321 additions and 102 deletions.
9 changes: 9 additions & 0 deletions hathor/conf/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,9 @@ def GENESIS_TX2_TIMESTAMP(self) -> int:
# Maximum period without receiving any messages from ther peer (in seconds).
PEER_IDLE_TIMEOUT: int = 60

# Maximum number of entrypoints that we accept that a peer broadcasts
PEER_MAX_ENTRYPOINTS: int = 30

# Filepath of ca certificate file to generate connection certificates
CA_FILEPATH: str = os.path.join(os.path.dirname(__file__), '../p2p/ca.crt')

Expand Down Expand Up @@ -431,6 +434,12 @@ def GENESIS_TX2_TIMESTAMP(self) -> int:
# more than enough for the forseeable future
MAX_MEMPOOL_RECEIVING_TIPS: int = 1000

# Max number of peers simultanously stored in the node
MAX_VERIFIED_PEERS: int = 10_000

# Max number of peers simultanously stored per-connection
MAX_UNVERIFIED_PEERS_PER_CONN: int = 100

# Used to enable nano contracts.
#
# This should NEVER be enabled for mainnet and testnet, since both networks will
Expand Down
140 changes: 103 additions & 37 deletions hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from collections import deque
from typing import TYPE_CHECKING, Any, Iterable, NamedTuple, Optional

from structlog import get_logger
Expand All @@ -30,7 +31,7 @@
from hathor.p2p.peer_discovery import PeerDiscovery
from hathor.p2p.peer_endpoint import PeerAddress, PeerEndpoint
from hathor.p2p.peer_id import PeerId
from hathor.p2p.peer_storage import UnverifiedPeerStorage, VerifiedPeerStorage
from hathor.p2p.peer_storage import VerifiedPeerStorage
from hathor.p2p.protocol import HathorProtocol
from hathor.p2p.rate_limiter import RateLimiter
from hathor.p2p.states.ready import ReadyState
Expand Down Expand Up @@ -81,10 +82,10 @@ class GlobalRateLimiter:
manager: Optional['HathorManager']
connections: set[HathorProtocol]
connected_peers: dict[PeerId, HathorProtocol]
new_connection_from_queue: deque[PeerId]
connecting_peers: dict[IStreamClientEndpoint, _ConnectingPeer]
handshaking_peers: set[HathorProtocol]
whitelist_only: bool
unverified_peer_storage: UnverifiedPeerStorage
verified_peer_storage: VerifiedPeerStorage
_sync_factories: dict[SyncVersion, SyncAgentFactory]
_enabled_sync_versions: set[SyncVersion]
Expand Down Expand Up @@ -156,12 +157,12 @@ def __init__(
# List of peers connected and ready to communicate.
self.connected_peers = {}

# List of peers received from the network.
# We cannot trust their identity before we connect to them.
self.unverified_peer_storage = UnverifiedPeerStorage()
# Queue of ready peer-id's used by connect_to_peer_from_connection_queue to choose the next peer to pull a
# random new connection from
self.new_connection_from_queue = deque()

# List of known peers.
self.verified_peer_storage = VerifiedPeerStorage() # dict[string (peer.id), PublicPeer]
self.verified_peer_storage = VerifiedPeerStorage(rng=self.rng, max_size=self._settings.MAX_VERIFIED_PEERS)

# Maximum unseen time before removing a peer (seconds).
self.max_peer_unseen_dt: float = 30 * 60 # 30-minutes
Expand All @@ -181,6 +182,11 @@ def __init__(
# Timestamp of the last time sync was updated.
self._last_sync_rotate: float = 0.

# Connect to new peers in a timed loop, instead of as soon as possible
self.lc_connect = LoopingCall(self.connect_to_peer_from_connection_queue)
self.lc_connect.clock = self.reactor
self.lc_connect_interval = 0.2 # seconds

# A timer to try to reconnect to the disconnect known peers.
if self._settings.ENABLE_PEER_WHITELIST:
self.wl_reconnect = LoopingCall(self.update_whitelist)
Expand Down Expand Up @@ -272,7 +278,7 @@ def do_discovery(self) -> None:
Do a discovery and connect on all discovery strategies.
"""
for peer_discovery in self.peer_discoveries:
coro = peer_discovery.discover_and_connect(self.connect_to)
coro = peer_discovery.discover_and_connect(self.connect_to_endpoint)
Deferred.fromCoroutine(coro)

def disable_rate_limiter(self) -> None:
Expand All @@ -293,6 +299,7 @@ def start(self) -> None:
if self.manager is None:
raise TypeError('Class was built incorrectly without a HathorManager.')

self._start_peer_connect_loop()
self.lc_reconnect.start(5, now=False)
self.lc_sync_update.start(self.lc_sync_update_interval, now=False)

Expand All @@ -319,7 +326,28 @@ def _handle_whitelist_reconnect_err(self, *args: Any, **kwargs: Any) -> None:
self.log.error('whitelist reconnect had an exception. Start looping call again.', args=args, kwargs=kwargs)
self.reactor.callLater(30, self._start_whitelist_reconnect)

def _start_peer_connect_loop(self) -> None:
# The deferred returned by the LoopingCall start method
# executes when the looping call stops running
# https://docs.twistedmatrix.com/en/stable/api/twisted.internet.task.LoopingCall.html
d = self.lc_connect.start(self.lc_connect_interval, now=True)
d.addErrback(self._handle_peer_connect_err)

def _handle_peer_connect_err(self, *args: Any, **kwargs: Any) -> None:
# This method will be called when an exception happens inside the peer connect loop
# and ends up stopping the looping call.
# We log the error and start the looping call again.
self.log.error(
'connect_to_peer_from_connection_queue had an exception. Start looping call again.',
args=args,
kwargs=kwargs,
)
self.reactor.callLater(self.lc_connect_interval, self._start_peer_connect_loop)

def stop(self) -> None:
if self.lc_connect.running:
self.lc_connect.stop()

if self.lc_reconnect.running:
self.lc_reconnect.stop()

Expand Down Expand Up @@ -406,10 +434,10 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
"""Called when a peer is ready."""
assert protocol.peer is not None
self.verified_peer_storage.add_or_replace(protocol.peer)
assert protocol.peer.id is not None

self.handshaking_peers.remove(protocol)
self.unverified_peer_storage.pop(protocol.peer.id, None)
for conn in self.iter_all_connections():
conn.unverified_peer_storage.remove(protocol.peer)

# we emit the event even if it's a duplicate peer as a matching
# NETWORK_PEER_DISCONNECTED will be emitted regardless
Expand All @@ -419,7 +447,8 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
peers_count=self._get_peers_count()
)

if protocol.peer.id in self.connected_peers:
peer_id = protocol.peer.id
if peer_id in self.connected_peers:
# connected twice to same peer
self.log.warn('duplicate connection to peer', protocol=protocol)
conn = self.get_connection_to_drop(protocol)
Expand All @@ -428,15 +457,19 @@ def on_peer_ready(self, protocol: HathorProtocol) -> None:
# the new connection is being dropped, so don't save it to connected_peers
return

self.connected_peers[protocol.peer.id] = protocol
self.connected_peers[peer_id] = protocol
if peer_id not in self.new_connection_from_queue:
self.new_connection_from_queue.append(peer_id)
else:
self.log.warn('peer already in queue', peer=str(peer_id))

# In case it was a retry, we must reset the data only here, after it gets ready
protocol.peer.info.reset_retry_timestamp()

if len(self.connected_peers) <= self.MAX_ENABLED_SYNC:
protocol.enable_sync()

if protocol.peer.id in self.always_enable_sync:
if peer_id in self.always_enable_sync:
protocol.enable_sync()

# Notify other peers about this new peer connection.
Expand All @@ -456,7 +489,8 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
if protocol in self.handshaking_peers:
self.handshaking_peers.remove(protocol)
if protocol._peer is not None:
existing_protocol = self.connected_peers.pop(protocol.peer.id, None)
peer_id = protocol.peer.id
existing_protocol = self.connected_peers.pop(peer_id, None)
if existing_protocol is None:
# in this case, the connection was closed before it got to READY state
return
Expand All @@ -466,7 +500,10 @@ def on_peer_disconnect(self, protocol: HathorProtocol) -> None:
# A check for duplicate connections is done during PEER_ID state, but there's still a
# chance it can happen if both connections start at the same time and none of them has
# reached READY state while the other is on PEER_ID state
self.connected_peers[protocol.peer.id] = existing_protocol
self.connected_peers[peer_id] = existing_protocol
elif peer_id in self.new_connection_from_queue:
# now we're sure it can be removed from new_connection_from_queue
self.new_connection_from_queue.remove(peer_id)
self.pubsub.publish(
HathorEvents.NETWORK_PEER_DISCONNECTED,
protocol=protocol,
Expand Down Expand Up @@ -499,15 +536,6 @@ def is_peer_connected(self, peer_id: PeerId) -> bool:
"""
return peer_id in self.connected_peers

def on_receive_peer(self, peer: UnverifiedPeer, origin: Optional[ReadyState] = None) -> None:
""" Update a peer information in our storage, and instantly attempt to connect
to it if it is not connected yet.
"""
if peer.id == self.my_peer.id:
return
peer = self.unverified_peer_storage.add_or_merge(peer)
self.connect_to_if_not_connected(peer, int(self.reactor.seconds()))

def peers_cleanup(self) -> None:
"""Clean up aged peers."""
now = self.reactor.seconds()
Expand All @@ -523,11 +551,45 @@ def peers_cleanup(self) -> None:
for remove_peer in to_be_removed:
self.verified_peer_storage.remove(remove_peer)

def reconnect_to_all(self) -> None:
""" It is called by the `lc_reconnect` timer and tries to connect to all known
peers.
def connect_to_peer_from_connection_queue(self) -> None:
""" It is called by the `lc_connect` looping call and tries to connect to a new peer.
"""
if not self.new_connection_from_queue:
self.log.debug('connection queue is empty')
return
assert self.manager is not None
self.log.debug('connect to peer from connection queue')
candidate_new_peers: list[UnverifiedPeer]
# we don't know if we will find a candidate, so we can't do `while True:`
for _ in range(len(self.new_connection_from_queue)):
# for a deque([1, 2, 3, 4]) this will get 1 and modify it to deque([2, 3, 4, 1])
next_from_peer_id = self.new_connection_from_queue[0]
self.new_connection_from_queue.rotate(-1)

protocol = self.connected_peers.get(next_from_peer_id)
if protocol is None:
self.log.error('expected protocol not found', peer_id=str(next_from_peer_id))
assert self.new_connection_from_queue.pop() == next_from_peer_id
continue
candidate_new_peers = [
candidate_peer
for candidate_peer_id, candidate_peer in protocol.unverified_peer_storage.items()
if candidate_peer_id not in self.connected_peers or candidate_peer_id not in self.connecting_peers
]
if candidate_new_peers:
break
else:
self.log.debug('no new peers in the connection queue')
# this means we rotated through the whole queue and did not find any candidate
return

TODO(epnichols): Should we always connect to *all*? Should there be a max #?
peer = self.rng.choice(candidate_new_peers)
self.log.debug('random peer chosen', peer=str(peer.id), entrypoints=peer.info.entrypoints_as_str())
now = self.reactor.seconds()
self.connect_to_peer(peer, int(now))

def reconnect_to_all(self) -> None:
""" It is called by the `lc_reconnect` timer and tries to connect to all known peers.
"""
self.peers_cleanup()
# when we have no connected peers left, run the discovery process again
Expand All @@ -536,10 +598,10 @@ def reconnect_to_all(self) -> None:
if now - self._last_discovery >= self.PEER_DISCOVERY_INTERVAL:
self._last_discovery = now
self.do_discovery()
# We need to use list() here because the dict might change inside connect_to_if_not_connected
# We need to use list() here because the dict might change inside connect_to_peer
# when the peer is disconnected and without entrypoint
for peer in list(self.verified_peer_storage.values()):
self.connect_to_if_not_connected(peer, int(now))
self.connect_to_peer(peer, int(now))

def update_whitelist(self) -> Deferred[None]:
from twisted.web.client import readBody
Expand Down Expand Up @@ -582,7 +644,7 @@ def _update_whitelist_cb(self, body: bytes) -> None:
for peer_id in peers_to_remove:
self.manager.remove_peer_from_whitelist_and_disconnect(peer_id)

def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: int) -> None:
def connect_to_peer(self, peer: UnverifiedPeer | PublicPeer, now: int) -> None:
""" Attempts to connect if it is not connected to the peer.
"""
if not peer.info.entrypoints or (
Expand All @@ -602,15 +664,16 @@ def connect_to_if_not_connected(self, peer: UnverifiedPeer | PublicPeer, now: in
assert peer.id is not None
if peer.info.can_retry(now):
if self.enable_ipv6 and not self.disable_ipv4:
addr = self.rng.choice(peer.info.entrypoints)
addr = self.rng.choice(list(peer.info.entrypoints))
elif self.enable_ipv6 and self.disable_ipv4:
addr = self.rng.choice(peer.info.get_ipv6_only_entrypoints())
elif not self.enable_ipv6 and not self.disable_ipv4:
addr = self.rng.choice(peer.info.get_ipv4_only_entrypoints())
else:
raise ValueError('IPv4 is disabled and IPv6 is not enabled')

self.connect_to(addr.with_id(peer.id), peer)
self.connect_to_endpoint(addr.with_id(peer.id), peer)
else:
self.log.debug('connecting too often, skip retrying', peer=str(peer.id))

def _connect_to_callback(
self,
Expand All @@ -628,14 +691,17 @@ def _connect_to_callback(
protocol.wrappedProtocol.on_outbound_connect(entrypoint, peer)
self.connecting_peers.pop(endpoint)

def connect_to(
def connect_to_endpoint(
self,
entrypoint: PeerEndpoint,
peer: UnverifiedPeer | PublicPeer | None = None,
use_ssl: bool | None = None,
) -> None:
""" Attempt to connect to a peer, even if a connection already exists.
Usually you should call `connect_to_if_not_connected`.
""" Attempt to connect directly to an endpoint, prefer calling `connect_to_peer` when possible.
This method does not take into account the peer's id (since we might not even know it, or have verified it even
if we know). But this method will check if there's already a connection open to the given endpoint and skip it
if there is one.
If `use_ssl` is True, then the connection will be wraped by a TLS.
"""
Expand Down Expand Up @@ -747,7 +813,7 @@ def update_hostname_entrypoints(self, *, old_hostname: str | None, new_hostname:

def _add_hostname_entrypoint(self, hostname: str, address: IPv4Address | IPv6Address) -> None:
hostname_entrypoint = PeerAddress.from_hostname_address(hostname, address)
self.my_peer.info.entrypoints.append(hostname_entrypoint)
self.my_peer.info.entrypoints.add(hostname_entrypoint)

def get_connection_to_drop(self, protocol: HathorProtocol) -> HathorProtocol:
""" When there are duplicate connections, determine which one should be dropped.
Expand Down
Loading

0 comments on commit 2a12952

Please sign in to comment.