Browse Source

Introduce WebSocket handles and simplify

- The `acquire`, `release`, and `drain` method signatures remain unchanged.
- `WebSocketHandle` objects are introduced to tie each socket's state to the socket itself.
- Initial connection logic is simplified.
- Some minor improvements and validations are introduced based on AI-assisted code review.
master
buttercat1791 8 months ago
parent
commit
2a57122436
  1. 274
      src/lib/data_structures/websocket_pool.ts

274
src/lib/data_structures/websocket_pool.ts

@ -1,6 +1,16 @@ @@ -1,6 +1,16 @@
/**
* A representation of a WebSocket connection used internally by the WebSocketPool. Attaches
* information about the connection's state within the resource pool to the connection itself.
*/
interface WebSocketHandle {
ws: WebSocket;
refCount: number;
idleTimer?: ReturnType<typeof setTimeout>;
}
interface WebSocketPoolWaitingQueueItem {
url: string;
resolve: (ws: WebSocket) => void;
resolve: (handle: WebSocketHandle) => void;
reject: (reason?: any) => void;
}
@ -16,10 +26,11 @@ interface WebSocketPoolWaitingQueueItem { @@ -16,10 +26,11 @@ interface WebSocketPoolWaitingQueueItem {
export class WebSocketPool {
static #shared: WebSocketPool;
#pool: Map<string, WebSocket> = new Map();
#connecting: Map<string, Promise<WebSocket>> = new Map();
#refCounts: Map<WebSocket, number> = new Map();
#idleTimers: Map<WebSocket, ReturnType<typeof setTimeout>> = new Map();
/**
* A map of WebSocket URLs to the handles containing the connection and its state.
*/
#pool: Map<string, WebSocketHandle> = new Map();
#idleTimeoutMs: number;
#maxConnections: number;
#waitingQueue: WebSocketPoolWaitingQueueItem[] = [];
@ -50,71 +61,38 @@ export class WebSocketPool { @@ -50,71 +61,38 @@ export class WebSocketPool {
// #region Resource Management Interface
/**
* Acquires a WebSocket connection for the specified URL. If a connection is available in the
* pool, that connection is returned. If no connection is available but the pool is not full, a
* new connection is created and returned. If the pool is full, the request is queued until a
* connection is released, at which point the newly-available connection is returned to the
* caller.
* Sets the maximum number of simultaneous WebSocket connections.
*
* @param url - The URL to connect to.
* @returns A promise that resolves with a WebSocket connection.
* @param limit - The new connection limit.
*/
public async acquire(url: string): Promise<WebSocket> {
const normalizedUrl = this.#normalizeUrl(url);
const existingSocket = this.#pool.get(normalizedUrl);
if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) {
this.#checkOutSocket(existingSocket);
return Promise.resolve(existingSocket);
public set maxConnections(limit: number) {
if (limit === this.#maxConnections) {
return;
}
const connectingPromise = this.#connecting.get(normalizedUrl);
if (connectingPromise) {
const ws = await connectingPromise;
if (ws.readyState === WebSocket.OPEN) {
this.#checkOutSocket(ws);
return ws;
}
throw new Error(`[WebSocketPool] WebSocket connection failed for ${normalizedUrl}`);
if (limit == null || isNaN(limit)) {
throw new Error('[WebSocketPool] Connection limit must be a number.');
}
if (this.#pool.size + this.#connecting.size >= this.#maxConnections) {
return new Promise((resolve, reject) => {
this.#waitingQueue.push({ url: normalizedUrl, resolve, reject });
});
if (limit <= 0) {
throw new Error('[WebSocketPool] Connection limit must be greater than 0.');
}
const newConnectionPromise = this.#createSocket(normalizedUrl);
this.#connecting.set(normalizedUrl, newConnectionPromise);
newConnectionPromise.finally(() => {
this.#connecting.delete(normalizedUrl);
});
if (!Number.isInteger(limit)) {
throw new Error('[WebSocketPool] Connection limit must be an integer.');
}
return newConnectionPromise;
this.#maxConnections = limit;
this.#processWaitingQueue();
}
/**
* Releases a WebSocket connection back to the pool. If there are pending requests for the same
* URL, the connection is passed to the requestor in the queue. Otherwise, the connection is
* marked as available.
* Gets the current maximum number of simultaneous WebSocket connections.
*
* @param ws - The WebSocket connection to release.
* @returns The current connection limit.
*/
public release(ws: WebSocket): void {
const currentCount = this.#refCounts.get(ws);
if (!currentCount) {
throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.');
}
if (currentCount > 0) {
const newCount = currentCount - 1;
this.#refCounts.set(ws, newCount);
if (newCount === 0) {
this.#startIdleTimer(ws);
}
}
public get maxConnections(): number {
return this.#maxConnections;
}
/**
@ -122,7 +100,23 @@ export class WebSocketPool { @@ -122,7 +100,23 @@ export class WebSocketPool {
*
* @param timeoutMs - The timeout in milliseconds after which idle connections will be closed.
*/
public setIdleTimeout(timeoutMs: number): void {
public set idleTimeoutMs(timeoutMs: number) {
if (timeoutMs === this.#idleTimeoutMs) {
return;
}
if (timeoutMs == null || isNaN(timeoutMs)) {
throw new Error('[WebSocketPool] Idle timeout must be a number.');
}
if (timeoutMs <= 0) {
throw new Error('[WebSocketPool] Idle timeout must be greater than 0.');
}
if (!Number.isInteger(timeoutMs)) {
throw new Error('[WebSocketPool] Idle timeout must be an integer.');
}
this.#idleTimeoutMs = timeoutMs;
}
@ -131,27 +125,66 @@ export class WebSocketPool { @@ -131,27 +125,66 @@ export class WebSocketPool {
*
* @returns The current idle timeout in milliseconds.
*/
public getIdleTimeout(): number {
public get idleTimeoutMs(): number {
return this.#idleTimeoutMs;
}
/**
* Sets the maximum number of simultaneous WebSocket connections.
* Acquires a WebSocket connection for the specified URL. If a connection is available in the
* pool, that connection is returned. If no connection is available but the pool is not full, a
* new connection is created and returned. If the pool is full, the request is queued until a
* connection is released, at which point the newly-available connection is returned to the
* caller.
*
* @param limit - The new connection limit.
* @param url - The URL to connect to.
* @returns A promise that resolves with a WebSocket connection.
*/
public set maxConnections(limit: number) {
this.#maxConnections = limit;
this.#processWaitingQueue();
public async acquire(url: string): Promise<WebSocket> {
const normalizedUrl = this.#normalizeUrl(url);
const handle = this.#pool.get(normalizedUrl);
try {
if (handle && handle.ws.readyState === WebSocket.OPEN) {
this.#checkOut(handle);
return handle.ws;
}
if (this.#pool.size >= this.#maxConnections) {
return new Promise((resolve, reject) => {
this.#waitingQueue.push({
url: normalizedUrl,
resolve: (handle) => resolve(handle.ws),
reject,
});
});
}
const newHandle = await this.#createSocket(normalizedUrl);
return newHandle.ws;
} catch (error) {
throw new Error(
`[WebSocketPool] Failed to acquire connection for ${normalizedUrl}: ${error}`
);
}
}
/**
* Gets the current maximum number of simultaneous WebSocket connections.
* Releases a WebSocket connection back to the pool. If there are pending requests for the same
* URL, the connection is passed to the requestor in the queue. Otherwise, the connection is
* marked as available.
*
* @returns The current connection limit.
* @param handle - The WebSocketHandle to release.
*/
public get maxConnections(): number {
return this.#maxConnections;
public release(ws: WebSocket): void {
const normalizedUrl = this.#normalizeUrl(ws.url);
const handle = this.#pool.get(normalizedUrl);
if (!handle) {
throw new Error('[WebSocketPool] Attempted to release an unmanaged WebSocket connection.');
}
if (--handle.refCount === 0) {
this.#startIdleTimer(handle);
}
}
/**
@ -159,52 +192,47 @@ export class WebSocketPool { @@ -159,52 +192,47 @@ export class WebSocketPool {
*/
public drain(): void {
// Clear all idle timers first
for (const timer of this.#idleTimers.values()) {
clearTimeout(timer);
for (const handle of this.#pool.values()) {
this.#clearIdleTimer(handle);
}
this.#idleTimers.clear();
for (const { reject } of this.#waitingQueue) {
reject(new Error('[WebSocketPool] Draining pool.'));
}
this.#waitingQueue = [];
for (const promise of this.#connecting.values()) {
// While we can't cancel the connection attempt, we can prevent callers from using it.
promise.catch(() => {
/* ignore connection errors during drain */
});
}
this.#connecting.clear();
for (const ws of this.#pool.values()) {
ws.close();
for (const handle of this.#pool.values()) {
handle.ws.close();
}
this.#pool.clear();
this.#refCounts.clear();
}
// #endregion
// #region Private Helper Methods
#createSocket(url: string): Promise<WebSocket> {
#createSocket(url: string): Promise<WebSocketHandle> {
return new Promise((resolve, reject) => {
try {
const ws = new WebSocket(url);
ws.onopen = () => {
this.#pool.set(url, ws);
this.#refCounts.set(ws, 1);
const handle: WebSocketHandle = {
ws: new WebSocket(url),
refCount: 1,
};
handle.ws.onopen = () => {
this.#pool.set(url, handle);
// Remove the socket from the pool when it is closed. The socket may be closed by
// either the client or the server.
ws.onclose = () => this.#removeSocket(ws);
resolve(ws);
handle.ws.onclose = () => this.#removeSocket(handle);
resolve(handle);
};
ws.onerror = (event) => {
handle.ws.onerror = (event) => {
this.#removeSocket(handle);
this.#processWaitingQueue();
reject(new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`));
reject(
new Error(`[WebSocketPool] WebSocket connection failed for ${url}: ${event.type}`)
);
};
} catch (error) {
this.#processWaitingQueue();
@ -213,11 +241,10 @@ export class WebSocketPool { @@ -213,11 +241,10 @@ export class WebSocketPool {
});
}
#removeSocket(ws: WebSocket): void {
const url = ws.url;
this.#pool.delete(url);
this.#refCounts.delete(ws);
this.#clearIdleTimer(ws);
#removeSocket(handle: WebSocketHandle): void {
this.#clearIdleTimer(handle);
handle.ws.onopen = handle.ws.onerror = handle.ws.onclose = null;
this.#pool.delete(this.#normalizeUrl(handle.ws.url));
this.#processWaitingQueue();
}
@ -227,64 +254,65 @@ export class WebSocketPool { @@ -227,64 +254,65 @@ export class WebSocketPool {
*
* @param ws - The WebSocket for which to start the idle timer.
*/
#startIdleTimer(ws: WebSocket): void {
#startIdleTimer(handle: WebSocketHandle): void {
// Clear any existing timer first
this.#clearIdleTimer(ws);
this.#clearIdleTimer(handle);
const timer = setTimeout(() => {
const refCount = this.#refCounts.get(ws);
if ((!refCount || refCount === 0) && ws.readyState === WebSocket.OPEN) {
ws.close();
this.#removeSocket(ws);
handle.idleTimer = setTimeout(() => {
const refCount = handle.refCount;
if (refCount === 0 && handle.ws.readyState === WebSocket.OPEN) {
handle.ws.close();
this.#removeSocket(handle);
}
}, this.#idleTimeoutMs);
this.#idleTimers.set(ws, timer);
}
/**
* Clears the idle timer for the specified WebSocket.
*
* @param ws - The WebSocket for which to clear the idle timer.
* @param handle - The WebSocketHandle for which to clear the idle timer.
*/
#clearIdleTimer(ws: WebSocket): void {
const timer = this.#idleTimers.get(ws);
#clearIdleTimer(handle: WebSocketHandle): void {
const timer = handle.idleTimer;
if (timer) {
clearTimeout(timer);
this.#idleTimers.delete(ws);
handle.idleTimer = undefined;
}
}
/**
* Processes pending requests to acquire a connection. Reuses existing connections when possible.
*/
#processWaitingQueue(): void {
while (
this.#waitingQueue.length > 0 &&
this.#pool.size + this.#connecting.size < this.#maxConnections
this.#pool.size < this.#maxConnections
) {
const nextInQueue = this.#waitingQueue.shift();
if (!nextInQueue) {
continue;
}
const { url, resolve, reject } = nextInQueue;
// Re-check if a connection for this URL was created while this request was in the queue
const existingSocket = this.#pool.get(url);
if (existingSocket && existingSocket.readyState < WebSocket.CLOSING) {
this.#checkOutSocket(existingSocket);
resolve(existingSocket);
} else {
const connectingPromise = this.#connecting.get(url);
if (connectingPromise) {
connectingPromise.then(resolve, reject);
const existingHandle = this.#pool.get(url);
if (existingHandle && existingHandle.ws.readyState === WebSocket.OPEN) {
this.#checkOut(existingHandle);
resolve(existingHandle);
return;
}
this.#createSocket(url).then(resolve, reject);
}
}
#checkOut(handle: WebSocketHandle): void {
if (handle.refCount == null) {
throw new Error('[WebSocketPool] Handle refCount unexpectedly null.');
}
#checkOutSocket(ws: WebSocket): void {
const count = (this.#refCounts.get(ws) || 0) + 1;
this.#refCounts.set(ws, count);
this.#clearIdleTimer(ws);
++handle.refCount;
this.#clearIdleTimer(handle);
}
#normalizeUrl(url: string): string {

Loading…
Cancel
Save