Skip to content

Commit

Permalink
update sector terminate (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
LexLuthr authored Oct 10, 2024
1 parent ed9a6cb commit a898e6d
Showing 1 changed file with 113 additions and 32 deletions.
145 changes: 113 additions & 32 deletions web/api/sector/sector.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,25 +11,30 @@ import (
"github.com/docker/go-units"
"github.com/gorilla/mux"
"github.com/hashicorp/go-multierror"
"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/samber/lo"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/builtin"
miner2 "github.com/filecoin-project/go-state-types/builtin/v13/miner"
"github.com/filecoin-project/go-state-types/builtin/v9/market"

"github.com/filecoin-project/curio/deps"
"github.com/filecoin-project/curio/lib/storiface"
"github.com/filecoin-project/curio/web/api/apihelper"

"github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/actors"
"github.com/filecoin-project/lotus/chain/actors/adt"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/cli/spcli"
)

const verifiedPowerGainMul = 9
Expand Down Expand Up @@ -73,23 +78,16 @@ func (c *cfg) terminateSectors(w http.ResponseWriter, r *http.Request) {
toDel[m] = append(toDel[m], sec{Sector: abi.SectorNumber(s.Sector), Terminate: false})
}

del, err := c.shouldTerminate(r.Context(), toDel)
// We should context.Background to avoid cancellation due to page reload or other possible scenarios
ctx := context.Background()
err := c.terminate(ctx, toDel)
apihelper.OrHTTPFail(w, err)

for m, sectorList := range del {
mi, err := c.Chain.StateMinerInfo(r.Context(), m.Addr, types.EmptyTSK)
apihelper.OrHTTPFail(w, err)
var term []int
for _, s := range sectorList {
if s.Terminate {
term = append(term, int(s.Sector))
}
}
_, err = spcli.TerminateSectors(r.Context(), c.Chain, m.Addr, term, mi.Worker)
apihelper.OrHTTPFail(w, err)
// Remove sectors
for m, sectorList := range toDel {
for _, s := range sectorList {
id := abi.SectorID{Miner: m.ID, Number: s.Sector}
apihelper.OrHTTPFail(w, c.removeSector(r.Context(), id))
apihelper.OrHTTPFail(w, c.removeSector(ctx, id))
}
}
}
Expand Down Expand Up @@ -440,27 +438,13 @@ func (c *cfg) shouldTerminate(ctx context.Context, smap map[minerDetail][]sec) (
return nil, err
}

list := bitfield.New()

if err := mas.ForEachDeadline(func(dlIdx uint64, dl miner.Deadline) error {
return dl.ForEachPartition(func(partIdx uint64, part miner.Partition) error {
live, err := part.LiveSectors()
if err != nil {
return err
}

list, err = bitfield.SubtractBitField(list, live)
if err != nil {
return err
}
return err
})
}); err != nil {
return nil, err
liveSectors, err := miner.AllPartSectors(mas, miner.Partition.LiveSectors)
if err != nil {
return nil, fmt.Errorf("getting live sector sets for miner %s: %w", m, err)
}

for i := range sectors {
ok, err := list.IsSet(uint64(sectors[i].Sector))
ok, err := liveSectors.IsSet(uint64(sectors[i].Sector))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -503,3 +487,100 @@ func (c *cfg) removeSector(ctx context.Context, sector abi.SectorID) error {

return err
}

const batchSize = 100

func (c *cfg) terminate(ctx context.Context, toDel map[minerDetail][]sec) error {
del, err := c.shouldTerminate(ctx, toDel)
if err != nil {
return err
}

var msgs []cid.Cid

for m, sectorNumbers := range del {
maddr := m.Addr
mi, err := c.Chain.StateMinerInfo(ctx, maddr, types.EmptyTSK)
if err != nil {
return err
}

var terminationDeclarationParams []miner2.TerminationDeclaration

// Get Deadline/Partition for all sectors.
for _, sector := range sectorNumbers {
sectorNum := sector.Sector
sectorbit := bitfield.New()
sectorbit.Set(uint64(sectorNum))

loca, err := c.Chain.StateSectorPartition(ctx, maddr, sectorNum, types.EmptyTSK)
if err != nil {
return fmt.Errorf("get state sector partition %s", err)
}

para := miner2.TerminationDeclaration{
Deadline: loca.Deadline,
Partition: loca.Partition,
Sectors: sectorbit,
}

terminationDeclarationParams = append(terminationDeclarationParams, para)
}

// Batch message for batchSize
var batches [][]miner2.TerminationDeclaration
for i := 0; i < len(terminationDeclarationParams); i += batchSize {
batch := terminationDeclarationParams[i:min(i+batchSize, len(terminationDeclarationParams))]
batches = append(batches, batch)
}

// Send messages for all batches
for _, batch := range batches {
terminateSectorParams := &miner2.TerminateSectorsParams{
Terminations: batch,
}

sp, errA := actors.SerializeParams(terminateSectorParams)
if errA != nil {
return xerrors.Errorf("serializing params: %w", errA)
}

smsg, err := c.Chain.MpoolPushMessage(ctx, &types.Message{
From: mi.Worker,
To: maddr,
Method: builtin.MethodsMiner.TerminateSectors,

Value: big.Zero(),
Params: sp,
}, nil)
if err != nil {
return xerrors.Errorf("mpool push message: %w", err)
}

msgs = append(msgs, smsg.Cid())

log.Infof("sent termination message: %s", smsg.Cid())
}
}

// wait for msgs to get mined into a block for all minerID
eg := errgroup.Group{}
eg.SetLimit(10)
for _, msg := range msgs {
m := msg
eg.Go(func() error {
wait, err := c.Chain.StateWaitMsg(ctx, m, 2, 2000, true)
if err != nil {
log.Errorf("timeout waiting for message to land on chain %s", wait.Message)
return fmt.Errorf("timeout waiting for message to land on chain %s", wait.Message)
}

if wait.Receipt.ExitCode.IsError() {
log.Errorf("failed to execute message %s: %d", wait.Message, wait.Receipt.ExitCode)
return fmt.Errorf("failed to execute message %s: %w", wait.Message, wait.Receipt.ExitCode)
}
return nil
})
}
return eg.Wait()
}

0 comments on commit a898e6d

Please sign in to comment.