node: Modify error handling for CheckPending method in the Governor to prevent process restarts #4011

Closed
70 changes: 60 additions & 10 deletions node/pkg/governor/governor.go
@@ -31,6 +31,7 @@ import (
"fmt"
"math"
"math/big"
"strconv"
"sync"
"time"

@@ -651,10 +652,20 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked(
return true, ce, token, payload, nil
}

// CheckPending is a wrapper method for CheckPendingForTime. It is called by the processor with the purpose of releasing
// queued transfers.
func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) {
return gov.CheckPendingForTime(time.Now())
}

// CheckPendingForTime inspects the Governor's queued messages and releases them if:
// * the transfer has been in queue for more than 24 hours
// * the Governor's usage, calculated over the past 24h, has freed up since the message was queued.
// This method returns the MessagePublication of the transfer to be released. It modifies the Governor's chain entry
// fields, in particular the `pending` and `transfers` slices. It also stores a transfer record in the database when
// a transfer is released.
// WARNING: When this function returns an error, it propagates to the `processor` which in turn interprets this as a
// signal to RESTART THE NODE. Therefore, errors returned by this function effectively act as panics.
func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) {
gov.mutex.Lock()
defer gov.mutex.Unlock()
@@ -669,15 +680,28 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
gov.msgsToPublish = nil
}

for _, ce := range gov.chains {
for chainId, ce := range gov.chains {
// Keep going as long as we find something that will fit.
for {
foundOne := false
prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime)
if err != nil {
Contributor:

This error statement is now impossible to hit, right? TrimAndSumValueForChain will always return a value instead of an error.

Contributor Author:

It's possible, there's another error case for that function relating to overflow/underflow for int64 calculations. Much, much less likely though.

// An error here might mean that the sum of emitted transfers for a chain has exceeded
// the Governor's daily limit for this chain, or that the previous total value could
// not be calculated. Although this is a serious issue, we do not want to return an error
// here because it will cause the Governor to be restarted
// by the processor. Instead, skip processing pending transfers for this chain while
// continuing to process other chains as normal.
// Note that the sliding 24h window and/or incoming flow canceling transfers will
// reduce `prevTotalValue` over time. Therefore we expect this state to be temporary.
gov.logger.Error("error when attempting to trim and sum transfers", zap.Error(err))
gov.logger.Error("refusing to release transfers for this chain until Governor usage is reduced",
zap.Stringer("chainId", chainId),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Error(err))
gov.msgsToPublish = msgsToPublish
return nil, err
// Skip further processing for this chain entry
break
Collaborator:

I'm not entirely sure about this handling, I'm wondering whether we'd be better off changing TrimAndSumValueForChain?

The invariant there isn't actually too hard to hit. If we peg the limit and a flow cancelling transfer is older than 24 hours and falls off the list, then the sum calculated will be greater than the daily limit for that chain. I don't think we can consider that an invariant in this case.

I think we should move that invariant check to before the trimming of old transfers happens? I believe it should hold in that case?

I think the idea of skipping processing for a chain in the event of an error is the right thing to do (rather than crashing the process), but ideally we don't hit this code path very often

Collaborator:

Also, another thought about the invariant, is the token entry price during this processing cached or is the computed value calculated on the fly? If we're close to the daily transfer limit and the token price is increased with a coingecko price update then we could hit the invariant also even though we were under the limit at the original processing time. Something to think about

Contributor Author (@johnsaigle, Jul 8, 2024):

Thanks for the feedback. I was also wondering how best to handle this.

> I don't think we can consider that an invariant in this case.

I think you're right. Maybe this should be thought of more as a Warning with the idea that it's a temporary state that should be resolved by 24h going by or by incoming flow.

> I think we should move that invariant check to before the trimming of old transfers happens?

As written it's one process - the sum happens at the same time as trim so we can't get prevTotalValue until after we sum. In theory we could store this in the node or split trimming and summing but it's more of a significant refactor.

> is the token entry price during this processing cached or is the computed value calculated on the fly

Both, technically. I think the token Entry is refreshed when the process is restarted by reading from the mainnet token list. But then the computed value of a transfer is re-calculated on each iteration.

> If we're close to the daily transfer limit and the token price is increased with a coingecko price update then we could hit the invariant also even though we were under the limit at the original processing time

I think that's correct, nice catch. With this additional case I'm in favour of not considering it as an invariant. I can refactor here to basically just prevent new transfers from going through when the Governor is at its limit.

Contributor:

I agree with Dirk on this. I don't think it can be considered an invariant anymore with the changes we've made with flow canceling (and even prior to that).

Contributor:

I just tested this locally in Tilt with a violated invariant. It appears to log the error message and continue on, as opposed to just crash looping like before.

}

// Keep going until we find something that fits or hit the end.
@@ -734,7 +758,8 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
zap.Uint64("value", value),
zap.Uint64("prevTotalValue", prevTotalValue),
zap.Uint64("newTotalValue", newTotalValue),
zap.String("msgID", pe.dbData.Msg.MessageIDString()))
zap.String("msgID", pe.dbData.Msg.MessageIDString()),
zap.String("flowCancels", strconv.FormatBool(pe.token.flowCancels)))
}

payload, err := vaa.DecodeTransferPayloadHdr(pe.dbData.Msg.Payload)
@@ -746,7 +771,9 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
)
delete(gov.msgsSeen, pe.hash) // Rest of the clean up happens below.
} else {
// If we get here, publish it and remove it from the pending list.
// If we get here, publish it and move it from the pending list to the
// transfers list. Also add a flow-cancel transfer to the destination chain
// if the transfer is sending a flow-canceling asset.
msgsToPublish = append(msgsToPublish, &pe.dbData.Msg)

if countsTowardsTransfers {
@@ -762,17 +789,32 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
Hash: pe.hash,
}

if err := gov.db.StoreTransfer(&dbTransfer); err != nil {
gov.msgsToPublish = msgsToPublish
transfer, err := newTransferFromDbTransfer(&dbTransfer)
if err != nil {
// Should never occur unless dbTransfer.Value overflows MaxInt64
gov.logger.Error("could not convert dbTransfer to transfer",
zap.String("msgID", dbTransfer.MsgID),
zap.String("hash", pe.hash),
zap.Error(err),
)
// This causes the process to die. We don't want to process transfers that
// have USD value in excess of MaxInt64 under any circumstances.
// This check should occur before the call to the database so
// that we don't store a problematic transfer.
return nil, err
}

transfer, err := newTransferFromDbTransfer(&dbTransfer)
if err != nil {
if err := gov.db.StoreTransfer(&dbTransfer); err != nil {
gov.msgsToPublish = msgsToPublish
// This causes the process to die. We can't tolerate DB connection
// errors.
return nil, err
}

ce.transfers = append(ce.transfers, transfer)

gov.msgsSeen[pe.hash] = transferComplete

// Add inverse transfer to destination chain entry if this asset can cancel flows.
key := tokenKey{chain: dbTransfer.EmitterChain, addr: dbTransfer.EmitterAddress}
tokenEntry := gov.tokens[key]
@@ -781,7 +823,13 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok {
if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil {
return nil, err
gov.logger.Warn("could not add flow canceling transfer to destination chain",
zap.String("msgID", dbTransfer.MsgID),
zap.String("hash", pe.hash),
zap.Error(err),
)
// Process the next pending transfer
continue
}
} else {
gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist",
@@ -792,7 +840,6 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP
}
}
}
gov.msgsSeen[pe.hash] = transferComplete
} else {
delete(gov.msgsSeen, pe.hash)
}
@@ -844,6 +891,8 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) {
// into account. Therefore, this value should never be less than 0 and should never exceed the "Governor limit" for the chain.
// As a side-effect, this function modifies the parameter `emitter`, updating its `transfers` field so that it only includes
// filtered `Transfer`s (i.e. outgoing `Transfer`s newer than `startTime`).
Returns an error if the sum cannot be calculated or if the sum exceeds the daily limit for the chain. If this happens,
// the transfers field will still be updated.
// SECURITY Invariant: The `sum` return value should never be less than 0
// SECURITY Invariant: The `sum` return value should never exceed the "Governor limit" for the chain
func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime time.Time) (sum uint64, err error) {
@@ -876,6 +925,7 @@ func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime
// TrimAndSumValue iterates over a slice of transfer structs. It filters out transfers that have Timestamp values that
// are earlier than the parameter `startTime`. The function then iterates over the remaining transfers, sums their Value,
// and returns the sum and the filtered transfers.
// As a side-effect, this function deletes transfers from the database if their Timestamp is before `startTime`.
// The `transfers` slice must be sorted by Timestamp. We expect this to be the case as transfers are added to the
// Governor in chronological order as they arrive. Note that `Timestamp` is created by the Governor; it is not read
// from the actual on-chain transaction.
10 changes: 7 additions & 3 deletions node/pkg/governor/governor_db.go
@@ -243,15 +243,19 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error {
}
ce.transfers = append(ce.transfers, transfer)

// Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding,
// inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but
// that function does not capture flow-cancelling when the node is restarted.
// Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding,
// inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and
// `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted.
tokenEntry := gov.tokens[tk]
if tokenEntry != nil {
// Mandatory check to ensure that the token should be able to reduce the Governor limit.
if tokenEntry.flowCancels {
if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok {
if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil {
gov.logger.Warn("could not add flow canceling transfer to destination chain",
zap.String("msgID", xfer.MsgID),
zap.String("hash", xfer.Hash), zap.Error(err),
)
return err
}
} else {