Skip to content

Commit

Permalink
clear buffer when buffer is full
Browse files Browse the repository at this point in the history
  • Loading branch information
jonfung-dydx committed May 24, 2024
1 parent fdf10d3 commit 1066d2b
Showing 1 changed file with 12 additions and 17 deletions.
29 changes: 12 additions & 17 deletions protocol/streaming/grpc/grpc_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package grpc

import (
"fmt"
"math/rand"
"sync"
"time"

Expand All @@ -29,7 +28,8 @@ type GrpcStreamingManagerImpl struct {

// Readonly buffer to enqueue orderbook updates before pushing them through grpc streams.
// Decouples the execution of abci logic with full node streaming.
updateBuffer chan bufferInternalResponse
updateBuffer chan bufferInternalResponse
updateBufferWindowSize uint32
}

// bufferInternalResponse is enqueued into the readonly buffer.
Expand Down Expand Up @@ -62,13 +62,17 @@ func NewGrpcStreamingManager(
orderbookSubscriptions: make(map[uint32]*OrderbookSubscription),
nextSubscriptionId: 0,

updateBuffer: make(chan bufferInternalResponse, bufferWindow),
updateBuffer: make(chan bufferInternalResponse, bufferWindow),
updateBufferWindowSize: bufferWindow,
}

// Worker goroutine to consistently read from channel and send out updates
go func() {
for internalResponse := range grpcStreamingManager.updateBuffer {
grpcStreamingManager.logger.Info("start polling a response", "len", len(grpcStreamingManager.updateBuffer))
grpcStreamingManager.sendUpdateResponse(internalResponse)
grpcStreamingManager.logger.Info("finish polling a response", "len", len(grpcStreamingManager.updateBuffer))

}
}()

Expand Down Expand Up @@ -101,30 +105,20 @@ func (sm *GrpcStreamingManagerImpl) removeSubscription(clobPairId uint32) {
func (sm *GrpcStreamingManagerImpl) sendUpdateResponse(
internalResponse bufferInternalResponse,
) {
sample_rate := 1.0
log := false
if rand.Float64() < sample_rate {
log = true
}

// Send update to subscribers.
subscriptionIdsToRemove := make([]uint32, 0)

for id, subscription := range sm.orderbookSubscriptions {
for _, clobPairId := range subscription.clobPairIds {
if clobPairId == internalResponse.clobPairId {
if log {
sm.logger.Info("sending out update")
}
sm.logger.Info("sending out update")
if err := subscription.srv.Send(
&internalResponse.response,
); err != nil {
sm.logger.Error("Error sending out update", "err", err)
subscriptionIdsToRemove = append(subscriptionIdsToRemove, id)
}
if log {
sm.logger.Info("finished sending out update")
}
sm.logger.Info("finished sending out update")
}
}
}
Expand Down Expand Up @@ -268,12 +262,13 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates(
func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) {
select {
case sm.updateBuffer <- internalResponse:
sm.logger.Info("successfully enqueue buffer")
default:
sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions")
sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.")
for k := range sm.orderbookSubscriptions {
sm.removeSubscription(k)
}
// Clear out the buffer
sm.updateBuffer = make(chan bufferInternalResponse, sm.updateBufferWindowSize)
}
sm.EmitMetrics()
}
Expand Down

0 comments on commit 1066d2b

Please sign in to comment.