-
Notifications
You must be signed in to change notification settings - Fork 115
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(fullnode) Add filterOrders option to streaming subscription #2676
base: main
Are you sure you want to change the base?
feat(fullnode) Add filterOrders option to streaming subscription #2676
Conversation
WalkthroughThis pull request introduces a new boolean parameter Changes
Sequence DiagramsequenceDiagram
participant Client
participant WebSocketServer
participant StreamingManager
participant FilterLogic
Client->>WebSocketServer: Connect with filterOrders=true
WebSocketServer->>StreamingManager: Subscribe(clobPairIds, subaccountIds, filterOrders)
StreamingManager->>FilterLogic: Create Subscription
FilterLogic-->>StreamingManager: Filtered Subscription Created
StreamingManager-->>WebSocketServer: Subscription Confirmed
WebSocketServer-->>Client: WebSocket Connection Established
Possibly related PRs
Suggested reviewers
Poem
Finishing Touches
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (7)
protocol/streaming/util/util.go (2)
25-26
: Improve function documentation.The current comment is focused on error cases. Consider adding a description of the function's purpose and return values.
-// Error expected if OffChainUpdateV1.UpdateMessage message type is extended to more order events +// GetOffChainUpdateV1SubaccountIdNumber extracts the SubaccountId.Number from an OffChainUpdateV1 message. +// Returns the subaccount number and an error if the update message type is not a supported order event +// (OrderPlace, OrderRemove, OrderUpdate, OrderReplace).
28-42
: Consider using constants for error messages.The error message is hardcoded. Consider defining it as a constant to maintain consistency and make updates easier.
+const ( + ErrUnsupportedUpdateMessage = "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v" +) switch updateMessage := update.UpdateMessage.(type) { // ... case statements ... default: - return 0, fmt.Errorf( - "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v", - updateMessage, - ) + return 0, fmt.Errorf(ErrUnsupportedUpdateMessage, updateMessage)protocol/streaming/util/util_test.go (2)
113-113
: Remove debug print statement.The
fmt.Println
statement appears to be left from debugging.- fmt.Println("expected", id)
21-47
: Refactor test data setup.Consider moving the test data setup into helper functions to improve readability and reusability.
func createTestOrder(subaccountIdNumber uint32) (pv1types.IndexerOrderId, pv1types.IndexerOrder) { orderId := pv1types.IndexerOrderId{ SubaccountId: pv1types.IndexerSubaccountId{ Owner: "foo", Number: subaccountIdNumber, }, ClientId: 0, OrderFlags: 0, ClobPairId: 0, } order := pv1types.IndexerOrder{ OrderId: orderId, // ... other fields ... } return orderId, order }protocol/streaming/full_node_streaming_manager.go (1)
223-277
: Ensure comprehensive error handling and future-proofing for additional update typesIn
FilterSubaccountStreamUpdates
, whenGetOffChainUpdateV1SubaccountIdNumber
returns an error, the error is logged but not addressed further. Consider whether logging alone is sufficient or if additional error handling is necessary to prevent potential data loss of critical updates.Moreover, currently, only
StreamUpdate_OrderbookUpdate
messages are explicitly filtered. If new update message types containing subaccount IDs are introduced in the future, they might bypass filtering. Consider extending the filtering logic to handle other relevant message types to ensure consistent behavior.protocol/streaming/types/interface.go (1)
19-19
: Add documentation for the newfilterOrders
parameterIncluding comments to describe the purpose and usage of the
filterOrders
parameter in theSubscribe
method will improve code readability and assist developers in understanding the subscription behavior.Apply this diff to add documentation:
Subscribe( clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, marketIds []uint32, + // filterOrders determines whether to filter order updates based on subaccountIds filterOrders bool, srv OutgoingMessageSender, ) ( err error, )
proto/dydxprotocol/clob/query.proto (1)
189-191
: Add detailed documentation for the filter_orders field.The new field would benefit from more comprehensive documentation explaining:
- Its purpose and behavior
- The default value implications
- The relationship with subaccount filtering
Add documentation comments above the field:
// Market ids for price updates. repeated uint32 market_ids = 3; + // Indicates whether to filter order updates based on the specified subaccount_ids. + // When true, only order updates related to the specified subaccount_ids will be streamed. + // When false or unspecified, all order updates will be streamed regardless of subaccount_ids. + // This filter works in conjunction with position update filtering specified by subaccount_ids. bool filter_orders = 4;
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
protocol/x/clob/types/query.pb.go
is excluded by!**/*.pb.go
📒 Files selected for processing (10)
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts
(6 hunks)proto/dydxprotocol/clob/query.proto
(1 hunks)protocol/streaming/full_node_streaming_manager.go
(6 hunks)protocol/streaming/full_node_streaming_manager_test.go
(1 hunks)protocol/streaming/noop_streaming_manager.go
(1 hunks)protocol/streaming/types/interface.go
(1 hunks)protocol/streaming/util/util.go
(1 hunks)protocol/streaming/util/util_test.go
(1 hunks)protocol/streaming/ws/websocket_server.go
(3 hunks)protocol/x/clob/keeper/grpc_stream_orderbook.go
(1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
- GitHub Check: test-sim-after-import
- GitHub Check: test-sim-import-export
- GitHub Check: test-sim-multi-seed-short
- GitHub Check: test / run_command
- GitHub Check: unit-end-to-end-and-integration
- GitHub Check: test-race
- GitHub Check: test-coverage-upload
- GitHub Check: container-tests
- GitHub Check: Summary
🔇 Additional comments (14)
protocol/streaming/full_node_streaming_manager.go (3)
97-114
: Encapsulation ofOrderbookSubscription
initialization improves code reusabilityThe new function
NewOrderbookSubscription
cleanly encapsulates the initialization logic forOrderbookSubscription
, enhancing code reuse and maintainability.
116-131
: RefactoredNewOrderbookSubscription
method promotes code simplicityBy delegating to the general
NewOrderbookSubscription
function, the method withinFullNodeStreamingManagerImpl
simplifies subscription creation and reduces code duplication.
1182-1187
: Correctly caching updates to synchronize local operations queueThe addition of caching
syncLocalUpdates
ensures that local operations are properly synchronized duringStreamBatchUpdatesAfterFinalizeBlock
, enhancing the reliability and consistency of the streaming service.protocol/x/clob/keeper/grpc_stream_orderbook.go (1)
15-15
: Inclusion offilterOrders
parameter enables order filtering in subscriptionsAdding
req.GetFilterOrders()
to theSubscribe
call correctly integrates thefilterOrders
option, allowing clients to optionally filter order updates based on their specified criteria.protocol/streaming/noop_streaming_manager.go (1)
27-27
: LGTM!The no-op implementation correctly implements the updated interface by adding the new
filterOrders
parameter.protocol/streaming/ws/websocket_server.go (3)
100-108
: LGTM! Error handling is consistent with other parameters.The implementation follows the established pattern for parameter parsing and error handling.
183-194
: LGTM! Helper function follows best practices.The
parseFilterOrders
function:
- Correctly handles empty parameter case
- Uses appropriate error message format
- Returns a default value of
false
when parameter is not provided
123-123
: LGTM! Parameter correctly passed to Subscribe method.The
filterOrders
parameter is properly passed to the streaming manager's Subscribe method.protocol/streaming/full_node_streaming_manager_test.go (3)
257-261
: LGTM! Well-structured test case definition.The
TestCase
struct provides a clear and maintainable way to define test scenarios.
318-454
: LGTM! Comprehensive test coverage.The test suite thoroughly covers:
- Base scenarios for filtering
- Edge cases (empty updates, no IDs)
- Different update types (orderbook fills, taker orders, subaccount updates, price updates)
- Multiple subaccount combinations
456-477
: LGTM! Robust test execution.The test implementation:
- Properly handles channel cleanup with defer
- Uses appropriate timeouts for no-message scenarios
- Includes clear assertions
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (3)
281-283
: LGTM! Clear field documentation.The new field is well-documented with a clear description of its purpose.
1307-1308
: LGTM! Proper protobuf encoding implementation.The encoding logic correctly:
- Initializes the field with a default value of
false
- Only encodes the field when it's
true
Also applies to: 1333-1336
1380-1382
: LGTM! Proper protobuf decoding implementation.The decoding logic correctly handles the new boolean field.
tests := map[string]struct { | ||
update ocutypes.OffChainUpdateV1 | ||
id uint32 | ||
err error | ||
}{ | ||
"OrderPlace": { | ||
update: ocutypes.OffChainUpdateV1{ | ||
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{ | ||
OrderPlace: &ocutypes.OrderPlaceV1{ | ||
Order: &order, | ||
PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED, | ||
TimeStamp: _ToPtr(orderPlaceTime), | ||
}, | ||
}, | ||
}, | ||
id: subaccountIdNumber, | ||
err: nil, | ||
}, | ||
"OrderRemove": { | ||
update: ocutypes.OffChainUpdateV1{ | ||
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderRemove{ | ||
OrderRemove: &ocutypes.OrderRemoveV1{ | ||
RemovedOrderId: &orderId, | ||
Reason: stypes.OrderRemovalReason_ORDER_REMOVAL_REASON_USER_CANCELED, | ||
RemovalStatus: ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED, | ||
TimeStamp: _ToPtr(orderPlaceTime.Add(1 * time.Second)), | ||
}, | ||
}, | ||
}, | ||
id: subaccountIdNumber, | ||
err: nil, | ||
}, | ||
"OrderUpdate": { | ||
update: ocutypes.OffChainUpdateV1{ | ||
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderUpdate{ | ||
OrderUpdate: &ocutypes.OrderUpdateV1{ | ||
OrderId: &orderId, | ||
TotalFilledQuantums: fillQuantums, | ||
}, | ||
}, | ||
}, | ||
id: subaccountIdNumber, | ||
err: nil, | ||
}, | ||
"OrderReplace": { | ||
update: ocutypes.OffChainUpdateV1{ | ||
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderReplace{ | ||
OrderReplace: &ocutypes.OrderReplaceV1{ | ||
OldOrderId: &orderId, | ||
Order: &newOrder, | ||
PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED, | ||
TimeStamp: _ToPtr(orderPlaceTime.Add(3 * time.Second)), | ||
}, | ||
}, | ||
}, | ||
id: subaccountIdNumber, | ||
err: nil, | ||
}, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Add negative test cases.
The test suite only covers happy paths. Consider adding test cases for:
- Nil UpdateMessage
- Unsupported message types
- Nil inner objects (Order, OrderId, etc.)
Example additional test cases:
"Nil UpdateMessage": {
update: ocutypes.OffChainUpdateV1{
UpdateMessage: nil,
},
id: 0,
err: fmt.Errorf(ErrUnsupportedUpdateMessage, nil),
},
"Unsupported Message Type": {
update: ocutypes.OffChainUpdateV1{
UpdateMessage: &ocutypes.OffChainUpdateV1_TradeUpdate{},
},
id: 0,
err: fmt.Errorf(ErrUnsupportedUpdateMessage, &ocutypes.OffChainUpdateV1_TradeUpdate{}),
},
// If filterOrders, listen to filtered channel and start filter goroutine | ||
// Error if filterOrders but no subaccounts are subscribed | ||
filteredUpdateChannel := subscription.updatesChannel | ||
if filterOrders { | ||
if len(subaccountIds) == 0 { | ||
sm.logger.Error( | ||
fmt.Sprintf( | ||
"filterOrders requires subaccountIds for subscription id: %+v", | ||
subscription.subscriptionId, | ||
), | ||
) | ||
} else { | ||
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize) | ||
defer close(filteredUpdateChannel) | ||
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Prevent execution when filterOrders
is true without subaccountIds
When filterOrders
is true but no subaccountIds
are provided, the code logs an error but continues execution. This can lead to subscribers receiving unfiltered order updates, contrary to their request. It's advisable to return an error or terminate the subscription to prevent unintended behavior.
Apply this diff to return an error when filterOrders
is requested without subaccountIds
:
if filterOrders {
if len(subaccountIds) == 0 {
sm.logger.Error(
fmt.Sprintf(
"filterOrders requires subaccountIds for subscription id: %+v",
subscription.subscriptionId,
),
)
+ return types.ErrInvalidStreamingRequest
} else {
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
}
}
Committable suggestion skipped: line range outside the PR's diff.
|
||
// Filter order updates in addition to position updates | ||
bool filter_orders = 4; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
Implementation of filter_orders
parameter appears incomplete
The filter_orders
parameter has been added to the protocol definition and is properly passed through the interface chain, but there's no evidence of actual filtering logic implementation in the codebase. This could lead to the parameter being ineffective.
protocol/streaming/full_node_streaming_manager.go
: The streaming manager accepts the parameter but doesn't appear to use it for filtering order updates- Consider implementing the filtering logic in
AddOrderUpdatesToCache
or related methods where updates are processed
🔗 Analysis chain
Verify integration with existing streaming functionality.
The field addition looks correct, but let's verify its integration with the existing streaming system.
Run the following to check for consistent handling across the codebase:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the integration of filter_orders field with streaming functionality
# Check for corresponding handler implementations
rg -g '*.{go,proto}' -A 5 'StreamOrderbookUpdates.*Request'
# Check for any existing filtering logic that might need updates
rg -g '*.go' -A 10 'func.*Subscribe'
# Look for tests covering the new functionality
rg -g '*_test.{go,proto}' -A 5 'filter.*orders'
Length of output: 14812
Script:
#!/bin/bash
# Check for filtering logic implementation in streaming manager
rg -g '*.go' -A 15 'filterOrders.*bool' protocol/streaming/
# Look for any tests verifying the filtering functionality
rg -g '*_test.go' -B 5 -A 10 'filterOrders.*true' protocol/streaming/
Length of output: 2484
Script:
#!/bin/bash
# Look for order update streaming implementation
rg -g '*.go' -B 5 -A 10 'SendStreamUpdates|SendOrderbookUpdates' protocol/streaming/
# Check for any filtering logic in the update sending code
rg -g '*.go' -B 5 -A 10 'func.*Update.*Order' protocol/streaming/
# Look for any order-related streaming logic
ast-grep --pattern 'if $filterOrders {
$$$
}'
Length of output: 7169
Script:
#!/bin/bash
# Look for message sending implementation
rg -g '*.go' -B 5 -A 15 'func.*Send.*Message' protocol/streaming/
# Check subscriber management and cache handling
rg -g '*.go' -B 5 -A 15 'AddOrderUpdatesToCache|HandleNewSubscriber' protocol/streaming/
# Look for any references to filter_orders in the implementation
rg -g '*.go' -l 'filter.*orders' protocol/streaming/
Length of output: 6862
Changelist
Full Node Streaming provides an initial state and streaming updates for positions, orders, prices and fills.
The subscription API admits an optional sub account ID filter, which is only applied to the initial positions and position changes.
The user would like to apply the sub account ID filter to the order messages, in addition to position messages.
The change will add boolean flags to the Websocket and GRPC streaming API's:
For all endpoints, the previous behavior of not filtering orders for subaccounts is preserved by default.
If filtering orders is not specified, the code path remains the same for looping over stream updates.
If filtering orders is specified, the subscription's updates channel is replaced with a channel receiving
filtered updates from a new goroutine, which is only started in the event filtering orders is indicated.
Test Plan
Unit test
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit
New Features
Improvements
Technical Updates
filterOrders
parameter