diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index ac8e8a17d1e..ead248f516a 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -69,9 +69,8 @@ const ( FullNodeGrpc = "full_node_grpc" GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" - GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency" + GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count" GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" - GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency" GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count" GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count" GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" @@ -82,6 +81,7 @@ const ( GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count" GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count" GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count" + SubscriptionId = "subscription_id" EndBlocker = "end_blocker" EndBlockerLag = "end_blocker_lag" diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 85c265f12e7..5aa9e6fc207 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -145,7 +145,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool { } func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { - metrics.SetGauge( + metrics.AddSample( metrics.GrpcStreamNumUpdatesBuffered, float32(len(sm.streamUpdateCache)), ) @@ -154,9 +154,10 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() { float32(len(sm.orderbookSubscriptions)), ) for _, subscription := range sm.orderbookSubscriptions { - metrics.AddSample( + metrics.AddSampleWithLabels( metrics.GrpcSubscriptionChannelLength, float32(len(subscription.updatesChannel)), + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), ) } } @@ -226,9 +227,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( // Use current goroutine to consistently poll subscription channel for updates // to send through stream. for updates := range subscription.updatesChannel { - metrics.IncrCounter( + metrics.IncrCounterWithLabels( metrics.GrpcSendResponseToSubscriberCount, 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)), ) err = subscription.messageSender.Send( &clobtypes.StreamOrderbookUpdatesResponse{ @@ -364,9 +366,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates( return } + metrics.IncrCounterWithLabels( + metrics.GrpcAddToSubscriptionChannelCount, + 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)), + ) + select { case subscription.updatesChannel <- streamUpdates: default: + // Buffer is full. Emit metric and drop subscription. + sm.EmitMetrics() sm.logger.Error( fmt.Sprintf( "Streaming subscription id %+v channel full capacity. Dropping subscription connection.", @@ -399,6 +409,11 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate( return } + metrics.IncrCounter( + metrics.GrpcSendSubaccountUpdateCount, + 1, + ) + // If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block. stagedEvent := clobtypes.StagedFinalizeBlockEvent{ Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{ @@ -710,32 +725,6 @@ func getStreamUpdatesForSubaccountUpdates( return streamUpdates, subaccountIds } -// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and -// sends messages to the subscribers. -func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, -) { - defer metrics.ModuleMeasureSince( - metrics.FullNodeGrpc, - metrics.GrpcSendFinalizedSubaccountUpdatesLatency, - time.Now(), - ) - - if execMode != sdk.ExecModeFinalize { - panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize") - } - - streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates( - subaccountUpdates, - blockHeight, - execMode, - ) - - sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds) -} - // AddOrderUpdatesToCache adds a series of updates to the full node streaming cache. // Clob pair ids are the clob pair id each update is relevant to. func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( @@ -752,9 +741,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache( sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds) + sm.EmitMetrics() // Remove all subscriptions and wipe the buffer if buffer overflows. sm.RemoveSubscriptionsAndClearBufferIfFull() - sm.EmitMetrics() } // AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache. @@ -773,8 +762,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache( sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds) - sm.RemoveSubscriptionsAndClearBufferIfFull() sm.EmitMetrics() + sm.RemoveSubscriptionsAndClearBufferIfFull() } // RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows. @@ -790,6 +779,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull( } sm.streamUpdateCache = nil sm.streamUpdateSubscriptionCache = nil + sm.EmitMetrics() } } @@ -825,13 +815,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() { // If the buffer is full, drop the subscription. for id, updates := range subscriptionUpdates { if subscription, ok := sm.orderbookSubscriptions[id]; ok { - metrics.IncrCounter( + metrics.IncrCounterWithLabels( metrics.GrpcAddToSubscriptionChannelCount, 1, + metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)), ) select { case subscription.updatesChannel <- updates: default: + // Buffer is full. Emit metric and drop subscription. + sm.EmitMetrics() idsToRemove = append(idsToRemove, id) } } diff --git a/protocol/streaming/types/interface.go b/protocol/streaming/types/interface.go index 5b42864016e..33907fc1ecc 100644 --- a/protocol/streaming/types/interface.go +++ b/protocol/streaming/types/interface.go @@ -42,11 +42,6 @@ type FullNodeStreamingManager interface { takerOrder clobtypes.StreamTakerOrder, ctx sdk.Context, ) - SendFinalizedSubaccountUpdates( - subaccountUpdates []satypes.StreamSubaccountUpdate, - blockHeight uint32, - execMode sdk.ExecMode, - ) SendSubaccountUpdate( ctx sdk.Context, subaccountUpdate satypes.StreamSubaccountUpdate, diff --git a/protocol/x/subaccounts/keeper/subaccount.go b/protocol/x/subaccounts/keeper/subaccount.go index e72ccd60e7f..52d4c05bb33 100644 --- a/protocol/x/subaccounts/keeper/subaccount.go +++ b/protocol/x/subaccounts/keeper/subaccount.go @@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals( func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager { return k.streamingManager } - -// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager. -func (k Keeper) SendFinalizedSubaccountUpdates( - ctx sdk.Context, - subaccountUpdates []types.StreamSubaccountUpdate, -) { - lib.AssertDeliverTxMode(ctx) - if len(subaccountUpdates) == 0 { - return - } - k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates( - subaccountUpdates, - lib.MustConvertIntegerToUint32(ctx.BlockHeight()), - ctx.ExecMode(), - ) -}