Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fullnode) Add filterOrders option to streaming subscription #2676

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ export interface StreamOrderbookUpdatesRequest {
/** Market ids for price updates. */

marketIds: number[];
/** Filter order updates in addition to position updates */

filterOrders: boolean;
}
/**
* StreamOrderbookUpdatesRequest is a request message for the
Expand All @@ -293,6 +296,9 @@ export interface StreamOrderbookUpdatesRequestSDKType {
/** Market ids for price updates. */

market_ids: number[];
/** Filter order updates in addition to position updates */

filter_orders: boolean;
}
/**
* StreamOrderbookUpdatesResponse is a response message for the
Expand Down Expand Up @@ -1298,7 +1304,8 @@ function createBaseStreamOrderbookUpdatesRequest(): StreamOrderbookUpdatesReques
return {
clobPairId: [],
subaccountIds: [],
marketIds: []
marketIds: [],
filterOrders: false
};
}

Expand All @@ -1323,6 +1330,11 @@ export const StreamOrderbookUpdatesRequest = {
}

writer.ldelim();

if (message.filterOrders === true) {
writer.uint32(32).bool(message.filterOrders);
}

return writer;
},

Expand Down Expand Up @@ -1365,6 +1377,10 @@ export const StreamOrderbookUpdatesRequest = {

break;

case 4:
message.filterOrders = reader.bool();
break;

default:
reader.skipType(tag & 7);
break;
Expand All @@ -1379,6 +1395,7 @@ export const StreamOrderbookUpdatesRequest = {
message.clobPairId = object.clobPairId?.map(e => e) || [];
message.subaccountIds = object.subaccountIds?.map(e => SubaccountId.fromPartial(e)) || [];
message.marketIds = object.marketIds?.map(e => e) || [];
message.filterOrders = object.filterOrders ?? false;
return message;
}

Expand Down
3 changes: 3 additions & 0 deletions proto/dydxprotocol/clob/query.proto
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ message StreamOrderbookUpdatesRequest {

// Market ids for price updates.
repeated uint32 market_ids = 3;

// Filter order updates in addition to position updates
bool filter_orders = 4;
Comment on lines +189 to +191
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Implementation of filter_orders parameter appears incomplete

The filter_orders parameter has been added to the protocol definition and is properly passed through the interface chain, but there's no evidence of actual filtering logic implementation in the codebase. This could lead to the parameter being ineffective.

  • protocol/streaming/full_node_streaming_manager.go: The streaming manager accepts the parameter but doesn't appear to use it for filtering order updates
  • Consider implementing the filtering logic in AddOrderUpdatesToCache or related methods where updates are processed
🔗 Analysis chain

Verify integration with existing streaming functionality.

The field addition looks correct, but let's verify its integration with the existing streaming system.

Run the following to check for consistent handling across the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the integration of filter_orders field with streaming functionality
# Check for corresponding handler implementations
rg -g '*.{go,proto}' -A 5 'StreamOrderbookUpdates.*Request'

# Check for any existing filtering logic that might need updates
rg -g '*.go' -A 10 'func.*Subscribe'

# Look for tests covering the new functionality
rg -g '*_test.{go,proto}' -A 5 'filter.*orders'

Length of output: 14812


Script:

#!/bin/bash
# Check for filtering logic implementation in streaming manager
rg -g '*.go' -A 15 'filterOrders.*bool' protocol/streaming/

# Look for any tests verifying the filtering functionality
rg -g '*_test.go' -B 5 -A 10 'filterOrders.*true' protocol/streaming/

Length of output: 2484


Script:

#!/bin/bash
# Look for order update streaming implementation
rg -g '*.go' -B 5 -A 10 'SendStreamUpdates|SendOrderbookUpdates' protocol/streaming/

# Check for any filtering logic in the update sending code
rg -g '*.go' -B 5 -A 10 'func.*Update.*Order' protocol/streaming/

# Look for any order-related streaming logic
ast-grep --pattern 'if $filterOrders {
  $$$
}'

Length of output: 7169


Script:

#!/bin/bash
# Look for message sending implementation
rg -g '*.go' -B 5 -A 15 'func.*Send.*Message' protocol/streaming/

# Check subscriber management and cache handling
rg -g '*.go' -B 5 -A 15 'AddOrderUpdatesToCache|HandleNewSubscriber' protocol/streaming/

# Look for any references to filter_orders in the implementation
rg -g '*.go' -l 'filter.*orders' protocol/streaming/

Length of output: 6862

}

// StreamOrderbookUpdatesResponse is a response message for the
Expand Down
141 changes: 120 additions & 21 deletions protocol/streaming/full_node_streaming_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,25 @@ package streaming

import (
"fmt"
"slices"
"sync"
"sync/atomic"
"time"

"github.com/dydxprotocol/v4-chain/protocol/lib"
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"

"cosmossdk.io/log"
storetypes "cosmossdk.io/store/types"
"github.com/cosmos/cosmos-sdk/codec"
sdk "github.com/cosmos/cosmos-sdk/types"
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types"
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"
"github.com/dydxprotocol/v4-chain/protocol/lib"
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics"
"github.com/dydxprotocol/v4-chain/protocol/streaming/types"
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util"
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types"

ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types"

"github.com/dydxprotocol/v4-chain/protocol/finalizeblock"
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types"
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types"
)

var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil)
Expand Down Expand Up @@ -96,6 +94,41 @@ type OrderbookSubscription struct {
nextSnapshotBlock uint32
}

func NewOrderbookSubscription(
subscriptionId uint32,
clobPairIds []uint32,
subaccountIds []satypes.SubaccountId,
marketIds []uint32,
messageSender types.OutgoingMessageSender,
updatesChannel chan []clobtypes.StreamUpdate,
) *OrderbookSubscription {
return &OrderbookSubscription{
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: subaccountIds,
marketIds: marketIds,
messageSender: messageSender,
updatesChannel: updatesChannel,
}
}

func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription(
clobPairIds []uint32,
subaccountIds []satypes.SubaccountId,
marketIds []uint32,
messageSender types.OutgoingMessageSender,
) *OrderbookSubscription {
return NewOrderbookSubscription(
sm.getNextAvailableSubscriptionId(),
clobPairIds,
subaccountIds,
marketIds,
messageSender,
make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
)
}

func (sub *OrderbookSubscription) IsInitialized() bool {
return sub.initialized.Load()
}
Expand Down Expand Up @@ -187,11 +220,68 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32
return id
}

// Filter StreamUpdates for subaccountIdNumbers
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates(
output chan []clobtypes.StreamUpdate,
logger log.Logger,
) {
subaccountIdNumbers := make([]uint32, len(sub.subaccountIds))
for i, subaccountId := range sub.subaccountIds {
subaccountIdNumbers[i] = subaccountId.Number
}

// If reflection becomes too expensive, split updatesChannel by message type
for updates := range sub.updatesChannel {
filteredUpdates := []clobtypes.StreamUpdate{}
for _, update := range updates {
switch updateMessage := update.UpdateMessage.(type) {
case *clobtypes.StreamUpdate_OrderbookUpdate:
orderBookUpdates := []ocutypes.OffChainUpdateV1{}
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates {
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate)
if err == nil {
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) {
orderBookUpdates = append(orderBookUpdates, orderBookUpdate)
}
} else {
logger.Error(err.Error())
}
}
// Drop the StreamUpdate_OrderbookUpdate if all updates inside were dropped
if len(orderBookUpdates) > 0 {
if len(orderBookUpdates) < len(updateMessage.OrderbookUpdate.Updates) {
update = clobtypes.StreamUpdate{
BlockHeight: update.BlockHeight,
ExecMode: update.ExecMode,
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{
Snapshot: updateMessage.OrderbookUpdate.Snapshot,
Updates: orderBookUpdates,
},
},
}
}
filteredUpdates = append(filteredUpdates, update)
}
default:
filteredUpdates = append(filteredUpdates, update)
}
}
if len(filteredUpdates) > 0 {
output <- filteredUpdates
}
}
}

// Subscribe subscribes to the orderbook updates stream.
func (sm *FullNodeStreamingManagerImpl) Subscribe(
clobPairIds []uint32,
subaccountIds []*satypes.SubaccountId,
marketIds []uint32,
filterOrders bool,
messageSender types.OutgoingMessageSender,
) (
err error,
Expand All @@ -207,17 +297,8 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
sIds[i] = *subaccountId
}

subscriptionId := sm.getNextAvailableSubscriptionId()
subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender)

subscription := &OrderbookSubscription{
subscriptionId: subscriptionId,
initialized: &atomic.Bool{}, // False by default.
clobPairIds: clobPairIds,
subaccountIds: sIds,
marketIds: marketIds,
messageSender: messageSender,
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize),
}
for _, clobPairId := range clobPairIds {
// if clobPairId exists in the map, append the subscription id to the slice
// otherwise, create a new slice with the subscription id
Expand Down Expand Up @@ -265,9 +346,27 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe(
sm.EmitMetrics()
sm.Unlock()

// If filterOrders, listen to filtered channel and start filter goroutine
// Error if filterOrders but no subaccounts are subscribed
filteredUpdateChannel := subscription.updatesChannel
if filterOrders {
if len(subaccountIds) == 0 {
sm.logger.Error(
fmt.Sprintf(
"filterOrders requires subaccountIds for subscription id: %+v",
subscription.subscriptionId,
),
)
} else {
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
}
}

Comment on lines +349 to +366
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent execution when filterOrders is true without subaccountIds

When filterOrders is true but no subaccountIds are provided, the code logs an error but continues execution. This can lead to subscribers receiving unfiltered order updates, contrary to their request. It's advisable to return an error or terminate the subscription to prevent unintended behavior.

Apply this diff to return an error when filterOrders is requested without subaccountIds:

 if filterOrders {
     if len(subaccountIds) == 0 {
         sm.logger.Error(
             fmt.Sprintf(
                 "filterOrders requires subaccountIds for subscription id: %+v",
                 subscription.subscriptionId,
             ),
         )
+        return types.ErrInvalidStreamingRequest
     } else {
         filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
         defer close(filteredUpdateChannel)
         go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
     }
 }

Committable suggestion skipped: line range outside the PR's diff.

// Use current goroutine to consistently poll subscription channel for updates
// to send through stream.
for updates := range subscription.updatesChannel {
for updates := range filteredUpdateChannel {
metrics.IncrCounterWithLabels(
metrics.GrpcSendResponseToSubscriberCount,
1,
Expand Down Expand Up @@ -1080,12 +1179,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock(
sm.FlushStreamUpdatesWithLock()

// Cache updates to sync local ops queue
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates(
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue),
lib.MustConvertIntegerToUint32(ctx.BlockHeight()),
ctx.ExecMode(),
)
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds)
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds)

// Cache updates for finalized fills.
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills(
Expand Down
Loading
Loading