Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(evm-reader): EVM Reader store only non empty epochs #578

Merged
merged 3 commits into from
Aug 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
210 changes: 80 additions & 130 deletions internal/evmreader/evmreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{}
// Check if is there new Inputs for all running Applications
func (r *EvmReader) checkForNewInputs(ctx context.Context) error {

slog.Info("Checking for new inputs")
slog.Debug("Checking for new inputs")

// Get All Applications
apps, err := r.repository.GetAllRunningApplications(ctx)
Expand All @@ -188,7 +188,7 @@ func (r *EvmReader) checkForNewInputs(ctx context.Context) error {
lastProcessedBlock = r.inputBoxDeploymentBlock - 1
}

currentMostRecentFinalizedHeader, err := r.fetchMostRecentHeader(
mostRecentHeader, err := r.fetchMostRecentHeader(
ctx,
r.defaultBlock,
)
Expand All @@ -198,42 +198,42 @@ func (r *EvmReader) checkForNewInputs(ctx context.Context) error {
"error", err)
continue
}
currentMostRecentFinalizedBlockNumber := currentMostRecentFinalizedHeader.Number.Uint64()
mostRecentBlockNumber := mostRecentHeader.Number.Uint64()

if currentMostRecentFinalizedBlockNumber > lastProcessedBlock {
if mostRecentBlockNumber > lastProcessedBlock {

slog.Info("Checking inputs for applications",
"apps", appAddresses,
"last processed block", lastProcessedBlock,
"most recent block", currentMostRecentFinalizedBlockNumber,
"most recent block", mostRecentBlockNumber,
)

err = r.readAndStoreInputs(ctx,
lastProcessedBlock+1,
currentMostRecentFinalizedBlockNumber,
mostRecentBlockNumber,
apps,
)
if err != nil {
slog.Error("Error reading inputs",
"apps", appAddresses,
"last processed block", lastProcessedBlock,
"most recent block", currentMostRecentFinalizedBlockNumber,
"most recent block", mostRecentBlockNumber,
"error", err,
)
continue
}
} else if currentMostRecentFinalizedBlockNumber < lastProcessedBlock {
} else if mostRecentBlockNumber < lastProcessedBlock {
slog.Warn(
"Current most recent block is lower than the last processed one",
"Most recent block is lower than the last processed one",
"apps", appAddresses,
"last processed block", lastProcessedBlock,
"most recent block", currentMostRecentFinalizedBlockNumber,
"most recent block", mostRecentBlockNumber,
)
} else {
slog.Info("Already checked the most recent blocks",
"apps", appAddresses,
"last processed block", lastProcessedBlock,
"most recent block", currentMostRecentFinalizedBlockNumber,
"most recent block", mostRecentBlockNumber,
)
}
}
Expand Down Expand Up @@ -327,81 +327,90 @@ func (r *EvmReader) readAndStoreInputs(
}

// Index Inputs into epochs and handle epoch finalization
INDEX_APP_INPUTS_LOOP:
for address, inputs := range appInputsMap {

epochLength := r.epochLengthCache[address]

//Get this round's first and last Epochs indexes
firstEpochIndex := calculateEpochIndex(epochLength, startBlock)
lastEpochIndex := calculateEpochIndex(epochLength, endBlock)

// Initialize this run's epochs
epochsByIndex, err := r.getOrBuildEpochs(ctx,
firstEpochIndex,
lastEpochIndex,
address,
epochLength)
// Retrieves last open epoch from DB
currentEpoch, err := r.repository.GetEpoch(ctx,
calculateEpochIndex(epochLength, startBlock), address)
if err != nil {
slog.Error("Error building epoch cache",
slog.Error("Error retrieving existing current epoch",
"app", address,
"error", err,
)
continue
}

// Check current epoch status
if currentEpoch != nil && currentEpoch.Status != EpochStatusOpen {
slog.Error("Current epoch is not open",
"app", address,
"epoch-index", currentEpoch.Index,
"status", currentEpoch.Status,
)
continue
}

// Initialize epochs inputs map
var epochInputMap = make(map[*Epoch][]Input)
for _, epoch := range epochsByIndex {
epochInputMap[epoch] = []Input{}
}

// Index Inputs into epochs
lastIndexedInputEpochIndex := firstEpochIndex
for _, input := range inputs {

inputEpochIndex := calculateEpochIndex(epochLength, input.BlockNumber)

slog.Info("Indexing new Input",
// If input belongs into a new epoch, close the previous known one
if currentEpoch != nil && currentEpoch.Index != inputEpochIndex {
currentEpoch.Status = EpochStatusClosed
slog.Info("Closing epoch",
"app", currentEpoch.AppAddress,
"epoch-index", currentEpoch.Index,
"start", currentEpoch.FirstBlock,
"end", currentEpoch.LastBlock)
// Add it to inputMap, so it will be stored
epochInputMap[currentEpoch] = []Input{}
currentEpoch = nil
}
if currentEpoch == nil {
currentEpoch = &Epoch{
Index: inputEpochIndex,
FirstBlock: inputEpochIndex * epochLength,
LastBlock: (inputEpochIndex * epochLength) + epochLength - 1,
Status: EpochStatusOpen,
AppAddress: address,
}
}

slog.Info("Indexing new Input into epoch",
"app", address,
"index", input.Index,
"block", input.BlockNumber,
"epoch", inputEpochIndex)
"epoch-index", inputEpochIndex)

// If input belongs into a new epoch, close the previous ones
if lastIndexedInputEpochIndex != inputEpochIndex {
closeEpochs(epochsByIndex, lastIndexedInputEpochIndex, inputEpochIndex, false)
currentInputs, ok := epochInputMap[currentEpoch]
if !ok {
currentInputs = []Input{}
}

// Check current epoch status
currentEpoch := epochsByIndex[inputEpochIndex]
if currentEpoch.Status != EpochStatusOpen {
slog.Error("Received input but epoch is not open",
"app", address,
"epoch", inputEpochIndex,
"status", currentEpoch.Status,
"input", input.Index,
"block", input.BlockNumber,
)
continue INDEX_APP_INPUTS_LOOP
}

// Index input into the current epoch
epochInputMap[currentEpoch] = append(epochInputMap[currentEpoch], *input)
lastIndexedInputEpochIndex = inputEpochIndex
epochInputMap[currentEpoch] = append(currentInputs, *input)

}

// Indexed all inputs. Close all the previous epochs
shouldCloseLastEpoch := (endBlock == epochsByIndex[lastEpochIndex].LastBlock)
closeEpochs(
epochsByIndex,
lastIndexedInputEpochIndex,
lastEpochIndex,
shouldCloseLastEpoch,
)
// Indexed all inputs. Check if it is time to close this epoch
if currentEpoch != nil && endBlock >= currentEpoch.LastBlock {
currentEpoch.Status = EpochStatusClosed
slog.Info("Closing epoch",
"app", currentEpoch.AppAddress,
"epoch-index", currentEpoch.Index,
"start", currentEpoch.FirstBlock,
"end", currentEpoch.LastBlock)
// Add to inputMap so it is stored
_, ok := epochInputMap[currentEpoch]
if !ok {
epochInputMap[currentEpoch] = []Input{}
}
}

// Store everything
_, _, err = r.repository.StoreEpochAndInputsTransaction(
ctx,
epochInputMap,
Expand All @@ -416,12 +425,19 @@ INDEX_APP_INPUTS_LOOP:
continue
}

slog.Info("Inputs and epochs stored successfully",
"app", address,
"start-block", startBlock,
"end-block", endBlock,
"total inputs", len(inputs),
)
// Store everything
if len(epochInputMap) > 0 {

slog.Debug("Inputs and epochs stored successfully",
"app", address,
"start-block", startBlock,
"end-block", endBlock,
"total epochs", len(epochInputMap),
"total inputs", len(inputs),
)
} else {
slog.Debug("No inputs or epochs to store")
}

}

Expand Down Expand Up @@ -463,43 +479,6 @@ func (r *EvmReader) addAppEpochLengthIntoCache(app Application) error {
return nil
}

// Read epochs from database or build new ones if needed
func (r *EvmReader) getOrBuildEpochs(
ctx context.Context,
firstEpochIndex, lastEpochIndex uint64,
appAddress common.Address,
epochLength uint64,
) (map[uint64]*Epoch, error) {

epochsByIndex := make(map[uint64]*Epoch)
for i := firstEpochIndex; i <= lastEpochIndex; i++ {

//Check if it exists in DB
epoch, err := r.repository.GetEpoch(ctx, i, appAddress)
if err != nil {
return nil, errors.Join(
fmt.Errorf("error retrieving epoch %d", i),
err,
)
}
// Create if needed
if epoch == nil {
firstBlock := i * epochLength
epoch = &Epoch{
Index: i,
FirstBlock: firstBlock,
LastBlock: firstBlock + epochLength - 1,
Status: EpochStatusOpen,
AppAddress: appAddress,
}
}

epochsByIndex[i] = epoch

}
return epochsByIndex, nil
}

// Retrieve ConsensusContract for a given Application
func (r *EvmReader) getIConsensus(app Application) (ConsensusContract, error) {
applicationContract, err := r.contractFactory.NewApplication(app.ContractAddress)
Expand Down Expand Up @@ -557,11 +536,6 @@ func (r *EvmReader) readInputsFromBlockchain(
startBlock, endBlock uint64,
) (map[Address][]*Input, error) {

slog.Info("Reading inputs",
"apps", appsAddresses,
"start", startBlock,
"end", endBlock)

// Initialize app input map
var appInputsMap = make(map[Address][]*Input)
for _, appsAddress := range appsAddresses {
Expand Down Expand Up @@ -605,30 +579,6 @@ func calculateEpochIndex(epochLength uint64, blockNumber uint64) uint64 {
return blockNumber / epochLength
}

// closeEpochs closes all epochs within the given index interval, except for the last.
// If `includeLastEpoch` is true, the last one is included.
func closeEpochs(
epochsByIndex map[uint64]*Epoch,
firstEpochIndex uint64,
lastEpochIndex uint64,
includeLastEpoch bool,
) {

lastIndex := lastEpochIndex
if includeLastEpoch {
lastIndex = lastIndex + 1
}
for i := firstEpochIndex; i < lastIndex; i++ {
epoch := epochsByIndex[i]
slog.Info("Closing epoch",
"app", epoch.AppAddress,
"epoch", i,
"start", epoch.FirstBlock,
"end", epoch.LastBlock)
epoch.Status = EpochStatusClosed
}
}

func appToAddresses(apps []Application) []Address {
var addresses []Address
for _, app := range apps {
Expand Down
Loading
Loading