diff --git a/hathor/builder/resources_builder.py b/hathor/builder/resources_builder.py index ce453aef6..f067cc3d9 100644 --- a/hathor/builder/resources_builder.py +++ b/hathor/builder/resources_builder.py @@ -297,7 +297,6 @@ def create_resources(self) -> server.Site: address_index=self.manager.tx_storage.indexes.addresses) if self._args.disable_ws_history_streaming: ws_factory.disable_history_streaming() - ws_factory.start() root.putChild(b'ws', WebSocketResource(ws_factory)) if settings.CONSENSUS_ALGORITHM.is_pow(): @@ -322,8 +321,8 @@ def create_resources(self) -> server.Site: status_server = SiteProfiler(real_root) self.log.info('with status', listen=self._args.status, with_wallet_api=with_wallet_api) - # Set websocket factory in metrics - self.manager.metrics.websocket_factory = ws_factory + # Set websocket factory in metrics. It'll be started when the manager is started. + self.manager.websocket_factory = ws_factory self._built_status = True return status_server diff --git a/hathor/builder/sysctl_builder.py b/hathor/builder/sysctl_builder.py index 0b2131ad8..206547458 100644 --- a/hathor/builder/sysctl_builder.py +++ b/hathor/builder/sysctl_builder.py @@ -38,7 +38,7 @@ def build(self) -> Sysctl: root.put_child('core', core) root.put_child('p2p', ConnectionsManagerSysctl(self.artifacts.p2p_manager)) - ws_factory = self.artifacts.manager.metrics.websocket_factory + ws_factory = self.artifacts.manager.websocket_factory if ws_factory is not None: root.put_child('ws', WebsocketManagerSysctl(ws_factory)) diff --git a/hathor/cli/run_node.py b/hathor/cli/run_node.py index 88489fa8a..9dbc3005c 100644 --- a/hathor/cli/run_node.py +++ b/hathor/cli/run_node.py @@ -198,7 +198,6 @@ def prepare(self, *, register_resources: bool = True) -> None: self.tx_storage = self.manager.tx_storage self.wallet = self.manager.wallet - self.start_manager() if self._args.stratum: assert self.manager.stratum_factory is not None @@ -219,6 +218,8 @@ def prepare(self, *, register_resources: bool = True) -> None: assert status_server is not None self.reactor.listenTCP(self._args.status, status_server) + self.start_manager() + from hathor.builder.builder import BuildArtifacts self.artifacts = BuildArtifacts( peer=self.manager.my_peer, diff --git a/hathor/manager.py b/hathor/manager.py index 7e702c73c..1edcf7592 100644 --- a/hathor/manager.py +++ b/hathor/manager.py @@ -17,7 +17,7 @@ import time from cProfile import Profile from enum import Enum -from typing import Iterator, NamedTuple, Optional, Union +from typing import TYPE_CHECKING, Iterator, NamedTuple, Optional, Union from hathorlib.base_transaction import tx_or_block_from_bytes as lib_tx_or_block_from_bytes from structlog import get_logger @@ -65,6 +65,9 @@ from hathor.vertex_handler import VertexHandler from hathor.wallet import BaseWallet +if TYPE_CHECKING: + from hathor.websocket.factory import HathorAdminWebsocketFactory + logger = get_logger() cpu = get_cpu_profiler() @@ -117,6 +120,8 @@ def __init__( full_verification: bool = False, enable_event_queue: bool = False, poa_block_producer: PoaBlockProducer | None = None, + # Websocket factory + websocket_factory: Optional['HathorAdminWebsocketFactory'] = None ) -> None: """ :param reactor: Twisted reactor which handles the mainloop and the events. @@ -199,12 +204,15 @@ def __init__( self.vertex_handler = vertex_handler self.vertex_parser = vertex_parser + self.websocket_factory = websocket_factory + self.metrics = Metrics( pubsub=self.pubsub, avg_time_between_blocks=settings.AVG_TIME_BETWEEN_BLOCKS, connections=self.connections, tx_storage=self.tx_storage, reactor=self.reactor, + websocket_factory=self.websocket_factory, ) self.wallet = wallet @@ -322,6 +330,10 @@ def start(self) -> None: self.tx_storage.set_allow_scope(TxAllowScope.VALID) self.tx_storage.enable_lock() + # Preferably start before self.metrics + if self.websocket_factory: + self.websocket_factory.start() + # Metric starts to capture data self.metrics.start() @@ -360,6 +372,9 @@ def stop(self) -> Deferred: # Metric stops to capture data self.metrics.stop() + if self.websocket_factory: + self.websocket_factory.stop() + if self.lc_check_sync_state.running: self.lc_check_sync_state.stop() diff --git a/hathor/metrics.py b/hathor/metrics.py index cc72ce9e0..0f3296857 100644 --- a/hathor/metrics.py +++ b/hathor/metrics.py @@ -217,6 +217,8 @@ def set_websocket_data(self) -> None: """ Set websocket metrics data. Connections and addresses subscribed. """ if self.websocket_factory: + assert self.websocket_factory.is_running, 'Websocket factory has not been started' + self.websocket_connections = len(self.websocket_factory.connections) self.subscribed_addresses = len(self.websocket_factory.address_connections) diff --git a/tests/cli/test_run_node.py b/tests/cli/test_run_node.py index d613f8163..3b72a2592 100644 --- a/tests/cli/test_run_node.py +++ b/tests/cli/test_run_node.py @@ -1,3 +1,5 @@ +from unittest.mock import ANY, patch + from hathor.cli.run_node import RunNode from tests import unittest @@ -15,3 +17,17 @@ def register_signal_handlers(self) -> None: run_node = CustomRunNode(argv=['--memory-storage']) self.assertTrue(run_node is not None) + + @patch('twisted.internet.reactor.listenTCP') + def test_listen_tcp_ipv4(self, mock_listenTCP): + class CustomRunNode(RunNode): + def start_manager(self) -> None: + pass + + def register_signal_handlers(self) -> None: + pass + + run_node = CustomRunNode(argv=['--memory-storage', '--status', '1234']) + self.assertTrue(run_node is not None) + + mock_listenTCP.assert_called_with(1234, ANY) diff --git a/tests/cli/test_sysctl_init.py b/tests/cli/test_sysctl_init.py index b71da9d1e..2063d7f76 100644 --- a/tests/cli/test_sysctl_init.py +++ b/tests/cli/test_sysctl_init.py @@ -39,7 +39,7 @@ def test_sysctl_builder_fail_with_invalid_property(self): # prepare to register only p2p commands artifacts = Mock(**{ 'p2p_manager': Mock(), - 'manager.metrics.websocket_factory.return_value': None + 'manager.websocket_factory.return_value': None }) with self.assertRaises(SysctlEntryNotFound) as context: @@ -68,7 +68,7 @@ def test_sysctl_builder_fail_with_invalid_value(self): # prepare to register only p2p commands artifacts = Mock(**{ 'p2p_manager': Mock(), - 'manager.metrics.websocket_factory.return_value': None + 'manager.websocket_factory.return_value': None }) with self.assertRaises(SysctlRunnerException) as context: @@ -85,7 +85,7 @@ def test_syctl_init_file_fail_with_empty_or_invalid_file(self): # prepare to register only p2p commands artifacts = Mock(**{ 'p2p_manager': Mock(), - 'manager.metrics.websocket_factory.return_value': None + 'manager.websocket_factory.return_value': None }) with self.assertRaises(AssertionError):