Skip to content

Commit

Permalink
feat: add the machines package
Browse files Browse the repository at this point in the history
  • Loading branch information
renan061 authored and vfusco committed Sep 9, 2024
1 parent 2206a7f commit cf978c5
Show file tree
Hide file tree
Showing 8 changed files with 341 additions and 26 deletions.
16 changes: 15 additions & 1 deletion docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,13 @@ When enabled, will connect to postgres database via SSL.
* **Type:** `bool`
* **Default:** `"true"`

## `CARTESI_ADVANCER_POLLING_INTERVAL`

How many seconds the node will wait before querying the database for new inputs.

* **Type:** `Duration`
* **Default:** `"30"`

## `CARTESI_EPOCH_LENGTH`

Length of a rollups epoch in blocks.
Expand All @@ -234,7 +241,7 @@ At the end of each epoch, the node will send claims to the blockchain.

## `CARTESI_EVM_READER_RETRY_POLICY_MAX_DELAY`

How seconds the retry policy will wait between retries.
How many seconds the retry policy will wait between retries.

* **Type:** `Duration`
* **Default:** `"3"`
Expand All @@ -258,3 +265,10 @@ How many seconds the node will wait before trying to finish epochs for all appli
Path to the directory with the cartesi-machine snapshot that will be loaded by the node.

* **Type:** `string`

## `CARTESI_MACHINE_SERVER_VERBOSITY`

TODO.

* **Type:** `string`
* **Default:** `"info"`
245 changes: 245 additions & 0 deletions internal/node/advancer/machines/machines.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
// (c) Cartesi and individual authors (see AUTHORS)
// SPDX-License-Identifier: Apache-2.0 (see LICENSE)

package machines

import (
"context"
"errors"
"fmt"
"log/slog"
"os"
"sync"
"time"

. "github.com/cartesi/rollups-node/internal/node/model"

nm "github.com/cartesi/rollups-node/internal/nodemachine"
"github.com/cartesi/rollups-node/pkg/emulator"
rm "github.com/cartesi/rollups-node/pkg/rollupsmachine"
cm "github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine"
)

type MachineConfig struct {
AppAddress Address
SnapshotPath string
SnapshotInputIndex *uint64
IncCycles uint64
MaxCycles uint64
AdvanceTimeout time.Duration
InspectTimeout time.Duration
MaxConcurrentInspects uint8
}

type Repository interface {
// GetMachineConfigurations retrieves a machine configuration for each application.
GetMachineConfigurations(context.Context) ([]*MachineConfig, error)

// GetProcessedInputs retrieves the processed inputs of an application with indexes greater or
// equal to the given input index.
GetProcessedInputs(_ context.Context, app Address, index uint64) ([]*Input, error)
}

// AdvanceMachine masks nodemachine.NodeMachine to only expose methods required by the Advancer.
type AdvanceMachine interface {
Advance(_ context.Context, input []byte, index uint64) (*nm.AdvanceResult, error)
}

// InspectMachine masks nodemachine.NodeMachine to only expose methods required by the Inspector.
type InspectMachine interface {
Inspect(_ context.Context, query []byte) (*nm.InspectResult, error)
}

// Machines is a thread-safe type that manages the pool of cartesi machines being used by the node.
// It contains a map of applications to machines.
type Machines struct {
mutex sync.RWMutex
machines map[Address]*nm.NodeMachine
}

// Load initializes the cartesi machines.
// Load advances a machine to the last processed input stored in the database.
//
// Load does not fail when one of those machines fail to initialize.
// It stores the error to be returned later and continues to initialize the other machines.
func Load(ctx context.Context, repo Repository, verbosity cm.ServerVerbosity) (*Machines, error) {
configs, err := repo.GetMachineConfigurations(ctx)
if err != nil {
return nil, err
}

machines := map[Address]*nm.NodeMachine{}
var errs error

for _, config := range configs {
// Creates the machine.
machine, err := createMachine(ctx, verbosity, config)
if err != nil {
err = fmt.Errorf("failed to create machine from snapshot (%v): %w", config, err)
errs = errors.Join(errs, err)
continue
}

// Advances the machine until it catches up with the state of the database (if necessary).
err = catchUp(ctx, repo, config.AppAddress, machine, config.SnapshotInputIndex)
if err != nil {
err = fmt.Errorf("failed to advance cartesi machine (%v): %w", config, err)
errs = errors.Join(errs, err, machine.Close())
continue
}

machines[config.AppAddress] = machine
}

return &Machines{machines: machines}, errs
}

// GetAdvanceMachine gets the machine associated with the application from the map.
func (m *Machines) GetAdvanceMachine(app Address) AdvanceMachine {
return m.getMachine(app)
}

// GetInspectMachine gets the machine associated with the application from the map.
func (m *Machines) GetInspectMachine(app Address) InspectMachine {
return m.getMachine(app)
}

// Add maps a new application to a machine.
// It does nothing if the application is already mapped to some machine.
// It returns true if it was able to add the machine and false otherwise.
func (m *Machines) Add(app Address, machine *nm.NodeMachine) bool {
m.mutex.Lock()
defer m.mutex.Unlock()

if _, ok := m.machines[app]; ok {
return false
} else {
m.machines[app] = machine
return true
}
}

// Delete deletes an application from the map.
// It returns the associated machine, if any.
func (m *Machines) Delete(app Address) *nm.NodeMachine {
m.mutex.Lock()
defer m.mutex.Unlock()

if machine, ok := m.machines[app]; ok {
return nil
} else {
delete(m.machines, app)
return machine
}
}

// Apps returns the addresses of the applications for which there are machines.
func (m *Machines) Apps() []Address {
m.mutex.RLock()
defer m.mutex.Unlock()

keys := make([]Address, len(m.machines))
i := 0
for k := range m.machines {
keys[i] = k
i++
}
return keys
}

// Close closes all the machines and erases them from the map.
func (m *Machines) Close() error {
m.mutex.Lock()
defer m.mutex.Unlock()

err := closeMachines(m.machines)
if err != nil {
slog.Error(fmt.Sprintf("failed to close some machines: %v", err))
}
return err
}

// ------------------------------------------------------------------------------------------------

func (m *Machines) getMachine(app Address) *nm.NodeMachine {
m.mutex.RLock()
defer m.mutex.Unlock()
return m.machines[app]
}

func closeMachines(machines map[Address]*nm.NodeMachine) (err error) {
for _, machine := range machines {
err = errors.Join(err, machine.Close())
}
for app := range machines {
delete(machines, app)
}
return
}

func createMachine(ctx context.Context,
verbosity cm.ServerVerbosity,
config *MachineConfig,
) (*nm.NodeMachine, error) {
// Starts the server.
address, err := cm.StartServer(verbosity, 0, os.Stdout, os.Stderr)
if err != nil {
return nil, err
}

// Creates a CartesiMachine from the snapshot.
runtimeConfig := &emulator.MachineRuntimeConfig{}
cartesiMachine, err := cm.Load(ctx, config.SnapshotPath, address, runtimeConfig)
if err != nil {
return nil, errors.Join(err, cm.StopServer(address))
}

// Creates a RollupsMachine from the CartesiMachine.
rollupsMachine, err := rm.New(ctx,
cartesiMachine,
config.IncCycles,
config.MaxCycles)
if err != nil {
return nil, errors.Join(err, cartesiMachine.Close(ctx))
}

// Creates a NodeMachine from the RollupsMachine.
nodeMachine, err := nm.New(rollupsMachine,
config.SnapshotInputIndex,
config.AdvanceTimeout,
config.InspectTimeout,
config.MaxConcurrentInspects)
if err != nil {
return nil, errors.Join(err, rollupsMachine.Close(ctx))
}

return nodeMachine, err
}

func catchUp(ctx context.Context,
repo Repository,
app Address,
machine *nm.NodeMachine,
snapshotInputIndex *uint64,
) error {
// A nil index indicates we should start to process inputs from the beginning (index zero).
// A non-nil index indicates we should start to process inputs from the next available index.
firstInputIndexToProcess := uint64(0)
if snapshotInputIndex != nil {
firstInputIndexToProcess = *snapshotInputIndex + 1
}

inputs, err := repo.GetProcessedInputs(ctx, app, firstInputIndexToProcess)
if err != nil {
return err
}

for _, input := range inputs {
_, err := machine.Advance(ctx, input.RawData, input.Index)
if err != nil {
return err
}
}

return nil
}
8 changes: 8 additions & 0 deletions internal/node/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package config
import (
"fmt"
"os"

"github.com/cartesi/rollups-node/pkg/rollupsmachine/cartesimachine"
)

// NodeConfig contains all the Node variables.
Expand Down Expand Up @@ -39,7 +41,10 @@ type NodeConfig struct {
ExperimentalSunodoValidatorEnabled bool
ExperimentalSunodoValidatorRedisEndpoint string
Auth Auth
AdvancerPollingInterval Duration
ValidatorPollingInterval Duration
// Temporary
MachineServerVerbosity cartesimachine.ServerVerbosity
}

// Auth is used to sign transactions.
Expand Down Expand Up @@ -106,7 +111,10 @@ func FromEnv() NodeConfig {
if getFeatureClaimerEnabled() && !getExperimentalSunodoValidatorEnabled() {
config.Auth = authFromEnv()
}
config.AdvancerPollingInterval = getAdvancerPollingInterval()
config.ValidatorPollingInterval = getValidatorPollingInterval()
// Temporary.
config.MachineServerVerbosity = cartesimachine.ServerVerbosity(getMachineServerVerbosity())
return config
}

Expand Down
18 changes: 17 additions & 1 deletion internal/node/config/generate/Config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,13 @@ How many times some functions should be retried after an error."""
default = "3"
go-type = "Duration"
description = """
How seconds the retry policy will wait between retries."""
How many seconds the retry policy will wait between retries."""

[rollups.CARTESI_ADVANCER_POLLING_INTERVAL]
default = "30"
go-type = "Duration"
description = """
How many seconds the node will wait before querying the database for new inputs."""

[rollups.CARTESI_VALIDATOR_POLLING_INTERVAL]
default = "30"
Expand Down Expand Up @@ -250,3 +256,13 @@ go-type = "bool"
description = """
When enabled, prints server-manager output to stdout and stderr directly.
All other log configurations are ignored."""

#
# Temporary
#

[temp.CARTESI_MACHINE_SERVER_VERBOSITY]
default = "info"
go-type = "string"
description = """
TODO."""
24 changes: 24 additions & 0 deletions internal/node/config/generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions internal/node/model/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ type Application struct {
Id uint64
ContractAddress Address
TemplateHash Hash
TemplateUri string
LastProcessedBlock uint64
Status ApplicationStatus
IConsensusAddress Address
Expand Down
Loading

0 comments on commit cf978c5

Please sign in to comment.