Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: added execution stage for schedules #NTRN-339 #671

Merged
merged 14 commits into from
Sep 4, 2024
6 changes: 6 additions & 0 deletions proto/neutron/cron/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,31 @@ service Query {
// this line is used by starport scaffolding # 2
}

// QueryParamsRequest is the request type for the Query/Params RPC method.
message QueryParamsRequest {}

// QueryParamsResponse is the response type for the Query/Params RPC method.
message QueryParamsResponse {
// params holds all the parameters of this module.
Params params = 1 [(gogoproto.nullable) = false];
}

// QueryGetScheduleRequest is the request type for the Query/Schedule RPC method.
message QueryGetScheduleRequest {
string name = 1;
}

// QueryGetScheduleResponse is the response type for the Query/Params RPC method.
message QueryGetScheduleResponse {
Schedule schedule = 1 [(gogoproto.nullable) = false];
}

// QuerySchedulesRequest is the request type for the Query/Schedules RPC method.
message QuerySchedulesRequest {
cosmos.base.query.v1beta1.PageRequest pagination = 1;
}

// QuerySchedulesResponse is the response type for the Query/Params RPC method.
message QuerySchedulesResponse {
repeated Schedule schedules = 1 [(gogoproto.nullable) = false];
cosmos.base.query.v1beta1.PageResponse pagination = 2;
Expand Down
16 changes: 10 additions & 6 deletions proto/neutron/cron/schedule.proto
Original file line number Diff line number Diff line change
Expand Up @@ -7,31 +7,35 @@ option go_package = "github.com/neutron-org/neutron/v4/x/cron/types";

// ExecutionStage defines when messages will be executed in the block
enum ExecutionStage {
BEGIN_BLOCKER = 0;
END_BLOCKER = 1;
BOTH_BLOCKERS = 2;
// Execution at the end of the block
EXECUTION_STAGE_END_BLOCKER = 0;
// Execution at the beginning of the block
EXECUTION_STAGE_BEGIN_BLOCKER = 1;
}

// Schedule defines the schedule for execution
message Schedule {
// Name of schedule
string name = 1;
// Period in blocks
uint64 period = 2;
// Msgs that will be executed every period amount of time
// Msgs that will be executed every certain number of blocks, specified in the `period` field
repeated MsgExecuteContract msgs = 3 [(gogoproto.nullable) = false];
// Last execution's block height
uint64 last_execute_height = 4;
// Execution stage when messages will be executed
ExecutionStage execution_stage = 5;
// Execution stages when messages will be executed
repeated ExecutionStage execution_stages = 5 [(gogoproto.nullable) = false];
}

// MsgExecuteContract defines the contract and the message to pass
message MsgExecuteContract {
// Contract is the address of the smart contract
string contract = 1;
// Msg is json encoded message to be passed to the contract
string msg = 2;
}

// ScheduleCount defines the number of current schedules
message ScheduleCount {
// Count is the number of current schedules
int32 count = 1;
Expand Down
9 changes: 6 additions & 3 deletions proto/neutron/cron/tx.proto
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ message MsgAddSchedule {

// Authority is the address of the governance account.
string authority = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];

// Name of the schedule
string name = 2;
// Period in blocks
uint64 period = 3;
// Msgs that will be executed every certain number of blocks, specified in the `period` field
repeated MsgExecuteContract msgs = 4 [(gogoproto.nullable) = false];
ExecutionStage execution_stage = 5;
// Execution stages when messages will be executed
repeated ExecutionStage execution_stages = 5 [(gogoproto.nullable) = false];
}

// MsgAddScheduleResponse defines the response structure for executing a
Expand All @@ -50,7 +53,7 @@ message MsgRemoveSchedule {

// Authority is the address of the governance account.
string authority = 1 [(cosmos_proto.scalar) = "cosmos.AddressString"];

// Name of the schedule
string name = 2;
}

Expand Down
5 changes: 4 additions & 1 deletion proto/neutron/cron/v1/schedule.proto
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,27 @@ import "gogoproto/gogo.proto";

option go_package = "github.com/neutron-org/neutron/v4/x/cron/types/v1";

// Schedule defines the schedule for execution
message Schedule {
// Name of schedule
string name = 1;
// Period in blocks
uint64 period = 2;
// Msgs that will be executed every period amount of time
// Msgs that will be executed every certain number of blocks, specified in the `period` field
repeated MsgExecuteContract msgs = 3 [(gogoproto.nullable) = false];
// Last execution's block height
uint64 last_execute_height = 4;
}

// MsgExecuteContract defines the contract and the message to pass
message MsgExecuteContract {
// Contract is the address of the smart contract
string contract = 1;
// Msg is json encoded message to be passed to the contract
string msg = 2;
}

// ScheduleCount defines the number of current schedules
message ScheduleCount {
// Count is the number of current schedules
int32 count = 1;
Expand Down
8 changes: 4 additions & 4 deletions wasmbinding/bindings/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,10 +195,10 @@ type ForceTransfer struct {

// AddSchedule adds new schedule to the cron module
type AddSchedule struct {
Name string `json:"name"`
Period uint64 `json:"period"`
Msgs []MsgExecuteContract `json:"msgs"`
ExecutionStage string `json:"execution_stage"`
Name string `json:"name"`
Period uint64 `json:"period"`
Msgs []MsgExecuteContract `json:"msgs"`
ExecutionStages []string `json:"execution_stages"`
}

// AddScheduleResponse holds response AddSchedule
Expand Down
11 changes: 4 additions & 7 deletions wasmbinding/message_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -985,15 +985,12 @@ func (m *CustomMessenger) addSchedule(ctx sdk.Context, contractAddr sdk.AccAddre
})
}

var executionStage crontypes.ExecutionStage

if v, ok := crontypes.ExecutionStage_value[addSchedule.ExecutionStage]; !ok {
executionStage = crontypes.ExecutionStage_END_BLOCKER
} else {
executionStage = crontypes.ExecutionStage(v)
executionStages := make([]crontypes.ExecutionStage, 0, len(addSchedule.ExecutionStages))
for _, executionStage := range addSchedule.ExecutionStages {
executionStages = append(executionStages, crontypes.ExecutionStage(crontypes.ExecutionStage_value[executionStage]))
}

err := m.CronKeeper.AddSchedule(ctx, addSchedule.Name, addSchedule.Period, msgs, executionStage)
err := m.CronKeeper.AddSchedule(ctx, addSchedule.Name, addSchedule.Period, msgs, executionStages)
if err != nil {
ctx.Logger().Error("failed to addSchedule",
"from_address", contractAddr.String(),
Expand Down
2 changes: 1 addition & 1 deletion x/cron/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func InitGenesis(ctx sdk.Context, k keeper.Keeper, genState types.GenesisState) {
// Set all the schedules
for _, elem := range genState.ScheduleList {
err := k.AddSchedule(ctx, elem.Name, elem.Period, elem.Msgs, elem.ExecutionStage)
err := k.AddSchedule(ctx, elem.Name, elem.Period, elem.Msgs, elem.ExecutionStages)
if err != nil {
panic(err)
}
Expand Down
3 changes: 2 additions & 1 deletion x/cron/keeper/grpc_query_schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,9 @@ func createNSchedule(t *testing.T, ctx sdk.Context, k *cronkeeper.Keeper, n int3
item.Period = 1000
item.Msgs = nil
item.LastExecuteHeight = uint64(ctx.BlockHeight())
item.ExecutionStages = []types.ExecutionStage{types.ExecutionStage_EXECUTION_STAGE_END_BLOCKER}

err := k.AddSchedule(ctx, item.Name, item.Period, item.Msgs, item.ExecutionStage)
err := k.AddSchedule(ctx, item.Name, item.Period, item.Msgs, item.ExecutionStages)
require.NoError(t, err)

res[idx] = item
Expand Down
36 changes: 23 additions & 13 deletions x/cron/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (

MetricLabelSuccess = "success"
MetricLabelScheduleName = "schedule_name"

schedulesExecutionStages map[string]map[types.ExecutionStage]struct{}
sotnikov-s marked this conversation as resolved.
Show resolved Hide resolved
)

type (
Expand All @@ -47,6 +49,8 @@ func NewKeeper(
accountKeeper types.AccountKeeper,
authority string,
) *Keeper {
schedulesExecutionStages = make(map[string]map[types.ExecutionStage]struct{})

return &Keeper{
cdc: cdc,
storeKey: storeKey,
Expand All @@ -66,42 +70,50 @@ func (k *Keeper) Logger(ctx sdk.Context) log.Logger {

// ExecuteReadySchedules gets all schedules that are due for execution (with limit that is equal to Params.Limit)
// and executes messages in each one
// NOTE that errors in contract calls rollback all already executed messages
func (k *Keeper) ExecuteReadySchedules(ctx sdk.Context, executionStage types.ExecutionStage) {
telemetry.ModuleMeasureSince(types.ModuleName, time.Now(), LabelExecuteReadySchedules)
schedules := k.getSchedulesReadyForExecution(ctx)

for _, schedule := range schedules {
if isExecutableAtTheStage(schedule, executionStage) {
if _, ok := schedulesExecutionStages[schedule.Name][executionStage]; ok {
err := k.executeSchedule(ctx, schedule)
recordExecutedSchedule(err, schedule)
}
}
}

// AddSchedule adds new schedule to execution for every block `period`.
// AddSchedule adds a new schedule to be executed every certain number of blocks, specified in the `period`.
// First schedule execution is supposed to be on `now + period` block.
func (k *Keeper) AddSchedule(
ctx sdk.Context,
name string,
period uint64,
msgs []types.MsgExecuteContract,
executionStage types.ExecutionStage,
executionStages []types.ExecutionStage,
) error {
if k.scheduleExists(ctx, name) {
return fmt.Errorf("schedule already exists with name=%v", name)
}

schedulesExecutionStages[name] = make(map[types.ExecutionStage]struct{})
execStages := make([]types.ExecutionStage, 0)
for _, executionStage := range executionStages {
if _, ok := types.ExecutionStage_name[int32(executionStage)]; !ok {
executionStage = types.ExecutionStage_EXECUTION_STAGE_END_BLOCKER
}

if _, ok := schedulesExecutionStages[name][executionStage]; !ok {
schedulesExecutionStages[name][executionStage] = struct{}{}
execStages = append(execStages, executionStage)
}
}

schedule := types.Schedule{
Name: name,
Period: period,
Msgs: msgs,
LastExecuteHeight: uint64(ctx.BlockHeight()), // let's execute newly added schedule on `now + period` block
ExecutionStage: executionStage,
}

if _, ok := types.ExecutionStage_name[int32(executionStage)]; !ok {
schedule.ExecutionStage = types.ExecutionStage_END_BLOCKER
ExecutionStages: execStages,
}

k.storeSchedule(ctx, schedule)
Expand All @@ -116,6 +128,8 @@ func (k *Keeper) RemoveSchedule(ctx sdk.Context, name string) {
return
}

delete(schedulesExecutionStages, name)

k.changeTotalCount(ctx, -1)
k.removeSchedule(ctx, name)
}
Expand Down Expand Up @@ -183,10 +197,6 @@ func (k *Keeper) getSchedulesReadyForExecution(ctx sdk.Context) []types.Schedule
return res
}

func isExecutableAtTheStage(schedule types.Schedule, executionStage types.ExecutionStage) bool {
return schedule.ExecutionStage == executionStage || schedule.ExecutionStage == types.ExecutionStage_BOTH_BLOCKERS
}

// executeSchedule executes all msgs in a given schedule and changes LastExecuteHeight
// if at least one msg execution fails, rollback all messages
func (k *Keeper) executeSchedule(ctx sdk.Context, schedule types.Schedule) error {
Expand Down
Loading
Loading