Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for gaps in block numbers and throw if found during migration #282

Draft
wants to merge 24 commits into
base: celo10
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
469727c
check for gaps in block numbers and throw if found during migration
alecps Dec 9, 2024
9399129
fix err msg nit
alecps Dec 10, 2024
31b6ba3
add comment on contiguous blocks
alecps Dec 10, 2024
20737a8
fix for re-running pre-migration
alecps Dec 12, 2024
7cca9e8
Remove redundant zero check
alecps Dec 13, 2024
bbb4035
drive by: add error checking in misc places where it was missed
alecps Dec 17, 2024
e7ec364
add leveldb keys for non-transformed data
alecps Dec 17, 2024
a0aad1e
check for header and body only in leveldb when migrating non-ancients
alecps Dec 17, 2024
f59e3bb
add check that transformed header has the expected block number
alecps Dec 17, 2024
67fe329
add checkOtherDataForNonAncientBlock to non-ancient migration
alecps Dec 17, 2024
8db450d
add checkTransformedHeader helper
alecps Dec 17, 2024
39f2b63
add CheckRLPBlockRangeForGaps to ancient migration
alecps Dec 17, 2024
cb1bc36
add missing return
alecps Dec 17, 2024
66c7b86
fix minor indexing error in logging
alecps Dec 18, 2024
32cbb26
fix typos
alecps Dec 18, 2024
f9964c0
WIP add parent hash checks
alecps Dec 18, 2024
92ac998
fix parent hash check in ancient migration
alecps Dec 19, 2024
c2c2789
Alternative approach to checking continuity (#288)
piersy Dec 20, 2024
8781e3d
fix error check
alecps Jan 6, 2025
407e103
saving progress on consolidating continuity checks between ancient an…
alecps Jan 6, 2025
d7b3f80
saving more progress
alecps Jan 7, 2025
1c84877
saving more progress / cleanup
alecps Jan 7, 2025
2ac2407
more refactoring
alecps Jan 8, 2025
b3ec85c
get rid of numAncients variable in favor of lastAncient block element
alecps Jan 10, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 59 additions & 8 deletions op-chain-ops/cmd/celo-migrate/ancients.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import (
"fmt"
"path/filepath"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/rawdb"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/ethdb"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rlp"
"golang.org/x/sync/errgroup"
)

Expand All @@ -23,6 +24,44 @@ type RLPBlockRange struct {
tds [][]byte
}

// CheckRLPBlockRangeForGaps checks for gaps in the given RLP block range by comparing the lengths for each table and checking the header numbers
func CheckRLPBlockRangeForGaps(blockRange RLPBlockRange, expectedLength uint64) (err error) {
if uint64(len(blockRange.hashes)) != expectedLength {
err = fmt.Errorf("Expected count mismatch in block range hashes: expected %d, actual %d", expectedLength, len(blockRange.hashes))
}
if uint64(len(blockRange.bodies)) != expectedLength {
err = errors.Join(err, fmt.Errorf("Expected count mismatch in block range bodies: expected %d, actual %d", expectedLength, len(blockRange.bodies)))
}
if uint64(len(blockRange.headers)) != expectedLength {
err = errors.Join(err, fmt.Errorf("Expected count mismatch in block range headers: expected %d, actual %d", expectedLength, len(blockRange.headers)))
}
if uint64(len(blockRange.receipts)) != expectedLength {
err = errors.Join(err, fmt.Errorf("Expected count mismatch in block range receipts: expected %d, actual %d", expectedLength, len(blockRange.receipts)))
}
if uint64(len(blockRange.tds)) != expectedLength {
err = errors.Join(err, fmt.Errorf("Expected count mismatch in block range total difficulties: expected %d, actual %d", expectedLength, len(blockRange.tds)))
}

if err != nil {
return err
}

// Cbecm that block number in header matches the expected block number
alecps marked this conversation as resolved.
Show resolved Hide resolved
for i := uint64(0); i < expectedLength; i++ {
header := new(types.Header)
err := rlp.DecodeBytes(blockRange.headers[i], &header)
if err != nil {
return fmt.Errorf("can't decode header: %w", err)
}
expectedBlockNumber := blockRange.start + i
if header.Number.Uint64() != expectedBlockNumber {
return fmt.Errorf("header number mismatch: expected %d, actual %d", expectedBlockNumber, header.Number.Uint64())
}
alecps marked this conversation as resolved.
Show resolved Hide resolved
}

return nil
}

// NewChainFreezer is a small utility method around NewFreezer that sets the
// default parameters for the chain storage.
func NewChainFreezer(datadir string, namespace string, readonly bool) (*rawdb.Freezer, error) {
Expand Down Expand Up @@ -82,7 +121,7 @@ func migrateAncientsDb(ctx context.Context, oldDBPath, newDBPath string, batchSi
g.Go(func() error {
return readAncientBlocks(ctx, oldFreezer, numAncientsNewBefore, numAncientsOld, batchSize, readChan)
})
g.Go(func() error { return transformBlocks(ctx, readChan, transformChan) })
g.Go(func() error { return transformBlocks(ctx, readChan, transformChan, numAncientsNewBefore) })
g.Go(func() error { return writeAncientBlocks(ctx, newFreezer, transformChan, numAncientsOld) })

if err = g.Wait(); err != nil {
Expand Down Expand Up @@ -110,7 +149,7 @@ func readAncientBlocks(ctx context.Context, freezer *rawdb.Freezer, startBlock,
case <-ctx.Done():
return ctx.Err()
default:
count := min(batchSize, endBlock-i+1)
count := min(batchSize, endBlock-i)
start := i

blockRange := RLPBlockRange{
Expand Down Expand Up @@ -144,15 +183,22 @@ func readAncientBlocks(ctx context.Context, freezer *rawdb.Freezer, startBlock,
return fmt.Errorf("failed to read tds from old freezer: %w", err)
}

if err := CheckRLPBlockRangeForGaps(blockRange, count); err != nil {
return fmt.Errorf("failed to ensure ancient block range has no gaps: %w", err)
}

alecps marked this conversation as resolved.
Show resolved Hide resolved
out <- blockRange
}
}
return nil
}

func transformBlocks(ctx context.Context, in <-chan RLPBlockRange, out chan<- RLPBlockRange) error {
func transformBlocks(ctx context.Context, in <-chan RLPBlockRange, out chan<- RLPBlockRange, startBlock uint64) error {
alecps marked this conversation as resolved.
Show resolved Hide resolved
// Transform blocks from the in channel and send them to the out channel
defer close(out)

prevBlockNumber := uint64(startBlock - 1) // Will underflow when startBlock is 0, but then overflow back to 0

for blockRange := range in {
select {
case <-ctx.Done():
Expand All @@ -161,6 +207,12 @@ func transformBlocks(ctx context.Context, in <-chan RLPBlockRange, out chan<- RL
for i := range blockRange.hashes {
blockNumber := blockRange.start + uint64(i)

if blockNumber != prevBlockNumber+1 { // Overflows back to 0 when startBlock is 0
return fmt.Errorf("gap found between ancient blocks numbered %d and %d. Please delete the target directory and repeat the migration with an uncorrupted source directory", prevBlockNumber, blockNumber)
}
// Block ranges are in order because they are read sequentially from the freezer
prevBlockNumber = blockNumber

newHeader, err := transformHeader(blockRange.headers[i])
if err != nil {
return fmt.Errorf("can't transform header: %w", err)
Expand All @@ -170,9 +222,8 @@ func transformBlocks(ctx context.Context, in <-chan RLPBlockRange, out chan<- RL
return fmt.Errorf("can't transform body: %w", err)
}

if yes, newHash := hasSameHash(newHeader, blockRange.hashes[i]); !yes {
log.Error("Hash mismatch", "block", blockNumber, "oldHash", common.BytesToHash(blockRange.hashes[i]), "newHash", newHash)
return fmt.Errorf("hash mismatch at block %d", blockNumber)
if err := checkTransformedHeader(newHeader, blockRange.hashes[i], blockNumber); err != nil {
return err
}

blockRange.headers[i] = newHeader
Expand Down Expand Up @@ -216,7 +267,7 @@ func writeAncientBlocks(ctx context.Context, freezer *rawdb.Freezer, in <-chan R
return fmt.Errorf("failed to write block range: %w", err)
}
blockRangeEnd := blockRange.start + uint64(len(blockRange.hashes)) - 1
log.Info("Wrote ancient blocks", "start", blockRange.start, "end", blockRangeEnd, "count", len(blockRange.hashes), "remaining", totalAncientBlocks-blockRangeEnd)
log.Info("Wrote ancient blocks", "start", blockRange.start, "end", blockRangeEnd, "count", len(blockRange.hashes), "remaining", totalAncientBlocks-(blockRangeEnd+1))
}
}
return nil
Expand Down
33 changes: 32 additions & 1 deletion op-chain-ops/cmd/celo-migrate/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ const (
)

var (
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerPrefix = []byte("h") // headerPrefix + num (uint64 big endian) + hash -> header
headerTDSuffix = []byte("t") // headerPrefix + num (uint64 big endian) + hash + headerTDSuffix -> td
headerHashSuffix = []byte("n") // headerPrefix + num (uint64 big endian) + headerHashSuffix -> hash
headerNumberPrefix = []byte("H") // headerNumberPrefix + hash -> num (uint64 big endian)

blockBodyPrefix = []byte("b") // blockBodyPrefix + num (uint64 big endian) + hash -> block body
blockReceiptsPrefix = []byte("r") // blockReceiptsPrefix + num (uint64 big endian) + hash -> block receipts
)

// encodeBlockNumber encodes a block number as big endian uint64
Expand All @@ -36,6 +42,31 @@ func headerKey(number uint64, hash common.Hash) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}

// headerTDKey = headerPrefix + num (uint64 big endian) + hash + headerTDSuffix
func headerTDKey(number uint64, hash common.Hash) []byte {
return append(headerKey(number, hash), headerTDSuffix...)
}

// headerHashKey = headerPrefix + num (uint64 big endian) + headerHashSuffix
func headerHashKey(number uint64) []byte {
return append(append(headerPrefix, encodeBlockNumber(number)...), headerHashSuffix...)
}

// headerNumberKey = headerNumberPrefix + hash
func headerNumberKey(hash common.Hash) []byte {
return append(headerNumberPrefix, hash.Bytes()...)
}

// blockBodyKey = blockBodyPrefix + num (uint64 big endian) + hash
func blockBodyKey(number uint64, hash common.Hash) []byte {
return append(append(blockBodyPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}

// blockReceiptsKey = blockReceiptsPrefix + num (uint64 big endian) + hash
func blockReceiptsKey(number uint64, hash common.Hash) []byte {
return append(append(blockReceiptsPrefix, encodeBlockNumber(number)...), hash.Bytes()...)
}

// Opens a database with access to AncientsDb
func openDB(chaindataPath string, readOnly bool) (ethdb.Database, error) {
// Will throw an error if the chaindataPath does not exist
Expand Down
4 changes: 3 additions & 1 deletion op-chain-ops/cmd/celo-migrate/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,9 @@ func main() {
if isSubcommand {
return err
}
_ = cli.ShowAppHelp(ctx)
if err := cli.ShowAppHelp(ctx); err != nil {
log.Error("failed to show cli help", "err", err)
}
return fmt.Errorf("please provide a valid command")
},
}
Expand Down
60 changes: 54 additions & 6 deletions op-chain-ops/cmd/celo-migrate/non-ancients.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"bytes"
"fmt"
"os"
"os/exec"
Expand Down Expand Up @@ -78,13 +79,24 @@ func migrateNonAncientsDb(newDB ethdb.Database, lastBlock, numAncients, batchSiz
}
}

prevBlockNumber := uint64(numAncients - 1) // Will underflow if numAncients is 0

for i := numAncients; i <= lastBlock; i += batchSize {
numbersHash := rawdb.ReadAllHashesInRange(newDB, i, i+batchSize-1)

log.Info("Processing Block Range", "process", "non-ancients", "from", i, "to(inclusve)", i+batchSize-1, "count", len(numbersHash))
for _, numberHash := range numbersHash {
if numberHash.Number != prevBlockNumber+1 { // prevBlocNumber will overflow back to 0 here if numAncients is 0
return 0, fmt.Errorf("gap found between non-ancient blocks numbered %d and %d. Please delete the target directory and repeat the migration with an uncorrupted source directory", prevBlockNumber, numberHash.Number)
}
prevBlockNumber = numberHash.Number

if err := migrateNonAncientBlock(numberHash.Number, numberHash.Hash, newDB); err != nil {
return 0, err
return 0, fmt.Errorf("failed to migrate non-ancient block %d - %x: %w", numberHash.Number, numberHash.Hash, err)
}

if err := checkOtherDataForNonAncientBlock(numberHash.Number, numberHash.Hash, newDB); err != nil {
return 0, fmt.Errorf("failed to ensure all non-transformed data is present for non-ancient block %d - %x: %w. Please delete the target directory and repeat the migration with an uncorrupted source directory", numberHash.Number, numberHash.Hash, err)
}
}
}
Expand All @@ -95,8 +107,14 @@ func migrateNonAncientsDb(newDB ethdb.Database, lastBlock, numAncients, batchSiz

func migrateNonAncientBlock(number uint64, hash common.Hash, newDB ethdb.Database) error {
// read header and body
header := rawdb.ReadHeaderRLP(newDB, hash, number)
body := rawdb.ReadBodyRLP(newDB, hash, number)
header, err := newDB.Get(headerKey(number, hash))
if err != nil {
return fmt.Errorf("failed to read header: block %d - %x: %w", number, hash, err)
}
body, err := newDB.Get(blockBodyKey(number, hash))
if err != nil {
return fmt.Errorf("failed to read body: block %d - %x: %w", number, hash, err)
}

// transform header and body
newHeader, err := transformHeader(header)
Expand All @@ -108,9 +126,8 @@ func migrateNonAncientBlock(number uint64, hash common.Hash, newDB ethdb.Databas
return fmt.Errorf("failed to transform body: block %d - %x: %w", number, hash, err)
}

if yes, newHash := hasSameHash(newHeader, hash[:]); !yes {
log.Error("Hash mismatch", "block", number, "oldHash", hash, "newHash", newHash)
return fmt.Errorf("hash mismatch at block %d - %x", number, hash)
if err := checkTransformedHeader(newHeader, hash[:], number); err != nil {
return err
}

// write header and body
Expand All @@ -125,3 +142,34 @@ func migrateNonAncientBlock(number uint64, hash common.Hash, newDB ethdb.Databas

return nil
}

// checkOtherDataForNonAncientBlock checks that all the data that is not transformed is succesfully copied for non-ancient blocks.
alecps marked this conversation as resolved.
Show resolved Hide resolved
// I.e. receipts, total difficulty, canonical hash, and block number.
// If an error is returned, it is likely the source directory is corrupted and the migration should be restarted with a clean source directory.
func checkOtherDataForNonAncientBlock(number uint64, hash common.Hash, newDB ethdb.Database) error {
// Ensure receipts and total difficulty are present in non-ancient db
if has, err := newDB.Has(blockReceiptsKey(number, hash)); !has || err != nil {
return fmt.Errorf("failed to find receipts in newDB leveldb: block %d - %x: %w", number, hash, err)
}
if has, err := newDB.Has(headerTDKey(number, hash)); !has || err != nil {
return fmt.Errorf("failed to find total difficulty in newDB leveldb: block %d - %x: %w", number, hash, err)
}
// Ensure canonical hash and number are present in non-ancient db and that they match expected values
hashFromDB, err := newDB.Get(headerHashKey(number))
if err != nil {
return fmt.Errorf("failed to find canonical hash in newDB leveldb: block %d - %x: %w", number, hash, err)
}
if !bytes.Equal(hashFromDB, hash[:]) {
return fmt.Errorf("canonical hash mismatch in newDB leveldb: block %d - %x: %w", number, hash, err)
}
numberFromDB, err := newDB.Get(headerNumberKey(hash))
if err != nil {
return fmt.Errorf("failed to find number for hash in newDB leveldb: block %d - %x: %w", number, hash, err)
}
if !bytes.Equal(numberFromDB, encodeBlockNumber(number)) {
log.Error("Number for hash mismatch", "block", number, "numberFromDB", numberFromDB, "hash", hash)
return fmt.Errorf("number for hash mismatch in newDB leveldb: block %d - %x: %w", number, hash, err)
}

return nil
}
17 changes: 17 additions & 0 deletions op-chain-ops/cmd/celo-migrate/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,23 @@ func hasSameHash(newHeader, oldHash []byte) (bool, common.Hash) {
return bytes.Equal(oldHash, newHash.Bytes()), newHash
}

func checkTransformedHeader(header, expectedHash []byte, expectedNumber uint64) error {
// Check that transformed header has the same hash
if yes, newHash := hasSameHash(header, expectedHash); !yes {
return fmt.Errorf("hash mismatch after transform at block %d - %x. Expected %d, actual %d", expectedNumber, expectedHash, expectedHash, newHash)
}
// Check that transformed header has the same block number
headerDecoded := new(types.Header)
if err := rlp.DecodeBytes(header, &headerDecoded); err != nil {
return err
}
if headerDecoded.Number.Uint64() != expectedNumber {
return fmt.Errorf("block number mismatch after transform at block %d - %x. Expected %d, actual %d", expectedNumber, expectedHash, expectedNumber, headerDecoded.Number.Uint64())
}

return nil
}

// transformBlockBody migrates the block body from the old format to the new format (works with []byte input output)
func transformBlockBody(oldBodyData []byte) ([]byte, error) {
// decode body into celo-blockchain Body structure
Expand Down
Loading