@ -4,6 +4,8 @@ import {
FAST_WRITE_RELAY_URLS ,
FAST_WRITE_RELAY_URLS ,
DOCUMENT_RELAY_URLS ,
DOCUMENT_RELAY_URLS ,
FIRST_RELAY_RESULT_GRACE_MS ,
FIRST_RELAY_RESULT_GRACE_MS ,
HTTP_TIMELINE_POLL_INTERVAL_MS ,
HTTP_TIMELINE_POLL_SINCE_OVERLAP_SEC ,
isDocumentRelayKind ,
isDocumentRelayKind ,
isSocialKindBlockedKind ,
isSocialKindBlockedKind ,
relayFilterIncludesDocumentRelayKind ,
relayFilterIncludesDocumentRelayKind ,
@ -2286,6 +2288,23 @@ class ClientService extends EventTarget {
let eosedAt : number | null = null
let eosedAt : number | null = null
let eventIds = new Set < string > ( )
let eventIds = new Set < string > ( )
const httpTimelinePollBases = Array . from (
new Set (
relays
. filter ( ( u ) = > isHttpRelayUrl ( u ) )
. map ( ( u ) = > normalizeHttpRelayUrl ( u ) || u )
. filter ( Boolean )
)
)
let httpPollIntervalId : ReturnType < typeof setInterval > | null = null
let httpPollCursorUnix = 0
const clearHttpTimelinePoll = ( ) = > {
if ( httpPollIntervalId != null ) {
clearInterval ( httpPollIntervalId )
httpPollIntervalId = null
}
}
let firstResultGraceTimer : ReturnType < typeof setTimeout > | null = null
let firstResultGraceTimer : ReturnType < typeof setTimeout > | null = null
const clearFirstResultGraceTimer = ( ) = > {
const clearFirstResultGraceTimer = ( ) = > {
if ( firstResultGraceTimer != null ) {
if ( firstResultGraceTimer != null ) {
@ -2358,50 +2377,8 @@ class ClientService extends EventTarget {
logger . warn ( '[ClientService] Timeline disk hydrate failed' , err )
logger . warn ( '[ClientService] Timeline disk hydrate failed' , err )
}
}
const handleTimelineEose = ( eosed : boolean ) = > {
const applySubscribedTimelineEvent = ( evt : NEvent ) = > {
if ( ! eosed ) return
if ( eosedAt != null ) return
clearFirstResultGraceTimer ( )
eosedAt = dayjs ( ) . unix ( )
if ( ! needSort ) {
return onEvents ( [ . . . events ] , true )
}
events = events . sort ( ( a , b ) = > b . created_at - a . created_at ) . slice ( 0 , filter . limit )
eventIds = new Set ( events . map ( ( e ) = > e . id ) )
const tl = that . timelines [ key ]
if ( ! tl || Array . isArray ( tl ) ) {
that . timelines [ key ] = {
refs : events.map ( ( evt ) = > [ evt . id , evt . created_at ] ) ,
filter ,
urls
}
} else if ( tl . refs . length === 0 ) {
tl . refs = events . map ( ( evt ) = > [ evt . id , evt . created_at ] as TTimelineRef )
} else {
const firstRefCreatedAt = tl . refs [ 0 ] ! [ 1 ]
const newRefs = events
. filter ( ( evt ) = > evt . created_at > firstRefCreatedAt )
. map ( ( evt ) = > [ evt . id , evt . created_at ] as TTimelineRef )
if ( events . length >= filter . limit ) {
tl . refs = newRefs
} else {
tl . refs = newRefs . concat ( tl . refs )
}
}
onEvents ( [ . . . events ] , true )
that . scheduleTimelinePersist ( key )
}
const subCloser = this . subscribe ( relays , filter , {
startLogin ,
onevent : ( evt : NEvent ) = > {
that . addEventToCache ( evt )
that . addEventToCache ( evt )
// not eosed yet, push to events
if ( ! eosedAt ) {
if ( ! eosedAt ) {
if ( eventIds . has ( evt . id ) ) return
if ( eventIds . has ( evt . id ) ) return
eventIds . add ( evt . id )
eventIds . add ( evt . id )
@ -2414,8 +2391,7 @@ class ClientService extends EventTarget {
if ( eventIds . has ( evt . id ) ) return
if ( eventIds . has ( evt . id ) ) return
const wallClockAtEose = eosedAt
const wallClockAtEose = eosedAt
const isBacklogStraggler =
const isBacklogStraggler = evt . created_at + TIMELINE_STRAGGLER_MAX_AGE_SEC < wallClockAtEose
evt . created_at + TIMELINE_STRAGGLER_MAX_AGE_SEC < wallClockAtEose
if ( isBacklogStraggler ) {
if ( isBacklogStraggler ) {
eventIds . add ( evt . id )
eventIds . add ( evt . id )
@ -2464,16 +2440,107 @@ class ClientService extends EventTarget {
timeline . refs . splice ( idx , 0 , [ evt . id , evt . created_at ] )
timeline . refs . splice ( idx , 0 , [ evt . id , evt . created_at ] )
that . scheduleTimelinePersist ( key )
that . scheduleTimelinePersist ( key )
}
const runHttpTimelinePollQuery = async ( pollFilter : Filter ) = > {
if ( httpTimelinePollBases . length === 0 ) return
try {
await this . query (
httpTimelinePollBases ,
pollFilter ,
( evt : NEvent ) = > {
applySubscribedTimelineEvent ( evt )
} ,
{
firstRelayResultGraceMs : false ,
globalTimeout : 25_000 ,
eoseTimeout : 2500
}
)
} catch ( err ) {
logger . debug ( '[ClientService] HTTP index timeline poll failed' , err )
}
}
const armHttpTimelinePollingAfterInitial = ( ) = > {
clearHttpTimelinePoll ( )
if ( httpTimelinePollBases . length === 0 ) return
const newestCreated = events . length > 0 ? Math . max ( . . . events . map ( ( e ) = > e . created_at ) ) : 0
httpPollCursorUnix = Math . max ( eosedAt ? ? 0 , newestCreated )
httpPollIntervalId = setInterval ( ( ) = > {
const base = { . . . ( filter as Filter ) } as Filter & { until? : number }
delete base . until
const since = Math . max ( 0 , httpPollCursorUnix - HTTP_TIMELINE_POLL_SINCE_OVERLAP_SEC )
const pollLimit = Math . min ( Math . max ( filter . limit ? ? 200 , 1 ) , 500 )
const pollFilter : Filter = { . . . base , since , limit : pollLimit }
void runHttpTimelinePollQuery ( pollFilter ) . then ( ( ) = > {
httpPollCursorUnix = dayjs ( ) . unix ( )
} )
} , HTTP_TIMELINE_POLL_INTERVAL_MS )
}
const handleTimelineEose = ( eosed : boolean ) = > {
if ( ! eosed ) return
if ( eosedAt != null ) return
clearFirstResultGraceTimer ( )
eosedAt = dayjs ( ) . unix ( )
if ( ! needSort ) {
armHttpTimelinePollingAfterInitial ( )
return onEvents ( [ . . . events ] , true )
}
events = events . sort ( ( a , b ) = > b . created_at - a . created_at ) . slice ( 0 , filter . limit )
eventIds = new Set ( events . map ( ( e ) = > e . id ) )
const tl = that . timelines [ key ]
if ( ! tl || Array . isArray ( tl ) ) {
that . timelines [ key ] = {
refs : events.map ( ( evt ) = > [ evt . id , evt . created_at ] ) ,
filter ,
urls : relays
}
} else if ( tl . refs . length === 0 ) {
tl . refs = events . map ( ( evt ) = > [ evt . id , evt . created_at ] as TTimelineRef )
} else {
const firstRefCreatedAt = tl . refs [ 0 ] ! [ 1 ]
const newRefs = events
. filter ( ( evt ) = > evt . created_at > firstRefCreatedAt )
. map ( ( evt ) = > [ evt . id , evt . created_at ] as TTimelineRef )
if ( events . length >= filter . limit ) {
tl . refs = newRefs
} else {
tl . refs = newRefs . concat ( tl . refs )
}
}
armHttpTimelinePollingAfterInitial ( )
onEvents ( [ . . . events ] , true )
that . scheduleTimelinePersist ( key )
}
const subCloser = this . subscribe ( relays , filter , {
startLogin ,
onevent : ( evt : NEvent ) = > {
applySubscribedTimelineEvent ( evt )
} ,
} ,
oneose : handleTimelineEose ,
oneose : handleTimelineEose ,
onclose : onClose
onclose : onClose
} ,
} ,
relayReqLog )
relayReqLog )
if ( httpTimelinePollBases . length > 0 ) {
const backfillFilter = { . . . ( filter as Filter ) } as Filter & { until? : number }
delete backfillFilter . until
void runHttpTimelinePollQuery ( backfillFilter )
}
return {
return {
timelineKey : key ,
timelineKey : key ,
closer : ( ) = > {
closer : ( ) = > {
clearFirstResultGraceTimer ( )
clearFirstResultGraceTimer ( )
clearHttpTimelinePoll ( )
onEvents = ( ) = > { }
onEvents = ( ) = > { }
onNew = ( ) = > { }
onNew = ( ) = > { }
subCloser . close ( )
subCloser . close ( )
@ -2602,8 +2669,19 @@ class ClientService extends EventTarget {
} = { }
} = { }
) {
) {
const originalDedupedRelays = Array . from ( new Set ( urls ) )
const originalDedupedRelays = Array . from ( new Set ( urls ) )
let relays = originalDedupedRelays . filter ( ( url ) = > ! isHttpRelayUrl ( url ) )
const httpRelayBases = Array . from (
if ( relays . length === 0 ) relays = [ . . . FAST_READ_RELAY_URLS ]
new Set (
originalDedupedRelays
. filter ( ( u ) = > isHttpRelayUrl ( u ) )
. map ( ( u ) = > normalizeHttpRelayUrl ( u ) || u )
. filter ( Boolean )
)
)
const wsOriginal = originalDedupedRelays . filter ( ( url ) = > ! isHttpRelayUrl ( url ) )
let relays = [ . . . wsOriginal ]
if ( relays . length === 0 && httpRelayBases . length === 0 ) {
relays = [ . . . FAST_READ_RELAY_URLS ]
}
const filters = Array . isArray ( filter ) ? filter : [ filter ]
const filters = Array . isArray ( filter ) ? filter : [ filter ]
relays = withDocumentRelayUrlsForFilters ( relays , filters )
relays = withDocumentRelayUrlsForFilters ( relays , filters )
const stripSocialBlockedRelays =
const stripSocialBlockedRelays =
@ -2612,10 +2690,11 @@ class ClientService extends EventTarget {
if ( stripSocialBlockedRelays ) {
if ( stripSocialBlockedRelays ) {
const socialKindBlockedSet = new Set ( SOCIAL_KIND_BLOCKED_RELAY_URLS . map ( ( u ) = > normalizeUrl ( u ) || u ) )
const socialKindBlockedSet = new Set ( SOCIAL_KIND_BLOCKED_RELAY_URLS . map ( ( u ) = > normalizeUrl ( u ) || u ) )
const stripped = relays . filter ( ( url ) = > ! socialKindBlockedSet . has ( normalizeUrl ( url ) || url ) )
const stripped = relays . filter ( ( url ) = > ! socialKindBlockedSet . has ( normalizeUrl ( url ) || url ) )
relays = relaysAfterSocialKindBlockedStrip ( originalDedupedRelays , stripped )
relays = relaysAfterSocialKindBlockedStrip ( wsOriginal , stripped )
}
}
relays = this . relayUrlsAfterStrikesOrRecover ( relays )
relays = this . relayUrlsAfterStrikesOrRecover ( relays )
const events = await this . queryService . query ( relays , filter , onevent , {
const queryRelays = dedupeNormalizeRelayUrlsOrdered ( [ . . . relays , . . . httpRelayBases ] )
const events = await this . queryService . query ( queryRelays , filter , onevent , {
eoseTimeout ,
eoseTimeout ,
globalTimeout ,
globalTimeout ,
firstRelayResultGraceMs ,
firstRelayResultGraceMs ,