From 0ef04b24997b1f9ec845cf8d3404bc40b9c1344d Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Mon, 9 Dec 2024 15:58:57 -0500 Subject: [PATCH 1/3] Increase FNS default numbers and close connections properly with reasons --- protocol/app/flags/flags.go | 4 +-- protocol/streaming/ws/websocket_server.go | 35 +++++++++++++---------- 2 files changed, 22 insertions(+), 17 deletions(-) diff --git a/protocol/app/flags/flags.go b/protocol/app/flags/flags.go index 1a0ad084e15..cd1cfa80f79 100644 --- a/protocol/app/flags/flags.go +++ b/protocol/app/flags/flags.go @@ -72,8 +72,8 @@ const ( DefaultGrpcStreamingEnabled = false DefaultGrpcStreamingFlushIntervalMs = 50 - DefaultGrpcStreamingMaxBatchSize = 10000 - DefaultGrpcStreamingMaxChannelBufferSize = 10000 + DefaultGrpcStreamingMaxBatchSize = 100_000 + DefaultGrpcStreamingMaxChannelBufferSize = 100_000 DefaultWebsocketStreamingEnabled = false DefaultWebsocketStreamingPort = 9092 DefaultFullNodeStreamingSnapshotInterval = 0 diff --git a/protocol/streaming/ws/websocket_server.go b/protocol/streaming/ws/websocket_server.go index 3cbb9219a44..8e6535f42d7 100644 --- a/protocol/streaming/ws/websocket_server.go +++ b/protocol/streaming/ws/websocket_server.go @@ -19,6 +19,8 @@ import ( const ( CLOB_PAIR_IDS_QUERY_PARAM = "clobPairIds" MARKET_IDS_QUERY_PARAM = "marketIds" + + CLOSE_DEADLINE = 5 * time.Second ) var upgrader = websocket.Upgrader{ @@ -68,33 +70,24 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { // Parse clobPairIds from query parameters clobPairIds, err := parseUint32(r, CLOB_PAIR_IDS_QUERY_PARAM) if err != nil { - ws.logger.Error( - "Error parsing clobPairIds", - "err", err, - ) - http.Error(w, err.Error(), http.StatusBadRequest) + ws.logger.Error("Error parsing clobPairIds", "err", err) + sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()) return } // Parse marketIds from query parameters marketIds, err := parseUint32(r, MARKET_IDS_QUERY_PARAM) if err != nil { - ws.logger.Error( - "Error parsing marketIds", - "err", err, - ) - http.Error(w, err.Error(), http.StatusBadRequest) + ws.logger.Error("Error parsing marketIds", "err", err) + sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()) return } // Parse subaccountIds from query parameters subaccountIds, err := parseSubaccountIds(r) if err != nil { - ws.logger.Error( - "Error parsing subaccountIds", - "err", err, - ) - http.Error(w, err.Error(), http.StatusBadRequest) + ws.logger.Error("Error parsing subaccountIds", "err", err) + sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()) return } @@ -118,10 +111,22 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { "Ending handler for websocket connection", "err", err, ) + sendCloseWithReason(conn, websocket.CloseInternalServerErr, err.Error()) return } } +func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error { + closeMessage := websocket.FormatCloseMessage(closeCode, reason) + // Set a write deadline to avoid blocking indefinitely + conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)) + return conn.WriteControl( + websocket.CloseMessage, + closeMessage, + time.Now().Add(CLOSE_DEADLINE), + ) +} + // parseSubaccountIds is a helper function to parse the subaccountIds from the query parameters. func parseSubaccountIds(r *http.Request) ([]*satypes.SubaccountId, error) { subaccountIdsParam := r.URL.Query().Get("subaccountIds") From b62cca1d9aaaeeefb585fc48dad90d3d6ca0721f Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Mon, 9 Dec 2024 16:10:05 -0500 Subject: [PATCH 2/3] fix lint --- protocol/streaming/ws/websocket_server.go | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/protocol/streaming/ws/websocket_server.go b/protocol/streaming/ws/websocket_server.go index 8e6535f42d7..33a7434e427 100644 --- a/protocol/streaming/ws/websocket_server.go +++ b/protocol/streaming/ws/websocket_server.go @@ -71,7 +71,9 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { clobPairIds, err := parseUint32(r, CLOB_PAIR_IDS_QUERY_PARAM) if err != nil { ws.logger.Error("Error parsing clobPairIds", "err", err) - sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()) + if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil { + ws.logger.Error("Error sending close message", "err", err) + } return } @@ -79,7 +81,9 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { marketIds, err := parseUint32(r, MARKET_IDS_QUERY_PARAM) if err != nil { ws.logger.Error("Error parsing marketIds", "err", err) - sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()) + if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil { + ws.logger.Error("Error sending close message", "err", err) + } return } @@ -87,7 +91,9 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { subaccountIds, err := parseSubaccountIds(r) if err != nil { ws.logger.Error("Error parsing subaccountIds", "err", err) - sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()) + if err := sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error()); err != nil { + ws.logger.Error("Error sending close message", "err", err) + } return } @@ -111,7 +117,9 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { "Ending handler for websocket connection", "err", err, ) - sendCloseWithReason(conn, websocket.CloseInternalServerErr, err.Error()) + if err := sendCloseWithReason(conn, websocket.CloseInternalServerErr, err.Error()); err != nil { + ws.logger.Error("Error sending close message", "err", err) + } return } } @@ -119,7 +127,9 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) { func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error { closeMessage := websocket.FormatCloseMessage(closeCode, reason) // Set a write deadline to avoid blocking indefinitely - conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)) + if err := conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE)); err != nil { + return err + } return conn.WriteControl( websocket.CloseMessage, closeMessage, From 71e72c6aa1021c4ec40802fb0e53cf3b1103a7c8 Mon Sep 17 00:00:00 2001 From: Jay Yu <103467857+jayy04@users.noreply.github.com> Date: Mon, 9 Dec 2024 18:05:46 -0500 Subject: [PATCH 3/3] fix test --- protocol/app/flags/flags_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/protocol/app/flags/flags_test.go b/protocol/app/flags/flags_test.go index 8b3ed1a8f48..b4107335b10 100644 --- a/protocol/app/flags/flags_test.go +++ b/protocol/app/flags/flags_test.go @@ -257,8 +257,8 @@ func TestGetFlagValuesFromOptions(t *testing.T) { expectedGrpcEnable: true, expectedGrpcStreamingEnable: false, expectedGrpcStreamingFlushMs: 50, - expectedGrpcStreamingBatchSize: 10000, - expectedGrpcStreamingMaxChannelBufferSize: 10000, + expectedGrpcStreamingBatchSize: 100_000, + expectedGrpcStreamingMaxChannelBufferSize: 100_000, expectedWebsocketEnabled: false, expectedWebsocketPort: 9092, expectedFullNodeStreamingSnapshotInterval: 0,