-
Notifications
You must be signed in to change notification settings - Fork 116
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Increase FNS default numbers and close connections properly with reasons #2634
Conversation
WalkthroughThe changes in this pull request involve modifications to two files: Changes
Possibly related PRs
Suggested labels
Suggested reviewers
Thank you for using CodeRabbit. We offer it for free to the OSS community and would appreciate your support in helping us grow. If you find it useful, would you consider giving us a shout-out on your favorite social media? 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
@Mergifyio backport release/protocol/v8.x |
✅ Backports have been created
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (2)
protocol/streaming/ws/websocket_server.go (1)
119-128
: Consider adding error logging in sendCloseWithReasonWhile the implementation is correct, it would be beneficial to log any errors that occur during the close operation, as they could indicate networking issues or help with debugging.
func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error { closeMessage := websocket.FormatCloseMessage(closeCode, reason) // Set a write deadline to avoid blocking indefinitely conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)) - return conn.WriteControl( + err := conn.WriteControl( websocket.CloseMessage, closeMessage, time.Now().Add(CLOSE_DEADLINE), ) + if err != nil { + // Use the logger from the WebsocketServer struct + // This would require modifying the function signature to accept the logger + logger.Error("Failed to send close message", "error", err) + } + return err }protocol/app/flags/flags.go (1)
75-76
: Consider documenting the rationale for buffer size increaseWhile the increased buffer sizes align well with the improved WebSocket connection handling, it would be helpful to document:
- The reasoning behind choosing 100,000 as the new default
- Any performance benchmarks that led to this decision
- Guidelines for operators who might need to tune these values
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (2)
protocol/app/flags/flags.go
(1 hunks)protocol/streaming/ws/websocket_server.go
(3 hunks)
🔇 Additional comments (3)
protocol/streaming/ws/websocket_server.go (2)
22-23
: LGTM: Reasonable timeout for connection closure
The 5-second deadline for closing WebSocket connections is a reasonable default that balances between giving enough time for the close message to be sent while preventing indefinite blocking.
73-74
: Improved error handling with proper WebSocket closure
The error handling now properly closes WebSocket connections with specific error reasons instead of just logging errors. This is a significant improvement as it:
- Provides better feedback to clients about why their connection was terminated
- Ensures proper cleanup of WebSocket resources
- Follows WebSocket protocol best practices
Also applies to: 81-82, 89-90, 114-114
protocol/app/flags/flags.go (1)
75-76
: Verify memory impact of increased buffer sizes
The 10x increase in both DefaultGrpcStreamingMaxBatchSize
and DefaultGrpcStreamingMaxChannelBufferSize
could significantly impact memory usage, especially under high load.
Please ensure that:
- The system has sufficient memory to handle these larger buffers
- The increased values align with expected message volumes
- There's proper monitoring in place for memory usage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 1
🧹 Outside diff range and nitpick comments (2)
protocol/streaming/ws/websocket_server.go (2)
73-76
: Enhance error logging and error code specificityWhile the error handling is improved with proper connection closure, consider these enhancements:
- Add more context to error logs:
- Use specific close codes for different error types
-ws.logger.Error("Error parsing clobPairIds", "err", err) +ws.logger.Error("Failed to parse clobPairIds query parameter", + "err", err, + "raw_value", r.URL.Query().Get(CLOB_PAIR_IDS_QUERY_PARAM)) -if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil { +// Use specific close codes for different error types +closeCode := websocket.CloseInvalidFramePayloadData +if strings.Contains(err.Error(), "invalid format") { + closeCode = websocket.CloseUnsupportedData +} +if err := sendCloseWithReason(conn, closeCode, err.Error()); err != nil {Also applies to: 83-86, 93-96, 120-122
Line range hint
1-139
: Consider adding WebSocket server robustness featuresThe current implementation could benefit from additional features for production readiness:
- Rate limiting and connection limits to prevent DoS
- Ping/pong mechanism for connection health monitoring
- Graceful shutdown handling in the Handler
Consider implementing:
- A connection limit using a semaphore
- Regular ping/pong checks with timeout
- Context-based cancellation for graceful shutdown
Example ping/pong implementation:
const ( // Add to existing constants pingPeriod = 30 * time.Second pongWait = 60 * time.Second ) func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { // After upgrading connection conn.SetReadDeadline(time.Now().Add(pongWait)) conn.SetPongHandler(func(string) error { conn.SetReadDeadline(time.Now().Add(pongWait)) return nil }) // Start ping ticker in a separate goroutine ticker := time.NewTicker(pingPeriod) defer ticker.Stop() go func() { for { select { case <-ticker.C: if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(writeWait)); err != nil { return } } } }() }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
protocol/streaming/ws/websocket_server.go
(3 hunks)
🔇 Additional comments (1)
protocol/streaming/ws/websocket_server.go (1)
22-23
: LGTM: Reasonable timeout value for connection closure
The 5-second deadline is appropriate for network operations and follows Go naming conventions.
func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error { | ||
closeMessage := websocket.FormatCloseMessage(closeCode, reason) | ||
// Set a write deadline to avoid blocking indefinitely | ||
if err := conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)); err != nil { | ||
return err | ||
} | ||
return conn.WriteControl( | ||
websocket.CloseMessage, | ||
closeMessage, | ||
time.Now().Add(CLOSE_DEADLINE), | ||
) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🛠️ Refactor suggestion
Improve robustness of connection closure handling
The function handles basic connection closure, but could be enhanced for better reliability:
- The same deadline is used twice unnecessarily
- Write deadline should be cleaned up after operation
- Consider adding context for cancellation support
- Error handling could be more comprehensive
-func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error {
+func sendCloseWithReason(ctx context.Context, conn *websocket.Conn, closeCode int, reason string) error {
closeMessage := websocket.FormatCloseMessage(closeCode, reason)
+ deadline := time.Now().Add(CLOSE_DEADLINE)
- // Set a write deadline to avoid blocking indefinitely
- if err := conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)); err != nil {
+ // Set write deadline and ensure it's cleared
+ if err := conn.SetWriteDeadline(deadline); err != nil {
return fmt.Errorf("failed to set write deadline: %w", err)
}
+ defer conn.SetWriteDeadline(time.Time{})
- return conn.WriteControl(
+ // Use context for cancellation support
+ done := make(chan error, 1)
+ go func() {
+ done <- conn.WriteControl(
websocket.CloseMessage,
closeMessage,
- time.Now().Add(CLOSE_DEADLINE),
- )
+ deadline,
+ )
+ }()
+
+ select {
+ case err := <-done:
+ return err
+ case <-ctx.Done():
+ return ctx.Err()
+ }
}
Update the callers to pass context:
-if err := sendCloseWithReason(conn, closeCode, err.Error()); err != nil {
+if err := sendCloseWithReason(r.Context(), conn, closeCode, err.Error()); err != nil {
Committable suggestion skipped: line range outside the PR's diff.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 0
🧹 Outside diff range and nitpick comments (1)
protocol/app/flags/flags_test.go (1)
Line range hint
82-89
: Update gRPC streaming values in TestValidate test casesThe test cases in
TestValidate
still use the old value of 10000 forGrpcStreamingMaxBatchSize
andGrpcStreamingMaxChannelBufferSize
. Consider updating these to match the new default value of 100000 for consistency.Here's an example of the affected test cases that need updating:
- "success - gRPC streaming enabled for validating nodes"
- "success - both grpc and websocket streaming enabled for validating nodes"
Apply this diff to update the values:
"success - gRPC streaming enabled for validating nodes": { flags: flags.Flags{ NonValidatingFullNode: false, GrpcEnable: true, GrpcStreamingEnabled: true, GrpcStreamingFlushIntervalMs: 100, - GrpcStreamingMaxBatchSize: 10000, - GrpcStreamingMaxChannelBufferSize: 10000, + GrpcStreamingMaxBatchSize: 100000, + GrpcStreamingMaxChannelBufferSize: 100000, WebsocketStreamingEnabled: false, }, },Also applies to: 108-116
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
📒 Files selected for processing (1)
protocol/app/flags/flags_test.go
(1 hunks)
🔇 Additional comments (1)
protocol/app/flags/flags_test.go (1)
260-261
: LGTM! Default values correctly updated.
The test case correctly reflects the new default values for gRPC streaming configuration:
expectedGrpcStreamingBatchSize
: 100_000expectedGrpcStreamingMaxChannelBufferSize
: 100_000
Changelist
Test Plan
[Describe how this PR was tested (if applicable)]
Author/Reviewer Checklist
state-breaking
label.indexer-postgres-breaking
label.PrepareProposal
orProcessProposal
, manually add the labelproposal-breaking
.feature:[feature-name]
.backport/[branch-name]
.refactor
,chore
,bug
.Summary by CodeRabbit
New Features
Improvements
Bug Fixes