Skip to content

Commit

Permalink
few more metrics emissions, tag some metrics by subscription ids, rem…
Browse files Browse the repository at this point in the history
…ove unused code
  • Loading branch information
jonfung-dydx committed Oct 16, 2024
1 parent 857eaa1 commit 5d13842
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 55 deletions.
4 changes: 2 additions & 2 deletions protocol/lib/metrics/metric_keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,8 @@ const (
FullNodeGrpc = "full_node_grpc"
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency"
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency"
GrpcSendSubaccountSnapshotLatency = "grpc_send_subaccount_snapshot_latency"
GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count"
GrpcSendOrderbookFillsLatency = "grpc_send_orderbook_fills_latency"
GrpcSendFinalizedSubaccountUpdatesLatency = "grpc_send_finalized_subaccount_updates_latency"
GrpcAddUpdateToBufferCount = "grpc_add_update_to_buffer_count"
GrpcAddToSubscriptionChannelCount = "grpc_add_to_subscription_channel_count"
GrpcSendResponseToSubscriberCount = "grpc_send_response_to_subscriber_count"
Expand All @@ -82,6 +81,7 @@ const (
GrpcStagedAllFinalizeBlockUpdatesCount = "grpc_staged_all_finalize_block_updates_count"
GrpcStagedFillFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_fill_updates_count"
GrpcStagedSubaccountFinalizeBlockUpdatesCount = "grpc_staged_finalize_block_subaccount_updates_count"
SubscriptionId = "subscription_id"

EndBlocker = "end_blocker"
EndBlockerLag = "end_blocker_lag"
Expand Down
57 changes: 25 additions & 32 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (sm *FullNodeStreamingManagerImpl) Enabled() bool {
}

func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
metrics.SetGauge(
metrics.AddSample(
metrics.GrpcStreamNumUpdatesBuffered,
float32(len(sm.streamUpdateCache)),
)
Expand All @@ -154,9 +154,10 @@ func (sm *FullNodeStreamingManagerImpl) EmitMetrics() {
float32(len(sm.orderbookSubscriptions)),
)
for _, subscription := range sm.orderbookSubscriptions {
metrics.AddSample(
metrics.AddSampleWithLabels(
metrics.GrpcSubscriptionChannelLength,
float32(len(subscription.updatesChannel)),
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
)
}
}
Expand Down Expand Up @@ -226,9 +227,10 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
metrics.IncrCounter(
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscription.subscriptionId)),
)
err = subscription.messageSender.Send(
&clobtypes.StreamOrderbookUpdatesResponse{
Expand Down Expand Up @@ -364,9 +366,17 @@ func (sm *FullNodeStreamingManagerImpl) sendStreamUpdates(
return
}

metrics.IncrCounterWithLabels(
metrics.GrpcAddToSubscriptionChannelCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(subscriptionId)),
)

select {
case subscription.updatesChannel <- streamUpdates:
default:
// Buffer is full. Emit metric and drop subscription.
sm.EmitMetrics()
sm.logger.Error(
fmt.Sprintf(
"Streaming subscription id %+v channel full capacity. Dropping subscription connection.",
Expand Down Expand Up @@ -399,6 +409,11 @@ func (sm *FullNodeStreamingManagerImpl) SendSubaccountUpdate(
return
}

metrics.IncrCounter(
metrics.GrpcSendSubaccountUpdateCount,
1,
)

// If `DeliverTx`, updates should be staged to be streamed after consensus finalizes on a block.
stagedEvent := clobtypes.StagedFinalizeBlockEvent{
Event: &clobtypes.StagedFinalizeBlockEvent_SubaccountUpdate{
Expand Down Expand Up @@ -710,32 +725,6 @@ func getStreamUpdatesForSubaccountUpdates(
return streamUpdates, subaccountIds
}

// SendFinalizedSubaccountUpdates groups subaccount updates by their subaccount ids and
// sends messages to the subscribers.
func (sm *FullNodeStreamingManagerImpl) SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
) {
defer metrics.ModuleMeasureSince(
metrics.FullNodeGrpc,
metrics.GrpcSendFinalizedSubaccountUpdatesLatency,
time.Now(),
)

if execMode != sdk.ExecModeFinalize {
panic("SendFinalizedSubaccountUpdates should only be called in ExecModeFinalize")
}

streamUpdates, subaccountIds := getStreamUpdatesForSubaccountUpdates(
subaccountUpdates,
blockHeight,
execMode,
)

sm.AddSubaccountUpdatesToCache(streamUpdates, subaccountIds)
}

// AddOrderUpdatesToCache adds a series of updates to the full node streaming cache.
// Clob pair ids are the clob pair id each update is relevant to.
func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(
Expand All @@ -752,9 +741,9 @@ func (sm *FullNodeStreamingManagerImpl) AddOrderUpdatesToCache(

sm.cacheStreamUpdatesByClobPairWithLock(updates, clobPairIds)

sm.EmitMetrics()
// Remove all subscriptions and wipe the buffer if buffer overflows.
sm.RemoveSubscriptionsAndClearBufferIfFull()
sm.EmitMetrics()
}

// AddSubaccountUpdatesToCache adds a series of updates to the full node streaming cache.
Expand All @@ -773,8 +762,8 @@ func (sm *FullNodeStreamingManagerImpl) AddSubaccountUpdatesToCache(

sm.cacheStreamUpdatesBySubaccountWithLock(updates, subaccountIds)

sm.RemoveSubscriptionsAndClearBufferIfFull()
sm.EmitMetrics()
sm.RemoveSubscriptionsAndClearBufferIfFull()
}

// RemoveSubscriptionsAndClearBufferIfFull removes all subscriptions and wipes the buffer if buffer overflows.
Expand All @@ -790,6 +779,7 @@ func (sm *FullNodeStreamingManagerImpl) RemoveSubscriptionsAndClearBufferIfFull(
}
sm.streamUpdateCache = nil
sm.streamUpdateSubscriptionCache = nil
sm.EmitMetrics()
}
}

Expand Down Expand Up @@ -825,13 +815,16 @@ func (sm *FullNodeStreamingManagerImpl) FlushStreamUpdatesWithLock() {
// If the buffer is full, drop the subscription.
for id, updates := range subscriptionUpdates {
if subscription, ok := sm.orderbookSubscriptions[id]; ok {
metrics.IncrCounter(
metrics.IncrCounterWithLabels(
metrics.GrpcAddToSubscriptionChannelCount,
1,
metrics.GetLabelForIntValue(metrics.SubscriptionId, int(id)),
)
select {
case subscription.updatesChannel <- updates:
default:
// Buffer is full. Emit metric and drop subscription.
sm.EmitMetrics()
idsToRemove = append(idsToRemove, id)
}
}
Expand Down
5 changes: 0 additions & 5 deletions protocol/streaming/types/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,6 @@ type FullNodeStreamingManager interface {
takerOrder clobtypes.StreamTakerOrder,
ctx sdk.Context,
)
SendFinalizedSubaccountUpdates(
subaccountUpdates []satypes.StreamSubaccountUpdate,
blockHeight uint32,
execMode sdk.ExecMode,
)
SendSubaccountUpdate(
ctx sdk.Context,
subaccountUpdate satypes.StreamSubaccountUpdate,
Expand Down
16 changes: 0 additions & 16 deletions protocol/x/subaccounts/keeper/subaccount.go
Original file line number Diff line number Diff line change
Expand Up @@ -825,19 +825,3 @@ func (k Keeper) GetAllRelevantPerpetuals(
func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager {
return k.streamingManager
}

// SendFinalizedSubaccountUpdates sends the subaccount updates to the gRPC streaming manager.
func (k Keeper) SendFinalizedSubaccountUpdates(
ctx sdk.Context,
subaccountUpdates []types.StreamSubaccountUpdate,
) {
lib.AssertDeliverTxMode(ctx)
if len(subaccountUpdates) == 0 {
return
}
k.GetFullNodeStreamingManager().SendFinalizedSubaccountUpdates(
subaccountUpdates,
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
}

0 comments on commit 5d13842

Please sign in to comment.