Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Sector selection in snap ingester #137

Merged
merged 3 commits into from
Aug 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions harmony/harmonydb/sql/20240611-snap-pipeline.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ CREATE TABLE sectors_snap_pipeline (
task_id_move_storage BIGINT,
after_move_storage BOOLEAN NOT NULL DEFAULT FALSE,

-- fail
-- added in 20240809-snap-failures.sql
-- Failure handling
-- failed bool not null default false,
-- failed_at timestamp with timezone,
-- failed_reason varchar(20) not null default '',
-- failed_reason_msg text not null default '',

FOREIGN KEY (sp_id, sector_number) REFERENCES sectors_meta (sp_id, sector_num),
PRIMARY KEY (sp_id, sector_number)
);
Expand Down
11 changes: 11 additions & 0 deletions harmony/harmonydb/sql/20240809-snap-failures.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
ALTER TABLE sectors_snap_pipeline
ADD COLUMN failed BOOLEAN NOT NULL DEFAULT FALSE;

ALTER TABLE sectors_snap_pipeline
ADD COLUMN failed_at TIMESTAMP WITH TIME ZONE;

ALTER TABLE sectors_snap_pipeline
ADD COLUMN failed_reason VARCHAR(20) NOT NULL DEFAULT '';

ALTER TABLE sectors_snap_pipeline
ADD COLUMN failed_reason_msg TEXT NOT NULL DEFAULT '';
1 change: 1 addition & 0 deletions market/deal_ingest_seal.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ type PieceIngesterApi interface {
StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error)
StateGetAllocationIdForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (verifregtypes.AllocationId, error)
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
}

type openSector struct {
Expand Down
62 changes: 56 additions & 6 deletions market/deal_ingest_snap.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,11 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
piece.PublishCid = nil
}

head, err := p.api.ChainHead(ctx)
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err)
}

var maxExpiration int64
vd.isVerified = piece.PieceActivationManifest.VerifiedAllocationKey != nil
if vd.isVerified {
Expand All @@ -253,7 +258,7 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
vd.tmin = alloc.TermMin
vd.tmax = alloc.TermMax

maxExpiration = int64(piece.DealSchedule.EndEpoch + alloc.TermMax)
maxExpiration = int64(head.Height() + alloc.TermMax)
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
}
propJson, err = json.Marshal(piece.PieceActivationManifest)
if err != nil {
Expand Down Expand Up @@ -309,16 +314,47 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
}

if len(candidates) == 0 {
return false, xerrors.Errorf("no suitable sectors found")
minEpoch := piece.DealSchedule.EndEpoch
maxEpoch := abi.ChainEpoch(maxExpiration)

minEpochDays := (minEpoch - head.Height()) / builtin.EpochsInDay
maxEpochDays := (maxEpoch - head.Height()) / builtin.EpochsInDay

return false, xerrors.Errorf("no suitable sectors found, minEpoch: %d, maxEpoch: %d, minExpirationDays: %d, maxExpirationDays: %d", minEpoch, maxEpoch, minEpochDays, maxEpochDays)
}

// todo - nice to have:
// * double check the sector expiration
// * check sector liveness
// * check deadline mutable

candidate := candidates[0] // this one works best

si, err := p.api.StateSectorGetInfo(ctx, p.miner, abi.SectorNumber(candidate.Sector), types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}

sectorLifeTime := si.Expiration - head.Height()
if sectorLifeTime < 0 {
return false, xerrors.Errorf("sector lifetime is negative!?")
}
if piece.DealSchedule.EndEpoch > si.Expiration {
return false, xerrors.Errorf("sector expiration is too soon: %d < %d", si.Expiration, piece.DealSchedule.EndEpoch)
}
if maxExpiration != 0 && si.Expiration > abi.ChainEpoch(maxExpiration) {
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
return false, xerrors.Errorf("sector expiration is too late: %d > %d", si.Expiration, maxExpiration)
}

// info log detailing EVERYTHING including all the epoch bounds
log.Infow("allocating piece to sector",
"sector", candidate.Sector,
"expiration", si.Expiration,
"sectorLifeTime", sectorLifeTime,
"dealStartEpoch", piece.DealSchedule.StartEpoch,
"dealEndEpoch", piece.DealSchedule.EndEpoch,
"maxExpiration", maxExpiration,
)

_, err = tx.Exec(`SELECT insert_snap_ddo_piece($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
p.mid, candidate.Sector, 0,
piece.PieceActivationManifest.CID, piece.PieceActivationManifest.Size,
Expand Down Expand Up @@ -358,6 +394,11 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
var allocated bool
var rerr error

head, err := p.api.ChainHead(ctx)
if err != nil {
return false, api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err)
}

comm, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
openSectors, err := p.getOpenSectors(tx)
if err != nil {
Expand All @@ -368,14 +409,23 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
sec := sec
if sec.currentSize+psize <= abi.PaddedPieceSize(p.sectorSize) {
if vd.isVerified {
sectorLifeTime := sec.latestEndEpoch - sec.earliestStartEpoch
si, err := p.api.StateSectorGetInfo(ctx, p.miner, sec.number, types.EmptyTSK)
if err != nil {
log.Errorw("getting sector info", "error", err, "sector", sec.number, "miner", p.miner)
continue
}

sectorLifeTime := si.Expiration - head.Height()
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
if sectorLifeTime < 0 {
log.Errorw("sector lifetime is negative", "sector", sec.number, "miner", p.miner, "lifetime", sectorLifeTime)
continue
}

// Allocation's TMin must fit in sector and TMax should be at least sector lifetime or more
// Based on https://github.com/filecoin-project/builtin-actors/blob/a0e34d22665ac8c84f02fea8a099216f29ffaeeb/actors/verifreg/src/lib.rs#L1071-L1086
if sectorLifeTime <= vd.tmin && sectorLifeTime >= vd.tmax {
continue
}

// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TODO ADD SNAP SECTOR EXP CHECKS
}

ret.Sector = sec.number
Expand Down
18 changes: 16 additions & 2 deletions tasks/seal/poller_commit_msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,9 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err
}

if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
return s.pollCommitMsgFail(ctx, task, execResult[0])
if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil {
return err
}
}

si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
Expand Down Expand Up @@ -78,13 +80,25 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err
return nil
}

func (s *SealPoller) pollCommitMsgFail(ctx context.Context, task pollTask, execResult dbExecResult) error {
func (s *SealPoller) pollCommitMsgFail(ctx context.Context, maddr address.Address, task pollTask, execResult dbExecResult) error {
switch exitcode.ExitCode(execResult.ExecutedRcptExitCode) {
case exitcode.SysErrInsufficientFunds:
fallthrough
case exitcode.SysErrOutOfGas:
// just retry
return s.pollRetryCommitMsgSend(ctx, task, execResult)
case exitcode.ErrNotFound:
// message not found, but maybe it's fine?

si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
if err != nil {
return xerrors.Errorf("get sector info: %w", err)
}
if si != nil {
return nil
}

return xerrors.Errorf("sector not found after, commit message can't be found either")
default:
return xerrors.Errorf("commit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode))
}
Expand Down
35 changes: 19 additions & 16 deletions tasks/seal/task_submit_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
if err != nil {
return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
}
err = AllocationCheck(ctx, s.api, pam, pci, abi.ActorID(sectorParams.SpID), ts)
_, err = AllocationCheck(ctx, s.api, pam, pci.Info.Expiration, abi.ActorID(sectorParams.SpID), ts)
if err != nil {
return false, err
}
Expand Down Expand Up @@ -428,41 +428,44 @@ type AllocNodeApi interface {
StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes9.AllocationId, tsk types.TipSetKey) (*verifregtypes9.Allocation, error)
}

func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, precomitInfo *miner.SectorPreCommitOnChainInfo, miner abi.ActorID, ts *types.TipSet) error {
func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, expiration abi.ChainEpoch, miner abi.ActorID, ts *types.TipSet) (permanent bool, err error) {
// skip pieces not claiming an allocation
if piece.VerifiedAllocationKey == nil {
return nil
return false, nil
}
addr, err := address.NewIDAddress(uint64(piece.VerifiedAllocationKey.Client))
if err != nil {
return err
return false, err
}

alloc, err := api.StateGetAllocation(ctx, addr, verifregtypes9.AllocationId(piece.VerifiedAllocationKey.ID), ts.Key())
if err != nil {
return err
return false, err
}
if alloc == nil {
return xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID)
return true, xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID)
}
if alloc.Provider != miner {
return xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider)
return true, xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider)
}
if alloc.Size != piece.Size {
return xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size)
return true, xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size)
}

if precomitInfo == nil {
return nil
}
if precomitInfo.Info.Expiration < ts.Height()+alloc.TermMin {
return xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMin, piece.CID.String())
if expiration < ts.Height()+alloc.TermMin {
tooLittleBy := ts.Height() + alloc.TermMin - expiration

return true, xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s (should be at least %d epochs more)", expiration, ts.Height()+alloc.TermMin, piece.CID.String(), tooLittleBy)
}
if precomitInfo.Info.Expiration > ts.Height()+alloc.TermMax {
return xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMax, piece.CID.String())
if expiration > ts.Height()+alloc.TermMax {
LexLuthr marked this conversation as resolved.
Show resolved Hide resolved
tooMuchBy := expiration - (ts.Height() + alloc.TermMax)

return true, xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s (should be at least %d epochs less)", expiration, ts.Height()+alloc.TermMax, piece.CID.String(), tooMuchBy)
}

return nil
log.Infow("allocation check details", "piece", piece.CID.String(), "client", alloc.Client, "provider", alloc.Provider, "size", alloc.Size, "term_min", alloc.TermMin, "term_max", alloc.TermMax, "sector_expiration", expiration)

return false, nil
}

var _ harmonytask.TaskInterface = &SubmitCommitTask{}
72 changes: 51 additions & 21 deletions tasks/snap/task_submit.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/multierr"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -129,9 +130,10 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
var pieces []struct {
Manifest json.RawMessage `db:"direct_piece_activation_manifest"`
Size int64 `db:"piece_size"`
Start int64 `db:"direct_start_epoch"`
}
err = s.db.Select(ctx, &pieces, `
SELECT direct_piece_activation_manifest, piece_size
SELECT direct_piece_activation_manifest, piece_size, direct_start_epoch
FROM sectors_snap_initial_pieces
WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, update.SpID, update.SectorNumber)
if err != nil {
Expand All @@ -143,23 +145,55 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("getting chain head: %w", err)
}

maddr, err := address.NewIDAddress(uint64(update.SpID))
if err != nil {
return false, xerrors.Errorf("parsing miner address: %w", err)
}

snum := abi.SectorNumber(update.SectorNumber)

onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}
if onChainInfo == nil {
return false, xerrors.Errorf("sector not found on chain")
}

var pams []miner.PieceActivationManifest
var weight, weightVerif = big.Zero(), big.Zero()
var minStart abi.ChainEpoch
for _, piece := range pieces {
var pam *miner.PieceActivationManifest
err = json.Unmarshal(piece.Manifest, &pam)
if err != nil {
return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
}
err = seal.AllocationCheck(ctx, s.api, pam, nil, abi.ActorID(update.SpID), ts)
unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts)
if err != nil {
if unrecoverable {
_, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1,
task_id_submit = NULL, after_submit = FALSE
WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber)

log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err)
return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2))
}

return false, err
}

pieceWeight := big.Mul(abi.NewStoragePower(piece.Size), big.NewInt(int64(onChainInfo.Expiration-ts.Height())))

if pam.VerifiedAllocationKey != nil {
weightVerif = big.Add(weightVerif, abi.NewStoragePower(piece.Size))
weightVerif = big.Add(weightVerif, pieceWeight)
} else {
weight = big.Add(weight, abi.NewStoragePower(piece.Size))
weight = big.Add(weight, pieceWeight)
}

if minStart == 0 || abi.ChainEpoch(piece.Start) < minStart {
minStart = abi.ChainEpoch(piece.Start)
}

pams = append(pams, *pam)
Expand All @@ -170,13 +204,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("parsing new sealed cid: %w", err)
}

maddr, err := address.NewIDAddress(uint64(update.SpID))
if err != nil {
return false, xerrors.Errorf("parsing miner address: %w", err)
}

snum := abi.SectorNumber(update.SectorNumber)

sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK)
if err != nil {
return false, xerrors.Errorf("getting sector location: %w", err)
Expand Down Expand Up @@ -210,14 +237,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
return false, xerrors.Errorf("getting miner info: %w", err)
}

onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
if err != nil {
return false, xerrors.Errorf("getting sector info: %w", err)
}
if onChainInfo == nil {
return false, xerrors.Errorf("sector not found on chain")
}

ssize, err := onChainInfo.SealProof.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
Expand Down Expand Up @@ -255,7 +274,18 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done

mcid, err := s.sender.Send(ctx, msg, mss, "update")
if err != nil {
return false, xerrors.Errorf("pushing message to mpool: %w", err)
if minStart != 0 && ts.Height() > minStart {
_, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
failed = TRUE, failed_at = NOW(), failed_reason = 'start-expired', failed_reason_msg = $1,
task_id_submit = NULL, after_submit = FALSE
WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber)

log.Errorw("failed to push message to mpool (beyond deal start epoch)", "sp", update.SpID, "sector", update.SectorNumber, "err", err)

return true, xerrors.Errorf("pushing message to mpool (beyond deal start epoch): %w", multierr.Combine(err, err2))
}

return false, xerrors.Errorf("pushing message to mpool (minStart %d, timeTo %d): %w", minStart, minStart-ts.Height(), err)
}

_, err = s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET prove_msg_cid = $1, task_id_submit = NULL, after_submit = TRUE WHERE task_id_submit = $2`, mcid.String(), taskID)
Expand Down Expand Up @@ -364,7 +394,7 @@ func (s *SubmitTask) schedule(ctx context.Context, taskFunc harmonytask.AddTaskF
SectorNumber int64 `db:"sector_number"`
}

err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_submit = FALSE AND task_id_submit IS NULL`)
err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE failed = FALSE AND after_encode = TRUE AND after_prove = TRUE AND after_submit = FALSE AND task_id_submit IS NULL`)
if err != nil {
return false, xerrors.Errorf("getting tasks: %w", err)
}
Expand Down
Loading