Browse Source

Improve `WebSocketPool` class

- Share a single connection per URL
- Set a limit to the number of connections to prevent resource exhaustion
- Prevent certain race conditions
master
buttercat1791 8 months ago
parent
commit
2839e2547a
  1. 240
      src/lib/data_structures/websocket_pool.ts

240
src/lib/data_structures/websocket_pool.ts

@ -1,27 +1,39 @@ @@ -1,27 +1,39 @@
interface WebSocketPoolWaitingQueueItem {
url: string;
resolve: (ws: WebSocket) => void;
reject: (reason?: any) => void;
}
/**
* A resource pool for WebSocket connections.
* The pool maintains a set of up to 4 connections for each URL. Connections are acquired from the
* pool on a per-URL basis. Idle connections are automatically closed after 60 seconds.
* A resource pool for WebSocket connections. Its purpose is to allow multiple requestors to share
* an open WebSocket connection for greater resource efficiency.
*
* The pool maintains a single connection per URL. Requestors may acquire a reference to a
* connection, and are expected to release it when it is no longer needed. If the number of
* requestors using a connection drops to zero and no new requestors acquire it for a set period,
* the connection is closed.
*/
export class WebSocketPool {
static #instance: WebSocketPool;
#pool: Map<string, WebSocket[]> = new Map();
#busy: Set<WebSocket> = new Set();
#waitQueue: Map<string, Array<(ws: WebSocket | PromiseLike<WebSocket>) => void>> = new Map();
#idleTimers: Map<WebSocket, number> = new Map();
#maxConnectionsPerUrl;
#idleTimeoutMs;
#pool: Map<string, WebSocket> = new Map();
#connecting: Map<string, Promise<WebSocket>> = new Map();
#refCounts: Map<WebSocket, number> = new Map();
#idleTimers: Map<WebSocket, ReturnType<typeof setTimeout>> = new Map();
#idleTimeoutMs: number;
#maxConnections: number;
#waitingQueue: WebSocketPoolWaitingQueueItem[] = [];
/**
* Private constructor invoked when the singleton instance is first created.
* @param maxConnectionsPerUrl - The maximum number of connections to maintain for each URL.
* @param idleTimeoutMs - The timeout in milliseconds after which idle connections will be
* closed. Defaults to 60 seconds.
* @param maxConnections - The maximum number of simultaneous WebSocket connections. Defaults to
* 16.
*/
private constructor(maxConnectionsPerUrl: number = 4, idleTimeoutMs: number = 60000) {
this.#maxConnectionsPerUrl = maxConnectionsPerUrl;
private constructor(idleTimeoutMs: number = 60000, maxConnections: number = 16) {
this.#idleTimeoutMs = idleTimeoutMs;
this.#maxConnections = maxConnections;
}
/**
@ -47,27 +59,39 @@ export class WebSocketPool { @@ -47,27 +59,39 @@ export class WebSocketPool {
* @param url - The URL to connect to.
* @returns A promise that resolves with a WebSocket connection.
*/
public acquire(url: string): Promise<WebSocket> {
const availableSocket = this.#findAvailableSocket(url);
if (availableSocket) {
// Clear any idle timer for this socket since it's being used again.
this.#clearIdleTimer(availableSocket);
// Add the reference to the socket resource to the busy set.
this.#busy.add(availableSocket);
return Promise.resolve(availableSocket);
public async acquire(url: string): Promise<WebSocket> {
const normalizedUrl = this.#normalizeUrl(url);
const existingSocket = this.#pool.get(normalizedUrl);
if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) {
this.#checkOutSocket(existingSocket);
return Promise.resolve(existingSocket);
}
const socketsForUrl = this.#pool.get(url) || [];
if (socketsForUrl.length < this.#maxConnectionsPerUrl) {
return this.#createSocket(url);
const connectingPromise = this.#connecting.get(normalizedUrl);
if (connectingPromise) {
const ws = await connectingPromise;
if (ws.readyState === WebSocket.OPEN) {
this.#checkOutSocket(ws);
return ws;
}
throw new Error(`[WebSocketPool] WebSocket connection failed for ${normalizedUrl}`);
}
return new Promise(resolve => {
if (!this.#waitQueue.has(url)) {
this.#waitQueue.set(url, []);
if (this.#pool.size + this.#connecting.size >= this.#maxConnections) {
return new Promise((resolve, reject) => {
this.#waitingQueue.push({ url: normalizedUrl, resolve, reject });
});
}
this.#waitQueue.get(url)!.push(resolve);
const newConnectionPromise = this.#createSocket(normalizedUrl);
this.#connecting.set(normalizedUrl, newConnectionPromise);
newConnectionPromise.finally(() => {
this.#connecting.delete(normalizedUrl);
});
return newConnectionPromise;
}
/**
@ -78,25 +102,20 @@ export class WebSocketPool { @@ -78,25 +102,20 @@ export class WebSocketPool {
* @param ws - The WebSocket connection to release.
*/
public release(ws: WebSocket): void {
const url = ws.url;
const waitingResolvers = this.#waitQueue.get(url);
if (waitingResolvers?.length > 0) {
const resolver = waitingResolvers.shift()!;
// Cleanup empty queues immediately
if (waitingResolvers.length === 0) {
this.#waitQueue.delete(url);
const currentCount = this.#refCounts.get(ws);
if (!currentCount) {
throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.');
}
resolver(ws);
return;
}
if (currentCount > 0) {
const newCount = currentCount - 1;
this.#refCounts.set(ws, newCount);
// If no requestors are waiting, delete the reference from the busy set and start idle timer.
this.#busy.delete(ws);
if (newCount === 0) {
this.#startIdleTimer(ws);
}
}
}
/**
* Sets the idle timeout for WebSocket connections.
@ -116,6 +135,25 @@ export class WebSocketPool { @@ -116,6 +135,25 @@ export class WebSocketPool {
return this.#idleTimeoutMs;
}
/**
* Sets the maximum number of simultaneous WebSocket connections.
*
* @param limit - The new connection limit.
*/
public set maxConnections(limit: number) {
this.#maxConnections = limit;
this.#processWaitingQueue();
}
/**
* Gets the current maximum number of simultaneous WebSocket connections.
*
* @returns The current connection limit.
*/
public get maxConnections(): number {
return this.#maxConnections;
}
/**
* Closes all WebSocket connections and "drains" the pool.
*/
@ -126,47 +164,50 @@ export class WebSocketPool { @@ -126,47 +164,50 @@ export class WebSocketPool {
}
this.#idleTimers.clear();
for (const sockets of this.#pool.values()) {
for (const ws of sockets) {
ws.onclose = null;
ws.close();
for (const { reject } of this.#waitingQueue) {
reject(new Error('[WebSocketPool] Draining pool.'));
}
this.#waitingQueue = [];
for (const promise of this.#connecting.values()) {
// While we can't cancel the connection attempt, we can prevent callers from using it.
promise.catch(() => {
/* ignore connection errors during drain */
});
}
this.#connecting.clear();
for (const ws of this.#pool.values()) {
ws.close();
}
this.#pool.clear();
this.#busy.clear();
this.#waitQueue.clear();
this.#refCounts.clear();
}
// #endregion
// #region Private Helper Methods
#findAvailableSocket(url: string): WebSocket | null {
const sockets = this.#pool.get(url);
if (!sockets) {
return null;
}
return sockets.find(ws => !this.#busy.has(ws) && ws.readyState === WebSocket.OPEN) ?? null;
}
#createSocket(url: string): Promise<WebSocket> {
return new Promise((resolve, reject) => {
try {
const ws = new WebSocket(url);
ws.onopen = () => {
const sockets = this.#pool.get(url) || [];
sockets.push(ws);
this.#pool.set(url, sockets);
this.#busy.add(ws);
resolve(ws);
};
this.#pool.set(url, ws);
this.#refCounts.set(ws, 1);
// Remove the socket from the pool when it is closed. The socket may be closed by either
// the client or the server.
// Remove the socket from the pool when it is closed. The socket may be closed by
// either the client or the server.
ws.onclose = () => this.#removeSocket(ws);
resolve(ws);
};
ws.onerror = () => reject(new Error('WebSocket error'));
ws.onerror = (event) => {
this.#processWaitingQueue();
reject(new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`));
};
} catch (error) {
this.#processWaitingQueue();
reject(error);
}
});
@ -174,33 +215,25 @@ export class WebSocketPool { @@ -174,33 +215,25 @@ export class WebSocketPool {
#removeSocket(ws: WebSocket): void {
const url = ws.url;
const sockets = this.#pool.get(url);
if (sockets) {
const index = sockets.indexOf(ws);
if (index > -1) {
sockets.splice(index, 1);
}
if (sockets.length === 0) {
this.#pool.delete(url);
}
}
this.#busy.delete(ws);
this.#refCounts.delete(ws);
this.#clearIdleTimer(ws);
this.#processWaitingQueue();
}
/**
* Starts an idle timer for the specified WebSocket. The connection will be automatically
* closed after the idle timeout period if it remains unused.
*
* @param ws - The WebSocket to start the idle timer for.
* @param ws - The WebSocket for which to start the idle timer.
*/
#startIdleTimer(ws: WebSocket): void {
// Clear any existing timer first
this.#clearIdleTimer(ws);
const timer = setTimeout(() => {
// Check if the socket is still idle (not in busy set) before closing
if (!this.#busy.has(ws) && ws.readyState === WebSocket.OPEN) {
const refCount = this.#refCounts.get(ws);
if ((!refCount || refCount === 0) && ws.readyState === WebSocket.OPEN) {
ws.close();
this.#removeSocket(ws);
}
@ -212,7 +245,7 @@ export class WebSocketPool { @@ -212,7 +245,7 @@ export class WebSocketPool {
/**
* Clears the idle timer for the specified WebSocket.
*
* @param ws - The WebSocket to clear the idle timer for.
* @param ws - The WebSocket for which to clear the idle timer.
*/
#clearIdleTimer(ws: WebSocket): void {
const timer = this.#idleTimers.get(ws);
@ -222,5 +255,56 @@ export class WebSocketPool { @@ -222,5 +255,56 @@ export class WebSocketPool {
}
}
#processWaitingQueue(): void {
while (
this.#waitingQueue.length > 0 &&
this.#pool.size + this.#connecting.size < this.#maxConnections
) {
const nextInQueue = this.#waitingQueue.shift();
if (!nextInQueue) {
continue;
}
const { url, resolve, reject } = nextInQueue;
// Re-check if a connection for this URL was created while this request was in the queue
const existingSocket = this.#pool.get(url);
if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) {
this.#checkOutSocket(existingSocket);
resolve(existingSocket);
} else {
const connectingPromise = this.#connecting.get(url);
if (connectingPromise) {
connectingPromise.then(resolve, reject);
}
}
}
}
#checkOutSocket(ws: WebSocket): void {
const count = (this.#refCounts.get(ws) || 0) + 1;
this.#refCounts.set(ws, count);
this.#clearIdleTimer(ws);
}
#normalizeUrl(url: string): string {
try {
const urlObj = new URL(url);
// The URL constructor correctly normalizes scheme and hostname casing.
let normalized = urlObj.toString();
// The logic to remove a trailing slash for connection coalescing can be kept,
// but should be done on the normalized string.
if (urlObj.pathname !== '/' && normalized.endsWith('/')) {
normalized = normalized.slice(0, -1);
}
return normalized;
} catch {
// If URL is invalid, return it as-is and let WebSocket constructor handle the error.
return url;
}
}
// #endregion
}

Loading…
Cancel
Save