Browse Source

more efficiency gains

imwald
Silberengel 4 weeks ago
parent
commit
6ca4e988c9
  1. 14
      src/constants.ts
  2. 107
      src/lib/relay-pool-idle.ts
  3. 44
      src/lib/relay-strikes.test.ts
  4. 77
      src/lib/relay-strikes.ts
  5. 7
      src/services/client-query.service.ts
  6. 6
      src/services/client.service.ts
  7. 7
      src/services/relay-operation-log.service.ts

14
src/constants.ts

@ -162,6 +162,20 @@ export const PUBLISH_MULTI_RELAY_CONNECTION_CAP_MS = 12_000
/** Max merged URLs per REQ / timeline relay list (see `relay-url-priority`). */ /** Max merged URLs per REQ / timeline relay list (see `relay-url-priority`). */
export const MAX_REQ_RELAY_URLS = MAX_CONCURRENT_RELAY_CONNECTIONS export const MAX_REQ_RELAY_URLS = MAX_CONCURRENT_RELAY_CONNECTIONS
/** Session-park relays that finish REQ waves much slower than peers (see {@link relaySessionStrikes}). */
export const RELAY_SLOW_PARK_ABSOLUTE_MS = 10_000
/** Multiplier over batch median EOSE latency to count a relay as slow in multi-relay waves. */
export const RELAY_SLOW_PARK_MEDIAN_MULTIPLIER = 2.5
/** Slow signals within a session before omitting the relay from multi-relay read stacks. */
export const RELAY_SLOW_PARK_SIGNALS_THRESHOLD = 2
/** How long a session-parked slow relay stays out of multi-relay reads. */
export const RELAY_SLOW_PARK_COOLDOWN_MS = 5 * 60 * 1000
/** Close pooled WebSocket when no SUBs and no pool activity for this long (see {@link initRelayPoolIdle}). */
export const RELAY_POOL_SOCKET_IDLE_MS = 90_000
/** How often to scan for idle relay sockets. */
export const RELAY_POOL_IDLE_SWEEP_INTERVAL_MS = 45_000
/** /**
* Maximum `kinds` length in a single NIP-01 filter. Some relays NOTICE "too many kinds" and reject the * Maximum `kinds` length in a single NIP-01 filter. Some relays NOTICE "too many kinds" and reject the
* entire REQ (e.g. strfry derivatives, relay.vukihreedia.xyz). QueryService splits larger arrays into * entire REQ (e.g. strfry derivatives, relay.vukihreedia.xyz). QueryService splits larger arrays into

107
src/lib/relay-pool-idle.ts

@ -0,0 +1,107 @@
import { RELAY_POOL_IDLE_SWEEP_INTERVAL_MS, RELAY_POOL_SOCKET_IDLE_MS } from '@/constants'
import logger from '@/lib/logger'
import { canonicalRelaySessionKey, normalizeAnyRelayUrl } from '@/lib/url'
import type { SimplePool } from 'nostr-tools'
type HasActiveSubsFn = (canonicalRelayKey: string) => boolean
let pool: SimplePool | null = null
let hasActiveSubs: HasActiveSubsFn | null = null
let sweepTimer: ReturnType<typeof setInterval> | null = null
const lastActivityMs = new Map<string, number>()
function canon(url: string): string {
return canonicalRelaySessionKey(normalizeAnyRelayUrl(url) || url.trim())
}
/** Mark relay URL as recently used (connect, REQ, publish). */
export function touchRelayPoolActivity(url: string): void {
const key = canon(url)
if (!key) return
lastActivityMs.set(key, Date.now())
}
/**
* Wire idle socket sweeps to the app pool. Safe to call once from {@link ClientService} constructor.
*/
export function initRelayPoolIdle(nextPool: SimplePool, nextHasActiveSubs: HasActiveSubsFn): void {
pool = nextPool
hasActiveSubs = nextHasActiveSubs
if (sweepTimer != null) return
sweepTimer = setInterval(() => {
sweepIdleRelayPoolSockets()
}, RELAY_POOL_IDLE_SWEEP_INTERVAL_MS)
}
/** Close connected sockets that have no active SUBs and exceeded {@link RELAY_POOL_SOCKET_IDLE_MS}. */
export function sweepIdleRelayPoolSockets(): void {
if (!pool || !hasActiveSubs) return
const now = Date.now()
let status: Map<string, boolean>
try {
status = pool.listConnectionStatus()
} catch {
return
}
const toClose: string[] = []
for (const [url, connected] of status) {
if (!connected) continue
const key = canon(url)
if (!key) continue
if (hasActiveSubs(key)) continue
const last = lastActivityMs.get(key) ?? 0
if (now - last < RELAY_POOL_SOCKET_IDLE_MS) continue
toClose.push(normalizeAnyRelayUrl(url) || url)
}
if (toClose.length === 0) return
try {
pool.close(toClose)
logger.debug('[RelayPoolIdle] closed idle sockets', { count: toClose.length, relays: toClose })
} catch {
/* ignore */
}
for (const url of toClose) {
lastActivityMs.delete(canon(url))
}
}
/** Close specific relays when they are connected, have no active SUBs, and are not session-parked hot. */
export function closeRelayPoolSocketsIfIdle(urls: readonly string[]): void {
if (!pool || !hasActiveSubs || urls.length === 0) return
let status: Map<string, boolean>
try {
status = pool.listConnectionStatus()
} catch {
return
}
const toClose: string[] = []
for (const raw of urls) {
const key = canon(raw)
if (!key || hasActiveSubs(key)) continue
const normalized = normalizeAnyRelayUrl(raw) || raw
const connected = [...status.entries()].some(
([u, ok]) => ok && canon(u) === key
)
if (connected) toClose.push(normalized)
}
if (toClose.length === 0) return
try {
pool.close(toClose)
logger.debug('[RelayPoolIdle] closed sockets after slow-park', { relays: toClose })
} catch {
/* ignore */
}
}
export function resetRelayPoolIdleForTests(): void {
if (sweepTimer != null) {
clearInterval(sweepTimer)
sweepTimer = null
}
pool = null
hasActiveSubs = null
lastActivityMs.clear()
}

44
src/lib/relay-strikes.test.ts

@ -0,0 +1,44 @@
import { describe, expect, it, beforeEach } from 'vitest'
import { relaySessionStrikes } from './relay-strikes'
import type { RelayOpTerminalRow } from '@/services/relay-operation-log.service'
function row(
url: string,
outcome: RelayOpTerminalRow['outcome'],
msFromBatchStart: number
): RelayOpTerminalRow {
return { cmdIndex: 0, relayUrl: url, outcome, msFromBatchStart }
}
describe('relaySessionStrikes.observeSubscribeBatch', () => {
beforeEach(() => {
relaySessionStrikes.reset()
})
it('session-parks a relay much slower than batch median after two slow waves', () => {
const slow = 'wss://slow.example.com/'
const fast = 'wss://fast.example.com/'
relaySessionStrikes.observeSubscribeBatch([
row(fast, 'eose', 400),
row(slow, 'eose', 12_000)
])
expect(relaySessionStrikes.isReadHttpSkipped(slow)).toBe(false)
relaySessionStrikes.observeSubscribeBatch([
row(fast, 'eose', 500),
row(slow, 'eose', 11_000)
])
expect(relaySessionStrikes.isReadHttpSkipped(slow)).toBe(true)
expect(relaySessionStrikes.isReadHttpSkipped(fast)).toBe(false)
})
it('clears slow parking on fast EOSE via recordReadSuccess', () => {
const url = 'wss://recover.example.com/'
relaySessionStrikes.observeSubscribeBatch([row(url, 'eose', 15_000)])
relaySessionStrikes.observeSubscribeBatch([row(url, 'eose', 14_000)])
expect(relaySessionStrikes.isReadHttpSkipped(url)).toBe(true)
relaySessionStrikes.recordReadSuccess(url)
expect(relaySessionStrikes.isReadHttpSkipped(url)).toBe(false)
})
})

77
src/lib/relay-strikes.ts

@ -1,7 +1,14 @@
import {
RELAY_SLOW_PARK_ABSOLUTE_MS,
RELAY_SLOW_PARK_COOLDOWN_MS,
RELAY_SLOW_PARK_MEDIAN_MULTIPLIER,
RELAY_SLOW_PARK_SIGNALS_THRESHOLD
} from '@/constants'
import type { Event } from 'nostr-tools' import type { Event } from 'nostr-tools'
import { getRelayListFromEvent } from '@/lib/event-metadata' import { getRelayListFromEvent } from '@/lib/event-metadata'
import logger from '@/lib/logger' import logger from '@/lib/logger'
import { canonicalRelaySessionKey, isHttpRelayUrl } from '@/lib/url' import { canonicalRelaySessionKey, isHttpRelayUrl } from '@/lib/url'
import type { RelayOpTerminalRow } from '@/services/relay-operation-log.service'
/** Conservative: 5 read/publish failures → skip until this many ms after last qualifying failure. */ /** Conservative: 5 read/publish failures → skip until this many ms after last qualifying failure. */
const STRIKE_FAILURES_THRESHOLD = 5 const STRIKE_FAILURES_THRESHOLD = 5
@ -31,6 +38,8 @@ type StrikeEntry = {
readFailures: number readFailures: number
readLastStrikeIncrementAt: number readLastStrikeIncrementAt: number
readStrikeSkipUntil: number readStrikeSkipUntil: number
slowSignals: number
slowParkUntil: number
publishFailures: number publishFailures: number
publishLastStrikeIncrementAt: number publishLastStrikeIncrementAt: number
publishStrikeSkipUntil: number publishStrikeSkipUntil: number
@ -47,6 +56,8 @@ function emptyEntry(): StrikeEntry {
readFailures: 0, readFailures: 0,
readLastStrikeIncrementAt: 0, readLastStrikeIncrementAt: 0,
readStrikeSkipUntil: 0, readStrikeSkipUntil: 0,
slowSignals: 0,
slowParkUntil: 0,
publishFailures: 0, publishFailures: 0,
publishLastStrikeIncrementAt: 0, publishLastStrikeIncrementAt: 0,
publishStrikeSkipUntil: 0, publishStrikeSkipUntil: 0,
@ -103,7 +114,7 @@ class RelaySessionStrikes {
if (!key) return false if (!key) return false
const e = this.byKey.get(key) const e = this.byKey.get(key)
if (!e) return false if (!e) return false
return Date.now() < Math.max(e.rateLimitUntil, e.readStrikeSkipUntil) return Date.now() < Math.max(e.rateLimitUntil, e.readStrikeSkipUntil, e.slowParkUntil)
} }
/** True when publish should omit this relay (unless single-target override). */ /** True when publish should omit this relay (unless single-target override). */
@ -172,6 +183,70 @@ class RelaySessionStrikes {
e.readFailures = 0 e.readFailures = 0
e.readStrikeSkipUntil = 0 e.readStrikeSkipUntil = 0
e.readLastStrikeIncrementAt = 0 e.readLastStrikeIncrementAt = 0
e.slowSignals = 0
e.slowParkUntil = 0
}
/**
* After a subscribe/query wave: session-park relays that were much slower than peers (or timed out).
* Returns URLs whose pooled sockets should be closed when idle.
*/
observeSubscribeBatch(rows: readonly RelayOpTerminalRow[]): string[] {
if (rows.length === 0) return []
const now = Date.now()
const socketsToClose: string[] = []
const eoseRows = rows.filter((r) => r.outcome === 'eose')
const sortedLatencies =
eoseRows.length > 0 ? [...eoseRows.map((r) => r.msFromBatchStart)].sort((a, b) => a - b) : []
const medianMs =
sortedLatencies.length > 0
? sortedLatencies[Math.floor((sortedLatencies.length - 1) / 2)]!
: RELAY_SLOW_PARK_ABSOLUTE_MS
const slowThresholdMs =
rows.length > 1
? Math.max(RELAY_SLOW_PARK_ABSOLUTE_MS, Math.round(medianMs * RELAY_SLOW_PARK_MEDIAN_MULTIPLIER))
: RELAY_SLOW_PARK_ABSOLUTE_MS
for (const row of rows) {
const key = sessionKey(row.relayUrl)
if (!key) continue
const timedOut = row.outcome === 'timeout'
const slowEose = row.outcome === 'eose' && row.msFromBatchStart >= slowThresholdMs
const fastEose = row.outcome === 'eose' && row.msFromBatchStart < slowThresholdMs * 0.6
if (timedOut || slowEose) {
const parked = this.recordSlowSignalKey(key, now)
if (parked) socketsToClose.push(row.relayUrl)
if (timedOut) this.recordReadFailureKey(key, 'connection')
continue
}
if (fastEose) {
const e = this.byKey.get(key)
if (e && e.slowSignals > 0) {
e.slowSignals = Math.max(0, e.slowSignals - 1)
}
}
}
return socketsToClose
}
private recordSlowSignalKey(key: string, now: number): boolean {
const e = this.getEntry(key)
if (this.cacheRelayKeys.has(key)) return false
e.slowSignals += 1
if (e.slowSignals < RELAY_SLOW_PARK_SIGNALS_THRESHOLD) return false
e.slowParkUntil = Math.max(e.slowParkUntil, now + RELAY_SLOW_PARK_COOLDOWN_MS)
logger.warn('[RelayStrikes] session-parked slow relay', {
key,
slowSignals: e.slowSignals,
cooldownMs: RELAY_SLOW_PARK_COOLDOWN_MS
})
return true
} }
recordPublishFailure(url: string): void { recordPublishFailure(url: string): void {

7
src/services/client-query.service.ts

@ -351,6 +351,13 @@ export class QueryService {
} }
} }
/** True when this relay still has at least one open REQ subscription (live timeline, etc.). */
relayHasActiveSubscriptions(relayKeyOrUrl: string): boolean {
const key = canonicalRelaySessionKey(normalizeUrl(relayKeyOrUrl) || relayKeyOrUrl.trim())
if (!key) return false
return (this.activeSubCountByRelay.get(key) ?? 0) > 0
}
private canonicalSeenOnEventId(eventId: string): string { private canonicalSeenOnEventId(eventId: string): string {
const t = eventId.trim() const t = eventId.trim()
return /^[0-9a-f]{64}$/i.test(t) ? t.toLowerCase() : t return /^[0-9a-f]{64}$/i.test(t) ? t.toLowerCase() : t

6
src/services/client.service.ts

@ -150,6 +150,7 @@ import {
} from '@/lib/url' } from '@/lib/url'
import { canonicalFeedFilter, canonicalRelayUrls } from '@/features/feed/descriptor' import { canonicalFeedFilter, canonicalRelayUrls } from '@/features/feed/descriptor'
import { feedRelayPolicyUrls } from '@/features/feed/relay-policy' import { feedRelayPolicyUrls } from '@/features/feed/relay-policy'
import { initRelayPoolIdle, touchRelayPoolActivity } from '@/lib/relay-pool-idle'
import { relaySessionStrikes } from '@/lib/relay-strikes' import { relaySessionStrikes } from '@/lib/relay-strikes'
import { isSafari } from '@/lib/utils' import { isSafari } from '@/lib/utils'
import { import {
@ -421,6 +422,7 @@ class ClientService extends EventTarget {
}) })
patchPoolRelayAuthRaceAndFeedback(relay) patchPoolRelayAuthRaceAndFeedback(relay)
applyRelayNip42AckTimeout(relay) applyRelayNip42AckTimeout(relay)
touchRelayPoolActivity(url)
return relay return relay
} }
@ -456,6 +458,10 @@ class ClientService extends EventTarget {
} }
}) })
this.bookstrService = createBookstrService(this.queryService) this.bookstrService = createBookstrService(this.queryService)
initRelayPoolIdle(this.pool, (relayKeyOrUrl) =>
this.queryService.relayHasActiveSubscriptions(relayKeyOrUrl)
)
} }
public static getInstance(): ClientService { public static getInstance(): ClientService {

7
src/services/relay-operation-log.service.ts

@ -1,3 +1,5 @@
import { closeRelayPoolSocketsIfIdle } from '@/lib/relay-pool-idle'
import { relaySessionStrikes } from '@/lib/relay-strikes'
import logger from '@/lib/logger' import logger from '@/lib/logger'
import { normalizeAnyRelayUrl } from '@/lib/url' import { normalizeAnyRelayUrl } from '@/lib/url'
import type { Filter } from 'nostr-tools' import type { Filter } from 'nostr-tools'
@ -307,6 +309,11 @@ export class RelaySubscribeOpBatch {
} }
} }
const parkedCloseUrls = relaySessionStrikes.observeSubscribeBatch(rows)
if (parkedCloseUrls.length > 0) {
closeRelayPoolSocketsIfIdle(parkedCloseUrls)
}
this.onBatchEnd?.(rows) this.onBatchEnd?.(rows)
} }
} }

Loading…
Cancel
Save