Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(lazer/js-sdk): add promises for open and disconnected #2254

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 75 additions & 55 deletions lazer/sdk/js/examples/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,65 +6,85 @@
// Ignore debug messages
console.debug = () => {};

const client = new PythLazerClient(
["wss://pyth-lazer.dourolabs.app/v1/stream"],
"access_token",
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
console // Optionally log socket operations (to the console in this case.)
);
async function main() {
const client = await PythLazerClient.create(
["wss://pyth-lazer.dourolabs.app/v1/stream"],
"access_token",
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
console // Optionally log socket operations (to the console in this case.)
);

client.addMessageListener((message) => {
console.info("got message:", message);
switch (message.type) {
case "json": {
if (message.value.type == "streamUpdated") {
console.info(
"stream updated for subscription",
message.value.subscriptionId,
":",
message.value.parsed?.priceFeeds
);
}
break;
}
case "binary": {
if ("solana" in message.value) {
console.info("solana message:", message.value.solana?.toString("hex"));
// Monitor for all connections being down
client.onAllConnectionsDown().then(() => {

Check failure on line 18 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
// Handle complete connection failure.
// The connections will keep attempting to reconnect with expo backoff.
// To shutdown the client completely, call shutdown().
console.error("All connections are down!");
});

client.addMessageListener((message) => {
console.info("got message:", message);
switch (message.type) {
case "json": {
if (message.value.type == "streamUpdated") {
console.info(
"stream updated for subscription",
message.value.subscriptionId,
":",
message.value.parsed?.priceFeeds
);
}
break;
}
if ("evm" in message.value) {
console.info("evm message:", message.value.evm?.toString("hex"));
case "binary": {
if ("solana" in message.value) {
console.info(
"solana message:",
message.value.solana?.toString("hex")
);
}
if ("evm" in message.value) {
console.info("evm message:", message.value.evm?.toString("hex"));
}
break;
}
break;
}
}
});
});

// Create and remove one or more subscriptions on the fly
await client.subscribe({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
chains: ["solana"],
deliveryFormat: "binary",
channel: "fixed_rate@200ms",
parsed: false,
jsonBinaryEncoding: "base64",
});
await client.subscribe({
type: "subscribe",
subscriptionId: 2,
priceFeedIds: [1, 2, 3, 4, 5],
properties: ["price"],
chains: ["evm"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
parsed: true,
jsonBinaryEncoding: "hex",
});
// Create and remove one or more subscriptions on the fly
await client.subscribe({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
chains: ["solana"],
deliveryFormat: "binary",
channel: "fixed_rate@200ms",
parsed: false,
jsonBinaryEncoding: "base64",
});
await client.subscribe({
type: "subscribe",
subscriptionId: 2,
priceFeedIds: [1, 2, 3, 4, 5],
properties: ["price"],
chains: ["evm"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
parsed: true,
jsonBinaryEncoding: "hex",
});

await new Promise((resolve) => setTimeout(resolve, 10_000));

await new Promise((resolve) => setTimeout(resolve, 10_000));
await client.unsubscribe(1);
await client.unsubscribe(2);

await client.unsubscribe(1);
await client.unsubscribe(2);
client.shutdown();
await new Promise((resolve) => setTimeout(resolve, 10_000));
client.shutdown();
}

main().catch((error) => {

Check failure on line 87 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Prefer top-level await over using a promise chain

Check failure on line 87 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Prefer the safe `: unknown` for a catch callback variable
console.error("Unhandled error:", error);
process.exit(1);

Check failure on line 89 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Don't use process.exit(); throw an error instead

Check failure on line 89 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Only use `process.exit()` in CLI apps. Throw an error instead
});
16 changes: 12 additions & 4 deletions lazer/sdk/js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ const UINT32_NUM_BYTES = 4;
const UINT64_NUM_BYTES = 8;

export class PythLazerClient {
wsp: WebSocketPool;
private constructor(private readonly wsp: WebSocketPool) {}

/**
* Creates a new PythLazerClient instance.
Expand All @@ -39,13 +39,14 @@ export class PythLazerClient {
* @param numConnections - The number of parallel WebSocket connections to establish (default: 3). A higher number gives a more reliable stream.
* @param logger - Optional logger to get socket level logs. Compatible with most loggers such as the built-in console and `bunyan`.
*/
constructor(
static async create(
urls: string[],
token: string,
numConnections = 3,
logger: Logger = dummyLogger
) {
this.wsp = new WebSocketPool(urls, token, numConnections, logger);
): Promise<PythLazerClient> {
const wsp = await WebSocketPool.create(urls, token, numConnections, logger);
return new PythLazerClient(wsp);
}

addMessageListener(handler: (event: JsonOrBinaryResponse) => void) {
Expand Down Expand Up @@ -110,6 +111,13 @@ export class PythLazerClient {
await this.wsp.sendRequest(request);
}

/**
* Returns a promise that resolves when all WebSocket connections are down or attempting to reconnect
*/
onAllConnectionsDown(): Promise<void> {
return this.wsp.onAllConnectionsDown();
}

shutdown(): void {
this.wsp.shutdown();
}
Expand Down
78 changes: 56 additions & 22 deletions lazer/sdk/js/src/socket/resilient-web-socket.ts
Original file line number Diff line number Diff line change
@@ -1,21 +1,10 @@
import type { ClientRequestArgs } from "node:http";

Check failure on line 1 in lazer/sdk/js/src/socket/resilient-web-socket.ts

View workflow job for this annotation

GitHub Actions / test

There should be at least one empty line between import groups

import WebSocket, { type ClientOptions, type ErrorEvent } from "isomorphic-ws";
import type { Logger } from "ts-log";

// Reconnect with expo backoff if we don't get a message or ping for 10 seconds
const HEARTBEAT_TIMEOUT_DURATION = 10_000;
const CONNECTION_TIMEOUT = 5000;

/**
* This class wraps websocket to provide a resilient web socket client.
*
* It will reconnect if connection fails with exponential backoff. Also, it will reconnect
* if it receives no ping request or regular message from server within a while as indication
* of timeout (assuming the server sends either regularly).
*
* This class also logs events if logger is given and by replacing onError method you can handle
* connection errors yourself (e.g: do not retry and close the connection).
*/
export class ResilientWebSocket {
endpoint: string;
wsClient: undefined | WebSocket;
Expand All @@ -24,10 +13,23 @@
private wsFailedAttempts: number;
private heartbeatTimeout: undefined | NodeJS.Timeout;
private logger: undefined | Logger;
private connectionPromise: Promise<void> | undefined;
private resolveConnection: (() => void) | undefined;
private rejectConnection: ((error: Error) => void) | undefined;
private _isReconnecting: boolean = false;

Check failure on line 19 in lazer/sdk/js/src/socket/resilient-web-socket.ts

View workflow job for this annotation

GitHub Actions / test

Type boolean trivially inferred from a boolean literal, remove type annotation

get isReconnecting(): boolean {
return this._isReconnecting;
}

get isConnected(): boolean {
return this.wsClient?.readyState === WebSocket.OPEN;
}

onError: (error: ErrorEvent) => void;
onMessage: (data: WebSocket.Data) => void;
onReconnect: () => void;

constructor(
endpoint: string,
wsOptions?: ClientOptions | ClientRequestArgs,
Expand Down Expand Up @@ -64,23 +66,48 @@
}
}

startWebSocket(): void {
async startWebSocket(): Promise<void> {
if (this.wsClient !== undefined) {
return;
// If there's an existing connection attempt, wait for it
if (this.connectionPromise) {
return this.connectionPromise;
}
return Promise.resolve();

Check failure on line 75 in lazer/sdk/js/src/socket/resilient-web-socket.ts

View workflow job for this annotation

GitHub Actions / test

Prefer `return value` over `return Promise.resolve(value)`
}

this.logger?.info(`Creating Web Socket client`);

// Create a new promise for this connection attempt
this.connectionPromise = new Promise((resolve, reject) => {
this.resolveConnection = resolve;
this.rejectConnection = reject;
});

// Set a connection timeout
const timeoutId = setTimeout(() => {
if (this.rejectConnection) {
this.rejectConnection(
new Error(`Connection timeout after ${CONNECTION_TIMEOUT}ms`)

Check failure on line 90 in lazer/sdk/js/src/socket/resilient-web-socket.ts

View workflow job for this annotation

GitHub Actions / test

Invalid type "5000" of template literal expression
);
}
}, CONNECTION_TIMEOUT);

this.wsClient = new WebSocket(this.endpoint, this.wsOptions);
this.wsUserClosed = false;

this.wsClient.addEventListener("open", () => {
this.wsFailedAttempts = 0;
this.resetHeartbeat();
clearTimeout(timeoutId);
this._isReconnecting = false;
this.resolveConnection?.();
});

this.wsClient.addEventListener("error", (event) => {
this.onError(event);
if (this.rejectConnection) {
this.rejectConnection(new Error("WebSocket connection failed"));
}
});

this.wsClient.addEventListener("message", (event) => {
Expand All @@ -89,24 +116,23 @@
});

this.wsClient.addEventListener("close", () => {
clearTimeout(timeoutId);
if (this.rejectConnection) {
this.rejectConnection(new Error("WebSocket closed before connecting"));
}
void this.handleClose();
});

// Handle ping events if supported (Node.js only)
if ("on" in this.wsClient) {
// Ping handler is undefined in browser side
this.wsClient.on("ping", () => {
this.logger?.info("Ping received");
this.resetHeartbeat();
});
}

return this.connectionPromise;
}

/**
* Reset the heartbeat timeout. This is called when we receive any message (ping or regular)
* from the server. If we don't receive any message within HEARTBEAT_TIMEOUT_DURATION,
* we assume the connection is dead and reconnect.
*/
private resetHeartbeat(): void {
if (this.heartbeatTimeout !== undefined) {
clearTimeout(this.heartbeatTimeout);
Expand Down Expand Up @@ -145,8 +171,13 @@
} else {
this.wsFailedAttempts += 1;
this.wsClient = undefined;
this.connectionPromise = undefined;
this.resolveConnection = undefined;
this.rejectConnection = undefined;

const waitTime = expoBackoff(this.wsFailedAttempts);

this._isReconnecting = true;
this.logger?.error(
"Connection closed unexpectedly or because of timeout. Reconnecting after " +
String(waitTime) +
Expand All @@ -163,7 +194,7 @@
return;
}

this.startWebSocket();
await this.startWebSocket();
await this.waitForMaybeReadyWebSocket();

if (this.wsClient === undefined) {
Expand All @@ -180,6 +211,9 @@
if (this.wsClient !== undefined) {
const client = this.wsClient;
this.wsClient = undefined;
this.connectionPromise = undefined;
this.resolveConnection = undefined;
this.rejectConnection = undefined;
client.close();
}
this.wsUserClosed = true;
Expand Down
Loading
Loading