Skip to content

Commit

Permalink
Fix Sector selection in snap ingester (#137)
Browse files Browse the repository at this point in the history
* snap: Check deal start when send fails

* snap: Better handle sector duration bounds

* snap: Fix pledge math
  • Loading branch information
magik6k authored Aug 9, 2024
1 parent 65e1e60 commit 0ce2342
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 46 deletions.
8 changes: 8 additions & 0 deletions harmony/harmonydb/sql/20240611-snap-pipeline.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ CREATE TABLE sectors_snap_pipeline (
task_id_move_storage BIGINT,
after_move_storage BOOLEAN NOT NULL DEFAULT FALSE,

-- fail
-- added in 20240809-snap-failures.sql
-- Failure handling
-- failed bool not null default false,
-- failed_at timestamp with time zone,
-- failed_reason varchar(20) not null default '',
-- failed_reason_msg text not null default '',

FOREIGN KEY (sp_id, sector_number) REFERENCES sectors_meta (sp_id, sector_num),
PRIMARY KEY (sp_id, sector_number)
);
Expand Down
11 changes: 11 additions & 0 deletions harmony/harmonydb/sql/20240809-snap-failures.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
-- Failure tracking for the snap pipeline: adds four columns to
-- sectors_snap_pipeline. Existing rows receive the declared defaults
-- (failed = FALSE, empty reason strings) and a NULL failed_at.

-- failed: TRUE once the sector has been marked as failed. The snap submit
-- scheduler only selects rows with failed = FALSE.
ALTER TABLE sectors_snap_pipeline
ADD COLUMN failed BOOLEAN NOT NULL DEFAULT FALSE;

-- failed_at: when the failure was recorded (the submit task writes NOW()).
-- Nullable with no default, so it stays NULL until a failure is recorded.
ALTER TABLE sectors_snap_pipeline
ADD COLUMN failed_at TIMESTAMP WITH TIME ZONE;

-- failed_reason: short code identifying the failure class, limited to 20
-- characters. The submit task writes 'alloc-check' and 'start-expired'.
ALTER TABLE sectors_snap_pipeline
ADD COLUMN failed_reason VARCHAR(20) NOT NULL DEFAULT '';

-- failed_reason_msg: free-form detail for the failure; the submit task
-- stores the text of the triggering error here.
ALTER TABLE sectors_snap_pipeline
ADD COLUMN failed_reason_msg TEXT NOT NULL DEFAULT '';
1 change: 1 addition & 0 deletions market/deal_ingest_seal.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type PieceIngesterApi interface {
StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error)
StateGetAllocationIdForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (verifregtypes.AllocationId, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
}

type openSector struct {
Expand Down
62 changes: 56 additions & 6 deletions market/deal_ingest_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
piece.PublishCid = nil
}

head, err := p.api.ChainHead(ctx)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err)
}

var maxExpiration int64
vd.isVerified = piece.PieceActivationManifest.VerifiedAllocationKey != nil
if vd.isVerified {
Expand All @@ -253,7 +258,7 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
vd.tmin = alloc.TermMin
vd.tmax = alloc.TermMax

maxExpiration = int64(piece.DealSchedule.EndEpoch + alloc.TermMax)
maxExpiration = int64(head.Height() + alloc.TermMax)
}
propJson, err = json.Marshal(piece.PieceActivationManifest)
if err != nil {
Expand Down Expand Up @@ -309,16 +314,47 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
}

if len(candidates) == 0 {
return false, xerrors.Errorf("no suitable sectors found")
minEpoch := piece.DealSchedule.EndEpoch
maxEpoch := abi.ChainEpoch(maxExpiration)

minEpochDays := (minEpoch - head.Height()) / builtin.EpochsInDay
maxEpochDays := (maxEpoch - head.Height()) / builtin.EpochsInDay

return false, xerrors.Errorf("no suitable sectors found, minEpoch: %d, maxEpoch: %d, minExpirationDays: %d, maxExpirationDays: %d", minEpoch, maxEpoch, minEpochDays, maxEpochDays)
}

// todo - nice to have:
// * double check the sector expiration
// * check sector liveness
// * check deadline mutable

candidate := candidates[0] // this one works best

si, err := p.api.StateSectorGetInfo(ctx, p.miner, abi.SectorNumber(candidate.Sector), types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}

sectorLifeTime := si.Expiration - head.Height()
if sectorLifeTime < 0 {
return false, xerrors.Errorf("sector lifetime is negative!?")
}
if piece.DealSchedule.EndEpoch > si.Expiration {
return false, xerrors.Errorf("sector expiration is too soon: %d < %d", si.Expiration, piece.DealSchedule.EndEpoch)
}
if maxExpiration != 0 && si.Expiration > abi.ChainEpoch(maxExpiration) {
return false, xerrors.Errorf("sector expiration is too late: %d > %d", si.Expiration, maxExpiration)
}

// info log detailing EVERYTHING including all the epoch bounds
log.Infow("allocating piece to sector",
"sector", candidate.Sector,
"expiration", si.Expiration,
"sectorLifeTime", sectorLifeTime,
"dealStartEpoch", piece.DealSchedule.StartEpoch,
"dealEndEpoch", piece.DealSchedule.EndEpoch,
"maxExpiration", maxExpiration,
)

_, err = tx.Exec(`SELECT insert_snap_ddo_piece($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
p.mid, candidate.Sector, 0,
piece.PieceActivationManifest.CID, piece.PieceActivationManifest.Size,
Expand Down Expand Up @@ -358,6 +394,11 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
var allocated bool
var rerr error

head, err := p.api.ChainHead(ctx)
if err != nil {
return false, api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err)
}

comm, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
openSectors, err := p.getOpenSectors(tx)
if err != nil {
Expand All @@ -368,14 +409,23 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
sec := sec
if sec.currentSize+psize <= abi.PaddedPieceSize(p.sectorSize) {
if vd.isVerified {
sectorLifeTime := sec.latestEndEpoch - sec.earliestStartEpoch
si, err := p.api.StateSectorGetInfo(ctx, p.miner, sec.number, types.EmptyTSK)
if err != nil {
log.Errorw("getting sector info", "error", err, "sector", sec.number, "miner", p.miner)
continue
}

sectorLifeTime := si.Expiration - head.Height()
if sectorLifeTime < 0 {
log.Errorw("sector lifetime is negative", "sector", sec.number, "miner", p.miner, "lifetime", sectorLifeTime)
continue
}

// Allocation's TMin must fit in sector and TMax should be at least sector lifetime or more
// Based on https://github.com/filecoin-project/builtin-actors/blob/a0e34d22665ac8c84f02fea8a099216f29ffaeeb/actors/verifreg/src/lib.rs#L1071-L1086
if sectorLifeTime <= vd.tmin && sectorLifeTime >= vd.tmax {
continue
}

// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TODO ADD SNAP SECTOR EXP CHECKS
}

ret.Sector = sec.number
Expand Down
18 changes: 16 additions & 2 deletions tasks/seal/poller_commit_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err
}

if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
return s.pollCommitMsgFail(ctx, task, execResult[0])
if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil {
return err
}
}

si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
Expand Down Expand Up @@ -78,13 +80,25 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err
return nil
}

func (s *SealPoller) pollCommitMsgFail(ctx context.Context, task pollTask, execResult dbExecResult) error {
func (s *SealPoller) pollCommitMsgFail(ctx context.Context, maddr address.Address, task pollTask, execResult dbExecResult) error {
switch exitcode.ExitCode(execResult.ExecutedRcptExitCode) {
case exitcode.SysErrInsufficientFunds:
fallthrough
case exitcode.SysErrOutOfGas:
// just retry
return s.pollRetryCommitMsgSend(ctx, task, execResult)
case exitcode.ErrNotFound:
// message not found, but maybe it's fine?

si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get sector info: %w", err)
}
if si != nil {
return nil
}

return xerrors.Errorf("sector not found after, commit message can't be found either")
default:
return xerrors.Errorf("commit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode))
}
Expand Down
35 changes: 19 additions & 16 deletions tasks/seal/task_submit_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
if err != nil {
return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
}
err = AllocationCheck(ctx, s.api, pam, pci, abi.ActorID(sectorParams.SpID), ts)
_, err = AllocationCheck(ctx, s.api, pam, pci.Info.Expiration, abi.ActorID(sectorParams.SpID), ts)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -428,41 +428,44 @@ type AllocNodeApi interface {
StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes9.AllocationId, tsk types.TipSetKey) (*verifregtypes9.Allocation, error)
}

func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, precomitInfo *miner.SectorPreCommitOnChainInfo, miner abi.ActorID, ts *types.TipSet) error {
func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, expiration abi.ChainEpoch, miner abi.ActorID, ts *types.TipSet) (permanent bool, err error) {
// skip pieces not claiming an allocation
if piece.VerifiedAllocationKey == nil {
return nil
return false, nil
}
addr, err := address.NewIDAddress(uint64(piece.VerifiedAllocationKey.Client))
if err != nil {
return err
return false, err
}

alloc, err := api.StateGetAllocation(ctx, addr, verifregtypes9.AllocationId(piece.VerifiedAllocationKey.ID), ts.Key())
if err != nil {
return err
return false, err
}
if alloc == nil {
return xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID)
return true, xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID)
}
if alloc.Provider != miner {
return xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider)
return true, xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider)
}
if alloc.Size != piece.Size {
return xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size)
return true, xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size)
}

if precomitInfo == nil {
return nil
}
if precomitInfo.Info.Expiration < ts.Height()+alloc.TermMin {
return xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMin, piece.CID.String())
if expiration < ts.Height()+alloc.TermMin {
tooLittleBy := ts.Height() + alloc.TermMin - expiration

return true, xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s (should be at least %d epochs more)", expiration, ts.Height()+alloc.TermMin, piece.CID.String(), tooLittleBy)
}
if precomitInfo.Info.Expiration > ts.Height()+alloc.TermMax {
return xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMax, piece.CID.String())
if expiration > ts.Height()+alloc.TermMax {
tooMuchBy := expiration - (ts.Height() + alloc.TermMax)

return true, xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s (should be at least %d epochs less)", expiration, ts.Height()+alloc.TermMax, piece.CID.String(), tooMuchBy)
}

return nil
log.Infow("allocation check details", "piece", piece.CID.String(), "client", alloc.Client, "provider", alloc.Provider, "size", alloc.Size, "term_min", alloc.TermMin, "term_max", alloc.TermMax, "sector_expiration", expiration)

return false, nil
}

var _ harmonytask.TaskInterface = &SubmitCommitTask{}
72 changes: 51 additions & 21 deletions tasks/snap/task_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/multierr"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -129,9 +130,10 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
var pieces []struct {
Manifest json.RawMessage `db:"direct_piece_activation_manifest"`
Size int64 `db:"piece_size"`
Start int64 `db:"direct_start_epoch"`
}
err = s.db.Select(ctx, &pieces, `
SELECT direct_piece_activation_manifest, piece_size
SELECT direct_piece_activation_manifest, piece_size, direct_start_epoch
FROM sectors_snap_initial_pieces
WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, update.SpID, update.SectorNumber)
if err != nil {
Expand All @@ -143,23 +145,55 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("getting chain head: %w", err)
}

maddr, err := address.NewIDAddress(uint64(update.SpID))
if err != nil {
return false, xerrors.Errorf("parsing miner address: %w", err)
}

snum := abi.SectorNumber(update.SectorNumber)

onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}
if onChainInfo == nil {
return false, xerrors.Errorf("sector not found on chain")
}

var pams []miner.PieceActivationManifest
var weight, weightVerif = big.Zero(), big.Zero()
var minStart abi.ChainEpoch
for _, piece := range pieces {
var pam *miner.PieceActivationManifest
err = json.Unmarshal(piece.Manifest, &pam)
if err != nil {
return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
}
err = seal.AllocationCheck(ctx, s.api, pam, nil, abi.ActorID(update.SpID), ts)
unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts)
if err != nil {
if unrecoverable {
_, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1,
task_id_submit = NULL, after_submit = FALSE
WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber)

log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err)
return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2))
}

return false, err
}

pieceWeight := big.Mul(abi.NewStoragePower(piece.Size), big.NewInt(int64(onChainInfo.Expiration-ts.Height())))

if pam.VerifiedAllocationKey != nil {
weightVerif = big.Add(weightVerif, abi.NewStoragePower(piece.Size))
weightVerif = big.Add(weightVerif, pieceWeight)
} else {
weight = big.Add(weight, abi.NewStoragePower(piece.Size))
weight = big.Add(weight, pieceWeight)
}

if minStart == 0 || abi.ChainEpoch(piece.Start) < minStart {
minStart = abi.ChainEpoch(piece.Start)
}

pams = append(pams, *pam)
Expand All @@ -170,13 +204,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("parsing new sealed cid: %w", err)
}

maddr, err := address.NewIDAddress(uint64(update.SpID))
if err != nil {
return false, xerrors.Errorf("parsing miner address: %w", err)
}

snum := abi.SectorNumber(update.SectorNumber)

sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("getting sector location: %w", err)
Expand Down Expand Up @@ -210,14 +237,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("getting miner info: %w", err)
}

onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}
if onChainInfo == nil {
return false, xerrors.Errorf("sector not found on chain")
}

ssize, err := onChainInfo.SealProof.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
Expand Down Expand Up @@ -255,7 +274,18 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done

mcid, err := s.sender.Send(ctx, msg, mss, "update")
if err != nil {
return false, xerrors.Errorf("pushing message to mpool: %w", err)
if minStart != 0 && ts.Height() > minStart {
_, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
failed = TRUE, failed_at = NOW(), failed_reason = 'start-expired', failed_reason_msg = $1,
task_id_submit = NULL, after_submit = FALSE
WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber)

log.Errorw("failed to push message to mpool (beyond deal start epoch)", "sp", update.SpID, "sector", update.SectorNumber, "err", err)

return true, xerrors.Errorf("pushing message to mpool (beyond deal start epoch): %w", multierr.Combine(err, err2))
}

return false, xerrors.Errorf("pushing message to mpool (minStart %d, timeTo %d): %w", minStart, minStart-ts.Height(), err)
}

_, err = s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET prove_msg_cid = $1, task_id_submit = NULL, after_submit = TRUE WHERE task_id_submit = $2`, mcid.String(), taskID)
Expand Down Expand Up @@ -364,7 +394,7 @@ func (s *SubmitTask) schedule(ctx context.Context, taskFunc harmonytask.AddTaskF
SectorNumber int64 `db:"sector_number"`
}

err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_submit = FALSE AND task_id_submit IS NULL`)
err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE failed = FALSE AND after_encode = TRUE AND after_prove = TRUE AND after_submit = FALSE AND task_id_submit IS NULL`)
if err != nil {
return false, xerrors.Errorf("getting tasks: %w", err)
}
Expand Down
Loading

0 comments on commit 0ce2342

Please sign in to comment.