@ -7,6 +7,8 @@ import {
relayFilterIncludesSocialKindBlockedKind ,
relayFilterIncludesSocialKindBlockedKind ,
SOCIAL_KIND_BLOCKED_RELAY_URLS ,
SOCIAL_KIND_BLOCKED_RELAY_URLS ,
MAX_PUBLISH_RELAYS ,
MAX_PUBLISH_RELAYS ,
RELAY_POOL_CONNECTION_TIMEOUT_MS ,
RELAY_READ_ONLY_POOL_CONNECT_TIMEOUT_MS ,
TIMELINE_SHARD_SUBSCRIBE_CONCURRENCY ,
TIMELINE_SHARD_SUBSCRIBE_CONCURRENCY ,
OUTBOX_PUBLISH_RETRY_DELAY_MS ,
OUTBOX_PUBLISH_RETRY_DELAY_MS ,
NIP66_DISCOVERY_RELAY_URLS ,
NIP66_DISCOVERY_RELAY_URLS ,
@ -30,6 +32,15 @@ function canonicalSeenOnEventId(eventId: string): string {
import { shouldDropEventOnIngest } from '@/lib/event-ingest-filter'
import { shouldDropEventOnIngest } from '@/lib/event-ingest-filter'
import { getHttpRelayListFromEvent , getProfileFromEvent , getRelayListFromEvent } from '@/lib/event-metadata'
import { getHttpRelayListFromEvent , getProfileFromEvent , getRelayListFromEvent } from '@/lib/event-metadata'
import logger from '@/lib/logger'
import logger from '@/lib/logger'
import { installNostrRelayAuthRaceMitigation } from '@/lib/nostr-relay-auth-patch'
import { queueRelayAuthSign } from '@/lib/relay-auth-sign-queue'
import {
authenticateNip42Relay ,
isRelayAuthRequiredCloseReason ,
isRelayAuthRequiredErrorMessage ,
isRelaySubscriptionClosedByCaller
} from '@/lib/relay-nip42-auth'
import { applyRelayNip42AckTimeout } from '@/lib/relay-nip42-tuning'
import { buildDeletionRelayUrls , dispatchTombstonesUpdated } from '@/lib/tombstone-events'
import { buildDeletionRelayUrls , dispatchTombstonesUpdated } from '@/lib/tombstone-events'
import { hexPubkeysEqual , isValidPubkey , pubkeyToNpub , userIdToPubkey } from '@/lib/pubkey'
import { hexPubkeysEqual , isValidPubkey , pubkeyToNpub , userIdToPubkey } from '@/lib/pubkey'
import { getPubkeysFromPTags , tagNameEquals } from '@/lib/tag'
import { getPubkeysFromPTags , tagNameEquals } from '@/lib/tag'
@ -84,9 +95,14 @@ import {
RelaySubscribeOpBatch
RelaySubscribeOpBatch
} from '@/services/relay-operation-log.service'
} from '@/services/relay-operation-log.service'
import { QueryService } from './client-query.service'
import { QueryService } from './client-query.service'
import { EventService } from './client-events.service'
import { ReplaceableEventService } from './client-replaceable-events.service'
import { MacroService , createBookstrService } from './client-macro.service'
/** Live timeline REQ: dead relays fail fast; EOSE caps “connected but silent” relays. */
/** Fired on `window` when session relay strike counts change (subscribe in single-relay UI). */
const SUBSCRIBE_RELAY_CONNECTION_TIMEOUT_MS = 2800
export const JUMBLE_SESSION_RELAY_STRIKES_CHANGED = 'jumble:session-relay-strikes-changed' as const
/** Live timeline REQ: EOSE caps “connected but silent” relays. */
const SUBSCRIBE_RELAY_EOSE_TIMEOUT_MS = 4800
const SUBSCRIBE_RELAY_EOSE_TIMEOUT_MS = 4800
/ * *
/ * *
@ -111,6 +127,10 @@ function summarizeFiltersForRelayLog(filters: Filter[]): Record<string, unknown>
return out
return out
}
}
const READ_ONLY_RELAY_CONNECT_BOOST_URLS = new Set (
READ_ONLY_RELAY_URLS . map ( ( u ) = > normalizeUrl ( u ) || u )
)
/** Hostname (+ path when not "/") for readable publish / retry console lines. */
/** Hostname (+ path when not "/") for readable publish / retry console lines. */
function relayHostForUserLog ( url : string ) : string {
function relayHostForUserLog ( url : string ) : string {
const n = normalizeUrl ( url ) || url
const n = normalizeUrl ( url ) || url
@ -122,9 +142,6 @@ function relayHostForUserLog(url: string): string {
return n
return n
}
}
}
}
import { EventService } from './client-events.service'
import { ReplaceableEventService } from './client-replaceable-events.service'
import { MacroService , createBookstrService } from './client-macro.service'
type TTimelineRef = [ string , number ]
type TTimelineRef = [ string , number ]
@ -187,7 +204,7 @@ class ClientService extends EventTarget {
* { @link ClientService . SESSION_RELAY_FAILURE_STRIKE_THRESHOLD } strikes we skip that relay for reads and publishes until reload .
* { @link ClientService . SESSION_RELAY_FAILURE_STRIKE_THRESHOLD } strikes we skip that relay for reads and publishes until reload .
* /
* /
private publishStrikeCount = new Map < string , number > ( )
private publishStrikeCount = new Map < string , number > ( )
private static readonly SESSION_RELAY_FAILURE_STRIKE_THRESHOLD = 2
public static readonly SESSION_RELAY_FAILURE_STRIKE_THRESHOLD = 2
/** Session-only: relay URL -> { successCount, sumLatencyMs } for preferring faster, proven relays when picking "random" relays. */
/** Session-only: relay URL -> { successCount, sumLatencyMs } for preferring faster, proven relays when picking "random" relays. */
private sessionRelayPublishStats = new Map < string , { successCount : number ; sumLatencyMs : number } > ( )
private sessionRelayPublishStats = new Map < string , { successCount : number ; sumLatencyMs : number } > ( )
@ -200,8 +217,26 @@ class ClientService extends EventTarget {
constructor ( ) {
constructor ( ) {
super ( )
super ( )
installNostrRelayAuthRaceMitigation ( )
this . pool = new SimplePool ( )
this . pool = new SimplePool ( )
this . pool . trackRelays = true
this . pool . trackRelays = true
const rawEnsureRelay = this . pool . ensureRelay . bind ( this . pool )
this . pool . ensureRelay = async (
url : string ,
params ? : { connectionTimeout? : number ; abort? : AbortSignal }
) = > {
const n = normalizeUrl ( url ) || url
const base = params ? . connectionTimeout ? ? RELAY_POOL_CONNECTION_TIMEOUT_MS
const connectionTimeout = READ_ONLY_RELAY_CONNECT_BOOST_URLS . has ( n )
? Math . max ( base , RELAY_READ_ONLY_POOL_CONNECT_TIMEOUT_MS )
: base
const relay = await rawEnsureRelay ( url , {
. . . params ,
connectionTimeout
} )
applyRelayNip42AckTimeout ( relay )
return relay
}
// Initialize sub-services
// Initialize sub-services
this . queryService = new QueryService ( this . pool , {
this . queryService = new QueryService ( this . pool , {
@ -288,6 +323,23 @@ class ClientService extends EventTarget {
this . signer = signer
this . signer = signer
this . signerType = signerType
this . signerType = signerType
this . queryService . setSigner ( signer , signerType )
this . queryService . setSigner ( signer , signerType )
/ * *
* NIP - 42 : answer ` AUTH ` on the wire only for read - only aggregators ( ` READ_ONLY_RELAY_URLS ` , e . g . aggr ) .
* They often require AUTH before REQ ; ` master ` - style auth only on ` CLOSED ` is too late . Other relays stay
* on reactive ` relay.auth() ` after ` auth-required ` to avoid double - sign races with the wider pool .
* /
if ( signer && signerType !== 'npub' ) {
this . pool . automaticallyAuth = ( relayURL : string ) = > {
const n = normalizeUrl ( relayURL ) || relayURL
if ( ! READ_ONLY_RELAY_CONNECT_BOOST_URLS . has ( n ) ) return null
return async ( event : EventTemplate ) = > {
const evt = await queueRelayAuthSign ( ( ) = > signer . signEvent ( event ) )
return evt as VerifiedEvent
}
}
} else {
this . pool . automaticallyAuth = undefined
}
}
}
/** NIP-66: fetch relay discovery events (30166) in background to supplement search/NIP support. */
/** NIP-66: fetch relay discovery events (30166) in background to supplement search/NIP support. */
@ -843,6 +895,30 @@ class ClientService extends EventTarget {
/** One failed publish or subscribe connection per normalized URL (accumulates until {@link SESSION_RELAY_FAILURE_STRIKE_THRESHOLD}). */
/** One failed publish or subscribe connection per normalized URL (accumulates until {@link SESSION_RELAY_FAILURE_STRIKE_THRESHOLD}). */
/** NOTICE "failed to fetch events" (relay DB/backend) — same session strike as a failed connection. */
/** NOTICE "failed to fetch events" (relay DB/backend) — same session strike as a failed connection. */
private notifySessionRelayStrikesChanged ( affectedUrl? : string ) : void {
if ( typeof window === 'undefined' ) return
window . dispatchEvent (
new CustomEvent ( JUMBLE_SESSION_RELAY_STRIKES_CHANGED , {
detail : { url : affectedUrl }
} )
)
}
/** Strikes accumulated this session for this relay (connection / NOTICE failures). */
getSessionRelayStrikeCountForUrl ( url : string ) : number {
const n = normalizeAnyRelayUrl ( url ) || url
return this . publishStrikeCount . get ( n ) ? ? 0
}
getSessionRelayFailureStrikeThreshold ( ) : number {
return ClientService . SESSION_RELAY_FAILURE_STRIKE_THRESHOLD
}
/** True when this relay is skipped for reads/publishes until strikes are cleared. */
isSessionRelayStrikedForReads ( url : string ) : boolean {
return this . getSessionRelayStrikeCountForUrl ( url ) >= this . getSessionRelayFailureStrikeThreshold ( )
}
private recordRelayNoticeFetchFailure ( url : string , noticeMessage : string ) {
private recordRelayNoticeFetchFailure ( url : string , noticeMessage : string ) {
const n = normalizeAnyRelayUrl ( url ) || url
const n = normalizeAnyRelayUrl ( url ) || url
if ( ! n ) return
if ( ! n ) return
@ -872,6 +948,7 @@ class ClientService extends EventTarget {
strikes : count
strikes : count
} )
} )
}
}
this . notifySessionRelayStrikesChanged ( n )
}
}
private filterSessionStrikedRelays ( urls : string [ ] ) : string [ ] {
private filterSessionStrikedRelays ( urls : string [ ] ) : string [ ] {
@ -888,6 +965,7 @@ class ClientService extends EventTarget {
if ( this . publishStrikeCount . size === 0 ) return
if ( this . publishStrikeCount . size === 0 ) return
logger . info ( '[Relay] Session relay strikes cleared' , { relayCount : this.publishStrikeCount.size } )
logger . info ( '[Relay] Session relay strikes cleared' , { relayCount : this.publishStrikeCount.size } )
this . publishStrikeCount . clear ( )
this . publishStrikeCount . clear ( )
this . notifySessionRelayStrikesChanged ( )
}
}
/ * *
/ * *
@ -900,6 +978,7 @@ class ClientService extends EventTarget {
const had = this . publishStrikeCount . delete ( n )
const had = this . publishStrikeCount . delete ( n )
if ( had ) {
if ( had ) {
logger . info ( '[Relay] Session strikes cleared for relay (manual)' , { url : n } )
logger . info ( '[Relay] Session strikes cleared for relay (manual)' , { url : n } )
this . notifySessionRelayStrikesChanged ( n )
}
}
return had
return had
}
}
@ -926,6 +1005,7 @@ class ClientService extends EventTarget {
batchUrlCount : unique.length ,
batchUrlCount : unique.length ,
strikeEntriesCleared : cleared
strikeEntriesCleared : cleared
} )
} )
this . notifySessionRelayStrikesChanged ( )
return this . filterSessionStrikedRelays ( unique )
return this . filterSessionStrikedRelays ( unique )
}
}
return filtered
return filtered
@ -1278,12 +1358,14 @@ class ClientService extends EventTarget {
logger . warn ( ` [PublishEvent] Publish failed, checking if auth required ` , { url , error : error.message } )
logger . warn ( ` [PublishEvent] Publish failed, checking if auth required ` , { url , error : error.message } )
if (
if (
error instanceof Error &&
error instanceof Error &&
error . message . startsWith ( 'auth-required' ) &&
isRelayAuthRequiredErrorMessage ( error . message ) &&
that . canSignerAuthenticateRelay ( )
that . canSignerAuthenticateRelay ( )
) {
) {
logger . debug ( ` [PublishEvent] Auth required, attempting authentication ` , { url } )
logger . debug ( ` [PublishEvent] Auth required, attempting authentication ` , { url } )
return relay
applyRelayNip42AckTimeout ( relay )
. auth ( ( authEvt : EventTemplate ) = > that . signer ! . signEvent ( authEvt ) )
return authenticateNip42Relay ( relay , ( authEvt : EventTemplate ) = >
queueRelayAuthSign ( ( ) = > that . signer ! . signEvent ( authEvt ) )
)
. then ( ( ) = > {
. then ( ( ) = > {
logger . debug ( ` [PublishEvent] Auth successful, retrying publish ` , { url } )
logger . debug ( ` [PublishEvent] Auth successful, retrying publish ` , { url } )
return relay . publish ( event )
return relay . publish ( event )
@ -1852,6 +1934,9 @@ class ClientService extends EventTarget {
: undefined
: undefined
const subs : { relayKey : string ; close : ( ) = > void } [ ] = [ ]
const subs : { relayKey : string ; close : ( ) = > void } [ ] = [ ]
/** Ignore a follow-up `closed by caller` while NIP-42 auth + resubscribe is in flight (parent `close()` must not finalize the batch early). */
const nip42ResubscribePending = new Set < number > ( )
const nip42HasAuthedOnce = new Set < number > ( )
const allOpened = Promise . all (
const allOpened = Promise . all (
groupedRequests . map ( async ( { url , filters : relayFilters } , i ) = > {
groupedRequests . map ( async ( { url , filters : relayFilters } , i ) = > {
await that . queryService . acquireGlobalRelayConnectionSlot ( )
await that . queryService . acquireGlobalRelayConnectionSlot ( )
@ -1860,7 +1945,7 @@ class ClientService extends EventTarget {
await that . queryService . acquireSubSlot ( relayKey )
await that . queryService . acquireSubSlot ( relayKey )
let relay : AbstractRelay
let relay : AbstractRelay
try {
try {
relay = await that . pool . ensureRelay ( url , { connectionTimeout : SUBSCRIBE_ RELAY_CONNECTION_TIMEOUT_MS } )
relay = await that . pool . ensureRelay ( url , { connectionTimeout : RELAY_POOL _CONNECTION_TIMEOUT_MS } )
patchRelayNoticeForFetchFailures ( relay , relayKey , ( u , m ) = >
patchRelayNoticeForFetchFailures ( relay , relayKey , ( u , m ) = >
that . recordRelayNoticeFetchFailure ( u , m )
that . recordRelayNoticeFetchFailure ( u , m )
)
)
@ -1888,15 +1973,24 @@ class ClientService extends EventTarget {
} ,
} ,
oneose : ( ) = > handleEose ( i ) ,
oneose : ( ) = > handleEose ( i ) ,
onclose : ( reason : string ) = > {
onclose : ( reason : string ) = > {
if ( isRelaySubscriptionClosedByCaller ( reason ) && nip42ResubscribePending . has ( i ) ) {
return
}
releaseOnce ( )
releaseOnce ( )
if ( reason . startsWith ( 'auth-required: ' ) && that . canSignerAuthenticateRelay ( ) ) {
if (
relay
isRelayAuthRequiredCloseReason ( reason ) &&
. auth ( async ( authEvt : EventTemplate ) = > {
that . canSignerAuthenticateRelay ( ) &&
const evt = await that . signer ! . signEvent ( authEvt )
! nip42HasAuthedOnce . has ( i )
if ( ! evt ) throw new Error ( 'sign event failed' )
) {
return evt as VerifiedEvent
nip42ResubscribePending . add ( i )
} )
applyRelayNip42AckTimeout ( relay )
authenticateNip42Relay ( relay , async ( authEvt : EventTemplate ) = > {
const evt = await queueRelayAuthSign ( ( ) = > that . signer ! . signEvent ( authEvt ) )
if ( ! evt ) throw new Error ( 'sign event failed' )
return evt as VerifiedEvent
} )
. then ( async ( ) = > {
. then ( async ( ) = > {
nip42HasAuthedOnce . add ( i )
await that . queryService . acquireGlobalRelayConnectionSlot ( )
await that . queryService . acquireGlobalRelayConnectionSlot ( )
try {
try {
await that . queryService . acquireSubSlot ( relayKey )
await that . queryService . acquireSubSlot ( relayKey )
@ -1905,12 +1999,13 @@ class ClientService extends EventTarget {
let liveRelay : AbstractRelay
let liveRelay : AbstractRelay
try {
try {
liveRelay = await that . pool . ensureRelay ( url , {
liveRelay = await that . pool . ensureRelay ( url , {
connectionTimeout : SUBSCRIBE_ RELAY_CONNECTION_TIMEOUT_MS
connectionTimeout : RELAY_POOL _CONNECTION_TIMEOUT_MS
} )
} )
patchRelayNoticeForFetchFailures ( liveRelay , relayKey , ( u , m ) = >
patchRelayNoticeForFetchFailures ( liveRelay , relayKey , ( u , m ) = >
that . recordRelayNoticeFetchFailure ( u , m )
that . recordRelayNoticeFetchFailure ( u , m )
)
)
} catch ( err ) {
} catch ( err ) {
nip42ResubscribePending . delete ( i )
that . recordSessionRelayFailure ( url )
that . recordSessionRelayFailure ( url )
that . queryService . releaseSubSlot ( relayKey )
that . queryService . releaseSubSlot ( relayKey )
handleClose ( i , ( err as Error ) ? . message ? ? String ( err ) )
handleClose ( i , ( err as Error ) ? . message ? ? String ( err ) )
@ -1952,7 +2047,9 @@ class ClientService extends EventTarget {
sub2 . close ( )
sub2 . close ( )
}
}
} )
} )
nip42ResubscribePending . delete ( i )
} catch ( err ) {
} catch ( err ) {
nip42ResubscribePending . delete ( i )
releaseSlot2 ( )
releaseSlot2 ( )
handleClose ( i , ( err as Error ) ? . message ? ? String ( err ) )
handleClose ( i , ( err as Error ) ? . message ? ? String ( err ) )
}
}
@ -1960,12 +2057,13 @@ class ClientService extends EventTarget {
that . queryService . releaseGlobalRelayConnectionSlot ( )
that . queryService . releaseGlobalRelayConnectionSlot ( )
}
}
} )
} )
. catch ( ( err ) = > {
. catch ( ( ) = > {
handleClose ( i , ` auth failed: ${ ( err as Error ) ? . message ? ? err } ` )
nip42ResubscribePending . delete ( i )
handleClose ( i , reason )
} )
} )
return
return
}
}
if ( reason . startsWith ( 'auth-required: ' ) ) {
if ( isRelayAuthRequiredCloseReason ( reason ) ) {
startLogin ? . ( )
startLogin ? . ( )
}
}
handleClose ( i , reason )
handleClose ( i , reason )
@ -2427,7 +2525,7 @@ class ClientService extends EventTarget {
}
}
const relayForConn = usableAfterStrikes [ 0 ] !
const relayForConn = usableAfterStrikes [ 0 ] !
try {
try {
await this . pool . ensureRelay ( relayForConn , { connectionTimeout : 12_000 } )
await this . pool . ensureRelay ( relayForConn , { connectionTimeout : RELAY_POOL_CONNECTION_TIMEOUT_MS } )
} catch ( e ) {
} catch ( e ) {
this . recordSessionRelayFailure ( relayForConn )
this . recordSessionRelayFailure ( relayForConn )
const msg = e instanceof Error ? e.message : String ( e )
const msg = e instanceof Error ? e.message : String ( e )