diff --git a/internal/evmreader/application_adapter.go b/internal/evmreader/application_adapter.go new file mode 100644 index 000000000..4b8becf4a --- /dev/null +++ b/internal/evmreader/application_adapter.go @@ -0,0 +1,55 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + appcontract "github.com/cartesi/rollups-node/pkg/contracts/application" + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/ethclient" +) + +// IConsensus Wrapper +type ApplicationContractAdapter struct { + application *appcontract.Application +} + +func NewApplicationContractAdapter( + appAddress common.Address, + client *ethclient.Client, +) (*ApplicationContractAdapter, error) { + applicationContract, err := appcontract.NewApplication(appAddress, client) + if err != nil { + return nil, err + } + return &ApplicationContractAdapter{ + application: applicationContract, + }, nil +} + +func (a *ApplicationContractAdapter) GetConsensus(opts *bind.CallOpts) (common.Address, error) { + return a.application.GetConsensus(opts) +} + +func (a *ApplicationContractAdapter) RetrieveOutputExecutionEvents( + opts *bind.FilterOpts, +) ([]*appcontract.ApplicationOutputExecuted, error) { + + itr, err := a.application.FilterOutputExecuted(opts) + if err != nil { + return nil, err + } + defer itr.Close() + + var events []*appcontract.ApplicationOutputExecuted + for itr.Next() { + outputExecutedEvent := itr.Event + events = append(events, outputExecutedEvent) + } + err = itr.Error() + if err != nil { + return nil, err + } + return events, nil +} diff --git a/internal/evmreader/evmreader.go b/internal/evmreader/evmreader.go index 6f4b7ad2f..94e7cfad1 100644 --- a/internal/evmreader/evmreader.go +++ b/internal/evmreader/evmreader.go @@ -13,6 +13,7 @@ import ( "slices" . "github.com/cartesi/rollups-node/internal/node/model" + appcontract "github.com/cartesi/rollups-node/pkg/contracts/application" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/contracts/inputbox" "github.com/ethereum/go-ethereum" @@ -41,10 +42,14 @@ type EvmReaderRepository interface { GetNodeConfig(ctx context.Context) (*NodePersistentConfig, error) GetEpoch(ctx context.Context, indexKey uint64, appAddressKey Address) (*Epoch, error) GetPreviousSubmittedClaims(ctx context.Context, app Address, lastBlock uint64) ([]Epoch, error) - StoreClaimsTransaction(ctx context.Context, - app Address, - claims []*Epoch, - mostRecentBlockNumber uint64, + StoreClaimsTransaction( + ctx context.Context, app Address, claims []*Epoch, mostRecentBlockNumber uint64, + ) error + GetOutput( + ctx context.Context, indexKey uint64, appAddressKey Address, + ) (*Output, error) + UpdateOutputExecutionTransaction( + ctx context.Context, app Address, executedOutputs []*Output, blockNumber uint64, ) error } @@ -68,6 +73,9 @@ type ConsensusContract interface { type ApplicationContract interface { GetConsensus(opts *bind.CallOpts) (Address, error) + RetrieveOutputExecutionEvents( + opts *bind.FilterOpts, + ) ([]*appcontract.ApplicationOutputExecuted, error) } type ContractFactory interface { @@ -211,6 +219,8 @@ func (r *EvmReader) watchForNewBlocks(ctx context.Context, ready chan<- struct{} r.checkForClaimStatus(ctx, apps, mostRecentBlockNumber) + r.checkForOutputExecution(ctx, apps, mostRecentBlockNumber) + } } } diff --git a/internal/evmreader/evmreader_test.go b/internal/evmreader/evmreader_test.go index 216ee00dd..2e45b86d1 100644 --- a/internal/evmreader/evmreader_test.go +++ b/internal/evmreader/evmreader_test.go @@ -13,6 +13,7 @@ import ( "time" . "github.com/cartesi/rollups-node/internal/node/model" + appcontract "github.com/cartesi/rollups-node/pkg/contracts/application" "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" "github.com/cartesi/rollups-node/pkg/contracts/inputbox" "github.com/ethereum/go-ethereum" @@ -449,6 +450,28 @@ func newMockRepository() *MockRepository { mock.Anything, ).Return(nil) + repo.On("UpdateOutputExecutionTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything).Return(nil) + + outputHash := common.HexToHash("0xAABBCCDDEE") + repo.On("GetOutput", + mock.Anything, + 0, + common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E")).Return( + &Output{ + Id: 1, + Index: 0, + RawData: common.Hex2Bytes("0xdeadbeef"), + Hash: &outputHash, + InputId: 1, + OutputHashesSiblings: nil, + TransactionHash: nil, + }, + ) + return repo } @@ -520,6 +543,20 @@ func (m *MockRepository) StoreClaimsTransaction(ctx context.Context, return args.Error(0) } +func (m *MockRepository) GetOutput( + ctx context.Context, indexKey uint64, appAddressKey Address, +) (*Output, error) { + args := m.Called(ctx, indexKey, appAddressKey) + return args.Get(0).(*Output), args.Error(1) +} + +func (m *MockRepository) UpdateOutputExecutionTransaction( + ctx context.Context, app Address, executedOutputs []*Output, blockNumber uint64, +) error { + args := m.Called(ctx, app, executedOutputs, blockNumber) + return args.Error(0) +} + type MockApplicationContract struct { mock.Mock } @@ -539,6 +576,13 @@ func (m *MockApplicationContract) GetConsensus( return args.Get(0).(common.Address), args.Error(1) } +func (m *MockApplicationContract) RetrieveOutputExecutionEvents( + opts *bind.FilterOpts, +) ([]*appcontract.ApplicationOutputExecuted, error) { + args := m.Called(opts) + return args.Get(0).([]*appcontract.ApplicationOutputExecuted), args.Error(1) +} + type MockIConsensusContract struct { mock.Mock } @@ -591,6 +635,9 @@ func newEmvReaderContractFactory() *MockEvmReaderContractFactory { mock.Anything, ).Return(common.HexToAddress("0xdeadbeef"), nil) + applicationContract.On("RetrieveOutputExecutionEvents", + mock.Anything).Return([]*appcontract.ApplicationOutputExecuted{}, nil) + consensusContract := &MockIConsensusContract{} consensusContract.On("GetEpochLength", diff --git a/internal/evmreader/input_test.go b/internal/evmreader/input_test.go index a5776c99d..223733900 100644 --- a/internal/evmreader/input_test.go +++ b/internal/evmreader/input_test.go @@ -121,84 +121,6 @@ func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocks() { ) } -func (s *EvmReaderSuite) TestItReadsInputsFromNewBlocksWrongIConsensus() { - - wsClient := FakeWSEhtClient{} - - evmReader := NewEvmReader( - s.client, - &wsClient, - s.inputBox, - s.repository, - 0x10, - DefaultBlockStatusLatest, - s.contractFactory, - ) - - // Prepare repository - s.repository.Unset("GetAllRunningApplications") - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), - LastProcessedBlock: 0x00, - }}, nil).Once() - s.repository.On( - "GetAllRunningApplications", - mock.Anything, - ).Return([]Application{{ - ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), - IConsensusAddress: common.HexToAddress("0xFFFFFFFF"), - LastProcessedBlock: 0x11, - }}, nil).Once() - - // Prepare Client - s.client.Unset("HeaderByNumber") - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header0, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header1, nil).Once() - s.client.On( - "HeaderByNumber", - mock.Anything, - mock.Anything, - ).Return(&header2, nil).Once() - - // Start service - ready := make(chan struct{}, 1) - errChannel := make(chan error, 1) - - go func() { - errChannel <- evmReader.Run(s.ctx, ready) - }() - - select { - case <-ready: - break - case err := <-errChannel: - s.FailNow("unexpected error signal", err) - } - - wsClient.fireNewHead(&header0) - wsClient.fireNewHead(&header1) - time.Sleep(time.Second) - - s.inputBox.AssertNumberOfCalls(s.T(), "RetrieveInputs", 0) - s.repository.AssertNumberOfCalls( - s.T(), - "StoreEpochAndInputsTransaction", - 0, - ) -} - func (s *EvmReaderSuite) TestItUpdatesLastProcessedBlockWhenThereIsNoInputs() { wsClient := FakeWSEhtClient{} diff --git a/internal/evmreader/output.go b/internal/evmreader/output.go new file mode 100644 index 000000000..d763636d2 --- /dev/null +++ b/internal/evmreader/output.go @@ -0,0 +1,111 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "bytes" + "context" + "log/slog" + + . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/ethereum/go-ethereum/accounts/abi/bind" +) + +func (r *EvmReader) checkForOutputExecution( + ctx context.Context, + apps []application, + mostRecentBlockNumber uint64, +) { + + appAddresses := appToAddresses(apps) + + slog.Debug("Checking for new Output Executed Events", "apps", appAddresses) + + for _, app := range apps { + + LastOutputCheck := app.LastOutputCheckBlock + + // Safeguard: Only check blocks starting from the block where the InputBox + // contract was deployed as Inputs can be added to that same block + if LastOutputCheck < r.inputBoxDeploymentBlock { + LastOutputCheck = r.inputBoxDeploymentBlock + } + + if mostRecentBlockNumber > LastOutputCheck { + + slog.Info("Checking output execution for applications", + "apps", appAddresses, + "last output check block", LastOutputCheck, + "most recent block", mostRecentBlockNumber) + + r.readAndUpdateOutputs(ctx, app, LastOutputCheck, mostRecentBlockNumber) + + } else if mostRecentBlockNumber < LastOutputCheck { + slog.Warn( + "Not reading output execution: most recent block is lower than the last processed one", //nolint:lll + "apps", appAddresses, + "last output check block", LastOutputCheck, + "most recent block", mostRecentBlockNumber, + ) + } else { + slog.Info("Not reading output execution: already checked the most recent blocks", + "apps", appAddresses, + "last output check block", LastOutputCheck, + "most recent block", mostRecentBlockNumber, + ) + } + } + +} + +func (r *EvmReader) readAndUpdateOutputs( + ctx context.Context, app application, lastOutputCheck, mostRecentBlockNumber uint64) { + + contract := app.applicationContract + + opts := &bind.FilterOpts{ + Start: lastOutputCheck + 1, + End: &mostRecentBlockNumber, + } + + outputExecutedEvents, err := contract.RetrieveOutputExecutionEvents(opts) + if err != nil { + slog.Error("Error reading output events", "app", app.ContractAddress, "error", err) + return + } + + // Should we check the output hash?? + var executedOutputs []*Output + for _, event := range outputExecutedEvents { + + // Compare output to check it is the correct one + output, err := r.repository.GetOutput(ctx, event.OutputIndex, app.ContractAddress) + if err != nil { + slog.Error("Error retrieving output", + "app", app.ContractAddress, "index", event.OutputIndex, "error", err) + return + } + + if !bytes.Equal(output.RawData, event.Output) { + slog.Debug("Actual output differs from event's", + "app", app.ContractAddress, "index", event.OutputIndex, + "actual", output.RawData, "event's", event.Output) + + slog.Error("Output mismatch", "app", app.ContractAddress, "index", event.OutputIndex) + + return + } + + slog.Info("Output executed", "app", app, "index", event.OutputIndex) + output.TransactionHash = &event.Raw.TxHash + executedOutputs = append(executedOutputs, output) + } + + err = r.repository.UpdateOutputExecutionTransaction( + ctx, app.ContractAddress, executedOutputs, mostRecentBlockNumber) + if err != nil { + slog.Error("Error storing output execution statuses", "app", app, "error", err) + } + +} diff --git a/internal/evmreader/output_test.go b/internal/evmreader/output_test.go new file mode 100644 index 000000000..c7b431477 --- /dev/null +++ b/internal/evmreader/output_test.go @@ -0,0 +1,278 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package evmreader + +import ( + "time" + + . "github.com/cartesi/rollups-node/internal/node/model" + appcontract "github.com/cartesi/rollups-node/pkg/contracts/application" + "github.com/cartesi/rollups-node/pkg/contracts/iconsensus" + "github.com/cartesi/rollups-node/pkg/contracts/inputbox" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/mock" +) + +func (s *EvmReaderSuite) TestNoClaimsOutputExecution() { + + wsClient := FakeWSEhtClient{} + + //New EVM Reader + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x10, + DefaultBlockStatusLatest, + s.contractFactory, + ) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastOutputCheckBlock: 0x10, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E"), + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastOutputCheckBlock: 0x11, + }}, nil).Once() + + s.repository.Unset("UpdateOutputExecutionTransaction") + s.repository.On("UpdateOutputExecutionTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(2) + claims, ok := obj.([]*Output) + s.Require().True(ok) + s.Require().Equal(0, len(claims)) + + obj = arguments.Get(3) + lastOutputCheck, ok := obj.(uint64) + s.Require().True(ok) + s.Require().Equal(uint64(17), lastOutputCheck) + + }).Return(nil) + s.repository.On("UpdateOutputExecutionTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(2) + claims, ok := obj.([]*Output) + s.Require().True(ok) + s.Require().Equal(0, len(claims)) + + obj = arguments.Get(3) + lastOutputCheck, ok := obj.(uint64) + s.Require().True(ok) + s.Require().Equal(uint64(18), lastOutputCheck) + + }).Return(nil) + + //No Inputs + s.inputBox.Unset("RetrieveInputs") + s.inputBox.On("RetrieveInputs", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return([]inputbox.InputBoxInputAdded{}, nil) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header1, nil).Once() + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header2, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + wsClient.fireNewHead(&header1) + time.Sleep(1 * time.Second) + + s.repository.AssertNumberOfCalls( + s.T(), + "UpdateOutputExecutionTransaction", + 2, + ) + +} + +func (s *EvmReaderSuite) TestReadOutputExecution() { + + appAddress := common.HexToAddress("0x2E663fe9aE92275242406A185AA4fC8174339D3E") + + // Contract Factory + + applicationContract := &MockApplicationContract{} + + contractFactory := newEmvReaderContractFactory() + + contractFactory.Unset("NewApplication") + contractFactory.On("NewApplication", + mock.Anything, + ).Return(applicationContract, nil) + + //New EVM Reader + wsClient := FakeWSEhtClient{} + evmReader := NewEvmReader( + s.client, + &wsClient, + s.inputBox, + s.repository, + 0x00, + DefaultBlockStatusLatest, + contractFactory, + ) + + // Prepare Claims Acceptance Events + + outputExecution0 := &appcontract.ApplicationOutputExecuted{ + OutputIndex: 1, + Output: common.Hex2Bytes("0xAABBCCDDEE"), + Raw: types.Log{ + TxHash: common.HexToHash("0xdeadbeef"), + }, + } + + outputExecutionEvents := []*appcontract.ApplicationOutputExecuted{outputExecution0} + applicationContract.On("RetrieveOutputExecutionEvents", + mock.Anything, + ).Return(outputExecutionEvents, nil).Once() + applicationContract.On("RetrieveOutputExecutionEvents", + mock.Anything, + ).Return([]*iconsensus.IConsensusClaimAcceptance{}, nil) + + applicationContract.On("GetConsensus", + mock.Anything, + ).Return(common.HexToAddress("0xdeadbeef"), nil) + + // Prepare repository + s.repository.Unset("GetAllRunningApplications") + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: appAddress, + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastOutputCheckBlock: 0x10, + }}, nil).Once() + s.repository.On( + "GetAllRunningApplications", + mock.Anything, + ).Return([]Application{{ + ContractAddress: appAddress, + IConsensusAddress: common.HexToAddress("0xdeadbeef"), + LastOutputCheckBlock: 0x11, + }}, nil).Once() + + output := &Output{ + Index: 1, + RawData: common.Hex2Bytes("0xAABBCCDDEE"), + } + + s.repository.Unset("GetOutput") + s.repository.On("GetOutput", + mock.Anything, + mock.Anything, + mock.Anything).Return(output, nil) + + s.repository.Unset("UpdateOutputExecutionTransaction") + s.repository.On("UpdateOutputExecutionTransaction", + mock.Anything, + mock.Anything, + mock.Anything, + mock.Anything, + ).Once().Run(func(arguments mock.Arguments) { + obj := arguments.Get(2) + outputs, ok := obj.([]*Output) + s.Require().True(ok) + s.Require().Equal(1, len(outputs)) + output := outputs[0] + s.Require().NotNil(output) + s.Require().Equal(uint64(1), output.Index) + s.Require().Equal(common.HexToHash("0xdeadbeef"), *output.TransactionHash) + + }).Return(nil) + + //No Inputs + s.inputBox.Unset("RetrieveInputs") + s.inputBox.On("RetrieveInputs", + mock.Anything, + mock.Anything, + mock.Anything, + ).Return([]inputbox.InputBoxInputAdded{}, nil) + + // Prepare Client + s.client.Unset("HeaderByNumber") + s.client.On( + "HeaderByNumber", + mock.Anything, + mock.Anything, + ).Return(&header0, nil).Once() + + // Start service + ready := make(chan struct{}, 1) + errChannel := make(chan error, 1) + + go func() { + errChannel <- evmReader.Run(s.ctx, ready) + }() + + select { + case <-ready: + break + case err := <-errChannel: + s.FailNow("unexpected error signal", err) + } + + wsClient.fireNewHead(&header0) + time.Sleep(1 * time.Second) + + s.repository.AssertNumberOfCalls( + s.T(), + "UpdateOutputExecutionTransaction", + 1, + ) + +} diff --git a/internal/evmreader/retrypolicy/contratfactory.go b/internal/evmreader/retrypolicy/contratfactory.go index 69384886e..3b399126e 100644 --- a/internal/evmreader/retrypolicy/contratfactory.go +++ b/internal/evmreader/retrypolicy/contratfactory.go @@ -6,7 +6,6 @@ import ( "time" "github.com/cartesi/rollups-node/internal/evmreader" - "github.com/cartesi/rollups-node/pkg/contracts/application" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" ) @@ -40,7 +39,7 @@ func (f *EvmReaderContractFactory) NewApplication( // Building a contract does not fail due to network errors. // No need to retry this operation - applicationContract, err := application.NewApplication(address, f.ethClient) + applicationContract, err := evmreader.NewApplicationContractAdapter(address, f.ethClient) if err != nil { return nil, err } diff --git a/internal/evmreader/retrypolicy/retrypolicy_application_delegator.go b/internal/evmreader/retrypolicy/retrypolicy_application_delegator.go index 54b0c318a..cdb60ce92 100644 --- a/internal/evmreader/retrypolicy/retrypolicy_application_delegator.go +++ b/internal/evmreader/retrypolicy/retrypolicy_application_delegator.go @@ -8,6 +8,7 @@ import ( "github.com/cartesi/rollups-node/internal/evmreader" "github.com/cartesi/rollups-node/internal/retry" + "github.com/cartesi/rollups-node/pkg/contracts/application" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" ) @@ -36,6 +37,17 @@ func (d *ApplicationRetryPolicyDelegator) GetConsensus(opts *bind.CallOpts, opts, d.maxRetries, d.delayBetweenCalls, - "Consensus::GetEpochLength", + "Application::GetConsensus", + ) +} + +func (d *ApplicationRetryPolicyDelegator) RetrieveOutputExecutionEvents( + opts *bind.FilterOpts, +) ([]*application.ApplicationOutputExecuted, error) { + return retry.CallFunctionWithRetryPolicy(d.delegate.RetrieveOutputExecutionEvents, + opts, + d.maxRetries, + d.delayBetweenCalls, + "Application::RetrieveOutputExecutionEvents", ) } diff --git a/internal/node/model/models.go b/internal/node/model/models.go index f711ddf03..527376c65 100644 --- a/internal/node/model/models.go +++ b/internal/node/model/models.go @@ -62,13 +62,14 @@ type NodePersistentConfig struct { } type Application struct { - Id uint64 - ContractAddress Address - TemplateHash Hash - LastProcessedBlock uint64 - Status ApplicationStatus - IConsensusAddress Address - LastClaimCheckBlock uint64 + Id uint64 + ContractAddress Address + TemplateHash Hash + LastProcessedBlock uint64 + Status ApplicationStatus + IConsensusAddress Address + LastClaimCheckBlock uint64 + LastOutputCheckBlock uint64 } type Epoch struct { @@ -101,6 +102,7 @@ type Output struct { Hash *Hash OutputHashesSiblings []Hash InputId uint64 + TransactionHash *Hash } type Report struct { diff --git a/internal/repository/base.go b/internal/repository/base.go index 822909dae..b967b7066 100644 --- a/internal/repository/base.go +++ b/internal/repository/base.go @@ -91,6 +91,7 @@ func (pg *Database) InsertApplication( template_hash, last_processed_block, last_claim_check_block, + last_output_check_block, status, iconsensus_address) VALUES @@ -98,16 +99,18 @@ func (pg *Database) InsertApplication( @templateHash, @lastProcessedBlock, @lastClaimCheckBlock, + @lastOutputCheckBlock, @status, @iConsensusAddress)` args := pgx.NamedArgs{ - "contractAddress": app.ContractAddress, - "templateHash": app.TemplateHash, - "lastProcessedBlock": app.LastProcessedBlock, - "lastClaimCheckBlock": app.LastClaimCheckBlock, - "status": app.Status, - "iConsensusAddress": app.IConsensusAddress, + "contractAddress": app.ContractAddress, + "templateHash": app.TemplateHash, + "lastProcessedBlock": app.LastProcessedBlock, + "lastClaimCheckBlock": app.LastClaimCheckBlock, + "lastOutputCheckBlock": app.LastOutputCheckBlock, + "status": app.Status, + "iConsensusAddress": app.IConsensusAddress, } _, err := pg.db.Exec(ctx, query, args) @@ -219,12 +222,14 @@ func (pg *Database) InsertOutput( (index, raw_data, output_hashes_siblings, - input_id) + input_id, + transaction_hash) VALUES (@index, @rawData, @outputHashesSiblings, - @inputId) + @inputId, + @transactionHash) RETURNING id ` @@ -234,6 +239,7 @@ func (pg *Database) InsertOutput( "index": output.Index, "rawData": output.RawData, "outputHashesSiblings": output.OutputHashesSiblings, + "transactionHash": output.TransactionHash, } var id uint64 @@ -346,13 +352,14 @@ func (pg *Database) GetApplication( appAddressKey Address, ) (*Application, error) { var ( - id uint64 - contractAddress Address - templateHash Hash - lastProcessedBlock uint64 - lastClaimCheckBlock uint64 - status ApplicationStatus - iconsensusAddress Address + id uint64 + contractAddress Address + templateHash Hash + lastProcessedBlock uint64 + lastClaimCheckBlock uint64 + lastOutputCheckBlock uint64 + status ApplicationStatus + iconsensusAddress Address ) query := ` @@ -362,6 +369,7 @@ func (pg *Database) GetApplication( template_hash, last_processed_block, last_claim_check_block, + last_output_check_block, status, iconsensus_address FROM @@ -379,6 +387,7 @@ func (pg *Database) GetApplication( &templateHash, &lastProcessedBlock, &lastClaimCheckBlock, + &lastOutputCheckBlock, &status, &iconsensusAddress, ) @@ -393,13 +402,14 @@ func (pg *Database) GetApplication( } app := Application{ - Id: id, - ContractAddress: contractAddress, - TemplateHash: templateHash, - LastProcessedBlock: lastProcessedBlock, - LastClaimCheckBlock: lastClaimCheckBlock, - Status: status, - IConsensusAddress: iconsensusAddress, + Id: id, + ContractAddress: contractAddress, + TemplateHash: templateHash, + LastProcessedBlock: lastProcessedBlock, + LastClaimCheckBlock: lastClaimCheckBlock, + LastOutputCheckBlock: lastOutputCheckBlock, + Status: status, + IConsensusAddress: iconsensusAddress, } return &app, nil @@ -564,6 +574,7 @@ func (pg *Database) GetOutput( hash *Hash outputHashesSiblings []Hash inputId uint64 + transactionHash *Hash ) query := ` @@ -573,7 +584,8 @@ func (pg *Database) GetOutput( o.raw_data, o.hash, o.output_hashes_siblings, - o.input_id + o.input_id, + o.transaction_hash FROM output o INNER JOIN @@ -595,6 +607,7 @@ func (pg *Database) GetOutput( &hash, &outputHashesSiblings, &inputId, + &transactionHash, ) if err != nil { if errors.Is(err, pgx.ErrNoRows) { @@ -614,6 +627,7 @@ func (pg *Database) GetOutput( Hash: hash, OutputHashesSiblings: outputHashesSiblings, InputId: inputId, + TransactionHash: transactionHash, } return &output, nil diff --git a/internal/repository/evmreader.go b/internal/repository/evmreader.go index d58914de5..8de6505bd 100644 --- a/internal/repository/evmreader.go +++ b/internal/repository/evmreader.go @@ -160,14 +160,15 @@ func (pg *Database) getAllApplicationsByStatus( criteria *ApplicationStatus, ) ([]Application, error) { var ( - id uint64 - contractAddress Address - templateHash Hash - lastProcessedBlock uint64 - lastClaimCheckBlock uint64 - status ApplicationStatus - iConsensusAddress Address - results []Application + id uint64 + contractAddress Address + templateHash Hash + lastProcessedBlock uint64 + lastClaimCheckBlock uint64 + lastOutputCheckBlock uint64 + status ApplicationStatus + iConsensusAddress Address + results []Application ) query := ` @@ -177,6 +178,7 @@ func (pg *Database) getAllApplicationsByStatus( template_hash, last_processed_block, last_claim_check_block, + last_output_check_block, status, iconsensus_address FROM @@ -196,16 +198,18 @@ func (pg *Database) getAllApplicationsByStatus( _, err = pgx.ForEachRow(rows, []any{&id, &contractAddress, &templateHash, - &lastProcessedBlock, &lastClaimCheckBlock, &status, &iConsensusAddress}, + &lastProcessedBlock, &lastClaimCheckBlock, &lastOutputCheckBlock, + &status, &iConsensusAddress}, func() error { app := Application{ - Id: id, - ContractAddress: contractAddress, - TemplateHash: templateHash, - LastProcessedBlock: lastProcessedBlock, - LastClaimCheckBlock: lastClaimCheckBlock, - Status: status, - IConsensusAddress: iConsensusAddress, + Id: id, + ContractAddress: contractAddress, + TemplateHash: templateHash, + LastProcessedBlock: lastProcessedBlock, + LastClaimCheckBlock: lastClaimCheckBlock, + LastOutputCheckBlock: lastOutputCheckBlock, + Status: status, + IConsensusAddress: iConsensusAddress, } results = append(results, app) return nil @@ -371,3 +375,68 @@ func (pg *Database) StoreClaimsTransaction( return nil } + +func (pg *Database) UpdateOutputExecutionTransaction( + ctx context.Context, + app Address, + executedOutputs []*Output, + blockNumber uint64, +) error { + + var errUpdateOutputs = errors.New("unable to update outputs") + + tx, err := pg.db.Begin(ctx) + if err != nil { + return errors.Join(errUpdateOutputs, err) + } + + updateOutputQuery := ` + UPDATE output + SET + transaction_hash = @hash + WHERE + id = @id + ` + + for _, output := range executedOutputs { + updateOutputArgs := pgx.NamedArgs{ + "hash": output.TransactionHash, + "id": output.Id, + } + + tag, err := tx.Exec(ctx, updateOutputQuery, updateOutputArgs) + if err != nil { + return errors.Join(errUpdateOutputs, err, tx.Rollback(ctx)) + } + if tag.RowsAffected() != 1 { + return errors.Join(errUpdateOutputs, + fmt.Errorf("no rows affected when updating output %d from app %s", output.Index, app), + tx.Rollback(ctx)) + } + } + + // Update last processed block + updateLastBlockQuery := ` + UPDATE application + SET last_output_check_block = @blockNumber + WHERE + contract_address=@contractAddress` + + updateLastBlockArgs := pgx.NamedArgs{ + "blockNumber": blockNumber, + "contractAddress": app, + } + + _, err = tx.Exec(ctx, updateLastBlockQuery, updateLastBlockArgs) + if err != nil { + return errors.Join(errUpdateOutputs, err, tx.Rollback(ctx)) + } + + // Commit transaction + err = tx.Commit(ctx) + if err != nil { + return errors.Join(errUpdateOutputs, err, tx.Rollback(ctx)) + } + + return nil +} diff --git a/internal/repository/evmreader_test.go b/internal/repository/evmreader_test.go index e363bfdb6..803c3dc18 100644 --- a/internal/repository/evmreader_test.go +++ b/internal/repository/evmreader_test.go @@ -198,3 +198,28 @@ func (s *RepositorySuite) TestStoreClaimsTransaction() { s.Require().Equal(uint64(499), application.LastClaimCheckBlock) } + +func (s *RepositorySuite) TestUpdateOutputExecutionTransaction() { + output, err := s.database.GetOutput(s.ctx, 1, common.HexToAddress("deadbeef")) + s.Require().Nil(err) + s.Require().NotNil(output) + + var executedOutputs []*Output + hash := common.HexToHash("0xAABBCCDD") + output.TransactionHash = &hash + + executedOutputs = append(executedOutputs, output) + + err = s.database.UpdateOutputExecutionTransaction( + s.ctx, common.HexToAddress("deadbeef"), executedOutputs, 854758) + s.Require().Nil(err) + + actualOutput, err := s.database.GetOutput(s.ctx, 1, common.HexToAddress("deadbeef")) + s.Require().Nil(err) + s.Require().Equal(output, actualOutput) + + application, err := s.database.GetApplication(s.ctx, common.HexToAddress("deadbeef")) + s.Require().Nil(err) + s.Require().Equal(uint64(854758), application.LastOutputCheckBlock) + +} diff --git a/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql index 0cf967811..e6abed186 100644 --- a/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql +++ b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql @@ -38,6 +38,7 @@ CREATE TABLE "application" "status" "ApplicationStatus" NOT NULL, "iconsensus_address" BYTEA NOT NULL, "last_claim_check_block" NUMERIC(20,0) NOT NULL CHECK ("last_claim_check_block" >= 0 AND "last_claim_check_block" <= f_maxuint64()), + "last_output_check_block" NUMERIC(20,0) NOT NULL CHECK ("last_output_check_block" >= 0 AND "last_output_check_block" <= f_maxuint64()), CONSTRAINT "application_pkey" PRIMARY KEY ("id"), UNIQUE("contract_address") ); @@ -87,6 +88,7 @@ CREATE TABLE "output" "hash" BYTEA, "output_hashes_siblings" BYTEA[], "input_id" BIGINT NOT NULL, + "transaction_hash" BYTEA, CONSTRAINT "output_pkey" PRIMARY KEY ("id"), CONSTRAINT "output_input_id_fkey" FOREIGN KEY ("input_id") REFERENCES "input"("id") ); diff --git a/internal/repository/schema/migrations/000002_create_postgraphile_view.up.sql b/internal/repository/schema/migrations/000002_create_postgraphile_view.up.sql index 5cd3bcdf6..0ea36662d 100644 --- a/internal/repository/schema/migrations/000002_create_postgraphile_view.up.sql +++ b/internal/repository/schema/migrations/000002_create_postgraphile_view.up.sql @@ -45,6 +45,7 @@ CREATE OR REPLACE VIEW graphql."outputs" AS o."index", o."raw_data", o."output_hashes_siblings", + o."transaction_hash", i."index" as "input_index" FROM "output" o