node: Modify error handling for CheckPending method in the Governor to prevent process restarts #4011
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -31,6 +31,7 @@ import ( | |
"fmt" | ||
"math" | ||
"math/big" | ||
"strconv" | ||
"sync" | ||
"time" | ||
|
||
|
@@ -651,10 +652,20 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked( | |
return true, ce, token, payload, nil | ||
} | ||
|
||
// CheckPending is a wrapper method for CheckPendingForTime. It is called by the processor with the purpose of releasing | ||
// queued transfers. | ||
func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) { | ||
return gov.CheckPendingForTime(time.Now()) | ||
} | ||
|
||
// CheckPendingForTime inspects the Governor's queued messages and releases them if: | ||
// * the transfer has been in queue for more than 24 hours | ||
// * the Governor's usage, calculated over the past 24h, has freed up since the message was queued. | ||
// This method returns the MessagePublication of the transfer to be released. It modifies the governors chain entry | ||
// fields, in particular the `pending` and `transfers` slices. It also stores a transfer record in the database when | ||
// a transfer is released. | ||
// WARNING: When this function returns an error, it propagates to the `processor` which in turn interprets this as a | ||
// signal to RESTART THE NODE. Therefore, errors returned by this function effectively act as panics. | ||
func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) { | ||
gov.mutex.Lock() | ||
defer gov.mutex.Unlock() | ||
|
@@ -669,15 +680,28 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP | |
gov.msgsToPublish = nil | ||
} | ||
|
||
for _, ce := range gov.chains { | ||
for chainId, ce := range gov.chains { | ||
// Keep going as long as we find something that will fit. | ||
for { | ||
foundOne := false | ||
prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) | ||
if err != nil { | ||
// An error here might mean that the sum of emitted transfers for a chain has exceeded | ||
// the Governor's daily limit for this chain, or that the previous total value could | ||
// not be calculated. Although this is a serious issue, we do not want to return an error | ||
// here because it will cause the Governor to be restarted | ||
// by the processor. Instead, skip processing pending transfers for this chain while | ||
// continuing to process other chains as normal. | ||
// Note that the sliding 24h window and/or incoming flow canceling transfers will | ||
// reduce `prevTotalValue` over time. Therefore we expect this state to be temporary. | ||
gov.logger.Error("error when attempting to trim and sum transfers", zap.Error(err)) | ||
gov.logger.Error("refusing to release transfers for this chain until Governor usage is reduced", | ||
zap.Stringer("chainId", chainId), | ||
zap.Uint64("prevTotalValue", prevTotalValue), | ||
zap.Error(err)) | ||
gov.msgsToPublish = msgsToPublish | ||
return nil, err | ||
// Skip further processing for this chain entry | ||
break | ||
I'm not entirely sure about this handling, I'm wondering whether we'd be better off changing The invariant there isn't actually too hard to hit. If we peg the limit and a flow cancelling transfer is older than 24 hours and falls off the list, then the sum calculated will be greater than the daily limit for that chain. I don't think we can consider that an invariant in this case. I think we should move that invariant check to before the trimming of old transfers happens? I believe it should hold in that case? I think the idea of skipping processing for a chain in the event of an error is the right thing to do (rather than crashing the process), but ideally we don't hit this code path very often

Also, another thought about the invariant, is the token entry price during this processing cached or is the computed value calculated on the fly? If we're close to the daily transfer limit and the token price is increased with a coingecko price update then we could hit the invariant also even though we were under the limit at the original processing time. Something to think about

Thanks for the feedback. I was also wondering how best to handle this.
I think you're right. Maybe this should be thought of more as a Warning with the idea that it's a temporary state that should be resolved by 24h going by or by incoming flow.
As written it's one process - the sum happens at the same time as trim so we can't get
Both, technically. I think the token Entry is refreshed when the process is restarted by reading from the mainnet token list. But then the computed value of a transfer is re-calculated on each iteration.
I think that's correct, nice catch. With this additional case I'm in favour of not considering it as an invariant. I can refactor here to basically just prevent new transfers from going through when the Governor is at its limit.

I agree with Dirk on this. I don't think it can be considered an invariant anymore with the changes we've made with flow canceling (and even prior to that).

I just tested this locally in Tilt with a violated invariant. It appears to log the error message and continue on, as opposed to just crash looping like before. |
||
} | ||
|
||
// Keep going until we find something that fits or hit the end. | ||
|
@@ -734,7 +758,8 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP | |
zap.Uint64("value", value), | ||
zap.Uint64("prevTotalValue", prevTotalValue), | ||
zap.Uint64("newTotalValue", newTotalValue), | ||
zap.String("msgID", pe.dbData.Msg.MessageIDString())) | ||
zap.String("msgID", pe.dbData.Msg.MessageIDString()), | ||
zap.String("flowCancels", strconv.FormatBool(pe.token.flowCancels))) | ||
} | ||
|
||
payload, err := vaa.DecodeTransferPayloadHdr(pe.dbData.Msg.Payload) | ||
|
@@ -746,7 +771,9 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP | |
) | ||
delete(gov.msgsSeen, pe.hash) // Rest of the clean up happens below. | ||
} else { | ||
// If we get here, publish it and remove it from the pending list. | ||
// If we get here, publish it and move it from the pending list to the | ||
// transfers list. Also add a flow-cancel transfer to the destination chain | ||
// if the transfer is sending a flow-canceling asset. | ||
msgsToPublish = append(msgsToPublish, &pe.dbData.Msg) | ||
|
||
if countsTowardsTransfers { | ||
|
@@ -762,17 +789,32 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP | |
Hash: pe.hash, | ||
} | ||
|
||
if err := gov.db.StoreTransfer(&dbTransfer); err != nil { | ||
gov.msgsToPublish = msgsToPublish | ||
transfer, err := newTransferFromDbTransfer(&dbTransfer) | ||
if err != nil { | ||
// Should never occur unless dbTransfer.Value overflows MaxInt64 | ||
gov.logger.Error("could not convert dbTransfer to transfer", | ||
zap.String("msgID", dbTransfer.MsgID), | ||
zap.String("hash", pe.hash), | ||
zap.Error(err), | ||
) | ||
// This causes the process to die. We don't want to process transfers that | ||
// have USD value in excess of MaxInt64 under any circumstances. | ||
// This check should occur before the call to the database so | ||
// that we don't store a problematic transfer. | ||
return nil, err | ||
} | ||
|
||
transfer, err := newTransferFromDbTransfer(&dbTransfer) | ||
if err != nil { | ||
if err := gov.db.StoreTransfer(&dbTransfer); err != nil { | ||
gov.msgsToPublish = msgsToPublish | ||
// This causes the process to die. We can't tolerate DB connection | ||
// errors. | ||
return nil, err | ||
} | ||
|
||
ce.transfers = append(ce.transfers, transfer) | ||
|
||
gov.msgsSeen[pe.hash] = transferComplete | ||
|
||
// Add inverse transfer to destination chain entry if this asset can cancel flows. | ||
key := tokenKey{chain: dbTransfer.EmitterChain, addr: dbTransfer.EmitterAddress} | ||
tokenEntry := gov.tokens[key] | ||
|
@@ -781,7 +823,13 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP | |
if tokenEntry.flowCancels { | ||
if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { | ||
if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { | ||
return nil, err | ||
gov.logger.Warn("could not add flow canceling transfer to destination chain", | ||
zap.String("msgID", dbTransfer.MsgID), | ||
zap.String("hash", pe.hash), | ||
zap.Error(err), | ||
) | ||
// Process the next pending transfer | ||
continue | ||
} | ||
} else { | ||
gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", | ||
|
@@ -792,7 +840,6 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP | |
} | ||
} | ||
} | ||
gov.msgsSeen[pe.hash] = transferComplete | ||
} else { | ||
delete(gov.msgsSeen, pe.hash) | ||
} | ||
|
@@ -844,6 +891,8 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { | |
// into account. Therefore, this value should never be less than 0 and should never exceed the "Governor limit" for the chain. | ||
// As a side-effect, this function modifies the parameter `emitter`, updating its `transfers` field so that it only includes | ||
// filtered `Transfer`s (i.e. outgoing `Transfer`s newer than `startTime`). | ||
// Returns an error if the sum cannot be calculated or if the sum exceeds the daily limit for the chain. If this happens, | ||
// the transfers field will still be updated. | ||
// SECURITY Invariant: The `sum` return value should never be less than 0 | ||
// SECURITY Invariant: The `sum` return value should never exceed the "Governor limit" for the chain | ||
func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime time.Time) (sum uint64, err error) { | ||
|
@@ -876,6 +925,7 @@ func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime | |
// TrimAndSumValue iterates over a slice of transfer structs. It filters out transfers that have Timestamp values that | ||
// are earlier than the parameter `startTime`. The function then iterates over the remaining transfers, sums their Value, | ||
// and returns the sum and the filtered transfers. | ||
// As a side-effect, this function deletes transfers from the database if their Timestamp is before `startTime`. | ||
// The `transfers` slice must be sorted by Timestamp. We expect this to be the case as transfers are added to the | ||
// Governor in chronological order as they arrive. Note that `Timestamp` is created by the Governor; it is not read | ||
// from the actual on-chain transaction. | ||
|
This error statement is now impossible to hit, right? `TrimAndSumValueForChain` will always return a value instead of an error.

It's possible, there's another error case for that function relating to overflow/underflow for int64 calculations. Much, much less likely though.