diff --git a/protocol/app/app.go b/protocol/app/app.go index 865a6e47ce..a17b6e48bf 100644 --- a/protocol/app/app.go +++ b/protocol/app/app.go @@ -1931,7 +1931,7 @@ func getGrpcStreamingManagerFromOptions( ) (manager streamingtypes.GrpcStreamingManager) { if appFlags.GrpcStreamingEnabled { logger.Info("GRPC streaming is enabled") - return streaming.NewGrpcStreamingManager() + return streaming.NewGrpcStreamingManager(logger) } return streaming.NewNoopGrpcStreamingManager() } diff --git a/protocol/lib/metrics/metric_keys.go b/protocol/lib/metrics/metric_keys.go index bba17887a8..a9d2e36b4d 100644 --- a/protocol/lib/metrics/metric_keys.go +++ b/protocol/lib/metrics/metric_keys.go @@ -65,9 +65,13 @@ const ( GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency" // Full node grpc - FullNodeGrpc = "full_node_grpc" - GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" - GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" - EndBlocker = "end_blocker" - EndBlockerLag = "end_blocker_lag" + FullNodeGrpc = "full_node_grpc" + GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" + GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency" + GrpcEmitProtocolUpdateCount = "grpc_emit_protocol_update_count" + GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count" + GrpcStreamSubscriberCount = "grpc_stream_subscriber_count" + + EndBlocker = "end_blocker" + EndBlockerLag = "end_blocker_lag" ) diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index a8ce6ad1f2..d60d5fb2f8 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -4,6 +4,7 @@ import ( "sync" "time" + "cosmossdk.io/log" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/gogoproto/proto" ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" @@ -19,6 +20,8 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil) type GrpcStreamingManagerImpl struct { sync.Mutex + logger log.Logger + // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. orderbookSubscriptions map[uint32]*OrderbookSubscription nextSubscriptionId uint32 @@ -36,8 +39,12 @@ type OrderbookSubscription struct { srv clobtypes.Query_StreamOrderbookUpdatesServer } -func NewGrpcStreamingManager() *GrpcStreamingManagerImpl { +func NewGrpcStreamingManager( + logger log.Logger, +) *GrpcStreamingManagerImpl { + logger = logger.With(log.ModuleKey, "grpc-streaming") return &GrpcStreamingManagerImpl{ + logger: logger, orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), } } @@ -46,6 +53,13 @@ func (sm *GrpcStreamingManagerImpl) Enabled() bool { return true } +func (sm *GrpcStreamingManagerImpl) EmitMetrics() { + metrics.SetGauge( + metrics.GrpcStreamSubscriberCount, + float32(len(sm.orderbookSubscriptions)), + ) +} + // Subscribe subscribes to the orderbook updates stream. func (sm *GrpcStreamingManagerImpl) Subscribe( req clobtypes.StreamOrderbookUpdatesRequest, @@ -70,7 +84,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe( sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription sm.nextSubscriptionId++ - + sm.EmitMetrics() return nil } @@ -98,55 +112,30 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( updates[clobPairId].Messages = append(updates[clobPairId].Messages, message) } - // Unmarshal messages to v1 updates. - v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1) + // Unmarshal each per-clob pair message to v1 updates. + updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate) for clobPairId, update := range updates { - v1update, err := GetOffchainUpdatesV1(update) + v1updates, err := GetOffchainUpdatesV1(update) if err != nil { panic(err) } - v1updates[clobPairId] = v1update - } - - sm.Lock() - defer sm.Unlock() - - // Send updates to subscribers. - idsToRemove := make([]uint32, 0) - for id, subscription := range sm.orderbookSubscriptions { - updatesToSend := make([]ocutypes.OffChainUpdateV1, 0) - for _, clobPairId := range subscription.clobPairIds { - if updates, ok := v1updates[clobPairId]; ok { - updatesToSend = append(updatesToSend, updates...) - } - } - - if len(updatesToSend) > 0 { - streamUpdates := clobtypes.StreamUpdate{ + updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{ + { UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ - Updates: updatesToSend, + Updates: v1updates, Snapshot: snapshot, }, }, - } - if err := subscription.srv.Send( - &clobtypes.StreamOrderbookUpdatesResponse{ - Updates: []clobtypes.StreamUpdate{streamUpdates}, - BlockHeight: blockHeight, - ExecMode: uint32(execMode), - }, - ); err != nil { - idsToRemove = append(idsToRemove, id) - } + }, } } - // Clean up subscriptions that have been closed. - // If a Send update has failed for any clob pair id, the whole subscription will be removed. - for _, id := range idsToRemove { - delete(sm.orderbookSubscriptions, id) - } + sm.sendStreamUpdate( + updatesByClobPairId, + blockHeight, + execMode, + ) } // SendOrderbookFillUpdates groups fills by their clob pair ids and @@ -181,6 +170,24 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate) } + sm.sendStreamUpdate( + updatesByClobPairId, + blockHeight, + execMode, + ) +} + +// sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers. +func (sm *GrpcStreamingManagerImpl) sendStreamUpdate( + updatesByClobPairId map[uint32][]clobtypes.StreamUpdate, + blockHeight uint32, + execMode sdk.ExecMode, +) { + metrics.IncrCounter( + metrics.GrpcEmitProtocolUpdateCount, + 1, + ) + sm.Lock() defer sm.Unlock() @@ -195,6 +202,10 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( } if len(streamUpdatesForSubscription) > 0 { + metrics.IncrCounter( + metrics.GrpcSendResponseToSubscriberCount, + 1, + ) if err := subscription.srv.Send( &clobtypes.StreamOrderbookUpdatesResponse{ Updates: streamUpdatesForSubscription, @@ -212,6 +223,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates( for _, id := range idsToRemove { delete(sm.orderbookSubscriptions, id) } + sm.EmitMetrics() } // GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.