diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index e77dcaffdb..c9b061d664 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -2,7 +2,6 @@ package grpc import ( "fmt" - "math/rand" "sync" "time" @@ -29,7 +28,8 @@ type GrpcStreamingManagerImpl struct { // Readonly buffer to enqueue orderbook updates before pushing them through grpc streams. // Decouples the execution of abci logic with full node streaming. - updateBuffer chan bufferInternalResponse + updateBuffer chan bufferInternalResponse + updateBufferWindowSize uint32 } // bufferInternalResponse is enqueued into the readonly buffer. @@ -62,13 +62,17 @@ func NewGrpcStreamingManager( orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), nextSubscriptionId: 0, - updateBuffer: make(chan bufferInternalResponse, bufferWindow), + updateBuffer: make(chan bufferInternalResponse, bufferWindow), + updateBufferWindowSize: bufferWindow, } // Worker goroutine to consistently read from channel and send out updates go func() { for internalResponse := range grpcStreamingManager.updateBuffer { + grpcStreamingManager.logger.Info("start polling a response", "len", len(grpcStreamingManager.updateBuffer)) grpcStreamingManager.sendUpdateResponse(internalResponse) + grpcStreamingManager.logger.Info("finish polling a response", "len", len(grpcStreamingManager.updateBuffer)) + } }() @@ -101,30 +105,20 @@ func (sm *GrpcStreamingManagerImpl) removeSubscription(clobPairId uint32) { func (sm *GrpcStreamingManagerImpl) sendUpdateResponse( internalResponse bufferInternalResponse, ) { - sample_rate := 1.0 - log := false - if rand.Float64() < sample_rate { - log = true - } - // Send update to subscribers. subscriptionIdsToRemove := make([]uint32, 0) for id, subscription := range sm.orderbookSubscriptions { for _, clobPairId := range subscription.clobPairIds { if clobPairId == internalResponse.clobPairId { - if log { - sm.logger.Info("sending out update") - } + sm.logger.Info("sending out update") if err := subscription.srv.Send( &internalResponse.response, ); err != nil { sm.logger.Error("Error sending out update", "err", err) subscriptionIdsToRemove = append(subscriptionIdsToRemove, id) } - if log { - sm.logger.Info("finished sending out update") - } + sm.logger.Info("finished sending out update") } } } @@ -268,12 +262,13 @@ func (sm *GrpcStreamingManagerImpl) SendOrderbookUpdates( func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse bufferInternalResponse) { select { case sm.updateBuffer <- internalResponse: - sm.logger.Info("successfully enqueue buffer") default: - sm.logger.Info("GRPC Streaming buffer full. Clearing all subscriptions") + sm.logger.Info("GRPC Streaming buffer full capacity. Dropping messages and all subscriptions. Increase buffer size via the `grpc-streaming-buffer-size flag.") for k := range sm.orderbookSubscriptions { sm.removeSubscription(k) } + // Clear out the buffer + sm.updateBuffer = make(chan bufferInternalResponse, sm.updateBufferWindowSize) } sm.EmitMetrics() }