Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(fullnode) Add filterOrders option to streaming subscription #2676

Open
wants to merge 5 commits into
base: main
Choose a base branch
from

Conversation

UnbornAztecKing
Copy link

@UnbornAztecKing UnbornAztecKing commented Jan 8, 2025

Changelist

Full Node Streaming provides an initial state and streaming updates for positions, orders, prices and fills.

The subscription API admits an optional sub account ID filter, which is only applied to the initial positions and position changes.

The user would like to apply the sub account ID filter to the order messages, in addition to position messages.

The change will add boolean flags to the Websocket and GRPC streaming API's:

  • filterOrders boolean field for WS request (if not provided, default to False)
  • filter_orders boolean field for StreamOrderbookUpdatesRequest protobuf (if not provided, default to False)

For all endpoints, the previous behavior of not filtering orders for subaccounts is preserved by default.

If filtering orders is not specified, the code path remains the same for looping over stream updates.
If filtering orders is specified, the subscription's updates channel is replaced with a channel receiving
filtered updates from a new goroutine, which is only started in the event filtering orders is indicated.

Test Plan

Unit test

Author/Reviewer Checklist

  • If this PR has changes that result in a different app state given the same prior state and transaction list, manually add the state-breaking label.
  • If the PR has breaking postgres changes to the indexer add the indexer-postgres-breaking label.
  • If this PR isn't state-breaking but has changes that modify behavior in PrepareProposal or ProcessProposal, manually add the label proposal-breaking.
  • If this PR is one of many that implement a specific feature, manually label them all feature:[feature-name].
  • If you wish to for mergify-bot to automatically create a PR to backport your change to a release branch, manually add the label backport/[branch-name].
  • Manually add any of the following labels: refactor, chore, bug.

Summary by CodeRabbit

  • New Features

    • Added an option to filter order updates during streaming
    • Enhanced orderbook subscription with optional order filtering capability
  • Improvements

    • Updated streaming interfaces to support new filtering mechanism
    • Introduced utility functions for managing stream updates
  • Technical Updates

    • Modified protocol buffers and SDK types to include filterOrders parameter
    • Updated WebSocket server to parse new query parameter
    • Added test coverage for new streaming functionality

Copy link
Contributor

coderabbitai bot commented Jan 8, 2025

Walkthrough

This pull request introduces a new boolean parameter filterOrders to the streaming orderbook updates functionality across multiple components of the dYdX protocol. The change enables optional filtering of order updates during streaming, with a default value of false. The modification spans from proto definitions to the WebSocket server, streaming manager, and keeper implementations, ensuring consistent integration of the new filtering capability throughout the system's streaming architecture.

Changes

File Change Summary
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts Added filterOrders boolean property to StreamOrderbookUpdatesRequest interface and SDK type
proto/dydxprotocol/clob/query.proto Added filter_orders boolean field to StreamOrderbookUpdatesRequest message
protocol/streaming/full_node_streaming_manager.go Added new methods for orderbook subscription, filtering updates, and modified Subscribe method to support filterOrders
protocol/streaming/noop_streaming_manager.go Updated Subscribe method signature to include filterOrders parameter
protocol/streaming/types/interface.go Modified FullNodeStreamingManager interface to include filterOrders in Subscribe method
protocol/streaming/util/util.go Added GetOffChainUpdateV1SubaccountIdNumber utility function
protocol/streaming/ws/websocket_server.go Added parseFilterOrders helper function to handle new query parameter
protocol/x/clob/keeper/grpc_stream_orderbook.go Updated StreamOrderbookUpdates to pass filterOrders parameter

Sequence Diagram

sequenceDiagram
    participant Client
    participant WebSocketServer
    participant StreamingManager
    participant FilterLogic

    Client->>WebSocketServer: Connect with filterOrders=true
    WebSocketServer->>StreamingManager: Subscribe(clobPairIds, subaccountIds, filterOrders)
    StreamingManager->>FilterLogic: Create Subscription
    FilterLogic-->>StreamingManager: Filtered Subscription Created
    StreamingManager-->>WebSocketServer: Subscription Confirmed
    WebSocketServer-->>Client: WebSocket Connection Established
Loading

Possibly related PRs

Suggested reviewers

  • roy-dydx

Poem

🐰 Hop, hop, filtering streams so bright,
Orders dancing left and right,
With filterOrders now in sight,
Our protocol takes a clever flight!
Streaming magic, pure delight! 🌟

Finishing Touches

  • 📝 Generate Docstrings (Beta)

Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media?

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR. (Beta)
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

@UnbornAztecKing UnbornAztecKing self-assigned this Jan 10, 2025
@UnbornAztecKing UnbornAztecKing marked this pull request as ready for review January 10, 2025 19:02
@UnbornAztecKing UnbornAztecKing requested a review from a team as a code owner January 10, 2025 19:02
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🧹 Nitpick comments (7)
protocol/streaming/util/util.go (2)

25-26: Improve function documentation.

The current comment is focused on error cases. Consider adding a description of the function's purpose and return values.

-// Error expected if OffChainUpdateV1.UpdateMessage message type is extended to more order events
+// GetOffChainUpdateV1SubaccountIdNumber extracts the SubaccountId.Number from an OffChainUpdateV1 message.
+// Returns the subaccount number and an error if the update message type is not a supported order event
+// (OrderPlace, OrderRemove, OrderUpdate, OrderReplace).

28-42: Consider using constants for error messages.

The error message is hardcoded. Consider defining it as a constant to maintain consistency and make updates easier.

+const (
+    ErrUnsupportedUpdateMessage = "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v"
+)

 switch updateMessage := update.UpdateMessage.(type) {
     // ... case statements ...
     default:
-        return 0, fmt.Errorf(
-            "UpdateMessage type not in {OrderPlace, OrderRemove, OrderUpdate, OrderReplace}: %+v",
-            updateMessage,
-        )
+        return 0, fmt.Errorf(ErrUnsupportedUpdateMessage, updateMessage)
protocol/streaming/util/util_test.go (2)

113-113: Remove debug print statement.

The fmt.Println statement appears to be left from debugging.

-           fmt.Println("expected", id)

21-47: Refactor test data setup.

Consider moving the test data setup into helper functions to improve readability and reusability.

func createTestOrder(subaccountIdNumber uint32) (pv1types.IndexerOrderId, pv1types.IndexerOrder) {
    orderId := pv1types.IndexerOrderId{
        SubaccountId: pv1types.IndexerSubaccountId{
            Owner:  "foo",
            Number: subaccountIdNumber,
        },
        ClientId:   0,
        OrderFlags: 0,
        ClobPairId: 0,
    }
    order := pv1types.IndexerOrder{
        OrderId:  orderId,
        // ... other fields ...
    }
    return orderId, order
}
protocol/streaming/full_node_streaming_manager.go (1)

223-277: Ensure comprehensive error handling and future-proofing for additional update types

In FilterSubaccountStreamUpdates, when GetOffChainUpdateV1SubaccountIdNumber returns an error, the error is logged but not addressed further. Consider whether logging alone is sufficient or if additional error handling is necessary to prevent potential data loss of critical updates.

Moreover, currently, only StreamUpdate_OrderbookUpdate messages are explicitly filtered. If new update message types containing subaccount IDs are introduced in the future, they might bypass filtering. Consider extending the filtering logic to handle other relevant message types to ensure consistent behavior.

protocol/streaming/types/interface.go (1)

19-19: Add documentation for the new filterOrders parameter

Including comments to describe the purpose and usage of the filterOrders parameter in the Subscribe method will improve code readability and assist developers in understanding the subscription behavior.

Apply this diff to add documentation:

 Subscribe(
     clobPairIds []uint32,
     subaccountIds []*satypes.SubaccountId,
     marketIds []uint32,
+    // filterOrders determines whether to filter order updates based on subaccountIds
     filterOrders bool,
     srv OutgoingMessageSender,
 ) (
     err error,
 )
proto/dydxprotocol/clob/query.proto (1)

189-191: Add detailed documentation for the filter_orders field.

The new field would benefit from more comprehensive documentation explaining:

  • Its purpose and behavior
  • The default value implications
  • The relationship with subaccount filtering

Add documentation comments above the field:

  // Market ids for price updates.
  repeated uint32 market_ids = 3;

+  // Indicates whether to filter order updates based on the specified subaccount_ids.
+  // When true, only order updates related to the specified subaccount_ids will be streamed.
+  // When false or unspecified, all order updates will be streamed regardless of subaccount_ids.
+  // This filter works in conjunction with position update filtering specified by subaccount_ids.
  bool filter_orders = 4;
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2a79ca0 and 457459b.

⛔ Files ignored due to path filters (1)
  • protocol/x/clob/types/query.pb.go is excluded by !**/*.pb.go
📒 Files selected for processing (10)
  • indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (6 hunks)
  • proto/dydxprotocol/clob/query.proto (1 hunks)
  • protocol/streaming/full_node_streaming_manager.go (6 hunks)
  • protocol/streaming/full_node_streaming_manager_test.go (1 hunks)
  • protocol/streaming/noop_streaming_manager.go (1 hunks)
  • protocol/streaming/types/interface.go (1 hunks)
  • protocol/streaming/util/util.go (1 hunks)
  • protocol/streaming/util/util_test.go (1 hunks)
  • protocol/streaming/ws/websocket_server.go (3 hunks)
  • protocol/x/clob/keeper/grpc_stream_orderbook.go (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: test-sim-after-import
  • GitHub Check: test-sim-import-export
  • GitHub Check: test-sim-multi-seed-short
  • GitHub Check: test / run_command
  • GitHub Check: unit-end-to-end-and-integration
  • GitHub Check: test-race
  • GitHub Check: test-coverage-upload
  • GitHub Check: container-tests
  • GitHub Check: Summary
🔇 Additional comments (14)
protocol/streaming/full_node_streaming_manager.go (3)

97-114: Encapsulation of OrderbookSubscription initialization improves code reusability

The new function NewOrderbookSubscription cleanly encapsulates the initialization logic for OrderbookSubscription, enhancing code reuse and maintainability.


116-131: Refactored NewOrderbookSubscription method promotes code simplicity

By delegating to the general NewOrderbookSubscription function, the method within FullNodeStreamingManagerImpl simplifies subscription creation and reduces code duplication.


1182-1187: Correctly caching updates to synchronize local operations queue

The addition of caching syncLocalUpdates ensures that local operations are properly synchronized during StreamBatchUpdatesAfterFinalizeBlock, enhancing the reliability and consistency of the streaming service.

protocol/x/clob/keeper/grpc_stream_orderbook.go (1)

15-15: Inclusion of filterOrders parameter enables order filtering in subscriptions

Adding req.GetFilterOrders() to the Subscribe call correctly integrates the filterOrders option, allowing clients to optionally filter order updates based on their specified criteria.

protocol/streaming/noop_streaming_manager.go (1)

27-27: LGTM!

The no-op implementation correctly implements the updated interface by adding the new filterOrders parameter.

protocol/streaming/ws/websocket_server.go (3)

100-108: LGTM! Error handling is consistent with other parameters.

The implementation follows the established pattern for parameter parsing and error handling.


183-194: LGTM! Helper function follows best practices.

The parseFilterOrders function:

  • Correctly handles empty parameter case
  • Uses appropriate error message format
  • Returns a default value of false when parameter is not provided

123-123: LGTM! Parameter correctly passed to Subscribe method.

The filterOrders parameter is properly passed to the streaming manager's Subscribe method.

protocol/streaming/full_node_streaming_manager_test.go (3)

257-261: LGTM! Well-structured test case definition.

The TestCase struct provides a clear and maintainable way to define test scenarios.


318-454: LGTM! Comprehensive test coverage.

The test suite thoroughly covers:

  • Base scenarios for filtering
  • Edge cases (empty updates, no IDs)
  • Different update types (orderbook fills, taker orders, subaccount updates, price updates)
  • Multiple subaccount combinations

456-477: LGTM! Robust test execution.

The test implementation:

  • Properly handles channel cleanup with defer
  • Uses appropriate timeouts for no-message scenarios
  • Includes clear assertions
indexer/packages/v4-protos/src/codegen/dydxprotocol/clob/query.ts (3)

281-283: LGTM! Clear field documentation.

The new field is well-documented with a clear description of its purpose.


1307-1308: LGTM! Proper protobuf encoding implementation.

The encoding logic correctly:

  • Initializes the field with a default value of false
  • Only encodes the field when it's true

Also applies to: 1333-1336


1380-1382: LGTM! Proper protobuf decoding implementation.

The decoding logic correctly handles the new boolean field.

Comment on lines +51 to +108
tests := map[string]struct {
update ocutypes.OffChainUpdateV1
id uint32
err error
}{
"OrderPlace": {
update: ocutypes.OffChainUpdateV1{
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderPlace{
OrderPlace: &ocutypes.OrderPlaceV1{
Order: &order,
PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_BEST_EFFORT_OPENED,
TimeStamp: _ToPtr(orderPlaceTime),
},
},
},
id: subaccountIdNumber,
err: nil,
},
"OrderRemove": {
update: ocutypes.OffChainUpdateV1{
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderRemove{
OrderRemove: &ocutypes.OrderRemoveV1{
RemovedOrderId: &orderId,
Reason: stypes.OrderRemovalReason_ORDER_REMOVAL_REASON_USER_CANCELED,
RemovalStatus: ocutypes.OrderRemoveV1_ORDER_REMOVAL_STATUS_CANCELED,
TimeStamp: _ToPtr(orderPlaceTime.Add(1 * time.Second)),
},
},
},
id: subaccountIdNumber,
err: nil,
},
"OrderUpdate": {
update: ocutypes.OffChainUpdateV1{
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderUpdate{
OrderUpdate: &ocutypes.OrderUpdateV1{
OrderId: &orderId,
TotalFilledQuantums: fillQuantums,
},
},
},
id: subaccountIdNumber,
err: nil,
},
"OrderReplace": {
update: ocutypes.OffChainUpdateV1{
UpdateMessage: &ocutypes.OffChainUpdateV1_OrderReplace{
OrderReplace: &ocutypes.OrderReplaceV1{
OldOrderId: &orderId,
Order: &newOrder,
PlacementStatus: ocutypes.OrderPlaceV1_ORDER_PLACEMENT_STATUS_OPENED,
TimeStamp: _ToPtr(orderPlaceTime.Add(3 * time.Second)),
},
},
},
id: subaccountIdNumber,
err: nil,
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Add negative test cases.

The test suite only covers happy paths. Consider adding test cases for:

  1. Nil UpdateMessage
  2. Unsupported message types
  3. Nil inner objects (Order, OrderId, etc.)

Example additional test cases:

"Nil UpdateMessage": {
    update: ocutypes.OffChainUpdateV1{
        UpdateMessage: nil,
    },
    id:  0,
    err: fmt.Errorf(ErrUnsupportedUpdateMessage, nil),
},
"Unsupported Message Type": {
    update: ocutypes.OffChainUpdateV1{
        UpdateMessage: &ocutypes.OffChainUpdateV1_TradeUpdate{},
    },
    id:  0,
    err: fmt.Errorf(ErrUnsupportedUpdateMessage, &ocutypes.OffChainUpdateV1_TradeUpdate{}),
},

Comment on lines +349 to +366
// If filterOrders, listen to filtered channel and start filter goroutine
// Error if filterOrders but no subaccounts are subscribed
filteredUpdateChannel := subscription.updatesChannel
if filterOrders {
if len(subaccountIds) == 0 {
sm.logger.Error(
fmt.Sprintf(
"filterOrders requires subaccountIds for subscription id: %+v",
subscription.subscriptionId,
),
)
} else {
filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
defer close(filteredUpdateChannel)
go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent execution when filterOrders is true without subaccountIds

When filterOrders is true but no subaccountIds are provided, the code logs an error but continues execution. This can lead to subscribers receiving unfiltered order updates, contrary to their request. It's advisable to return an error or terminate the subscription to prevent unintended behavior.

Apply this diff to return an error when filterOrders is requested without subaccountIds:

 if filterOrders {
     if len(subaccountIds) == 0 {
         sm.logger.Error(
             fmt.Sprintf(
                 "filterOrders requires subaccountIds for subscription id: %+v",
                 subscription.subscriptionId,
             ),
         )
+        return types.ErrInvalidStreamingRequest
     } else {
         filteredUpdateChannel = make(chan []clobtypes.StreamUpdate, sm.maxSubscriptionChannelSize)
         defer close(filteredUpdateChannel)
         go subscription.FilterSubaccountStreamUpdates(filteredUpdateChannel, sm.logger)
     }
 }

Committable suggestion skipped: line range outside the PR's diff.

Comment on lines +189 to +191

// Filter order updates in addition to position updates
bool filter_orders = 4;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Implementation of filter_orders parameter appears incomplete

The filter_orders parameter has been added to the protocol definition and is properly passed through the interface chain, but there's no evidence of actual filtering logic implementation in the codebase. This could lead to the parameter being ineffective.

  • protocol/streaming/full_node_streaming_manager.go: The streaming manager accepts the parameter but doesn't appear to use it for filtering order updates
  • Consider implementing the filtering logic in AddOrderUpdatesToCache or related methods where updates are processed
🔗 Analysis chain

Verify integration with existing streaming functionality.

The field addition looks correct, but let's verify its integration with the existing streaming system.

Run the following to check for consistent handling across the codebase:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the integration of filter_orders field with streaming functionality
# Check for corresponding handler implementations
rg -g '*.{go,proto}' -A 5 'StreamOrderbookUpdates.*Request'

# Check for any existing filtering logic that might need updates
rg -g '*.go' -A 10 'func.*Subscribe'

# Look for tests covering the new functionality
rg -g '*_test.{go,proto}' -A 5 'filter.*orders'

Length of output: 14812


Script:

#!/bin/bash
# Check for filtering logic implementation in streaming manager
rg -g '*.go' -A 15 'filterOrders.*bool' protocol/streaming/

# Look for any tests verifying the filtering functionality
rg -g '*_test.go' -B 5 -A 10 'filterOrders.*true' protocol/streaming/

Length of output: 2484


Script:

#!/bin/bash
# Look for order update streaming implementation
rg -g '*.go' -B 5 -A 10 'SendStreamUpdates|SendOrderbookUpdates' protocol/streaming/

# Check for any filtering logic in the update sending code
rg -g '*.go' -B 5 -A 10 'func.*Update.*Order' protocol/streaming/

# Look for any order-related streaming logic
ast-grep --pattern 'if $filterOrders {
  $$$
}'

Length of output: 7169


Script:

#!/bin/bash
# Look for message sending implementation
rg -g '*.go' -B 5 -A 15 'func.*Send.*Message' protocol/streaming/

# Check subscriber management and cache handling
rg -g '*.go' -B 5 -A 15 'AddOrderUpdatesToCache|HandleNewSubscriber' protocol/streaming/

# Look for any references to filter_orders in the implementation
rg -g '*.go' -l 'filter.*orders' protocol/streaming/

Length of output: 6862

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

1 participant