Skip to content

Commit

Permalink
feat: add promise for all connections down
Browse files Browse the repository at this point in the history
  • Loading branch information
tejasbadadare committed Jan 15, 2025
1 parent af0ba94 commit 83ae95e
Show file tree
Hide file tree
Showing 4 changed files with 129 additions and 64 deletions.
131 changes: 67 additions & 64 deletions lazer/sdk/js/examples/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,78 +7,81 @@ import { PythLazerClient } from "../src/index.js";
console.debug = () => {};

async function main() {
try {
const client = await PythLazerClient.create(
["wss://pyth-lazer.dourolabs.app/v1/stream"],
"access_token",
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
console // Optionally log socket operations (to the console in this case.)
);
const client = await PythLazerClient.create(
["wss://pyth-lazer.dourolabs.app/v1/stream"],
"access_token",
3, // Optionally specify number of parallel redundant connections to reduce the chance of dropped messages. The connections will round-robin across the provided URLs. Default is 3.
console // Optionally log socket operations (to the console in this case.)
);

client.addMessageListener((message) => {
console.info("got message:", message);
switch (message.type) {
case "json": {
if (message.value.type == "streamUpdated") {
console.info(
"stream updated for subscription",
message.value.subscriptionId,
":",
message.value.parsed?.priceFeeds
);
}
break;
// Monitor for all connections being down
client.onAllConnectionsDown().then(() => {

Check failure on line 18 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Promises must be awaited, end with a call to .catch, end with a call to .then with a rejection handler or be explicitly marked as ignored with the `void` operator
// Handle complete connection failure.
// The connections will keep attempting to reconnect with expo backoff.
// To shutdown the client completely, call shutdown().
console.error("All connections are down!");
});

client.addMessageListener((message) => {
console.info("got message:", message);
switch (message.type) {
case "json": {
if (message.value.type == "streamUpdated") {
console.info(
"stream updated for subscription",
message.value.subscriptionId,
":",
message.value.parsed?.priceFeeds
);
}
break;
}
case "binary": {
if ("solana" in message.value) {
console.info(
"solana message:",
message.value.solana?.toString("hex")
);
}
case "binary": {
if ("solana" in message.value) {
console.info(
"solana message:",
message.value.solana?.toString("hex")
);
}
if ("evm" in message.value) {
console.info("evm message:", message.value.evm?.toString("hex"));
}
break;
if ("evm" in message.value) {
console.info("evm message:", message.value.evm?.toString("hex"));
}
break;
}
});
}
});

// Create and remove one or more subscriptions on the fly
await client.subscribe({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
chains: ["solana"],
deliveryFormat: "binary",
channel: "fixed_rate@200ms",
parsed: false,
jsonBinaryEncoding: "base64",
});
await client.subscribe({
type: "subscribe",
subscriptionId: 2,
priceFeedIds: [1, 2, 3, 4, 5],
properties: ["price"],
chains: ["evm"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
parsed: true,
jsonBinaryEncoding: "hex",
});
// Create and remove one or more subscriptions on the fly
await client.subscribe({
type: "subscribe",
subscriptionId: 1,
priceFeedIds: [1, 2],
properties: ["price"],
chains: ["solana"],
deliveryFormat: "binary",
channel: "fixed_rate@200ms",
parsed: false,
jsonBinaryEncoding: "base64",
});
await client.subscribe({
type: "subscribe",
subscriptionId: 2,
priceFeedIds: [1, 2, 3, 4, 5],
properties: ["price"],
chains: ["evm"],
deliveryFormat: "json",
channel: "fixed_rate@200ms",
parsed: true,
jsonBinaryEncoding: "hex",
});

await new Promise((resolve) => setTimeout(resolve, 10_000));
await new Promise((resolve) => setTimeout(resolve, 10_000));

await client.unsubscribe(1);
await client.unsubscribe(2);
await client.unsubscribe(1);
await client.unsubscribe(2);

await new Promise((resolve) => setTimeout(resolve, 10_000));
client.shutdown();
} catch (error) {
console.error("Error initializing client:", error);
process.exit(1);
}
await new Promise((resolve) => setTimeout(resolve, 10_000));
client.shutdown();
}

main().catch((error) => {

Check failure on line 87 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Prefer top-level await over using a promise chain

Check failure on line 87 in lazer/sdk/js/examples/index.ts

View workflow job for this annotation

GitHub Actions / test

Prefer the safe `: unknown` for a catch callback variable
Expand Down
7 changes: 7 additions & 0 deletions lazer/sdk/js/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,13 @@ export class PythLazerClient {
await this.wsp.sendRequest(request);
}

/**
* Returns a promise that resolves when all WebSocket connections are down or attempting to reconnect
*/
onAllConnectionsDown(): Promise<void> {
return this.wsp.onAllConnectionsDown();
}

shutdown(): void {
this.wsp.shutdown();
}
Expand Down
11 changes: 11 additions & 0 deletions lazer/sdk/js/src/socket/resilient-web-socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,15 @@ export class ResilientWebSocket {
private connectionPromise: Promise<void> | undefined;
private resolveConnection: (() => void) | undefined;
private rejectConnection: ((error: Error) => void) | undefined;
private _isReconnecting: boolean = false;

Check failure on line 19 in lazer/sdk/js/src/socket/resilient-web-socket.ts

View workflow job for this annotation

GitHub Actions / test

Type boolean trivially inferred from a boolean literal, remove type annotation

get isReconnecting(): boolean {
return this._isReconnecting;
}

get isConnected(): boolean {
return this.wsClient?.readyState === WebSocket.OPEN;
}

onError: (error: ErrorEvent) => void;
onMessage: (data: WebSocket.Data) => void;
Expand Down Expand Up @@ -90,6 +99,7 @@ export class ResilientWebSocket {
this.wsFailedAttempts = 0;
this.resetHeartbeat();
clearTimeout(timeoutId);
this._isReconnecting = false;
this.resolveConnection?.();
});

Expand Down Expand Up @@ -167,6 +177,7 @@ export class ResilientWebSocket {

const waitTime = expoBackoff(this.wsFailedAttempts);

this._isReconnecting = true;
this.logger?.error(
"Connection closed unexpectedly or because of timeout. Reconnecting after " +
String(waitTime) +
Expand Down
44 changes: 44 additions & 0 deletions lazer/sdk/js/src/socket/web-socket-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,18 @@ export class WebSocketPool {
private cache: TTLCache<string, boolean>;
private subscriptions: Map<number, Request>; // id -> subscription Request
private messageListeners: ((event: WebSocket.Data) => void)[];
private allConnectionsDownListeners: (() => void)[];
private wasAllDown: boolean = true;

Check failure on line 16 in lazer/sdk/js/src/socket/web-socket-pool.ts

View workflow job for this annotation

GitHub Actions / test

Type boolean trivially inferred from a boolean literal, remove type annotation

private constructor(private readonly logger: Logger = dummyLogger) {
this.rwsPool = [];
this.cache = new TTLCache({ ttl: 1000 * 10 }); // TTL of 10 seconds
this.subscriptions = new Map();
this.messageListeners = [];
this.allConnectionsDownListeners = [];

// Start monitoring connection states
setInterval(() => this.checkConnectionStates(), 100);
}

/**
Expand Down Expand Up @@ -172,12 +178,50 @@ export class WebSocketPool {
this.messageListeners.push(handler);
}

/**
* Monitors if all websocket connections are currently down or in reconnecting state
* Returns a promise that resolves when all connections are down
*/
onAllConnectionsDown(): Promise<void> {
return new Promise((resolve) => {
if (this.areAllConnectionsDown()) {
resolve();
} else {
this.allConnectionsDownListeners.push(resolve);
}
});
}

private areAllConnectionsDown(): boolean {
return this.rwsPool.every((ws) => !ws.isConnected || ws.isReconnecting);
}

private checkConnectionStates(): void {
const allDown = this.areAllConnectionsDown();

// If all connections just went down
if (allDown && !this.wasAllDown) {
this.wasAllDown = true;
this.logger.error("All WebSocket connections are down or reconnecting");
// Notify all listeners
while (this.allConnectionsDownListeners.length > 0) {
const listener = this.allConnectionsDownListeners.shift();
listener?.();
}
}
// If at least one connection was restored
if (!allDown && this.wasAllDown) {
this.wasAllDown = false;
}
}

shutdown(): void {
for (const rws of this.rwsPool) {
rws.closeWebSocket();
}
this.rwsPool = [];
this.subscriptions.clear();
this.messageListeners = [];
this.allConnectionsDownListeners = [];
}
}

0 comments on commit 83ae95e

Please sign in to comment.