
Commit

Merge pull request #1908 from celo-org/hbandura/upstream_1.10.9_part2
mcortesi authored Jul 29, 2022
2 parents e034227 + 62ca7b7 commit a5525dc
Showing 13 changed files with 1,282 additions and 595 deletions.
184 changes: 80 additions & 104 deletions core/blockchain.go
@@ -213,8 +213,7 @@ type BlockChain struct {
processor Processor // Block transaction processor interface
vmConfig vm.Config

shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
terminateInsert func(common.Hash, uint64) bool // Testing hook used to terminate ancient receipt chain insertion.
shouldPreserve func(*types.Block) bool // Function used to determine whether should preserve the given block.
}

// NewBlockChain returns a fully initialised block chain using information
@@ -1123,27 +1122,6 @@ const (
SideStatTy
)

// truncateAncient rewinds the blockchain to the specified header and deletes all
// data in the ancient store that exceeds the specified header.
func (bc *BlockChain) truncateAncient(head uint64) error {
frozen, err := bc.db.Ancients()
if err != nil {
return err
}
// Short circuit if there is no data to truncate in ancient store.
if frozen <= head+1 {
return nil
}
// Truncate all the data in the freezer beyond the specified head
if err := bc.db.TruncateAncients(head + 1); err != nil {
return err
}
bc.purge()

log.Info("Rewind ancient data", "number", head)
return nil
}

// numberHash is just a container for a number and a hash, to represent a block
type numberHash struct {
number uint64
@@ -1182,8 +1160,9 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
var (
stats = struct{ processed, ignored int32 }{}
start = time.Now()
size = 0
size = int64(0)
)

// updateHead updates the head fast sync block if the inserted blocks are better
// and returns an indicator whether the inserted blocks are canonical.
updateHead := func(head *types.Block) bool {
@@ -1204,120 +1183,116 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
}
return false
}

// writeAncient writes blockchain and corresponding receipt chain into ancient store.
//
// this function only accepts canonical chain data. All side chains will be reverted
// eventually.
writeAncient := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
var (
previous = bc.CurrentFastBlock()
batch = bc.db.NewBatch()
)
// If any error occurs before updating the head or we are inserting a side chain,
// all the data written this time wll be rolled back.
defer func() {
if previous != nil {
if err := bc.truncateAncient(previous.NumberU64()); err != nil {
log.Crit("Truncate ancient store failed", "err", err)
}
}
}()
var deleted []*numberHash
for i, block := range blockChain {
// Short circuit insertion if shutting down or processing failed
if bc.insertStopped() {
return 0, errInsertionInterrupted
}
// Short circuit insertion if it is required(used in testing only)
if bc.terminateInsert != nil && bc.terminateInsert(block.Hash(), block.NumberU64()) {
return i, errors.New("insertion is terminated for testing purpose")
}
// Short circuit if the owner header is unknown
if !bc.HasHeader(block.Hash(), block.NumberU64()) {
return i, fmt.Errorf("containing header #%d [%x..] unknown", block.Number(), block.Hash().Bytes()[:4])
}
if block.NumberU64() == 1 {
// Make sure to write the genesis into the freezer
if frozen, _ := bc.db.Ancients(); frozen == 0 {
h := rawdb.ReadCanonicalHash(bc.db, 0)
b := rawdb.ReadBlock(bc.db, h, 0)
size += rawdb.WriteAncientBlock(bc.db, b, rawdb.ReadReceipts(bc.db, h, 0, bc.chainConfig), rawdb.ReadTd(bc.db, h, 0))
log.Info("Wrote genesis to ancients")
first := blockChain[0]
last := blockChain[len(blockChain)-1]

// Ensure genesis is in ancients.
if first.NumberU64() == 1 {
if frozen, _ := bc.db.Ancients(); frozen == 0 {
b := bc.genesisBlock
writeSize, err := rawdb.WriteAncientBlocks(bc.db, []*types.Block{b}, []types.Receipts{nil}, big.NewInt(1))
size += writeSize
if err != nil {
log.Error("Error writing genesis to ancients", "err", err)
return 0, err
}
log.Info("Wrote genesis to ancients")
}
// Flush data into ancient database.
size += rawdb.WriteAncientBlock(bc.db, block, receiptChain[i], bc.GetTd(block.Hash(), block.NumberU64()))

// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-indexed all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
}
// Before writing the blocks to the ancients, we need to ensure that
// they correspond to what the headerchain 'expects'.
// We only check the last block/header, since it's a contiguous chain.
if !bc.HasHeader(last.Hash(), last.NumberU64()) {
return 0, fmt.Errorf("containing header #%d [%x..] unknown", last.Number(), last.Hash().Bytes()[:4])
}

// Write all chain data to ancients.
td := bc.GetTd(first.Hash(), first.NumberU64())
writeSize, err := rawdb.WriteAncientBlocks(bc.db, blockChain, receiptChain, td)
size += writeSize
if err != nil {
log.Error("Error importing chain data to ancients", "err", err)
return 0, err
}

// Write tx indices if any condition is satisfied:
// * If user requires to reserve all tx indices(txlookuplimit=0)
// * If all ancient tx indices are required to be reserved(txlookuplimit is even higher than ancientlimit)
// * If block number is large enough to be regarded as a recent block
// It means blocks below the ancientLimit-txlookupLimit won't be indexed.
//
// But if the `TxIndexTail` is not nil, e.g. Geth is initialized with
// an external ancient database, during the setup, blockchain will start
// a background routine to re-index all indices in [ancients - txlookupLimit, ancients)
// range. In this case, all tx indices of newly imported blocks should be
// generated.
var batch = bc.db.NewBatch()
for _, block := range blockChain {
if bc.txLookupLimit == 0 || ancientLimit <= bc.txLookupLimit || block.NumberU64() >= ancientLimit-bc.txLookupLimit {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
} else if rawdb.ReadTxIndexTail(bc.db) != nil {
rawdb.WriteTxLookupEntriesByBlock(batch, block)
}
stats.processed++
}

// Flush all tx-lookup index data.
size += batch.ValueSize()
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
// The tx index data could not be written.
// Roll back the ancient store update.
fastBlock := bc.CurrentFastBlock().NumberU64()
if err := bc.db.TruncateAncients(fastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, err
}
batch.Reset()

// Sync the ancient store explicitly to ensure all data has been flushed to disk.
if err := bc.db.Sync(); err != nil {
return 0, err
}
if !updateHead(blockChain[len(blockChain)-1]) {
return 0, errors.New("side blocks can't be accepted as the ancient chain data")
}
previous = nil // disable rollback explicitly

// Wipe out canonical block data.
for _, nh := range deleted {
rawdb.DeleteBlockWithoutNumber(batch, nh.hash, nh.number)
rawdb.DeleteCanonicalHash(batch, nh.number)
}
for _, block := range blockChain {
// Always keep genesis block in active database.
if block.NumberU64() != 0 {
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
// Update the current fast block because all block data is now present in DB.
previousFastBlock := bc.CurrentFastBlock().NumberU64()
if !updateHead(blockChain[len(blockChain)-1]) {
// We end up here if the header chain has reorg'ed, and the blocks/receipts
// don't match the canonical chain.
if err := bc.db.TruncateAncients(previousFastBlock + 1); err != nil {
log.Error("Can't truncate ancient store after failed insert", "err", err)
}
return 0, errSideChainReceipts
}
if err := batch.Write(); err != nil {
return 0, err
}
batch.Reset()

// Wipe out side chain too.
for _, nh := range deleted {
for _, hash := range rawdb.ReadAllHashes(bc.db, nh.number) {
rawdb.DeleteBlock(batch, hash, nh.number)
// Delete block data from the main database.
batch.Reset()
canonHashes := make(map[common.Hash]struct{})
for _, block := range blockChain {
canonHashes[block.Hash()] = struct{}{}
if block.NumberU64() == 0 {
continue
}
rawdb.DeleteCanonicalHash(batch, block.NumberU64())
rawdb.DeleteBlockWithoutNumber(batch, block.Hash(), block.NumberU64())
}
for _, block := range blockChain {
// Always keep genesis block in active database.
if block.NumberU64() != 0 {
for _, hash := range rawdb.ReadAllHashes(bc.db, block.NumberU64()) {
rawdb.DeleteBlock(batch, hash, block.NumberU64())
}
// Delete side chain hash-to-number mappings.
for _, nh := range rawdb.ReadAllHashesInRange(bc.db, first.NumberU64(), last.NumberU64()) {
if _, canon := canonHashes[nh.Hash]; !canon {
rawdb.DeleteHeader(batch, nh.Hash, nh.Number)
}
}
if err := batch.Write(); err != nil {
return 0, err
}
return 0, nil
}

// writeLive writes blockchain and corresponding receipt chain into active store.
writeLive := func(blockChain types.Blocks, receiptChain []types.Receipts) (int, error) {
skipPresenceCheck := false
@@ -1355,7 +1330,7 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
if err := batch.Write(); err != nil {
return 0, err
}
size += batch.ValueSize()
size += int64(batch.ValueSize())
batch.Reset()
}
stats.processed++
@@ -1364,14 +1339,15 @@ func (bc *BlockChain) InsertReceiptChain(blockChain types.Blocks, receiptChain [
// we can ensure all components of the body are complete (body, receipts,
// tx indexes)
if batch.ValueSize() > 0 {
size += batch.ValueSize()
size += int64(batch.ValueSize())
if err := batch.Write(); err != nil {
return 0, err
}
}
updateHead(blockChain[len(blockChain)-1])
return 0, nil
}

// Write downloaded chain data and corresponding receipt chain data
if len(ancientBlocks) > 0 {
if n, err := writeAncient(ancientBlocks, ancientReceipts); err != nil {
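For reference, a minimal sketch of the batched freezer-write-then-rollback pattern the new writeAncient path follows. The rawdb.WriteAncientBlocks signature, the TruncateAncients rollback, and the explicit Sync are assumed from their call sites in the diff above; the helper name writeAncientBatch, its package, and the upstream go-ethereum import paths are illustrative only (this fork ships the same packages under its own module path).

```go
// Minimal sketch, not part of this commit: the batched freezer write used by
// writeAncient above, with the rollback-on-failure behaviour made explicit.
// The rawdb.WriteAncientBlocks signature is assumed from its call sites in
// the diff; the helper name and import paths are illustrative.
package ancientsketch

import (
	"math/big"

	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/ethdb"
	"github.com/ethereum/go-ethereum/log"
)

// writeAncientBatch writes one contiguous, canonical run of blocks and their
// receipts to the freezer in a single call. td is the total difficulty of the
// first block in the batch. On failure the freezer is truncated back to
// prevHead+1 items so a partially written batch never stays visible.
func writeAncientBatch(db ethdb.Database, blocks []*types.Block, receipts []types.Receipts, td *big.Int, prevHead uint64) (int64, error) {
	size, err := rawdb.WriteAncientBlocks(db, blocks, receipts, td)
	if err != nil {
		if terr := db.TruncateAncients(prevHead + 1); terr != nil {
			log.Error("Can't truncate ancient store after failed insert", "err", terr)
		}
		return 0, err
	}
	// Sync explicitly so the data is on disk before any head pointer moves.
	if err := db.Sync(); err != nil {
		return 0, err
	}
	return size, nil
}
```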
77 changes: 19 additions & 58 deletions core/blockchain_test.go
@@ -661,6 +661,7 @@ func TestFastVsFullChains(t *testing.T) {
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(len(blocks)/2)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}

// Iterate over all chain data components, and cross reference
for i := 0; i < len(blocks); i++ {
num, hash := blocks[i].NumberU64(), blocks[i].Hash()
@@ -682,10 +683,27 @@
} else if types.DeriveSha(fblock.Transactions(), trie.NewStackTrie(nil)) != types.DeriveSha(arblock.Transactions(), trie.NewStackTrie(nil)) || types.DeriveSha(anblock.Transactions(), trie.NewStackTrie(nil)) != types.DeriveSha(arblock.Transactions(), trie.NewStackTrie(nil)) {
t.Errorf("block #%d [%x]: transactions mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, fblock.Transactions(), anblock.Transactions(), arblock.Transactions())
}
if freceipts, anreceipts, areceipts := rawdb.ReadReceipts(fastDb, hash, *rawdb.ReadHeaderNumber(fastDb, hash), fast.Config()), rawdb.ReadReceipts(ancientDb, hash, *rawdb.ReadHeaderNumber(ancientDb, hash), fast.Config()), rawdb.ReadReceipts(archiveDb, hash, *rawdb.ReadHeaderNumber(archiveDb, hash), fast.Config()); types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) {

// Check receipts.
freceipts := rawdb.ReadReceipts(fastDb, hash, num, fast.Config())
anreceipts := rawdb.ReadReceipts(ancientDb, hash, num, fast.Config())
areceipts := rawdb.ReadReceipts(archiveDb, hash, num, fast.Config())
if types.DeriveSha(freceipts, trie.NewStackTrie(nil)) != types.DeriveSha(areceipts, trie.NewStackTrie(nil)) {
t.Errorf("block #%d [%x]: receipts mismatch: fastdb %v, ancientdb %v, archivedb %v", num, hash, freceipts, anreceipts, areceipts)
}

// Check that hash-to-number mappings are present in all databases.
if m := rawdb.ReadHeaderNumber(fastDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in fastdb: %v", num, hash, m)
}
if m := rawdb.ReadHeaderNumber(ancientDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in ancientdb: %v", num, hash, m)
}
if m := rawdb.ReadHeaderNumber(archiveDb, hash); m == nil || *m != num {
t.Errorf("block #%d [%x]: wrong hash-to-number mapping in archivedb: %v", num, hash, m)
}
}

// Check that the canonical chains are the same between the databases
for i := 0; i < len(blocks)+1; i++ {
if fhash, ahash := rawdb.ReadCanonicalHash(fastDb, uint64(i)), rawdb.ReadCanonicalHash(archiveDb, uint64(i)); fhash != ahash {
@@ -1630,63 +1648,6 @@ func TestBlockchainRecovery(t *testing.T) {
}
}

func TestIncompleteAncientReceiptChainInsertion(t *testing.T) {
// Configure and generate a sample block chain
var (
gendb = rawdb.NewMemoryDatabase()
key, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
address = crypto.PubkeyToAddress(key.PublicKey)
funds = big.NewInt(1000000000)
gspec = &Genesis{Config: params.IstanbulTestChainConfig, Alloc: GenesisAlloc{address: {Balance: funds}}}
genesis = gspec.MustCommit(gendb)
)
height := uint64(1024)
blocks, receipts := GenerateChain(gspec.Config, genesis, mockEngine.NewFaker(), gendb, int(height), nil)

// Import the chain as a ancient-first node and ensure all pointers are updated
frdir, err := ioutil.TempDir("", "")
if err != nil {
t.Fatalf("failed to create temp freezer dir: %v", err)
}
defer os.Remove(frdir)
ancientDb, err := rawdb.NewDatabaseWithFreezer(rawdb.NewMemoryDatabase(), frdir, "", false)
if err != nil {
t.Fatalf("failed to create temp freezer db: %v", err)
}
gspec.MustCommit(ancientDb)
ancient, _ := NewBlockChain(ancientDb, nil, gspec.Config, mockEngine.NewFaker(), vm.Config{}, nil, nil)
defer ancient.Stop()

headers := make([]*types.Header, len(blocks))
for i, block := range blocks {
headers[i] = block.Header()
}
if n, err := ancient.InsertHeaderChain(headers, 1, true); err != nil {
t.Fatalf("failed to insert header %d: %v", n, err)
}
// Abort ancient receipt chain insertion deliberately
ancient.terminateInsert = func(hash common.Hash, number uint64) bool {
return number == blocks[len(blocks)/2].NumberU64()
}
previousFastBlock := ancient.CurrentFastBlock()
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err == nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
if ancient.CurrentFastBlock().NumberU64() != previousFastBlock.NumberU64() {
t.Fatalf("failed to rollback ancient data, want %d, have %d", previousFastBlock.NumberU64(), ancient.CurrentFastBlock().NumberU64())
}
if frozen, err := ancient.db.Ancients(); err != nil || frozen != 1 {
t.Fatalf("failed to truncate ancient data")
}
ancient.terminateInsert = nil
if n, err := ancient.InsertReceiptChain(blocks, receipts, uint64(3*len(blocks)/4)); err != nil {
t.Fatalf("failed to insert receipt %d: %v", n, err)
}
if ancient.CurrentFastBlock().NumberU64() != blocks[len(blocks)-1].NumberU64() {
t.Fatalf("failed to insert ancient recept chain after rollback")
}
}

// Tests that importing a very large side fork, which is larger than the canon chain,
// but where the difficulty per block is kept low: this means that it will not
// overtake the 'canon' chain until after it's passed canon by about 200 blocks.
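The three hash-to-number assertions added to TestFastVsFullChains above repeat the same check per database. A small helper of the following shape (hypothetical, not part of this commit; upstream go-ethereum import paths are used for illustration) captures the pattern:

```go
// Sketch of a test helper factoring out the repeated hash-to-number check
// added above; not part of this commit.
package core

import (
	"testing"

	"github.com/ethereum/go-ethereum/common"
	"github.com/ethereum/go-ethereum/core/rawdb"
	"github.com/ethereum/go-ethereum/ethdb"
)

// checkHashToNumber asserts that db maps hash to the expected block number.
func checkHashToNumber(t *testing.T, name string, db ethdb.KeyValueReader, hash common.Hash, num uint64) {
	t.Helper()
	if m := rawdb.ReadHeaderNumber(db, hash); m == nil || *m != num {
		t.Errorf("block #%d [%x]: wrong hash-to-number mapping in %s: %v", num, hash, name, m)
	}
}
```

Each of the three per-database checks above would then reduce to a one-line call such as checkHashToNumber(t, "fastdb", fastDb, hash, num).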
2 changes: 2 additions & 0 deletions core/error.go
@@ -31,6 +31,8 @@ var (

// ErrNoGenesis is returned when there is no Genesis Block.
ErrNoGenesis = errors.New("genesis not found in chain")

errSideChainReceipts = errors.New("side blocks can't be accepted as ancient chain data")
)

// List of evm-call-message pre-checking errors. All state transition messages will
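errSideChainReceipts is the sentinel that InsertReceiptChain's ancient-write path now returns when the written batch turns out not to be canonical (see the updateHead check in core/blockchain.go above). A hedged sketch of how an in-package caller could single it out; the wrapper name importReceipts is hypothetical, and since the error is unexported the check only works inside package core:

```go
// Illustrative only: distinguishing the new sentinel from other failures of
// InsertReceiptChain. Not part of this commit.
package core

import (
	"errors"

	"github.com/ethereum/go-ethereum/core/types"
	"github.com/ethereum/go-ethereum/log"
)

func importReceipts(bc *BlockChain, blocks types.Blocks, receipts []types.Receipts, ancientLimit uint64) error {
	_, err := bc.InsertReceiptChain(blocks, receipts, ancientLimit)
	if errors.Is(err, errSideChainReceipts) {
		// The header chain reorged while the batch was being written; the
		// freezer has already been truncated back to the previous fast block,
		// so the caller should re-fetch the canonical blocks and retry.
		log.Warn("Receipt batch no longer canonical", "err", err)
	}
	return err
}
```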