Skip to content

Commit

Permalink
chore[websocket]: improve websocket factory start (#1145)
Browse files Browse the repository at this point in the history
  • Loading branch information
luislhl authored Oct 8, 2024
1 parent 22f478c commit d85f88c
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 9 deletions.
5 changes: 2 additions & 3 deletions hathor/builder/resources_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -297,7 +297,6 @@ def create_resources(self) -> server.Site:
address_index=self.manager.tx_storage.indexes.addresses)
if self._args.disable_ws_history_streaming:
ws_factory.disable_history_streaming()
ws_factory.start()
root.putChild(b'ws', WebSocketResource(ws_factory))

if settings.CONSENSUS_ALGORITHM.is_pow():
Expand All @@ -322,8 +321,8 @@ def create_resources(self) -> server.Site:
status_server = SiteProfiler(real_root)
self.log.info('with status', listen=self._args.status, with_wallet_api=with_wallet_api)

# Set websocket factory in metrics
self.manager.metrics.websocket_factory = ws_factory
# Set websocket factory in metrics. It'll be started when the manager is started.
self.manager.websocket_factory = ws_factory

self._built_status = True
return status_server
2 changes: 1 addition & 1 deletion hathor/builder/sysctl_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ def build(self) -> Sysctl:
root.put_child('core', core)
root.put_child('p2p', ConnectionsManagerSysctl(self.artifacts.p2p_manager))

ws_factory = self.artifacts.manager.metrics.websocket_factory
ws_factory = self.artifacts.manager.websocket_factory
if ws_factory is not None:
root.put_child('ws', WebsocketManagerSysctl(ws_factory))

Expand Down
3 changes: 2 additions & 1 deletion hathor/cli/run_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ def prepare(self, *, register_resources: bool = True) -> None:

self.tx_storage = self.manager.tx_storage
self.wallet = self.manager.wallet
self.start_manager()

if self._args.stratum:
assert self.manager.stratum_factory is not None
Expand All @@ -219,6 +218,8 @@ def prepare(self, *, register_resources: bool = True) -> None:
assert status_server is not None
self.reactor.listenTCP(self._args.status, status_server)

self.start_manager()

from hathor.builder.builder import BuildArtifacts
self.artifacts = BuildArtifacts(
peer=self.manager.my_peer,
Expand Down
17 changes: 16 additions & 1 deletion hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import time
from cProfile import Profile
from enum import Enum
from typing import Iterator, NamedTuple, Optional, Union
from typing import TYPE_CHECKING, Iterator, NamedTuple, Optional, Union

from hathorlib.base_transaction import tx_or_block_from_bytes as lib_tx_or_block_from_bytes
from structlog import get_logger
Expand Down Expand Up @@ -65,6 +65,9 @@
from hathor.vertex_handler import VertexHandler
from hathor.wallet import BaseWallet

if TYPE_CHECKING:
from hathor.websocket.factory import HathorAdminWebsocketFactory

logger = get_logger()
cpu = get_cpu_profiler()

Expand Down Expand Up @@ -117,6 +120,8 @@ def __init__(
full_verification: bool = False,
enable_event_queue: bool = False,
poa_block_producer: PoaBlockProducer | None = None,
# Websocket factory
websocket_factory: Optional['HathorAdminWebsocketFactory'] = None
) -> None:
"""
:param reactor: Twisted reactor which handles the mainloop and the events.
Expand Down Expand Up @@ -199,12 +204,15 @@ def __init__(
self.vertex_handler = vertex_handler
self.vertex_parser = vertex_parser

self.websocket_factory = websocket_factory

self.metrics = Metrics(
pubsub=self.pubsub,
avg_time_between_blocks=settings.AVG_TIME_BETWEEN_BLOCKS,
connections=self.connections,
tx_storage=self.tx_storage,
reactor=self.reactor,
websocket_factory=self.websocket_factory,
)

self.wallet = wallet
Expand Down Expand Up @@ -322,6 +330,10 @@ def start(self) -> None:
self.tx_storage.set_allow_scope(TxAllowScope.VALID)
self.tx_storage.enable_lock()

# Preferably start before self.metrics
if self.websocket_factory:
self.websocket_factory.start()

# Metric starts to capture data
self.metrics.start()

Expand Down Expand Up @@ -360,6 +372,9 @@ def stop(self) -> Deferred:
# Metric stops to capture data
self.metrics.stop()

if self.websocket_factory:
self.websocket_factory.stop()

if self.lc_check_sync_state.running:
self.lc_check_sync_state.stop()

Expand Down
2 changes: 2 additions & 0 deletions hathor/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ def set_websocket_data(self) -> None:
""" Set websocket metrics data. Connections and addresses subscribed.
"""
if self.websocket_factory:
assert self.websocket_factory.is_running, 'Websocket factory has not been started'

self.websocket_connections = len(self.websocket_factory.connections)
self.subscribed_addresses = len(self.websocket_factory.address_connections)

Expand Down
16 changes: 16 additions & 0 deletions tests/cli/test_run_node.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from unittest.mock import ANY, patch

from hathor.cli.run_node import RunNode
from tests import unittest

Expand All @@ -15,3 +17,17 @@ def register_signal_handlers(self) -> None:

run_node = CustomRunNode(argv=['--memory-storage'])
self.assertTrue(run_node is not None)

@patch('twisted.internet.reactor.listenTCP')
def test_listen_tcp_ipv4(self, mock_listenTCP):
class CustomRunNode(RunNode):
def start_manager(self) -> None:
pass

def register_signal_handlers(self) -> None:
pass

run_node = CustomRunNode(argv=['--memory-storage', '--status', '1234'])
self.assertTrue(run_node is not None)

mock_listenTCP.assert_called_with(1234, ANY)
6 changes: 3 additions & 3 deletions tests/cli/test_sysctl_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def test_sysctl_builder_fail_with_invalid_property(self):
# prepare to register only p2p commands
artifacts = Mock(**{
'p2p_manager': Mock(),
'manager.metrics.websocket_factory.return_value': None
'manager.websocket_factory.return_value': None
})

with self.assertRaises(SysctlEntryNotFound) as context:
Expand Down Expand Up @@ -68,7 +68,7 @@ def test_sysctl_builder_fail_with_invalid_value(self):
# prepare to register only p2p commands
artifacts = Mock(**{
'p2p_manager': Mock(),
'manager.metrics.websocket_factory.return_value': None
'manager.websocket_factory.return_value': None
})

with self.assertRaises(SysctlRunnerException) as context:
Expand All @@ -85,7 +85,7 @@ def test_syctl_init_file_fail_with_empty_or_invalid_file(self):
# prepare to register only p2p commands
artifacts = Mock(**{
'p2p_manager': Mock(),
'manager.metrics.websocket_factory.return_value': None
'manager.websocket_factory.return_value': None
})

with self.assertRaises(AssertionError):
Expand Down

0 comments on commit d85f88c

Please sign in to comment.