Skip to content

Commit

Permalink
node: Add additional metrics for Governor status
Browse files Browse the repository at this point in the history
Modify the monitoring code and protobuf files to make the status of the
Governor more legible when flow cancelling is enabled. This can be
consumed by Wormhole Dashboard to better reflect the effects of flow
cancelling.

On the level of the Governor:
- whether the Guardian has enabled flow cancel or not

On the level of the Governor's emitters, reports 24h metrics for:
- net value that has moved across the chain
- total outgoing amount
- total incoming flow cancel amount

Currently big transfers are not accounted for as they do not affect the
Governor's capacity. (They are always queued.)
  • Loading branch information
johnsaigle committed Jul 16, 2024
1 parent 013d79a commit 9d3cbf0
Show file tree
Hide file tree
Showing 4 changed files with 206 additions and 100 deletions.
169 changes: 112 additions & 57 deletions node/pkg/governor/governor_monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,14 @@ func (gov *ChainGovernor) Status() (resp string) {
startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))

for _, ce := range gov.chains {
valueTrans, err := sumValue(ce.transfers, startTime)
netValue, _, _, err := sumValue(ce.transfers, startTime)
if err != nil {
// We don't want to actually return an error or otherwise stop
// execution in this case. Instead of propagating the error here, print the contents of the
// error message.
return fmt.Sprintf("chain: %v, dailyLimit: OVERFLOW. error: %s", ce.emitterChainId, err)
}
s1 := fmt.Sprintf("chain: %v, dailyLimit: %v, total: %v, numPending: %v", ce.emitterChainId, ce.dailyLimit, valueTrans, len(ce.pending))
s1 := fmt.Sprintf("chain: %v, dailyLimit: %v, total: %v, numPending: %v", ce.emitterChainId, ce.dailyLimit, netValue, len(ce.pending))
resp += s1 + "\n"
gov.logger.Info(s1)
if len(ce.pending) != 0 {
Expand Down Expand Up @@ -245,74 +245,112 @@ func (gov *ChainGovernor) resetReleaseTimerForTime(vaaId string, now time.Time,
}

return "", fmt.Errorf("vaa not found in the pending list")

}

// sumValue sums the value of all `transfers`. See also `TrimAndSumValue`.
// sumValue sums the value of all `transfers`, returning separate fields for:
// - the net sum of all outgoing small transfers minus flow cancel sum
// - the sum of all outgoing small transfers
// - the sum of all incoming flow-cancelling transfers
// NOTE these sums exclude "big transfers" as they are always queued for 24h and are never added to the chain entry's 'transfers' field.
// Returns an error if the sum of all transfers would overflow the bounds of Int64. In this case, the function
// returns a value of 0.
func sumValue(transfers []transfer, startTime time.Time) (uint64, error) {
func sumValue(transfers []transfer, startTime time.Time) (netNotional int64, smallTxOutgoingNotional uint64, flowCancelNotional uint64, err error) {
if len(transfers) == 0 {
return 0, nil
return 0, 0, 0, nil
}

var sum int64
// Sum of all outgoing small transfers minus incoming flow cancel transfers. Big transfers are excluded
netNotional = int64(0)
smallTxOutgoingNotional = uint64(0)
flowCancelNotional = uint64(0)

for _, t := range transfers {
if t.dbTransfer.Timestamp.Before(startTime) {
continue
}
checkedSum, err := CheckedAddInt64(sum, t.value)
checkedSum, err := CheckedAddInt64(netNotional, t.value)
if err != nil {
// We have to stop and return an error here (rather than saturate, for example). The
// transfers are not sorted by value so we can't make any guarantee on the final value
// if we hit the upper or lower bound. We don't expect this to happen in any case.
return 0, err
return 0, 0, 0, err
}
netNotional = checkedSum
if t.value < 0 {
// If a transfer is negative then it is an incoming, flow-cancelling transfer.
// We can use the dbTransfer.Value for calculating the sum because it is the unsigned version
// of t.value
flowCancelNotional += t.dbTransfer.Value
} else {
smallTxOutgoingNotional += t.dbTransfer.Value
}
sum = checkedSum
}

// Do not return negative values. Instead, saturate to zero.
if sum <= 0 {
return 0, nil
}

return uint64(sum), nil
return netNotional, smallTxOutgoingNotional, flowCancelNotional, nil
}

// REST query to get the current available notional value per chain. This is defined as the sum of all transfers
// subtracted from the chains's dailyLimit.
// The available notional limit by chain represents the remaining capacity of a chain. As a result, it should not be
// a negative number: we don't want to represent that there is "negative value" available.
func (gov *ChainGovernor) GetAvailableNotionalByChain() (resp []*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry) {
gov.mutex.Lock()
defer gov.mutex.Unlock()

startTime := time.Now().Add(-time.Minute * time.Duration(gov.dayLengthInMinutes))

// Iterate deterministically by accessing keys from this slice instead of the chainEntry map directly
for _, chainId := range gov.chainIds {
ce := gov.chains[chainId]
value, err := sumValue(ce.transfers, startTime)
netUsage, _, incoming, err := sumValue(ce.transfers, startTime)
if err != nil {
// Don't return an error here, just return 0.
gov.logger.Error("GetAvailableNotionalByChain: failed to compute sum of transfers for chain entry", zap.String("chainID", chainId.String()), zap.Error(err))
return make([]*publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry, 0)
// Report 0 available notional if we can't calculate the current usage
gov.logger.Error("GetAvailableNotionalByChain: failed to compute sum of transfers for chain entry",
zap.String("chainID", chainId.String()),
zap.Error(err))
resp = append(resp, &publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry{
ChainId: uint32(ce.emitterChainId),
RemainingAvailableNotional: 0,
NotionalLimit: ce.dailyLimit,
BigTransactionSize: ce.bigTransactionSize,
})
continue
}
if value >= ce.dailyLimit {
value = 0
} else {
value = ce.dailyLimit - value

remaining := gov.availableNotionalValue(chainId, netUsage)

if !gov.flowCancelEnabled {
// When flow cancel is disabled, we expect that both the netUsage and remaining notional should be
// within the range of [0, dailyLimit]. Flow cancel allows flexibility here. netUsage may be
// negative if there is a lot of incoming flow; conversely, it may exceed dailyLimit if incoming
// flow added space, allowed additional transfers through, and then expired after 24h.
if netUsage < 0 || incoming != 0 {
gov.logger.Warn("GetAvailableNotionalByChain: net value for chain is negative even though flow cancel is disabled",
zap.String("chainID", chainId.String()),
zap.Uint64("dailyLimit", ce.dailyLimit),
zap.Int64("netUsage", netUsage),
zap.Error(err))
} else if uint64(netUsage) > ce.dailyLimit {
gov.logger.Warn("GetAvailableNotionalByChain: net value for chain exceeds daily limit even though flow cancel is disabled",
zap.String("chainID", chainId.String()),
zap.Uint64("dailyLimit", ce.dailyLimit),
zap.Error(err))
}

resp = append(resp, &publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry{
ChainId: uint32(ce.emitterChainId),
RemainingAvailableNotional: remaining,
NotionalLimit: ce.dailyLimit,
BigTransactionSize: ce.bigTransactionSize,
})
}

resp = append(resp, &publicrpcv1.GovernorGetAvailableNotionalByChainResponse_Entry{
ChainId: uint32(ce.emitterChainId),
RemainingAvailableNotional: value,
NotionalLimit: ce.dailyLimit,
BigTransactionSize: ce.bigTransactionSize,
sort.SliceStable(resp, func(i, j int) bool {
return (resp[i].ChainId < resp[j].ChainId)
})
}

sort.SliceStable(resp, func(i, j int) bool {
return (resp[i].ChainId < resp[j].ChainId)
})

return resp
}

Expand Down Expand Up @@ -372,6 +410,25 @@ func (gov *ChainGovernor) IsVAAEnqueued(msgId *publicrpcv1.MessageID) (bool, err
return false, nil
}

// availableNotionalValue calculates the available notional USD value for the chain entry registered
// under `id`, based on the net value (`netUsage`) of that chain.
//
// The result is always within [0, dailyLimit]:
//   - if netUsage is negative, the full dailyLimit is reported as available;
//   - if netUsage meets or exceeds dailyLimit, 0 is reported;
//   - otherwise the result is dailyLimit - netUsage.
//
// If no chain entry is registered for `id`, 0 is returned: there is no known capacity to report, and
// this avoids dereferencing a missing map entry.
func (gov *ChainGovernor) availableNotionalValue(id vaa.ChainID, netUsage int64) uint64 {
	ce, ok := gov.chains[id]
	if !ok {
		return 0
	}

	switch {
	case netUsage < 0:
		// Handle the negative case first so the casts to uint64 below are safe.
		// The full capacity is available for the chain.
		return ce.dailyLimit
	case uint64(netUsage) >= ce.dailyLimit:
		// Usage has consumed (or exceeded) the limit; never report negative capacity.
		return 0
	default:
		return ce.dailyLimit - uint64(netUsage)
	}
}

// REST query to get the list of tokens being monitored by the governor.
func (gov *ChainGovernor) GetTokenList() []*publicrpcv1.GovernorGetTokenListResponse_Entry {
gov.mutex.Lock()
Expand Down Expand Up @@ -448,22 +505,21 @@ func (gov *ChainGovernor) CollectMetrics(hb *gossipv1.Heartbeat, sendC chan<- []

if exists {
enabled = "1"
value, err := sumValue(ce.transfers, startTime)
netUsage, _, _, err := sumValue(ce.transfers, startTime)

remaining := uint64(0)
if err != nil {
// Error can occur if the sum overflows. Return 0 in this case rather than returning an
// error.
gov.logger.Error("CollectMetrics: failed to compute sum of transfers for chain entry", zap.String("chain", chain.String()), zap.Error(err))
value = 0
}
if value >= ce.dailyLimit {
value = 0
remaining = 0
} else {
value = ce.dailyLimit - value
remaining = gov.availableNotionalValue(chain, netUsage)
}

pending := len(ce.pending)
totalNotional = fmt.Sprint(ce.dailyLimit)
available = float64(value)
available = float64(remaining)
numPending = float64(pending)
totalPending += pending
}
Expand Down Expand Up @@ -523,11 +579,12 @@ func (gov *ChainGovernor) publishConfig(hb *gossipv1.Heartbeat, sendC chan<- []b

gov.configPublishCounter += 1
payload := &gossipv1.ChainGovernorConfig{
NodeName: hb.NodeName,
Counter: gov.configPublishCounter,
Timestamp: hb.Timestamp,
Chains: chains,
Tokens: tokens,
NodeName: hb.NodeName,
Counter: gov.configPublishCounter,
Timestamp: hb.Timestamp,
Chains: chains,
Tokens: tokens,
FlowCancelEnabled: gov.flowCancelEnabled,
}

b, err := proto.Marshal(payload)
Expand Down Expand Up @@ -563,18 +620,13 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b
numEnqueued := 0
for chainId, ce := range gov.chains {
// The capacity for the chain to emit further messages, denoted as USD value.
remainingAvailableNotional := uint64(0)
// A chain's governor usage is the sum of all outgoing transfers and incoming flow-cancelling transfers
governorUsage, err := sumValue(ce.transfers, startTime)
remaining := uint64(0)
netUsage, smallTxNotional, flowCancelNotional, err := sumValue(ce.transfers, startTime)

if err != nil {
// In case of error, set remainingAvailableNotional to 0 rather than returning an error to the caller. An error
// here means sumValue has encountered an overflow and this should never happen. Even if it did
// we don't want to stop execution here.
gov.logger.Error("publishStatus: failed to compute sum of transfers for chain entry", zap.String("chain", chainId.String()), zap.Error(err))
} else if governorUsage < ce.dailyLimit {
// `remainingAvailableNotional` is 0 unless the current usage is strictly less than the limit.
remainingAvailableNotional = ce.dailyLimit - governorUsage
} else {
remaining = gov.availableNotionalValue(chainId, netUsage)
}

enqueuedVaas := make([]*gossipv1.ChainGovernorStatus_EnqueuedVAA, 0)
Expand All @@ -597,14 +649,17 @@ func (gov *ChainGovernor) publishStatus(hb *gossipv1.Heartbeat, sendC chan<- []b
}

emitter := gossipv1.ChainGovernorStatus_Emitter{
EmitterAddress: "0x" + ce.emitterAddr.String(),
TotalEnqueuedVaas: uint64(len(ce.pending)),
EnqueuedVaas: enqueuedVaas,
EmitterAddress: "0x" + ce.emitterAddr.String(),
TotalEnqueuedVaas: uint64(len(ce.pending)),
EnqueuedVaas: enqueuedVaas,
SmallTxNetNotionalValue: netUsage,
SmallTxOutgingNotionalValue: smallTxNotional,
FlowCancelNotionalValue: flowCancelNotional,
}

chains = append(chains, &gossipv1.ChainGovernorStatus_Chain{
ChainId: uint32(ce.emitterChainId),
RemainingAvailableNotional: remainingAvailableNotional,
RemainingAvailableNotional: remaining,
Emitters: []*gossipv1.ChainGovernorStatus_Emitter{&emitter},
})
}
Expand Down
4 changes: 2 additions & 2 deletions node/pkg/governor/governor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2322,9 +2322,9 @@ func TestLargeTransactionGetsEnqueuedAndReleasedWhenTheTimerExpires(t *testing.T
// But the big transaction should not affect the daily notional.
ce, exists := gov.chains[vaa.ChainIDEthereum]
require.Equal(t, true, exists)
valueTrans, err = sumValue(ce.transfers, now)
_, _, outgoing, err := sumValue(ce.transfers, now)
require.NoError(t, err)
assert.Equal(t, uint64(0), valueTrans)
assert.Equal(t, uint64(0), outgoing)
}

func TestSmallTransactionsGetReleasedWhenTheTimerExpires(t *testing.T) {
Expand Down
Loading

0 comments on commit 9d3cbf0

Please sign in to comment.