/**
 * Latency floor for slow-relay session parking (see {@link relaySessionStrikes}): an EOSE must arrive
 * at least this many ms after the batch started before the relay can count as slow.
 */
export const RELAY_SLOW_PARK_ABSOLUTE_MS = 10 * 1000
/**
 * In multi-relay waves the slow threshold is the larger of {@link RELAY_SLOW_PARK_ABSOLUTE_MS} and
 * the batch median EOSE latency multiplied by this factor.
 */
export const RELAY_SLOW_PARK_MEDIAN_MULTIPLIER = 2.5
/** Number of accumulated slow signals at which a relay becomes session-parked for reads. */
export const RELAY_SLOW_PARK_SIGNALS_THRESHOLD = 2
/** Duration (ms) a session-parked slow relay is kept out of multi-relay reads: five minutes. */
export const RELAY_SLOW_PARK_COOLDOWN_MS = 300_000

/**
 * A pooled WebSocket with no open SUBs and no recorded activity for this many ms is closed by the
 * idle sweep (see {@link initRelayPoolIdle}).
 */
export const RELAY_POOL_SOCKET_IDLE_MS = 90 * 1000
/** Period (ms) of the timer that scans the pool for idle relay sockets. */
export const RELAY_POOL_IDLE_SWEEP_INTERVAL_MS = 45 * 1000
import { RELAY_POOL_IDLE_SWEEP_INTERVAL_MS, RELAY_POOL_SOCKET_IDLE_MS } from '@/constants'
import logger from '@/lib/logger'
import { canonicalRelaySessionKey, normalizeAnyRelayUrl } from '@/lib/url'
import type { SimplePool } from 'nostr-tools'

/** Answers whether the relay with this canonical session key still has open subscriptions. */
type HasActiveSubsFn = (canonicalRelayKey: string) => boolean

let pool: SimplePool | null = null
let hasActiveSubs: HasActiveSubsFn | null = null
let sweepTimer: ReturnType<typeof setInterval> | null = null
/** Canonical relay key → `Date.now()` timestamp of the last recorded activity. */
const lastActivityMs = new Map<string, number>()

/** Canonical session key for a relay URL; falls back to the trimmed input when normalization yields nothing. */
function canon(url: string): string {
  return canonicalRelaySessionKey(normalizeAnyRelayUrl(url) || url.trim())
}

/** Best-effort snapshot of the pool's connection map; `null` when the pool throws. */
function readConnectionStatus(activePool: SimplePool): Map<string, boolean> | null {
  try {
    return activePool.listConnectionStatus()
  } catch (err: unknown) {
    logger.debug('[RelayPoolIdle] could not read pool connection status', { err })
    return null
  }
}

/**
 * Close `relays` on the pool and forget the activity timestamps of `keys`.
 * Closing stays best-effort: a throwing pool is logged, never propagated.
 */
function closePoolSockets(
  activePool: SimplePool,
  relays: string[],
  keys: Iterable<string>,
  logMessage: string,
  logPayload: Record<string, unknown>
): void {
  try {
    activePool.close(relays)
    logger.debug(logMessage, logPayload)
  } catch (err: unknown) {
    logger.warn('[RelayPoolIdle] failed to close sockets', { relays, err })
  }
  for (const key of keys) {
    lastActivityMs.delete(key)
  }
}

/** Mark relay URL as recently used so the idle sweep leaves its socket open. */
export function touchRelayPoolActivity(url: string): void {
  const key = canon(url)
  if (!key) return
  lastActivityMs.set(key, Date.now())
}

/**
 * Wire idle socket sweeps to the app pool.
 *
 * Repeated calls replace the pool and the subscription callback but keep the single sweep timer that
 * the first call started.
 *
 * @param nextPool Pool whose connected sockets are scanned.
 * @param nextHasActiveSubs Callback receiving a canonical relay key; `true` keeps that socket open.
 */
export function initRelayPoolIdle(nextPool: SimplePool, nextHasActiveSubs: HasActiveSubsFn): void {
  pool = nextPool
  hasActiveSubs = nextHasActiveSubs
  if (sweepTimer != null) return
  sweepTimer = setInterval(() => {
    sweepIdleRelayPoolSockets()
  }, RELAY_POOL_IDLE_SWEEP_INTERVAL_MS)
}

/** Close connected sockets that have no active SUBs and exceeded {@link RELAY_POOL_SOCKET_IDLE_MS}. */
export function sweepIdleRelayPoolSockets(): void {
  const activePool = pool
  const subsCheck = hasActiveSubs
  if (!activePool || !subsCheck) return

  const status = readConnectionStatus(activePool)
  if (!status) return

  const now = Date.now()
  const toClose: string[] = []
  const idleKeys: string[] = []
  for (const [url, connected] of status) {
    if (!connected) continue
    const key = canon(url)
    if (!key) continue
    if (subsCheck(key)) continue

    const last = lastActivityMs.get(key)
    if (last === undefined) {
      // No recorded activity yet: start the idle window now instead of treating the socket as idle
      // since the epoch, which would close it on the very first sweep.
      lastActivityMs.set(key, now)
      continue
    }
    if (now - last < RELAY_POOL_SOCKET_IDLE_MS) continue

    toClose.push(normalizeAnyRelayUrl(url) || url)
    idleKeys.push(key)
  }

  if (toClose.length === 0) return
  closePoolSockets(activePool, toClose, idleKeys, '[RelayPoolIdle] closed idle sockets', {
    count: toClose.length,
    relays: toClose
  })
}

/**
 * Close the given relays right away when they are connected and have no active SUBs
 * (used after a relay has been session-parked as slow).
 *
 * @param urls Relay URLs to consider; duplicates and URLs that are not connected are ignored.
 */
export function closeRelayPoolSocketsIfIdle(urls: readonly string[]): void {
  const activePool = pool
  const subsCheck = hasActiveSubs
  if (!activePool || !subsCheck || urls.length === 0) return

  const status = readConnectionStatus(activePool)
  if (!status) return

  // Index the connected pool URLs by canonical key once, rather than re-scanning and
  // re-canonicalising the whole status map for every requested URL.
  const connectedByKey = new Map<string, string[]>()
  for (const [poolUrl, connected] of status) {
    if (!connected) continue
    const key = canon(poolUrl)
    if (!key) continue
    const known = connectedByKey.get(key)
    if (known) {
      known.push(poolUrl)
    } else {
      connectedByKey.set(key, [poolUrl])
    }
  }

  const toClose: string[] = []
  const closedKeys = new Set<string>()
  for (const raw of urls) {
    const key = canon(raw)
    if (!key || closedKeys.has(key) || subsCheck(key)) continue
    const poolUrls = connectedByKey.get(key)
    if (!poolUrls) continue
    closedKeys.add(key)
    // Close the URLs as the pool reports them, so every connection for this relay is covered.
    for (const poolUrl of poolUrls) {
      toClose.push(normalizeAnyRelayUrl(poolUrl) || poolUrl)
    }
  }

  if (toClose.length === 0) return
  closePoolSockets(activePool, toClose, closedKeys, '[RelayPoolIdle] closed sockets after slow-park', {
    relays: toClose
  })
}

/** Test helper: stop the sweep timer and drop all module state. */
export function resetRelayPoolIdleForTests(): void {
  if (sweepTimer != null) {
    clearInterval(sweepTimer)
    sweepTimer = null
  }
  pool = null
  hasActiveSubs = null
  lastActivityMs.clear()
}
describe('relaySessionStrikes.observeSubscribeBatch', () => {
  // Shorthand for the read-skip query every assertion below relies on.
  const isSkipped = (relayUrl: string): boolean => relaySessionStrikes.isReadHttpSkipped(relayUrl)

  // Each case starts from an empty strike table so parking state cannot leak between tests.
  beforeEach(() => {
    relaySessionStrikes.reset()
  })

  it('session-parks a relay much slower than batch median after two slow waves', () => {
    const fastRelay = 'wss://fast.example.com/'
    const slowRelay = 'wss://slow.example.com/'

    // First wave: one slow signal only, which is not enough to park the relay.
    const firstWave = [row(fastRelay, 'eose', 400), row(slowRelay, 'eose', 12_000)]
    relaySessionStrikes.observeSubscribeBatch(firstWave)
    expect(isSkipped(slowRelay)).toBe(false)

    // Second wave: the second slow signal parks the slow relay and leaves the fast one alone.
    const secondWave = [row(fastRelay, 'eose', 500), row(slowRelay, 'eose', 11_000)]
    relaySessionStrikes.observeSubscribeBatch(secondWave)
    expect(isSkipped(slowRelay)).toBe(true)
    expect(isSkipped(fastRelay)).toBe(false)
  })

  it('clears slow parking on fast EOSE via recordReadSuccess', () => {
    const relay = 'wss://recover.example.com/'

    // Two single-relay waves above the absolute threshold park the relay.
    for (const latencyMs of [15_000, 14_000]) {
      relaySessionStrikes.observeSubscribeBatch([row(relay, 'eose', latencyMs)])
    }
    expect(isSkipped(relay)).toBe(true)

    // A recorded read success lifts the parking again.
    relaySessionStrikes.recordReadSuccess(relay)
    expect(isSkipped(relay)).toBe(false)
  })
})
/**
 * After a subscribe/query wave: session-park relays that were much slower than peers (or timed out).
 *
 * Rows are first folded into one verdict per relay, so a wave that carries several rows for the same
 * relay contributes at most one slow signal (or one decay step) for it. A relay is slow in a wave when
 * any of its rows timed out or its slowest EOSE reached the threshold; the threshold is
 * {@link RELAY_SLOW_PARK_ABSOLUTE_MS}, raised to median × {@link RELAY_SLOW_PARK_MEDIAN_MULTIPLIER}
 * when more than one relay took part. A relay whose slowest EOSE stayed under 60% of the threshold
 * has one previously accumulated slow signal removed.
 *
 * @param rows Terminal rows of the finished wave.
 * @returns Relay URLs (one per parked relay) whose pooled sockets should be closed when idle.
 */
observeSubscribeBatch(rows: readonly RelayOpTerminalRow[]): string[] {
  if (rows.length === 0) return []
  const now = Date.now()

  // One aggregate per canonical relay key; `relayUrl` keeps the first spelling seen for the return value.
  const perRelay = new Map<string, { relayUrl: string; timeouts: number; worstEoseMs: number | null }>()
  for (const row of rows) {
    const key = sessionKey(row.relayUrl)
    if (!key) continue
    let stats = perRelay.get(key)
    if (!stats) {
      stats = { relayUrl: row.relayUrl, timeouts: 0, worstEoseMs: null }
      perRelay.set(key, stats)
    }
    if (row.outcome === 'timeout') {
      stats.timeouts += 1
    } else if (row.outcome === 'eose') {
      stats.worstEoseMs =
        stats.worstEoseMs === null
          ? row.msFromBatchStart
          : Math.max(stats.worstEoseMs, row.msFromBatchStart)
    }
  }
  if (perRelay.size === 0) return []

  const eoseLatencies: number[] = []
  for (const stats of perRelay.values()) {
    if (stats.worstEoseMs !== null) eoseLatencies.push(stats.worstEoseMs)
  }
  eoseLatencies.sort((a, b) => a - b)
  // Lower median; with no EOSE at all the index is -1, so the absolute floor is used instead.
  const medianMs =
    eoseLatencies[Math.floor((eoseLatencies.length - 1) / 2)] ?? RELAY_SLOW_PARK_ABSOLUTE_MS

  // The median only means something when there are peers to compare against.
  const slowThresholdMs =
    perRelay.size > 1
      ? Math.max(RELAY_SLOW_PARK_ABSOLUTE_MS, Math.round(medianMs * RELAY_SLOW_PARK_MEDIAN_MULTIPLIER))
      : RELAY_SLOW_PARK_ABSOLUTE_MS

  const socketsToClose: string[] = []
  for (const [key, stats] of perRelay) {
    const timedOut = stats.timeouts > 0
    const slowEose = stats.worstEoseMs !== null && stats.worstEoseMs >= slowThresholdMs

    if (timedOut || slowEose) {
      if (this.recordSlowSignalKey(key, now)) socketsToClose.push(stats.relayUrl)
      // Timeouts also feed the regular read-strike counter, once per timed-out row as before.
      for (let i = 0; i < stats.timeouts; i += 1) {
        this.recordReadFailureKey(key, 'connection')
      }
      continue
    }

    const fastEose = stats.worstEoseMs !== null && stats.worstEoseMs < slowThresholdMs * 0.6
    if (fastEose) {
      const entry = this.byKey.get(key)
      if (entry && entry.slowSignals > 0) {
        entry.slowSignals -= 1
      }
    }
  }

  return socketsToClose
}

/**
 * Count one slow signal for `key` and park the relay once the signal threshold is reached.
 * Keys listed in `cacheRelayKeys` are never counted or parked.
 *
 * @param key Canonical relay session key.
 * @param now Timestamp (ms) the cooldown is measured from.
 * @returns `true` when the relay is session-parked after this signal.
 */
private recordSlowSignalKey(key: string, now: number): boolean {
  // Check the exemption before fetching the strike entry, so an exempt relay never touches it.
  if (this.cacheRelayKeys.has(key)) return false
  const entry = this.getEntry(key)
  entry.slowSignals += 1
  if (entry.slowSignals < RELAY_SLOW_PARK_SIGNALS_THRESHOLD) return false
  // Never shorten a park that is already running.
  entry.slowParkUntil = Math.max(entry.slowParkUntil, now + RELAY_SLOW_PARK_COOLDOWN_MS)
  logger.warn('[RelayStrikes] session-parked slow relay', {
    key,
    slowSignals: entry.slowSignals,
    cooldownMs: RELAY_SLOW_PARK_COOLDOWN_MS
  })
  return true
}
0) > 0 + } + private canonicalSeenOnEventId(eventId: string): string { const t = eventId.trim() return /^[0-9a-f]{64}$/i.test(t) ? t.toLowerCase() : t diff --git a/src/services/client.service.ts b/src/services/client.service.ts index fcaa7537..d0c3a4ee 100644 --- a/src/services/client.service.ts +++ b/src/services/client.service.ts @@ -150,6 +150,7 @@ import { } from '@/lib/url' import { canonicalFeedFilter, canonicalRelayUrls } from '@/features/feed/descriptor' import { feedRelayPolicyUrls } from '@/features/feed/relay-policy' +import { initRelayPoolIdle, touchRelayPoolActivity } from '@/lib/relay-pool-idle' import { relaySessionStrikes } from '@/lib/relay-strikes' import { isSafari } from '@/lib/utils' import { @@ -421,6 +422,7 @@ class ClientService extends EventTarget { }) patchPoolRelayAuthRaceAndFeedback(relay) applyRelayNip42AckTimeout(relay) + touchRelayPoolActivity(url) return relay } @@ -456,6 +458,10 @@ class ClientService extends EventTarget { } }) this.bookstrService = createBookstrService(this.queryService) + + initRelayPoolIdle(this.pool, (relayKeyOrUrl) => + this.queryService.relayHasActiveSubscriptions(relayKeyOrUrl) + ) } public static getInstance(): ClientService { diff --git a/src/services/relay-operation-log.service.ts b/src/services/relay-operation-log.service.ts index 1a0b8fb3..b1c7fbcd 100644 --- a/src/services/relay-operation-log.service.ts +++ b/src/services/relay-operation-log.service.ts @@ -1,3 +1,5 @@ +import { closeRelayPoolSocketsIfIdle } from '@/lib/relay-pool-idle' +import { relaySessionStrikes } from '@/lib/relay-strikes' import logger from '@/lib/logger' import { normalizeAnyRelayUrl } from '@/lib/url' import type { Filter } from 'nostr-tools' @@ -307,6 +309,11 @@ export class RelaySubscribeOpBatch { } } + const parkedCloseUrls = relaySessionStrikes.observeSubscribeBatch(rows) + if (parkedCloseUrls.length > 0) { + closeRelayPoolSocketsIfIdle(parkedCloseUrls) + } + this.onBatchEnd?.(rows) } }