diff --git a/cmd/curio/tasks/tasks.go b/cmd/curio/tasks/tasks.go index d29a37ab8..f3a1d8c19 100644 --- a/cmd/curio/tasks/tasks.go +++ b/cmd/curio/tasks/tasks.go @@ -92,6 +92,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task si := dependencies.Si bstore := dependencies.Bstore machine := dependencies.ListenAddr + prover := dependencies.Prover var activeTasks []harmonytask.TaskInterface sender, sendTask := message.NewSender(full, full, db) @@ -195,7 +196,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task cfg.Subsystems.EnableUpdateSubmit if hasAnySealingTask { - sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore, machine) + sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore, machine, prover) if err != nil { return nil, err } @@ -243,14 +244,18 @@ func addSealingTasks( ctx context.Context, hasAnySealingTask bool, db *harmonydb.DB, full api.Chain, sender *message.Sender, as *multictladdr.MultiAddressSelector, cfg *config.CurioConfig, slrLazy *lazy.Lazy[*ffi.SealCalls], asyncParams func() func() (bool, error), si paths.SectorIndex, stor *paths.Remote, - bstore curiochain.CurioBlockstore, machineHostPort string) ([]harmonytask.TaskInterface, error) { + bstore curiochain.CurioBlockstore, machineHostPort string, prover storiface.Prover) ([]harmonytask.TaskInterface, error) { var activeTasks []harmonytask.TaskInterface // Sealing / Snap var sp *seal.SealPoller var slr *ffi.SealCalls + var err error if hasAnySealingTask { - sp = seal.NewPoller(db, full) + sp, err = seal.NewPoller(db, full, cfg) + if err != nil { + return nil, xerrors.Errorf("creating seal poller: %w", err) + } go sp.RunPoller(ctx) slr = must.One(slrLazy.Val()) @@ -303,7 +308,7 @@ func addSealingTasks( } if cfg.Subsystems.EnableSendPrecommitMsg { - precommitTask := 
seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee, cfg.Fees.CollateralFromMinerBalance, cfg.Fees.DisableCollateralFallback) + precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg) activeTasks = append(activeTasks, precommitTask) } if cfg.Subsystems.EnablePoRepProof { @@ -321,7 +326,7 @@ func addSealingTasks( } } if cfg.Subsystems.EnableSendCommitMsg { - commitTask := seal.NewSubmitCommitTask(sp, db, full, sender, as, cfg) + commitTask := seal.NewSubmitCommitTask(sp, db, full, sender, as, cfg, prover) activeTasks = append(activeTasks, commitTask) } diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go index 1a1cd15cb..250a852a5 100644 --- a/deps/config/doc_gen.go +++ b/deps/config/doc_gen.go @@ -37,6 +37,26 @@ var Doc = map[string][]DocField{ Comment: ``, }, }, + "CommitBatchingConfig": { + { + Name: "BaseFeeThreshold", + Type: "types.FIL", + + Comment: `Base fee value below which we should try to send Commit messages immediately`, + }, + { + Name: "Timeout", + Type: "Duration", + + Comment: `Maximum amount of time any given sector in the batch can wait for the batch to accumulate`, + }, + { + Name: "Slack", + Type: "Duration", + + Comment: `Time buffer for forceful batch submission before sectors/deals in batch would start expiring`, + }, + }, "CurioAddresses": { { Name: "PreCommitControl", @@ -106,6 +126,26 @@ alerts will be triggered for the wallet`, Comment: `SlackWebhookConfig is a configuration type for Slack webhook integration.`, }, }, + "CurioBatchingConfig": { + { + Name: "PreCommit", + Type: "PreCommitBatchingConfig", + + Comment: `Precommit Batching configuration`, + }, + { + Name: "Commit", + Type: "CommitBatchingConfig", + + Comment: `Commit batching configuration`, + }, + { + Name: "Update", + Type: "UpdateBatchingConfig", + + Comment: `Snap Deals batching configuration`, + }, + }, "CurioConfig": { { Name: "Subsystems", @@ -153,6 +193,12 @@ alerts will be triggered for the wallet`, Name: 
"Alerting", Type: "CurioAlertingConfig", + Comment: ``, + }, + { + Name: "Batching", + Type: "CurioBatchingConfig", + Comment: ``, }, }, @@ -187,6 +233,12 @@ alerts will be triggered for the wallet`, Comment: ``, }, + { + Name: "MaxUpdateBatchGasFee", + Type: "BatchFeeConfig", + + Comment: ``, + }, { Name: "MaxTerminateGasFee", Type: "types.FIL", @@ -709,6 +761,26 @@ The default is sufficient for integration with the stock commercial PagerDuty.co identifier in the integration page for the service.`, }, }, + "PreCommitBatchingConfig": { + { + Name: "BaseFeeThreshold", + Type: "types.FIL", + + Comment: `Base fee value below which we should try to send Precommit messages immediately`, + }, + { + Name: "Timeout", + Type: "Duration", + + Comment: `Maximum amount of time any given sector in the batch can wait for the batch to accumulate`, + }, + { + Name: "Slack", + Type: "Duration", + + Comment: `Time buffer for forceful batch submission before sectors/deal in batch would start expiring`, + }, + }, "PrometheusAlertManagerConfig": { { Name: "Enable", @@ -738,4 +810,24 @@ identifier in the integration page for the service.`, Example: https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX`, }, }, + "UpdateBatchingConfig": { + { + Name: "BaseFeeThreshold", + Type: "types.FIL", + + Comment: `Base fee value below which we should try to send Commit messages immediately`, + }, + { + Name: "Timeout", + Type: "Duration", + + Comment: `Maximum amount of time any given sector in the batch can wait for the batch to accumulate`, + }, + { + Name: "Slack", + Type: "Duration", + + Comment: `Time buffer for forceful batch submission before sectors/deals in batch would start expiring`, + }, + }, } diff --git a/deps/config/types.go b/deps/config/types.go index 12715f8e8..48ebbdfc8 100644 --- a/deps/config/types.go +++ b/deps/config/types.go @@ -3,6 +3,9 @@ package config import ( "time" + "github.com/filecoin-project/go-state-types/abi" + 
"github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/lotus/chain/types" ) @@ -27,6 +30,10 @@ func DefaultCurioConfig() *CurioConfig { Base: types.MustParseFIL("0"), PerSector: types.MustParseFIL("0.03"), // enough for 6 agg and 1nFIL base fee }, + MaxUpdateBatchGasFee: BatchFeeConfig{ + Base: types.MustParseFIL("0"), + PerSector: types.MustParseFIL("0.03"), + }, MaxTerminateGasFee: types.MustParseFIL("0.5"), MaxWindowPoStGasFee: types.MustParseFIL("5"), @@ -70,6 +77,23 @@ func DefaultCurioConfig() *CurioConfig { AlertManagerURL: "http://localhost:9093/api/v2/alerts", }, }, + Batching: CurioBatchingConfig{ + PreCommit: PreCommitBatchingConfig{ + BaseFeeThreshold: types.MustParseFIL("0.005"), + Timeout: Duration(4 * time.Hour), + Slack: Duration(6 * time.Hour), + }, + Commit: CommitBatchingConfig{ + BaseFeeThreshold: types.MustParseFIL("0.005"), + Timeout: Duration(1 * time.Hour), + Slack: Duration(1 * time.Hour), + }, + Update: UpdateBatchingConfig{ + BaseFeeThreshold: types.MustParseFIL("0.005"), + Timeout: Duration(1 * time.Hour), + Slack: Duration(1 * time.Hour), + }, + }, } } @@ -85,6 +109,7 @@ type CurioConfig struct { Seal CurioSealConfig Apis ApisConfig Alerting CurioAlertingConfig + Batching CurioBatchingConfig } func DefaultDefaultMaxFee() types.FIL { @@ -96,6 +121,10 @@ type BatchFeeConfig struct { PerSector types.FIL } +func (b *BatchFeeConfig) FeeForSectors(nSectors int) abi.TokenAmount { + return big.Add(big.Int(b.Base), big.Mul(big.NewInt(int64(nSectors)), big.Int(b.PerSector))) +} + type CurioSubsystemsConfig struct { // EnableWindowPost enables window post to be executed on this curio instance. Each machine in the cluster // with WindowPoSt enabled will also participate in the window post scheduler. 
It is possible to have multiple @@ -288,6 +317,7 @@ type CurioFees struct { // maxBatchFee = maxBase + maxPerSector * nSectors MaxPreCommitBatchGasFee BatchFeeConfig MaxCommitBatchGasFee BatchFeeConfig + MaxUpdateBatchGasFee BatchFeeConfig MaxTerminateGasFee types.FIL // WindowPoSt is a high-value operation, so the default fee should be high. @@ -500,3 +530,47 @@ type ApisConfig struct { // Chain API auth secret for the Curio nodes to use. StorageRPCSecret string } + +type CurioBatchingConfig struct { + // Precommit Batching configuration + PreCommit PreCommitBatchingConfig + + // Commit batching configuration + Commit CommitBatchingConfig + + // Snap Deals batching configuration + Update UpdateBatchingConfig +} + +type PreCommitBatchingConfig struct { + // Base fee value below which we should try to send Precommit messages immediately + BaseFeeThreshold types.FIL + + // Maximum amount of time any given sector in the batch can wait for the batch to accumulate + Timeout Duration + + // Time buffer for forceful batch submission before sectors/deal in batch would start expiring + Slack Duration +} + +type CommitBatchingConfig struct { + // Base fee value below which we should try to send Commit messages immediately + BaseFeeThreshold types.FIL + + // Maximum amount of time any given sector in the batch can wait for the batch to accumulate + Timeout Duration + + // Time buffer for forceful batch submission before sectors/deals in batch would start expiring + Slack Duration +} + +type UpdateBatchingConfig struct { + // Base fee value below which we should try to send Commit messages immediately + BaseFeeThreshold types.FIL + + // Maximum amount of time any given sector in the batch can wait for the batch to accumulate + Timeout Duration + + // Time buffer for forceful batch submission before sectors/deals in batch would start expiring + Slack Duration +} diff --git a/deps/deps.go b/deps/deps.go index 466cfbb1b..68b0abbfb 100644 --- a/deps/deps.go +++ b/deps/deps.go @@ 
-182,6 +182,7 @@ type Deps struct { ListenAddr string Name string Alert *alertmanager.AlertNow + Prover storiface.Prover } const ( @@ -348,6 +349,10 @@ Get it with: jq .PrivateKey ~/.lotus-miner/keystore/MF2XI2BNNJ3XILLQOJUXMYLUMU`, deps.Name = cctx.String("name") } + if deps.Prover == nil { + deps.Prover = ffiwrapper.ProofProver + } + return nil } diff --git a/documentation/en/configuration/default-curio-configuration.md b/documentation/en/configuration/default-curio-configuration.md index 3d40c69c5..c48fec76d 100644 --- a/documentation/en/configuration/default-curio-configuration.md +++ b/documentation/en/configuration/default-curio-configuration.md @@ -302,6 +302,13 @@ description: The default curio configuration # type: types.FIL #PerSector = "0.03 FIL" + [Fees.MaxUpdateBatchGasFee] + # type: types.FIL + #Base = "0 FIL" + + # type: types.FIL + #PerSector = "0.03 FIL" + [[Addresses]] #PreCommitControl = [] @@ -502,4 +509,54 @@ description: The default curio configuration # type: string #WebHookURL = "" + +[Batching] + [Batching.PreCommit] + # Base fee value below which we should try to send Precommit messages immediately + # + # type: types.FIL + #BaseFeeThreshold = "0.005 FIL" + + # Maximum amount of time any given sector in the batch can wait for the batch to accumulate + # + # type: Duration + #Timeout = "4h0m0s" + + # Time buffer for forceful batch submission before sectors/deal in batch would start expiring + # + # type: Duration + #Slack = "6h0m0s" + + [Batching.Commit] + # Base fee value below which we should try to send Commit messages immediately + # + # type: types.FIL + #BaseFeeThreshold = "0.005 FIL" + + # Maximum amount of time any given sector in the batch can wait for the batch to accumulate + # + # type: Duration + #Timeout = "1h0m0s" + + # Time buffer for forceful batch submission before sectors/deals in batch would start expiring + # + # type: Duration + #Slack = "1h0m0s" + + [Batching.Update] + # Base fee value below which we should try to 
send Commit messages immediately + # + # type: types.FIL + #BaseFeeThreshold = "0.005 FIL" + + # Maximum amount of time any given sector in the batch can wait for the batch to accumulate + # + # type: Duration + #Timeout = "1h0m0s" + + # Time buffer for forceful batch submission before sectors/deals in batch would start expiring + # + # type: Duration + #Slack = "1h0m0s" + ``` diff --git a/harmony/harmonydb/sql/20231217-sdr-pipeline.sql b/harmony/harmonydb/sql/20231217-sdr-pipeline.sql index b96318b53..7b71768cf 100644 --- a/harmony/harmonydb/sql/20231217-sdr-pipeline.sql +++ b/harmony/harmonydb/sql/20231217-sdr-pipeline.sql @@ -36,8 +36,10 @@ create table sectors_sdr_pipeline ( after_tree_r bool not null default false, -- synthetic proofs (Added in 20240617-synthetic-proofs.sql) --- task_id_synth bigint, --- after_synth bool not null default false, + -- task_id_synth bigint, + -- after_synth bool not null default false, + + -- precommit_ready_at (Added in 20241210-sdr-batching.sql) -- precommit message sending precommit_msg_cid text, @@ -66,6 +68,8 @@ create table sectors_sdr_pipeline ( task_id_move_storage bigint, after_move_storage bool not null default false, + -- commit_ready_at (Added in 20241210-sdr-batching.sql) + -- Commit message sending commit_msg_cid text, diff --git a/harmony/harmonydb/sql/20240611-snap-pipeline.sql b/harmony/harmonydb/sql/20240611-snap-pipeline.sql index f8820546b..6ed7c170c 100644 --- a/harmony/harmonydb/sql/20240611-snap-pipeline.sql +++ b/harmony/harmonydb/sql/20240611-snap-pipeline.sql @@ -26,6 +26,8 @@ CREATE TABLE sectors_snap_pipeline ( task_id_prove BIGINT, after_prove BOOLEAN NOT NULL DEFAULT FALSE, + -- update_ready_at TIMESTAMPTZ (Added in 20241210-sdr-batching.sql) + -- submit prove_msg_cid TEXT, diff --git a/harmony/harmonydb/sql/20241210-sdr-batching.sql b/harmony/harmonydb/sql/20241210-sdr-batching.sql new file mode 100644 index 000000000..3f39a3e08 --- /dev/null +++ b/harmony/harmonydb/sql/20241210-sdr-batching.sql @@ -0,0 
+1,66 @@ +ALTER TABLE sectors_sdr_pipeline ADD COLUMN precommit_ready_at TIMESTAMPTZ; +ALTER TABLE sectors_sdr_pipeline ADD COLUMN commit_ready_at TIMESTAMPTZ; + +-- Function to set precommit_ready_at value. Used by the trigger +CREATE OR REPLACE FUNCTION set_precommit_ready_at() +RETURNS TRIGGER AS $$ +BEGIN + -- Check if after_tree_r column is changing from FALSE to TRUE + IF OLD.after_tree_r = FALSE AND NEW.after_tree_r = TRUE THEN + -- Explicitly set precommit_ready_at to the current UTC timestamp + UPDATE sectors_sdr_pipeline SET precommit_ready_at = CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + WHERE sp_id = NEW.sp_id AND sector_number = NEW.sector_number; + END IF; + + -- Return the modified row + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + + +-- Function to set commit_ready_at. Used by trigger +CREATE OR REPLACE FUNCTION set_commit_ready_at() +RETURNS TRIGGER AS $$ +BEGIN + -- Check if after_porep column is changing from FALSE to TRUE + IF OLD.after_porep = FALSE AND NEW.after_porep = TRUE THEN + -- Explicitly set commit_ready_at to the current UTC timestamp + UPDATE sectors_sdr_pipeline SET commit_ready_at = CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + WHERE sp_id = NEW.sp_id AND sector_number = NEW.sector_number; + END IF; + + -- Return the modified row + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER update_precommit_ready_at + AFTER INSERT OR UPDATE OR DELETE ON sectors_sdr_pipeline + FOR EACH ROW EXECUTE FUNCTION set_precommit_ready_at(); + + +CREATE TRIGGER update_commit_ready_at + AFTER INSERT OR UPDATE OR DELETE ON sectors_sdr_pipeline + FOR EACH ROW EXECUTE FUNCTION set_commit_ready_at(); + +ALTER TABLE sectors_snap_pipeline ADD COLUMN update_ready_at TIMESTAMPTZ; + +-- Function to set update_ready_at value. 
Used by the trigger +CREATE OR REPLACE FUNCTION set_update_ready_at() +RETURNS TRIGGER AS $$ +BEGIN + -- Check if after_prove column is changing from FALSE to TRUE + IF OLD.after_prove = FALSE AND NEW.after_prove = TRUE THEN + -- Explicitly set update_ready_at to the current UTC timestamp + UPDATE sectors_snap_pipeline SET update_ready_at = CURRENT_TIMESTAMP AT TIME ZONE 'UTC' + WHERE sp_id = NEW.sp_id AND sector_number = NEW.sector_number; + END IF; + + -- Return the modified row + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER update_update_ready_at + AFTER INSERT OR UPDATE OR DELETE ON sectors_snap_pipeline + FOR EACH ROW EXECUTE FUNCTION set_update_ready_at(); diff --git a/itests/curio_test.go b/itests/curio_test.go index b910b640a..b79512d38 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -158,6 +158,9 @@ func TestCurioHappyPath(t *testing.T) { require.Contains(t, baseCfg.Addresses[0].MinerAddresses, maddr.String()) + baseCfg.Batching.PreCommit.Timeout = config.Duration(5 * time.Second) + baseCfg.Batching.Commit.Timeout = config.Duration(5 * time.Minute) + temp := os.TempDir() dir, err := os.MkdirTemp(temp, "curio") require.NoError(t, err) diff --git a/tasks/seal/poller.go b/tasks/seal/poller.go index dd51de08e..183961785 100644 --- a/tasks/seal/poller.go +++ b/tasks/seal/poller.go @@ -9,11 +9,14 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" + miner15 "github.com/filecoin-project/go-state-types/builtin/v15/miner" + "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" "github.com/filecoin-project/curio/lib/promise" + apitypes "github.com/filecoin-project/lotus/api/types" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" "github.com/filecoin-project/lotus/chain/types" ) @@ -41,20 +44,67 @@ type SealPollerAPI interface { StateSectorPreCommitInfo(context.Context, 
address.Address, abi.SectorNumber, types.TipSetKey) (*miner.SectorPreCommitOnChainInfo, error) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) ChainHead(context.Context) (*types.TipSet, error) + StateNetworkVersion(context.Context, types.TipSetKey) (apitypes.NetworkVersion, error) +} + +type preCommitBatchingConfig struct { + MaxPreCommitBatch int + Slack time.Duration + Timeout time.Duration + BaseFeeThreshold abi.TokenAmount +} + +type commitBatchingConfig struct { + MinCommitBatch int + MaxCommitBatch int + Slack time.Duration + Timeout time.Duration + BaseFeeThreshold abi.TokenAmount +} + +type pollerConfig struct { + preCommit preCommitBatchingConfig + commit commitBatchingConfig } type SealPoller struct { db *harmonydb.DB api SealPollerAPI + cfg pollerConfig pollers [numPollers]promise.Promise[harmonytask.AddTaskFunc] } -func NewPoller(db *harmonydb.DB, api SealPollerAPI) *SealPoller { +func NewPoller(db *harmonydb.DB, api SealPollerAPI, cfg *config.CurioConfig) (*SealPoller, error) { + + if cfg.Batching.PreCommit.BaseFeeThreshold == types.MustParseFIL("0") { + return nil, xerrors.Errorf("BaseFeeThreshold cannot be 0 for precommit") + } + if cfg.Batching.Commit.BaseFeeThreshold == types.MustParseFIL("0") { + return nil, xerrors.Errorf("BaseFeeThreshold cannot be 0 for commit") + } + + c := pollerConfig{ + commit: commitBatchingConfig{ + MinCommitBatch: miner.MinAggregatedSectors, + MaxCommitBatch: 256, + Slack: time.Duration(cfg.Batching.Commit.Slack), + Timeout: time.Duration(cfg.Batching.Commit.Timeout), + BaseFeeThreshold: abi.TokenAmount(cfg.Batching.Commit.BaseFeeThreshold), + }, + preCommit: preCommitBatchingConfig{ + MaxPreCommitBatch: miner15.PreCommitSectorBatchMaxSize, + Slack: time.Duration(cfg.Batching.PreCommit.Slack), + Timeout: time.Duration(cfg.Batching.PreCommit.Timeout), + BaseFeeThreshold: 
abi.TokenAmount(cfg.Batching.PreCommit.BaseFeeThreshold), + }, + } + return &SealPoller{ db: db, api: api, - } + cfg: c, + }, nil } func (s *SealPoller) RunPoller(ctx context.Context) { @@ -83,8 +133,11 @@ NOTE: TaskIDs are ONLY set while the tasks are executing or waiting to execute. */ type pollTask struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + RegisteredSealProof abi.RegisteredSealProof `db:"reg_seal_proof"` + + TicketEpoch *int64 `db:"ticket_epoch"` TaskSDR *int64 `db:"task_id_sdr"` AfterSDR bool `db:"after_sdr"` @@ -101,6 +154,8 @@ type pollTask struct { TaskSynth *int64 `db:"task_id_synth"` AfterSynth bool `db:"after_synth"` + PreCommitReadyAt *time.Time `db:"precommit_ready_at"` + TaskPrecommitMsg *int64 `db:"task_id_precommit_msg"` AfterPrecommitMsg bool `db:"after_precommit_msg"` @@ -117,6 +172,8 @@ type pollTask struct { TaskMoveStorage *int64 `db:"task_id_move_storage"` AfterMoveStorage bool `db:"after_move_storage"` + CommitReadyAt *time.Time `db:"commit_ready_at"` + TaskCommitMsg *int64 `db:"task_id_commit_msg"` AfterCommitMsg bool `db:"after_commit_msg"` @@ -124,27 +181,59 @@ type pollTask struct { Failed bool `db:"failed"` FailedReason string `db:"failed_reason"` + + StartEpoch abi.ChainEpoch `db:"smallest_start_epoch"` } func (s *SealPoller) poll(ctx context.Context) error { var tasks []pollTask err := s.db.Select(ctx, &tasks, `SELECT - sp_id, sector_number, - task_id_sdr, after_sdr, - task_id_tree_d, after_tree_d, - task_id_tree_c, after_tree_c, - task_id_tree_r, after_tree_r, - task_id_synth, after_synth, - task_id_precommit_msg, after_precommit_msg, - after_precommit_msg_success, seed_epoch, - task_id_porep, porep_proof, after_porep, - task_id_finalize, after_finalize, - task_id_move_storage, after_move_storage, - task_id_commit_msg, after_commit_msg, - after_commit_msg_success, - failed, failed_reason - FROM sectors_sdr_pipeline WHERE 
after_commit_msg_success != TRUE OR after_move_storage != TRUE`) + p.sp_id, + p.sector_number, + p.reg_seal_proof, + p.ticket_epoch, + p.task_id_sdr, + p.after_sdr, + p.task_id_tree_d, + p.after_tree_d, + p.task_id_tree_c, + p.after_tree_c, + p.task_id_tree_r, + p.after_tree_r, + p.task_id_synth, + p.after_synth, + p.precommit_ready_at, + p.task_id_precommit_msg, + p.after_precommit_msg, + p.after_precommit_msg_success, + p.seed_epoch, + p.task_id_porep, + p.porep_proof, + p.after_porep, + p.task_id_finalize, + p.after_finalize, + p.task_id_move_storage, + p.after_move_storage, + p.commit_ready_at, + p.task_id_commit_msg, + p.after_commit_msg, + p.after_commit_msg_success, + p.failed, + p.failed_reason, + COALESCE( + (SELECT + MIN(LEAST(s.f05_deal_start_epoch, s.direct_start_epoch)) + FROM sectors_sdr_initial_pieces s + WHERE s.sp_id = p.sp_id AND s.sector_number = p.sector_number + ), + 0 + ) AS smallest_start_epoch + FROM + sectors_sdr_pipeline p + WHERE + p.after_commit_msg_success != TRUE + OR p.after_move_storage != TRUE;`) if err != nil { return err } @@ -164,15 +253,17 @@ func (s *SealPoller) poll(ctx context.Context) error { s.pollStartSDRTreeD(ctx, task) s.pollStartSDRTreeRC(ctx, task) s.pollStartSynth(ctx, task) - s.pollStartPrecommitMsg(ctx, task) s.mustPoll(s.pollPrecommitMsgLanded(ctx, task)) s.pollStartPoRep(ctx, task, ts) s.pollStartFinalize(ctx, task, ts) s.pollStartMoveStorage(ctx, task) - s.pollStartCommitMsg(ctx, task) s.mustPoll(s.pollCommitMsgLanded(ctx, task)) } + // Aggregate/Batch PreCommit and Commit + s.pollStartBatchPrecommitMsg(ctx, tasks) + s.pollStartBatchCommitMsg(ctx, tasks) + return nil } diff --git a/tasks/seal/poller_commit_msg.go b/tasks/seal/poller_commit_msg.go index b5e2baf93..78a5a3eb2 100644 --- a/tasks/seal/poller_commit_msg.go +++ b/tasks/seal/poller_commit_msg.go @@ -2,33 +2,185 @@ package seal import ( "context" + "sort" + "time" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" 
"github.com/filecoin-project/go-state-types/abi" + actorstypes "github.com/filecoin-project/go-state-types/actors" "github.com/filecoin-project/go-state-types/exitcode" + "github.com/filecoin-project/curio/build" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" ) -func (s *SealPoller) pollStartCommitMsg(ctx context.Context, task pollTask) { - if task.afterPoRep() && len(task.PoRepProof) > 0 && task.TaskCommitMsg == nil && !task.AfterCommitMsg && s.pollers[pollerCommitMsg].IsSet() { - s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_commit_msg IS NULL`, id, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) +func (s *SealPoller) pollStartBatchCommitMsg(ctx context.Context, tasks []pollTask) { + // Make batches based on Proof types + ts, err := s.api.ChainHead(ctx) + if err != nil { + log.Errorf("error getting chain head: %s", err) + return + } + + nv, err := s.api.StateNetworkVersion(ctx, ts.Key()) + if err != nil { + log.Errorf("getting network version: %s", err) + return + } + + av, err := actorstypes.VersionForNetwork(nv) + if err != nil { + log.Errorf("unsupported network version: %s", err) + return + } + + var tsks []pollTask + + for i := range tasks { + if tasks[i].afterPoRep() && len(tasks[i].PoRepProof) > 0 && tasks[i].TaskCommitMsg == nil && !tasks[i].AfterCommitMsg && s.pollers[pollerCommitMsg].IsSet() { + // If CC sector set StartEpoch/CutOff + if tasks[i].StartEpoch == 0 { + maddr, err := address.NewIDAddress(uint64(tasks[i].SpID)) + if err != nil { + 
log.Errorf("error creating miner address: %s", err) + return + } + + pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(tasks[i].SectorNumber), ts.Key()) + if err != nil { + log.Errorf("getting precommit info: %s", err) + return + } + + if pci == nil { + log.Errorf("precommit info not found for sp %s and sector %d", maddr.String(), tasks[i].SectorNumber) + return + } + + mpcd, err := policy.GetMaxProveCommitDuration(av, tasks[i].RegisteredSealProof) + if err != nil { + log.Errorf("getting max prove commit duration: %s", err) + return + } + + tasks[i].StartEpoch = pci.PreCommitEpoch + mpcd + } + tsks = append(tsks, tasks[i]) + } + } + + sort.Slice(tsks, func(i, j int) bool { + return tsks[i].StartEpoch < tsks[j].StartEpoch + }) + + batchMap := make(map[int64]map[abi.RegisteredSealProof][]pollTask) + for i := range tsks { + // Check if SpID exists in batchMap + v, ok := batchMap[tsks[i].SpID] + if !ok { + // If not, initialize a new map for the RegisteredSealProof + v = make(map[abi.RegisteredSealProof][]pollTask) + batchMap[tsks[i].SpID] = v + } + // Append the task to the correct RegisteredSealProof + v[tsks[i].RegisteredSealProof] = append(v[tsks[i].RegisteredSealProof], tsks[i]) + } + + // Send batches per MinerID and per Proof type based on the following logic: + // 1. Check if Slack for any sector is reaching, if yes then send full batch + // 2. Check if timeout is reaching for any sector in the batch, if yes, then send the batch + // 3. Check if baseFee below set threshold. 
If yes then send all batches + + for spid, miners := range batchMap { + for _, pts := range miners { + // Break into batches + var batches []sectorBatch + for i := 0; i < len(pts); i += s.cfg.commit.MaxCommitBatch { + // Create a batch of size `maxBatchSize` or smaller for the last batch + end := i + s.cfg.commit.MaxCommitBatch + if end > len(pts) { + end = len(pts) + } + var batch []int64 + cutoff := abi.ChainEpoch(0) + earliest := time.Now() + for _, pt := range pts[i:end] { + + if cutoff == 0 || pt.StartEpoch < cutoff { + cutoff = pt.StartEpoch + } + + if pt.CommitReadyAt.Before(earliest) { + earliest = *pt.CommitReadyAt + } + + batch = append(batch, pt.SectorNumber) + } + + batches = append(batches, sectorBatch{ + cutoff: cutoff, + sectors: batch, + }) } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + + for i := range batches { + batch := batches[i] + //sectors := batch.sectors + // Process batch if slack has reached + if (time.Duration(batch.cutoff-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second) < s.cfg.commit.Slack { + s.sendCommitBatch(ctx, spid, batch.sectors) + continue + } + // Process batch if timeout has reached + if batch.earliest.Add(s.cfg.commit.Timeout).After(time.Now()) { + s.sendCommitBatch(ctx, spid, batch.sectors) + continue + } + // Process batch if base fee is low enough for us to send + if ts.MinTicketBlock().ParentBaseFee.LessThan(s.cfg.commit.BaseFeeThreshold) { + s.sendCommitBatch(ctx, spid, batch.sectors) + continue + } } + } + } +} + +func (s *SealPoller) sendCommitBatch(ctx context.Context, spid int64, sectors []int64) { + if len(sectors) < miner.MinAggregatedSectors { + for i := range sectors { + s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_commit_msg IS NULL`, id, spid, 
sectors[i]) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } - return true, nil - }) + return true, nil + }) + } } + + s.pollers[pollerCommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_commit_msg = $1 WHERE sp_id = $2 AND sector_number = ANY($3) AND task_id_commit_msg IS NULL`, id, spid, sectors) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != len(sectors) { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) } func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) error { diff --git a/tasks/seal/poller_precommit_msg.go b/tasks/seal/poller_precommit_msg.go index b9ce05019..39feb46bb 100644 --- a/tasks/seal/poller_precommit_msg.go +++ b/tasks/seal/poller_precommit_msg.go @@ -2,6 +2,8 @@ package seal import ( "context" + "sort" + "time" "golang.org/x/xerrors" @@ -9,6 +11,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/exitcode" + "github.com/filecoin-project/curio/build" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" @@ -16,22 +19,117 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -func (s *SealPoller) pollStartPrecommitMsg(ctx context.Context, task pollTask) { - if task.TaskPrecommitMsg == nil && !task.AfterPrecommitMsg && task.afterSynth() && s.pollers[pollerPrecommitMsg].IsSet() { - s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = $3 AND task_id_precommit_msg IS 
NULL AND after_synth = TRUE`, id, task.SpID, task.SectorNumber) - if err != nil { - return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) +type sectorBatch struct { + cutoff abi.ChainEpoch + earliest time.Time + sectors []int64 +} + +func (s *SealPoller) pollStartBatchPrecommitMsg(ctx context.Context, tasks []pollTask) { + // Make batches based on Proof types + var tsks []pollTask + for i := range tasks { + if tasks[i].TaskPrecommitMsg == nil && !tasks[i].AfterPrecommitMsg && tasks[i].afterSynth() && s.pollers[pollerPrecommitMsg].IsSet() { + // If CC sector set StartEpoch/CutOff + if tasks[i].StartEpoch == 0 { + tasks[i].StartEpoch = abi.ChainEpoch(*tasks[i].TicketEpoch) + policy.MaxPreCommitRandomnessLookback } - if n != 1 { - return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + tsks = append(tsks, tasks[i]) + } + } + + // Sort in ascending order to allow maximum time for sectors to wait for base free drop + sort.Slice(tsks, func(i, j int) bool { + return tsks[i].StartEpoch < tsks[j].StartEpoch + }) + + batchMap := make(map[int64]map[abi.RegisteredSealProof][]pollTask) + for i := range tsks { + // Check if SpID exists in batchMap + v, ok := batchMap[tsks[i].SpID] + if !ok { + // If not, initialize a new map for the RegisteredSealProof + v = make(map[abi.RegisteredSealProof][]pollTask) + batchMap[tsks[i].SpID] = v + } + // Append the task to the correct RegisteredSealProof + v[tsks[i].RegisteredSealProof] = append(v[tsks[i].RegisteredSealProof], tsks[i]) + } + + // Send batches per MinerID and per Proof type based on the following logic: + // 1. Check if Slack for any sector is reaching, if yes then send full batch + // 2. Check if timeout is reaching for any sector in the batch, if yes, then send the batch + // 3. Check if baseFee below set threshold. 
If yes then send all batches + + ts, err := s.api.ChainHead(ctx) + if err != nil { + log.Errorf("error getting chain head: %s", err) + return + } + + for spid, miners := range batchMap { + for _, pts := range miners { + // Break into batches + var batches []sectorBatch + for i := 0; i < len(pts); i += s.cfg.preCommit.MaxPreCommitBatch { + // Create a batch of size `maxBatchSize` or smaller for the last batch + end := i + s.cfg.preCommit.MaxPreCommitBatch + if end > len(pts) { + end = len(pts) + } + var batch []int64 + cutoff := abi.ChainEpoch(0) + earliest := time.Now() + for _, pt := range pts[i:end] { + batch = append(batch, pt.SectorNumber) + if cutoff == 0 || pt.StartEpoch < cutoff { + cutoff = pt.StartEpoch + } + if pt.PreCommitReadyAt.Before(earliest) { + earliest = *pt.PreCommitReadyAt + } + } + + batches = append(batches, sectorBatch{ + cutoff: cutoff, + sectors: batch, + }) } - return true, nil - }) + for i := range batches { + batch := batches[i] + //sectors := batch.sectors + // Process batch if slack has reached + if (time.Duration(batch.cutoff-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second) < s.cfg.preCommit.Slack { + s.sendPreCommitBatch(ctx, spid, batch.sectors) + } + // Process batch if timeout has reached + if batch.earliest.Add(s.cfg.preCommit.Timeout).After(time.Now()) { + s.sendPreCommitBatch(ctx, spid, batch.sectors) + } + // Process batch if base fee is low enough for us to send + if ts.MinTicketBlock().ParentBaseFee.LessThan(s.cfg.preCommit.BaseFeeThreshold) { + s.sendPreCommitBatch(ctx, spid, batch.sectors) + } + } + } } } +func (s *SealPoller) sendPreCommitBatch(ctx context.Context, spid int64, sectors []int64) { + s.pollers[pollerPrecommitMsg].Val(ctx)(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + n, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_precommit_msg = $1 WHERE sp_id = $2 AND sector_number = ANY($3) AND task_id_precommit_msg IS NULL AND after_synth = TRUE`, 
id, spid, sectors) + if err != nil { + return false, xerrors.Errorf("update sectors_sdr_pipeline: %w", err) + } + if n != len(sectors) { + return false, xerrors.Errorf("expected to update 1 row, updated %d", n) + } + + return true, nil + }) +} + type dbExecResult struct { PrecommitMsgCID *string `db:"precommit_msg_cid"` CommitMsgCID *string `db:"commit_msg_cid"` diff --git a/tasks/seal/task_submit_commit.go b/tasks/seal/task_submit_commit.go index fb95f4e74..777fb3d3d 100644 --- a/tasks/seal/task_submit_commit.go +++ b/tasks/seal/task_submit_commit.go @@ -6,6 +6,8 @@ import ( "encoding/json" "fmt" + "github.com/ipfs/go-cid" + "go.uber.org/multierr" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -16,6 +18,8 @@ import ( miner2 "github.com/filecoin-project/go-state-types/builtin/v13/miner" verifreg13 "github.com/filecoin-project/go-state-types/builtin/v13/verifreg" verifregtypes9 "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" + "github.com/filecoin-project/go-state-types/network" + "github.com/filecoin-project/go-state-types/proof" "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" @@ -23,11 +27,13 @@ import ( "github.com/filecoin-project/curio/harmony/resources" "github.com/filecoin-project/curio/harmony/taskhelp" "github.com/filecoin-project/curio/lib/multictladdr" + "github.com/filecoin-project/curio/lib/storiface" "github.com/filecoin-project/curio/tasks/message" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/market" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/storage/ctladdr" ) @@ -40,41 +46,41 @@ type SubmitCommitAPI interface { StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes9.AllocationId, tsk types.TipSetKey) 
(*verifregtypes9.Allocation, error) StateGetAllocationIdForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (verifregtypes9.AllocationId, error) StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) + StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error) + GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error) ctladdr.NodeApi } type commitConfig struct { - maxFee types.FIL + feeCfg *config.CurioFees RequireActivationSuccess bool RequireNotificationSuccess bool - CollateralFromMinerBalance bool - DisableCollateralFallback bool } type SubmitCommitTask struct { - sp *SealPoller - db *harmonydb.DB - api SubmitCommitAPI + sp *SealPoller + db *harmonydb.DB + api SubmitCommitAPI + prover storiface.Prover sender *message.Sender as *multictladdr.MultiAddressSelector cfg commitConfig } -func NewSubmitCommitTask(sp *SealPoller, db *harmonydb.DB, api SubmitCommitAPI, sender *message.Sender, as *multictladdr.MultiAddressSelector, cfg *config.CurioConfig) *SubmitCommitTask { +func NewSubmitCommitTask(sp *SealPoller, db *harmonydb.DB, api SubmitCommitAPI, sender *message.Sender, as *multictladdr.MultiAddressSelector, cfg *config.CurioConfig, prover storiface.Prover) *SubmitCommitTask { cnfg := commitConfig{ - maxFee: cfg.Fees.MaxCommitGasFee, + feeCfg: &cfg.Fees, RequireActivationSuccess: cfg.Subsystems.RequireActivationSuccess, RequireNotificationSuccess: cfg.Subsystems.RequireNotificationSuccess, - CollateralFromMinerBalance: cfg.Fees.CollateralFromMinerBalance, - DisableCollateralFallback: cfg.Fees.DisableCollateralFallback, } return &SubmitCommitTask{ sp: sp, db: db, api: api, + prover: prover, sender: sender, as: as, cfg: cnfg, @@ -108,47 +114,29 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) ctx := context.Background() var sectorParamsArr []struct { - SpID int64 
`db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - Proof []byte `db:"porep_proof"` + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + Proof []byte `db:"porep_proof"` + RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"` + TicketValue []byte `db:"ticket_value"` + SealedCID string `db:"tree_r_cid"` + UnsealedCID string `db:"tree_d_cid"` + SeedValue []byte `db:"seed_value"` } err = s.db.Select(ctx, §orParamsArr, ` - SELECT sp_id, sector_number, porep_proof + SELECT sp_id, sector_number, porep_proof, ticket_value, tree_r_cid, tree_d_cid FROM sectors_sdr_pipeline - WHERE task_id_commit_msg = $1`, taskID) + WHERE task_id_commit_msg = $1 ORDER BY sector_number ASC`, taskID) if err != nil { return false, xerrors.Errorf("getting sector params: %w", err) } - if len(sectorParamsArr) != 1 { - return false, xerrors.Errorf("expected 1 sector params, got %d", len(sectorParamsArr)) + if len(sectorParamsArr) == 0 || (len(sectorParamsArr) < 4 && len(sectorParamsArr) != 1) { + return false, xerrors.Errorf("expected either 1 or at least 4 sector params, got %d", len(sectorParamsArr)) } - sectorParams := sectorParamsArr[0] - var pieces []struct { - PieceIndex int64 `db:"piece_index"` - PieceCID string `db:"piece_cid"` - PieceSize int64 `db:"piece_size"` - Proposal json.RawMessage `db:"f05_deal_proposal"` - Manifest json.RawMessage `db:"direct_piece_activation_manifest"` - DealID abi.DealID `db:"f05_deal_id"` - } - - err = s.db.Select(ctx, &pieces, ` - SELECT piece_index, - piece_cid, - piece_size, - f05_deal_proposal, - direct_piece_activation_manifest, - COALESCE(f05_deal_id, 0) AS f05_deal_id - FROM sectors_sdr_initial_pieces - WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, sectorParams.SpID, sectorParams.SectorNumber) - if err != nil { - return false, xerrors.Errorf("getting pieces: %w", err) - } - - maddr, err := address.NewIDAddress(uint64(sectorParams.SpID)) + maddr, err := address.NewIDAddress(uint64(sectorParamsArr[0].SpID)) 
if err != nil { return false, xerrors.Errorf("getting miner address: %w", err) } @@ -158,170 +146,345 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) return false, xerrors.Errorf("getting chain head: %w", err) } - pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(sectorParams.SectorNumber), ts.Key()) - if err != nil { - return false, xerrors.Errorf("getting precommit info: %w", err) - } - if pci == nil { - return false, xerrors.Errorf("precommit info not found on chain") - } - - mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK) - if err != nil { - return false, xerrors.Errorf("getting miner info: %w", err) - } - - var verifiedSize int64 + regProof := sectorParamsArr[0].RegSealProof params := miner.ProveCommitSectors3Params{ RequireActivationSuccess: s.cfg.RequireActivationSuccess, RequireNotificationSuccess: s.cfg.RequireNotificationSuccess, } - var pams []miner.PieceActivationManifest + collateral := big.Zero() + var infos []proof.AggregateSealVerifyInfo + var sectors []int64 - for _, piece := range pieces { - if piece.Proposal != nil { - var prop *market.DealProposal - err = json.Unmarshal(piece.Proposal, &prop) - if err != nil { - return false, xerrors.Errorf("marshalling json to deal proposal: %w", err) - } - alloc, err := s.api.StateGetAllocationIdForPendingDeal(ctx, piece.DealID, types.EmptyTSK) - if err != nil { - return false, xerrors.Errorf("getting allocation for deal %d: %w", piece.DealID, err) - } - clid, err := s.api.StateLookupID(ctx, prop.Client, types.EmptyTSK) - if err != nil { - return false, xerrors.Errorf("getting client address for deal %d: %w", piece.DealID, err) - } + for _, sectorParams := range sectorParamsArr { + sectorParams := sectorParams - clientId, err := address.IDFromAddress(clid) - if err != nil { - return false, xerrors.Errorf("getting client address for deal %d: %w", piece.DealID, err) - } + // Check miner ID is same for all sectors in batch + tmpMaddr, err := 
address.NewIDAddress(uint64(sectorParams.SpID)) + if err != nil { + return false, xerrors.Errorf("getting miner address: %w", err) + } + + if maddr != tmpMaddr { + return false, xerrors.Errorf("expected miner IDs to be same (%s) for all sectors in a batch but found %s", maddr.String(), tmpMaddr.String()) + } + + // Check proof types is same for all sectors in batch + if sectorParams.RegSealProof != regProof { + return false, xerrors.Errorf("expected proofs type to be same (%d) for all sectors in a batch but found %d", regProof, sectorParams.RegSealProof) + } + + var pieces []struct { + PieceIndex int64 `db:"piece_index"` + PieceCID string `db:"piece_cid"` + PieceSize int64 `db:"piece_size"` + Proposal json.RawMessage `db:"f05_deal_proposal"` + Manifest json.RawMessage `db:"direct_piece_activation_manifest"` + DealID abi.DealID `db:"f05_deal_id"` + } + + err = s.db.Select(ctx, &pieces, ` + SELECT piece_index, + piece_cid, + piece_size, + f05_deal_proposal, + direct_piece_activation_manifest, + COALESCE(f05_deal_id, 0) AS f05_deal_id + FROM sectors_sdr_initial_pieces + WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, sectorParams.SpID, sectorParams.SectorNumber) + if err != nil { + return false, xerrors.Errorf("getting pieces: %w", err) + } + + pci, err := s.api.StateSectorPreCommitInfo(ctx, maddr, abi.SectorNumber(sectorParams.SectorNumber), ts.Key()) + if err != nil { + return false, xerrors.Errorf("getting precommit info: %w", err) + } + if pci == nil { + return false, xerrors.Errorf("precommit info not found on chain") + } - var vac *miner2.VerifiedAllocationKey - if alloc != verifregtypes9.NoAllocationID { - vac = &miner2.VerifiedAllocationKey{ - Client: abi.ActorID(clientId), - ID: verifreg13.AllocationId(alloc), + var verifiedSize abi.PaddedPieceSize + + var pams []miner.PieceActivationManifest + + var sectorFailed bool + + for _, piece := range pieces { + var pam *miner.PieceActivationManifest + if piece.Proposal != nil { + var prop 
*market.DealProposal + err = json.Unmarshal(piece.Proposal, &prop) + if err != nil { + return false, xerrors.Errorf("marshalling json to deal proposal: %w", err) + } + alloc, err := s.api.StateGetAllocationIdForPendingDeal(ctx, piece.DealID, types.EmptyTSK) + if err != nil { + return false, xerrors.Errorf("getting allocation for deal %d: %w", piece.DealID, err) + } + clid, err := s.api.StateLookupID(ctx, prop.Client, types.EmptyTSK) + if err != nil { + return false, xerrors.Errorf("getting client address for deal %d: %w", piece.DealID, err) } - verifiedSize += piece.PieceSize - } - payload, err := cborutil.Dump(piece.DealID) - if err != nil { - return false, xerrors.Errorf("serializing deal id: %w", err) - } + clientId, err := address.IDFromAddress(clid) + if err != nil { + return false, xerrors.Errorf("getting client address for deal %d: %w", piece.DealID, err) + } - pams = append(pams, miner.PieceActivationManifest{ - CID: prop.PieceCID, - Size: prop.PieceSize, - VerifiedAllocationKey: vac, - Notify: []miner2.DataActivationNotification{ - { - Address: market.Address, - Payload: payload, + var vac *miner2.VerifiedAllocationKey + if alloc != verifregtypes9.NoAllocationID { + vac = &miner2.VerifiedAllocationKey{ + Client: abi.ActorID(clientId), + ID: verifreg13.AllocationId(alloc), + } + } + + payload, err := cborutil.Dump(piece.DealID) + if err != nil { + return false, xerrors.Errorf("serializing deal id: %w", err) + } + + pam = &miner.PieceActivationManifest{ + CID: prop.PieceCID, + Size: prop.PieceSize, + VerifiedAllocationKey: vac, + Notify: []miner2.DataActivationNotification{ + { + Address: market.Address, + Payload: payload, + }, }, - }, - }) - } else { - var pam *miner.PieceActivationManifest - err = json.Unmarshal(piece.Manifest, &pam) - if err != nil { - return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err) + } + } else { + err = json.Unmarshal(piece.Manifest, &pam) + if err != nil { + return false, xerrors.Errorf("marshalling json to 
PieceManifest: %w", err) + } } - _, err = AllocationCheck(ctx, s.api, pam, pci.Info.Expiration, abi.ActorID(sectorParams.SpID), ts) + unrecoverable, err := AllocationCheck(ctx, s.api, pam, pci.Info.Expiration, abi.ActorID(sectorParams.SpID), ts) if err != nil { - return false, err + if unrecoverable { + _, err2 := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET + failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1, + task_id_commit_msg = NULL, after_commit_msg = FALSE + WHERE task_id_commit_msg = $2 AND sp_id = $3 AND sector_number = $4`, err.Error(), sectorParams.SpID, sectorParams.SectorNumber) + if err2 != nil { + return false, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2)) + } + log.Errorw("allocation check failed with an unrecoverable issue", "sp", sectorParams.SpID, "sector", sectorParams.SectorNumber, "err", err) + sectorFailed = true + break + } + } + if pam.VerifiedAllocationKey != nil || pam.VerifiedAllocationKey.ID != verifreg13.NoAllocationID { + verifiedSize += pam.Size } + pams = append(pams, *pam) } + + if sectorFailed { + continue // Skip this sector + } + + ssize, err := pci.Info.SealProof.SectorSize() + if err != nil { + return false, xerrors.Errorf("could not get sector size: %w", err) + } + + collateralPerSector, err := s.api.StateMinerInitialPledgeForSector(ctx, pci.Info.Expiration-ts.Height(), ssize, uint64(verifiedSize), ts.Key()) + if err != nil { + return false, xerrors.Errorf("getting initial pledge collateral: %w", err) + } + + collateralPerSector = big.Sub(collateralPerSector, pci.PreCommitDeposit) + if collateralPerSector.LessThan(big.Zero()) { + collateralPerSector = big.Zero() + } + + collateral = big.Add(collateral, collateralPerSector) + + params.SectorActivations = append(params.SectorActivations, miner.SectorActivationManifest{ + SectorNumber: abi.SectorNumber(sectorParams.SectorNumber), + Pieces: pams, + }) + params.SectorProofs = 
append(params.SectorProofs, sectorParams.Proof) + + sealedCID, err := cid.Parse(sectorParams.SealedCID) + if err != nil { + return false, xerrors.Errorf("parsing sealed CID: %w", err) + } + + unsealedCID, err := cid.Parse(sectorParams.UnsealedCID) + if err != nil { + return false, xerrors.Errorf("parsing unsealed CID: %w", err) + } + + infos = append(infos, proof.AggregateSealVerifyInfo{ + Number: abi.SectorNumber(sectorParams.SectorNumber), + Randomness: sectorParams.TicketValue, + InteractiveRandomness: sectorParams.SeedValue, + SealedCID: sealedCID, + UnsealedCID: unsealedCID, + }) + + sectors = append(sectors, sectorParams.SectorNumber) } - params.SectorActivations = append(params.SectorActivations, miner.SectorActivationManifest{ - SectorNumber: abi.SectorNumber(sectorParams.SectorNumber), - Pieces: pams, - }) - params.SectorProofs = append(params.SectorProofs, sectorParams.Proof) + maxFee := s.cfg.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos)) - enc := new(bytes.Buffer) - if err := params.MarshalCBOR(enc); err != nil { - return false, xerrors.Errorf("could not serialize commit params: %w", err) + msg, err := s.createCommitMessage(ctx, maddr, sectorParamsArr[0].RegSealProof, sectorParamsArr[0].SpID, collateral, params, infos, ts) + if err != nil { + return false, xerrors.Errorf("failed to create the commit message: %w", err) + } + + mcid, err := s.sender.Send(ctx, msg, &api.MessageSendSpec{MaxFee: maxFee}, "commit") + if err != nil { + return false, xerrors.Errorf("pushing message to mpool: %w", err) } - ssize, err := pci.Info.SealProof.SectorSize() + _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET commit_msg_cid = $1, after_commit_msg = TRUE, + task_id_commit_msg = NULL WHERE task_id_commit_msg = $2 AND sp_id = $3 AND sector_number = ANY($4)`, mcid, taskID, sectorParamsArr[0].SpID, sectors) if err != nil { - return false, xerrors.Errorf("could not get sector size: %w", err) + return false, xerrors.Errorf("updating commit_msg_cid: %w", err) } - 
collateral, err := s.api.StateMinerInitialPledgeForSector(ctx, pci.Info.Expiration-ts.Height(), ssize, uint64(verifiedSize), ts.Key()) + _, err = s.db.Exec(ctx, `INSERT INTO message_waits (signed_message_cid) VALUES ($1)`, mcid) if err != nil { - return false, xerrors.Errorf("getting initial pledge collateral: %w", err) + return false, xerrors.Errorf("inserting into message_waits: %w", err) } - collateral = big.Sub(collateral, pci.PreCommitDeposit) - if collateral.LessThan(big.Zero()) { - collateral = big.Zero() + if err := s.transferFinalizedSectorData(ctx, sectorParamsArr[0].SpID, sectors); err != nil { + return false, xerrors.Errorf("transferring finalized sector data: %w", err) } - if s.cfg.CollateralFromMinerBalance { - if s.cfg.DisableCollateralFallback { - collateral = big.Zero() + return true, nil +} + +func (s *SubmitCommitTask) createCommitMessage(ctx context.Context, maddr address.Address, sealProof abi.RegisteredSealProof, SpID int64, collateral abi.TokenAmount, params miner.ProveCommitSectors3Params, infos []proof.AggregateSealVerifyInfo, ts *types.TipSet) (*types.Message, error) { + aggParams := params + var aggCost, cost big.Int + var msg, aggMsg *types.Message + maxFee := s.cfg.feeCfg.MaxCommitBatchGasFee.FeeForSectors(len(infos)) + + nv, err := s.api.StateNetworkVersion(ctx, ts.Key()) + if err != nil { + return nil, xerrors.Errorf("getting network version: %s", err) + } + + balance, err := s.api.StateMinerAvailableBalance(ctx, maddr, types.EmptyTSK) + if err != nil { + if err != nil { + return nil, xerrors.Errorf("getting miner balance: %w", err) } - balance, err := s.api.StateMinerAvailableBalance(ctx, maddr, types.EmptyTSK) + } + + mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK) + if err != nil { + return nil, xerrors.Errorf("getting miner info: %w", err) + } + + if len(infos) > 4 { + arp, err := aggregateProofType(nv) if err != nil { - if err != nil { - return false, xerrors.Errorf("getting miner balance: %w", err) - } + return nil, 
xerrors.Errorf("getting aggregate proof type: %w", err) } - collateral = big.Sub(collateral, balance) - if collateral.LessThan(big.Zero()) { - collateral = big.Zero() + aggParams.AggregateProofType = &arp + + aggParams.AggregateProof, err = s.prover.AggregateSealProofs(proof.AggregateSealVerifyProofAndInfos{ + Miner: abi.ActorID(SpID), + SealProof: sealProof, + AggregateProof: arp, + Infos: infos, + }, aggParams.SectorProofs) + + if err != nil { + return nil, xerrors.Errorf("aggregating proofs: %w", err) + } + aggParams.SectorProofs = nil // can't be set when aggregating + + aggFeeRaw, err := policy.AggregateProveCommitNetworkFee(nv, len(infos), ts.MinTicketBlock().ParentBaseFee) + if err != nil { + return nil, xerrors.Errorf("getting aggregate commit network fee: %s", err) } + + aggFee := big.Div(big.Mul(aggFeeRaw, big.NewInt(110)), big.NewInt(110)) + + aggCollateral := big.Add(collateral, aggFee) + aggCollateral = s.calculateCollateral(balance, aggCollateral) + goodFunds := big.Add(maxFee, aggCollateral) + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return nil, xerrors.Errorf("could not serialize commit params: %w", err) + } + aggMsg, err = s.gasEstimateCommit(ctx, maddr, enc.Bytes(), mi, goodFunds, aggCollateral, maxFee, ts.Key()) + if err != nil { + return nil, err + } + aggGas := big.Mul(big.Add(ts.MinTicketBlock().ParentBaseFee, aggMsg.GasPremium), big.NewInt(aggMsg.GasLimit)) + aggCost = big.Add(aggGas, aggFee) } - a, _, err := s.as.AddressFor(ctx, s.api, maddr, mi, api.CommitAddr, collateral, big.Zero()) + { + collateral = s.calculateCollateral(balance, collateral) + goodFunds := big.Add(maxFee, collateral) + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return nil, xerrors.Errorf("could not serialize commit params: %w", err) + } + msg, err = s.gasEstimateCommit(ctx, maddr, enc.Bytes(), mi, goodFunds, collateral, maxFee, ts.Key()) + if err != nil { + return nil, err + } + gas := 
big.Mul(big.Add(ts.MinTicketBlock().ParentBaseFee, msg.GasPremium), big.NewInt(msg.GasLimit)) + cost = gas + } + + if aggCost.Nil() || cost.LessThan(aggCost) { + log.Infow("Sending commit message without aggregate", "Batch Cost", cost, "Aggregate Cost", aggCost) + return msg, nil + } + return aggMsg, nil +} + +func (s *SubmitCommitTask) gasEstimateCommit(ctx context.Context, maddr address.Address, params []byte, mi api.MinerInfo, goodFunds, collateral, maxFee abi.TokenAmount, ts types.TipSetKey) (*types.Message, error) { + a, _, err := s.as.AddressFor(ctx, s.api, maddr, mi, api.CommitAddr, goodFunds, collateral) if err != nil { - return false, xerrors.Errorf("getting address for precommit: %w", err) + return nil, xerrors.Errorf("getting address for precommit: %w", err) } msg := &types.Message{ To: maddr, From: a, Method: builtin.MethodsMiner.ProveCommitSectors3, - Params: enc.Bytes(), + Params: params, Value: collateral, } mss := &api.MessageSendSpec{ - MaxFee: abi.TokenAmount(s.cfg.maxFee), + MaxFee: maxFee, } - mcid, err := s.sender.Send(ctx, msg, mss, "commit") - if err != nil { - return false, xerrors.Errorf("pushing message to mpool: %w", err) - } - - _, err = s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET commit_msg_cid = $1, after_commit_msg = TRUE, task_id_commit_msg = NULL WHERE sp_id = $2 AND sector_number = $3`, mcid, sectorParams.SpID, sectorParams.SectorNumber) - if err != nil { - return false, xerrors.Errorf("updating commit_msg_cid: %w", err) - } + return s.api.GasEstimateMessageGas(ctx, msg, mss, ts) +} - _, err = s.db.Exec(ctx, `INSERT INTO message_waits (signed_message_cid) VALUES ($1)`, mcid) - if err != nil { - return false, xerrors.Errorf("inserting into message_waits: %w", err) - } +func (s *SubmitCommitTask) calculateCollateral(minerBalance abi.TokenAmount, collateral abi.TokenAmount) abi.TokenAmount { + if s.cfg.feeCfg.CollateralFromMinerBalance { + if s.cfg.feeCfg.DisableCollateralFallback { + collateral = big.Zero() + } - if err := 
s.transferFinalizedSectorData(ctx, sectorParams.SpID, sectorParams.SectorNumber); err != nil { - return false, xerrors.Errorf("transferring finalized sector data: %w", err) + collateral = big.Sub(collateral, minerBalance) + if collateral.LessThan(big.Zero()) { + collateral = big.Zero() + } } - - return true, nil + return collateral } -func (s *SubmitCommitTask) transferFinalizedSectorData(ctx context.Context, spID, sectorNum int64) error { +func (s *SubmitCommitTask) transferFinalizedSectorData(ctx context.Context, spID int64, sectors []int64) error { if _, err := s.db.Exec(ctx, ` INSERT INTO sectors_meta ( sp_id, @@ -356,7 +519,7 @@ func (s *SubmitCommitTask) transferFinalizedSectorData(ctx context.Context, spID sectors_sdr_pipeline WHERE sp_id = $1 AND - sector_number = $2 + sector_number = ANY($2) ON CONFLICT (sp_id, sector_num) DO UPDATE SET reg_seal_proof = excluded.reg_seal_proof, ticket_epoch = excluded.ticket_epoch, @@ -367,7 +530,7 @@ func (s *SubmitCommitTask) transferFinalizedSectorData(ctx context.Context, spID msg_cid_commit = excluded.msg_cid_commit, seed_epoch = excluded.seed_epoch, seed_value = excluded.seed_value; - `, spID, sectorNum); err != nil { + `, spID, sectors); err != nil { return fmt.Errorf("failed to insert/update sectors_meta: %w", err) } @@ -404,7 +567,7 @@ func (s *SubmitCommitTask) transferFinalizedSectorData(ctx context.Context, spID sectors_sdr_initial_pieces WHERE sp_id = $1 AND - sector_number = $2 + sector_number = ANY($2) ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE SET piece_cid = excluded.piece_cid, piece_size = excluded.piece_size, @@ -415,7 +578,7 @@ func (s *SubmitCommitTask) transferFinalizedSectorData(ctx context.Context, spID f05_deal_id = excluded.f05_deal_id, ddo_pam = excluded.ddo_pam, f05_deal_proposal = excluded.f05_deal_proposal; - `, spID, sectorNum); err != nil { + `, spID, sectors); err != nil { return fmt.Errorf("failed to insert/update sector_meta_pieces: %w", err) } @@ -430,7 +593,7 @@ func (s 
*SubmitCommitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonyta func (s *SubmitCommitTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: taskhelp.Max(128), - Name: "CommitSubmit", + Name: "CommitBatch", Cost: resources.Resources{ Cpu: 0, Gpu: 0, @@ -489,3 +652,10 @@ func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceAc } var _ harmonytask.TaskInterface = &SubmitCommitTask{} + +func aggregateProofType(nv network.Version) (abi.RegisteredAggregationProof, error) { + if nv < network.Version16 { + return abi.RegisteredAggregationProof_SnarkPackV1, nil + } + return abi.RegisteredAggregationProof_SnarkPackV2, nil +} diff --git a/tasks/seal/task_submit_precommit.go b/tasks/seal/task_submit_precommit.go index 323db913f..0e6bcdc84 100644 --- a/tasks/seal/task_submit_precommit.go +++ b/tasks/seal/task_submit_precommit.go @@ -3,6 +3,7 @@ package seal import ( "bytes" "context" + "fmt" "github.com/ipfs/go-cid" "golang.org/x/xerrors" @@ -15,6 +16,7 @@ import ( miner12 "github.com/filecoin-project/go-state-types/builtin/v12/miner" "github.com/filecoin-project/go-state-types/network" + "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" "github.com/filecoin-project/curio/harmony/resources" @@ -35,32 +37,27 @@ type SubmitPrecommitTaskApi interface { StateMinerInfo(context.Context, address.Address, types.TipSetKey) (api.MinerInfo, error) StateNetworkVersion(context.Context, types.TipSetKey) (network.Version, error) StateMinerAvailableBalance(context.Context, address.Address, types.TipSetKey) (big.Int, error) + GasEstimateMessageGas(context.Context, *types.Message, *api.MessageSendSpec, types.TipSetKey) (*types.Message, error) ctladdr.NodeApi } type SubmitPrecommitTask struct { - sp *SealPoller - db *harmonydb.DB - api SubmitPrecommitTaskApi - sender *message.Sender - as *multictladdr.MultiAddressSelector 
- CollateralFromMinerBalance bool - DisableCollateralFallback bool - - maxFee types.FIL + sp *SealPoller + db *harmonydb.DB + api SubmitPrecommitTaskApi + sender *message.Sender + as *multictladdr.MultiAddressSelector + feeCfg *config.CurioFees } -func NewSubmitPrecommitTask(sp *SealPoller, db *harmonydb.DB, api SubmitPrecommitTaskApi, sender *message.Sender, as *multictladdr.MultiAddressSelector, maxFee types.FIL, CollateralFromMinerBalance, DisableCollateralFallback bool) *SubmitPrecommitTask { +func NewSubmitPrecommitTask(sp *SealPoller, db *harmonydb.DB, api SubmitPrecommitTaskApi, sender *message.Sender, as *multictladdr.MultiAddressSelector, cfg *config.CurioConfig) *SubmitPrecommitTask { return &SubmitPrecommitTask{ sp: sp, db: db, api: api, sender: sender, as: as, - - maxFee: maxFee, - CollateralFromMinerBalance: CollateralFromMinerBalance, - DisableCollateralFallback: DisableCollateralFallback, + feeCfg: &cfg.Fees, } } @@ -105,54 +102,110 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo err = s.db.Select(ctx, §orParamsArr, ` SELECT sp_id, sector_number, reg_seal_proof, user_sector_duration_epochs, ticket_epoch, tree_r_cid, tree_d_cid FROM sectors_sdr_pipeline - WHERE task_id_precommit_msg = $1`, taskID) + WHERE task_id_precommit_msg = $1 ORDER BY sector_number ASC`, taskID) if err != nil { return false, xerrors.Errorf("getting sector params: %w", err) } - if len(sectorParamsArr) != 1 { - return false, xerrors.Errorf("expected 1 sector params, got %d", len(sectorParamsArr)) + if len(sectorParamsArr) == 0 { + return false, xerrors.Errorf("expected at least 1 sector params, got 0") } - sectorParams := sectorParamsArr[0] - maddr, err := address.NewIDAddress(uint64(sectorParams.SpID)) - if err != nil { - return false, xerrors.Errorf("getting miner address: %w", err) - } + //if s.batching { + // if len(sectorParamsArr) != 1 { + // return false, xerrors.Errorf("expected 1 sector params, got %d", len(sectorParamsArr)) + // } + 
//} else { + // + //} - sealedCID, err := cid.Parse(sectorParams.SealedCID) + head, err := s.api.ChainHead(ctx) if err != nil { - return false, xerrors.Errorf("parsing sealed CID: %w", err) + return false, xerrors.Errorf("getting chain head: %w", err) } - unsealedCID, err := cid.Parse(sectorParams.UnsealedCID) + maddr, err := address.NewIDAddress(uint64(sectorParamsArr[0].SpID)) if err != nil { - return false, xerrors.Errorf("parsing unsealed CID: %w", err) + return false, xerrors.Errorf("getting miner address: %w", err) } - // 2. Prepare message params + proof := sectorParamsArr[0].RegSealProof - head, err := s.api.ChainHead(ctx) + nv, err := s.api.StateNetworkVersion(ctx, types.EmptyTSK) if err != nil { - return false, xerrors.Errorf("getting chain head: %w", err) + return false, xerrors.Errorf("getting network version: %w", err) + } + av, err := actorstypes.VersionForNetwork(nv) + if err != nil { + return false, xerrors.Errorf("failed to get actors version: %w", err) + } + msd, err := policy.GetMaxProveCommitDuration(av, proof) + if err != nil { + return false, xerrors.Errorf("failed to get max prove commit duration: %w", err) } + //never commit P2 message before, check ticket expiration + ticketEarliest := head.Height() - policy.MaxPreCommitRandomnessLookback + params := miner.PreCommitSectorBatchParams2{} + collateral := big.Zero() - expiration := sectorParams.TicketEpoch + miner12.MaxSectorExpirationExtension - if sectorParams.UserSectorDurationEpochs != nil { - expiration = sectorParams.TicketEpoch + abi.ChainEpoch(*sectorParams.UserSectorDurationEpochs) - } + // 2. 
Prepare preCommit info and PreCommitSectorBatchParams + for _, sectorParams := range sectorParamsArr { + sectorParams := sectorParams - params.Sectors = append(params.Sectors, miner.SectorPreCommitInfo{ - SealProof: sectorParams.RegSealProof, - SectorNumber: abi.SectorNumber(sectorParams.SectorNumber), - SealedCID: sealedCID, - SealRandEpoch: sectorParams.TicketEpoch, - Expiration: expiration, - }) + // Check miner ID is same for all sectors in batch + tmpMaddr, err := address.NewIDAddress(uint64(sectorParams.SpID)) + if err != nil { + return false, xerrors.Errorf("getting miner address: %w", err) + } + + if maddr != tmpMaddr { + return false, xerrors.Errorf("expected miner IDs to be same (%s) for all sectors in a batch but found %s", maddr.String(), tmpMaddr.String()) + } + + // Check proof types is same for all sectors in batch + if sectorParams.RegSealProof != proof { + return false, xerrors.Errorf("expected proofs type to be same (%d) for all sectors in a batch but found %d", proof, sectorParams.RegSealProof) + } + + // Skip sectors where ticket has already expired + if sectorParams.TicketEpoch < ticketEarliest { + _, perr := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline + SET failed = TRUE, failed_at = NOW(), failed_reason = 'precommit-check', failed_reason_msg = $1, task_id_precommit_msg = NULL + WHERE task_id_precommit_msg = $2 AND sp_id = $3 AND sector_number = $4`, + fmt.Sprintf("ticket expired: seal height: %d, head: %d", sectorParams.TicketEpoch+policy.SealRandomnessLookback, head.Height()), + taskID, sectorParams.SpID, sectorParams.SectorNumber) + if perr != nil { + return false, xerrors.Errorf("persisting precommit check error: %w", perr) + } + continue + } + + sealedCID, err := cid.Parse(sectorParams.SealedCID) + if err != nil { + return false, xerrors.Errorf("parsing sealed CID: %w", err) + } + + unsealedCID, err := cid.Parse(sectorParams.UnsealedCID) + if err != nil { + return false, xerrors.Errorf("parsing unsealed CID: %w", err) + } + + // 2. 
Prepare message params + + param := miner.SectorPreCommitInfo{ + SealedCID: sealedCID, + SealProof: sectorParams.RegSealProof, + SectorNumber: abi.SectorNumber(sectorParams.SectorNumber), + SealRandEpoch: sectorParams.TicketEpoch, + } + + expiration := sectorParams.TicketEpoch + miner12.MaxSectorExpirationExtension + if sectorParams.UserSectorDurationEpochs != nil { + expiration = sectorParams.TicketEpoch + abi.ChainEpoch(*sectorParams.UserSectorDurationEpochs) + } - { var pieces []struct { PieceIndex int64 `db:"piece_index"` PieceCID string `db:"piece_cid"` @@ -167,15 +220,15 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo piece_size, COALESCE(f05_deal_end_epoch, direct_end_epoch, 0) AS deal_end_epoch, COALESCE(f05_deal_start_epoch, direct_start_epoch, 0) AS deal_start_epoch - FROM sectors_sdr_initial_pieces - WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, sectorParams.SpID, sectorParams.SectorNumber) + FROM sectors_sdr_initial_pieces + WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, sectorParams.SpID, sectorParams.SectorNumber) if err != nil { return false, xerrors.Errorf("getting pieces: %w", err) } if len(pieces) > 0 { var endEpoch abi.ChainEpoch - params.Sectors[0].UnsealedCid = &unsealedCID + param.UnsealedCid = &unsealedCID for _, p := range pieces { if p.DealStartEpoch > 0 && abi.ChainEpoch(p.DealStartEpoch) < head.Height() { // deal start epoch is in the past, can't precommit this sector anymore @@ -191,67 +244,50 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo endEpoch = abi.ChainEpoch(p.DealEndEpoch) } } - if endEpoch != params.Sectors[0].Expiration { - params.Sectors[0].Expiration = endEpoch + if endEpoch != expiration { + expiration = endEpoch } } - } - nv, err := s.api.StateNetworkVersion(ctx, types.EmptyTSK) - if err != nil { - return false, xerrors.Errorf("getting network version: %w", err) - } - av, err := actorstypes.VersionForNetwork(nv) 
- if err != nil { - return false, xerrors.Errorf("failed to get actors version: %w", err) - } - msd, err := policy.GetMaxProveCommitDuration(av, sectorParams.RegSealProof) - if err != nil { - return false, xerrors.Errorf("failed to get max prove commit duration: %w", err) - } - - if minExpiration := sectorParams.TicketEpoch + policy.MaxPreCommitRandomnessLookback + msd + miner.MinSectorExpiration; params.Sectors[0].Expiration < minExpiration { - params.Sectors[0].Expiration = minExpiration - } + if minExpiration := sectorParams.TicketEpoch + policy.MaxPreCommitRandomnessLookback + msd + miner.MinSectorExpiration; expiration < minExpiration { + expiration = minExpiration + } - // 3. Check precommit + param.Expiration = expiration - { - record, err := s.checkPrecommit(ctx, params) + collateralPerSector, err := s.api.StateMinerPreCommitDepositForPower(ctx, maddr, param, types.EmptyTSK) if err != nil { - if record { - _, perr := s.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET failed = TRUE, failed_at = NOW(), failed_reason = 'precommit-check', failed_reason_msg = $1, task_id_precommit_msg = NULL - WHERE task_id_precommit_msg = $2`, err.Error(), taskID) - if perr != nil { - return false, xerrors.Errorf("persisting precommit check error: %w", perr) - } - } - - return record, xerrors.Errorf("checking precommit: %w", err) + return false, xerrors.Errorf("getting precommit deposit: %w", err) } + + collateral = big.Add(collateral, collateralPerSector) + + params.Sectors = append(params.Sectors, param) } - // 4. Prepare and send message + // 3. 
Prepare and send message var pbuf bytes.Buffer if err := params.MarshalCBOR(&pbuf); err != nil { return false, xerrors.Errorf("serializing params: %w", err) } - collateral, err := s.api.StateMinerPreCommitDepositForPower(ctx, maddr, params.Sectors[0], types.EmptyTSK) + mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK) if err != nil { - return false, xerrors.Errorf("getting precommit deposit: %w", err) + return false, xerrors.Errorf("getting miner info: %w", err) } - mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK) + maxFee := s.feeCfg.MaxPreCommitBatchGasFee.FeeForSectors(len(params.Sectors)) + aggFeeRaw, err := policy.AggregatePreCommitNetworkFee(nv, len(params.Sectors), head.MinTicketBlock().ParentBaseFee) if err != nil { - return false, xerrors.Errorf("getting miner info: %w", err) + return false, xerrors.Errorf("getting aggregate precommit network fee: %w", err) } + aggFee := big.Div(big.Mul(aggFeeRaw, big.NewInt(110)), big.NewInt(100)) + needFunds := big.Add(collateral, aggFee) - if s.CollateralFromMinerBalance { - if s.DisableCollateralFallback { - collateral = big.Zero() + if s.feeCfg.CollateralFromMinerBalance { + if s.feeCfg.DisableCollateralFallback { + needFunds = big.Zero() } balance, err := s.api.StateMinerAvailableBalance(ctx, maddr, types.EmptyTSK) if err != nil { @@ -259,13 +295,15 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo return false, xerrors.Errorf("getting miner balance: %w", err) } } - collateral = big.Sub(collateral, balance) - if collateral.LessThan(big.Zero()) { - collateral = big.Zero() + needFunds = big.Sub(needFunds, balance) + if needFunds.LessThan(big.Zero()) { + needFunds = big.Zero() } } - a, _, err := s.as.AddressFor(ctx, s.api, maddr, mi, api.PreCommitAddr, collateral, big.Zero()) + goodFunds := big.Add(maxFee, needFunds) + + a, _, err := s.as.AddressFor(ctx, s.api, maddr, mi, api.PreCommitAddr, goodFunds, collateral) if err != nil { return false, xerrors.Errorf("getting 
address for precommit: %w", err) } @@ -275,11 +313,11 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo From: a, Method: builtin.MethodsMiner.PreCommitSectorBatch2, Params: pbuf.Bytes(), - Value: collateral, + Value: needFunds, } mss := &api.MessageSendSpec{ - MaxFee: abi.TokenAmount(s.maxFee), + MaxFee: maxFee, } mcid, err := s.sender.Send(ctx, msg, mss, "precommit") @@ -303,29 +341,6 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo return true, nil } -func (s *SubmitPrecommitTask) checkPrecommit(ctx context.Context, params miner.PreCommitSectorBatchParams2) (record bool, err error) { - if len(params.Sectors) != 1 { - return false, xerrors.Errorf("expected 1 sector") - } - - preCommitInfo := params.Sectors[0] - - head, err := s.api.ChainHead(ctx) - if err != nil { - return false, xerrors.Errorf("getting chain head: %w", err) - } - height := head.Height() - - //never commit P2 message before, check ticket expiration - ticketEarliest := height - policy.MaxPreCommitRandomnessLookback - - if preCommitInfo.SealRandEpoch < ticketEarliest { - return true, xerrors.Errorf("ticket expired: seal height: %d, head: %d", preCommitInfo.SealRandEpoch+policy.SealRandomnessLookback, height) - } - - return true, nil -} - func (s *SubmitPrecommitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) { id := ids[0] return &id, nil @@ -334,7 +349,7 @@ func (s *SubmitPrecommitTask) CanAccept(ids []harmonytask.TaskID, engine *harmon func (s *SubmitPrecommitTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ Max: taskhelp.Max(1024), - Name: "PreCommitSubmit", + Name: "PreCommitBatch", Cost: resources.Resources{ Cpu: 0, Gpu: 0, diff --git a/tasks/seal/task_treerc.go b/tasks/seal/task_treerc.go index 26a2f57ed..7d76b5463 100644 --- a/tasks/seal/task_treerc.go +++ b/tasks/seal/task_treerc.go @@ -15,7 +15,7 @@ import ( 
"github.com/filecoin-project/curio/lib/dealdata" ffi2 "github.com/filecoin-project/curio/lib/ffi" "github.com/filecoin-project/curio/lib/paths" - storiface "github.com/filecoin-project/curio/lib/storiface" + "github.com/filecoin-project/curio/lib/storiface" ) type TreeRCTask struct { @@ -92,7 +92,7 @@ func (t *TreeRCTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } n, err := t.db.Exec(ctx, `UPDATE sectors_sdr_pipeline - SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3, task_id_tree_r = NULL, task_id_tree_c = NULL + SET after_tree_r = true, after_tree_c = true, tree_r_cid = $3, task_id_tree_r = NULL, task_id_tree_c = NULL WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber, sealed) if err != nil { diff --git a/tasks/snap/task_submit.go b/tasks/snap/task_submit.go index d0c53e7df..95d5255a3 100644 --- a/tasks/snap/task_submit.go +++ b/tasks/snap/task_submit.go @@ -5,11 +5,12 @@ import ( "context" "encoding/json" "fmt" - "math/rand/v2" + "time" "github.com/ipfs/go-cid" logging "github.com/ipfs/go-log/v2" "go.uber.org/multierr" + "golang.org/x/exp/maps" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" @@ -21,6 +22,7 @@ import ( "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/exitcode" + "github.com/filecoin-project/curio/build" "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/curio/harmony/harmonytask" @@ -60,8 +62,16 @@ type SubmitTaskNodeAPI interface { StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*dline.Info, error) } +type updateBatchingConfig struct { + MaxUpdateBatch int + Slack time.Duration + Timeout time.Duration + BaseFeeThreshold abi.TokenAmount +} + type submitConfig struct { - maxFee types.FIL + batch updateBatchingConfig + feeCfg *config.CurioFees RequireActivationSuccess bool RequireNotificationSuccess bool 
CollateralFromMinerBalance bool @@ -90,7 +100,13 @@ func NewSubmitTask(db *harmonydb.DB, api SubmitTaskNodeAPI, bstore curiochain.Cu as: as, cfg: submitConfig{ - maxFee: cfg.Fees.MaxCommitGasFee, // todo snap-specific + batch: updateBatchingConfig{ + MaxUpdateBatch: 256, + Slack: time.Duration(cfg.Batching.Update.Slack), + Timeout: time.Duration(cfg.Batching.Update.Timeout), + BaseFeeThreshold: abi.TokenAmount(cfg.Batching.Update.BaseFeeThreshold), + }, + feeCfg: &cfg.Fees, RequireActivationSuccess: cfg.Subsystems.RequireActivationSuccess, RequireNotificationSuccess: cfg.Subsystems.RequireNotificationSuccess, @@ -100,6 +116,11 @@ func NewSubmitTask(db *harmonydb.DB, api SubmitTaskNodeAPI, bstore curiochain.Cu } } +type updateCids struct { + sealed cid.Cid + unsealed cid.Cid +} + func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) { var tasks []struct { SpID int64 `db:"sp_id"` @@ -127,23 +148,13 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("getting sector params: %w", err) } - if len(tasks) != 1 { - return false, xerrors.Errorf("expected 1 sector params, got %d", len(tasks)) + if len(tasks) == 0 { + return false, xerrors.Errorf("expected at least 1 sector params, got 0") } - update := tasks[0] - - var pieces []struct { - Manifest json.RawMessage `db:"direct_piece_activation_manifest"` - Size int64 `db:"piece_size"` - Start int64 `db:"direct_start_epoch"` - } - err = s.db.Select(ctx, &pieces, ` - SELECT direct_piece_activation_manifest, piece_size, direct_start_epoch - FROM sectors_snap_initial_pieces - WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, update.SpID, update.SectorNumber) + maddr, err := address.NewIDAddress(uint64(tasks[0].SpID)) if err != nil { - return false, xerrors.Errorf("getting pieces: %w", err) + return false, xerrors.Errorf("getting miner address: %w", err) } ts, err := s.api.ChainHead(ctx) @@ -151,155 +162,202 @@ 
func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("getting chain head: %w", err) } - maddr, err := address.NewIDAddress(uint64(update.SpID)) + mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK) if err != nil { - return false, xerrors.Errorf("parsing miner address: %w", err) + return false, xerrors.Errorf("getting miner info: %w", err) } - snum := abi.SectorNumber(update.SectorNumber) + regProof := tasks[0].RegSealProof + updateProof := tasks[0].UpdateProof - onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key()) - if err != nil { - return false, xerrors.Errorf("getting sector info: %w", err) - } - if onChainInfo == nil { - return false, xerrors.Errorf("sector not found on chain") + params := miner.ProveReplicaUpdates3Params{ + AggregateProof: nil, + UpdateProofsType: abi.RegisteredUpdateProof(updateProof), + AggregateProofType: nil, + RequireActivationSuccess: s.cfg.RequireActivationSuccess, + RequireNotificationSuccess: s.cfg.RequireNotificationSuccess, } - sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK) - if err != nil { - return false, xerrors.Errorf("getting sector location: %w", err) - } + collateral := big.Zero() + transferMap := make(map[int64]*updateCids) - // Check that the sector isn't in an immutable deadline (or isn't about to be) - curDl, err := s.api.StateMinerProvingDeadline(ctx, maddr, ts.Key()) - if err != nil { - return false, xerrors.Errorf("getting current proving deadline: %w", err) - } + for _, update := range tasks { + update := update - // Matches actor logic - https://github.com/filecoin-project/builtin-actors/blob/76abc47726bdbd8b478ef10e573c25957c786d1d/actors/miner/src/deadlines.rs#L65 - sectorDl := dline.NewInfo(curDl.PeriodStart, sl.Deadline, curDl.CurrentEpoch, - curDl.WPoStPeriodDeadlines, - curDl.WPoStProvingPeriod, - curDl.WPoStChallengeWindow, - curDl.WPoStChallengeLookback, - curDl.FaultDeclarationCutoff) + // Check 
miner ID is same for all sectors in batch + tmpMaddr, err := address.NewIDAddress(uint64(update.SpID)) + if err != nil { + return false, xerrors.Errorf("getting miner address: %w", err) + } - sectorDl = sectorDl.NextNotElapsed() - firstImmutableEpoch := sectorDl.Open - curDl.WPoStChallengeWindow - firstUnsafeEpoch := firstImmutableEpoch - ImmutableSubmitGate - lastImmutableEpoch := sectorDl.Close + if maddr != tmpMaddr { + return false, xerrors.Errorf("expected miner IDs to be same (%s) for all sectors in a batch but found %s", maddr.String(), tmpMaddr.String()) + } - if ts.Height() > firstUnsafeEpoch && ts.Height() < lastImmutableEpoch { - closeTime := curiochain.EpochTime(ts, sectorDl.Close) + // Check proof types is same for all sectors in batch + if update.RegSealProof != regProof { + return false, xerrors.Errorf("expected proofs type to be same (%d) for all sectors in a batch but found %d for sector %d of miner %d", regProof, update.RegSealProof, update.SectorNumber, update.SpID) + } - log.Warnw("sector in unsafe window, delaying submit", "sp", update.SpID, "sector", update.SectorNumber, "cur_dl", curDl, "sector_dl", sectorDl, "close_time", closeTime) + if update.UpdateProof != updateProof { + return false, xerrors.Errorf("expected registered proofs type to be same (%d) for all sectors in a batch but found %d for sector %d of miner %d", updateProof, update.UpdateProof, update.SectorNumber, update.SpID) + } - _, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET - task_id_submit = NULL, after_submit = FALSE, submit_after = $1 - WHERE sp_id = $2 AND sector_number = $3`, closeTime, update.SpID, update.SectorNumber) + var pieces []struct { + Manifest json.RawMessage `db:"direct_piece_activation_manifest"` + Size int64 `db:"piece_size"` + Start int64 `db:"direct_start_epoch"` + } + err = s.db.Select(ctx, &pieces, ` + SELECT direct_piece_activation_manifest, piece_size, direct_start_epoch + FROM sectors_snap_initial_pieces + WHERE sp_id = $1 AND sector_number = 
$2 ORDER BY piece_index ASC`, update.SpID, update.SectorNumber) if err != nil { - return false, xerrors.Errorf("updating sector params: %w", err) + return false, xerrors.Errorf("getting pieces: %w", err) } - return true, nil - } - if ts.Height() >= lastImmutableEpoch { - // the deadline math shouldn't allow this to ever happen, buuut just in case the math is wrong we also check the - // upper bound of the proving window - // (should never happen because if the current epoch is at deadline Close, NextNotElapsed will give us the next deadline) - log.Errorw("sector in somehow past immutable window", "sp", update.SpID, "sector", update.SectorNumber, "cur_dl", curDl, "sector_dl", sectorDl) - } + snum := abi.SectorNumber(update.SectorNumber) - // Process pieces, prepare PAMs - var pams []miner.PieceActivationManifest - var minStart abi.ChainEpoch - var verifiedSize int64 - for _, piece := range pieces { - var pam *miner.PieceActivationManifest - err = json.Unmarshal(piece.Manifest, &pam) + onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key()) if err != nil { - return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err) + return false, xerrors.Errorf("getting sector info: %w", err) + } + if onChainInfo == nil { + return false, xerrors.Errorf("sector not found on chain") } - unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts) + if onChainInfo.SealProof != abi.RegisteredSealProof(regProof) { + return false, xerrors.Errorf("Proof mismatch between on chain %d and local database %d for sector %d of miner %d", onChainInfo.SealProof, regProof, update.SectorNumber, update.SpID) + } + + sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK) if err != nil { - if unrecoverable { - _, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET + return false, xerrors.Errorf("getting sector location: %w", err) + } + + // Check that the sector isn't in an immutable deadline (or 
isn't about to be) + curDl, err := s.api.StateMinerProvingDeadline(ctx, maddr, ts.Key()) + if err != nil { + return false, xerrors.Errorf("getting current proving deadline: %w", err) + } + + // Matches actor logic - https://github.com/filecoin-project/builtin-actors/blob/76abc47726bdbd8b478ef10e573c25957c786d1d/actors/miner/src/deadlines.rs#L65 + sectorDl := dline.NewInfo(curDl.PeriodStart, sl.Deadline, curDl.CurrentEpoch, + curDl.WPoStPeriodDeadlines, + curDl.WPoStProvingPeriod, + curDl.WPoStChallengeWindow, + curDl.WPoStChallengeLookback, + curDl.FaultDeclarationCutoff) + + sectorDl = sectorDl.NextNotElapsed() + firstImmutableEpoch := sectorDl.Open - curDl.WPoStChallengeWindow + firstUnsafeEpoch := firstImmutableEpoch - ImmutableSubmitGate + lastImmutableEpoch := sectorDl.Close + + if ts.Height() > firstUnsafeEpoch && ts.Height() < lastImmutableEpoch { + closeTime := curiochain.EpochTime(ts, sectorDl.Close) + + log.Warnw("sector in unsafe window, delaying submit", "sp", update.SpID, "sector", update.SectorNumber, "cur_dl", curDl, "sector_dl", sectorDl, "close_time", closeTime) + + _, err := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET + task_id_submit = NULL, after_submit = FALSE, submit_after = $1 + WHERE sp_id = $2 AND sector_number = $3`, closeTime, update.SpID, update.SectorNumber) + if err != nil { + return false, xerrors.Errorf("updating sector params: %w", err) + } + + continue // Skip this sector + } + if ts.Height() >= lastImmutableEpoch { + // the deadline math shouldn't allow this to ever happen, buuut just in case the math is wrong we also check the + // upper bound of the proving window + // (should never happen because if the current epoch is at deadline Close, NextNotElapsed will give us the next deadline) + log.Errorw("sector in somehow past immutable window", "sp", update.SpID, "sector", update.SectorNumber, "cur_dl", curDl, "sector_dl", sectorDl) + } + + // Process pieces, prepare PAMs + var pams []miner.PieceActivationManifest + var 
verifiedSize int64 + pieceCheckFailed := false + for _, piece := range pieces { + var pam *miner.PieceActivationManifest + err = json.Unmarshal(piece.Manifest, &pam) + if err != nil { + return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err) + } + unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts) + if err != nil { + if unrecoverable { + _, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1, task_id_submit = NULL, after_submit = FALSE WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber) - log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err) - return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2)) + log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err) + return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2)) + } + + pieceCheckFailed = true + break } - return false, err - } + if pam.VerifiedAllocationKey != nil { + verifiedSize += piece.Size + } - if pam.VerifiedAllocationKey != nil { - verifiedSize += piece.Size + pams = append(pams, *pam) } - if minStart == 0 || abi.ChainEpoch(piece.Start) < minStart { - minStart = abi.ChainEpoch(piece.Start) + if pieceCheckFailed { + continue // Skip this sector } - pams = append(pams, *pam) - } + newSealedCID, err := cid.Parse(update.UpdateSealedCID) + if err != nil { + return false, xerrors.Errorf("parsing new sealed cid: %w", err) + } + newUnsealedCID, err := cid.Parse(update.UpdateUnsealedCID) + if err != nil { + return false, xerrors.Errorf("parsing new unsealed cid: %w", err) + } - newSealedCID, err := cid.Parse(update.UpdateSealedCID) - if err != nil 
{ - return false, xerrors.Errorf("parsing new sealed cid: %w", err) - } - newUnsealedCID, err := cid.Parse(update.UpdateUnsealedCID) - if err != nil { - return false, xerrors.Errorf("parsing new unsealed cid: %w", err) - } + transferMap[update.SectorNumber] = &updateCids{ + sealed: newSealedCID, + unsealed: newUnsealedCID, + } - // Prepare params - params := miner.ProveReplicaUpdates3Params{ - SectorUpdates: []miner13.SectorUpdateManifest{ - { - Sector: snum, - Deadline: sl.Deadline, - Partition: sl.Partition, - NewSealedCID: newSealedCID, - Pieces: pams, - }, - }, - SectorProofs: [][]byte{update.Proof}, - AggregateProof: nil, - UpdateProofsType: abi.RegisteredUpdateProof(update.UpdateProof), - AggregateProofType: nil, - RequireActivationSuccess: s.cfg.RequireActivationSuccess, - RequireNotificationSuccess: s.cfg.RequireNotificationSuccess, - } + ssize, err := onChainInfo.SealProof.SectorSize() + if err != nil { + return false, xerrors.Errorf("getting sector size: %w", err) + } - enc := new(bytes.Buffer) - if err := params.MarshalCBOR(enc); err != nil { - return false, xerrors.Errorf("could not serialize commit params: %w", err) - } + duration := onChainInfo.Expiration - ts.Height() - mi, err := s.api.StateMinerInfo(ctx, maddr, types.EmptyTSK) - if err != nil { - return false, xerrors.Errorf("getting miner info: %w", err) - } + secCollateral, err := s.api.StateMinerInitialPledgeForSector(ctx, duration, ssize, uint64(verifiedSize), ts.Key()) + if err != nil { + return false, xerrors.Errorf("calculating pledge: %w", err) + } - ssize, err := onChainInfo.SealProof.SectorSize() - if err != nil { - return false, xerrors.Errorf("getting sector size: %w", err) - } + secCollateral = big.Sub(secCollateral, onChainInfo.InitialPledge) + if secCollateral.LessThan(big.Zero()) { + secCollateral = big.Zero() + } - duration := onChainInfo.Expiration - ts.Height() + collateral = big.Add(collateral, secCollateral) - collateral, err := s.api.StateMinerInitialPledgeForSector(ctx, 
duration, ssize, uint64(verifiedSize), ts.Key()) - if err != nil { - return false, xerrors.Errorf("calculating pledge: %w", err) + // Prepare params + params.SectorUpdates = append(params.SectorUpdates, miner13.SectorUpdateManifest{ + Sector: snum, + Deadline: sl.Deadline, + Partition: sl.Partition, + NewSealedCID: newSealedCID, + Pieces: pams, + }) + params.SectorProofs = append(params.SectorProofs, update.Proof) } - collateral = big.Sub(collateral, onChainInfo.InitialPledge) - if collateral.LessThan(big.Zero()) { - collateral = big.Zero() + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return false, xerrors.Errorf("could not serialize commit params: %w", err) } if s.cfg.CollateralFromMinerBalance { @@ -318,6 +376,8 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done } } + maxFee := s.cfg.feeCfg.MaxUpdateBatchGasFee.FeeForSectors(len(params.SectorUpdates)) + a, _, err := s.as.AddressFor(ctx, s.api, maddr, mi, api.CommitAddr, collateral, big.Zero()) if err != nil { return false, xerrors.Errorf("getting address for precommit: %w", err) @@ -328,27 +388,16 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done From: a, Method: builtin.MethodsMiner.ProveReplicaUpdates3, Params: enc.Bytes(), - Value: collateral, // todo config for pulling from miner balance!! 
+ Value: collateral, } mss := &api.MessageSendSpec{ - MaxFee: abi.TokenAmount(s.cfg.maxFee), + MaxFee: maxFee, } mcid, err := s.sender.Send(ctx, msg, mss, "update") if err != nil { - if minStart != 0 && ts.Height() > minStart { - _, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET - failed = TRUE, failed_at = NOW(), failed_reason = 'start-expired', failed_reason_msg = $1, - task_id_submit = NULL, after_submit = FALSE - WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber) - - log.Errorw("failed to push message to mpool (beyond deal start epoch)", "sp", update.SpID, "sector", update.SectorNumber, "err", err) - - return true, xerrors.Errorf("pushing message to mpool (beyond deal start epoch): %w", multierr.Combine(err, err2)) - } - - return false, xerrors.Errorf("pushing message to mpool (minStart %d, timeTo %d): %w", minStart, minStart-ts.Height(), err) + return false, xerrors.Errorf("pushing message to mpool: %w", err) } _, err = s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET prove_msg_cid = $1, task_id_submit = NULL, after_submit = TRUE WHERE task_id_submit = $2`, mcid.String(), taskID) @@ -361,18 +410,38 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("inserting into message_waits: %w", err) } - if err := s.transferUpdatedSectorData(ctx, update.SpID, update.SectorNumber, newUnsealedCID, newSealedCID, mcid); err != nil { + if err := s.transferUpdatedSectorData(ctx, tasks[0].SpID, transferMap, mcid); err != nil { return false, xerrors.Errorf("updating sector meta: %w", err) } return true, nil } -func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID, sectorNum int64, newUns, newSl, mcid cid.Cid) error { - if _, err := s.db.Exec(ctx, `UPDATE sectors_meta SET cur_sealed_cid = $1, +func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID int64, transferMap map[int64]*updateCids, mcid cid.Cid) error { + + commit, err := 
s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { + for sectorNum, cids := range transferMap { + sectorNum, cids := sectorNum, cids + n, err := tx.Exec(`UPDATE sectors_meta SET cur_sealed_cid = $1, cur_unsealed_cid = $2, msg_cid_update = $3 - WHERE sp_id = $4 AND sector_num = $5`, newSl.String(), newUns.String(), mcid.String(), spID, sectorNum); err != nil { - return xerrors.Errorf("updating sector meta: %w", err) + WHERE sp_id = $4 AND sector_num = $5`, cids.sealed.String(), cids.unsealed.String(), mcid.String(), spID, sectorNum) + + if err != nil { + return false, xerrors.Errorf("updating sector meta: %w", err) + } + if n != 1 { + return false, xerrors.Errorf("updating sector meta: expected to update 1 row, but updated %d rows", n) + } + } + return true, nil + }, harmonydb.OptionRetry()) + + if err != nil { + return err + } + + if !commit { + return xerrors.Errorf("updating sector meta: transaction failed") } // Execute the query for piece metadata @@ -408,7 +477,7 @@ func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID, sector sectors_snap_initial_pieces WHERE sp_id = $1 AND - sector_number = $2 + sector_number = ANY($2) ON CONFLICT (sp_id, sector_num, piece_num) DO UPDATE SET piece_cid = excluded.piece_cid, piece_size = excluded.piece_size, @@ -419,7 +488,7 @@ func (s *SubmitTask) transferUpdatedSectorData(ctx context.Context, spID, sector f05_deal_id = excluded.f05_deal_id, ddo_pam = excluded.ddo_pam, f05_deal_proposal = excluded.f05_deal_proposal; - `, spID, sectorNum); err != nil { + `, spID, maps.Keys(transferMap)); err != nil { return fmt.Errorf("failed to insert/update sector_meta_pieces: %w", err) } @@ -433,7 +502,7 @@ func (s *SubmitTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Tas func (s *SubmitTask) TypeDetails() harmonytask.TaskTypeDetails { return harmonytask.TaskTypeDetails{ - Name: "UpdateSubmit", + Name: "BatchUpdateSubmit", Cost: resources.Resources{ Cpu: 1, Ram: 64 << 20, @@ 
-447,42 +516,147 @@ func (s *SubmitTask) TypeDetails() harmonytask.TaskTypeDetails { func (s *SubmitTask) schedule(ctx context.Context, taskFunc harmonytask.AddTaskFunc) error { // schedule submits - var stop bool - for !stop { - taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { - stop = true // assume we're done until we find a task to schedule - - var tasks []struct { - SpID int64 `db:"sp_id"` - SectorNumber int64 `db:"sector_number"` - } + taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) { + var cmt bool + type task struct { + SpID int64 `db:"sp_id"` + SectorNumber int64 `db:"sector_number"` + UpgradeProof int `db:"upgrade_proof"` + StartEpoch abi.ChainEpoch `db:"smallest_direct_start_epoch"` + UpdateReadyAt *time.Time `db:"update_ready_at"` + } - err := tx.Select(&tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE failed = FALSE - AND after_encode = TRUE - AND after_prove = TRUE - AND after_submit = FALSE - AND (submit_after IS NULL OR submit_after < NOW()) - AND task_id_submit IS NULL`) - if err != nil { - return false, xerrors.Errorf("getting tasks: %w", err) - } + type sectorBatch struct { + cutoff abi.ChainEpoch + earliest time.Time + sectors []int64 + } - if len(tasks) == 0 { - return false, nil - } + var tasks []task + + err := tx.Select(&tasks, `SELECT + ssp.sp_id, + ssp.sector_number, + ssp.upgrade_proof, + ssp.update_ready_at, + MIN(ssip.direct_start_epoch) AS smallest_direct_start_epoch + FROM + sectors_snap_pipeline ssp + JOIN + sectors_snap_initial_pieces ssip + ON + ssp.sp_id = ssip.sp_id AND ssp.sector_number = ssip.sector_number + WHERE + ssp.failed = FALSE + AND ssp.after_encode = TRUE + AND ssp.after_prove = TRUE + AND ssp.after_submit = FALSE + AND (ssp.submit_after IS NULL OR ssp.submit_after < NOW()) + AND ssp.task_id_submit IS NULL + GROUP BY + ssp.sp_id, ssp.sector_number, ssp.upgrade_proof + ORDER BY + ssp.sector_number ASC;`) + 
if err != nil { + return false, xerrors.Errorf("getting tasks: %w", err) + } - // pick at random in case there are a bunch of schedules across the cluster - t := tasks[rand.N(len(tasks))] + if len(tasks) == 0 { + return false, nil + } - _, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = $3`, id, t.SpID, t.SectorNumber) - if err != nil { - return false, xerrors.Errorf("updating task id: %w", err) + // Make batches based on Proof types + ts, err := s.api.ChainHead(ctx) + if err != nil { + log.Errorf("error getting chain head: %s", err) + return + } + + batchMap := make(map[int64]map[abi.RegisteredUpdateProof][]task) + for i := range tasks { + // Check if SpID exists in batchMap + v, ok := batchMap[tasks[i].SpID] + if !ok { + // If not, initialize a new map for the RegisteredSealProof + v = make(map[abi.RegisteredUpdateProof][]task) + batchMap[tasks[i].SpID] = v } + // Append the task to the correct RegisteredSealProof + v[abi.RegisteredUpdateProof(tasks[i].UpgradeProof)] = append(v[abi.RegisteredUpdateProof(tasks[i].UpgradeProof)], tasks[i]) + } - stop = false // we found a task to schedule, keep going - return true, nil - }) - } + // Send batches per MinerID and per Proof type based on the following logic: + // 1. Check if Slack for any sector is reaching, if yes then send full batch + // 2. Check if timeout is reaching for any sector in the batch, if yes, then send the batch + // 3. Check if baseFee below set threshold. 
If yes then send all batches + + for spid, miners := range batchMap { + for _, pts := range miners { + // Break into batches + var batches []sectorBatch + for i := 0; i < len(pts); i += s.cfg.batch.MaxUpdateBatch { + // Create a batch of size `maxBatchSize` or smaller for the last batch + end := i + s.cfg.batch.MaxUpdateBatch + if end > len(pts) { + end = len(pts) + } + var batch []int64 + cutoff := abi.ChainEpoch(0) + earliest := time.Now() + for _, pt := range pts[i:end] { + + if cutoff == 0 || pt.StartEpoch < cutoff { + cutoff = pt.StartEpoch + } + + if pt.UpdateReadyAt.Before(earliest) { + earliest = *pt.UpdateReadyAt + } + + batch = append(batch, pt.SectorNumber) + } + + batches = append(batches, sectorBatch{ + cutoff: cutoff, + sectors: batch, + }) + } + + for i := range batches { + batch := batches[i] + //sectors := batch.sectors + // Process batch if slack has reached + if (time.Duration(batch.cutoff-ts.Height()) * time.Duration(build.BlockDelaySecs) * time.Second) < s.cfg.batch.Slack { + _, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = $3`, id, spid, batch.sectors) + if err != nil { + return false, xerrors.Errorf("updating task id: %w", err) + } + cmt = true + continue + } + // Process batch if timeout has reached + if batch.earliest.Add(s.cfg.batch.Timeout).After(time.Now()) { + _, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = $3`, id, spid, batch.sectors) + if err != nil { + return false, xerrors.Errorf("updating task id: %w", err) + } + cmt = true + continue + } + // Process batch if base fee is low enough for us to send + if ts.MinTicketBlock().ParentBaseFee.LessThan(s.cfg.batch.BaseFeeThreshold) { + _, err = tx.Exec(`UPDATE sectors_snap_pipeline SET task_id_submit = $1, submit_after = NULL WHERE sp_id = $2 AND sector_number = $3`, id, spid, batch.sectors) + if err != nil { + return false, 
xerrors.Errorf("updating task id: %w", err) + } + cmt = true + continue + } + } + } + } + return cmt, nil + }) // update landed var tasks []struct {