-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(fullnode) Add filterOrders option to streaming subscription #2676
base: main
Are you sure you want to change the base?
Changes from all commits
05ff9f3
309f2ec
39213e7
97d08ce
457459b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,27 +2,25 @@ package streaming | |
|
||
import ( | ||
"fmt" | ||
"slices" | ||
"sync" | ||
"sync/atomic" | ||
"time" | ||
|
||
"github.com/dydxprotocol/v4-chain/protocol/lib" | ||
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" | ||
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" | ||
|
||
"cosmossdk.io/log" | ||
storetypes "cosmossdk.io/store/types" | ||
"github.com/cosmos/cosmos-sdk/codec" | ||
sdk "github.com/cosmos/cosmos-sdk/types" | ||
ante_types "github.com/dydxprotocol/v4-chain/protocol/app/ante/types" | ||
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock" | ||
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" | ||
"github.com/dydxprotocol/v4-chain/protocol/lib" | ||
"github.com/dydxprotocol/v4-chain/protocol/lib/metrics" | ||
"github.com/dydxprotocol/v4-chain/protocol/streaming/types" | ||
streaming_util "github.com/dydxprotocol/v4-chain/protocol/streaming/util" | ||
clobtypes "github.com/dydxprotocol/v4-chain/protocol/x/clob/types" | ||
|
||
ocutypes "github.com/dydxprotocol/v4-chain/protocol/indexer/off_chain_updates/types" | ||
|
||
"github.com/dydxprotocol/v4-chain/protocol/finalizeblock" | ||
pricestypes "github.com/dydxprotocol/v4-chain/protocol/x/prices/types" | ||
satypes "github.com/dydxprotocol/v4-chain/protocol/x/subaccounts/types" | ||
) | ||
|
||
var _ types.FullNodeStreamingManager = (*FullNodeStreamingManagerImpl)(nil) | ||
|
@@ -96,6 +94,41 @@ type OrderbookSubscription struct { | |
nextSnapshotBlock uint32 | ||
} | ||
|
||
func NewOrderbookSubscription( | ||
subscriptionId uint32, | ||
clobPairIds []uint32, | ||
subaccountIds []satypes.SubaccountId, | ||
marketIds []uint32, | ||
messageSender types.OutgoingMessageSender, | ||
updatesChannel chan []clobtypes.StreamUpdate, | ||
) *OrderbookSubscription { | ||
return &OrderbookSubscription{ | ||
subscriptionId: subscriptionId, | ||
initialized: &atomic.Bool{}, // False by default. | ||
clobPairIds: clobPairIds, | ||
subaccountIds: subaccountIds, | ||
marketIds: marketIds, | ||
messageSender: messageSender, | ||
updatesChannel: updatesChannel, | ||
} | ||
} | ||
|
||
func (sm *FullNodeStreamingManagerImpl) NewOrderbookSubscription( | ||
clobPairIds []uint32, | ||
subaccountIds []satypes.SubaccountId, | ||
marketIds []uint32, | ||
messageSender types.OutgoingMessageSender, | ||
) *OrderbookSubscription { | ||
return NewOrderbookSubscription( | ||
sm.getNextAvailableSubscriptionId(), | ||
clobPairIds, | ||
subaccountIds, | ||
marketIds, | ||
messageSender, | ||
make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), | ||
) | ||
} | ||
|
||
func (sub *OrderbookSubscription) IsInitialized() bool { | ||
return sub.initialized.Load() | ||
} | ||
|
@@ -187,11 +220,68 @@ func (sm *FullNodeStreamingManagerImpl) getNextAvailableSubscriptionId() uint32 | |
return id | ||
} | ||
|
||
// Filter StreamUpdates for subaccountIdNumbers | ||
// If a StreamUpdate_OrderUpdate contains no updates for subscribed subaccounts, drop message | ||
// If a StreamUpdate_OrderUpdate contains updates for subscribed subaccounts, construct a new | ||
// StreamUpdate_OrderUpdate with updates only for subscribed subaccounts | ||
func (sub *OrderbookSubscription) FilterSubaccountStreamUpdates( | ||
output chan []clobtypes.StreamUpdate, | ||
logger log.Logger, | ||
) { | ||
subaccountIdNumbers := make([]uint32, len(sub.subaccountIds)) | ||
for i, subaccountId := range sub.subaccountIds { | ||
subaccountIdNumbers[i] = subaccountId.Number | ||
} | ||
|
||
// If reflection becomes too expensive, split updatesChannel by message type | ||
for updates := range sub.updatesChannel { | ||
filteredUpdates := []clobtypes.StreamUpdate{} | ||
for _, update := range updates { | ||
switch updateMessage := update.UpdateMessage.(type) { | ||
case *clobtypes.StreamUpdate_OrderbookUpdate: | ||
orderBookUpdates := []ocutypes.OffChainUpdateV1{} | ||
for _, orderBookUpdate := range updateMessage.OrderbookUpdate.Updates { | ||
orderBookUpdateSubaccountIdNumber, err := streaming_util.GetOffChainUpdateV1SubaccountIdNumber(orderBookUpdate) | ||
if err == nil { | ||
if slices.Contains(subaccountIdNumbers, orderBookUpdateSubaccountIdNumber) { | ||
orderBookUpdates = append(orderBookUpdates, orderBookUpdate) | ||
} | ||
} else { | ||
logger.Error(err.Error()) | ||
} | ||
} | ||
// Drop the StreamUpdate_OrderbookUpdate if all updates inside were dropped | ||
if len(orderBookUpdates) > 0 { | ||
if len(orderBookUpdates) < len(updateMessage.OrderbookUpdate.Updates) { | ||
update = clobtypes.StreamUpdate{ | ||
BlockHeight: update.BlockHeight, | ||
ExecMode: update.ExecMode, | ||
UpdateMessage: &clobtypes.StreamUpdate_OrderbookUpdate{ | ||
OrderbookUpdate: &clobtypes.StreamOrderbookUpdate{ | ||
Snapshot: updateMessage.OrderbookUpdate.Snapshot, | ||
Updates: orderBookUpdates, | ||
}, | ||
}, | ||
} | ||
} | ||
filteredUpdates = append(filteredUpdates, update) | ||
} | ||
default: | ||
filteredUpdates = append(filteredUpdates, update) | ||
} | ||
} | ||
if len(filteredUpdates) > 0 { | ||
output <- filteredUpdates | ||
} | ||
} | ||
} | ||
|
||
// Subscribe subscribes to the orderbook updates stream. | ||
func (sm *FullNodeStreamingManagerImpl) Subscribe( | ||
clobPairIds []uint32, | ||
subaccountIds []*satypes.SubaccountId, | ||
marketIds []uint32, | ||
filterOrders bool, | ||
messageSender types.OutgoingMessageSender, | ||
) ( | ||
err error, | ||
|
@@ -207,17 +297,8 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( | |
sIds[i] = *subaccountId | ||
} | ||
|
||
subscriptionId := sm.getNextAvailableSubscriptionId() | ||
subscription := sm.NewOrderbookSubscription(clobPairIds, sIds, marketIds, messageSender) | ||
|
||
subscription := &OrderbookSubscription{ | ||
subscriptionId: subscriptionId, | ||
initialized: &atomic.Bool{}, // False by default. | ||
clobPairIds: clobPairIds, | ||
subaccountIds: sIds, | ||
marketIds: marketIds, | ||
messageSender: messageSender, | ||
updatesChannel: make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize), | ||
} | ||
for _, clobPairId := range clobPairIds { | ||
// if clobPairId exists in the map, append the subscription id to the slice | ||
// otherwise, create a new slice with the subscription id | ||
|
@@ -265,9 +346,27 @@ func (sm *FullNodeStreamingManagerImpl) Subscribe( | |
sm.EmitMetrics() | ||
sm.Unlock() | ||
|
||
// If filterOrders, listen to filtered channel and start filter goroutine | ||
// Error if filterOrders but no subaccounts are subscribed | ||
filteredUpdateChannel := subscription.updatesChannel | ||
if filterOrders { | ||
if len(subaccountIds) == 0 { | ||
sm.logger.Error( | ||
fmt.Sprintf( | ||
"filterOrders requires subaccountIds for subscription id: %+v", | ||
subscription.subscriptionId, | ||
), | ||
) | ||
} else { | ||
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize) | ||
defer close(filteredUpdateChannel) | ||
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger) | ||
} | ||
} | ||
|
||
Comment on lines
+349
to
+366
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prevent execution when When Apply this diff to return an error when if filterOrders {
if len(subaccountIds) == 0 {
sm.logger.Error(
fmt.Sprintf(
"filterOrders requires subaccountIds for subscription id: %+v",
subscription.subscriptionId,
),
)
+ return types.ErrInvalidStreamingRequest
} else {
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
}
}
|
||
// Use current goroutine to consistently poll subscription channel for updates | ||
// to send through stream. | ||
for updates := range subscription.updatesChannel { | ||
for updates := range filteredUpdateChannel { | ||
metrics.IncrCounterWithLabels( | ||
metrics.GrpcSendResponseToSubscriberCount, | ||
1, | ||
|
@@ -1080,12 +1179,12 @@ func (sm *FullNodeStreamingManagerImpl) StreamBatchUpdatesAfterFinalizeBlock( | |
sm.FlushStreamUpdatesWithLock() | ||
|
||
// Cache updates to sync local ops queue | ||
sycnLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( | ||
syncLocalUpdates, syncLocalClobPairIds := getStreamUpdatesFromOffchainUpdates( | ||
streaming_util.GetOffchainUpdatesV1(orderBookUpdatesToSyncLocalOpsQueue), | ||
lib.MustConvertIntegerToUint32(ctx.BlockHeight()), | ||
ctx.ExecMode(), | ||
) | ||
sm.cacheStreamUpdatesByClobPairWithLock(sycnLocalUpdates, syncLocalClobPairIds) | ||
sm.cacheStreamUpdatesByClobPairWithLock(syncLocalUpdates, syncLocalClobPairIds) | ||
|
||
// Cache updates for finalized fills. | ||
fillStreamUpdates, fillClobPairIds := sm.getStreamUpdatesForOrderbookFills( | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Implementation of
filter_orders
parameter appears incompleteThe
filter_orders
parameter has been added to the protocol definition and is properly passed through the interface chain, but there's no evidence of actual filtering logic implementation in the codebase. This could lead to the parameter being ineffective.protocol/streaming/full_node_streaming_manager.go
: The streaming manager accepts the parameter but doesn't appear to use it for filtering order updatesAddOrderUpdatesToCache
or related methods where updates are processed🔗 Analysis chain
Verify integration with existing streaming functionality.
The field addition looks correct, but let's verify its integration with the existing streaming system.
Run the following to check for consistent handling across the codebase:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
Length of output: 14812
Script:
Length of output: 2484
Script:
Length of output: 7169
Script:
Length of output: 6862