Skip to content

Commit

Permalink
feat(indexes): make sync-v1 indexes optional
Browse files Browse the repository at this point in the history
  • Loading branch information
jansegre committed Apr 17, 2023
1 parent 8a78229 commit 89a5fb5
Show file tree
Hide file tree
Showing 10 changed files with 224 additions and 91 deletions.
17 changes: 16 additions & 1 deletion hathor/indexes/height_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,17 @@
# limitations under the License.

from abc import abstractmethod
from typing import List, NamedTuple, Optional, Tuple
from typing import TYPE_CHECKING, List, NamedTuple, Optional, Tuple

from hathor.indexes.base_index import BaseIndex
from hathor.indexes.scope import Scope
from hathor.transaction import BaseTransaction, Block
from hathor.transaction.genesis import BLOCK_GENESIS
from hathor.util import not_none

if TYPE_CHECKING: # pragma: no cover
from hathor.transaction.storage import TransactionStorage

SCOPE = Scope(
include_blocks=True,
include_txs=False,
Expand Down Expand Up @@ -78,6 +81,18 @@ def get(self, height: int) -> Optional[bytes]:
"""
raise NotImplementedError

def find_by_timestamp(self, timestamp: float, tx_storage: 'TransactionStorage') -> Optional[Block]:
""" This method starts from the tip and advances to the parent until it finds a block with lower timestamp.
"""
# TODO: optimize
if timestamp < BLOCK_GENESIS.timestamp:
return None
block = tx_storage.get_transaction(self.get_tip())
assert isinstance(block, Block)
while block.timestamp > timestamp:
block = block.get_block_parent()
return block

@abstractmethod
def get_tip(self) -> bytes:
""" Return the best block hash, it returns the genesis when there is no other block
Expand Down
75 changes: 50 additions & 25 deletions hathor/indexes/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,12 @@ class IndexesManager(ABC):
log = get_logger()

info: InfoIndex
all_tips: TipsIndex
block_tips: TipsIndex
tx_tips: TipsIndex

all_tips: Optional[TipsIndex]
block_tips: Optional[TipsIndex]
tx_tips: Optional[TipsIndex]
sorted_all: TimestampIndex
sorted_blocks: TimestampIndex
sorted_txs: TimestampIndex

height: HeightIndex
deps: Optional[DepsIndex]
mempool_tips: Optional[MempoolTipsIndex]
Expand Down Expand Up @@ -98,6 +96,11 @@ def iter_all_indexes(self) -> Iterator[BaseIndex]:
self.utxo,
])

@abstractmethod
def enable_tips_indexes(self) -> None:
"""Enable tips indexs. It does nothing if it has already been enabled."""
raise NotImplementedError

@abstractmethod
def enable_address_index(self, pubsub: 'PubSubManager') -> None:
"""Enable address index. It does nothing if it has already been enabled."""
Expand Down Expand Up @@ -209,18 +212,21 @@ def add_tx(self, tx: BaseTransaction) -> bool:

# These two calls return False when a transaction changes from
# voided to executed and vice-versa.
r1 = self.all_tips.add_tx(tx)
r2 = self.sorted_all.add_tx(tx)
assert r1 == r2
r1 = self.sorted_all.add_tx(tx)
if self.all_tips is not None:
r2 = self.all_tips.add_tx(tx)
assert r1 == r2

if tx.is_block:
r3 = self.block_tips.add_tx(tx)
r4 = self.sorted_blocks.add_tx(tx)
assert r3 == r4
r3 = self.sorted_blocks.add_tx(tx)
if self.block_tips is not None:
r4 = self.block_tips.add_tx(tx)
assert r3 == r4
else:
r3 = self.tx_tips.add_tx(tx)
r4 = self.sorted_txs.add_tx(tx)
assert r3 == r4
r3 = self.sorted_txs.add_tx(tx)
if self.tx_tips is not None:
r4 = self.tx_tips.add_tx(tx)
assert r3 == r4

if self.addresses:
self.addresses.add_tx(tx)
Expand Down Expand Up @@ -250,7 +256,8 @@ def del_tx(self, tx: BaseTransaction, *, remove_all: bool = False, relax_assert:
# We delete from indexes in two cases: (i) mark tx as voided, and (ii) remove tx.
# We only remove tx from all_tips and sorted_all when it is removed from the storage.
# For clarity, when a tx is marked as voided, it is not removed from all_tips and sorted_all.
self.all_tips.del_tx(tx, relax_assert=relax_assert)
if self.all_tips is not None:
self.all_tips.del_tx(tx, relax_assert=relax_assert)
self.sorted_all.del_tx(tx)
if self.addresses:
self.addresses.remove_tx(tx)
Expand All @@ -264,11 +271,13 @@ def del_tx(self, tx: BaseTransaction, *, remove_all: bool = False, relax_assert:
self.mempool_tips.update(tx, remove=True)

if tx.is_block:
self.block_tips.del_tx(tx, relax_assert=relax_assert)
self.sorted_blocks.del_tx(tx)
if self.block_tips is not None:
self.block_tips.del_tx(tx, relax_assert=relax_assert)
else:
self.tx_tips.del_tx(tx, relax_assert=relax_assert)
self.sorted_txs.del_tx(tx)
if self.tx_tips is not None:
self.tx_tips.del_tx(tx, relax_assert=relax_assert)

if self.tokens:
self.tokens.del_tx(tx)
Expand All @@ -283,12 +292,11 @@ def __init__(self) -> None:
from hathor.indexes.memory_height_index import MemoryHeightIndex
from hathor.indexes.memory_info_index import MemoryInfoIndex
from hathor.indexes.memory_timestamp_index import MemoryTimestampIndex
from hathor.indexes.memory_tips_index import MemoryTipsIndex

self.info = MemoryInfoIndex()
self.all_tips = MemoryTipsIndex(scope_type=TipsScopeType.ALL)
self.block_tips = MemoryTipsIndex(scope_type=TipsScopeType.BLOCKS)
self.tx_tips = MemoryTipsIndex(scope_type=TipsScopeType.TXS)
self.all_tips = None
self.block_tips = None
self.tx_tips = None

self.sorted_all = MemoryTimestampIndex(scope_type=TimestampScopeType.ALL)
self.sorted_blocks = MemoryTimestampIndex(scope_type=TimestampScopeType.BLOCKS)
Expand All @@ -304,6 +312,15 @@ def __init__(self) -> None:
# XXX: this has to be at the end of __init__, after everything has been initialized
self.__init_checks__()

def enable_tips_indexes(self) -> None:
from hathor.indexes.memory_tips_index import MemoryTipsIndex
if self.all_tips is None:
self.all_tips = MemoryTipsIndex(scope_type=TipsScopeType.ALL)
if self.block_tips is None:
self.block_tips = MemoryTipsIndex(scope_type=TipsScopeType.BLOCKS)
if self.tx_tips is None:
self.tx_tips = MemoryTipsIndex(scope_type=TipsScopeType.TXS)

def enable_address_index(self, pubsub: 'PubSubManager') -> None:
from hathor.indexes.memory_address_index import MemoryAddressIndex
if self.addresses is None:
Expand Down Expand Up @@ -332,7 +349,6 @@ def enable_deps_index(self) -> None:

class RocksDBIndexesManager(IndexesManager):
def __init__(self, db: 'rocksdb.DB') -> None:
from hathor.indexes.partial_rocksdb_tips_index import PartialRocksDBTipsIndex
from hathor.indexes.rocksdb_height_index import RocksDBHeightIndex
from hathor.indexes.rocksdb_info_index import RocksDBInfoIndex
from hathor.indexes.rocksdb_timestamp_index import RocksDBTimestampIndex
Expand All @@ -341,9 +357,9 @@ def __init__(self, db: 'rocksdb.DB') -> None:

self.info = RocksDBInfoIndex(self._db)
self.height = RocksDBHeightIndex(self._db)
self.all_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.ALL)
self.block_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.BLOCKS)
self.tx_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.TXS)
self.all_tips = None
self.block_tips = None
self.tx_tips = None

self.sorted_all = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.ALL)
self.sorted_blocks = RocksDBTimestampIndex(self._db, scope_type=TimestampScopeType.BLOCKS)
Expand All @@ -358,6 +374,15 @@ def __init__(self, db: 'rocksdb.DB') -> None:
# XXX: this has to be at the end of __init__, after everything has been initialized
self.__init_checks__()

def enable_tips_indexes(self) -> None:
from hathor.indexes.partial_rocksdb_tips_index import PartialRocksDBTipsIndex
if self.all_tips is None:
self.all_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.ALL)
if self.block_tips is None:
self.block_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.BLOCKS)
if self.tx_tips is None:
self.tx_tips = PartialRocksDBTipsIndex(self._db, scope_type=TipsScopeType.TXS)

def enable_address_index(self, pubsub: 'PubSubManager') -> None:
from hathor.indexes.rocksdb_address_index import RocksDBAddressIndex
if self.addresses is None:
Expand Down
40 changes: 40 additions & 0 deletions hathor/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,10 @@ def __init__(self, reactor: Reactor, *, pubsub: PubSubManager, peer_id: Optional

self._event_manager = event_manager

if enable_sync_v1:
assert self.tx_storage.indexes is not None
self.log.debug('enable sync-v1 indexes')
self.tx_storage.indexes.enable_tips_indexes()
if enable_sync_v2:
assert self.tx_storage.indexes is not None
self.log.debug('enable sync-v2 indexes')
Expand Down Expand Up @@ -731,6 +735,42 @@ def generate_parent_txs(self, timestamp: Optional[float]) -> 'ParentTxs':
This method tries to return a stable result, such that for a given timestamp and storage state it will always
return the same.
"""
# return self._generate_parent_txs_from_tips_index(timestamp)
# XXX: prefer txs_tips index since it's been tested more
assert self.tx_storage.indexes is not None
if self.tx_storage.indexes.tx_tips is not None:
return self._generate_parent_txs_from_tips_index(timestamp)
else:
return self._generate_parent_txs_from_mempool_index(timestamp)

def _generate_parent_txs_from_mempool_index(self, timestamp: Optional[float]) -> 'ParentTxs':
# XXX: this implementation is naive, it will return a working result but not necessarily actual tips,
# particularly when the timestamp is in the past it will just return tx parents of a previous block that
# is within the timestamp, this is because we don't need to support that case for normal usage
if timestamp is None:
timestamp = self.reactor.seconds()
assert self.tx_storage.indexes is not None
assert self.tx_storage.indexes.height is not None
assert self.tx_storage.indexes.mempool_tips is not None
tips = [tx for tx in self.tx_storage.indexes.mempool_tips.iter(self.tx_storage) if tx.timestamp < timestamp]
max_timestamp = max(tx.timestamp for tx in tips) if tips else 0
can_include: List[bytes] = [not_none(tx.hash) for tx in tips]
must_include = []
if len(can_include) < 2:
best_block = self.tx_storage.indexes.height.find_by_timestamp(timestamp, self.tx_storage)
assert best_block is not None
all_best_block_parent_txs = list(map(self.tx_storage.get_transaction, best_block.parents[1:]))
best_block_parent_txs = [tx for tx in all_best_block_parent_txs if tx.timestamp < timestamp]
max_timestamp = max(max_timestamp, *list(tx.timestamp for tx in best_block_parent_txs))
if len(can_include) < 1:
can_include.extend(not_none(tx.hash) for tx in best_block_parent_txs)
else:
must_include = can_include
can_include = [not_none(tx.hash) for tx in best_block_parent_txs]
assert len(can_include) + len(must_include) >= 2
return ParentTxs(max_timestamp, can_include, must_include)

def _generate_parent_txs_from_tips_index(self, timestamp: Optional[float]) -> 'ParentTxs':
if timestamp is None:
timestamp = self.reactor.seconds()
can_include_intervals = sorted(self.tx_storage.get_tx_tips(timestamp - 1))
Expand Down
25 changes: 20 additions & 5 deletions hathor/transaction/storage/transaction_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,15 +528,23 @@ def first_timestamp(self) -> int:
raise NotImplementedError

@abstractmethod
def get_best_block_tips(self, timestamp: Optional[float] = None, *, skip_cache: bool = False) -> List[bytes]:
def get_best_block_tips(self, *, skip_cache: bool = False) -> List[bytes]:
""" Return a list of blocks that are heads in a best chain. It must be used when mining.
When more than one block is returned, it means that there are multiple best chains and
you can choose any of them.
"""
if timestamp is None and not skip_cache and self._best_block_tips_cache is not None:
return self._best_block_tips_cache[:]
# ignoring cache because current implementation is ~O(1)
assert self.indexes is not None
return [self.indexes.height.get_tip()]

@abstractmethod
def get_past_best_block_tips(self, timestamp: Optional[float] = None) -> List[bytes]:
""" Return a list of blocks that are heads in a best chain. It must be used when mining.
When more than one block is returned, it means that there are multiple best chains and
you can choose any of them.
"""
best_score = 0.0
best_tip_blocks: List[bytes] = []

Expand Down Expand Up @@ -915,6 +923,7 @@ def iter_mempool_tips_from_tx_tips(self) -> Iterator[Transaction]:
This method requires indexes to be enabled.
"""
assert self.indexes is not None
assert self.indexes.tx_tips is not None
tx_tips = self.indexes.tx_tips

for interval in tx_tips[self.latest_timestamp + 1]:
Expand Down Expand Up @@ -1019,8 +1028,11 @@ def remove_cache(self) -> None:
"""Remove all caches in case we don't need it."""
self.indexes = None

def get_best_block_tips(self, timestamp: Optional[float] = None, *, skip_cache: bool = False) -> List[bytes]:
return super().get_best_block_tips(timestamp, skip_cache=skip_cache)
def get_best_block_tips(self, *, skip_cache: bool = False) -> List[bytes]:
return super().get_best_block_tips(skip_cache=skip_cache)

def get_past_best_block_tips(self, timestamp: Optional[float] = None) -> List[bytes]:
return super().get_past_best_block_tips(timestamp)

def get_weight_best_block(self) -> float:
return super().get_weight_best_block()
Expand All @@ -1029,6 +1041,7 @@ def get_block_tips(self, timestamp: Optional[float] = None) -> Set[Interval]:
if self.indexes is None:
raise NotImplementedError
assert self.indexes is not None
assert self.indexes.block_tips is not None
if timestamp is None:
timestamp = self.latest_timestamp
return self.indexes.block_tips[timestamp]
Expand All @@ -1037,6 +1050,7 @@ def get_tx_tips(self, timestamp: Optional[float] = None) -> Set[Interval]:
if self.indexes is None:
raise NotImplementedError
assert self.indexes is not None
assert self.indexes.tx_tips is not None
if timestamp is None:
timestamp = self.latest_timestamp
tips = self.indexes.tx_tips[timestamp]
Expand All @@ -1054,6 +1068,7 @@ def get_all_tips(self, timestamp: Optional[float] = None) -> Set[Interval]:
if self.indexes is None:
raise NotImplementedError
assert self.indexes is not None
assert self.indexes.all_tips is not None
if timestamp is None:
timestamp = self.latest_timestamp

Expand Down
12 changes: 7 additions & 5 deletions tests/p2p/test_double_spending.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,9 @@ def test_simple_double_spending(self):
self.assertEqual([tx1.hash, tx2.hash], spent_meta.spent_outputs[txin.index])

# old indexes
self.assertNotIn(tx1.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])
self.assertNotIn(tx2.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])
if self.manager1.tx_storage.indexes.tx_tips is not None:
self.assertNotIn(tx1.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])
self.assertNotIn(tx2.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])

# new indexes
if self.manager1.tx_storage.indexes.mempool_tips is not None:
Expand Down Expand Up @@ -114,9 +115,10 @@ def test_simple_double_spending(self):
self.assertEqual([tx1.hash, tx2.hash, tx3.hash], spent_meta.spent_outputs[txin.index])

# old indexes
self.assertNotIn(tx1.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])
self.assertNotIn(tx2.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])
self.assertIn(tx3.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])
if self.manager1.tx_storage.indexes.tx_tips is not None:
self.assertNotIn(tx1.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])
self.assertNotIn(tx2.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])
self.assertIn(tx3.hash, [x.data for x in self.manager1.tx_storage.get_tx_tips()])

# new indexes
if self.manager1.tx_storage.indexes.mempool_tips is not None:
Expand Down
Loading

0 comments on commit 89a5fb5

Please sign in to comment.