Skip to content

Commit

Permalink
snap: Better handle sector duration bounds
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Aug 9, 2024
1 parent e5338cc commit fbefb2e
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 41 deletions.
1 change: 1 addition & 0 deletions market/deal_ingest_seal.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type PieceIngesterApi interface {
StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error)
StateGetAllocationIdForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (verifregtypes.AllocationId, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
}

type openSector struct {
Expand Down
62 changes: 56 additions & 6 deletions market/deal_ingest_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
piece.PublishCid = nil
}

head, err := p.api.ChainHead(ctx)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err)
}

var maxExpiration int64
vd.isVerified = piece.PieceActivationManifest.VerifiedAllocationKey != nil
if vd.isVerified {
Expand All @@ -253,7 +258,7 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
vd.tmin = alloc.TermMin
vd.tmax = alloc.TermMax

maxExpiration = int64(piece.DealSchedule.EndEpoch + alloc.TermMax)
maxExpiration = int64(head.Height() + alloc.TermMax)
}
propJson, err = json.Marshal(piece.PieceActivationManifest)
if err != nil {
Expand Down Expand Up @@ -309,16 +314,47 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
}

if len(candidates) == 0 {
return false, xerrors.Errorf("no suitable sectors found")
minEpoch := piece.DealSchedule.EndEpoch
maxEpoch := abi.ChainEpoch(maxExpiration)

minEpochDays := (minEpoch - head.Height()) / builtin.EpochsInDay
maxEpochDays := (maxEpoch - head.Height()) / builtin.EpochsInDay

return false, xerrors.Errorf("no suitable sectors found, minEpoch: %d, maxEpoch: %d, minExpirationDays: %d, maxExpirationDays: %d", minEpoch, maxEpoch, minEpochDays, maxEpochDays)
}

// todo - nice to have:
// * double check the sector expiration
// * check sector liveness
// * check deadline mutable

candidate := candidates[0] // this one works best

si, err := p.api.StateSectorGetInfo(ctx, p.miner, abi.SectorNumber(candidate.Sector), types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}

sectorLifeTime := si.Expiration - head.Height()
if sectorLifeTime < 0 {
return false, xerrors.Errorf("sector lifetime is negative!?")
}
if piece.DealSchedule.EndEpoch > si.Expiration {
return false, xerrors.Errorf("sector expiration is too soon: %d < %d", si.Expiration, piece.DealSchedule.EndEpoch)
}
if maxExpiration != 0 && si.Expiration > abi.ChainEpoch(maxExpiration) {
return false, xerrors.Errorf("sector expiration is too late: %d > %d", si.Expiration, maxExpiration)
}

// info log detailing EVERYTHING including all the epoch bounds
log.Infow("allocating piece to sector",
"sector", candidate.Sector,
"expiration", si.Expiration,
"sectorLifeTime", sectorLifeTime,
"dealStartEpoch", piece.DealSchedule.StartEpoch,
"dealEndEpoch", piece.DealSchedule.EndEpoch,
"maxExpiration", maxExpiration,
)

_, err = tx.Exec(`SELECT insert_snap_ddo_piece($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
p.mid, candidate.Sector, 0,
piece.PieceActivationManifest.CID, piece.PieceActivationManifest.Size,
Expand Down Expand Up @@ -358,6 +394,11 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
var allocated bool
var rerr error

head, err := p.api.ChainHead(ctx)
if err != nil {
return false, api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err)
}

comm, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
openSectors, err := p.getOpenSectors(tx)
if err != nil {
Expand All @@ -368,14 +409,23 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
sec := sec
if sec.currentSize+psize <= abi.PaddedPieceSize(p.sectorSize) {
if vd.isVerified {
sectorLifeTime := sec.latestEndEpoch - sec.earliestStartEpoch
si, err := p.api.StateSectorGetInfo(ctx, p.miner, sec.number, types.EmptyTSK)
if err != nil {
log.Errorw("getting sector info", "error", err, "sector", sec.number, "miner", p.miner)
continue
}

sectorLifeTime := si.Expiration - head.Height()
if sectorLifeTime < 0 {
log.Errorw("sector lifetime is negative", "sector", sec.number, "miner", p.miner, "lifetime", sectorLifeTime)
continue
}

// Allocation's TMin must fit in sector and TMax should be at least sector lifetime or more
// Based on https://github.com/filecoin-project/builtin-actors/blob/a0e34d22665ac8c84f02fea8a099216f29ffaeeb/actors/verifreg/src/lib.rs#L1071-L1086
if sectorLifeTime <= vd.tmin && sectorLifeTime >= vd.tmax {
continue
}

// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TODO ADD SNAP SECTOR EXP CHECKS
}

ret.Sector = sec.number
Expand Down
35 changes: 19 additions & 16 deletions tasks/seal/task_submit_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
if err != nil {
return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
}
err = AllocationCheck(ctx, s.api, pam, pci, abi.ActorID(sectorParams.SpID), ts)
_, err = AllocationCheck(ctx, s.api, pam, pci.Info.Expiration, abi.ActorID(sectorParams.SpID), ts)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -428,41 +428,44 @@ type AllocNodeApi interface {
StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes9.AllocationId, tsk types.TipSetKey) (*verifregtypes9.Allocation, error)
}

func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, precomitInfo *miner.SectorPreCommitOnChainInfo, miner abi.ActorID, ts *types.TipSet) error {
func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, expiration abi.ChainEpoch, miner abi.ActorID, ts *types.TipSet) (permanent bool, err error) {
// skip pieces not claiming an allocation
if piece.VerifiedAllocationKey == nil {
return nil
return false, nil
}
addr, err := address.NewIDAddress(uint64(piece.VerifiedAllocationKey.Client))
if err != nil {
return err
return false, err
}

alloc, err := api.StateGetAllocation(ctx, addr, verifregtypes9.AllocationId(piece.VerifiedAllocationKey.ID), ts.Key())
if err != nil {
return err
return false, err
}
if alloc == nil {
return xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID)
return true, xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID)
}
if alloc.Provider != miner {
return xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider)
return true, xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider)
}
if alloc.Size != piece.Size {
return xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size)
return true, xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size)
}

if precomitInfo == nil {
return nil
}
if precomitInfo.Info.Expiration < ts.Height()+alloc.TermMin {
return xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMin, piece.CID.String())
if expiration < ts.Height()+alloc.TermMin {
tooLittleBy := ts.Height() + alloc.TermMin - expiration

return true, xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s (should be at least %d epochs more)", expiration, ts.Height()+alloc.TermMin, piece.CID.String(), tooLittleBy)
}
if precomitInfo.Info.Expiration > ts.Height()+alloc.TermMax {
return xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMax, piece.CID.String())
if expiration > ts.Height()+alloc.TermMax {
tooMuchBy := expiration - (ts.Height() + alloc.TermMax)

return true, xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s (should be at least %d epochs less)", expiration, ts.Height()+alloc.TermMax, piece.CID.String(), tooMuchBy)
}

return nil
log.Infow("allocation check details", "piece", piece.CID.String(), "client", alloc.Client, "provider", alloc.Provider, "size", alloc.Size, "term_min", alloc.TermMin, "term_max", alloc.TermMax, "sector_expiration", expiration)

return false, nil
}

var _ harmonytask.TaskInterface = &SubmitCommitTask{}
45 changes: 27 additions & 18 deletions tasks/snap/task_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,42 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("getting chain head: %w", err)
}

maddr, err := address.NewIDAddress(uint64(update.SpID))
if err != nil {
return false, xerrors.Errorf("parsing miner address: %w", err)
}

snum := abi.SectorNumber(update.SectorNumber)

onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}
if onChainInfo == nil {
return false, xerrors.Errorf("sector not found on chain")
}

var pams []miner.PieceActivationManifest
var weight, weightVerif = big.Zero(), big.Zero()
var minStart abi.ChainEpoch

for _, piece := range pieces {
var pam *miner.PieceActivationManifest
err = json.Unmarshal(piece.Manifest, &pam)
if err != nil {
return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
}
err = seal.AllocationCheck(ctx, s.api, pam, nil, abi.ActorID(update.SpID), ts)
unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts)
if err != nil {
if unrecoverable {
_, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1,
task_id_submit = NULL, after_submit = FALSE
WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber)

log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err)
return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2))
}

return false, err
}

Expand All @@ -178,13 +202,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("parsing new sealed cid: %w", err)
}

maddr, err := address.NewIDAddress(uint64(update.SpID))
if err != nil {
return false, xerrors.Errorf("parsing miner address: %w", err)
}

snum := abi.SectorNumber(update.SectorNumber)

sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("getting sector location: %w", err)
Expand Down Expand Up @@ -218,14 +235,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("getting miner info: %w", err)
}

onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}
if onChainInfo == nil {
return false, xerrors.Errorf("sector not found on chain")
}

ssize, err := onChainInfo.SealProof.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
Expand Down Expand Up @@ -274,7 +283,7 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return true, xerrors.Errorf("pushing message to mpool (beyond deal start epoch): %w", multierr.Combine(err, err2))
}

return false, xerrors.Errorf("pushing message to mpool: %w", err)
return false, xerrors.Errorf("pushing message to mpool (minStart %d, timeTo %d): %w", minStart, minStart-ts.Height(), err)
}

_, err = s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET prove_msg_cid = $1, task_id_submit = NULL, after_submit = TRUE WHERE task_id_submit = $2`, mcid.String(), taskID)
Expand Down
6 changes: 5 additions & 1 deletion web/api/webrpc/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@ type UpgradeSector struct {

TaskIDMoveStorage *uint64 `db:"task_id_move_storage"`
AfterMoveStorage bool `db:"after_move_storage"`

Failed bool `db:"failed"`
FailedReason string `db:"failed_reason"`
FailedMsg string `db:"failed_reason_msg"`
}

func (a *WebRPC) UpgradeSectors(ctx context.Context) ([]UpgradeSector, error) {
sectors := []UpgradeSector{}
err := a.deps.DB.Select(ctx, &sectors, `SELECT sp_id, sector_number, task_id_encode, after_encode, task_id_prove, after_prove, task_id_submit, after_submit, after_prove_msg_success, task_id_move_storage, after_move_storage FROM sectors_snap_pipeline`)
err := a.deps.DB.Select(ctx, &sectors, `SELECT sp_id, sector_number, task_id_encode, after_encode, task_id_prove, after_prove, task_id_submit, after_submit, after_prove_msg_success, task_id_move_storage, after_move_storage, failed, failed_reason, failed_reason_msg FROM sectors_snap_pipeline`)
if err != nil {
return nil, err
}
Expand Down
2 changes: 2 additions & 0 deletions web/static/snap/upgrade-sectors.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class UpgradeSectors extends LitElement {
<th>Submit</th>
<th>Move Storage</th>
<th>Prove Message Landed</th>
<th>State</th>
<th>Actions</th>
</tr>
Expand All @@ -49,6 +50,7 @@ class UpgradeSectors extends LitElement {
<td>${entry.AfterSubmit ? 'Done' : entry.TaskIDSubmit === null ? 'Not Started' : entry.TaskIDSubmit}</td>
<td>${entry.AfterMoveStorage ? 'Done' : entry.TaskIDMoveStorage === null ? 'Not Started' : entry.TaskIDMoveStorage}</td>
<td>${entry.AfterProveSuccess ? 'Done' : entry.AfterSubmit ? 'Waiting' : 'Not Sent'}</td>
<td>${entry.Failed ? html`<abbr title=${entry.FailedMsg}><p>FAILED</p><p>${entry.FailedReason}</p></abbr>` : 'Healthy'}</td>
<td>
${ '' /*todo: this button is a massive footgun, it should get some more safety*/ }
Expand Down

0 comments on commit fbefb2e

Please sign in to comment.