diff --git a/protocol/streaming/grpc/grpc_streaming_manager.go b/protocol/streaming/grpc/grpc_streaming_manager.go index b8b8c7d73a..2aea498784 100644 --- a/protocol/streaming/grpc/grpc_streaming_manager.go +++ b/protocol/streaming/grpc/grpc_streaming_manager.go @@ -68,14 +68,11 @@ func NewGrpcStreamingManager( // Worker goroutine to consistently read from channel and send out updates go func() { - for { - for internalResponse := range grpcStreamingManager.updateBuffer { - grpcStreamingManager.logger.Info("start polling a response", "len", len(grpcStreamingManager.updateBuffer)) - grpcStreamingManager.sendUpdateResponse(internalResponse) - grpcStreamingManager.logger.Info("finish polling a response", "len", len(grpcStreamingManager.updateBuffer)) - - } + for internalResponse := range grpcStreamingManager.updateBuffer { + grpcStreamingManager.logger.Info("start polling a response", "len", len(grpcStreamingManager.updateBuffer)) + grpcStreamingManager.sendUpdateResponse(internalResponse) } + grpcStreamingManager.logger.Error("Should never see this. Poller has failed.") }() return grpcStreamingManager @@ -270,8 +267,9 @@ func (sm *GrpcStreamingManagerImpl) mustEnqueueOrderbookUpdate(internalResponse sm.removeSubscription(k) } // Clear out the buffer - close(sm.updateBuffer) - sm.updateBuffer = make(chan bufferInternalResponse, sm.updateBufferWindowSize) + for len(sm.updateBuffer) > 0 { + <-sm.updateBuffer + } } sm.EmitMetrics() }