From 78c01e97ad0bd93027e417a4cb06da7aac2ddba2 Mon Sep 17 00:00:00 2001 From: John Saigle Date: Fri, 5 Jul 2024 16:11:27 -0400 Subject: [PATCH 1/2] node: Modify error handling for CheckPending method in the Governor Previous rollouts of the Flow Cancel feature contained issues when calculating the Governor usage when usage was near the daily limit. This caused an invariant to be violated. However, this was propagated to the processor code and resulted in the processor restarting the entire process. Instead, the Governor should simply fail-closed and report that there is no remaining capacity, causing further VAAs to be queued until the usage diminishes over time. The circumstances leading to the invariant violations are not addressed in this commit. Instead this commit reworks the way errors are handled by the CheckPending, making careful choices about when the process should or should not be killed. - Change "invariant" error handling: instead of causing the process to die, log an error and skip further for a single chain while allowing processing for other chains to continue - Change other less severe error cases to log warnings instead of returning errors. - Generally prevent flow-cancel related issues from affecting normal Governor operations. Instead the flow cancel transfers should simply not be populated and thus result in "GovernorV1" behavior. - Add documentation to CheckPendingForTime to explain the dangers of returning an error - Reword error messages to be more precise and include more relevant fields. Add documentation explaining when the process should and should not die --- node/pkg/governor/governor.go | 70 +++++++++++++++++++++++++++----- node/pkg/governor/governor_db.go | 10 +++-- 2 files changed, 67 insertions(+), 13 deletions(-) diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 13830fd006..2b85690670 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -31,6 +31,7 @@ import ( "fmt" "math" "math/big" + "strconv" "sync" "time" @@ -651,10 +652,20 @@ func (gov *ChainGovernor) parseMsgAlreadyLocked( return true, ce, token, payload, nil } +// CheckPending is a wrapper method for CheckPendingForTime. It is called by the processor with the purpose of releasing +// queued transfers. func (gov *ChainGovernor) CheckPending() ([]*common.MessagePublication, error) { return gov.CheckPendingForTime(time.Now()) } +// CheckPendingForTime inspects the Governor's queued messages and releases them if: +// * the transfer has been in queue for more than 24 hours +// * the Governor's usage, calculated over the past 24h, has freed up since the message was queued. +// This method returns the MessagePublication of the transfer to be released. It modifies the governors chain entry +// fields, in particular the `pending` and `transfers` slices. It also stores a transfer record in the database when +// a transfer is released. +// WARNING: When this function returns an error, it propagates to the `processor` which in turn interprets this as a +// signal to RESTART THE NODE. Therefore, errors returned by this function effectively act as panics. func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessagePublication, error) { gov.mutex.Lock() defer gov.mutex.Unlock() @@ -669,15 +680,28 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP gov.msgsToPublish = nil } - for _, ce := range gov.chains { + for chainId, ce := range gov.chains { // Keep going as long as we find something that will fit. for { foundOne := false prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) if err != nil { + // An error here might mean that the sum of emitted transfers for a chain has exceeded + // the Governor's daily limit for this chain, or that the previous total value could + // not be calculated. Although this is a serious issue, we do not want to return an error + // here because it will cause the Governor to be restarted + // by the processor. Instead, skip processing pending transfers for this chain while + // continuing to process other chains as normal. + // Note that the sliding 24h window and/or incoming flow canceling transfers will + // reduce `prevTotalValue` over time. Therefore we expect this state to be temporary. gov.logger.Error("error when attempting to trim and sum transfers", zap.Error(err)) + gov.logger.Error("refusing to release transfers for this chain until Governor usage is reduced", + zap.Stringer("chainId", chainId), + zap.Uint64("prevTotalValue", prevTotalValue), + zap.Error(err)) gov.msgsToPublish = msgsToPublish - return nil, err + // Skip further processing for this chain entry + break } // Keep going until we find something that fits or hit the end. @@ -734,7 +758,8 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP zap.Uint64("value", value), zap.Uint64("prevTotalValue", prevTotalValue), zap.Uint64("newTotalValue", newTotalValue), - zap.String("msgID", pe.dbData.Msg.MessageIDString())) + zap.String("msgID", pe.dbData.Msg.MessageIDString()), + zap.String("flowCancels", strconv.FormatBool(pe.token.flowCancels))) } payload, err := vaa.DecodeTransferPayloadHdr(pe.dbData.Msg.Payload) @@ -746,7 +771,9 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP ) delete(gov.msgsSeen, pe.hash) // Rest of the clean up happens below. } else { - // If we get here, publish it and remove it from the pending list. + // If we get here, publish it and move it from the pending list to the + // transfers list. Also add a flow-cancel transfer to the destination chain + // if the transfer is sending a flow-canceling asset. msgsToPublish = append(msgsToPublish, &pe.dbData.Msg) if countsTowardsTransfers { @@ -762,17 +789,32 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP Hash: pe.hash, } - if err := gov.db.StoreTransfer(&dbTransfer); err != nil { - gov.msgsToPublish = msgsToPublish + transfer, err := newTransferFromDbTransfer(&dbTransfer) + if err != nil { + // Should never occur unless dbTransfer.Value overflows MaxInt64 + gov.logger.Error("could not convert dbTransfer to transfer", + zap.String("msgID", dbTransfer.MsgID), + zap.String("hash", pe.hash), + zap.Error(err), + ) + // This causes the process to die. We don't want to process transfers that + // have USD value in excess of MaxInt64 under any circumstances. + // This check should occur before the call to the database so + // that we don't store a problematic transfer. return nil, err } - transfer, err := newTransferFromDbTransfer(&dbTransfer) - if err != nil { + if err := gov.db.StoreTransfer(&dbTransfer); err != nil { + gov.msgsToPublish = msgsToPublish + // This causes the process to die. We can't tolerate DB connection + // errors. return nil, err } + ce.transfers = append(ce.transfers, transfer) + gov.msgsSeen[pe.hash] = transferComplete + // Add inverse transfer to destination chain entry if this asset can cancel flows. key := tokenKey{chain: dbTransfer.EmitterChain, addr: dbTransfer.EmitterAddress} tokenEntry := gov.tokens[key] @@ -781,7 +823,13 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP if tokenEntry.flowCancels { if destinationChainEntry, ok := gov.chains[payload.TargetChain]; ok { if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(&dbTransfer); err != nil { - return nil, err + gov.logger.Warn("could not add flow canceling transfer to destination chain", + zap.String("msgID", dbTransfer.MsgID), + zap.String("hash", pe.hash), + zap.Error(err), + ) + // Process the next pending transfer + continue } } else { gov.logger.Warn("tried to cancel flow but chain entry for target chain does not exist", @@ -792,7 +840,6 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP } } } - gov.msgsSeen[pe.hash] = transferComplete } else { delete(gov.msgsSeen, pe.hash) } @@ -844,6 +891,8 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { // into account. Therefore, this value should never be less than 0 and should never exceed the "Governor limit" for the chain. // As a side-effect, this function modifies the parameter `emitter`, updating its `transfers` field so that it only includes // filtered `Transfer`s (i.e. outgoing `Transfer`s newer than `startTime`). +// Returns an error if the sum cannit be calculated or if the sum exceeds the daily limit for the chain. If this happens, +// the transfers field will still be updated. // SECURITY Invariant: The `sum` return value should never be less than 0 // SECURITY Invariant: The `sum` return value should never exceed the "Governor limit" for the chain func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime time.Time) (sum uint64, err error) { @@ -876,6 +925,7 @@ func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime // TrimAndSumValue iterates over a slice of transfer structs. It filters out transfers that have Timestamp values that // are earlier than the parameter `startTime`. The function then iterates over the remaining transfers, sums their Value, // and returns the sum and the filtered transfers. +// As a side-effect, this function deletes transfers from the database if their Timestamp is before `startTime`. // The `transfers` slice must be sorted by Timestamp. We expect this to be the case as transfers are added to the // Governor in chronological order as they arrive. Note that `Timestamp` is created by the Governor; it is not read // from the actual on-chain transaction. diff --git a/node/pkg/governor/governor_db.go b/node/pkg/governor/governor_db.go index a71fb194fa..9eb4f5b1de 100644 --- a/node/pkg/governor/governor_db.go +++ b/node/pkg/governor/governor_db.go @@ -243,15 +243,19 @@ func (gov *ChainGovernor) reloadTransfer(xfer *db.Transfer) error { } ce.transfers = append(ce.transfers, transfer) - // Reload flow-cancel transfers for the TargetChain. This is important when node restarts so that a corresponding, - // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` loop but - // that function does not capture flow-cancelling when the node is restarted. + // Reload flow-cancel transfers for the TargetChain. This is important when the node restarts so that a corresponding, + // inverse transfer is added to the TargetChain. This is already done during the `ProcessMsgForTime` and + // `CheckPending` loops but those functions do not capture flow-cancelling when the node is restarted. tokenEntry := gov.tokens[tk] if tokenEntry != nil { // Mandatory check to ensure that the token should be able to reduce the Governor limit. if tokenEntry.flowCancels { if destinationChainEntry, ok := gov.chains[xfer.TargetChain]; ok { if err := destinationChainEntry.addFlowCancelTransferFromDbTransfer(xfer); err != nil { + gov.logger.Warn("could not add flow canceling transfer to destination chain", + zap.String("msgID", xfer.MsgID), + zap.String("hash", xfer.Hash), zap.Error(err), + ) return err } } else { From 19fdbe06b3afecaa11ee878849067e2a8efa1df1 Mon Sep 17 00:00:00 2001 From: John Saigle Date: Fri, 12 Jul 2024 10:49:38 -0400 Subject: [PATCH 2/2] node: Change Flow Cancel error handling logic - Remove 'invariant error' in TrimAndSumValueForChain as it can occur somewhat regularly with the addition of the flow cancel feature - Return dailyLimit in error condition rather than 0 so that future transfers will be queued - Do not cap the sum returned from TrimAndSumValueForChain: instead allow it to exceed the daily limit. - Modify unit tests to reflect this - Add unit tests for overflow/underflow scenarios in the TrimAndSumValue functions --- node/pkg/governor/governor.go | 49 +++++++------------ node/pkg/governor/governor_test.go | 75 +++++++++++++++++++++++++++--- 2 files changed, 85 insertions(+), 39 deletions(-) diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 2b85690670..97e98afb19 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -686,16 +686,8 @@ func (gov *ChainGovernor) CheckPendingForTime(now time.Time) ([]*common.MessageP foundOne := false prevTotalValue, err := gov.TrimAndSumValueForChain(ce, startTime) if err != nil { - // An error here might mean that the sum of emitted transfers for a chain has exceeded - // the Governor's daily limit for this chain, or that the previous total value could - // not be calculated. Although this is a serious issue, we do not want to return an error - // here because it will cause the Governor to be restarted - // by the processor. Instead, skip processing pending transfers for this chain while - // continuing to process other chains as normal. - // Note that the sliding 24h window and/or incoming flow canceling transfers will - // reduce `prevTotalValue` over time. Therefore we expect this state to be temporary. gov.logger.Error("error when attempting to trim and sum transfers", zap.Error(err)) - gov.logger.Error("refusing to release transfers for this chain until Governor usage is reduced", + gov.logger.Error("refusing to release transfers for this chain until the sum can be correctly calculated", zap.Stringer("chainId", chainId), zap.Uint64("prevTotalValue", prevTotalValue), zap.Error(err)) @@ -883,42 +875,35 @@ func computeValue(amount *big.Int, token *tokenEntry) (uint64, error) { return value, nil } -// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `emitter`. In effect, it represents a +// TrimAndSumValueForChain calculates the `sum` of `Transfer`s for a given chain `chainEntry`. In effect, it represents a // chain's "Governor Usage" for a given 24 hour period. // This sum may be reduced by the sum of 'flow cancelling' transfers: that is, transfers of an allow-listed token // that have the `emitter` as their destination chain. // The resulting `sum` return value therefore represents the net flow across a chain when taking flow-cancelling tokens // into account. Therefore, this value should never be less than 0 and should never exceed the "Governor limit" for the chain. -// As a side-effect, this function modifies the parameter `emitter`, updating its `transfers` field so that it only includes +// As a side-effect, this function modifies the parameter `chainEntry`, updating its `transfers` field so that it only includes // filtered `Transfer`s (i.e. outgoing `Transfer`s newer than `startTime`). -// Returns an error if the sum cannit be calculated or if the sum exceeds the daily limit for the chain. If this happens, -// the transfers field will still be updated. +// Returns an error if the sum cannot be calculated. The transfers field will still be updated in this case. When +// an error condition occurs, this function returns the chain's `dailyLimit` as the sum. This should result in the +// chain appearing at maximum capacity from the perspective of the Governor, and therefore cause new transfers to be +// queued until space opens up. // SECURITY Invariant: The `sum` return value should never be less than 0 -// SECURITY Invariant: The `sum` return value should never exceed the "Governor limit" for the chain -func (gov *ChainGovernor) TrimAndSumValueForChain(emitter *chainEntry, startTime time.Time) (sum uint64, err error) { - // Sum the value of all outgoing transfers - var sumOutgoing int64 - sumOutgoing, emitter.transfers, err = gov.TrimAndSumValue(emitter.transfers, startTime) +func (gov *ChainGovernor) TrimAndSumValueForChain(chainEntry *chainEntry, startTime time.Time) (sum uint64, err error) { + // Sum the value of all transfers for this chain. This sum can be negative if flow-cancelling is enabled + // and the incoming value of flow-cancelling assets exceeds the summed value of all outgoing assets. + var sumValue int64 + sumValue, chainEntry.transfers, err = gov.TrimAndSumValue(chainEntry.transfers, startTime) if err != nil { - return 0, err + // Return the daily limit as the sum so that any further transfers will be queued. + return chainEntry.dailyLimit, err } - // Return early if the sum is not positive as it cannot exceed the daily limit. - // In this case, return 0 even if the sum is negative. - if sumOutgoing <= 0 { + // Return 0 even if the sum is negative. + if sumValue <= 0 { return 0, nil } - sum = uint64(sumOutgoing) - if sum > emitter.dailyLimit { - return 0, fmt.Errorf( - "invariant violation: calculated sum %d exceeds Governor limit %d", - sum, - emitter.dailyLimit, - ) - } - - return sum, nil + return uint64(sumValue), nil } diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index 964ec53345..bb6f414129 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -360,10 +360,12 @@ func TestFlowCancelCannotUnderflow(t *testing.T) { assert.Zero(t, usage) } -// Simulate a case where the total sum of transfers for a chain in a 24 hour period exceeds -// the configured Governor limit. This should never happen, so we make sure that an error -// is returned if the system is in this state -func TestInvariantGovernorLimit(t *testing.T) { +// We never expect this to occur when flow-cancelling is disabled. If flow-cancelling is enabled, there +// are some cases where the outgoing value exceeds the daily limit. Example: a large, incoming transfer +// of a flow-cancelling asset increases the Governor capacity beyond the daily limit. After 24h, that +// transfer is trimmed. This reduces the daily limit back to normal, but by this time more outgoing +// transfers have been emitted, causing the sum to exceed the daily limit. +func TestChainEntrySumExceedsDailyLimit(t *testing.T) { ctx := context.Background() gov, err := newChainGovernorForTest(ctx) require.NoError(t, err) @@ -406,10 +408,69 @@ func TestInvariantGovernorLimit(t *testing.T) { assert.Equal(t, expectedNumTransfers, len(transfers)) assert.NotZero(t, sum) - // Make sure we trigger the Invariant usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) - require.ErrorContains(t, err, "invariant violation: calculated sum") - assert.Zero(t, usage) + require.NoError(t, err) + assert.Equal(t, emitterTransferValue*uint64(expectedNumTransfers), usage) +} + +func TestTrimAndSumValueOverflowErrors(t *testing.T) { + ctx := context.Background() + gov, err := newChainGovernorForTest(ctx) + require.NoError(t, err) + assert.NotNil(t, gov) + + now, err := time.Parse("2006-Jan-02", "2024-Feb-19") + require.NoError(t, err) + + var transfers_from_emitter []transfer + transferTime, err := time.Parse("2006-Jan-02", "2024-Feb-19") + require.NoError(t, err) + + emitterChainId := vaa.ChainIDSolana + + transfer, err := newTransferFromDbTransfer(&db.Transfer{Value: math.MaxInt64, Timestamp: transferTime}) + require.NoError(t, err) + transfer2, err := newTransferFromDbTransfer(&db.Transfer{Value: 1, Timestamp: transferTime}) + require.NoError(t, err) + transfers_from_emitter = append(transfers_from_emitter, transfer, transfer2) + + // Populate chainEntry and ChainGovernor + emitter := &chainEntry{ + transfers: transfers_from_emitter, + emitterChainId: vaa.ChainID(emitterChainId), + dailyLimit: 10000, + } + gov.chains[emitter.emitterChainId] = emitter + + sum, _, err := gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer overflow") + assert.Zero(t, sum) + usage, err := gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer overflow") + assert.Equal(t, uint64(10000), usage) + + // overwrite emitter (discard transfer added above) + emitter = &chainEntry{ + emitterChainId: vaa.ChainID(emitterChainId), + dailyLimit: 10000, + } + gov.chains[emitter.emitterChainId] = emitter + + // Now test underflow + transfer3 := &db.Transfer{Value: math.MaxInt64, Timestamp: transferTime, TargetChain: vaa.ChainIDSolana} + + ce := gov.chains[emitter.emitterChainId] + err = ce.addFlowCancelTransferFromDbTransfer(transfer3) + require.NoError(t, err) + err = ce.addFlowCancelTransferFromDbTransfer(transfer3) + require.NoError(t, err) + + sum, _, err = gov.TrimAndSumValue(emitter.transfers, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer underflow") + assert.Zero(t, sum) + usage, err = gov.TrimAndSumValueForChain(emitter, now.Add(-time.Hour*24)) + require.ErrorContains(t, err, "integer underflow") + assert.Equal(t, uint64(10000), usage) } func TestTrimOneOfTwoTransfers(t *testing.T) {