diff --git a/market/deal_ingest_seal.go b/market/deal_ingest_seal.go index 2c6fd393a..039965cee 100644 --- a/market/deal_ingest_seal.go +++ b/market/deal_ingest_seal.go @@ -45,6 +45,7 @@ type PieceIngesterApi interface { StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error) StateGetAllocationIdForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (verifregtypes.AllocationId, error) StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error) + StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error) } type openSector struct { diff --git a/market/deal_ingest_snap.go b/market/deal_ingest_snap.go index 6a4a6ef0d..6d3b4b0c6 100644 --- a/market/deal_ingest_snap.go +++ b/market/deal_ingest_snap.go @@ -236,6 +236,11 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add piece.PublishCid = nil } + head, err := p.api.ChainHead(ctx) + if err != nil { + return api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err) + } + var maxExpiration int64 vd.isVerified = piece.PieceActivationManifest.VerifiedAllocationKey != nil if vd.isVerified { @@ -253,7 +258,7 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add vd.tmin = alloc.TermMin vd.tmax = alloc.TermMax - maxExpiration = int64(piece.DealSchedule.EndEpoch + alloc.TermMax) + maxExpiration = int64(head.Height() + alloc.TermMax) } propJson, err = json.Marshal(piece.PieceActivationManifest) if err != nil { @@ -309,16 +314,47 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add } if len(candidates) == 0 { - return false, xerrors.Errorf("no suitable sectors found") + minEpoch := piece.DealSchedule.EndEpoch + maxEpoch := abi.ChainEpoch(maxExpiration) + + minEpochDays := (minEpoch - head.Height()) / builtin.EpochsInDay + maxEpochDays := (maxEpoch - head.Height()) / builtin.EpochsInDay + + return false, xerrors.Errorf("no suitable sectors found, minEpoch: %d, maxEpoch: %d, minExpirationDays: %d, maxExpirationDays: %d", minEpoch, maxEpoch, minEpochDays, maxEpochDays) } // todo - nice to have: - // * double check the sector expiration // * check sector liveness // * check deadline mutable candidate := candidates[0] // this one works best + si, err := p.api.StateSectorGetInfo(ctx, p.miner, abi.SectorNumber(candidate.Sector), types.EmptyTSK) + if err != nil { + return false, xerrors.Errorf("getting sector info: %w", err) + } + + sectorLifeTime := si.Expiration - head.Height() + if sectorLifeTime < 0 { + return false, xerrors.Errorf("sector lifetime is negative!?") + } + if piece.DealSchedule.EndEpoch > si.Expiration { + return false, xerrors.Errorf("sector expiration is too soon: %d < %d", si.Expiration, piece.DealSchedule.EndEpoch) + } + if maxExpiration != 0 && si.Expiration > abi.ChainEpoch(maxExpiration) { + return false, xerrors.Errorf("sector expiration is too late: %d > %d", si.Expiration, maxExpiration) + } + + // info log detailing EVERYTHING including all the epoch bounds + log.Infow("allocating piece to sector", + "sector", candidate.Sector, + "expiration", si.Expiration, + "sectorLifeTime", sectorLifeTime, + "dealStartEpoch", piece.DealSchedule.StartEpoch, + "dealEndEpoch", piece.DealSchedule.EndEpoch, + "maxExpiration", maxExpiration, + ) + _, err = tx.Exec(`SELECT insert_snap_ddo_piece($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`, p.mid, candidate.Sector, 0, piece.PieceActivationManifest.CID, piece.PieceActivationManifest.Size, @@ -358,6 +394,11 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece var allocated bool var rerr error + head, err := p.api.ChainHead(ctx) + if err != nil { + return false, api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err) + } + comm, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { openSectors, err := p.getOpenSectors(tx) if err != nil { @@ -368,14 +409,23 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece sec := sec if sec.currentSize+psize <= abi.PaddedPieceSize(p.sectorSize) { if vd.isVerified { - sectorLifeTime := sec.latestEndEpoch - sec.earliestStartEpoch + si, err := p.api.StateSectorGetInfo(ctx, p.miner, sec.number, types.EmptyTSK) + if err != nil { + log.Errorw("getting sector info", "error", err, "sector", sec.number, "miner", p.miner) + continue + } + + sectorLifeTime := si.Expiration - head.Height() + if sectorLifeTime < 0 { + log.Errorw("sector lifetime is negative", "sector", sec.number, "miner", p.miner, "lifetime", sectorLifeTime) + continue + } + // Allocation's TMin must fit in sector and TMax should be at least sector lifetime or more // Based on https://github.com/filecoin-project/builtin-actors/blob/a0e34d22665ac8c84f02fea8a099216f29ffaeeb/actors/verifreg/src/lib.rs#L1071-L1086 if sectorLifeTime <= vd.tmin && sectorLifeTime >= vd.tmax { continue } - - // //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TODO ADD SNAP SECTOR EXP CHECKS } ret.Sector = sec.number diff --git a/tasks/seal/task_submit_commit.go b/tasks/seal/task_submit_commit.go index 5211a33fe..aa4b82519 100644 --- a/tasks/seal/task_submit_commit.go +++ b/tasks/seal/task_submit_commit.go @@ -217,7 +217,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) if err != nil { return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err) } - err = AllocationCheck(ctx, s.api, pam, pci, abi.ActorID(sectorParams.SpID), ts) + _, err = AllocationCheck(ctx, s.api, pam, pci.Info.Expiration, abi.ActorID(sectorParams.SpID), ts) if err != nil { return false, err } @@ -428,41 +428,44 @@ type AllocNodeApi interface { StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes9.AllocationId, tsk types.TipSetKey) (*verifregtypes9.Allocation, error) } -func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, precomitInfo *miner.SectorPreCommitOnChainInfo, miner abi.ActorID, ts *types.TipSet) error { +func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, expiration abi.ChainEpoch, miner abi.ActorID, ts *types.TipSet) (permanent bool, err error) { // skip pieces not claiming an allocation if piece.VerifiedAllocationKey == nil { - return nil + return false, nil } addr, err := address.NewIDAddress(uint64(piece.VerifiedAllocationKey.Client)) if err != nil { - return err + return false, err } alloc, err := api.StateGetAllocation(ctx, addr, verifregtypes9.AllocationId(piece.VerifiedAllocationKey.ID), ts.Key()) if err != nil { - return err + return false, err } if alloc == nil { - return xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID) + return true, xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID) } if alloc.Provider != miner { - return xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider) + return true, xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider) } if alloc.Size != piece.Size { - return xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size) + return true, xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size) } - if precomitInfo == nil { - return nil - } - if precomitInfo.Info.Expiration < ts.Height()+alloc.TermMin { - return xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMin, piece.CID.String()) + if expiration < ts.Height()+alloc.TermMin { + tooLittleBy := ts.Height() + alloc.TermMin - expiration + + return true, xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s (should be at least %d epochs more)", expiration, ts.Height()+alloc.TermMin, piece.CID.String(), tooLittleBy) } - if precomitInfo.Info.Expiration > ts.Height()+alloc.TermMax { - return xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMax, piece.CID.String()) + if expiration > ts.Height()+alloc.TermMax { + tooMuchBy := expiration - (ts.Height() + alloc.TermMax) + + return true, xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s (should be at least %d epochs less)", expiration, ts.Height()+alloc.TermMax, piece.CID.String(), tooMuchBy) } - return nil + log.Infow("allocation check details", "piece", piece.CID.String(), "client", alloc.Client, "provider", alloc.Provider, "size", alloc.Size, "term_min", alloc.TermMin, "term_max", alloc.TermMax, "sector_expiration", expiration) + + return false, nil } var _ harmonytask.TaskInterface = &SubmitCommitTask{} diff --git a/tasks/snap/task_submit.go b/tasks/snap/task_submit.go index 3744b5e1c..10b0d47e1 100644 --- a/tasks/snap/task_submit.go +++ b/tasks/snap/task_submit.go @@ -145,18 +145,42 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("getting chain head: %w", err) } + maddr, err := address.NewIDAddress(uint64(update.SpID)) + if err != nil { + return false, xerrors.Errorf("parsing miner address: %w", err) + } + + snum := abi.SectorNumber(update.SectorNumber) + + onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key()) + if err != nil { + return false, xerrors.Errorf("getting sector info: %w", err) + } + if onChainInfo == nil { + return false, xerrors.Errorf("sector not found on chain") + } + var pams []miner.PieceActivationManifest var weight, weightVerif = big.Zero(), big.Zero() var minStart abi.ChainEpoch - for _, piece := range pieces { var pam *miner.PieceActivationManifest err = json.Unmarshal(piece.Manifest, &pam) if err != nil { return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err) } - err = seal.AllocationCheck(ctx, s.api, pam, nil, abi.ActorID(update.SpID), ts) + unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts) if err != nil { + if unrecoverable { + _, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET + failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1, + task_id_submit = NULL, after_submit = FALSE + WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber) + + log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err) + return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2)) + } + return false, err } @@ -178,13 +202,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("parsing new sealed cid: %w", err) } - maddr, err := address.NewIDAddress(uint64(update.SpID)) - if err != nil { - return false, xerrors.Errorf("parsing miner address: %w", err) - } - - snum := abi.SectorNumber(update.SectorNumber) - sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK) if err != nil { return false, xerrors.Errorf("getting sector location: %w", err) @@ -218,14 +235,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return false, xerrors.Errorf("getting miner info: %w", err) } - onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key()) - if err != nil { - return false, xerrors.Errorf("getting sector info: %w", err) - } - if onChainInfo == nil { - return false, xerrors.Errorf("sector not found on chain") - } - ssize, err := onChainInfo.SealProof.SectorSize() if err != nil { return false, xerrors.Errorf("getting sector size: %w", err) @@ -274,7 +283,7 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done return true, xerrors.Errorf("pushing message to mpool (beyond deal start epoch): %w", multierr.Combine(err, err2)) } - return false, xerrors.Errorf("pushing message to mpool: %w", err) + return false, xerrors.Errorf("pushing message to mpool (minStart %d, timeTo %d): %w", minStart, minStart-ts.Height(), err) } _, err = s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET prove_msg_cid = $1, task_id_submit = NULL, after_submit = TRUE WHERE task_id_submit = $2`, mcid.String(), taskID) diff --git a/web/api/webrpc/upgrade.go b/web/api/webrpc/upgrade.go index e0261826b..109d2ae76 100644 --- a/web/api/webrpc/upgrade.go +++ b/web/api/webrpc/upgrade.go @@ -19,11 +19,15 @@ type UpgradeSector struct { TaskIDMoveStorage *uint64 `db:"task_id_move_storage"` AfterMoveStorage bool `db:"after_move_storage"` + + Failed bool `db:"failed"` + FailedReason string `db:"failed_reason"` + FailedMsg string `db:"failed_reason_msg"` } func (a *WebRPC) UpgradeSectors(ctx context.Context) ([]UpgradeSector, error) { sectors := []UpgradeSector{} - err := a.deps.DB.Select(ctx, §ors, `SELECT sp_id, sector_number, task_id_encode, after_encode, task_id_prove, after_prove, task_id_submit, after_submit, after_prove_msg_success, task_id_move_storage, after_move_storage FROM sectors_snap_pipeline`) + err := a.deps.DB.Select(ctx, §ors, `SELECT sp_id, sector_number, task_id_encode, after_encode, task_id_prove, after_prove, task_id_submit, after_submit, after_prove_msg_success, task_id_move_storage, after_move_storage, failed, failed_reason, failed_reason_msg FROM sectors_snap_pipeline`) if err != nil { return nil, err } diff --git a/web/static/snap/upgrade-sectors.mjs b/web/static/snap/upgrade-sectors.mjs index e17e03204..4f88ba4ae 100644 --- a/web/static/snap/upgrade-sectors.mjs +++ b/web/static/snap/upgrade-sectors.mjs @@ -34,6 +34,7 @@ class UpgradeSectors extends LitElement {
FAILED
${entry.FailedReason}
` : 'Healthy'}