diff --git a/cmd/cartesi-rollups-advancer/main.go b/cmd/cartesi-rollups-advancer/main.go new file mode 100644 index 000000000..6eccd072f --- /dev/null +++ b/cmd/cartesi-rollups-advancer/main.go @@ -0,0 +1,105 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package main + +import ( + "context" + "fmt" + "log/slog" + "os" + "os/signal" + "syscall" + "time" + + "github.com/cartesi/rollups-node/internal/node/advancer" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" + "github.com/cartesi/rollups-node/internal/node/config" + "github.com/cartesi/rollups-node/internal/node/startup" + "github.com/cartesi/rollups-node/internal/repository" + "github.com/spf13/cobra" +) + +const CMD_NAME = "advancer" + +var ( + buildVersion = "devel" + Cmd = &cobra.Command{ + Use: CMD_NAME, + Short: "Runs the Advancer", + Long: "Runs the Advancer in standalone mode", + Run: run, + } +) + +func init() { + flags := Cmd.Flags() + flags.BytesHex("application-address", nil, "") + flags.String("server-address", "", "") + flags.String("snapshot", "", "") + flags.Int64("snapshot-input-index", -1, "") + flags.Uint64("machine-inc-cycles", 50_000_000, "") + flags.Uint64("machine-max-cycles", 5_000_000_000, "") + flags.Uint64("machine-advance-timeout", 60, "") + flags.Uint64("machine-inspect-timeout", 10, "") +} + +func main() { + err := Cmd.Execute() + if err != nil { + os.Exit(1) + } +} + +func getDatabase(ctx context.Context, c config.NodeConfig) (*repository.Database, error) { + err := startup.ValidateSchema(c) + if err != nil { + return nil, fmt.Errorf("invalid database schema: %w", err) + } + + database, err := repository.Connect(ctx, c.PostgresEndpoint.Value) + if err != nil { + return nil, fmt.Errorf("failed to connect to the database: %w", err) + } + + return database, nil +} + +func run(cmd *cobra.Command, args []string) { + startTime := time.Now() + + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + c := config.FromEnv() + startup.ConfigLogs(c) + + slog.Info("Starting the Cartesi Rollups Node Advancer", "version", buildVersion, "config", c) + + database, err := getDatabase(ctx, c) + if err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + defer database.Close() + + repo := &repository.AdvancerRepository{Database: database} + + machines, err := machines.Load(ctx, c, repo) + if err != nil { + slog.Error(err.Error()) + os.Exit(1) + } + defer machines.Close() + + advancer, err := advancer.New(machines, repo) + + poller, err := advancer.Poller(5 * time.Second) + + ready := make(chan struct{}, 1) + + if err := poller.Start(ctx, ready); err != nil { + slog.Error("advancer exited with an error", "error", err) + os.Exit(1) + } +} diff --git a/cmd/cartesi-rollups-cli/root/app/add/add.go b/cmd/cartesi-rollups-cli/root/app/add/add.go index 0545c88b9..1d622e3de 100644 --- a/cmd/cartesi-rollups-cli/root/app/add/add.go +++ b/cmd/cartesi-rollups-cli/root/app/add/add.go @@ -118,7 +118,7 @@ func run(cmd *cobra.Command, args []string) { IConsensusAddress: common.HexToAddress(iConsensusAddress), } - err := cmdcommom.Database.InsertApplication(ctx, &application) + _, err := cmdcommom.Database.InsertApplication(ctx, &application) cobra.CheckErr(err) fmt.Printf("Application %v successfully added\n", application.ContractAddress) } diff --git a/internal/node/advancer/advancer.go b/internal/node/advancer/advancer.go index 32ef675ab..9b9c20d80 100644 --- a/internal/node/advancer/advancer.go +++ b/internal/node/advancer/advancer.go @@ -10,6 +10,7 @@ import ( "log/slog" "time" + "github.com/cartesi/rollups-node/internal/node/advancer/machines" "github.com/cartesi/rollups-node/internal/node/advancer/poller" . "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/nodemachine" @@ -24,19 +25,19 @@ var ( ) type Advancer struct { - machines Machines - repository Repository + machines Machines + repo Repository } // New instantiates a new Advancer. -func New(machines Machines, repository Repository) (*Advancer, error) { +func New(machines Machines, repo Repository) (*Advancer, error) { if machines == nil { return nil, ErrInvalidMachines } - if repository == nil { + if repo == nil { return nil, ErrInvalidRepository } - return &Advancer{machines: machines, repository: repository}, nil + return &Advancer{machines: machines, repo: repo}, nil } // Poller instantiates a new poller.Poller using the Advancer. @@ -49,11 +50,11 @@ func (advancer *Advancer) Poller(pollingInterval time.Duration) (*poller.Poller, // runs them through the cartesi machine, // and updates the repository with the ouputs. func (advancer *Advancer) Step(ctx context.Context) error { - apps := keysFrom(advancer.machines) + apps := advancer.machines.Keys() // Gets the unprocessed inputs (of all apps) from the repository. slog.Info("advancer: getting unprocessed inputs") - inputs, err := advancer.repository.GetUnprocessedInputs(ctx, apps) + inputs, err := advancer.repo.GetUnprocessedInputs(ctx, apps) if err != nil { return err } @@ -67,14 +68,22 @@ func (advancer *Advancer) Step(ctx context.Context) error { } } + // Updates the status of the epochs. + for _, app := range apps { + err := advancer.repo.UpdateEpochs(ctx, app) + if err != nil { + return err + } + } + return nil } // process sequentially processes inputs from the the application. func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*Input) error { // Asserts that the app has an associated machine. - machine, ok := advancer.machines[app] - if !ok { + machine := advancer.machines.GetAdvanceMachine(app) + if machine == nil { panic(fmt.Errorf("%w %s", ErrNoApp, app.String())) } @@ -93,17 +102,13 @@ func (advancer *Advancer) process(ctx context.Context, app Address, inputs []*In } // Stores the result in the database. - err = advancer.repository.StoreAdvanceResult(ctx, input, res) + err = advancer.repo.StoreAdvanceResult(ctx, input, res) if err != nil { return err } } - // Updates the status of the epochs based on the last processed input. - lastInput := inputs[len(inputs)-1] - err := advancer.repository.UpdateEpochs(ctx, app, lastInput) - - return err + return nil } // ------------------------------------------------------------------------------------------------ @@ -114,25 +119,10 @@ type Repository interface { StoreAdvanceResult(context.Context, *Input, *nodemachine.AdvanceResult) error - UpdateEpochs(_ context.Context, app Address, lastInput *Input) error + UpdateEpochs(_ context.Context, app Address) error } -// A map of application addresses to machines. -type Machines = map[Address]Machine - -type Machine interface { - Advance(_ context.Context, input []byte, index uint64) (*nodemachine.AdvanceResult, error) -} - -// ------------------------------------------------------------------------------------------------ - -// keysFrom returns a slice with the keysFrom of a map. -func keysFrom[K comparable, V any](m map[K]V) []K { - keys := make([]K, len(m)) - i := 0 - for k := range m { - keys[i] = k - i++ - } - return keys +type Machines interface { + GetAdvanceMachine(Address) machines.AdvanceMachine + Keys() []Address } diff --git a/internal/node/advancer/advancer_test.go b/internal/node/advancer/advancer_test.go index 07b6bb6c0..bb97a8d6d 100644 --- a/internal/node/advancer/advancer_test.go +++ b/internal/node/advancer/advancer_test.go @@ -73,7 +73,7 @@ func (s *AdvancerSuite) TestRun() { res3 := randomAdvanceResult() repository := &MockRepository{ - GetInputsReturn: map[Address][]*Input{ + GetUnprocessedInputsReturn: map[Address][]*Input{ app1: { {Id: 1, RawData: marshal(res1)}, {Id: 2, RawData: marshal(res2)}, @@ -94,7 +94,9 @@ func (s *AdvancerSuite) TestRun() { require.Len(repository.StoredResults, 3) }) - // NOTE: missing more test cases + s.Run("Error/UpdateEpochs", func() { + s.T().Skip("TODO") + }) } func (s *AdvancerSuite) TestProcess() { @@ -124,7 +126,6 @@ func (s *AdvancerSuite) TestProcess() { err := advancer.process(context.Background(), app, inputs) require.Nil(err) require.Len(repository.StoredResults, 7) - require.Equal(*inputs[6], repository.LastInput) }) s.Run("Panic", func() { @@ -183,25 +184,7 @@ func (s *AdvancerSuite) TestProcess() { require.Errorf(err, "store-advance error") require.Len(repository.StoredResults, 1) }) - - s.Run("UpdateEpochs", func() { - require := s.Require() - - _, repository, advancer, app := setup() - inputs := []*Input{ - {Id: 1, RawData: marshal(randomAdvanceResult())}, - {Id: 2, RawData: marshal(randomAdvanceResult())}, - {Id: 3, RawData: marshal(randomAdvanceResult())}, - {Id: 4, RawData: marshal(randomAdvanceResult())}, - } - repository.UpdateEpochsError = errors.New("update-epochs error") - - err := advancer.process(context.Background(), app, inputs) - require.Errorf(err, "update-epochs error") - require.Len(repository.StoredResults, 4) - }) }) - } func (s *AdvancerSuite) TestKeysFrom() { @@ -228,20 +211,19 @@ func (mock *MockMachine) Advance( // ------------------------------------------------------------------------------------------------ type MockRepository struct { - GetInputsReturn map[Address][]*Input - GetInputsError error - StoreAdvanceError error - UpdateEpochsError error + GetUnprocessedInputsReturn map[Address][]*Input + GetUnprocessedInputsError error + StoreAdvanceError error + UpdateEpochsError error StoredResults []*nodemachine.AdvanceResult - LastInput Input } func (mock *MockRepository) GetUnprocessedInputs( _ context.Context, appAddresses []Address, ) (map[Address][]*Input, error) { - return mock.GetInputsReturn, mock.GetInputsError + return mock.GetUnprocessedInputsReturn, mock.GetUnprocessedInputsError } func (mock *MockRepository) StoreAdvanceResult( @@ -253,12 +235,7 @@ func (mock *MockRepository) StoreAdvanceResult( return mock.StoreAdvanceError } -func (mock *MockRepository) UpdateEpochs( - _ context.Context, - _ Address, - lastInput *Input, -) error { - mock.LastInput = *lastInput +func (mock *MockRepository) UpdateEpochs(_ context.Context, _ Address) error { return mock.UpdateEpochsError } diff --git a/internal/repository/advancer.go b/internal/repository/advancer.go new file mode 100644 index 000000000..ab5a7e826 --- /dev/null +++ b/internal/repository/advancer.go @@ -0,0 +1,260 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package repository + +import ( + "context" + "errors" + "fmt" + "strings" + + . "github.com/cartesi/rollups-node/internal/node/model" + "github.com/cartesi/rollups-node/internal/nodemachine" + "github.com/jackc/pgx/v5" +) + +var ErrAdvancerRepository = errors.New("advancer repository error") + +type AdvancerRepository struct{ *Database } + +type AppData struct { + AppAddress Address + InputIndex *uint64 + SnapshotPath string +} + +func (repo *AdvancerRepository) GetAppData(ctx context.Context) ([]*AppData, error) { + query := ` + SELECT DISTINCT ON (address) + a.contract_address AS address, + i.index, + COALESCE(s.uri, a.template_uri) + FROM application AS a + LEFT JOIN snapshot AS s ON (a.contract_address = s.application_address) + LEFT JOIN input AS i ON (s.input_id = i.id) + WHERE a.status = 'RUNNING' + ORDER BY address, i.index DESC + ` + rows, err := repo.db.Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("%w (failed querying applications): %w", ErrAdvancerRepository, err) + } + + res := []*AppData{} + var row AppData + + scans := []any{&row.AppAddress, &row.InputIndex, &row.SnapshotPath} + _, err = pgx.ForEachRow(rows, scans, func() error { + row := row + res = append(res, &row) + return nil + }) + if err != nil { + return nil, fmt.Errorf("%w (failed reading rows): %w", ErrAdvancerRepository, err) + } + + return res, nil +} + +func (repo *AdvancerRepository) GetUnprocessedInputs( + ctx context.Context, + apps []Address, +) (map[Address][]*Input, error) { + result := map[Address][]*Input{} + if len(apps) == 0 { + return result, nil + } + + query := fmt.Sprintf(` + SELECT id, application_address, raw_data + FROM input + WHERE status = 'NONE' + AND application_address IN %s + ORDER BY index ASC, application_address + `, toSqlIn(apps)) // NOTE: not sanitized + rows, err := repo.db.Query(ctx, query) + if err != nil { + return nil, fmt.Errorf("%w (failed querying inputs): %w", ErrAdvancerRepository, err) + } + + var input Input + scans := []any{&input.Id, &input.AppAddress, &input.RawData} + _, err = pgx.ForEachRow(rows, scans, func() error { + input := input + if _, ok := result[input.AppAddress]; ok { //nolint:gosimple + result[input.AppAddress] = append(result[input.AppAddress], &input) + } else { + result[input.AppAddress] = []*Input{&input} + } + return nil + }) + if err != nil { + return nil, fmt.Errorf("%w (failed reading input rows): %w", ErrAdvancerRepository, err) + } + + return result, nil +} + +func (repo *AdvancerRepository) StoreAdvanceResult( + ctx context.Context, + input *Input, + res *nodemachine.AdvanceResult, +) error { + tx, err := repo.db.Begin(ctx) + if err != nil { + return errors.Join(ErrBeginTx, err) + } + + // Inserts the outputs. + nextOutputIndex, err := repo.getNextIndex(ctx, tx, "output", input.AppAddress) + if err != nil { + return err + } + err = repo.insert(ctx, tx, "output", res.Outputs, input.Id, nextOutputIndex) + if err != nil { + return err + } + + // Inserts the reports. + nextReportIndex, err := repo.getNextIndex(ctx, tx, "report", input.AppAddress) + if err != nil { + return err + } + err = repo.insert(ctx, tx, "report", res.Reports, input.Id, nextReportIndex) + if err != nil { + return err + } + + // Updates the input's status. + err = repo.updateInput(ctx, tx, input.Id, res.Status, res.OutputsHash, res.MachineHash) + if err != nil { + return err + } + + err = tx.Commit(ctx) + if err != nil { + return errors.Join(ErrCommitTx, err, tx.Rollback(ctx)) + } + + return nil +} + +func (repo *AdvancerRepository) UpdateEpochs(ctx context.Context, app Address) error { + query := ` + UPDATE epoch + SET status = 'PROCESSED_ALL_INPUTS' + WHERE id IN (( + SELECT DISTINCT epoch.id + FROM epoch INNER JOIN input ON (epoch.id = input.epoch_id) + WHERE epoch.application_address = @applicationAddress + AND epoch.status = 'CLOSED' + AND input.status != 'NONE' + ) EXCEPT ( + SELECT DISTINCT epoch.id + FROM epoch INNER JOIN input ON (epoch.id = input.epoch_id) + WHERE epoch.application_address = @applicationAddress + AND epoch.status = 'CLOSED' + AND input.status = 'NONE')) + ` + args := pgx.NamedArgs{"applicationAddress": app} + _, err := repo.db.Exec(ctx, query, args) + if err != nil { + return errors.Join(ErrUpdateRow, err) + } + return nil +} + +// ------------------------------------------------------------------------------------------------ + +func (_ *AdvancerRepository) getNextIndex( + ctx context.Context, + tx pgx.Tx, + tableName string, + appAddress Address, +) (uint64, error) { + var nextIndex uint64 + query := fmt.Sprintf(` + SELECT COALESCE(MAX(%s.index) + 1, 0) + FROM input INNER JOIN %s ON input.id = %s.input_id + WHERE input.status = 'ACCEPTED' + AND input.application_address = $1 + `, tableName, tableName, tableName) + err := tx.QueryRow(ctx, query, appAddress).Scan(&nextIndex) + if err != nil { + err = fmt.Errorf("failed to get the next %s index: %w", tableName, err) + return 0, errors.Join(err, tx.Rollback(ctx)) + } + return nextIndex, nil +} + +func (_ *AdvancerRepository) insert( + ctx context.Context, + tx pgx.Tx, + tableName string, + dataArray [][]byte, + inputId uint64, + nextIndex uint64, +) error { + lenOutputs := int64(len(dataArray)) + if lenOutputs < 1 { + return nil + } + + rows := [][]any{} + for i, data := range dataArray { + rows = append(rows, []any{inputId, nextIndex + uint64(i), data}) + } + + count, err := tx.CopyFrom( + ctx, + pgx.Identifier{tableName}, + []string{"input_id", "index", "raw_data"}, + pgx.CopyFromRows(rows), + ) + if err != nil { + return errors.Join(ErrCopyFrom, err, tx.Rollback(ctx)) + } + if lenOutputs != count { + err := fmt.Errorf("not all %ss were inserted (%d != %d)", tableName, lenOutputs, count) + return errors.Join(err, tx.Rollback(ctx)) + } + + return nil +} + +func (_ *AdvancerRepository) updateInput( + ctx context.Context, + tx pgx.Tx, + inputId uint64, + status InputCompletionStatus, + outputsHash Hash, + machineHash *Hash, +) error { + query := ` + UPDATE input + SET (status, outputs_hash, machine_hash) = (@status, @outputsHash, @machineHash) + WHERE id = @id + ` + args := pgx.NamedArgs{ + "status": status, + "outputsHash": outputsHash, + "machineHash": machineHash, + "id": inputId, + } + _, err := tx.Exec(ctx, query, args) + if err != nil { + return errors.Join(ErrUpdateRow, err, tx.Rollback(ctx)) + } + return nil +} + +// ------------------------------------------------------------------------------------------------ + +func toSqlIn[T fmt.Stringer](a []T) string { + s := []string{} + for _, x := range a { + s = append(s, fmt.Sprintf("'\\x%s'", x.String()[2:])) + } + return fmt.Sprintf("(%s)", strings.Join(s, ", ")) +} diff --git a/internal/repository/advancer_test.go b/internal/repository/advancer_test.go new file mode 100644 index 000000000..88d6544c4 --- /dev/null +++ b/internal/repository/advancer_test.go @@ -0,0 +1,311 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package repository + +import ( + "context" + "testing" + + . "github.com/cartesi/rollups-node/internal/node/model" + + "github.com/cartesi/rollups-node/pkg/rollupsmachine" + "github.com/cartesi/rollups-node/test/tooling/db" + "github.com/ethereum/go-ethereum/common" + + "github.com/stretchr/testify/require" +) + +func TestAdvancerRepository(t *testing.T) { + ctx := context.Background() + + t.Run("GetAppData", func(t *testing.T) { + require := require.New(t) + + endpoint, err := db.Setup(ctx) + require.Nil(err) + + database, err := Connect(ctx, endpoint) + require.Nil(err) + require.NotNil(database) + + apps, _, _, _, err := populate2(database) + require.Nil(err) + repository := &AdvancerRepository{Database: database} + + res, err := repository.GetAppData(ctx) + require.Nil(err) + require.Len(res, 2) + + appData1, appData2 := res[0], res[1] + + require.Equal(apps[1].ContractAddress, appData2.AppAddress) + require.Equal(uint64(6), *appData2.InputIndex) + require.Equal("path/to/snapshot/2", appData2.SnapshotPath) + + require.Equal(apps[2].ContractAddress, appData1.AppAddress) + require.Nil(appData1.InputIndex) + require.Equal("path/to/template/uri/2", appData1.SnapshotPath) + }) + + t.Run("GetUnprocessedInputs", func(t *testing.T) { + t.Skip("TODO") + }) + + t.Run("StoreAdvanceResult", func(t *testing.T) { + t.Skip("TODO") + }) + + t.Run("UpdateEpochs", func(t *testing.T) { + require := require.New(t) + + endpoint, err := db.Setup(ctx) + require.Nil(err) + + database, err := Connect(ctx, endpoint) + require.Nil(err) + require.NotNil(database) + + app, _, _, err := populate1(database) + require.Nil(err) + repository := &AdvancerRepository{Database: database} + + err = repository.UpdateEpochs(ctx, app.ContractAddress) + require.Nil(err) + + epoch0, err := repository.GetEpoch(ctx, 0, app.ContractAddress) + require.Nil(err) + require.NotNil(epoch0) + + epoch1, err := repository.GetEpoch(ctx, 1, app.ContractAddress) + require.Nil(err) + require.NotNil(epoch1) + + epoch2, err := repository.GetEpoch(ctx, 2, app.ContractAddress) + require.Nil(err) + require.NotNil(epoch2) + + epoch3, err := repository.GetEpoch(ctx, 3, app.ContractAddress) + require.Nil(err) + require.NotNil(epoch3) + + require.Equal(EpochStatusProcessedAllInputs, epoch0.Status) + require.Equal(EpochStatusProcessedAllInputs, epoch1.Status) + require.Equal(EpochStatusClosed, epoch2.Status) + require.Equal(EpochStatusOpen, epoch3.Status) + }) +} + +// ------------------------------------------------------------------------------------------------ + +func populate1(database *Database) (*Application, []*Epoch, []*Input, error) { + ctx := context.Background() + + app := &Application{ + ContractAddress: common.HexToAddress("deadbeef"), + IConsensusAddress: common.HexToAddress("beefdead"), + TemplateHash: [32]byte{}, + LastProcessedBlock: 0, + Status: "RUNNING", + } + + _, err := database.InsertApplication(ctx, app) + if err != nil { + return nil, nil, nil, err + } + + epochs := []*Epoch{{ + FirstBlock: 0, + LastBlock: 1, + Status: EpochStatusClosed, + }, { + FirstBlock: 2, + LastBlock: 3, + Status: EpochStatusClosed, + }, { + FirstBlock: 4, + LastBlock: 5, + Status: EpochStatusClosed, + }, { + FirstBlock: 6, + LastBlock: 7, + Status: EpochStatusOpen, + }} + + for i, epoch := range epochs { + epoch.Index = uint64(i) + epoch.AppAddress = app.ContractAddress + epoch.Id, err = database.InsertEpoch(ctx, epoch) + if err != nil { + return nil, nil, nil, err + } + } + + inputs := []*Input{{ + EpochId: epochs[0].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("first input"), + }, { + EpochId: epochs[0].Id, + CompletionStatus: InputStatusRejected, + RawData: []byte("second input"), + }, { + EpochId: epochs[1].Id, + CompletionStatus: InputStatusException, + RawData: []byte("third input"), + }, { + EpochId: epochs[1].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("fourth input"), + }, { + EpochId: epochs[2].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("fifth input"), + }, { + EpochId: epochs[2].Id, + CompletionStatus: InputStatusNone, + RawData: []byte("sixth input"), + }, { + EpochId: epochs[3].Id, + CompletionStatus: InputStatusNone, + RawData: []byte("seventh input"), + }} + + for i, input := range inputs { + input.Index = uint64(i) + input.BlockNumber = uint64(i) + input.AppAddress = app.ContractAddress + + input.RawData, err = rollupsmachine.Input{Data: input.RawData}.Encode() + if err != nil { + return nil, nil, nil, err + } + + input.Id, err = database.InsertInput(ctx, input) + if err != nil { + return nil, nil, nil, err + } + } + + return app, epochs, inputs, nil +} + +// ------------------------------------------------------------------------------------------------ + +func populate2(database *Database) ([]*Application, []*Epoch, []*Input, []*Snapshot, error) { + ctx := context.Background() + + apps := []*Application{{ + ContractAddress: common.HexToAddress("dead"), + TemplateUri: "path/to/template/uri/0", + Status: ApplicationStatusNotRunning, + }, { + ContractAddress: common.HexToAddress("beef"), + TemplateUri: "path/to/template/uri/1", + Status: ApplicationStatusRunning, + }, { + ContractAddress: common.HexToAddress("bead"), + TemplateUri: "path/to/template/uri/2", + Status: ApplicationStatusRunning, + }} + if err := database.InsertApps(ctx, apps); err != nil { + return nil, nil, nil, nil, err + } + + epochs := []*Epoch{{ + Index: 0, + Status: EpochStatusClosed, + AppAddress: apps[1].ContractAddress, + }, { + Index: 1, + Status: EpochStatusClosed, + AppAddress: apps[1].ContractAddress, + }, { + Status: EpochStatusClosed, + AppAddress: apps[2].ContractAddress, + }} + err := database.InsertEpochs(ctx, epochs) + if err != nil { + return nil, nil, nil, nil, err + } + + inputs := []*Input{{ + Index: 0, + CompletionStatus: InputStatusAccepted, + RawData: []byte("first"), + AppAddress: apps[1].ContractAddress, + EpochId: epochs[0].Id, + }, { + Index: 6, + CompletionStatus: InputStatusAccepted, + RawData: []byte("second"), + AppAddress: apps[1].ContractAddress, + EpochId: epochs[1].Id, + }} + err = database.InsertInputs(ctx, inputs) + if err != nil { + return nil, nil, nil, nil, err + } + + snapshots := []*Snapshot{{ + URI: "path/to/snapshot/1", + InputId: inputs[0].Id, + AppAddress: apps[1].ContractAddress, + }, { + URI: "path/to/snapshot/2", + InputId: inputs[1].Id, + AppAddress: apps[1].ContractAddress, + }} + err = database.InsertSnapshots(ctx, snapshots) + if err != nil { + return nil, nil, nil, nil, err + } + + return apps, epochs, inputs, snapshots, nil +} + +// ------------------------------------------------------------------------------------------------ + +func (pg *Database) InsertApps(ctx context.Context, apps []*Application) error { + var err error + for _, app := range apps { + app.Id, err = pg.InsertApplication(ctx, app) + if err != nil { + return err + } + } + return nil +} + +func (pg *Database) InsertEpochs(ctx context.Context, epochs []*Epoch) error { + var err error + for _, epoch := range epochs { + epoch.Id, err = pg.InsertEpoch(ctx, epoch) + if err != nil { + return err + } + } + return nil +} + +func (pg *Database) InsertInputs(ctx context.Context, inputs []*Input) error { + var err error + for _, input := range inputs { + input.Id, err = pg.InsertInput(ctx, input) + if err != nil { + return err + } + } + return nil +} + +func (pg *Database) InsertSnapshots(ctx context.Context, snapshots []*Snapshot) error { + var err error + for _, snapshot := range snapshots { + snapshot.Id, err = pg.InsertSnapshot(ctx, snapshot) + if err != nil { + return err + } + } + return nil +} diff --git a/internal/repository/base.go b/internal/repository/base.go index 10f688bc5..d2db371e9 100644 --- a/internal/repository/base.go +++ b/internal/repository/base.go @@ -19,7 +19,14 @@ type Database struct { db *pgxpool.Pool } -var ErrInsertRow = errors.New("unable to insert row") +var ( + ErrInsertRow = errors.New("unable to insert row") + ErrUpdateRow = errors.New("unable to update row") + ErrCopyFrom = errors.New("unable to COPY FROM") + + ErrBeginTx = errors.New("unable to begin transaction") + ErrCommitTx = errors.New("unable to commit transaction") +) func Connect( ctx context.Context, @@ -84,42 +91,47 @@ func (pg *Database) InsertNodeConfig( func (pg *Database) InsertApplication( ctx context.Context, app *Application, -) error { +) (id uint64, _ error) { query := ` INSERT INTO application (contract_address, template_hash, + template_uri, last_processed_block, status, iconsensus_address) VALUES (@contractAddress, @templateHash, + @templateUri, @lastProcessedBlock, @status, - @iConsensusAddress)` + @iConsensusAddress) + RETURNING + id + ` args := pgx.NamedArgs{ "contractAddress": app.ContractAddress, "templateHash": app.TemplateHash, + "templateUri": app.TemplateUri, "lastProcessedBlock": app.LastProcessedBlock, "status": app.Status, "iConsensusAddress": app.IConsensusAddress, } - _, err := pg.db.Exec(ctx, query, args) + err := pg.db.QueryRow(ctx, query, args).Scan(&id) if err != nil { - return fmt.Errorf("%w: %w", ErrInsertRow, err) + return 0, fmt.Errorf("%w: %w", ErrInsertRow, err) } - return nil + return id, nil } func (pg *Database) InsertEpoch( ctx context.Context, epoch *Epoch, ) (uint64, error) { - var id uint64 query := ` @@ -140,7 +152,8 @@ func (pg *Database) InsertEpoch( @status, @applicationAddress) RETURNING - id` + id + ` args := pgx.NamedArgs{ "index": epoch.Index, @@ -283,7 +296,9 @@ func (pg *Database) InsertSnapshot( (@inputId, @appAddress, @uri) - RETURNING id` + RETURNING + id + ` args := pgx.NamedArgs{ "inputId": snapshot.InputId, diff --git a/internal/repository/base_test.go b/internal/repository/base_test.go index 3caa6943c..fbcb4267c 100644 --- a/internal/repository/base_test.go +++ b/internal/repository/base_test.go @@ -11,6 +11,7 @@ import ( . "github.com/cartesi/rollups-node/internal/node/model" "github.com/cartesi/rollups-node/internal/repository/schema" + "github.com/ethereum/go-ethereum/common" "github.com/stretchr/testify/suite" "github.com/testcontainers/testcontainers-go" @@ -84,10 +85,10 @@ func (s *RepositorySuite) SetupDatabase() { Status: ApplicationStatusNotRunning, } - err = s.database.InsertApplication(s.ctx, &app) + _, err = s.database.InsertApplication(s.ctx, &app) s.Require().Nil(err) - err = s.database.InsertApplication(s.ctx, &app2) + _, err = s.database.InsertApplication(s.ctx, &app2) s.Require().Nil(err) genericHash := common.HexToHash("deadbeef") @@ -256,7 +257,7 @@ func (s *RepositorySuite) TestApplicationFailsDuplicateRow() { Status: ApplicationStatusRunning, } - err := s.database.InsertApplication(s.ctx, &app) + _, err := s.database.InsertApplication(s.ctx, &app) s.Require().ErrorContains(err, "duplicate key value") } diff --git a/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql index a4c6483fb..7f0da3707 100644 --- a/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql +++ b/internal/repository/schema/migrations/000001_create_application_input_claim_output_report_nodeconfig.up.sql @@ -34,6 +34,7 @@ CREATE TABLE "application" "id" SERIAL, "contract_address" BYTEA NOT NULL, "template_hash" BYTEA NOT NULL, + "template_uri" VARCHAR(4096) NOT NULL, "last_processed_block" NUMERIC(20,0) NOT NULL CHECK ("last_processed_block" >= 0 AND "last_processed_block" <= f_maxuint64()), "status" "ApplicationStatus" NOT NULL, "iconsensus_address" BYTEA NOT NULL, diff --git a/internal/repository/validator_test.go b/internal/repository/validator_test.go index 20731405c..d5983908e 100644 --- a/internal/repository/validator_test.go +++ b/internal/repository/validator_test.go @@ -24,7 +24,7 @@ func (s *RepositorySuite) TestGetOutputsProducedInBlockRange() { TemplateHash: common.BytesToHash([]byte("template")), Status: ApplicationStatusRunning, } - err = s.database.InsertApplication(s.ctx, &app) + _, err = s.database.InsertApplication(s.ctx, &app) s.Require().Nil(err) epoch := Epoch{ @@ -77,7 +77,7 @@ func (s *RepositorySuite) TestGetProcessedEpochs() { TemplateHash: common.BytesToHash([]byte("template")), Status: ApplicationStatusRunning, } - err := s.database.InsertApplication(s.ctx, &app) + _, err := s.database.InsertApplication(s.ctx, &app) s.Require().Nil(err) // no epochs, should return nothing @@ -123,7 +123,7 @@ func (s *RepositorySuite) TestGetLastInputOutputHash() { TemplateHash: common.BytesToHash([]byte("template")), Status: ApplicationStatusRunning, } - err := s.database.InsertApplication(s.ctx, &app) + _, err := s.database.InsertApplication(s.ctx, &app) s.Require().Nil(err) epoch := Epoch{ @@ -193,7 +193,7 @@ func (s *RepositorySuite) TestGetPreviousEpoch() { TemplateHash: common.BytesToHash([]byte("template")), Status: ApplicationStatusRunning, } - err := s.database.InsertApplication(s.ctx, app) + _, err := s.database.InsertApplication(s.ctx, app) s.Require().Nil(err) epoch := Epoch{ @@ -234,7 +234,7 @@ func (s *RepositorySuite) TestSetEpochClaimAndInsertProofsTransaction() { TemplateHash: common.BytesToHash([]byte("template")), Status: ApplicationStatusRunning, } - err := s.database.InsertApplication(s.ctx, &app) + _, err := s.database.InsertApplication(s.ctx, &app) s.Require().Nil(err) epoch := Epoch{ @@ -321,7 +321,7 @@ func (s *RepositorySuite) TestSetEpochClaimAndInsertProofsTransactionRollback() TemplateHash: common.BytesToHash([]byte("template")), Status: ApplicationStatusRunning, } - err := s.database.InsertApplication(s.ctx, &app) + _, err := s.database.InsertApplication(s.ctx, &app) s.Require().Nil(err) epoch := Epoch{ diff --git a/test/advancer/advancer_test.go b/test/advancer/advancer_test.go new file mode 100644 index 000000000..a1d262e95 --- /dev/null +++ b/test/advancer/advancer_test.go @@ -0,0 +1,206 @@ +// (c) Cartesi and individual authors (see AUTHORS) +// SPDX-License-Identifier: Apache-2.0 (see LICENSE) + +package advancer + +import ( + "context" + "fmt" + "os" + "testing" + "time" + + . "github.com/cartesi/rollups-node/internal/node/model" + + "github.com/cartesi/rollups-node/internal/node/advancer" + "github.com/cartesi/rollups-node/internal/nodemachine" + "github.com/cartesi/rollups-node/internal/repository" + "github.com/cartesi/rollups-node/pkg/emulator" + "github.com/cartesi/rollups-node/pkg/rollupsmachine" + "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine" + "github.com/cartesi/rollups-node/test/snapshot" + "github.com/cartesi/rollups-node/test/tooling/db" + + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +func TestAdvancer(t *testing.T) { + require := require.New(t) + ctx := context.Background() + + // Setups the database. + endpoint, err := db.Setup(ctx) + require.Nil(err) + database, err := repository.Connect(ctx, endpoint) + require.Nil(err) + require.NotNil(database) + app, _, _, err := populate(database) + require.Nil(err) + + // Creates the snapshot. + script := "ioctl-echo-loop --vouchers=1 --notices=3 --reports=5 --verbose=1" + snapshot, err := snapshot.FromScript(script, uint64(1_000_000_000)) + require.Nil(err) + defer func() { require.Nil(snapshot.Close()) }() + + // Starts the server. + verbosity := cartesimachine.ServerVerbosityInfo + address, err := cartesimachine.StartServer(verbosity, 0, os.Stdout, os.Stderr) + require.Nil(err) + + // Loads the cartesimachine. + config := &emulator.MachineRuntimeConfig{} + cartesiMachine, err := cartesimachine.Load(ctx, snapshot.Path(), address, config) + require.Nil(err) + require.NotNil(cartesiMachine) + + // Wraps the cartesimachine with rollupsmachine. + rollupsMachine, err := rollupsmachine.New(ctx, cartesiMachine, 50_000_000, 5_000_000_000) + require.Nil(err) + require.NotNil(rollupsMachine) + + // Wraps the rollupsmachine with nodemachine. + nodeMachine, err := nodemachine.New(rollupsMachine, 0, time.Minute, time.Minute, 10) + require.Nil(err) + require.NotNil(nodeMachine) + defer func() { require.Nil(nodeMachine.Close()) }() + + // Creates the machine pool. + machines := advancer.Machines{app.ContractAddress: nodeMachine} + + // Creates the advancer's repository. + repository := &repository.AdvancerRepository{Database: database} + + // Creates the advancer. + advancer, err := advancer.New(machines, repository) + require.Nil(err) + require.NotNil(advancer) + + // Creates the poller from the advancer. + poller, err := advancer.Poller(5 * time.Second) + require.Nil(err) + require.NotNil(poller) + + // Starts the advancer in a separate goroutine. + done := make(chan struct{}, 1) + go func() { + ready := make(chan struct{}, 1) + err = poller.Start(ctx, ready) + <-ready + require.Nil(err, "%v", err) + done <- struct{}{} + }() + + // Orders the advancer to stop after some time has passed. + time.Sleep(5 * time.Second) + poller.Stop() + +wait: + for { + select { + case <-done: + fmt.Println("Done!") + break wait + default: + fmt.Println("Waiting...") + time.Sleep(time.Second) + } + } + + t.Run("AssertThings!", func(t *testing.T) { + t.Skip("TODO") + }) +} + +func populate(database *repository.Database) (*Application, []*Epoch, []*Input, error) { + ctx := context.Background() + + app := &Application{ + ContractAddress: common.HexToAddress("deadbeef"), + IConsensusAddress: common.HexToAddress("beefdead"), + TemplateHash: [32]byte{}, + LastProcessedBlock: 0, + Status: "RUNNING", + } + + _, err := database.InsertApplication(ctx, app) + if err != nil { + return nil, nil, nil, err + } + + epochs := []*Epoch{{ + FirstBlock: 0, + LastBlock: 1, + Status: EpochStatusClosed, + }, { + FirstBlock: 2, + LastBlock: 3, + Status: EpochStatusClosed, + }, { + FirstBlock: 4, + LastBlock: 5, + Status: EpochStatusClosed, + }, { + FirstBlock: 6, + LastBlock: 7, + Status: EpochStatusOpen, + }} + + for i, epoch := range epochs { + epoch.Index = uint64(i) + epoch.AppAddress = app.ContractAddress + epoch.Id, err = database.InsertEpoch(ctx, epoch) + if err != nil { + return nil, nil, nil, err + } + } + + inputs := []*Input{{ + EpochId: epochs[0].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("first input"), + }, { + EpochId: epochs[0].Id, + CompletionStatus: InputStatusRejected, + RawData: []byte("second input"), + }, { + EpochId: epochs[1].Id, + CompletionStatus: InputStatusException, + RawData: []byte("third input"), + }, { + EpochId: epochs[1].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("fourth input"), + }, { + EpochId: epochs[2].Id, + CompletionStatus: InputStatusAccepted, + RawData: []byte("fifth input"), + }, { + EpochId: epochs[2].Id, + CompletionStatus: InputStatusNone, + RawData: []byte("sixth input"), + }, { + EpochId: epochs[3].Id, + CompletionStatus: InputStatusNone, + RawData: []byte("seventh input"), + }} + + for i, input := range inputs { + input.Index = uint64(i) + input.BlockNumber = uint64(i) + input.AppAddress = app.ContractAddress + + input.RawData, err = rollupsmachine.Input{Data: input.RawData}.Encode() + if err != nil { + return nil, nil, nil, err + } + + input.Id, err = database.InsertInput(ctx, input) + if err != nil { + return nil, nil, nil, err + } + } + + return app, epochs, inputs, nil +}