diff --git a/src/lib/data_structures/websocket_pool.ts b/src/lib/data_structures/websocket_pool.ts index 831e32a..fca0325 100644 --- a/src/lib/data_structures/websocket_pool.ts +++ b/src/lib/data_structures/websocket_pool.ts @@ -1,6 +1,16 @@ +/** + * A representation of a WebSocket connection used internally by the WebSocketPool. Attaches + * information about the connection's state within the resource pool to the connection itself. + */ +interface WebSocketHandle { + ws: WebSocket; + refCount: number; + idleTimer?: ReturnType; +} + interface WebSocketPoolWaitingQueueItem { url: string; - resolve: (ws: WebSocket) => void; + resolve: (handle: WebSocketHandle) => void; reject: (reason?: any) => void; } @@ -16,10 +26,11 @@ interface WebSocketPoolWaitingQueueItem { export class WebSocketPool { static #shared: WebSocketPool; - #pool: Map = new Map(); - #connecting: Map> = new Map(); - #refCounts: Map = new Map(); - #idleTimers: Map> = new Map(); + /** + * A map of WebSocket URLs to the handles containing the connection and its state. + */ + #pool: Map = new Map(); + #idleTimeoutMs: number; #maxConnections: number; #waitingQueue: WebSocketPoolWaitingQueueItem[] = []; @@ -50,71 +61,38 @@ export class WebSocketPool { // #region Resource Management Interface /** - * Acquires a WebSocket connection for the specified URL. If a connection is available in the - * pool, that connection is returned. If no connection is available but the pool is not full, a - * new connection is created and returned. If the pool is full, the request is queued until a - * connection is released, at which point the newly-available connection is returned to the - * caller. + * Sets the maximum number of simultaneous WebSocket connections. * - * @param url - The URL to connect to. - * @returns A promise that resolves with a WebSocket connection. + * @param limit - The new connection limit. */ - public async acquire(url: string): Promise { - const normalizedUrl = this.#normalizeUrl(url); - const existingSocket = this.#pool.get(normalizedUrl); - - if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) { - this.#checkOutSocket(existingSocket); - return Promise.resolve(existingSocket); + public set maxConnections(limit: number) { + if (limit === this.#maxConnections) { + return; } - const connectingPromise = this.#connecting.get(normalizedUrl); - if (connectingPromise) { - const ws = await connectingPromise; - if (ws.readyState === WebSocket.OPEN) { - this.#checkOutSocket(ws); - return ws; - } - throw new Error(`[WebSocketPool] WebSocket connection failed for ${normalizedUrl}`); + if (limit == null || isNaN(limit)) { + throw new Error('[WebSocketPool] Connection limit must be a number.'); } - if (this.#pool.size + this.#connecting.size >= this.#maxConnections) { - return new Promise((resolve, reject) => { - this.#waitingQueue.push({ url: normalizedUrl, resolve, reject }); - }); + if (limit <= 0) { + throw new Error('[WebSocketPool] Connection limit must be greater than 0.'); } - const newConnectionPromise = this.#createSocket(normalizedUrl); - this.#connecting.set(normalizedUrl, newConnectionPromise); - - newConnectionPromise.finally(() => { - this.#connecting.delete(normalizedUrl); - }); + if (!Number.isInteger(limit)) { + throw new Error('[WebSocketPool] Connection limit must be an integer.'); + } - return newConnectionPromise; + this.#maxConnections = limit; + this.#processWaitingQueue(); } /** - * Releases a WebSocket connection back to the pool. If there are pending requests for the same - * URL, the connection is passed to the requestor in the queue. Otherwise, the connection is - * marked as available. + * Gets the current maximum number of simultaneous WebSocket connections. * - * @param ws - The WebSocket connection to release. + * @returns The current connection limit. */ - public release(ws: WebSocket): void { - const currentCount = this.#refCounts.get(ws); - if (!currentCount) { - throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.'); - } - - if (currentCount > 0) { - const newCount = currentCount - 1; - this.#refCounts.set(ws, newCount); - - if (newCount === 0) { - this.#startIdleTimer(ws); - } - } + public get maxConnections(): number { + return this.#maxConnections; } /** @@ -122,7 +100,23 @@ export class WebSocketPool { * * @param timeoutMs - The timeout in milliseconds after which idle connections will be closed. */ - public setIdleTimeout(timeoutMs: number): void { + public set idleTimeoutMs(timeoutMs: number) { + if (timeoutMs === this.#idleTimeoutMs) { + return; + } + + if (timeoutMs == null || isNaN(timeoutMs)) { + throw new Error('[WebSocketPool] Idle timeout must be a number.'); + } + + if (timeoutMs <= 0) { + throw new Error('[WebSocketPool] Idle timeout must be greater than 0.'); + } + + if (!Number.isInteger(timeoutMs)) { + throw new Error('[WebSocketPool] Idle timeout must be an integer.'); + } + this.#idleTimeoutMs = timeoutMs; } @@ -131,27 +125,66 @@ export class WebSocketPool { * * @returns The current idle timeout in milliseconds. */ - public getIdleTimeout(): number { + public get idleTimeoutMs(): number { return this.#idleTimeoutMs; } /** - * Sets the maximum number of simultaneous WebSocket connections. + * Acquires a WebSocket connection for the specified URL. If a connection is available in the + * pool, that connection is returned. If no connection is available but the pool is not full, a + * new connection is created and returned. If the pool is full, the request is queued until a + * connection is released, at which point the newly-available connection is returned to the + * caller. * - * @param limit - The new connection limit. + * @param url - The URL to connect to. + * @returns A promise that resolves with a WebSocket connection. */ - public set maxConnections(limit: number) { - this.#maxConnections = limit; - this.#processWaitingQueue(); + public async acquire(url: string): Promise { + const normalizedUrl = this.#normalizeUrl(url); + const handle = this.#pool.get(normalizedUrl); + + try { + if (handle && handle.ws.readyState === WebSocket.OPEN) { + this.#checkOut(handle); + return handle.ws; + } + + if (this.#pool.size >= this.#maxConnections) { + return new Promise((resolve, reject) => { + this.#waitingQueue.push({ + url: normalizedUrl, + resolve: (handle) => resolve(handle.ws), + reject, + }); + }); + } + + const newHandle = await this.#createSocket(normalizedUrl); + return newHandle.ws; + } catch (error) { + throw new Error( + `[WebSocketPool] Failed to acquire connection for ${normalizedUrl}: ${error}` + ); + } } /** - * Gets the current maximum number of simultaneous WebSocket connections. + * Releases a WebSocket connection back to the pool. If there are pending requests for the same + * URL, the connection is passed to the requestor in the queue. Otherwise, the connection is + * marked as available. * - * @returns The current connection limit. + * @param handle - The WebSocketHandle to release. */ - public get maxConnections(): number { - return this.#maxConnections; + public release(ws: WebSocket): void { + const normalizedUrl = this.#normalizeUrl(ws.url); + const handle = this.#pool.get(normalizedUrl); + if (!handle) { + throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.'); + } + + if (--handle.refCount === 0) { + this.#startIdleTimer(handle); + } } /** @@ -159,52 +192,47 @@ export class WebSocketPool { */ public drain(): void { // Clear all idle timers first - for (const timer of this.#idleTimers.values()) { - clearTimeout(timer); + for (const handle of this.#pool.values()) { + this.#clearIdleTimer(handle); } - this.#idleTimers.clear(); for (const { reject } of this.#waitingQueue) { reject(new Error('[WebSocketPool] Draining pool.')); } this.#waitingQueue = []; - for (const promise of this.#connecting.values()) { - // While we can't cancel the connection attempt, we can prevent callers from using it. - promise.catch(() => { - /* ignore connection errors during drain */ - }); - } - this.#connecting.clear(); - - for (const ws of this.#pool.values()) { - ws.close(); + for (const handle of this.#pool.values()) { + handle.ws.close(); } this.#pool.clear(); - this.#refCounts.clear(); } // #endregion // #region Private Helper Methods - #createSocket(url: string): Promise { + #createSocket(url: string): Promise { return new Promise((resolve, reject) => { try { - const ws = new WebSocket(url); - ws.onopen = () => { - this.#pool.set(url, ws); - this.#refCounts.set(ws, 1); + const handle: WebSocketHandle = { + ws: new WebSocket(url), + refCount: 1, + }; + handle.ws.onopen = () => { + this.#pool.set(url, handle); // Remove the socket from the pool when it is closed. The socket may be closed by // either the client or the server. - ws.onclose = () => this.#removeSocket(ws); - resolve(ws); + handle.ws.onclose = () => this.#removeSocket(handle); + resolve(handle); }; - ws.onerror = (event) => { + handle.ws.onerror = (event) => { + this.#removeSocket(handle); this.#processWaitingQueue(); - reject(new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`)); + reject( + new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`) + ); }; } catch (error) { this.#processWaitingQueue(); @@ -213,11 +241,10 @@ export class WebSocketPool { }); } - #removeSocket(ws: WebSocket): void { - const url = ws.url; - this.#pool.delete(url); - this.#refCounts.delete(ws); - this.#clearIdleTimer(ws); + #removeSocket(handle: WebSocketHandle): void { + this.#clearIdleTimer(handle); + handle.ws.onopen = handle.ws.onerror = handle.ws.onclose = null; + this.#pool.delete(this.#normalizeUrl(handle.ws.url)); this.#processWaitingQueue(); } @@ -227,64 +254,65 @@ export class WebSocketPool { * * @param ws - The WebSocket for which to start the idle timer. */ - #startIdleTimer(ws: WebSocket): void { + #startIdleTimer(handle: WebSocketHandle): void { // Clear any existing timer first - this.#clearIdleTimer(ws); + this.#clearIdleTimer(handle); - const timer = setTimeout(() => { - const refCount = this.#refCounts.get(ws); - if ((!refCount || refCount === 0) && ws.readyState === WebSocket.OPEN) { - ws.close(); - this.#removeSocket(ws); + handle.idleTimer = setTimeout(() => { + const refCount = handle.refCount; + if (refCount === 0 && handle.ws.readyState === WebSocket.OPEN) { + handle.ws.close(); + this.#removeSocket(handle); } }, this.#idleTimeoutMs); - - this.#idleTimers.set(ws, timer); } /** * Clears the idle timer for the specified WebSocket. * - * @param ws - The WebSocket for which to clear the idle timer. + * @param handle - The WebSocketHandle for which to clear the idle timer. */ - #clearIdleTimer(ws: WebSocket): void { - const timer = this.#idleTimers.get(ws); + #clearIdleTimer(handle: WebSocketHandle): void { + const timer = handle.idleTimer; if (timer) { clearTimeout(timer); - this.#idleTimers.delete(ws); + handle.idleTimer = undefined; } } + /** + * Processes pending requests to acquire a connection. Reuses existing connections when possible. + */ #processWaitingQueue(): void { while ( this.#waitingQueue.length > 0 && - this.#pool.size + this.#connecting.size < this.#maxConnections + this.#pool.size < this.#maxConnections ) { const nextInQueue = this.#waitingQueue.shift(); - if (!nextInQueue) { continue; } const { url, resolve, reject } = nextInQueue; - // Re-check if a connection for this URL was created while this request was in the queue - const existingSocket = this.#pool.get(url); - if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) { - this.#checkOutSocket(existingSocket); - resolve(existingSocket); - } else { - const connectingPromise = this.#connecting.get(url); - if (connectingPromise) { - connectingPromise.then(resolve, reject); - } + + const existingHandle = this.#pool.get(url); + if (existingHandle && existingHandle.ws.readyState === WebSocket.OPEN) { + this.#checkOut(existingHandle); + resolve(existingHandle); + return; } + + this.#createSocket(url).then(resolve, reject); } } - #checkOutSocket(ws: WebSocket): void { - const count = (this.#refCounts.get(ws) || 0) + 1; - this.#refCounts.set(ws, count); - this.#clearIdleTimer(ws); + #checkOut(handle: WebSocketHandle): void { + if (handle.refCount == null) { + throw new Error('[WebSocketPool] Handle refCount unexpectedly null.'); + } + + ++handle.refCount; + this.#clearIdleTimer(handle); } #normalizeUrl(url: string): string {