From c466d73a57263f87b632bfbcb48eb4b45c10e826 Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Wed, 23 Jul 2025 10:11:28 -0500 Subject: [PATCH 1/9] Add resource pool class for WebSocket connections --- src/lib/data_structures/websocket_pool.ts | 226 ++++++++++++++++++++++ 1 file changed, 226 insertions(+) create mode 100644 src/lib/data_structures/websocket_pool.ts diff --git a/src/lib/data_structures/websocket_pool.ts b/src/lib/data_structures/websocket_pool.ts new file mode 100644 index 0000000..5b34a9e --- /dev/null +++ b/src/lib/data_structures/websocket_pool.ts @@ -0,0 +1,226 @@ +/** + * A resource pool for WebSocket connections. + * The pool maintains a set of up to 4 connections for each URL. Connections are acquired from the + * pool on a per-URL basis. Idle connections are automatically closed after 60 seconds. + */ +export class WebSocketPool { + static #instance: WebSocketPool; + + #pool: Map = new Map(); + #busy: Set = new Set(); + #waitQueue: Map) => void>> = new Map(); + #idleTimers: Map = new Map(); + #maxConnectionsPerUrl; + #idleTimeoutMs; + + /** + * Private constructor invoked when the singleton instance is first created. + * @param maxConnectionsPerUrl - The maximum number of connections to maintain for each URL. + * @param idleTimeoutMs - The timeout in milliseconds after which idle connections will be + * closed. Defaults to 60 seconds. + */ + private constructor(maxConnectionsPerUrl: number = 4, idleTimeoutMs: number = 60000) { + this.#maxConnectionsPerUrl = maxConnectionsPerUrl; + this.#idleTimeoutMs = idleTimeoutMs; + } + + /** + * Returns the singleton instance of the WebsocketPool. + * @returns The singleton instance. + */ + public static get instance(): WebSocketPool { + if (!WebSocketPool.#instance) { + WebSocketPool.#instance = new WebSocketPool(); + } + return WebSocketPool.#instance; + } + + // #region Resource Management Interface + + /** + * Acquires a WebSocket connection for the specified URL. If a connection is available in the + * pool, that connection is returned. If no connection is available but the pool is not full, a + * new connection is created and returned. If the pool is full, the request is queued until a + * connection is released, at which point the newly-available connection is returned to the + * caller. + * + * @param url - The URL to connect to. + * @returns A promise that resolves with a WebSocket connection. + */ + public acquire(url: string): Promise { + const availableSocket = this.#findAvailableSocket(url); + if (availableSocket) { + // Clear any idle timer for this socket since it's being used again. + this.#clearIdleTimer(availableSocket); + // Add the reference to the socket resource to the busy set. + this.#busy.add(availableSocket); + return Promise.resolve(availableSocket); + } + + const socketsForUrl = this.#pool.get(url) || []; + if (socketsForUrl.length < this.#maxConnectionsPerUrl) { + return this.#createSocket(url); + } + + return new Promise(resolve => { + if (!this.#waitQueue.has(url)) { + this.#waitQueue.set(url, []); + } + this.#waitQueue.get(url)!.push(resolve); + }); + } + + /** + * Releases a WebSocket connection back to the pool. If there are pending requests for the same + * URL, the connection is passed to the requestor in the queue. Otherwise, the connection is + * marked as available. + * + * @param ws - The WebSocket connection to release. + */ + public release(ws: WebSocket): void { + const url = ws.url; + + const waitingResolvers = this.#waitQueue.get(url); + if (waitingResolvers?.length > 0) { + const resolver = waitingResolvers.shift()!; + + // Cleanup empty queues immediately + if (waitingResolvers.length === 0) { + this.#waitQueue.delete(url); + } + + resolver(ws); + return; + } + + // If no requestors are waiting, delete the reference from the busy set and start idle timer. + this.#busy.delete(ws); + this.#startIdleTimer(ws); + } + + /** + * Sets the idle timeout for WebSocket connections. + * + * @param timeoutMs - The timeout in milliseconds after which idle connections will be closed. + */ + public setIdleTimeout(timeoutMs: number): void { + this.#idleTimeoutMs = timeoutMs; + } + + /** + * Gets the current idle timeout setting. + * + * @returns The current idle timeout in milliseconds. + */ + public getIdleTimeout(): number { + return this.#idleTimeoutMs; + } + + /** + * Closes all WebSocket connections and "drains" the pool. + */ + public drain(): void { + // Clear all idle timers first + for (const timer of this.#idleTimers.values()) { + clearTimeout(timer); + } + this.#idleTimers.clear(); + + for (const sockets of this.#pool.values()) { + for (const ws of sockets) { + ws.onclose = null; + ws.close(); + } + } + this.#pool.clear(); + this.#busy.clear(); + this.#waitQueue.clear(); + } + + // #endregion + + // #region Private Helper Methods + + #findAvailableSocket(url: string): WebSocket | null { + const sockets = this.#pool.get(url); + if (!sockets) { + return null; + } + return sockets.find(ws => !this.#busy.has(ws) && ws.readyState === WebSocket.OPEN) ?? null; + } + + #createSocket(url: string): Promise { + return new Promise((resolve, reject) => { + try { + const ws = new WebSocket(url); + ws.onopen = () => { + const sockets = this.#pool.get(url) || []; + sockets.push(ws); + this.#pool.set(url, sockets); + this.#busy.add(ws); + resolve(ws); + }; + + // Remove the socket from the pool when it is closed. The socket may be closed by either + // the client or the server. + ws.onclose = () => this.#removeSocket(ws); + + ws.onerror = () => reject(new Error('WebSocket error')); + } catch (error) { + reject(error); + } + }); + } + + #removeSocket(ws: WebSocket): void { + const url = ws.url; + const sockets = this.#pool.get(url); + if (sockets) { + const index = sockets.indexOf(ws); + if (index > -1) { + sockets.splice(index, 1); + } + if (sockets.length === 0) { + this.#pool.delete(url); + } + } + this.#busy.delete(ws); + this.#clearIdleTimer(ws); + } + + /** + * Starts an idle timer for the specified WebSocket. The connection will be automatically + * closed after the idle timeout period if it remains unused. + * + * @param ws - The WebSocket to start the idle timer for. + */ + #startIdleTimer(ws: WebSocket): void { + // Clear any existing timer first + this.#clearIdleTimer(ws); + + const timer = setTimeout(() => { + // Check if the socket is still idle (not in busy set) before closing + if (!this.#busy.has(ws) && ws.readyState === WebSocket.OPEN) { + ws.close(); + this.#removeSocket(ws); + } + }, this.#idleTimeoutMs); + + this.#idleTimers.set(ws, timer); + } + + /** + * Clears the idle timer for the specified WebSocket. + * + * @param ws - The WebSocket to clear the idle timer for. + */ + #clearIdleTimer(ws: WebSocket): void { + const timer = this.#idleTimers.get(ws); + if (timer) { + clearTimeout(timer); + this.#idleTimers.delete(ws); + } + } + + // #endregion +} From 2839e2547ab8af24764f8a3f7ab87b9d328da8a8 Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Wed, 23 Jul 2025 21:55:40 -0500 Subject: [PATCH 2/9] Improve `WebSocketPool` class - Share a single connection per URL - Set a limit to the number of connections to prevent resource exhaustion - Prevent certain race conditions --- src/lib/data_structures/websocket_pool.ts | 248 +++++++++++++++------- 1 file changed, 166 insertions(+), 82 deletions(-) diff --git a/src/lib/data_structures/websocket_pool.ts b/src/lib/data_structures/websocket_pool.ts index 5b34a9e..f8bc396 100644 --- a/src/lib/data_structures/websocket_pool.ts +++ b/src/lib/data_structures/websocket_pool.ts @@ -1,27 +1,39 @@ +interface WebSocketPoolWaitingQueueItem { + url: string; + resolve: (ws: WebSocket) => void; + reject: (reason?: any) => void; +} + /** - * A resource pool for WebSocket connections. - * The pool maintains a set of up to 4 connections for each URL. Connections are acquired from the - * pool on a per-URL basis. Idle connections are automatically closed after 60 seconds. + * A resource pool for WebSocket connections. Its purpose is to allow multiple requestors to share + * an open WebSocket connection for greater resource efficiency. + * + * The pool maintains a single connection per URL. Requestors may acquire a reference to a + * connection, and are expected to release it when it is no longer needed. If the number of + * requestors using a connection drops to zero and no new requestors acquire it for a set period, + * the connection is closed. */ export class WebSocketPool { static #instance: WebSocketPool; - #pool: Map = new Map(); - #busy: Set = new Set(); - #waitQueue: Map) => void>> = new Map(); - #idleTimers: Map = new Map(); - #maxConnectionsPerUrl; - #idleTimeoutMs; + #pool: Map = new Map(); + #connecting: Map> = new Map(); + #refCounts: Map = new Map(); + #idleTimers: Map> = new Map(); + #idleTimeoutMs: number; + #maxConnections: number; + #waitingQueue: WebSocketPoolWaitingQueueItem[] = []; /** * Private constructor invoked when the singleton instance is first created. - * @param maxConnectionsPerUrl - The maximum number of connections to maintain for each URL. * @param idleTimeoutMs - The timeout in milliseconds after which idle connections will be * closed. Defaults to 60 seconds. + * @param maxConnections - The maximum number of simultaneous WebSocket connections. Defaults to + * 16. */ - private constructor(maxConnectionsPerUrl: number = 4, idleTimeoutMs: number = 60000) { - this.#maxConnectionsPerUrl = maxConnectionsPerUrl; + private constructor(idleTimeoutMs: number = 60000, maxConnections: number = 16) { this.#idleTimeoutMs = idleTimeoutMs; + this.#maxConnections = maxConnections; } /** @@ -47,27 +59,39 @@ export class WebSocketPool { * @param url - The URL to connect to. * @returns A promise that resolves with a WebSocket connection. */ - public acquire(url: string): Promise { - const availableSocket = this.#findAvailableSocket(url); - if (availableSocket) { - // Clear any idle timer for this socket since it's being used again. - this.#clearIdleTimer(availableSocket); - // Add the reference to the socket resource to the busy set. - this.#busy.add(availableSocket); - return Promise.resolve(availableSocket); - } + public async acquire(url: string): Promise { + const normalizedUrl = this.#normalizeUrl(url); + const existingSocket = this.#pool.get(normalizedUrl); - const socketsForUrl = this.#pool.get(url) || []; - if (socketsForUrl.length < this.#maxConnectionsPerUrl) { - return this.#createSocket(url); + if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) { + this.#checkOutSocket(existingSocket); + return Promise.resolve(existingSocket); } - return new Promise(resolve => { - if (!this.#waitQueue.has(url)) { - this.#waitQueue.set(url, []); + const connectingPromise = this.#connecting.get(normalizedUrl); + if (connectingPromise) { + const ws = await connectingPromise; + if (ws.readyState === WebSocket.OPEN) { + this.#checkOutSocket(ws); + return ws; } - this.#waitQueue.get(url)!.push(resolve); + throw new Error(`[WebSocketPool] WebSocket connection failed for ${normalizedUrl}`); + } + + if (this.#pool.size + this.#connecting.size >= this.#maxConnections) { + return new Promise((resolve, reject) => { + this.#waitingQueue.push({ url: normalizedUrl, resolve, reject }); + }); + } + + const newConnectionPromise = this.#createSocket(normalizedUrl); + this.#connecting.set(normalizedUrl, newConnectionPromise); + + newConnectionPromise.finally(() => { + this.#connecting.delete(normalizedUrl); }); + + return newConnectionPromise; } /** @@ -78,24 +102,19 @@ export class WebSocketPool { * @param ws - The WebSocket connection to release. */ public release(ws: WebSocket): void { - const url = ws.url; + const currentCount = this.#refCounts.get(ws); + if (!currentCount) { + throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.'); + } - const waitingResolvers = this.#waitQueue.get(url); - if (waitingResolvers?.length > 0) { - const resolver = waitingResolvers.shift()!; - - // Cleanup empty queues immediately - if (waitingResolvers.length === 0) { - this.#waitQueue.delete(url); + if (currentCount > 0) { + const newCount = currentCount - 1; + this.#refCounts.set(ws, newCount); + + if (newCount === 0) { + this.#startIdleTimer(ws); } - - resolver(ws); - return; } - - // If no requestors are waiting, delete the reference from the busy set and start idle timer. - this.#busy.delete(ws); - this.#startIdleTimer(ws); } /** @@ -116,6 +135,25 @@ export class WebSocketPool { return this.#idleTimeoutMs; } + /** + * Sets the maximum number of simultaneous WebSocket connections. + * + * @param limit - The new connection limit. + */ + public set maxConnections(limit: number) { + this.#maxConnections = limit; + this.#processWaitingQueue(); + } + + /** + * Gets the current maximum number of simultaneous WebSocket connections. + * + * @returns The current connection limit. + */ + public get maxConnections(): number { + return this.#maxConnections; + } + /** * Closes all WebSocket connections and "drains" the pool. */ @@ -126,47 +164,50 @@ export class WebSocketPool { } this.#idleTimers.clear(); - for (const sockets of this.#pool.values()) { - for (const ws of sockets) { - ws.onclose = null; - ws.close(); - } + for (const { reject } of this.#waitingQueue) { + reject(new Error('[WebSocketPool] Draining pool.')); + } + this.#waitingQueue = []; + + for (const promise of this.#connecting.values()) { + // While we can't cancel the connection attempt, we can prevent callers from using it. + promise.catch(() => { + /* ignore connection errors during drain */ + }); + } + this.#connecting.clear(); + + for (const ws of this.#pool.values()) { + ws.close(); } this.#pool.clear(); - this.#busy.clear(); - this.#waitQueue.clear(); + this.#refCounts.clear(); } // #endregion // #region Private Helper Methods - #findAvailableSocket(url: string): WebSocket | null { - const sockets = this.#pool.get(url); - if (!sockets) { - return null; - } - return sockets.find(ws => !this.#busy.has(ws) && ws.readyState === WebSocket.OPEN) ?? null; - } - #createSocket(url: string): Promise { return new Promise((resolve, reject) => { try { const ws = new WebSocket(url); ws.onopen = () => { - const sockets = this.#pool.get(url) || []; - sockets.push(ws); - this.#pool.set(url, sockets); - this.#busy.add(ws); + this.#pool.set(url, ws); + this.#refCounts.set(ws, 1); + + // Remove the socket from the pool when it is closed. The socket may be closed by + // either the client or the server. + ws.onclose = () => this.#removeSocket(ws); resolve(ws); }; - // Remove the socket from the pool when it is closed. The socket may be closed by either - // the client or the server. - ws.onclose = () => this.#removeSocket(ws); - - ws.onerror = () => reject(new Error('WebSocket error')); + ws.onerror = (event) => { + this.#processWaitingQueue(); + reject(new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`)); + }; } catch (error) { + this.#processWaitingQueue(); reject(error); } }); @@ -174,33 +215,25 @@ export class WebSocketPool { #removeSocket(ws: WebSocket): void { const url = ws.url; - const sockets = this.#pool.get(url); - if (sockets) { - const index = sockets.indexOf(ws); - if (index > -1) { - sockets.splice(index, 1); - } - if (sockets.length === 0) { - this.#pool.delete(url); - } - } - this.#busy.delete(ws); + this.#pool.delete(url); + this.#refCounts.delete(ws); this.#clearIdleTimer(ws); + this.#processWaitingQueue(); } /** * Starts an idle timer for the specified WebSocket. The connection will be automatically * closed after the idle timeout period if it remains unused. * - * @param ws - The WebSocket to start the idle timer for. + * @param ws - The WebSocket for which to start the idle timer. */ #startIdleTimer(ws: WebSocket): void { // Clear any existing timer first this.#clearIdleTimer(ws); const timer = setTimeout(() => { - // Check if the socket is still idle (not in busy set) before closing - if (!this.#busy.has(ws) && ws.readyState === WebSocket.OPEN) { + const refCount = this.#refCounts.get(ws); + if ((!refCount || refCount === 0) && ws.readyState === WebSocket.OPEN) { ws.close(); this.#removeSocket(ws); } @@ -212,7 +245,7 @@ export class WebSocketPool { /** * Clears the idle timer for the specified WebSocket. * - * @param ws - The WebSocket to clear the idle timer for. + * @param ws - The WebSocket for which to clear the idle timer. */ #clearIdleTimer(ws: WebSocket): void { const timer = this.#idleTimers.get(ws); @@ -222,5 +255,56 @@ export class WebSocketPool { } } + #processWaitingQueue(): void { + while ( + this.#waitingQueue.length > 0 && + this.#pool.size + this.#connecting.size < this.#maxConnections + ) { + const nextInQueue = this.#waitingQueue.shift(); + + if (!nextInQueue) { + continue; + } + + const { url, resolve, reject } = nextInQueue; + // Re-check if a connection for this URL was created while this request was in the queue + const existingSocket = this.#pool.get(url); + if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) { + this.#checkOutSocket(existingSocket); + resolve(existingSocket); + } else { + const connectingPromise = this.#connecting.get(url); + if (connectingPromise) { + connectingPromise.then(resolve, reject); + } + } + } + } + + #checkOutSocket(ws: WebSocket): void { + const count = (this.#refCounts.get(ws) || 0) + 1; + this.#refCounts.set(ws, count); + this.#clearIdleTimer(ws); + } + + #normalizeUrl(url: string): string { + try { + const urlObj = new URL(url); + // The URL constructor correctly normalizes scheme and hostname casing. + let normalized = urlObj.toString(); + + // The logic to remove a trailing slash for connection coalescing can be kept, + // but should be done on the normalized string. + if (urlObj.pathname !== '/' && normalized.endsWith('/')) { + normalized = normalized.slice(0, -1); + } + + return normalized; + } catch { + // If URL is invalid, return it as-is and let WebSocket constructor handle the error. + return url; + } + } + // #endregion } From 0743cc3629100bd32b4a024daab26aa786fe7e5d Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Wed, 23 Jul 2025 23:54:55 -0500 Subject: [PATCH 3/9] Update Deno lockfile --- deno.lock | 1 - 1 file changed, 1 deletion(-) diff --git a/deno.lock b/deno.lock index 098f9db..35676a3 100644 --- a/deno.lock +++ b/deno.lock @@ -3,7 +3,6 @@ "specifiers": { "npm:@noble/curves@^1.9.4": "1.9.4", "npm:@noble/hashes@^1.8.0": "1.8.0", - "npm:@noble/secp256k1@^2.3.0": "2.3.0", "npm:@nostr-dev-kit/ndk-cache-dexie@2.6": "2.6.33_nostr-tools@2.15.1__typescript@5.8.3_typescript@5.8.3", "npm:@nostr-dev-kit/ndk-cache-dexie@^2.6.33": "2.6.33_nostr-tools@2.15.1__typescript@5.8.3_typescript@5.8.3", "npm:@nostr-dev-kit/ndk@^2.14.32": "2.14.32_nostr-tools@2.15.1__typescript@5.8.3_typescript@5.8.3", From 9a95ee450882520801b247ecf1e8f78e1f41a502 Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Wed, 23 Jul 2025 23:55:11 -0500 Subject: [PATCH 4/9] Remove unused imports --- src/lib/components/EventInput.svelte | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lib/components/EventInput.svelte b/src/lib/components/EventInput.svelte index cc2e2a3..b00fecd 100644 --- a/src/lib/components/EventInput.svelte +++ b/src/lib/components/EventInput.svelte @@ -3,7 +3,6 @@ getTitleTagForEvent, getDTagForEvent, requiresDTag, - hasDTag, validateNotAsciidoc, validateAsciiDoc, build30040EventSet, @@ -22,7 +21,6 @@ import { prefixNostrAddresses } from "$lib/utils/nostrUtils"; import { activeInboxRelays, activeOutboxRelays } from "$lib/ndk"; import { Button } from "flowbite-svelte"; - import { nip19 } from "nostr-tools"; import { goto } from "$app/navigation"; let kind = $state(30023); From c79b844f65f3be97a9bb16aa4649f1f16e06b0e5 Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Wed, 23 Jul 2025 23:55:31 -0500 Subject: [PATCH 5/9] Use `WebSocketPool` for raw WebSocket connections --- src/lib/components/EventInput.svelte | 20 +++----- src/lib/ndk.ts | 15 +++--- src/lib/utils/community_checker.ts | 35 ++++++-------- src/lib/utils/relayDiagnostics.ts | 70 +++++++--------------------- 4 files changed, 46 insertions(+), 94 deletions(-) diff --git a/src/lib/components/EventInput.svelte b/src/lib/components/EventInput.svelte index b00fecd..7834a11 100644 --- a/src/lib/components/EventInput.svelte +++ b/src/lib/components/EventInput.svelte @@ -22,6 +22,7 @@ import { activeInboxRelays, activeOutboxRelays } from "$lib/ndk"; import { Button } from "flowbite-svelte"; import { goto } from "$app/navigation"; + import { WebSocketPool } from "$lib/data_structures/websocket_pool"; let kind = $state(30023); let tags = $state<[string, string][]>([]); @@ -306,17 +307,14 @@ for (const relayUrl of relays) { try { - const ws = new WebSocket(relayUrl); + const ws = await WebSocketPool.instance.acquire(relayUrl); + await new Promise((resolve, reject) => { const timeout = setTimeout(() => { - ws.close(); + WebSocketPool.instance.release(ws); reject(new Error("Timeout")); }, 5000); - ws.onopen = () => { - ws.send(JSON.stringify(["EVENT", signedEvent])); - }; - ws.onmessage = (e) => { const [type, id, ok, message] = JSON.parse(e.data); if (type === "OK" && id === signedEvent.id) { @@ -324,20 +322,14 @@ if (ok) { published = true; relaysPublished.push(relayUrl); - ws.close(); + WebSocketPool.instance.release(ws); resolve(); } else { - ws.close(); + WebSocketPool.instance.release(ws); reject(new Error(message)); } } }; - - ws.onerror = () => { - clearTimeout(timeout); - ws.close(); - reject(new Error("WebSocket error")); - }; }); if (published) break; } catch (e) { diff --git a/src/lib/ndk.ts b/src/lib/ndk.ts index af039c4..17dbf69 100644 --- a/src/lib/ndk.ts +++ b/src/lib/ndk.ts @@ -21,6 +21,7 @@ export { testRelayConnection }; import { userStore } from "./stores/userStore.ts"; import { userPubkey } from "./stores/authStore.Svelte.ts"; import { startNetworkStatusMonitoring, stopNetworkStatusMonitoring } from "./stores/networkStore.ts"; +import { WebSocketPool } from "./data_structures/websocket_pool.ts"; export const ndkInstance: Writable = writable(); export const ndkSignedIn = writable(false); @@ -199,16 +200,14 @@ export function checkWebSocketSupport(): void { // Test if secure WebSocket is supported try { - const testWs = new WebSocket("wss://echo.websocket.org"); - testWs.onopen = () => { + WebSocketPool.instance.acquire("wss://echo.websocket.org").then((ws) => { console.debug("[NDK.ts] ✓ Secure WebSocket (wss://) is supported"); - testWs.close(); - }; - testWs.onerror = () => { + WebSocketPool.instance.release(ws); + }).catch((_) => { console.warn("[NDK.ts] ✗ Secure WebSocket (wss://) may not be supported"); - }; - } catch (error) { - console.warn("[NDK.ts] ✗ WebSocket test failed:", error); + }); + } catch { + console.warn("[NDK.ts] ✗ WebSocket test failed"); } } diff --git a/src/lib/utils/community_checker.ts b/src/lib/utils/community_checker.ts index 7e6ea11..1d19d91 100644 --- a/src/lib/utils/community_checker.ts +++ b/src/lib/utils/community_checker.ts @@ -1,4 +1,5 @@ import { communityRelays } from "$lib/consts"; +import { WebSocketPool } from "../data_structures/websocket_pool.ts"; import { RELAY_CONSTANTS, SEARCH_LIMITS } from "./search_constants"; // Cache for pubkeys with kind 1 events on communityRelay @@ -16,37 +17,31 @@ export async function checkCommunity(pubkey: string): Promise { // Try each community relay until we find one that works for (const relayUrl of communityRelays) { try { - const ws = new WebSocket(relayUrl); + const ws = await WebSocketPool.instance.acquire(relayUrl); const result = await new Promise((resolve) => { - ws.onopen = () => { - ws.send( - JSON.stringify([ - "REQ", - RELAY_CONSTANTS.COMMUNITY_REQUEST_ID, - { - kinds: RELAY_CONSTANTS.COMMUNITY_REQUEST_KINDS, - authors: [pubkey], - limit: SEARCH_LIMITS.COMMUNITY_CHECK, - }, - ]), - ); - }; + ws.send( + JSON.stringify([ + "REQ", + RELAY_CONSTANTS.COMMUNITY_REQUEST_ID, + { + kinds: RELAY_CONSTANTS.COMMUNITY_REQUEST_KINDS, + authors: [pubkey], + limit: SEARCH_LIMITS.COMMUNITY_CHECK, + }, + ]), + ); ws.onmessage = (event) => { const data = JSON.parse(event.data); if (data[0] === "EVENT" && data[2]?.kind === 1) { communityCache.set(pubkey, true); - ws.close(); + WebSocketPool.instance.release(ws); resolve(true); } else if (data[0] === "EOSE") { communityCache.set(pubkey, false); - ws.close(); + WebSocketPool.instance.release(ws); resolve(false); } }; - ws.onerror = () => { - ws.close(); - resolve(false); - }; }); if (result) { diff --git a/src/lib/utils/relayDiagnostics.ts b/src/lib/utils/relayDiagnostics.ts index df61930..6be37c4 100644 --- a/src/lib/utils/relayDiagnostics.ts +++ b/src/lib/utils/relayDiagnostics.ts @@ -1,3 +1,4 @@ +import { WebSocketPool } from "../data_structures/websocket_pool.ts"; import { activeInboxRelays, activeOutboxRelays } from "../ndk.ts"; import { TIMEOUTS } from "./search_constants.ts"; import { get } from "svelte/store"; @@ -13,72 +14,37 @@ export interface RelayDiagnostic { /** * Tests connection to a single relay */ -export function testRelay(url: string): Promise { +export async function testRelay(url: string): Promise { const startTime = Date.now(); + const ws = await WebSocketPool.instance.acquire(url); return new Promise((resolve) => { - const ws = new WebSocket(url); - let resolved = false; - const timeout = setTimeout(() => { - if (!resolved) { - resolved = true; - ws.close(); - resolve({ - url, - connected: false, - requiresAuth: false, - error: "Connection timeout", - responseTime: Date.now() - startTime, - }); - } + WebSocketPool.instance.release(ws); + resolve({ + url, + connected: false, + requiresAuth: false, + error: "Connection timeout", + responseTime: Date.now() - startTime, + }); }, TIMEOUTS.RELAY_DIAGNOSTICS); - ws.onopen = () => { - if (!resolved) { - resolved = true; + ws.onmessage = (event) => { + const data = JSON.parse(event.data); + if (data[0] === "NOTICE" && data[1]?.includes("auth-required")) { clearTimeout(timeout); - ws.close(); + WebSocketPool.instance.release(ws); resolve({ url, connected: true, - requiresAuth: false, + requiresAuth: true, responseTime: Date.now() - startTime, }); } - }; - - ws.onerror = () => { - if (!resolved) { - resolved = true; - clearTimeout(timeout); - resolve({ - url, - connected: false, - requiresAuth: false, - error: "WebSocket error", - responseTime: Date.now() - startTime, - }); - } - }; - - ws.onmessage = (event) => { - const data = JSON.parse(event.data); - if (data[0] === "NOTICE" && data[1]?.includes("auth-required")) { - if (!resolved) { - resolved = true; - clearTimeout(timeout); - ws.close(); - resolve({ - url, - connected: true, - requiresAuth: true, - responseTime: Date.now() - startTime, - }); - } - } - }; + } }); + } /** From 96dde51c44863ace80ded81872ae1f7abf13361e Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Thu, 24 Jul 2025 00:01:00 -0500 Subject: [PATCH 6/9] Change prescribed quote style for Cursor --- .cursor/rules/alexandria.mdc | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/.cursor/rules/alexandria.mdc b/.cursor/rules/alexandria.mdc index 7a0724f..45d67a6 100644 --- a/.cursor/rules/alexandria.mdc +++ b/.cursor/rules/alexandria.mdc @@ -49,7 +49,7 @@ Observe the following style guidelines when writing code: - Use blocks enclosed by curly brackets when writing control flow expressions such as `for` and `while` loops, and `if` and `switch` statements. - Begin `case` expressions in a `switch` statement at the same indentation level as the `switch` itself. Indent code within a `case` block. - Limit line length to 100 characters; break statements across lines if necessary. -- Default to single quotes. +- Default to double quotes. ### HTML @@ -57,5 +57,3 @@ Observe the following style guidelines when writing code: - Break long tags across multiple lines. - Use Tailwind 4 utility classes for styling. - Default to single quotes. - - From eb682846e00db4c8fcd882a68aab07541b3c4718 Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Thu, 24 Jul 2025 00:01:21 -0500 Subject: [PATCH 7/9] Recommend `WebSocketPool` usage to Cursor --- .cursor/rules/alexandria.mdc | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.cursor/rules/alexandria.mdc b/.cursor/rules/alexandria.mdc index 45d67a6..f45fffd 100644 --- a/.cursor/rules/alexandria.mdc +++ b/.cursor/rules/alexandria.mdc @@ -19,6 +19,12 @@ In reader mode, Alexandria loads a document tree from a root publication index e Svelte components in Alexandria use TypeScript exclusively over plain JavaScript. Styles are defined via Tailwind 4 utility classes, and some custom utility classes are defined in [app.css](mdc:src/app.css). The app runs on Deno, but maintains compatibility with Node.js. +### Utilities + +The project contains a number of modules that define utility classes and functions to support the app's operations. Make use of these utilities when they are relevant to reduce code duplication and maintain common patterns: + +- Use the `WebSocketPool` class defined in [websocket_pool.ts](../src/lib/data_structures/websocket_pool.ts) when handling raw WebSockets to efficiently manage connections. + ## General Guidelines When responding to prompts, adhere to the following rules: From fee33e8f92f1c7810c4543aa1a996f01937e36fb Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Thu, 24 Jul 2025 23:30:20 -0500 Subject: [PATCH 8/9] `#instance` -> `#shared` --- src/lib/data_structures/websocket_pool.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lib/data_structures/websocket_pool.ts b/src/lib/data_structures/websocket_pool.ts index f8bc396..831e32a 100644 --- a/src/lib/data_structures/websocket_pool.ts +++ b/src/lib/data_structures/websocket_pool.ts @@ -14,7 +14,7 @@ interface WebSocketPoolWaitingQueueItem { * the connection is closed. */ export class WebSocketPool { - static #instance: WebSocketPool; + static #shared: WebSocketPool; #pool: Map = new Map(); #connecting: Map> = new Map(); @@ -41,10 +41,10 @@ export class WebSocketPool { * @returns The singleton instance. */ public static get instance(): WebSocketPool { - if (!WebSocketPool.#instance) { - WebSocketPool.#instance = new WebSocketPool(); + if (!WebSocketPool.#shared) { + WebSocketPool.#shared = new WebSocketPool(); } - return WebSocketPool.#instance; + return WebSocketPool.#shared; } // #region Resource Management Interface From 2a571224362d42155ef9f88349d50c46bad8aaff Mon Sep 17 00:00:00 2001 From: buttercat1791 Date: Fri, 25 Jul 2025 00:51:21 -0500 Subject: [PATCH 9/9] Introduce WebSocket handles and simplify - The `acquire`, `release`, and `drain` method signatures remain unchanged. - `WebSocketHandle` objects are introduced to tie each socket's state to the socket itself. - Initial connection logic is simplified. - Some minor improvements and validations are introduced based on AI-assisted code review. --- src/lib/data_structures/websocket_pool.ts | 276 ++++++++++++---------- 1 file changed, 152 insertions(+), 124 deletions(-) diff --git a/src/lib/data_structures/websocket_pool.ts b/src/lib/data_structures/websocket_pool.ts index 831e32a..fca0325 100644 --- a/src/lib/data_structures/websocket_pool.ts +++ b/src/lib/data_structures/websocket_pool.ts @@ -1,6 +1,16 @@ +/** + * A representation of a WebSocket connection used internally by the WebSocketPool. Attaches + * information about the connection's state within the resource pool to the connection itself. + */ +interface WebSocketHandle { + ws: WebSocket; + refCount: number; + idleTimer?: ReturnType; +} + interface WebSocketPoolWaitingQueueItem { url: string; - resolve: (ws: WebSocket) => void; + resolve: (handle: WebSocketHandle) => void; reject: (reason?: any) => void; } @@ -16,10 +26,11 @@ interface WebSocketPoolWaitingQueueItem { export class WebSocketPool { static #shared: WebSocketPool; - #pool: Map = new Map(); - #connecting: Map> = new Map(); - #refCounts: Map = new Map(); - #idleTimers: Map> = new Map(); + /** + * A map of WebSocket URLs to the handles containing the connection and its state. + */ + #pool: Map = new Map(); + #idleTimeoutMs: number; #maxConnections: number; #waitingQueue: WebSocketPoolWaitingQueueItem[] = []; @@ -50,71 +61,38 @@ export class WebSocketPool { // #region Resource Management Interface /** - * Acquires a WebSocket connection for the specified URL. If a connection is available in the - * pool, that connection is returned. If no connection is available but the pool is not full, a - * new connection is created and returned. If the pool is full, the request is queued until a - * connection is released, at which point the newly-available connection is returned to the - * caller. + * Sets the maximum number of simultaneous WebSocket connections. * - * @param url - The URL to connect to. - * @returns A promise that resolves with a WebSocket connection. + * @param limit - The new connection limit. */ - public async acquire(url: string): Promise { - const normalizedUrl = this.#normalizeUrl(url); - const existingSocket = this.#pool.get(normalizedUrl); - - if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) { - this.#checkOutSocket(existingSocket); - return Promise.resolve(existingSocket); + public set maxConnections(limit: number) { + if (limit === this.#maxConnections) { + return; } - const connectingPromise = this.#connecting.get(normalizedUrl); - if (connectingPromise) { - const ws = await connectingPromise; - if (ws.readyState === WebSocket.OPEN) { - this.#checkOutSocket(ws); - return ws; - } - throw new Error(`[WebSocketPool] WebSocket connection failed for ${normalizedUrl}`); + if (limit == null || isNaN(limit)) { + throw new Error('[WebSocketPool] Connection limit must be a number.'); } - if (this.#pool.size + this.#connecting.size >= this.#maxConnections) { - return new Promise((resolve, reject) => { - this.#waitingQueue.push({ url: normalizedUrl, resolve, reject }); - }); + if (limit <= 0) { + throw new Error('[WebSocketPool] Connection limit must be greater than 0.'); } - const newConnectionPromise = this.#createSocket(normalizedUrl); - this.#connecting.set(normalizedUrl, newConnectionPromise); - - newConnectionPromise.finally(() => { - this.#connecting.delete(normalizedUrl); - }); + if (!Number.isInteger(limit)) { + throw new Error('[WebSocketPool] Connection limit must be an integer.'); + } - return newConnectionPromise; + this.#maxConnections = limit; + this.#processWaitingQueue(); } /** - * Releases a WebSocket connection back to the pool. If there are pending requests for the same - * URL, the connection is passed to the requestor in the queue. Otherwise, the connection is - * marked as available. + * Gets the current maximum number of simultaneous WebSocket connections. * - * @param ws - The WebSocket connection to release. + * @returns The current connection limit. */ - public release(ws: WebSocket): void { - const currentCount = this.#refCounts.get(ws); - if (!currentCount) { - throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.'); - } - - if (currentCount > 0) { - const newCount = currentCount - 1; - this.#refCounts.set(ws, newCount); - - if (newCount === 0) { - this.#startIdleTimer(ws); - } - } + public get maxConnections(): number { + return this.#maxConnections; } /** @@ -122,7 +100,23 @@ export class WebSocketPool { * * @param timeoutMs - The timeout in milliseconds after which idle connections will be closed. */ - public setIdleTimeout(timeoutMs: number): void { + public set idleTimeoutMs(timeoutMs: number) { + if (timeoutMs === this.#idleTimeoutMs) { + return; + } + + if (timeoutMs == null || isNaN(timeoutMs)) { + throw new Error('[WebSocketPool] Idle timeout must be a number.'); + } + + if (timeoutMs <= 0) { + throw new Error('[WebSocketPool] Idle timeout must be greater than 0.'); + } + + if (!Number.isInteger(timeoutMs)) { + throw new Error('[WebSocketPool] Idle timeout must be an integer.'); + } + this.#idleTimeoutMs = timeoutMs; } @@ -131,27 +125,66 @@ export class WebSocketPool { * * @returns The current idle timeout in milliseconds. */ - public getIdleTimeout(): number { + public get idleTimeoutMs(): number { return this.#idleTimeoutMs; } /** - * Sets the maximum number of simultaneous WebSocket connections. + * Acquires a WebSocket connection for the specified URL. If a connection is available in the + * pool, that connection is returned. If no connection is available but the pool is not full, a + * new connection is created and returned. If the pool is full, the request is queued until a + * connection is released, at which point the newly-available connection is returned to the + * caller. * - * @param limit - The new connection limit. + * @param url - The URL to connect to. + * @returns A promise that resolves with a WebSocket connection. */ - public set maxConnections(limit: number) { - this.#maxConnections = limit; - this.#processWaitingQueue(); + public async acquire(url: string): Promise { + const normalizedUrl = this.#normalizeUrl(url); + const handle = this.#pool.get(normalizedUrl); + + try { + if (handle && handle.ws.readyState === WebSocket.OPEN) { + this.#checkOut(handle); + return handle.ws; + } + + if (this.#pool.size >= this.#maxConnections) { + return new Promise((resolve, reject) => { + this.#waitingQueue.push({ + url: normalizedUrl, + resolve: (handle) => resolve(handle.ws), + reject, + }); + }); + } + + const newHandle = await this.#createSocket(normalizedUrl); + return newHandle.ws; + } catch (error) { + throw new Error( + `[WebSocketPool] Failed to acquire connection for ${normalizedUrl}: ${error}` + ); + } } /** - * Gets the current maximum number of simultaneous WebSocket connections. + * Releases a WebSocket connection back to the pool. If there are pending requests for the same + * URL, the connection is passed to the requestor in the queue. Otherwise, the connection is + * marked as available. * - * @returns The current connection limit. + * @param handle - The WebSocketHandle to release. */ - public get maxConnections(): number { - return this.#maxConnections; + public release(ws: WebSocket): void { + const normalizedUrl = this.#normalizeUrl(ws.url); + const handle = this.#pool.get(normalizedUrl); + if (!handle) { + throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.'); + } + + if (--handle.refCount === 0) { + this.#startIdleTimer(handle); + } } /** @@ -159,52 +192,47 @@ export class WebSocketPool { */ public drain(): void { // Clear all idle timers first - for (const timer of this.#idleTimers.values()) { - clearTimeout(timer); + for (const handle of this.#pool.values()) { + this.#clearIdleTimer(handle); } - this.#idleTimers.clear(); for (const { reject } of this.#waitingQueue) { reject(new Error('[WebSocketPool] Draining pool.')); } this.#waitingQueue = []; - for (const promise of this.#connecting.values()) { - // While we can't cancel the connection attempt, we can prevent callers from using it. - promise.catch(() => { - /* ignore connection errors during drain */ - }); - } - this.#connecting.clear(); - - for (const ws of this.#pool.values()) { - ws.close(); + for (const handle of this.#pool.values()) { + handle.ws.close(); } this.#pool.clear(); - this.#refCounts.clear(); } // #endregion // #region Private Helper Methods - #createSocket(url: string): Promise { + #createSocket(url: string): Promise { return new Promise((resolve, reject) => { try { - const ws = new WebSocket(url); - ws.onopen = () => { - this.#pool.set(url, ws); - this.#refCounts.set(ws, 1); + const handle: WebSocketHandle = { + ws: new WebSocket(url), + refCount: 1, + }; + handle.ws.onopen = () => { + this.#pool.set(url, handle); // Remove the socket from the pool when it is closed. The socket may be closed by // either the client or the server. - ws.onclose = () => this.#removeSocket(ws); - resolve(ws); + handle.ws.onclose = () => this.#removeSocket(handle); + resolve(handle); }; - ws.onerror = (event) => { + handle.ws.onerror = (event) => { + this.#removeSocket(handle); this.#processWaitingQueue(); - reject(new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`)); + reject( + new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`) + ); }; } catch (error) { this.#processWaitingQueue(); @@ -213,11 +241,10 @@ export class WebSocketPool { }); } - #removeSocket(ws: WebSocket): void { - const url = ws.url; - this.#pool.delete(url); - this.#refCounts.delete(ws); - this.#clearIdleTimer(ws); + #removeSocket(handle: WebSocketHandle): void { + this.#clearIdleTimer(handle); + handle.ws.onopen = handle.ws.onerror = handle.ws.onclose = null; + this.#pool.delete(this.#normalizeUrl(handle.ws.url)); this.#processWaitingQueue(); } @@ -227,64 +254,65 @@ export class WebSocketPool { * * @param ws - The WebSocket for which to start the idle timer. */ - #startIdleTimer(ws: WebSocket): void { + #startIdleTimer(handle: WebSocketHandle): void { // Clear any existing timer first - this.#clearIdleTimer(ws); + this.#clearIdleTimer(handle); - const timer = setTimeout(() => { - const refCount = this.#refCounts.get(ws); - if ((!refCount || refCount === 0) && ws.readyState === WebSocket.OPEN) { - ws.close(); - this.#removeSocket(ws); + handle.idleTimer = setTimeout(() => { + const refCount = handle.refCount; + if (refCount === 0 && handle.ws.readyState === WebSocket.OPEN) { + handle.ws.close(); + this.#removeSocket(handle); } }, this.#idleTimeoutMs); - - this.#idleTimers.set(ws, timer); } /** * Clears the idle timer for the specified WebSocket. * - * @param ws - The WebSocket for which to clear the idle timer. + * @param handle - The WebSocketHandle for which to clear the idle timer. */ - #clearIdleTimer(ws: WebSocket): void { - const timer = this.#idleTimers.get(ws); + #clearIdleTimer(handle: WebSocketHandle): void { + const timer = handle.idleTimer; if (timer) { clearTimeout(timer); - this.#idleTimers.delete(ws); + handle.idleTimer = undefined; } } + /** + * Processes pending requests to acquire a connection. Reuses existing connections when possible. + */ #processWaitingQueue(): void { while ( this.#waitingQueue.length > 0 && - this.#pool.size + this.#connecting.size < this.#maxConnections + this.#pool.size < this.#maxConnections ) { const nextInQueue = this.#waitingQueue.shift(); - if (!nextInQueue) { continue; } const { url, resolve, reject } = nextInQueue; - // Re-check if a connection for this URL was created while this request was in the queue - const existingSocket = this.#pool.get(url); - if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) { - this.#checkOutSocket(existingSocket); - resolve(existingSocket); - } else { - const connectingPromise = this.#connecting.get(url); - if (connectingPromise) { - connectingPromise.then(resolve, reject); - } + + const existingHandle = this.#pool.get(url); + if (existingHandle && existingHandle.ws.readyState === WebSocket.OPEN) { + this.#checkOut(existingHandle); + resolve(existingHandle); + return; } + + this.#createSocket(url).then(resolve, reject); } } - #checkOutSocket(ws: WebSocket): void { - const count = (this.#refCounts.get(ws) || 0) + 1; - this.#refCounts.set(ws, count); - this.#clearIdleTimer(ws); + #checkOut(handle: WebSocketHandle): void { + if (handle.refCount == null) { + throw new Error('[WebSocketPool] Handle refCount unexpectedly null.'); + } + + ++handle.refCount; + this.#clearIdleTimer(handle); } #normalizeUrl(url: string): string {