diff --git a/protocol/streaming/full_node_streaming_manager.go b/protocol/streaming/full_node_streaming_manager.go index 1fe53a7252e..9db77ab4b76 100644 --- a/protocol/streaming/full_node_streaming_manager.go +++ b/protocol/streaming/full_node_streaming_manager.go @@ -35,7 +35,7 @@ type FullNodeStreamingManagerImpl struct { // orderbookSubscriptions maps subscription IDs to their respective orderbook subscriptions. orderbookSubscriptions map[uint32]*OrderbookSubscription - nextSubscriptionId uint32 + activeSubscriptionIds map[uint32]struct{} // stream will batch and flush out messages every 10 ms. ticker *time.Ticker @@ -104,7 +104,7 @@ func NewFullNodeStreamingManager( fullNodeStreamingManager := &FullNodeStreamingManagerImpl{ logger: logger, orderbookSubscriptions: make(map[uint32]*OrderbookSubscription), - nextSubscriptionId: 0, + activeSubscriptionIds: make(map[uint32]struct{}), ticker: time.NewTicker(time.Duration(flushIntervalMs) * time.Millisecond), done: make(chan bool), @@ -180,8 +180,13 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( for i, subaccountId := range subaccountIds { sIds[i] = *subaccountId } + issuedSubscriptionId := uint32(0) + for _, exists := sm.activeSubscriptionIds[issuedSubscriptionId]; exists; { + issuedSubscriptionId += 1 + } + subscription := &OrderbookSubscription{ - subscriptionId: sm.nextSubscriptionId, + subscriptionId: issuedSubscriptionId, initialized: &atomic.Bool{}, // False by default. clobPairIds: clobPairIds, subaccountIds: sIds, @@ -196,7 +201,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( } sm.clobPairIdToSubscriptionIdMapping[clobPairId] = append( sm.clobPairIdToSubscriptionIdMapping[clobPairId], - sm.nextSubscriptionId, + issuedSubscriptionId, ) } for _, subaccountId := range sIds { @@ -207,7 +212,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( } sm.subaccountIdToSubscriptionIdMapping[subaccountId] = append( sm.subaccountIdToSubscriptionIdMapping[subaccountId], - sm.nextSubscriptionId, + issuedSubscriptionId, ) } @@ -220,7 +225,7 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( ), ) sm.orderbookSubscriptions[subscription.subscriptionId] = subscription - sm.nextSubscriptionId++ + sm.activeSubscriptionIds[issuedSubscriptionId] = struct{}{} sm.EmitMetrics() sm.Unlock() @@ -303,6 +308,8 @@ func (sm *FullNodeStreamingManagerImpl) removeSubscription( } } + delete(sm.activeSubscriptionIds, subscriptionIdToRemove) + sm.logger.Info( fmt.Sprintf("Removed streaming subscription id %+v", subscriptionIdToRemove), )