Skip to content

Commit

Permalink
Grpc Streaming Metrics (#1616)
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx authored Jun 3, 2024
1 parent e6b6683 commit 230c44e
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 45 deletions.
2 changes: 1 addition & 1 deletion protocol/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -1931,7 +1931,7 @@ func getGrpcStreamingManagerFromOptions(
) (manager streamingtypes.GrpcStreamingManager) {
if appFlags.GrpcStreamingEnabled {
logger.Info("GRPC streaming is enabled")
return streaming.NewGrpcStreamingManager()
return streaming.NewGrpcStreamingManager(logger)
}
return streaming.NewNoopGrpcStreamingManager()
}
14 changes: 9 additions & 5 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,13 @@ const (
GateWithdrawalsIfNegativeTncSubaccountSeenLatency = "gate_withdrawals_if_negative_tnc_subaccount_seen_latency"

// Full node grpc
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcEmitProtocolUpdateCount = "grpc_emit_protocol_update_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
GrpcStreamSubscriberCount = "grpc_stream_subscriber_count"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
)
90 changes: 51 additions & 39 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"sync"
"time"

"cosmossdk.io/log"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/gogoproto/proto"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
Expand All @@ -19,6 +20,8 @@ var _ types.GrpcStreamingManager = (*GrpcStreamingManagerImpl)(nil)
type GrpcStreamingManagerImpl struct {
sync.Mutex

logger log.Logger

// orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions.
orderbookSubscriptions map[uint32]*OrderbookSubscription
nextSubscriptionId uint32
Expand All @@ -36,8 +39,12 @@ type OrderbookSubscription struct {
srv clobtypes.Query_StreamOrderbookUpdatesServer
}

func NewGrpcStreamingManager() *GrpcStreamingManagerImpl {
func NewGrpcStreamingManager(
logger log.Logger,
) *GrpcStreamingManagerImpl {
logger = logger.With(log.ModuleKey, "grpc-streaming")
return &GrpcStreamingManagerImpl{
logger: logger,
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
}
}
Expand All @@ -46,6 +53,13 @@ func (sm *GrpcStreamingManagerImpl) Enabled() bool {
return true
}

func (sm *GrpcStreamingManagerImpl) EmitMetrics() {
metrics.SetGauge(
metrics.GrpcStreamSubscriberCount,
float32(len(sm.orderbookSubscriptions)),
)
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *GrpcStreamingManagerImpl) Subscribe(
req clobtypes.StreamOrderbookUpdatesRequest,
Expand All @@ -70,7 +84,7 @@ func (sm *GrpcStreamingManagerImpl) Subscribe(

sm.orderbookSubscriptions[sm.nextSubscriptionId] = subscription
sm.nextSubscriptionId++

sm.EmitMetrics()
return nil
}

Expand Down Expand Up @@ -98,55 +112,30 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
updates[clobPairId].Messages = append(updates[clobPairId].Messages, message)
}

// Unmarshal messages to v1 updates.
v1updates := make(map[uint32][]ocutypes.OffChainUpdateV1)
// Unmarshal each per-clob pair message to v1 updates.
updatesByClobPairId := make(map[uint32][]clobtypes.StreamUpdate)
for clobPairId, update := range updates {
v1update, err := GetOffchainUpdatesV1(update)
v1updates, err := GetOffchainUpdatesV1(update)
if err != nil {
panic(err)
}
v1updates[clobPairId] = v1update
}

sm.Lock()
defer sm.Unlock()

// Send updates to subscribers.
idsToRemove := make([]uint32, 0)
for id, subscription := range sm.orderbookSubscriptions {
updatesToSend := make([]ocutypes.OffChainUpdateV1, 0)
for _, clobPairId := range subscription.clobPairIds {
if updates, ok := v1updates[clobPairId]; ok {
updatesToSend = append(updatesToSend, updates...)
}
}

if len(updatesToSend) > 0 {
streamUpdates := clobtypes.StreamUpdate{
updatesByClobPairId[clobPairId] = []clobtypes.StreamUpdate{
{
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Updates: updatesToSend,
Updates: v1updates,
Snapshot: snapshot,
},
},
}
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: []clobtypes.StreamUpdate{streamUpdates},
BlockHeight: blockHeight,
ExecMode: uint32(execMode),
},
); err != nil {
idsToRemove = append(idsToRemove, id)
}
},
}
}

// Clean up subscriptions that have been closed.
// If a Send update has failed for any clob pair id, the whole subscription will be removed.
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
}
sm.sendStreamUpdate(
updatesByClobPairId,
blockHeight,
execMode,
)
}

// SendOrderbookFillUpdates groups fills by their clob pair ids and
Expand Down Expand Up @@ -181,6 +170,24 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
updatesByClobPairId[clobPairId] = append(updatesByClobPairId[clobPairId], streamUpdate)
}

sm.sendStreamUpdate(
updatesByClobPairId,
blockHeight,
execMode,
)
}

// sendStreamUpdate takes in a map of clob pair id to stream updates and emits them to subscribers.
func (sm *GrpcStreamingManagerImpl) sendStreamUpdate(
updatesByClobPairId map[uint32][]clobtypes.StreamUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
metrics.IncrCounter(
metrics.GrpcEmitProtocolUpdateCount,
1,
)

sm.Lock()
defer sm.Unlock()

Expand All @@ -195,6 +202,10 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
}

if len(streamUpdatesForSubscription) > 0 {
metrics.IncrCounter(
metrics.GrpcSendResponseToSubscriberCount,
1,
)
if err := subscription.srv.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Updates: streamUpdatesForSubscription,
Expand All @@ -212,6 +223,7 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookFillUpdates(
for _, id := range idsToRemove {
delete(sm.orderbookSubscriptions, id)
}
sm.EmitMetrics()
}

// GetUninitializedClobPairIds returns the clob pair ids that have not been initialized.
Expand Down

0 comments on commit 230c44e

Please sign in to comment.