Skip to content
This repository has been archived by the owner on Nov 15, 2021. It is now read-only.

Preperation to support different database backends #939

Merged
merged 27 commits into from
Jun 12, 2019
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
09376f3
feat, wip, first step towards a generic db interface
merl111 Apr 5, 2019
76542b8
removed old LevelDB implementation
merl111 Apr 14, 2019
4d39469
feat, reworked blockchain db layer
merl111 Apr 14, 2019
bd02494
feat, NotificationDB now uses the new layer too
merl111 Apr 14, 2019
9ebdde6
feat, factory for prefixed db and snapshot. DB unit tests, Notificati…
merl111 Apr 24, 2019
d3aead8
fix, fixed a few things to make UTs pass
merl111 Apr 24, 2019
7771107
fix, added locking, removed unused code
merl111 Apr 25, 2019
c912b2b
fixed typo
merl111 Apr 29, 2019
94017a8
moved AbstractDBInterface to AbstractDBImplementation to avoid confusion
merl111 Apr 29, 2019
7952819
Documented new classes and methods to povide information on how to im…
merl111 Apr 29, 2019
631228f
extended doc strings, made linter happy
merl111 May 10, 2019
5ac7e58
fixed typos, changed DBFactory
merl111 May 14, 2019
11e644b
extended docstrings + fixed minor bugs
merl111 May 15, 2019
2237261
added unit test for db factory, default values for db settings
merl111 May 16, 2019
281264d
corrected database paths in configs
merl111 May 16, 2019
728666b
Merge branch 'development' into alt-db-support
merl111 May 16, 2019
f08dd6b
fixed paths to data directories
merl111 May 20, 2019
df26c05
fix, fixed typos, minor refactoring after review, fixed bug with paths
merl111 May 21, 2019
50a8c7d
made linter happy
merl111 May 21, 2019
71262dd
removed files slipped in through upstream merge
merl111 May 30, 2019
0f12117
Merge remote-tracking branch 'upstream/development' into alt-db-support
merl111 May 30, 2019
50ac3e1
made linter happy...again
merl111 May 30, 2019
5ab843b
Merge remote-tracking branch 'upstream/development' into alt-db-support
merl111 May 30, 2019
98c9ed4
updated import_blocks and export_blocks to new database layer, remove…
merl111 Jun 5, 2019
c5bb6ce
fix, minor fixes
merl111 Jun 5, 2019
939749d
fix, take --datadir intou account when starting np=prompt, removed du…
merl111 Jun 12, 2019
bc6883e
Merge remote-tracking branch 'upstream/development' into alt-db-support
merl111 Jun 12, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
872 changes: 774 additions & 98 deletions neo/Core/Blockchain.py

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions neo/Core/Helper.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
from base58 import b58decode
import binascii
from neo.Blockchain import GetBlockchain, GetStateReader
from neo.Implementations.Blockchains.LevelDB.CachedScriptTable import CachedScriptTable
from neo.Implementations.Blockchains.LevelDB.DBCollection import DBCollection
from neo.Implementations.Blockchains.LevelDB.DBPrefix import DBPrefix
from neo.Storage.Interface.DBInterface import DBInterface
from neo.Storage.Common.CachedScriptTable import CachedScriptTable
from neo.Storage.Common.DBPrefix import DBPrefix
from neo.Core.State.ContractState import ContractState
from neo.Core.State.AssetState import AssetState
from neo.Core.Cryptography.Crypto import Crypto
Expand Down Expand Up @@ -201,7 +201,7 @@ def VerifyScripts(verifiable):
return False

state_reader = GetStateReader()
script_table = CachedScriptTable(DBCollection(blockchain._db, DBPrefix.ST_Contract, ContractState))
script_table = CachedScriptTable(DBInterface(blockchain._db, DBPrefix.ST_Contract, ContractState))

engine = ApplicationEngine(TriggerType.Verification, verifiable, script_table, state_reader, Fixed8.Zero())
engine.LoadScript(verification)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import plyvel
from neo.EventHub import events
from neo.SmartContract.SmartContractEvent import SmartContractEvent, NotifyEvent, NotifyType
from neo.Storage.Implementation.DBFactory import getNotificationDB
from neo.Storage.Interface.DBInterface import DBProperties
from neo.Core.State.ContractState import ContractState
from neo.Settings import settings
from neo.Core.Blockchain import Blockchain
Expand Down Expand Up @@ -50,7 +52,7 @@ def close():
Closes the database if it is open
"""
if NotificationDB.__instance:
NotificationDB.__instance.db.close()
NotificationDB.__instance.db.closeDB()
NotificationDB.__instance = None

@property
Expand All @@ -69,7 +71,7 @@ def current_events(self):
def __init__(self, path):

try:
self._db = plyvel.DB(path, create_if_missing=True)
self._db = getNotificationDB(path)
logger.info("Created Notification DB At %s " % path)
except Exception as e:
logger.info("Notification leveldb unavailable, you may already be running this process: %s " % e)
Expand Down Expand Up @@ -126,89 +128,80 @@ def on_persist_completed(self, block):
"""
if len(self._events_to_write):

addr_db = self.db.prefixed_db(NotificationPrefix.PREFIX_ADDR)
block_db = self.db.prefixed_db(NotificationPrefix.PREFIX_BLOCK)
contract_db = self.db.prefixed_db(NotificationPrefix.PREFIX_CONTRACT)

block_write_batch = block_db.write_batch()
contract_write_batch = contract_db.write_batch()
addr_db = self.db.getPrefixedDB(NotificationPrefix.PREFIX_ADDR)
block_db = self.db.getPrefixedDB(NotificationPrefix.PREFIX_BLOCK)
contract_db = self.db.getPrefixedDB(NotificationPrefix.PREFIX_CONTRACT)

block_count = 0
block_bytes = self._events_to_write[0].block_number.to_bytes(4, 'little')

for evt in self._events_to_write: # type:NotifyEvent

# write the event for both or one of the addresses involved in the transfer
write_both = True
hash_data = evt.ToByteArray()

bytes_to = bytes(evt.addr_to.Data)
bytes_from = bytes(evt.addr_from.Data)

if bytes_to == bytes_from:
write_both = False

total_bytes_to = addr_db.get(bytes_to + NotificationPrefix.PREFIX_COUNT)
total_bytes_from = addr_db.get(bytes_from + NotificationPrefix.PREFIX_COUNT)

if not total_bytes_to:
total_bytes_to = b'\x00'

if not total_bytes_from:
total_bytes_from = b'x\00'

addr_to_key = bytes_to + total_bytes_to
addr_from_key = bytes_from + total_bytes_from

with addr_db.write_batch() as b:
b.put(addr_to_key, hash_data)
if write_both:
b.put(addr_from_key, hash_data)
total_bytes_to = int.from_bytes(total_bytes_to, 'little') + 1
total_bytes_from = int.from_bytes(total_bytes_from, 'little') + 1
new_bytes_to = total_bytes_to.to_bytes(4, 'little')
new_bytes_from = total_bytes_from.to_bytes(4, 'little')
b.put(bytes_to + NotificationPrefix.PREFIX_COUNT, new_bytes_to)
if write_both:
b.put(bytes_from + NotificationPrefix.PREFIX_COUNT, new_bytes_from)

# write the event to the per-block database
per_block_key = block_bytes + block_count.to_bytes(4, 'little')
block_write_batch.put(per_block_key, hash_data)
block_count += 1

# write the event to the per-contract database
contract_bytes = bytes(evt.contract_hash.Data)
count_for_contract = contract_db.get(contract_bytes + NotificationPrefix.PREFIX_COUNT)
if not count_for_contract:
count_for_contract = b'\x00'
contract_event_key = contract_bytes + count_for_contract
contract_count_int = int.from_bytes(count_for_contract, 'little') + 1
new_contract_count = contract_count_int.to_bytes(4, 'little')
contract_write_batch.put(contract_bytes + NotificationPrefix.PREFIX_COUNT, new_contract_count)
contract_write_batch.put(contract_event_key, hash_data)

# finish off the per-block write batch and contract write batch
block_write_batch.write()
contract_write_batch.write()
with block_db.getBatch() as block_write_batch:
with contract_db.getBatch() as contract_write_batch:
for evt in self._events_to_write: # type:NotifyEvent
# write the event for both or one of the addresses involved in the transfer
write_both = True
hash_data = evt.ToByteArray()

bytes_to = bytes(evt.addr_to.Data)
bytes_from = bytes(evt.addr_from.Data)

if bytes_to == bytes_from:
write_both = False

total_bytes_to = addr_db.get(bytes_to + NotificationPrefix.PREFIX_COUNT)
total_bytes_from = addr_db.get(bytes_from + NotificationPrefix.PREFIX_COUNT)

if not total_bytes_to:
total_bytes_to = b'\x00'

if not total_bytes_from:
total_bytes_from = b'x\00'

addr_to_key = bytes_to + total_bytes_to
addr_from_key = bytes_from + total_bytes_from

with addr_db.getBatch() as b:
b.put(addr_to_key, hash_data)
if write_both:
b.put(addr_from_key, hash_data)
total_bytes_to = int.from_bytes(total_bytes_to, 'little') + 1
total_bytes_from = int.from_bytes(total_bytes_from, 'little') + 1
new_bytes_to = total_bytes_to.to_bytes(4, 'little')
new_bytes_from = total_bytes_from.to_bytes(4, 'little')
b.put(bytes_to + NotificationPrefix.PREFIX_COUNT, new_bytes_to)
if write_both:
b.put(bytes_from + NotificationPrefix.PREFIX_COUNT, new_bytes_from)

# write the event to the per-block database
per_block_key = block_bytes + block_count.to_bytes(4, 'little')
block_write_batch.put(per_block_key, hash_data)
block_count += 1

# write the event to the per-contract database
contract_bytes = bytes(evt.contract_hash.Data)
count_for_contract = contract_db.get(contract_bytes + NotificationPrefix.PREFIX_COUNT)
if not count_for_contract:
count_for_contract = b'\x00'
contract_event_key = contract_bytes + count_for_contract
contract_count_int = int.from_bytes(count_for_contract, 'little') + 1
new_contract_count = contract_count_int.to_bytes(4, 'little')
contract_write_batch.put(contract_bytes + NotificationPrefix.PREFIX_COUNT, new_contract_count)
contract_write_batch.put(contract_event_key, hash_data)

self._events_to_write = []

if len(self._new_contracts_to_write):

token_db = self.db.prefixed_db(NotificationPrefix.PREFIX_TOKEN)

token_write_batch = token_db.write_batch()
token_db = self.db.getPrefixedDB(NotificationPrefix.PREFIX_TOKEN)

for token_event in self._new_contracts_to_write:
try:
hash_data = token_event.ToByteArray() # used to fail here
hash_key = token_event.contract.Code.ScriptHash().ToBytes()
token_write_batch.put(hash_key, hash_data)
except Exception as e:
logger.debug(f"Failed to write new contract, reason: {e}")

token_write_batch.write()
with token_db.getBatch() as token_write_batch:
for token_event in self._new_contracts_to_write:
try:
hash_data = token_event.ToByteArray() # used to fail here
hash_key = token_event.contract.Code.ScriptHash().ToBytes()
token_write_batch.put(hash_key, hash_data)
except Exception as e:
logger.debug(f"Failed to write new contract, reason: {e}")

self._new_contracts_to_write = []

Expand All @@ -221,12 +214,14 @@ def get_by_block(self, block_number):
Returns:
list: a list of notifications
"""
blocklist_snapshot = self.db.prefixed_db(NotificationPrefix.PREFIX_BLOCK).snapshot()
blocklist_snapshot = self.db.getPrefixedDB(NotificationPrefix.PREFIX_BLOCK).createSnapshot()

block_bytes = block_number.to_bytes(4, 'little')
results = []
for val in blocklist_snapshot.iterator(prefix=block_bytes, include_key=False):
event = SmartContractEvent.FromByteArray(val)
results.append(event)
with blocklist_snapshot.openIter(DBProperties(prefix=block_bytes, include_key=False)) as iterator:
for val in iterator:
event = SmartContractEvent.FromByteArray(val)
results.append(event)

return results

Expand All @@ -246,16 +241,17 @@ def get_by_addr(self, address):
if not isinstance(addr, UInt160):
raise Exception("Incorrect address format")

addrlist_snapshot = self.db.prefixed_db(NotificationPrefix.PREFIX_ADDR).snapshot()
addrlist_snapshot = self.db.getPrefixedDB(NotificationPrefix.PREFIX_ADDR).createSnapshot()
results = []

for val in addrlist_snapshot.iterator(prefix=bytes(addr.Data), include_key=False):
if len(val) > 4:
try:
event = SmartContractEvent.FromByteArray(val)
results.append(event)
except Exception as e:
logger.error("could not parse event: %s %s" % (e, val))
with addrlist_snapshot.openIter(DBProperties(prefix=bytes(addr.Data), include_key=False)) as iterator:
for val in iterator:
if len(val) > 4:
try:
event = SmartContractEvent.FromByteArray(val)
results.append(event)
except Exception as e:
logger.error("could not parse event: %s %s" % (e, val))
return results

def get_by_contract(self, contract_hash):
Expand All @@ -274,16 +270,17 @@ def get_by_contract(self, contract_hash):
if not isinstance(hash, UInt160):
raise Exception("Incorrect address format")

contractlist_snapshot = self.db.prefixed_db(NotificationPrefix.PREFIX_CONTRACT).snapshot()
contractlist_snapshot = self.db.getPrefixedDB(NotificationPrefix.PREFIX_CONTRACT).createSnapshot()
results = []

for val in contractlist_snapshot.iterator(prefix=bytes(hash.Data), include_key=False):
if len(val) > 4:
try:
event = SmartContractEvent.FromByteArray(val)
results.append(event)
except Exception as e:
logger.error("could not parse event: %s %s" % (e, val))
with contractlist_snapshot.openIter(DBProperties(prefix=bytes(hash.Data), include_key=False)) as iterator:
for val in iterator:
if len(val) > 4:
try:
event = SmartContractEvent.FromByteArray(val)
results.append(event)
except Exception as e:
logger.error("could not parse event: %s %s" % (e, val))
return results

def get_tokens(self):
Expand All @@ -292,11 +289,13 @@ def get_tokens(self):
Returns:
list: A list of smart contract events with contracts that are NEP5 Tokens
"""
tokens_snapshot = self.db.prefixed_db(NotificationPrefix.PREFIX_TOKEN).snapshot()
tokens_snapshot = self.db.getPrefixedDB(NotificationPrefix.PREFIX_TOKEN).createSnapshot()
results = []
for val in tokens_snapshot.iterator(include_key=False):
event = SmartContractEvent.FromByteArray(val)
results.append(event)

with tokens_snapshot.openIter(DBProperties(include_key=False)) as iterator:
for val in iterator:
event = SmartContractEvent.FromByteArray(val)
results.append(event)
return results

def get_token(self, hash):
Expand All @@ -308,7 +307,7 @@ def get_token(self, hash):
Returns:
SmartContractEvent: A smart contract event with a contract that is an NEP5 Token
"""
tokens_snapshot = self.db.prefixed_db(NotificationPrefix.PREFIX_TOKEN).snapshot()
tokens_snapshot = self.db.getPrefixedDB(DBProperties(NotificationPrefix.PREFIX_TOKEN)).createSnapshot()

try:
val = tokens_snapshot.get(hash.ToBytes())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import shutil
import os

from neo.Implementations.Notifications.LevelDB.NotificationDB import NotificationDB
from neo.Implementations.Notifications.NotificationDB import NotificationDB
from neo.Core.BigInteger import BigInteger


Expand Down
4 changes: 1 addition & 3 deletions neo/Network/NodeLeader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
from typing import List
from neo.Core.Block import Block
from neo.Core.Blockchain import Blockchain as BC
from neo.Implementations.Blockchains.LevelDB.TestLevelDBBlockchain import TestLevelDBBlockchain
from neo.Core.TX.Transaction import Transaction
from neo.Core.TX.MinerTransaction import MinerTransaction
from neo.Network.NeoNode import NeoNode, HEARTBEAT_BLOCKS
Expand Down Expand Up @@ -542,8 +541,7 @@ def RelayDirectly(self, inventory):
relayed |= peer.Relay(inventory)

if len(self.Peers) == 0:
if type(BC.Default()) is TestLevelDBBlockchain:
# mock a true result for tests
if BC.Default().UT:
return True

logger.info("no connected peers")
Expand Down
Loading