Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Increase FNS default numbers and close connections properly with reasons #2634

Merged
merged 3 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions protocol/app/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@ const (

DefaultGrpcStreamingEnabled = false
DefaultGrpcStreamingFlushIntervalMs = 50
DefaultGrpcStreamingMaxBatchSize = 10000
DefaultGrpcStreamingMaxChannelBufferSize = 10000
DefaultGrpcStreamingMaxBatchSize = 100_000
DefaultGrpcStreamingMaxChannelBufferSize = 100_000
DefaultWebsocketStreamingEnabled = false
DefaultWebsocketStreamingPort = 9092
DefaultFullNodeStreamingSnapshotInterval = 0
Expand Down
35 changes: 20 additions & 15 deletions protocol/streaming/ws/websocket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
const (
CLOB_PAIR_IDS_QUERY_PARAM = "clobPairIds"
MARKET_IDS_QUERY_PARAM = "marketIds"

CLOSE_DEADLINE = 5 * time.Second
)

var upgrader = websocket.Upgrader{
Expand Down Expand Up @@ -68,33 +70,24 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
// Parse clobPairIds from query parameters
clobPairIds, err := parseUint32(r, CLOB_PAIR_IDS_QUERY_PARAM)
if err != nil {
ws.logger.Error(
"Error parsing clobPairIds",
"err", err,
)
http.Error(w, err.Error(), http.StatusBadRequest)
ws.logger.Error("Error parsing clobPairIds", "err", err)
sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error())
return
}

// Parse marketIds from query parameters
marketIds, err := parseUint32(r, MARKET_IDS_QUERY_PARAM)
if err != nil {
ws.logger.Error(
"Error parsing marketIds",
"err", err,
)
http.Error(w, err.Error(), http.StatusBadRequest)
ws.logger.Error("Error parsing marketIds", "err", err)
sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error())
return
}

// Parse subaccountIds from query parameters
subaccountIds, err := parseSubaccountIds(r)
if err != nil {
ws.logger.Error(
"Error parsing subaccountIds",
"err", err,
)
http.Error(w, err.Error(), http.StatusBadRequest)
ws.logger.Error("Error parsing subaccountIds", "err", err)
sendCloseWithReason(conn, websocket.CloseUnsupportedData, err.Error())
return
}

Expand All @@ -118,10 +111,22 @@ func (ws *WebsocketServer) Handler(w http.ResponseWriter, r *http.Request) {
"Ending handler for websocket connection",
"err", err,
)
sendCloseWithReason(conn, websocket.CloseInternalServerErr, err.Error())
return
}
}

func sendCloseWithReason(conn *websocket.Conn, closeCode int, reason string) error {
closeMessage := websocket.FormatCloseMessage(closeCode, reason)
// Set a write deadline to avoid blocking indefinitely
conn.SetWriteDeadline(time.Now().Add(CLOSE_DEADLINE))
return conn.WriteControl(
websocket.CloseMessage,
closeMessage,
time.Now().Add(CLOSE_DEADLINE),
)
}

// parseSubaccountIds is a helper function to parse the subaccountIds from the query parameters.
func parseSubaccountIds(r *http.Request) ([]*satypes.SubaccountId, error) {
subaccountIdsParam := r.URL.Query().Get("subaccountIds")
Expand Down
Loading