Skip to content

Commit

Permalink
fix: unsealed sector for synthetic PoRep (#295)
Browse files Browse the repository at this point in the history
* fix unsealed sector for synthetic PoRep

* fix nit
  • Loading branch information
LexLuthr authored Jan 13, 2025
1 parent e02713e commit a30f976
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 82 deletions.
189 changes: 110 additions & 79 deletions lib/ffi/sdr_funcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/lib/ffiselect"
"github.com/filecoin-project/curio/lib/proof"
storiface "github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/lib/storiface"

// TODO everywhere here that we call this we should call our proxy instead.
ffi "github.com/filecoin-project/filecoin-ffi"
Expand Down Expand Up @@ -525,111 +525,137 @@ func changePathType(path string, newType storiface.SectorFileType) (string, erro

return newPath, nil
}
func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}
defer releaseSector()

func (sb *SealCalls) GenerateUnsealedSector(ctx context.Context, sector storiface.SectorRef, sectorPaths, pathIDs storiface.SectorPaths, keepUnsealed bool) error {
ssize, err := sector.ProofType.SectorSize()
if err != nil {
return xerrors.Errorf("getting sector size: %w", err)

}

if keepUnsealed {
// We are going to be moving the unsealed file, no need to allocate storage specifically for it
sectorPaths.Unsealed, err = changePathType(sectorPaths.Cache, storiface.FTUnsealed)
if err != nil {
return xerrors.Errorf("changing path type: %w", err)
// Return early if unsealed copy is not required
if !keepUnsealed {
return nil
}

// We are going to be moving the unsealed file, no need to allocate storage specifically for it
sectorPaths.Unsealed, err = changePathType(sectorPaths.Cache, storiface.FTUnsealed)
if err != nil {
return xerrors.Errorf("changing path type: %w", err)

}

pathIDs.Unsealed = pathIDs.Cache // this is just an uuid string

defer func() {
// We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
log.Errorf("declare unsealed sector error: %s", err)
}
}()

pathIDs.Unsealed = pathIDs.Cache // this is just an uuid string
// tree-d contains exactly unsealed data in the prefix, so
// * we move it to a temp file
// * we truncate the temp file to the sector size
// * we move the temp file to the unsealed location

defer func() {
// We don't pass FTUnsealed to Acquire, so releaseSector won't declare it. Do it here.
if err := sb.sectors.sindex.StorageDeclareSector(ctx, storiface.ID(pathIDs.Unsealed), sector.ID, storiface.FTUnsealed, true); err != nil {
log.Errorf("declare unsealed sector error: %+v", err)
}
}()

// tree-d contains exactly unsealed data in the prefix, so
// * we move it to a temp file
// * we truncate the temp file to the sector size
// * we move the temp file to the unsealed location

// temp path in cache where we'll move tree-d before truncating
// it is in the cache directory so that we can use os.Rename to move it
// to unsealed (which may be on a different filesystem)
tempUnsealed := filepath.Join(sectorPaths.Cache, storiface.SectorName(sector.ID))

_, terr := os.Stat(tempUnsealed)
tempUnsealedExists := terr == nil

// First handle an edge case where we have already gone through this step,
// but ClearCache or later steps failed. In that case we'll see tree-d missing and unsealed present

if _, err := os.Stat(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName)); err != nil {
if os.IsNotExist(err) {
// check that unsealed exists and is the right size
st, err := os.Stat(sectorPaths.Unsealed)
if err != nil {
if os.IsNotExist(err) {
if tempUnsealedExists {
// unsealed file does not exist, but temp unsealed file does
// so we can just resume where the previous attempt left off
goto retryUnsealedMove
}
return xerrors.Errorf("neither unsealed file nor temp-unsealed file exists")
// temp path in cache where we'll move tree-d before truncating
// it is in the cache directory so that we can use os.Rename to move it
// to unsealed (which may be on a different filesystem)
tempUnsealed := filepath.Join(sectorPaths.Cache, storiface.SectorName(sector.ID))

_, terr := os.Stat(tempUnsealed)
tempUnsealedExists := terr == nil

// First handle an edge case where we have already gone through this step,
// in that case we'll see tree-d missing and unsealed present

if _, err := os.Stat(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName)); err != nil {
if os.IsNotExist(err) {
// check that unsealed exists and is the right size
st, err := os.Stat(sectorPaths.Unsealed)
if err != nil {
if os.IsNotExist(err) {
if tempUnsealedExists {
// unsealed file does not exist, but temp unsealed file does
// so we can just resume where the previous attempt left off
return TruncateAndMoveUnsealed(tempUnsealed, sectorPaths.Unsealed, ssize)
}
return xerrors.Errorf("stat unsealed file: %w", err)
return xerrors.Errorf("neither unsealed file nor temp-unsealed file exists")

}
if st.Size() != int64(ssize) {
if tempUnsealedExists {
// unsealed file exists but is the wrong size, and temp unsealed file exists
// so we can just resume where the previous attempt left off with some cleanup
return xerrors.Errorf("stat unsealed file: %w", err)

if err := os.Remove(sectorPaths.Unsealed); err != nil {
return xerrors.Errorf("removing unsealed file from last attempt: %w", err)
}
}
if st.Size() != int64(ssize) {
if tempUnsealedExists {
// unsealed file exists but is the wrong size, and temp unsealed file exists
// so we can just resume where the previous attempt left off with some cleanup

if err := os.Remove(sectorPaths.Unsealed); err != nil {
return xerrors.Errorf("removing unsealed file from last attempt: %w", err)

goto retryUnsealedMove
}
return xerrors.Errorf("unsealed file is not the right size: %d != %d and temp unsealed is missing", st.Size(), ssize)

return TruncateAndMoveUnsealed(tempUnsealed, sectorPaths.Unsealed, ssize)

}
return xerrors.Errorf("unsealed file is not the right size: %d != %d and temp unsealed is missing", st.Size(), ssize)

// all good, just log that this edge case happened
log.Warnw("unsealed file exists but tree-d is missing, skipping move", "sector", sector.ID, "unsealed", sectorPaths.Unsealed, "cache", sectorPaths.Cache)
goto afterUnsealedMove
}
return xerrors.Errorf("stat tree-d file: %w", err)

// all good, just log that this edge case happened
log.Warnw("unsealed file exists but tree-d is missing, skipping move", "sector", sector.ID, "unsealed", sectorPaths.Unsealed, "cache", sectorPaths.Cache)
return nil
}
return xerrors.Errorf("stat tree-d file: %w", err)
}

// If the state in clean do the move
// If the state in clean do the move

// move tree-d to temp file
if err := os.Rename(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName), tempUnsealed); err != nil {
return xerrors.Errorf("moving tree-d to temp file: %w", err)
}
// move tree-d to temp file
if err := os.Rename(filepath.Join(sectorPaths.Cache, proofpaths.TreeDName), tempUnsealed); err != nil {
return xerrors.Errorf("moving tree-d to temp file: %w", err)
}

retryUnsealedMove:
return TruncateAndMoveUnsealed(tempUnsealed, sectorPaths.Unsealed, ssize)
}

// truncate sealed file to sector size
if err := os.Truncate(tempUnsealed, int64(ssize)); err != nil {
return xerrors.Errorf("truncating unsealed file to sector size: %w", err)
}
func TruncateAndMoveUnsealed(tempUnsealed, unsealed string, ssize abi.SectorSize) error {
// truncate unsealed file to sector size
if err := os.Truncate(tempUnsealed, int64(ssize)); err != nil {
return xerrors.Errorf("truncating unsealed file to sector size: %w", err)
}

// move temp file to unsealed location
if err := paths.Move(tempUnsealed, sectorPaths.Unsealed); err != nil {
return xerrors.Errorf("move temp unsealed sector to final location (%s -> %s): %w", tempUnsealed, sectorPaths.Unsealed, err)
}
// move temp file to unsealed location
if err := paths.Move(tempUnsealed, unsealed); err != nil {
return xerrors.Errorf("move temp unsealed sector to final location (%s -> %s): %w", tempUnsealed, unsealed, err)
}
return nil
}

func (sb *SealCalls) FinalizeSector(ctx context.Context, sector storiface.SectorRef, keepUnsealed bool) error {
sectorPaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, nil, sector, storiface.FTCache, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
}

defer releaseSector()

ssize, err := sector.ProofType.SectorSize()
if err != nil {
return xerrors.Errorf("getting sector size: %w", err)
}

afterUnsealedMove:
// If this is a synthetic proof sector then unsealed should already exist otherwise generate it
if abi.Synthetic[sector.ProofType] {
if err = ffi.ClearSyntheticProofs(uint64(ssize), sectorPaths.Cache); err != nil {
if err := ffi.ClearSyntheticProofs(uint64(ssize), sectorPaths.Cache); err != nil {
return xerrors.Errorf("Unable to delete Synth cache: %w", err)
}
} else {
if err := sb.GenerateUnsealedSector(ctx, sector, sectorPaths, pathIDs, keepUnsealed); err != nil {
return xerrors.Errorf("generating unsealed copy of the sector: %w", err)
}
}

if err := ffi.ClearCache(uint64(ssize), sectorPaths.Cache); err != nil {
Expand Down Expand Up @@ -753,7 +779,7 @@ func (sb *SealCalls) TreeD(ctx context.Context, sector storiface.SectorRef, unse
return nil
}

func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, sealed cid.Cid, unsealed cid.Cid, randomness abi.SealRandomness, pieces []abi.PieceInfo) error {
func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.TaskID, sector storiface.SectorRef, sealed cid.Cid, unsealed cid.Cid, randomness abi.SealRandomness, pieces []abi.PieceInfo, keepUnsealed bool) error {
fspaths, pathIDs, releaseSector, err := sb.sectors.AcquireSector(ctx, task, sector, storiface.FTCache|storiface.FTSealed, storiface.FTNone, storiface.PathSealing)
if err != nil {
return xerrors.Errorf("acquiring sector paths: %w", err)
Expand All @@ -780,6 +806,11 @@ func (sb *SealCalls) SyntheticProofs(ctx context.Context, task *harmonytask.Task
}
}

// Generate unsealed copy before clearing cache
if err = sb.GenerateUnsealedSector(ctx, sector, fspaths, pathIDs, keepUnsealed); err != nil {
return xerrors.Errorf("generating unsealed sector: %w", err)
}

if err = ffi.ClearCache(uint64(ssize), fspaths.Cache); err != nil {
return xerrors.Errorf("failed to clear cache for synthetic proof of sector %d of miner %d", sector.ID.Miner, sector.ID.Number)
}
Expand Down
2 changes: 1 addition & 1 deletion tasks/seal/task_finalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/filecoin-project/curio/harmony/taskhelp"
"github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/slotmgr"
storiface "github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/lib/storiface"
)

type FinalizeTask struct {
Expand Down
10 changes: 8 additions & 2 deletions tasks/seal/task_synth_proofs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"github.com/filecoin-project/curio/lib/dealdata"
"github.com/filecoin-project/curio/lib/ffi"
"github.com/filecoin-project/curio/lib/paths"
storiface "github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/lib/storiface"
)

type SyntheticProofTask struct {
Expand Down Expand Up @@ -71,6 +71,12 @@ func (s *SyntheticProofTask) Do(taskID harmonytask.TaskID, stillOwned func() boo
return true, nil
}

var keepUnsealed bool

if err := s.db.QueryRow(ctx, `SELECT COALESCE(BOOL_OR(NOT data_delete_on_finalize), FALSE) FROM sectors_sdr_initial_pieces WHERE sp_id = $1 AND sector_number = $2`, sectorParams.SpID, sectorParams.SectorNumber).Scan(&keepUnsealed); err != nil {
return false, err
}

sealed, err := cid.Parse(sectorParams.SealedCID)
if err != nil {
return false, xerrors.Errorf("failed to parse sealed cid: %w", err)
Expand All @@ -94,7 +100,7 @@ func (s *SyntheticProofTask) Do(taskID harmonytask.TaskID, stillOwned func() boo
return false, xerrors.Errorf("getting deal data: %w", err)
}

err = s.sc.SyntheticProofs(ctx, &taskID, sref, sealed, unsealed, sectorParams.TicketValue, dealData.PieceInfos)
err = s.sc.SyntheticProofs(ctx, &taskID, sref, sealed, unsealed, sectorParams.TicketValue, dealData.PieceInfos, keepUnsealed)
if err != nil {
serr := resetSectorSealingState(ctx, sectorParams.SpID, sectorParams.SectorNumber, err, s.db, s.TypeDetails().Name)
if serr != nil {
Expand Down

0 comments on commit a30f976

Please sign in to comment.