-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[CT-1326] send price updates after block is finalized #2611
Conversation
WalkthroughThe pull request introduces significant enhancements to the application's streaming capabilities by modifying the Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Poem
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 4
🧹 Outside diff range and nitpick comments (8)
protocol/streaming/types/interface.go (1)
55-58
: LGTM! Consider error handling in implementations.The
SendPriceUpdate
method signature is clean and consistent with other streaming methods in the interface.When implementing this method, ensure proper error handling for scenarios such as:
- Network interruptions during streaming
- Invalid price update data
- Context cancellation or timeouts
protocol/x/prices/keeper/keeper.go (2)
44-44
: Consider adding nil validation for streamingManagerWhile the streaming manager is correctly added to the constructor, consider adding nil validation to prevent potential runtime issues.
func NewKeeper( ... streamingManager streamingtypes.FullNodeStreamingManager, ) *Keeper { + if streamingManager == nil { + panic("streamingManager cannot be nil") + } return &Keeper{ ... streamingManager: streamingManager, } }Also applies to: 55-55
75-77
: Add documentation for the exported methodConsider adding a documentation comment to describe the purpose and usage of this exported method.
+// GetFullNodeStreamingManager returns the streaming manager instance used for price updates. +// The streaming manager is responsible for broadcasting price updates after block finalization. func (k Keeper) GetFullNodeStreamingManager() streamingtypes.FullNodeStreamingManager { return k.streamingManager }protocol/x/prices/keeper/market_price.go (1)
101-102
: Consider optimizing streaming checksThe streaming manager checks (
Enabled()
andTracksMarketId()
) are performed inside the price update loop. This could be optimized by checkingEnabled()
once before the loop.Consider this optimization:
+ // Check if streaming is enabled before processing updates + streamingEnabled := k.GetFullNodeStreamingManager().Enabled() // Writes to the store are delayed so that the updates are atomically applied to state. for _, marketPrice := range updatedMarketPrices { // ... existing store update code ... - if k.GetFullNodeStreamingManager().Enabled() { + if streamingEnabled { if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) { // ... streaming code ... } } }protocol/streaming/full_node_streaming_manager.go (4)
200-201
: Consider providing a more descriptive error messageThe current error returned is
types.ErrInvalidStreamingRequest
. It would be more helpful to provide a specific error message indicating that at least one ofclobPairIds
,subaccountIds
, ormarketIds
must be provided for a valid subscription.Apply this diff to improve the error message:
func (sm *FullNodeStreamingManagerImpl) Subscribe( clobPairIds []uint32, subaccountIds []*satypes.SubaccountId, marketIds []uint32, messageSender types.OutgoingMessageSender, ) ( err error, ) { // Perform some basic validation on the request. if len(clobPairIds) == 0 && len(subaccountIds) == 0 && len(marketIds) == 0 { - return types.ErrInvalidStreamingRequest + return fmt.Errorf("at least one of clobPairIds, subaccountIds, or marketIds must be provided") }
496-496
: Align method comment with method nameThe method comment uses
SendPriceUpdates
, but the method name isSendPriceUpdate
. For clarity and consistency, the comment should match the method name.Apply this diff to correct the comment:
-// SendPriceUpdates sends price updates to the subscribers. +// SendPriceUpdate sends a price update to the subscribers.
819-819
: Correct the comment to reflect price updatesThe comment incorrectly mentions 'subaccount updates' instead of 'price updates'. It should accurately describe the function's purpose.
Apply this diff to correct the comment:
- // Group subaccount updates by subaccount id. + // Group price updates by market ID.
1113-1113
: Correct the comment to reflect price updatesThe comment refers to 'finalized subaccount updates' but should refer to 'finalized price updates'.
Apply this diff to correct the comment:
- // Finally, cache updates for finalized subaccount updates + // Finally, cache updates for finalized price updates
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (8)
protocol/app/app.go
(1 hunks)protocol/lib/metrics/metric_keys.go
(1 hunks)protocol/streaming/full_node_streaming_manager.go
(10 hunks)protocol/streaming/noop_streaming_manager.go
(2 hunks)protocol/streaming/types/interface.go
(1 hunks)protocol/testutil/keeper/prices.go
(2 hunks)protocol/x/prices/keeper/keeper.go
(5 hunks)protocol/x/prices/keeper/market_price.go
(1 hunks)
🔇 Additional comments (12)
protocol/streaming/types/interface.go (1)
63-63
: LGTM! Clean addition to tracking capabilities.
The TracksMarketId
method complements the existing tracking functionality and follows a consistent pattern with TracksSubaccountId
.
protocol/x/prices/keeper/keeper.go (2)
14-14
: LGTM!
The streaming types import is correctly added and follows Go conventions.
28-29
: Verify streaming manager usage across the codebase
The streaming manager field is correctly added to the Keeper struct. Let's verify its usage pattern across the codebase to ensure consistent integration.
✅ Verification successful
Based on the search results, I can now provide a final response:
Streaming manager is properly integrated and used across the codebase
The streaming manager field is correctly integrated with consistent usage patterns:
- Properly initialized in NewKeeper constructors across multiple modules (prices, clob, subaccounts)
- Accessed through getter methods
GetFullNodeStreamingManager()
in each keeper - Used appropriately in critical flows:
- CLOB module uses it for streaming orderbook fills and batch updates
- Proper nil/enabled checks before streaming (
if streamingManager := k.GetFullNodeStreamingManager(); streamingManager.Enabled()
) - Used in initialization of new streams with proper context
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check how the streaming manager is used in price-related operations
# Expected: Find calls to streamingManager in price update logic
# Look for price update methods that should use streaming manager
ast-grep --pattern 'func $$$UpdatePrice$$$ {
$$$
streamingManager$$$
$$$
}'
# Check for any potential nil checks on streaming manager
rg 'streamingManager\s*[!=]=\s*nil' -t go
Length of output: 127
Script:
#!/bin/bash
# Let's broaden our search to find all usages of streamingManager
# Find all references to streamingManager
rg "streamingManager" -A 3 -B 3 --type go
# Find any method calls on streamingManager
ast-grep --pattern 'k.streamingManager.$_($$$)'
# Look for initialization of streamingManager in NewKeeper
ast-grep --pattern 'func NewKeeper($$$) *Keeper {
$$$
streamingManager: $_,
$$$
}'
Length of output: 20847
protocol/streaming/noop_streaming_manager.go (2)
97-101
: LGTM! Verify consistency with actual implementation.
The empty implementation follows the no-op pattern correctly.
Let's verify the actual implementation's signature to ensure this no-op version matches:
✅ Verification successful
Signature matches perfectly with the no-op implementation
The verification confirms that the SendPriceUpdate
method signature in the no-op implementation exactly matches the actual implementation in FullNodeStreamingManagerImpl
:
- Both accept identical parameters:
ctx sdk.Context
andpriceUpdate pricestypes.StreamPriceUpdate
- Both have the same return type (void)
The empty implementation in NoopGrpcStreamingManager
correctly follows the interface contract while implementing the no-op pattern.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Compare with the actual implementation's signature
ast-grep --pattern 'func ($_ *FullNodeStreamingManagerImpl) SendPriceUpdate($_, $_) {
$$$
}'
Length of output: 2085
57-59
: LGTM! Verify interface compliance.
The implementation follows the no-op pattern correctly by always returning false
.
Let's verify this method is properly defined in the interface:
✅ Verification successful
Implementation matches interface correctly
The TracksMarketId
method in NoopGrpcStreamingManager
correctly implements the method defined in the FullNodeStreamingManager
interface with the exact same signature TracksMarketId(marketId uint32) bool
. The no-op implementation returning false
is consistent with the pattern used throughout this no-op implementation.
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the method signature in the FullNodeStreamingManager interface
ast-grep --pattern 'interface FullNodeStreamingManager {
$$$
TracksMarketId($_) $_
$$$
}'
Length of output: 97
Script:
#!/bin/bash
# Let's try to find the interface definition using ripgrep
rg "type\s+FullNodeStreamingManager\s+interface" -A 10
Length of output: 699
Script:
#!/bin/bash
# Let's get more context of the interface definition
rg "type\s+FullNodeStreamingManager\s+interface" -A 30
Length of output: 2241
Script:
#!/bin/bash
# Let's get even more context to find TracksMarketId method
rg "type\s+FullNodeStreamingManager\s+interface" -A 50
Length of output: 3477
Script:
#!/bin/bash
# Let's get the complete interface definition to find TracksMarketId
rg "type\s+FullNodeStreamingManager\s+interface" -A 100
Length of output: 4387
protocol/x/prices/keeper/market_price.go (1)
99-112
: Verify timing of price update streaming
The PR objective mentions sending price updates "after block is finalized", but the current implementation sends updates during block execution in UpdateMarketPrices
. Please verify if this meets the requirements or if we need to move this to a post-commit hook.
Let's check how other modules handle post-finalization events:
protocol/testutil/keeper/prices.go (2)
7-7
: LGTM: Import statement correctly added
The new import for the streaming package is properly placed and aligns with the PR's objective of implementing price update streaming functionality.
128-128
: LGTM: Appropriate use of no-op streaming manager in test utilities
The integration of NewNoopGrpcStreamingManager()
is appropriate for test scenarios, as it provides a null implementation that won't interfere with test execution while satisfying the keeper's dependencies.
Let's verify the streaming manager implementation in the actual keeper:
✅ Verification successful
Based on the search results, I can now generate the final response:
LGTM: Appropriate use of no-op streaming manager in test utilities
The use of NewNoopGrpcStreamingManager()
in test utilities is correct. The search results confirm that:
- The actual prices keeper uses a proper streaming manager implementation (
protocol/x/prices/keeper/keeper.go
) - The streaming manager is a crucial component in the production system, used across multiple modules (clob, prices, subaccounts)
- The no-op implementation is specifically designed for test scenarios where streaming functionality is not needed
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the streaming manager implementation in the prices keeper
# Expected: Find the actual keeper implementation showing how streaming manager is used
# Search for the keeper implementation
ast-grep --pattern 'type Keeper struct {
$$$
streamingManager $_
$$$
}'
# Search for streaming manager usage in price updates
rg -A 5 'streamingManager.*UpdatePrice'
Length of output: 121
Script:
#!/bin/bash
# Let's try a broader search to understand the streaming manager usage
# Search for the Keeper struct definition in prices package
rg -l "type Keeper struct" --type go
# Search for any streaming manager related code
rg -A 5 "streamingManager" --type go
# Search for streaming package imports
rg -l "streaming" --type go
Length of output: 14273
protocol/streaming/full_node_streaming_manager.go (3)
575-581
: Method TracksMarketId
implemented correctly
The TracksMarketId
method correctly checks if a market ID is being tracked by the streaming manager. The use of locking ensures thread safety.
811-834
: Function getStreamUpdatesForPriceUpdates
is well-implemented
The function correctly creates stream updates for price updates and collects the associated market IDs.
Line range hint 1069-1120
: Integration of price updates in batch streaming is correct
The addition of handling for finalizedPriceUpdates
in StreamBatchUpdatesAfterFinalizeBlock
properly integrates price updates into the streaming flow after block finalization.
protocol/app/app.go (1)
1013-1013
: Integration of FullNodeStreamingManager
into PricesKeeper
looks good
The addition of app.FullNodeStreamingManager
to PricesKeeper
enables streaming capabilities for price updates. This enhancement appears well-implemented and should facilitate real-time data handling as intended.
// If GRPC streaming is on, emit a price update to stream. | ||
if k.GetFullNodeStreamingManager().Enabled() { | ||
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) { | ||
k.GetFullNodeStreamingManager().SendPriceUpdate( | ||
ctx, | ||
types.StreamPriceUpdate{ | ||
MarketId: marketPrice.Id, | ||
Price: marketPrice, | ||
Snapshot: false, | ||
}, | ||
) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add error handling for SendPriceUpdate
The SendPriceUpdate
call lacks error handling. If the streaming operation fails, we should at least log the error to maintain observability.
Consider applying this change:
- k.GetFullNodeStreamingManager().SendPriceUpdate(
- ctx,
- types.StreamPriceUpdate{
- MarketId: marketPrice.Id,
- Price: marketPrice,
- Snapshot: false,
- },
- )
+ if err := k.GetFullNodeStreamingManager().SendPriceUpdate(
+ ctx,
+ types.StreamPriceUpdate{
+ MarketId: marketPrice.Id,
+ Price: marketPrice,
+ Snapshot: false,
+ },
+ ); err != nil {
+ k.Logger(ctx).Error(
+ "failed to send price update stream",
+ "market_id", marketPrice.Id,
+ "error", err,
+ )
+ }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// If GRPC streaming is on, emit a price update to stream. | |
if k.GetFullNodeStreamingManager().Enabled() { | |
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) { | |
k.GetFullNodeStreamingManager().SendPriceUpdate( | |
ctx, | |
types.StreamPriceUpdate{ | |
MarketId: marketPrice.Id, | |
Price: marketPrice, | |
Snapshot: false, | |
}, | |
) | |
} | |
} | |
// If GRPC streaming is on, emit a price update to stream. | |
if k.GetFullNodeStreamingManager().Enabled() { | |
if k.GetFullNodeStreamingManager().TracksMarketId(marketPrice.Id) { | |
if err := k.GetFullNodeStreamingManager().SendPriceUpdate( | |
ctx, | |
types.StreamPriceUpdate{ | |
MarketId: marketPrice.Id, | |
Price: marketPrice, | |
Snapshot: false, | |
}, | |
); err != nil { | |
k.Logger(ctx).Error( | |
"failed to send price update stream", | |
"market_id", marketPrice.Id, | |
"error", err, | |
) | |
} | |
} | |
} |
@@ -74,6 +74,7 @@ const ( | |||
GrpcSendOrderbookUpdatesLatency = "grpc_send_orderbook_updates_latency" | |||
GrpcSendOrderbookSnapshotLatency = "grpc_send_orderbook_snapshot_latency" | |||
GrpcSendSubaccountUpdateCount = "grpc_send_subaccount_update_count" | |||
GrpcSendPriceUpdateCount = "grpc_send_price_update_count" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
💡 Codebase verification
The metric is not being incremented when sending price updates
The GrpcSendPriceUpdateCount
metric is defined but not being used in the codebase. The SendPriceUpdate
method in FullNodeStreamingManagerImpl
should increment this metric similar to how GrpcSendSubaccountUpdateCount
is incremented in other streaming operations.
protocol/streaming/full_node_streaming_manager.go
: Add metric increment inSendPriceUpdate
method
🔗 Analysis chain
LGTM! The metric key follows established guidelines.
The new metric constant GrpcSendPriceUpdateCount
is well-placed in the gRPC metrics section and follows the naming conventions. Its purpose aligns with the PR's objective of sending price updates after block finalization.
Let's verify the metric's usage in the codebase:
🏁 Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Check if the new metric is properly utilized in the price update functionality
# Expected: The metric should be used in the streaming manager's price update logic
# Search for metric usage
rg "GrpcSendPriceUpdateCount" --type go
# Search for related price update functionality
rg "SendPriceUpdate.*metrics\." --type go
Length of output: 199
Script:
#!/bin/bash
# Let's broaden our search to find price update-related functionality and metrics
# Search for price update-related code
rg "PriceUpdate" --type go -B 2 -A 2
# Search for any metrics increment in gRPC-related files
fd "grpc" --type f --extension go --exec rg -l "metrics\.(Inc|Add)"
Length of output: 116379
Script:
#!/bin/bash
# Let's search for the usage of the new metric in gRPC-related code
rg "GrpcSendPriceUpdateCount" --type go -B 2 -A 2
# Search for price update streaming functionality in gRPC server code
fd "grpc" --type f --extension go --exec rg -l "SendPriceUpdate"
# Search for metrics usage in streaming manager
rg "metrics\." --type go -B 2 -A 2 ./protocol/streaming/
Length of output: 9331
Script:
#!/bin/bash
# Let's search for the price update streaming implementation in the streaming manager
rg "SendPriceUpdate" --type go -B 3 -A 3
# Search for price update related code in the streaming manager
rg "PriceUpdate" --type go -B 3 -A 3 ./protocol/streaming/
Length of output: 17108
if !lib.IsDeliverTxMode(ctx) { | ||
// If not `DeliverTx`, return since there is no optimistic price updates. | ||
return | ||
} | ||
|
||
metrics.IncrCounter( | ||
metrics.GrpcSendSubaccountUpdateCount, | ||
1, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Incorrect metrics counter used in SendPriceUpdate
In the SendPriceUpdate
method, the metrics counter GrpcSendSubaccountUpdateCount
is incremented, which is intended for subaccount updates. A dedicated metrics counter for price updates should be used instead.
Apply this diff to fix the metrics counter:
metrics.IncrCounter(
- metrics.GrpcSendSubaccountUpdateCount,
+ metrics.GrpcSendPriceUpdateCount,
1,
)
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
if !lib.IsDeliverTxMode(ctx) { | |
// If not `DeliverTx`, return since there is no optimistic price updates. | |
return | |
} | |
metrics.IncrCounter( | |
metrics.GrpcSendSubaccountUpdateCount, | |
1, | |
) | |
if !lib.IsDeliverTxMode(ctx) { | |
// If not `DeliverTx`, return since there is no optimistic price updates. | |
return | |
} | |
metrics.IncrCounter( | |
metrics.GrpcSendPriceUpdateCount, | |
1, | |
) |
// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache, | ||
// and store corresponding market ids. | ||
// This method requires the lock and assumes that the lock has already been | ||
// acquired by the caller. | ||
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock( | ||
streamUpdates []clobtypes.StreamUpdate, | ||
marketIds []uint32, | ||
) { | ||
sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...) | ||
for _, marketId := range marketIds { | ||
sm.streamUpdateSubscriptionCache = append( | ||
sm.streamUpdateSubscriptionCache, | ||
sm.marketIdToSubscriptionIdMapping[marketId], | ||
) | ||
} | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Potential misalignment between stream updates and subscription IDs
In cacheStreamUpdatesByMarketIdWithLock
, there may be a misalignment between streamUpdates
and marketIds
. This can lead to incorrect mapping of updates to subscription IDs if the lengths of streamUpdates
and marketIds
differ or if they are not aligned.
To ensure correct mapping, each update should correspond to its related market ID:
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
+ if len(streamUpdates) != len(marketIds) {
+ sm.logger.Error("Mismatch between stream updates and market IDs lengths")
+ return
+ }
for i, update := range streamUpdates {
sm.streamUpdateCache = append(sm.streamUpdateCache, update)
marketId := marketIds[i]
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}
This ensures that each update is matched with the correct market ID and its associated subscriptions.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache, | |
// and store corresponding market ids. | |
// This method requires the lock and assumes that the lock has already been | |
// acquired by the caller. | |
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock( | |
streamUpdates []clobtypes.StreamUpdate, | |
marketIds []uint32, | |
) { | |
sm.streamUpdateCache = append(sm.streamUpdateCache, streamUpdates...) | |
for _, marketId := range marketIds { | |
sm.streamUpdateSubscriptionCache = append( | |
sm.streamUpdateSubscriptionCache, | |
sm.marketIdToSubscriptionIdMapping[marketId], | |
) | |
} | |
} | |
// cacheStreamUpdatesByMarketIdWithLock adds stream updates to cache, | |
// and store corresponding market ids. | |
// This method requires the lock and assumes that the lock has already been | |
// acquired by the caller. | |
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock( | |
streamUpdates []clobtypes.StreamUpdate, | |
marketIds []uint32, | |
) { | |
if len(streamUpdates) != len(marketIds) { | |
sm.logger.Error("Mismatch between stream updates and market IDs lengths") | |
return | |
} | |
for i, update := range streamUpdates { | |
sm.streamUpdateCache = append(sm.streamUpdateCache, update) | |
marketId := marketIds[i] | |
sm.streamUpdateSubscriptionCache = append( | |
sm.streamUpdateSubscriptionCache, | |
sm.marketIdToSubscriptionIdMapping[marketId], | |
) | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems helpful as a sanity check?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@teddyding, yes, adding this check is helpful as a sanity check to ensure that the lengths of streamUpdates
and marketIds
are aligned. This can prevent incorrect mapping of updates to subscription IDs if they differ or are not aligned. Here's the updated function with the check:
func (sm *FullNodeStreamingManagerImpl) cacheStreamUpdatesByMarketIdWithLock(
streamUpdates []clobtypes.StreamUpdate,
marketIds []uint32,
) {
if len(streamUpdates) != len(marketIds) {
sm.logger.Error("Mismatch between stream updates and market IDs lengths")
return
}
for i, update := range streamUpdates {
sm.streamUpdateCache = append(sm.streamUpdateCache, update)
marketId := marketIds[i]
sm.streamUpdateSubscriptionCache = append(
sm.streamUpdateSubscriptionCache,
sm.marketIdToSubscriptionIdMapping[marketId],
)
}
}
This ensures that each update is correctly associated with its corresponding market ID and subscriptions.
} | ||
|
||
metrics.IncrCounter( | ||
metrics.GrpcSendSubaccountUpdateCount, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Update metric name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
protocol/streaming/full_node_streaming_manager.go (2)
819-820
: Fix incorrect comment.The comment refers to subaccount updates but this is for price updates.
- // Group subaccount updates by subaccount id. + // Group price updates by market id.
820-822
: Consider pre-allocating slices for better performance.Pre-allocate slices with the known size to avoid reallocations.
- streamUpdates = make([]clobtypes.StreamUpdate, 0) - marketIds = make([]uint32, 0) + streamUpdates = make([]clobtypes.StreamUpdate, 0, len(priceUpdates)) + marketIds = make([]uint32, 0, len(priceUpdates))
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
protocol/streaming/full_node_streaming_manager.go
(10 hunks)
🔇 Additional comments (4)
protocol/streaming/full_node_streaming_manager.go (4)
496-521
: LGTM: Price update streaming implementation is correct.
The implementation follows the established pattern for streaming updates and correctly:
- Checks for DeliverTx mode
- Uses the appropriate metrics counter
- Stages events for finalization
575-581
: LGTM: Market ID tracking implementation is consistent.
The implementation follows the same pattern as TracksSubaccountId
with proper lock handling.
1039-1058
: LGTM: Market ID caching implementation is robust.
The implementation includes proper validation and follows the established caching pattern.
1073-1074
: LGTM: Price updates correctly integrated into batch processing.
The changes properly integrate price updates into the finalization and batch processing flow, maintaining consistency with the existing patterns for other update types.
Also applies to: 1117-1124, 1138-1138, 1156-1157, 1172-1172
Changelist
[Describe or list the changes made in this PR]
Test Plan
[Describe how this PR was tested (if applicable)]
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit
Release Notes
New Features
Bug Fixes
These changes significantly improve the application's functionality and user experience by enabling better data streaming and management.