@ -29,7 +29,13 @@ import { applyRelayNip42AckTimeout } from '@/lib/relay-nip42-tuning'
import { isIndexRelayTransportFailure , queryIndexRelay } from '@/lib/index-relay-http'
import { isIndexRelayTransportFailure , queryIndexRelay } from '@/lib/index-relay-http'
import logger from '@/lib/logger'
import logger from '@/lib/logger'
import { isHttpRelayUrl , normalizeHttpRelayUrl , normalizeUrl } from '@/lib/url'
import { isHttpRelayUrl , normalizeHttpRelayUrl , normalizeUrl } from '@/lib/url'
import { RelaySubscribeOpBatch } from '@/services/relay-operation-log.service'
import {
RelaySubscribeOpBatch ,
compactFilterForRelayLog ,
humanizeSubscribeTerminalDetail ,
relayHostForSubscribeLog ,
type RelayOpTerminalRow
} from '@/services/relay-operation-log.service'
import { patchRelayNoticeForFetchFailures } from '@/services/relay-notice-fetch-failure'
import { patchRelayNoticeForFetchFailures } from '@/services/relay-notice-fetch-failure'
import type { Filter , Event as NEvent } from 'nostr-tools'
import type { Filter , Event as NEvent } from 'nostr-tools'
import { SimplePool , EventTemplate , VerifiedEvent , nip19 } from 'nostr-tools'
import { SimplePool , EventTemplate , VerifiedEvent , nip19 } from 'nostr-tools'
@ -47,6 +53,74 @@ function filterForRelay(f: Filter, relaySupportsSearch: boolean): Filter {
const HEX_EVENT_ID_RE = /^[0-9a-f]{64}$/i
const HEX_EVENT_ID_RE = /^[0-9a-f]{64}$/i
let queryReqSeq = 0
function logQueryReqConsolidatedEnd (
reqId : number ,
source : string ,
inputRelays : string [ ] ,
httpBases : string [ ] ,
events : NEvent [ ] ,
terminals : RelayOpTerminalRow [ ] ,
getSeenForEvent : ( eventId : string ) = > string [ ]
) : void {
const kindHistogram : Record < string , number > = { }
for ( const e of events ) {
const k = String ( e . kind )
kindHistogram [ k ] = ( kindHistogram [ k ] ? ? 0 ) + 1
}
const norm = ( u : string ) = > normalizeUrl ( u ) || u
type Row = {
url : string
host : string
terminal? : RelayOpTerminalRow [ 'outcome' ]
detail? : string
eventsReturned : number
}
const byKey = new Map < string , Row > ( )
const rowFor = ( url : string ) : Row = > {
const key = norm ( url )
let r = byKey . get ( key )
if ( ! r ) {
r = { url : key , host : relayHostForSubscribeLog ( key ) , eventsReturned : 0 }
byKey . set ( key , r )
}
return r
}
for ( const t of terminals ) {
const r = rowFor ( t . relayUrl )
r . terminal = t . outcome
r . detail = humanizeSubscribeTerminalDetail ( t . outcome , t . detail )
}
for ( const e of events ) {
const seen = getSeenForEvent ( e . id )
for ( const u of seen ) {
rowFor ( u ) . eventsReturned += 1
}
}
for ( const u of inputRelays ) {
rowFor ( u )
}
for ( const b of httpBases ) {
rowFor ( b )
}
const perRelay = [ . . . byKey . values ( ) ] . sort ( ( a , b ) = > a . host . localeCompare ( b . host ) )
logger . debug ( '[QueryService] req_end' , {
reqId ,
source ,
eventCount : events.length ,
kindHistogram ,
perRelay
} )
}
function decodeEventRefForETagFilter ( raw : string ) : string | null {
function decodeEventRefForETagFilter ( raw : string ) : string | null {
const trimmed = raw . trim ( )
const trimmed = raw . trim ( )
if ( ! trimmed ) return null
if ( ! trimmed ) return null
@ -314,19 +388,6 @@ export class QueryService {
const replaceableRace = options ? . replaceableRace ? ? false
const replaceableRace = options ? . replaceableRace ? ? false
const replaceableRaceWaitMs = options ? . replaceableRaceWaitMs ? ? FIRST_RELAY_RESULT_GRACE_MS
const replaceableRaceWaitMs = options ? . replaceableRaceWaitMs ? ? FIRST_RELAY_RESULT_GRACE_MS
const immediateReturn = options ? . immediateReturn ? ? false
const immediateReturn = options ? . immediateReturn ? ? false
const isExternalSearch = eoseTimeout > 1000
if ( isExternalSearch ) {
logger . debug ( 'query: Starting external relay search' , {
relayCount : urls.length ,
relays : urls ,
eoseTimeout ,
globalTimeout ,
replaceableRace ,
immediateReturn ,
filter : sanitizedFilters
} )
}
const filtersForGrace = sanitizedFilters
const filtersForGrace = sanitizedFilters
const maxLimitForGrace = Math . max ( . . . filtersForGrace . map ( ( f ) = > ( f . limit ? ? 0 ) as number ) , 0 )
const maxLimitForGrace = Math . max ( . . . filtersForGrace . map ( ( f ) = > ( f . limit ? ? 0 ) as number ) , 0 )
@ -352,6 +413,22 @@ export class QueryService {
)
)
const wsQueryUrls = urls . filter ( ( u ) = > ! isHttpRelayUrl ( u ) )
const wsQueryUrls = urls . filter ( ( u ) = > ! isHttpRelayUrl ( u ) )
const reqId = ++ queryReqSeq
const source = options ? . relayOpSource ? ? 'QueryService.query'
const inputRelaysOrdered = Array . from ( new Set ( urls . map ( ( u ) = > normalizeUrl ( u ) || u ) . filter ( Boolean ) ) )
const wsRelayCandidates = Array . from ( new Set ( wsQueryUrls . map ( ( u ) = > normalizeUrl ( u ) || u ) . filter ( Boolean ) ) )
logger . debug ( '[QueryService] req_begin' , {
reqId ,
source ,
relays : inputRelaysOrdered ,
httpRelayBases ,
wsRelayCandidates ,
filters : sanitizedFilters.map ( compactFilterForRelayLog ) ,
eoseTimeout ,
globalTimeout
} )
return await new Promise < NEvent [ ] > ( ( resolve ) = > {
return await new Promise < NEvent [ ] > ( ( resolve ) = > {
const events : NEvent [ ] = [ ]
const events : NEvent [ ] = [ ]
const abortHttp = new AbortController ( )
const abortHttp = new AbortController ( )
@ -364,6 +441,27 @@ export class QueryService {
let firstResultTime : number | null = null
let firstResultTime : number | null = null
let globalTimeoutId : ReturnType < typeof setTimeout > | null = null
let globalTimeoutId : ReturnType < typeof setTimeout > | null = null
let queryFinalizing = false
let queryFinalizing = false
let resolvedSnapshot : NEvent [ ] = [ ]
let reqEndLogged = false
let fallbackReqEndTimer : ReturnType < typeof setTimeout > | null = null
const emitReqEnd = ( terminals : RelayOpTerminalRow [ ] , snapshot : NEvent [ ] ) = > {
if ( reqEndLogged ) return
reqEndLogged = true
if ( fallbackReqEndTimer != null ) {
clearTimeout ( fallbackReqEndTimer )
fallbackReqEndTimer = null
}
logQueryReqConsolidatedEnd (
reqId ,
source ,
inputRelaysOrdered ,
httpRelayBases ,
snapshot ,
terminals ,
( id ) = > this . getSeenEventRelayUrls ( id )
)
}
const httpInflight =
const httpInflight =
httpRelayBases . length === 0
httpRelayBases . length === 0
@ -388,9 +486,7 @@ export class QueryService {
} catch ( e ) {
} catch ( e ) {
if ( ( e as Error ) . name === 'AbortError' ) return
if ( ( e as Error ) . name === 'AbortError' ) return
relaySessionStrikes . recordReadFailure ( base , 'http' )
relaySessionStrikes . recordReadFailure ( base , 'http' )
if ( isIndexRelayTransportFailure ( e ) ) {
if ( ! isIndexRelayTransportFailure ( e ) ) {
logger . debug ( '[QueryService] HTTP index relay unreachable' , { base , error : e } )
} else {
logger . warn ( '[QueryService] HTTP index relay query failed' , { base , error : e } )
logger . warn ( '[QueryService] HTTP index relay query failed' , { base , error : e } )
}
}
}
}
@ -443,10 +539,20 @@ export class QueryService {
if ( replaceableRaceTimeoutId ) clearTimeout ( replaceableRaceTimeoutId )
if ( replaceableRaceTimeoutId ) clearTimeout ( replaceableRaceTimeoutId )
if ( globalTimeoutId ) clearTimeout ( globalTimeoutId )
if ( globalTimeoutId ) clearTimeout ( globalTimeoutId )
sub . close ( )
const resolvedList =
const resolvedList =
replaceableRace && events . length > 0 ? resolveReplaceableRaceEvents ( ) : events
replaceableRace && events . length > 0 ? resolveReplaceableRaceEvents ( ) : events
resolvedSnapshot = resolvedList
if ( wsQueryUrls . length === 0 ) {
emitReqEnd ( [ ] , resolvedList )
} else {
fallbackReqEndTimer = setTimeout ( ( ) = > {
emitReqEnd ( [ ] , resolvedList )
} , Math . min ( globalTimeout + 2500 , 45 _000 ) )
}
sub . close ( )
resolve ( resolvedList )
resolve ( resolvedList )
}
}
@ -557,7 +663,7 @@ export class QueryService {
}
}
}
}
} ,
} ,
{ source : options?.relayOpSource ? ? 'QueryService.query' , logLevel : 'debug' }
{ source : options?.relayOpSource ? ? 'QueryService.query' , logLevel : 'debug' , quiet : true , onBatchEnd : ( rows ) = > emitReqEnd ( rows , resolvedSnapshot ) }
)
)
const sub = {
const sub = {
@ -578,7 +684,13 @@ export class QueryService {
urls : string [ ] ,
urls : string [ ] ,
filter : Filter | Filter [ ] ,
filter : Filter | Filter [ ] ,
callbacks : SubscribeCallbacks ,
callbacks : SubscribeCallbacks ,
relayOpMeta ? : { source : string ; logLevel ? : 'info' | 'debug' }
relayOpMeta ? : {
source : string
logLevel ? : 'info' | 'debug'
/** When true, suppress `[RelayOp] batch_begin` / `batch_end` (used by {@link QueryService.query}). */
quiet? : boolean
onBatchEnd ? : ( rows : RelayOpTerminalRow [ ] ) = > void
}
) : { close : ( ) = > void } {
) : { close : ( ) = > void } {
const filters = sanitizeFiltersBeforeReq ( filter )
const filters = sanitizeFiltersBeforeReq ( filter )
if ( filters . length === 0 ) {
if ( filters . length === 0 ) {
@ -645,7 +757,11 @@ export class QueryService {
const opSource = relayOpMeta ? . source ? ? 'QueryService.subscribe'
const opSource = relayOpMeta ? . source ? ? 'QueryService.subscribe'
const opBatch =
const opBatch =
groupedRequests . length > 0
groupedRequests . length > 0
? new RelaySubscribeOpBatch ( opSource , groupedRequests , { logLevel : relayOpMeta?.logLevel } )
? new RelaySubscribeOpBatch ( opSource , groupedRequests , {
logLevel : relayOpMeta?.logLevel ,
quiet : relayOpMeta?.quiet ,
onBatchEnd : relayOpMeta?.onBatchEnd
} )
: null
: null
opBatch ? . logBegin ( )
opBatch ? . logBegin ( )