diff --git a/src/lib/data_structures/websocket_pool.ts b/src/lib/data_structures/websocket_pool.ts index 5b34a9e..f8bc396 100644 --- a/src/lib/data_structures/websocket_pool.ts +++ b/src/lib/data_structures/websocket_pool.ts @@ -1,27 +1,39 @@ +interface WebSocketPoolWaitingQueueItem { + url: string; + resolve: (ws: WebSocket) => void; + reject: (reason?: any) => void; +} + /** - * A resource pool for WebSocket connections. - * The pool maintains a set of up to 4 connections for each URL. Connections are acquired from the - * pool on a per-URL basis. Idle connections are automatically closed after 60 seconds. + * A resource pool for WebSocket connections. Its purpose is to allow multiple requestors to share + * an open WebSocket connection for greater resource efficiency. + * + * The pool maintains a single connection per URL. Requestors may acquire a reference to a + * connection, and are expected to release it when it is no longer needed. If the number of + * requestors using a connection drops to zero and no new requestors acquire it for a set period, + * the connection is closed. */ export class WebSocketPool { static #instance: WebSocketPool; - #pool: Map = new Map(); - #busy: Set = new Set(); - #waitQueue: Map) => void>> = new Map(); - #idleTimers: Map = new Map(); - #maxConnectionsPerUrl; - #idleTimeoutMs; + #pool: Map = new Map(); + #connecting: Map> = new Map(); + #refCounts: Map = new Map(); + #idleTimers: Map> = new Map(); + #idleTimeoutMs: number; + #maxConnections: number; + #waitingQueue: WebSocketPoolWaitingQueueItem[] = []; /** * Private constructor invoked when the singleton instance is first created. - * @param maxConnectionsPerUrl - The maximum number of connections to maintain for each URL. * @param idleTimeoutMs - The timeout in milliseconds after which idle connections will be * closed. Defaults to 60 seconds. + * @param maxConnections - The maximum number of simultaneous WebSocket connections. Defaults to + * 16. */ - private constructor(maxConnectionsPerUrl: number = 4, idleTimeoutMs: number = 60000) { - this.#maxConnectionsPerUrl = maxConnectionsPerUrl; + private constructor(idleTimeoutMs: number = 60000, maxConnections: number = 16) { this.#idleTimeoutMs = idleTimeoutMs; + this.#maxConnections = maxConnections; } /** @@ -47,27 +59,39 @@ export class WebSocketPool { * @param url - The URL to connect to. * @returns A promise that resolves with a WebSocket connection. */ - public acquire(url: string): Promise { - const availableSocket = this.#findAvailableSocket(url); - if (availableSocket) { - // Clear any idle timer for this socket since it's being used again. - this.#clearIdleTimer(availableSocket); - // Add the reference to the socket resource to the busy set. - this.#busy.add(availableSocket); - return Promise.resolve(availableSocket); - } + public async acquire(url: string): Promise { + const normalizedUrl = this.#normalizeUrl(url); + const existingSocket = this.#pool.get(normalizedUrl); - const socketsForUrl = this.#pool.get(url) || []; - if (socketsForUrl.length < this.#maxConnectionsPerUrl) { - return this.#createSocket(url); + if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) { + this.#checkOutSocket(existingSocket); + return Promise.resolve(existingSocket); } - return new Promise(resolve => { - if (!this.#waitQueue.has(url)) { - this.#waitQueue.set(url, []); + const connectingPromise = this.#connecting.get(normalizedUrl); + if (connectingPromise) { + const ws = await connectingPromise; + if (ws.readyState === WebSocket.OPEN) { + this.#checkOutSocket(ws); + return ws; } - this.#waitQueue.get(url)!.push(resolve); + throw new Error(`[WebSocketPool] WebSocket connection failed for ${normalizedUrl}`); + } + + if (this.#pool.size + this.#connecting.size >= this.#maxConnections) { + return new Promise((resolve, reject) => { + this.#waitingQueue.push({ url: normalizedUrl, resolve, reject }); + }); + } + + const newConnectionPromise = this.#createSocket(normalizedUrl); + this.#connecting.set(normalizedUrl, newConnectionPromise); + + newConnectionPromise.finally(() => { + this.#connecting.delete(normalizedUrl); }); + + return newConnectionPromise; } /** @@ -78,24 +102,19 @@ export class WebSocketPool { * @param ws - The WebSocket connection to release. */ public release(ws: WebSocket): void { - const url = ws.url; + const currentCount = this.#refCounts.get(ws); + if (!currentCount) { + throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.'); + } - const waitingResolvers = this.#waitQueue.get(url); - if (waitingResolvers?.length > 0) { - const resolver = waitingResolvers.shift()!; - - // Cleanup empty queues immediately - if (waitingResolvers.length === 0) { - this.#waitQueue.delete(url); + if (currentCount > 0) { + const newCount = currentCount - 1; + this.#refCounts.set(ws, newCount); + + if (newCount === 0) { + this.#startIdleTimer(ws); } - - resolver(ws); - return; } - - // If no requestors are waiting, delete the reference from the busy set and start idle timer. - this.#busy.delete(ws); - this.#startIdleTimer(ws); } /** @@ -116,6 +135,25 @@ export class WebSocketPool { return this.#idleTimeoutMs; } + /** + * Sets the maximum number of simultaneous WebSocket connections. + * + * @param limit - The new connection limit. + */ + public set maxConnections(limit: number) { + this.#maxConnections = limit; + this.#processWaitingQueue(); + } + + /** + * Gets the current maximum number of simultaneous WebSocket connections. + * + * @returns The current connection limit. + */ + public get maxConnections(): number { + return this.#maxConnections; + } + /** * Closes all WebSocket connections and "drains" the pool. */ @@ -126,47 +164,50 @@ export class WebSocketPool { } this.#idleTimers.clear(); - for (const sockets of this.#pool.values()) { - for (const ws of sockets) { - ws.onclose = null; - ws.close(); - } + for (const { reject } of this.#waitingQueue) { + reject(new Error('[WebSocketPool] Draining pool.')); + } + this.#waitingQueue = []; + + for (const promise of this.#connecting.values()) { + // While we can't cancel the connection attempt, we can prevent callers from using it. + promise.catch(() => { + /* ignore connection errors during drain */ + }); + } + this.#connecting.clear(); + + for (const ws of this.#pool.values()) { + ws.close(); } this.#pool.clear(); - this.#busy.clear(); - this.#waitQueue.clear(); + this.#refCounts.clear(); } // #endregion // #region Private Helper Methods - #findAvailableSocket(url: string): WebSocket | null { - const sockets = this.#pool.get(url); - if (!sockets) { - return null; - } - return sockets.find(ws => !this.#busy.has(ws) && ws.readyState === WebSocket.OPEN) ?? null; - } - #createSocket(url: string): Promise { return new Promise((resolve, reject) => { try { const ws = new WebSocket(url); ws.onopen = () => { - const sockets = this.#pool.get(url) || []; - sockets.push(ws); - this.#pool.set(url, sockets); - this.#busy.add(ws); + this.#pool.set(url, ws); + this.#refCounts.set(ws, 1); + + // Remove the socket from the pool when it is closed. The socket may be closed by + // either the client or the server. + ws.onclose = () => this.#removeSocket(ws); resolve(ws); }; - // Remove the socket from the pool when it is closed. The socket may be closed by either - // the client or the server. - ws.onclose = () => this.#removeSocket(ws); - - ws.onerror = () => reject(new Error('WebSocket error')); + ws.onerror = (event) => { + this.#processWaitingQueue(); + reject(new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`)); + }; } catch (error) { + this.#processWaitingQueue(); reject(error); } }); @@ -174,33 +215,25 @@ export class WebSocketPool { #removeSocket(ws: WebSocket): void { const url = ws.url; - const sockets = this.#pool.get(url); - if (sockets) { - const index = sockets.indexOf(ws); - if (index > -1) { - sockets.splice(index, 1); - } - if (sockets.length === 0) { - this.#pool.delete(url); - } - } - this.#busy.delete(ws); + this.#pool.delete(url); + this.#refCounts.delete(ws); this.#clearIdleTimer(ws); + this.#processWaitingQueue(); } /** * Starts an idle timer for the specified WebSocket. The connection will be automatically * closed after the idle timeout period if it remains unused. * - * @param ws - The WebSocket to start the idle timer for. + * @param ws - The WebSocket for which to start the idle timer. */ #startIdleTimer(ws: WebSocket): void { // Clear any existing timer first this.#clearIdleTimer(ws); const timer = setTimeout(() => { - // Check if the socket is still idle (not in busy set) before closing - if (!this.#busy.has(ws) && ws.readyState === WebSocket.OPEN) { + const refCount = this.#refCounts.get(ws); + if ((!refCount || refCount === 0) && ws.readyState === WebSocket.OPEN) { ws.close(); this.#removeSocket(ws); } @@ -212,7 +245,7 @@ export class WebSocketPool { /** * Clears the idle timer for the specified WebSocket. * - * @param ws - The WebSocket to clear the idle timer for. + * @param ws - The WebSocket for which to clear the idle timer. */ #clearIdleTimer(ws: WebSocket): void { const timer = this.#idleTimers.get(ws); @@ -222,5 +255,56 @@ export class WebSocketPool { } } + #processWaitingQueue(): void { + while ( + this.#waitingQueue.length > 0 && + this.#pool.size + this.#connecting.size < this.#maxConnections + ) { + const nextInQueue = this.#waitingQueue.shift(); + + if (!nextInQueue) { + continue; + } + + const { url, resolve, reject } = nextInQueue; + // Re-check if a connection for this URL was created while this request was in the queue + const existingSocket = this.#pool.get(url); + if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) { + this.#checkOutSocket(existingSocket); + resolve(existingSocket); + } else { + const connectingPromise = this.#connecting.get(url); + if (connectingPromise) { + connectingPromise.then(resolve, reject); + } + } + } + } + + #checkOutSocket(ws: WebSocket): void { + const count = (this.#refCounts.get(ws) || 0) + 1; + this.#refCounts.set(ws, count); + this.#clearIdleTimer(ws); + } + + #normalizeUrl(url: string): string { + try { + const urlObj = new URL(url); + // The URL constructor correctly normalizes scheme and hostname casing. + let normalized = urlObj.toString(); + + // The logic to remove a trailing slash for connection coalescing can be kept, + // but should be done on the normalized string. + if (urlObj.pathname !== '/' && normalized.endsWith('/')) { + normalized = normalized.slice(0, -1); + } + + return normalized; + } catch { + // If URL is invalid, return it as-is and let WebSocket constructor handle the error. + return url; + } + } + // #endregion }