From af0ba94bf732fe61f6d7de645a022c7299c30d03 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 17:15:28 -0800 Subject: [PATCH 1/8] feat: add promise for connection open --- lazer/sdk/js/examples/index.ts | 131 ++++++++++-------- lazer/sdk/js/src/client.ts | 9 +- .../sdk/js/src/socket/resilient-web-socket.ts | 67 ++++++--- lazer/sdk/js/src/socket/web-socket-pool.ts | 86 +++++------- 4 files changed, 160 insertions(+), 133 deletions(-) diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 3f32cf2d4..963e9b206 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -6,65 +6,82 @@ import { PythLazerClient } from "../src/index.js"; // Ignore debug messages console.debug = () => {}; -const client = new PythLazerClient( - ["wss://pyth-lazer.dourolabs.app/v1/stream"], - "access_token", - 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. - console // Optionally log socket operations (to the console in this case.) -); +async function main() { + try { + const client = await PythLazerClient.create( + ["wss://pyth-lazer.dourolabs.app/v1/stream"], + "access_token", + 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. + console // Optionally log socket operations (to the console in this case.) + ); -client.addMessageListener((message) => { - console.info("got message:", message); - switch (message.type) { - case "json": { - if (message.value.type == "streamUpdated") { - console.info( - "stream updated for subscription", - message.value.subscriptionId, - ":", - message.value.parsed?.priceFeeds - ); + client.addMessageListener((message) => { + console.info("got message:", message); + switch (message.type) { + case "json": { + if (message.value.type == "streamUpdated") { + console.info( + "stream updated for subscription", + message.value.subscriptionId, + ":", + message.value.parsed?.priceFeeds + ); + } + break; + } + case "binary": { + if ("solana" in message.value) { + console.info( + "solana message:", + message.value.solana?.toString("hex") + ); + } + if ("evm" in message.value) { + console.info("evm message:", message.value.evm?.toString("hex")); + } + break; + } } - break; - } - case "binary": { - if ("solana" in message.value) { - console.info("solana message:", message.value.solana?.toString("hex")); - } - if ("evm" in message.value) { - console.info("evm message:", message.value.evm?.toString("hex")); - } - break; - } - } -}); + }); -// Create and remove one or more subscriptions on the fly -await client.subscribe({ - type: "subscribe", - subscriptionId: 1, - priceFeedIds: [1, 2], - properties: ["price"], - chains: ["solana"], - deliveryFormat: "binary", - channel: "fixed_rate@200ms", - parsed: false, - jsonBinaryEncoding: "base64", -}); -await client.subscribe({ - type: "subscribe", - subscriptionId: 2, - priceFeedIds: [1, 2, 3, 4, 5], - properties: ["price"], - chains: ["evm"], - deliveryFormat: "json", - channel: "fixed_rate@200ms", - parsed: true, - jsonBinaryEncoding: "hex", -}); + // Create and remove one or more subscriptions on the fly + await client.subscribe({ + type: "subscribe", + subscriptionId: 1, + priceFeedIds: [1, 2], + properties: ["price"], + chains: ["solana"], + deliveryFormat: "binary", + channel: "fixed_rate@200ms", + parsed: false, + jsonBinaryEncoding: "base64", + }); + await client.subscribe({ + type: "subscribe", + subscriptionId: 2, + priceFeedIds: [1, 2, 3, 4, 5], + properties: ["price"], + chains: ["evm"], + deliveryFormat: "json", + channel: "fixed_rate@200ms", + parsed: true, + jsonBinaryEncoding: "hex", + }); + + await new Promise((resolve) => setTimeout(resolve, 10_000)); -await new Promise((resolve) => setTimeout(resolve, 10_000)); + await client.unsubscribe(1); + await client.unsubscribe(2); -await client.unsubscribe(1); -await client.unsubscribe(2); -client.shutdown(); + await new Promise((resolve) => setTimeout(resolve, 10_000)); + client.shutdown(); + } catch (error) { + console.error("Error initializing client:", error); + process.exit(1); + } +} + +main().catch((error) => { + console.error("Unhandled error:", error); + process.exit(1); +}); diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index fc732e5ac..ca7d28517 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -30,7 +30,7 @@ const UINT32_NUM_BYTES = 4; const UINT64_NUM_BYTES = 8; export class PythLazerClient { - wsp: WebSocketPool; + private constructor(private readonly wsp: WebSocketPool) {} /** * Creates a new PythLazerClient instance. @@ -39,13 +39,14 @@ export class PythLazerClient { * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. */ - constructor( + static async create( urls: string[], token: string, numConnections = 3, logger: Logger = dummyLogger - ) { - this.wsp = new WebSocketPool(urls, token, numConnections, logger); + ): Promise { + const wsp = await WebSocketPool.create(urls, token, numConnections, logger); + return new PythLazerClient(wsp); } addMessageListener(handler: (event: JsonOrBinaryResponse) => void) { diff --git a/lazer/sdk/js/src/socket/resilient-web-socket.ts b/lazer/sdk/js/src/socket/resilient-web-socket.ts index 7b221f324..f6c48e1e8 100644 --- a/lazer/sdk/js/src/socket/resilient-web-socket.ts +++ b/lazer/sdk/js/src/socket/resilient-web-socket.ts @@ -1,21 +1,10 @@ import type { ClientRequestArgs } from "node:http"; - import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws"; import type { Logger } from "ts-log"; -// Reconnect with expo backoff if we don't get a message or ping for 10 seconds const HEARTBEAT_TIMEOUT_DURATION = 10_000; +const CONNECTION_TIMEOUT = 5000; -/** - * This class wraps websocket to provide a resilient web socket client. - * - * It will reconnect if connection fails with exponential backoff. Also, it will reconnect - * if it receives no ping request or regular message from server within a while as indication - * of timeout (assuming the server sends either regularly). - * - * This class also logs events if logger is given and by replacing onError method you can handle - * connection errors yourself (e.g: do not retry and close the connection). - */ export class ResilientWebSocket { endpoint: string; wsClient: undefined | WebSocket; @@ -24,10 +13,14 @@ export class ResilientWebSocket { private wsFailedAttempts: number; private heartbeatTimeout: undefined | NodeJS.Timeout; private logger: undefined | Logger; + private connectionPromise: Promise | undefined; + private resolveConnection: (() => void) | undefined; + private rejectConnection: ((error: Error) => void) | undefined; onError: (error: ErrorEvent) => void; onMessage: (data: WebSocket.Data) => void; onReconnect: () => void; + constructor( endpoint: string, wsOptions?: ClientOptions | ClientRequestArgs, @@ -64,23 +57,47 @@ export class ResilientWebSocket { } } - startWebSocket(): void { + async startWebSocket(): Promise { if (this.wsClient !== undefined) { - return; + // If there's an existing connection attempt, wait for it + if (this.connectionPromise) { + return this.connectionPromise; + } + return Promise.resolve(); } this.logger?.info(`Creating Web Socket client`); + // Create a new promise for this connection attempt + this.connectionPromise = new Promise((resolve, reject) => { + this.resolveConnection = resolve; + this.rejectConnection = reject; + }); + + // Set a connection timeout + const timeoutId = setTimeout(() => { + if (this.rejectConnection) { + this.rejectConnection( + new Error(`Connection timeout after ${CONNECTION_TIMEOUT}ms`) + ); + } + }, CONNECTION_TIMEOUT); + this.wsClient = new WebSocket(this.endpoint, this.wsOptions); this.wsUserClosed = false; this.wsClient.addEventListener("open", () => { this.wsFailedAttempts = 0; this.resetHeartbeat(); + clearTimeout(timeoutId); + this.resolveConnection?.(); }); this.wsClient.addEventListener("error", (event) => { this.onError(event); + if (this.rejectConnection) { + this.rejectConnection(new Error("WebSocket connection failed")); + } }); this.wsClient.addEventListener("message", (event) => { @@ -89,24 +106,23 @@ export class ResilientWebSocket { }); this.wsClient.addEventListener("close", () => { + clearTimeout(timeoutId); + if (this.rejectConnection) { + this.rejectConnection(new Error("WebSocket closed before connecting")); + } void this.handleClose(); }); - // Handle ping events if supported (Node.js only) if ("on" in this.wsClient) { - // Ping handler is undefined in browser side this.wsClient.on("ping", () => { this.logger?.info("Ping received"); this.resetHeartbeat(); }); } + + return this.connectionPromise; } - /** - * Reset the heartbeat timeout. This is called when we receive any message (ping or regular) - * from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION, - * we assume the connection is dead and reconnect. - */ private resetHeartbeat(): void { if (this.heartbeatTimeout !== undefined) { clearTimeout(this.heartbeatTimeout); @@ -145,6 +161,10 @@ export class ResilientWebSocket { } else { this.wsFailedAttempts += 1; this.wsClient = undefined; + this.connectionPromise = undefined; + this.resolveConnection = undefined; + this.rejectConnection = undefined; + const waitTime = expoBackoff(this.wsFailedAttempts); this.logger?.error( @@ -163,7 +183,7 @@ export class ResilientWebSocket { return; } - this.startWebSocket(); + await this.startWebSocket(); await this.waitForMaybeReadyWebSocket(); if (this.wsClient === undefined) { @@ -180,6 +200,9 @@ export class ResilientWebSocket { if (this.wsClient !== undefined) { const client = this.wsClient; this.wsClient = undefined; + this.connectionPromise = undefined; + this.resolveConnection = undefined; + this.rejectConnection = undefined; client.close(); } this.wsUserClosed = true; diff --git a/lazer/sdk/js/src/socket/web-socket-pool.ts b/lazer/sdk/js/src/socket/web-socket-pool.ts index c35d9c6f3..8743c0de2 100644 --- a/lazer/sdk/js/src/socket/web-socket-pool.ts +++ b/lazer/sdk/js/src/socket/web-socket-pool.ts @@ -5,7 +5,6 @@ import { dummyLogger, type Logger } from "ts-log"; import { ResilientWebSocket } from "./resilient-web-socket.js"; import type { Request, Response } from "../protocol.js"; -// Number of redundant parallel WebSocket connections const DEFAULT_NUM_CONNECTIONS = 3; export class WebSocketPool { @@ -14,6 +13,13 @@ export class WebSocketPool { private subscriptions: Map; // id -> subscription Request private messageListeners: ((event: WebSocket.Data) => void)[]; + private constructor(private readonly logger: Logger = dummyLogger) { + this.rwsPool = []; + this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds + this.subscriptions = new Map(); + this.messageListeners = []; + } + /** * Creates a new WebSocketPool instance that uses multiple redundant WebSocket connections for reliability. * Usage semantics are similar to using a regular WebSocket client. @@ -22,22 +28,21 @@ export class WebSocketPool { * @param numConnections - Number of parallel WebSocket connections to maintain (default: 3) * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. */ - constructor( + static async create( urls: string[], token: string, numConnections: number = DEFAULT_NUM_CONNECTIONS, - private readonly logger: Logger = dummyLogger - ) { + logger: Logger = dummyLogger + ): Promise { if (urls.length === 0) { throw new Error("No URLs provided"); } - // This cache is used to deduplicate messages received across different websocket clients in the pool. - // A TTL cache is used to prevent unbounded memory usage. A very short TTL of 10 seconds is chosen since - // deduplication only needs to happen between messages received very close together in time. - this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds - this.rwsPool = []; - this.subscriptions = new Map(); - this.messageListeners = []; + + const pool = new WebSocketPool(logger); + + // Create all websocket instances + const connectionPromises: Promise[] = []; + for (let i = 0; i < numConnections; i++) { const url = urls[i % urls.length]; if (!url) { @@ -52,36 +57,44 @@ export class WebSocketPool { // If a websocket client unexpectedly disconnects, ResilientWebSocket will reestablish // the connection and call the onReconnect callback. - // When we reconnect, replay all subscription messages to resume the data stream. rws.onReconnect = () => { if (rws.wsUserClosed) { return; } - for (const [, request] of this.subscriptions) { + for (const [, request] of pool.subscriptions) { try { void rws.send(JSON.stringify(request)); } catch (error) { - this.logger.error( + pool.logger.error( "Failed to resend subscription on reconnect:", error ); } } }; + // Handle all client messages ourselves. Dedupe before sending to registered message handlers. - rws.onMessage = this.dedupeHandler; - this.rwsPool.push(rws); + rws.onMessage = pool.dedupeHandler; + pool.rwsPool.push(rws); + + // Start the websocket and collect the promise + connectionPromises.push(rws.startWebSocket()); } - // Let it rip - // TODO: wait for sockets to receive `open` msg before subscribing? - for (const rws of this.rwsPool) { - rws.startWebSocket(); + // Wait for all connections to be established + try { + await Promise.all(connectionPromises); + } catch (error) { + // If any connection fails, clean up and throw + pool.shutdown(); + throw error; } - this.logger.info( - `Using ${numConnections.toString()} redundant WebSocket connections` + pool.logger.info( + `Successfully established ${numConnections.toString()} redundant WebSocket connections` ); + + return pool; } /** @@ -105,23 +118,18 @@ export class WebSocketPool { * multiple connections before forwarding to registered handlers */ dedupeHandler = (data: WebSocket.Data): void => { - // For string data, use the whole string as the cache key. This avoids expensive JSON parsing during deduping. - // For binary data, use the hex string representation as the cache key const cacheKey = typeof data === "string" ? data : Buffer.from(data as Buffer).toString("hex"); - // If we've seen this exact message recently, drop it if (this.cache.has(cacheKey)) { this.logger.debug("Dropping duplicate message"); return; } - // Haven't seen this message, cache it and forward to handlers this.cache.set(cacheKey, true); - // Check for errors in JSON responses if (typeof data === "string") { this.handleErrorMessages(data); } @@ -131,28 +139,18 @@ export class WebSocketPool { } }; - /** - * Sends a message to all websockets in the pool - * @param request - The request to send - */ async sendRequest(request: Request): Promise { - // Send to all websockets in the pool const sendPromises = this.rwsPool.map(async (rws) => { try { await rws.send(JSON.stringify(request)); } catch (error) { this.logger.error("Failed to send request:", error); - throw error; // Re-throw the error + throw error; } }); await Promise.all(sendPromises); } - /** - * Adds a subscription by sending a subscribe request to all websockets in the pool - * and storing it for replay on reconnection - * @param request - The subscription request to send - */ async addSubscription(request: Request): Promise { if (request.type !== "subscribe") { throw new Error("Request must be a subscribe request"); @@ -161,11 +159,6 @@ export class WebSocketPool { await this.sendRequest(request); } - /** - * Removes a subscription by sending an unsubscribe request to all websockets in the pool - * and removing it from stored subscriptions - * @param subscriptionId - The ID of the subscription to remove - */ async removeSubscription(subscriptionId: number): Promise { this.subscriptions.delete(subscriptionId); const request: Request = { @@ -175,17 +168,10 @@ export class WebSocketPool { await this.sendRequest(request); } - /** - * Adds a message handler function to receive websocket messages - * @param handler - Function that will be called with each received message - */ addMessageListener(handler: (data: WebSocket.Data) => void): void { this.messageListeners.push(handler); } - /** - * Elegantly closes all websocket connections in the pool - */ shutdown(): void { for (const rws of this.rwsPool) { rws.closeWebSocket(); From 83ae95ead9e8e8a3402577a91fbe8b86df4e2959 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 17:16:53 -0800 Subject: [PATCH 2/8] feat: add promise for all connections down --- lazer/sdk/js/examples/index.ts | 131 +++++++++--------- lazer/sdk/js/src/client.ts | 7 + .../sdk/js/src/socket/resilient-web-socket.ts | 11 ++ lazer/sdk/js/src/socket/web-socket-pool.ts | 44 ++++++ 4 files changed, 129 insertions(+), 64 deletions(-) diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 963e9b206..3ca3b3993 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -7,78 +7,81 @@ import { PythLazerClient } from "../src/index.js"; console.debug = () => {}; async function main() { - try { - const client = await PythLazerClient.create( - ["wss://pyth-lazer.dourolabs.app/v1/stream"], - "access_token", - 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. - console // Optionally log socket operations (to the console in this case.) - ); + const client = await PythLazerClient.create( + ["wss://pyth-lazer.dourolabs.app/v1/stream"], + "access_token", + 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. + console // Optionally log socket operations (to the console in this case.) + ); - client.addMessageListener((message) => { - console.info("got message:", message); - switch (message.type) { - case "json": { - if (message.value.type == "streamUpdated") { - console.info( - "stream updated for subscription", - message.value.subscriptionId, - ":", - message.value.parsed?.priceFeeds - ); - } - break; + // Monitor for all connections being down + client.onAllConnectionsDown().then(() => { + // Handle complete connection failure. + // The connections will keep attempting to reconnect with expo backoff. + // To shutdown the client completely, call shutdown(). + console.error("All connections are down!"); + }); + + client.addMessageListener((message) => { + console.info("got message:", message); + switch (message.type) { + case "json": { + if (message.value.type == "streamUpdated") { + console.info( + "stream updated for subscription", + message.value.subscriptionId, + ":", + message.value.parsed?.priceFeeds + ); + } + break; + } + case "binary": { + if ("solana" in message.value) { + console.info( + "solana message:", + message.value.solana?.toString("hex") + ); } - case "binary": { - if ("solana" in message.value) { - console.info( - "solana message:", - message.value.solana?.toString("hex") - ); - } - if ("evm" in message.value) { - console.info("evm message:", message.value.evm?.toString("hex")); - } - break; + if ("evm" in message.value) { + console.info("evm message:", message.value.evm?.toString("hex")); } + break; } - }); + } + }); - // Create and remove one or more subscriptions on the fly - await client.subscribe({ - type: "subscribe", - subscriptionId: 1, - priceFeedIds: [1, 2], - properties: ["price"], - chains: ["solana"], - deliveryFormat: "binary", - channel: "fixed_rate@200ms", - parsed: false, - jsonBinaryEncoding: "base64", - }); - await client.subscribe({ - type: "subscribe", - subscriptionId: 2, - priceFeedIds: [1, 2, 3, 4, 5], - properties: ["price"], - chains: ["evm"], - deliveryFormat: "json", - channel: "fixed_rate@200ms", - parsed: true, - jsonBinaryEncoding: "hex", - }); + // Create and remove one or more subscriptions on the fly + await client.subscribe({ + type: "subscribe", + subscriptionId: 1, + priceFeedIds: [1, 2], + properties: ["price"], + chains: ["solana"], + deliveryFormat: "binary", + channel: "fixed_rate@200ms", + parsed: false, + jsonBinaryEncoding: "base64", + }); + await client.subscribe({ + type: "subscribe", + subscriptionId: 2, + priceFeedIds: [1, 2, 3, 4, 5], + properties: ["price"], + chains: ["evm"], + deliveryFormat: "json", + channel: "fixed_rate@200ms", + parsed: true, + jsonBinaryEncoding: "hex", + }); - await new Promise((resolve) => setTimeout(resolve, 10_000)); + await new Promise((resolve) => setTimeout(resolve, 10_000)); - await client.unsubscribe(1); - await client.unsubscribe(2); + await client.unsubscribe(1); + await client.unsubscribe(2); - await new Promise((resolve) => setTimeout(resolve, 10_000)); - client.shutdown(); - } catch (error) { - console.error("Error initializing client:", error); - process.exit(1); - } + await new Promise((resolve) => setTimeout(resolve, 10_000)); + client.shutdown(); } main().catch((error) => { diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index ca7d28517..05052c7ec 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -111,6 +111,13 @@ export class PythLazerClient { await this.wsp.sendRequest(request); } + /** + * Returns a promise that resolves when all WebSocket connections are down or attempting to reconnect + */ + onAllConnectionsDown(): Promise { + return this.wsp.onAllConnectionsDown(); + } + shutdown(): void { this.wsp.shutdown(); } diff --git a/lazer/sdk/js/src/socket/resilient-web-socket.ts b/lazer/sdk/js/src/socket/resilient-web-socket.ts index f6c48e1e8..8ee1588d4 100644 --- a/lazer/sdk/js/src/socket/resilient-web-socket.ts +++ b/lazer/sdk/js/src/socket/resilient-web-socket.ts @@ -16,6 +16,15 @@ export class ResilientWebSocket { private connectionPromise: Promise | undefined; private resolveConnection: (() => void) | undefined; private rejectConnection: ((error: Error) => void) | undefined; + private _isReconnecting: boolean = false; + + get isReconnecting(): boolean { + return this._isReconnecting; + } + + get isConnected(): boolean { + return this.wsClient?.readyState === WebSocket.OPEN; + } onError: (error: ErrorEvent) => void; onMessage: (data: WebSocket.Data) => void; @@ -90,6 +99,7 @@ export class ResilientWebSocket { this.wsFailedAttempts = 0; this.resetHeartbeat(); clearTimeout(timeoutId); + this._isReconnecting = false; this.resolveConnection?.(); }); @@ -167,6 +177,7 @@ export class ResilientWebSocket { const waitTime = expoBackoff(this.wsFailedAttempts); + this._isReconnecting = true; this.logger?.error( "Connection closed unexpectedly or because of timeout. Reconnecting after " + String(waitTime) + diff --git a/lazer/sdk/js/src/socket/web-socket-pool.ts b/lazer/sdk/js/src/socket/web-socket-pool.ts index 8743c0de2..99ecd9eb9 100644 --- a/lazer/sdk/js/src/socket/web-socket-pool.ts +++ b/lazer/sdk/js/src/socket/web-socket-pool.ts @@ -12,12 +12,18 @@ export class WebSocketPool { private cache: TTLCache; private subscriptions: Map; // id -> subscription Request private messageListeners: ((event: WebSocket.Data) => void)[]; + private allConnectionsDownListeners: (() => void)[]; + private wasAllDown: boolean = true; private constructor(private readonly logger: Logger = dummyLogger) { this.rwsPool = []; this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds this.subscriptions = new Map(); this.messageListeners = []; + this.allConnectionsDownListeners = []; + + // Start monitoring connection states + setInterval(() => this.checkConnectionStates(), 100); } /** @@ -172,6 +178,43 @@ export class WebSocketPool { this.messageListeners.push(handler); } + /** + * Monitors if all websocket connections are currently down or in reconnecting state + * Returns a promise that resolves when all connections are down + */ + onAllConnectionsDown(): Promise { + return new Promise((resolve) => { + if (this.areAllConnectionsDown()) { + resolve(); + } else { + this.allConnectionsDownListeners.push(resolve); + } + }); + } + + private areAllConnectionsDown(): boolean { + return this.rwsPool.every((ws) => !ws.isConnected || ws.isReconnecting); + } + + private checkConnectionStates(): void { + const allDown = this.areAllConnectionsDown(); + + // If all connections just went down + if (allDown && !this.wasAllDown) { + this.wasAllDown = true; + this.logger.error("All WebSocket connections are down or reconnecting"); + // Notify all listeners + while (this.allConnectionsDownListeners.length > 0) { + const listener = this.allConnectionsDownListeners.shift(); + listener?.(); + } + } + // If at least one connection was restored + if (!allDown && this.wasAllDown) { + this.wasAllDown = false; + } + } + shutdown(): void { for (const rws of this.rwsPool) { rws.closeWebSocket(); @@ -179,5 +222,6 @@ export class WebSocketPool { this.rwsPool = []; this.subscriptions.clear(); this.messageListeners = []; + this.allConnectionsDownListeners = []; } } From ffb94226a44fdf6860be22ca633fc72491278b36 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 20:20:07 -0800 Subject: [PATCH 3/8] refactor: rename socket files --- lazer/sdk/js/examples/index.ts | 2 -- .../socket/{resilient-web-socket.ts => resilient-websocket.ts} | 0 .../sdk/js/src/socket/{web-socket-pool.ts => websocket-pool.ts} | 0 3 files changed, 2 deletions(-) rename lazer/sdk/js/src/socket/{resilient-web-socket.ts => resilient-websocket.ts} (100%) rename lazer/sdk/js/src/socket/{web-socket-pool.ts => websocket-pool.ts} (100%) diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 3ca3b3993..1390c3f6e 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -79,8 +79,6 @@ async function main() { await client.unsubscribe(1); await client.unsubscribe(2); - - await new Promise((resolve) => setTimeout(resolve, 10_000)); client.shutdown(); } diff --git a/lazer/sdk/js/src/socket/resilient-web-socket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts similarity index 100% rename from lazer/sdk/js/src/socket/resilient-web-socket.ts rename to lazer/sdk/js/src/socket/resilient-websocket.ts diff --git a/lazer/sdk/js/src/socket/web-socket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts similarity index 100% rename from lazer/sdk/js/src/socket/web-socket-pool.ts rename to lazer/sdk/js/src/socket/websocket-pool.ts From d0bc650f447d771012298c31a7636f1c1e287648 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 20:33:10 -0800 Subject: [PATCH 4/8] fix: lint --- lazer/sdk/js/examples/index.ts | 6 +++--- lazer/sdk/js/src/client.ts | 4 ++-- lazer/sdk/js/src/socket/resilient-websocket.ts | 7 ++++--- lazer/sdk/js/src/socket/websocket-pool.ts | 12 +++++++----- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 1390c3f6e..3a301029e 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -15,7 +15,7 @@ async function main() { ); // Monitor for all connections being down - client.onAllConnectionsDown().then(() => { + void client.onAllConnectionsDown().then(() => { // Handle complete connection failure. // The connections will keep attempting to reconnect with expo backoff. // To shutdown the client completely, call shutdown(). @@ -82,7 +82,7 @@ async function main() { client.shutdown(); } -main().catch((error) => { +await main().catch((error: unknown) => { console.error("Unhandled error:", error); - process.exit(1); + throw error; }); diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index 05052c7ec..827e28747 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -10,7 +10,7 @@ import { type Response, SOLANA_FORMAT_MAGIC_BE, } from "./protocol.js"; -import { WebSocketPool } from "./socket/web-socket-pool.js"; +import { WebSocketPool } from "./socket/websocket-pool.js"; export type BinaryResponse = { subscriptionId: number; @@ -36,7 +36,7 @@ export class PythLazerClient { * Creates a new PythLazerClient instance. * @param urls - List of WebSocket URLs of the Pyth Lazer service * @param token - The access token for authentication - * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. + * @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream. The connections will round-robin across the provided URLs. * @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`. */ static async create( diff --git a/lazer/sdk/js/src/socket/resilient-websocket.ts b/lazer/sdk/js/src/socket/resilient-websocket.ts index 8ee1588d4..0c88c83bd 100644 --- a/lazer/sdk/js/src/socket/resilient-websocket.ts +++ b/lazer/sdk/js/src/socket/resilient-websocket.ts @@ -1,4 +1,5 @@ import type { ClientRequestArgs } from "node:http"; + import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws"; import type { Logger } from "ts-log"; @@ -16,7 +17,7 @@ export class ResilientWebSocket { private connectionPromise: Promise | undefined; private resolveConnection: (() => void) | undefined; private rejectConnection: ((error: Error) => void) | undefined; - private _isReconnecting: boolean = false; + private _isReconnecting = false; get isReconnecting(): boolean { return this._isReconnecting; @@ -72,7 +73,7 @@ export class ResilientWebSocket { if (this.connectionPromise) { return this.connectionPromise; } - return Promise.resolve(); + return; } this.logger?.info(`Creating Web Socket client`); @@ -87,7 +88,7 @@ export class ResilientWebSocket { const timeoutId = setTimeout(() => { if (this.rejectConnection) { this.rejectConnection( - new Error(`Connection timeout after ${CONNECTION_TIMEOUT}ms`) + new Error(`Connection timeout after ${String(CONNECTION_TIMEOUT)}ms`) ); } }, CONNECTION_TIMEOUT); diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index 99ecd9eb9..af0757d64 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -2,7 +2,7 @@ import TTLCache from "@isaacs/ttlcache"; import WebSocket from "isomorphic-ws"; import { dummyLogger, type Logger } from "ts-log"; -import { ResilientWebSocket } from "./resilient-web-socket.js"; +import { ResilientWebSocket } from "./resilient-websocket.js"; import type { Request, Response } from "../protocol.js"; const DEFAULT_NUM_CONNECTIONS = 3; @@ -13,7 +13,7 @@ export class WebSocketPool { private subscriptions: Map; // id -> subscription Request private messageListeners: ((event: WebSocket.Data) => void)[]; private allConnectionsDownListeners: (() => void)[]; - private wasAllDown: boolean = true; + private wasAllDown = true; private constructor(private readonly logger: Logger = dummyLogger) { this.rwsPool = []; @@ -23,7 +23,9 @@ export class WebSocketPool { this.allConnectionsDownListeners = []; // Start monitoring connection states - setInterval(() => this.checkConnectionStates(), 100); + setInterval(() => { + this.checkConnectionStates(); + }, 100); } /** @@ -179,8 +181,8 @@ export class WebSocketPool { } /** - * Monitors if all websocket connections are currently down or in reconnecting state - * Returns a promise that resolves when all connections are down + * Monitors if all websocket connections are currently down or in reconnecting state. + * Returns a promise that resolves when all connections are down. */ onAllConnectionsDown(): Promise { return new Promise((resolve) => { From 2a520bcb83fa98f1a5d86477556514b4786a5843 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 20:33:48 -0800 Subject: [PATCH 5/8] feat: bump ver --- lazer/sdk/js/package.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lazer/sdk/js/package.json b/lazer/sdk/js/package.json index 29a76a74c..95e5a906d 100644 --- a/lazer/sdk/js/package.json +++ b/lazer/sdk/js/package.json @@ -1,6 +1,6 @@ { "name": "@pythnetwork/pyth-lazer-sdk", - "version": "0.2.1", + "version": "0.3.0", "description": "Pyth Lazer SDK", "publishConfig": { "access": "public" From ae740e289ed6b774f1db28c0e5a82fdb37469d8a Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 20:47:58 -0800 Subject: [PATCH 6/8] feat: better interface for allConnectionsDown events --- lazer/sdk/js/examples/index.ts | 132 ++++++++++------------ lazer/sdk/js/src/client.ts | 14 ++- lazer/sdk/js/src/socket/websocket-pool.ts | 19 +--- 3 files changed, 77 insertions(+), 88 deletions(-) diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 3a301029e..d5a1091a2 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -6,83 +6,71 @@ import { PythLazerClient } from "../src/index.js"; // Ignore debug messages console.debug = () => {}; -async function main() { - const client = await PythLazerClient.create( - ["wss://pyth-lazer.dourolabs.app/v1/stream"], - "access_token", - 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. - console // Optionally log socket operations (to the console in this case.) - ); +const client = await PythLazerClient.create( + ["wss://pyth-lazer.dourolabs.app/v1/stream"], + "access_token", + 3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3. + console // Optionally log socket operations (to the console in this case.) +); - // Monitor for all connections being down - void client.onAllConnectionsDown().then(() => { - // Handle complete connection failure. - // The connections will keep attempting to reconnect with expo backoff. - // To shutdown the client completely, call shutdown(). - console.error("All connections are down!"); - }); +// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) +// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). +client.addAllConnectionsDownHandler(() => { + console.error("All connections are down!"); +}); - client.addMessageListener((message) => { - console.info("got message:", message); - switch (message.type) { - case "json": { - if (message.value.type == "streamUpdated") { - console.info( - "stream updated for subscription", - message.value.subscriptionId, - ":", - message.value.parsed?.priceFeeds - ); - } - break; +client.addMessageListener((message) => { + console.info("got message:", message); + switch (message.type) { + case "json": { + if (message.value.type == "streamUpdated") { + console.info( + "stream updated for subscription", + message.value.subscriptionId, + ":", + message.value.parsed?.priceFeeds + ); + } + break; + } + case "binary": { + if ("solana" in message.value) { + console.info("solana message:", message.value.solana?.toString("hex")); } - case "binary": { - if ("solana" in message.value) { - console.info( - "solana message:", - message.value.solana?.toString("hex") - ); - } - if ("evm" in message.value) { - console.info("evm message:", message.value.evm?.toString("hex")); - } - break; + if ("evm" in message.value) { + console.info("evm message:", message.value.evm?.toString("hex")); } + break; } - }); - - // Create and remove one or more subscriptions on the fly - await client.subscribe({ - type: "subscribe", - subscriptionId: 1, - priceFeedIds: [1, 2], - properties: ["price"], - chains: ["solana"], - deliveryFormat: "binary", - channel: "fixed_rate@200ms", - parsed: false, - jsonBinaryEncoding: "base64", - }); - await client.subscribe({ - type: "subscribe", - subscriptionId: 2, - priceFeedIds: [1, 2, 3, 4, 5], - properties: ["price"], - chains: ["evm"], - deliveryFormat: "json", - channel: "fixed_rate@200ms", - parsed: true, - jsonBinaryEncoding: "hex", - }); + } +}); - await new Promise((resolve) => setTimeout(resolve, 10_000)); +// Create and remove one or more subscriptions on the fly +await client.subscribe({ + type: "subscribe", + subscriptionId: 1, + priceFeedIds: [1, 2], + properties: ["price"], + chains: ["solana"], + deliveryFormat: "binary", + channel: "fixed_rate@200ms", + parsed: false, + jsonBinaryEncoding: "base64", +}); +await client.subscribe({ + type: "subscribe", + subscriptionId: 2, + priceFeedIds: [1, 2, 3, 4, 5], + properties: ["price"], + chains: ["evm"], + deliveryFormat: "json", + channel: "fixed_rate@200ms", + parsed: true, + jsonBinaryEncoding: "hex", +}); - await client.unsubscribe(1); - await client.unsubscribe(2); - client.shutdown(); -} +await new Promise((resolve) => setTimeout(resolve, 10_000)); -await main().catch((error: unknown) => { - console.error("Unhandled error:", error); - throw error; -}); +await client.unsubscribe(1); +await client.unsubscribe(2); +client.shutdown(); diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index 827e28747..26dd0371e 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -49,6 +49,12 @@ export class PythLazerClient { return new PythLazerClient(wsp); } + /** + * Adds a message listener that receives either JSON or binary responses from the WebSocket connections. + * The listener will be called for each message received, with deduplication across redundant connections. + * @param handler - Callback function that receives the parsed message. The message can be either a JSON response + * or a binary response containing EVM, Solana, or parsed payload data. + */ addMessageListener(handler: (event: JsonOrBinaryResponse) => void) { this.wsp.addMessageListener((data: WebSocket.Data) => { if (typeof data == "string") { @@ -112,10 +118,12 @@ export class PythLazerClient { } /** - * Returns a promise that resolves when all WebSocket connections are down or attempting to reconnect + * Registers a handler function that will be called whenever all WebSocket connections are down or attempting to reconnect. + * The connections may still try to reconnect in the background. To shut down the pool, call `shutdown()`. + * @param handler - Function to be called when all connections are down */ - onAllConnectionsDown(): Promise { - return this.wsp.onAllConnectionsDown(); + addAllConnectionsDownHandler(handler: () => void): void { + this.wsp.addAllConnectionsDownListener(handler); } shutdown(): void { diff --git a/lazer/sdk/js/src/socket/websocket-pool.ts b/lazer/sdk/js/src/socket/websocket-pool.ts index af0757d64..3ee44ba15 100644 --- a/lazer/sdk/js/src/socket/websocket-pool.ts +++ b/lazer/sdk/js/src/socket/websocket-pool.ts @@ -181,17 +181,11 @@ export class WebSocketPool { } /** - * Monitors if all websocket connections are currently down or in reconnecting state. - * Returns a promise that resolves when all connections are down. + * Calls the handler if all websocket connections are currently down or in reconnecting state. + * The connections may still try to reconnect in the background. */ - onAllConnectionsDown(): Promise { - return new Promise((resolve) => { - if (this.areAllConnectionsDown()) { - resolve(); - } else { - this.allConnectionsDownListeners.push(resolve); - } - }); + addAllConnectionsDownListener(handler: () => void): void { + this.allConnectionsDownListeners.push(handler); } private areAllConnectionsDown(): boolean { @@ -206,9 +200,8 @@ export class WebSocketPool { this.wasAllDown = true; this.logger.error("All WebSocket connections are down or reconnecting"); // Notify all listeners - while (this.allConnectionsDownListeners.length > 0) { - const listener = this.allConnectionsDownListeners.shift(); - listener?.(); + for (const listener of this.allConnectionsDownListeners) { + listener(); } } // If at least one connection was restored From 059e6b5ceff02fc1585458d6ebb1a1cda844dbf9 Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 20:49:14 -0800 Subject: [PATCH 7/8] fix: docs --- lazer/sdk/js/examples/index.ts | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index d5a1091a2..0e7d3f13d 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -13,12 +13,7 @@ const client = await PythLazerClient.create( console // Optionally log socket operations (to the console in this case.) ); -// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) -// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). -client.addAllConnectionsDownHandler(() => { - console.error("All connections are down!"); -}); - +// Read and process messages from the Lazer stream client.addMessageListener((message) => { console.info("got message:", message); switch (message.type) { @@ -45,6 +40,12 @@ client.addMessageListener((message) => { } }); +// Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) +// The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). +client.addAllConnectionsDownHandler(() => { + console.error("All connections are down!"); +}); + // Create and remove one or more subscriptions on the fly await client.subscribe({ type: "subscribe", From 1be7862bf856fc5fe05a879cf7efdc5cd161cade Mon Sep 17 00:00:00 2001 From: Tejas Badadare Date: Tue, 14 Jan 2025 20:51:00 -0800 Subject: [PATCH 8/8] fix: naming --- lazer/sdk/js/examples/index.ts | 2 +- lazer/sdk/js/src/client.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lazer/sdk/js/examples/index.ts b/lazer/sdk/js/examples/index.ts index 0e7d3f13d..ff3102f84 100644 --- a/lazer/sdk/js/examples/index.ts +++ b/lazer/sdk/js/examples/index.ts @@ -42,7 +42,7 @@ client.addMessageListener((message) => { // Monitor for all connections in the pool being down simultaneously (e.g. if the internet goes down) // The connections may still try to reconnect in the background. To shut down the client completely, call shutdown(). -client.addAllConnectionsDownHandler(() => { +client.addAllConnectionsDownListener(() => { console.error("All connections are down!"); }); diff --git a/lazer/sdk/js/src/client.ts b/lazer/sdk/js/src/client.ts index 26dd0371e..755b42612 100644 --- a/lazer/sdk/js/src/client.ts +++ b/lazer/sdk/js/src/client.ts @@ -122,7 +122,7 @@ export class PythLazerClient { * The connections may still try to reconnect in the background. To shut down the pool, call `shutdown()`. * @param handler - Function to be called when all connections are down */ - addAllConnectionsDownHandler(handler: () => void): void { + addAllConnectionsDownListener(handler: () => void): void { this.wsp.addAllConnectionsDownListener(handler); }