Skip to content

Commit

Permalink
feat(sync-v2): sync-v2 implemented, sync-v1 still default
Browse files Browse the repository at this point in the history
Co-authored-by: Marcelo Salhab Brogliato <msbrogli@gmail.com>
Co-authored-by: Pedro Ferreira <phsf.pedro@gmail.com>
  • Loading branch information
3 people committed Jul 13, 2023
1 parent d920df5 commit fffecfe
Show file tree
Hide file tree
Showing 22 changed files with 2,940 additions and 169 deletions.
9 changes: 3 additions & 6 deletions hathor/indexes/deps_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,7 @@ def get_scope(self) -> Scope:
return SCOPE

def init_loop_step(self, tx: BaseTransaction) -> None:
tx_meta = tx.get_metadata()
if tx_meta.voided_by:
return
self.add_tx(tx, partial=False)
self.add_tx(tx)

def update(self, tx: BaseTransaction) -> None:
assert tx.hash is not None
Expand Down Expand Up @@ -193,6 +190,6 @@ def remove_from_needed_index(self, tx: bytes) -> None:
raise NotImplementedError

@abstractmethod
def get_next_needed_tx(self) -> bytes:
"""Choose the start hash for downloading the needed txs"""
def iter_next_needed_txs(self) -> Iterator[bytes]:
"""Iterate over the next needed transactions."""
raise NotImplementedError
54 changes: 37 additions & 17 deletions hathor/indexes/memory_deps_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ class MemoryDepsIndex(DepsIndex):
_txs_with_deps_ready: set[bytes]

# Next to be downloaded
# - Key: hash of the tx to be downloaded
# - Value[0]: height
# - Value[1]: hash of the tx waiting for the download
_needed_txs_index: dict[bytes, tuple[int, bytes]]

def __init__(self):
Expand All @@ -49,10 +52,11 @@ def force_clear(self) -> None:
self._needed_txs_index = {}

def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
assert tx.hash is not None
assert tx.storage is not None
validation = tx.get_metadata().validation
if validation.is_fully_connected():
# discover if new txs are ready because of this tx
self._update_new_deps_ready(tx)
# finally remove from rev deps
self._del_from_deps_index(tx)
elif not partial:
raise ValueError('partial=False will only accept fully connected transactions')
Expand All @@ -63,6 +67,19 @@ def add_tx(self, tx: BaseTransaction, partial: bool = True) -> None:
def del_tx(self, tx: BaseTransaction) -> None:
self._del_from_deps_index(tx)

def _update_new_deps_ready(self, tx: BaseTransaction) -> None:
    """Go over the reverse dependencies of tx and check if any of them are now ready to be validated.

    This is also idempotent: re-adding an already-ready candidate to the set is a no-op.
    """
    assert tx.hash is not None
    assert tx.storage is not None
    # every tx that listed `tx` as a dependency is a candidate to become ready now
    for candidate_hash in self._rev_dep_index.get(tx.hash, []):
        # NOTE(review): presumably the context is needed because candidates are, by
        # definition, not fully validated yet and would otherwise be hidden — confirm
        with tx.storage.allow_partially_validated_context():
            candidate_tx = tx.storage.get_transaction(candidate_hash)
        if candidate_tx.is_ready_for_validation():
            self._txs_with_deps_ready.add(candidate_hash)

def _add_deps(self, tx: BaseTransaction) -> None:
"""This method is idempotent, because self.update needs it to be indempotent."""
assert tx.hash is not None
Expand Down Expand Up @@ -94,7 +111,9 @@ def next_ready_for_validation(self, tx_storage: 'TransactionStorage', *, dry_run
else:
cur_ready, self._txs_with_deps_ready = self._txs_with_deps_ready, set()
while cur_ready:
yield from sorted(cur_ready, key=lambda tx_hash: tx_storage.get_transaction(tx_hash).timestamp)
with tx_storage.allow_partially_validated_context():
sorted_cur_ready = sorted(cur_ready, key=lambda tx_hash: tx_storage.get_transaction(tx_hash).timestamp)
yield from sorted_cur_ready
if dry_run:
cur_ready = self._txs_with_deps_ready - cur_ready
else:
Expand All @@ -113,7 +132,8 @@ def _get_rev_deps(self, tx: bytes) -> frozenset[bytes]:
def known_children(self, tx: BaseTransaction) -> list[bytes]:
assert tx.hash is not None
assert tx.storage is not None
it_rev_deps = map(tx.storage.get_transaction, self._get_rev_deps(tx.hash))
with tx.storage.allow_partially_validated_context():
it_rev_deps = map(tx.storage.get_transaction, self._get_rev_deps(tx.hash))
return [not_none(rev.hash) for rev in it_rev_deps if tx.hash in rev.parents]

# needed-txs-index methods:
Expand All @@ -127,18 +147,13 @@ def is_tx_needed(self, tx: bytes) -> bool:
def remove_from_needed_index(self, tx: bytes) -> None:
self._needed_txs_index.pop(tx, None)

def get_next_needed_tx(self) -> bytes:
# This strategy maximizes the chance to download multiple txs on the same stream
# find the tx with highest "height"
# XXX: we could cache this onto `needed_txs` so we don't have to fetch txs every time
# TODO: improve this by using some sorted data structure to make this better than O(n)
height, start_hash, tx = max((h, s, t) for t, (h, s) in self._needed_txs_index.items())
self.log.debug('next needed tx start', needed=len(self._needed_txs_index), start=start_hash.hex(),
height=height, needed_tx=tx.hex())
return start_hash
def iter_next_needed_txs(self) -> Iterator[bytes]:
    """Iterate over the hashes of transactions that are needed for download.

    Yields hashes in the insertion order of `_needed_txs_index`. Only the
    keys are needed, so iterate the dict directly instead of unpacking
    `.items()` and discarding the values (avoids building unused tuples).
    """
    yield from self._needed_txs_index

def _add_needed(self, tx: BaseTransaction) -> None:
"""This method is idempotent, because self.update needs it to be indempotent."""
assert tx.hash is not None
assert tx.storage is not None
tx_storage = tx.storage

Expand All @@ -147,9 +162,14 @@ def _add_needed(self, tx: BaseTransaction) -> None:
# get_all_dependencies is needed to ensure that we get the inputs that aren't reachable through parents alone,
# this can happen for inputs that have not been confirmed as of the block the confirms the block or transaction
# that we're adding the dependencies of
for tx_hash in tx.get_all_dependencies():
for dep_hash in tx.get_all_dependencies():
# It may happen that we have one of the dependencies already, so just add the ones we don't
# have. We should add at least one dependency, otherwise this tx should be full validated
if not tx_storage.transaction_exists(tx_hash):
self.log.debug('tx parent is needed', tx=tx_hash.hex())
self._needed_txs_index[tx_hash] = (height, not_none(tx.hash))
with tx_storage.allow_partially_validated_context():
tx_exists = tx_storage.transaction_exists(dep_hash)
if not tx_exists:
self.log.debug('tx parent is needed', tx=dep_hash.hex())
self._needed_txs_index[dep_hash] = (height, not_none(tx.hash))

# also, remove the given transaction from needed, because we already have it
self._needed_txs_index.pop(tx.hash, None)
11 changes: 3 additions & 8 deletions hathor/indexes/rocksdb_deps_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -351,11 +351,6 @@ def remove_from_needed_index(self, tx: bytes) -> None:
key_needed = self._to_key_needed(tx)
self._db.delete((self._cf, key_needed))

def get_next_needed_tx(self) -> bytes:
# This strategy maximizes the chance to download multiple txs on the same stream
# Find the tx with highest "height"
# XXX: we could cache this onto `needed_txs` so we don't have to fetch txs every time
# TODO: improve this by using some sorted data structure to make this better than O(n)
height, start_hash, tx = max((h, s, t) for t, h, s in self._iter_needed())
self.log.debug('next needed tx start', start=start_hash.hex(), height=height, needed_tx=tx.hex())
return start_hash
def iter_next_needed_txs(self) -> Iterator[bytes]:
    """Yield the hash of every transaction currently marked as needed.

    Each entry produced by `_iter_needed()` is a (tx_hash, height, start_hash)
    triple; only the hash component is of interest here.
    """
    yield from (needed_entry[0] for needed_entry in self._iter_needed())
28 changes: 19 additions & 9 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,6 @@ def _initialize_components_full_verification(self) -> None:

self.log.debug('load blocks and transactions')
for tx in self.tx_storage._topological_sort_dfs():
tx.update_initial_metadata()

assert tx.hash is not None

tx_meta = tx.get_metadata()
Expand Down Expand Up @@ -433,7 +431,14 @@ def _initialize_components_full_verification(self) -> None:

try:
# TODO: deal with invalid tx
tx.calculate_height()
tx._update_parents_children_metadata()

if tx.can_validate_full():
tx.update_initial_metadata()
tx.calculate_min_height()
if tx.is_genesis:
assert tx.validate_checkpoint(self.checkpoints)
assert tx.validate_full(skip_block_weight_verification=skip_block_weight_verification)
self.tx_storage.add_to_indexes(tx)
with self.tx_storage.allow_only_valid_context():
Expand Down Expand Up @@ -934,12 +939,11 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
"""
assert self.tx_storage.is_only_valid_allowed()
assert tx.hash is not None

already_exists = False
if self.tx_storage.transaction_exists(tx.hash):
self.tx_storage.compare_bytes_with_local_tx(tx)
if not fails_silently:
raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex))
self.log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex)
return False
already_exists = True

if tx.timestamp - self.reactor.seconds() > settings.MAX_FUTURE_TIMESTAMP_ALLOWED:
if not fails_silently:
Expand All @@ -956,8 +960,14 @@ def on_new_tx(self, tx: BaseTransaction, *, conn: Optional[HathorProtocol] = Non
metadata = tx.get_metadata()
except TransactionDoesNotExist:
if not fails_silently:
raise InvalidNewTransaction('missing parent')
self.log.warn('on_new_tx(): missing parent', tx=tx.hash_hex)
raise InvalidNewTransaction('cannot get metadata')
self.log.warn('on_new_tx(): cannot get metadata', tx=tx.hash_hex)
return False

if already_exists and metadata.validation.is_fully_connected():
if not fails_silently:
raise InvalidNewTransaction('Transaction already exists {}'.format(tx.hash_hex))
self.log.warn('on_new_tx(): Transaction already exists', tx=tx.hash_hex)
return False

if metadata.validation.is_invalid():
Expand Down Expand Up @@ -1044,7 +1054,7 @@ def sync_v2_step_validations(self, txs: Iterable[BaseTransaction], *, quiet: boo
try:
# XXX: `reject_locked_reward` might not apply, partial validation is only used on sync-v2
# TODO: deal with `reject_locked_reward` on sync-v2
assert tx.validate_full(reject_locked_reward=True)
assert tx.validate_full(reject_locked_reward=False)
except (AssertionError, HathorError):
# TODO
raise
Expand Down
3 changes: 2 additions & 1 deletion hathor/p2p/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ def __init__(self,
enable_sync_v1_1: bool) -> None:
from hathor.p2p.sync_v1.factory_v1_0 import SyncV10Factory
from hathor.p2p.sync_v1.factory_v1_1 import SyncV11Factory
from hathor.p2p.sync_v2.factory import SyncV2Factory

if not (enable_sync_v1 or enable_sync_v1_1 or enable_sync_v2):
raise TypeError(f'{type(self).__name__}() at least one sync version is required')
Expand Down Expand Up @@ -185,7 +186,7 @@ def __init__(self,
if enable_sync_v1_1:
self._sync_factories[SyncVersion.V1_1] = SyncV11Factory(self)
if enable_sync_v2:
self._sync_factories[SyncVersion.V2] = SyncV10Factory(self)
self._sync_factories[SyncVersion.V2] = SyncV2Factory(self)

def set_manager(self, manager: 'HathorManager') -> None:
"""Set the manager. This method must be called before start()."""
Expand Down
2 changes: 2 additions & 0 deletions hathor/p2p/messages.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ class ProtocolMessages(Enum):
BEST_BLOCK = 'BEST-BLOCK' # Send the best block to your peer

GET_BLOCK_TXS = 'GET-BLOCK-TXS' # TODO: rename, maybe GET-TX-RANGE or repurpose GET-TRANSACTIONS above
GET_TRANSACTIONS_BFS = 'GET-TRANSACTIONS-BFS'
TRANSACTION = 'TRANSACTION'
TRANSACTIONS_END = 'TRANSACTIONS-END'

GET_MEMPOOL = 'GET-MEMPOOL' # TODO: rename, maybe GET-TX-RANGE or repurpose GET-TRANSACTIONS above
MEMPOOL_END = 'MEMPOOL-END' # End of mempool sync
Expand Down
Empty file added hathor/p2p/sync_v2/__init__.py
Empty file.
32 changes: 32 additions & 0 deletions hathor/p2p/sync_v2/factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright 2021 Hathor Labs
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import TYPE_CHECKING, Optional

from hathor.p2p.manager import ConnectionsManager
from hathor.p2p.sync_factory import SyncManagerFactory
from hathor.p2p.sync_manager import SyncManager
from hathor.p2p.sync_v2.manager import NodeBlockSync
from hathor.util import Reactor

if TYPE_CHECKING:
from hathor.p2p.protocol import HathorProtocol


class SyncV2Factory(SyncManagerFactory):
    """Factory that builds a sync-v2 manager for each peer connection."""

    def __init__(self, connections: ConnectionsManager):
        # kept for parity with the other sync factories; NodeBlockSync does not
        # currently take it, but subclasses/callers may read it
        self.connections = connections

    def create_sync_manager(self, protocol: 'HathorProtocol', reactor: Optional[Reactor] = None) -> SyncManager:
        """Create a new NodeBlockSync bound to the given protocol."""
        sync_manager = NodeBlockSync(protocol, reactor=reactor)
        return sync_manager
Loading

0 comments on commit fffecfe

Please sign in to comment.