@@ -49,19 +49,16 @@ import { AbstractRelay } from 'nostr-tools/abstract-relay'
import indexedDb from './indexed-db.service'
import nip66Service from './nip66.service'
import { QueryService } from './client-query.service'
/** Live timeline REQ: dead relays fail fast; EOSE caps “connected but silent” relays. */
const SUBSCRIBE_RELAY_CONNECTION_TIMEOUT_MS = 2800
const SUBSCRIBE_RELAY_EOSE_TIMEOUT_MS = 4800
import { EventService } from './client-events.service'
import { ReplaceableEventService } from './client-replaceable-events.service'
import { MacroService, createBookstrService } from './client-macro.service'
type TTimelineRef = [string, number]
/**
 * Timeline bootstrap used to await up to `filter.limit` IndexedDB reads before opening a live REQ,
 * which blocked first paint for many seconds. We only prefetch this many newest refs; the subscription
 * streams the rest immediately.
 */
const TIMELINE_CACHE_PREFETCH_CAP = 48
class ClientService extends EventTarget {
static instance: ClientService
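Editor's note, not part of the patch: a minimal sketch of the bounded-prefetch idea behind `TIMELINE_CACHE_PREFETCH_CAP`, assuming a hypothetical `fetchEvent(id)` stand-in for `eventService.fetchEvent` that resolves to `undefined` on a cache miss.
// Illustrative sketch only: prefetch at most TIMELINE_CACHE_PREFETCH_CAP of the newest refs,
// then let the live subscription stream the rest.
async function prefetchNewestRefs(
  refs: TTimelineRef[],
  limit: number,
  fetchEvent: (id: string) => Promise<NEvent | undefined>
): Promise<NEvent[]> {
  const n = Math.min(refs.length, limit, TIMELINE_CACHE_PREFETCH_CAP)
  const loaded = await Promise.all(refs.slice(0, n).map(([id]) => fetchEvent(id)))
  // Drop cache misses and return newest-first.
  return loaded.filter((evt): evt is NEvent => !!evt).sort((a, b) => b.created_at - a.created_at)
}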
@@ -861,18 +858,13 @@ class ClientService extends EventTarget {
{
startLogin,
needSort = true,
useCache = false,
omitDefaultSinceWhenUseCache = false,
firstRelayResultGraceMs = FIRST_RELAY_RESULT_GRACE_MS
}: {
startLogin?: () => void
needSort?: boolean
useCache?: boolean
/** When useCache is true but there are no timeline refs yet, skip the default 24h `since` so REQ stays unbounded (spell feeds / catalog). */
omitDefaultSinceWhenUseCache?: boolean
/**
 * After the first live event before EOSE, wait this long then treat initial load as EOSE (query-style finalize).
 * Spells pass {@link FIRST_RELAY_RESULT_GRACE_MS} explicitly; feeds may override.
 * Ignored by {@link ClientService.subscribeTimeline} (kept for compatibility). Initial completion is
 * aggregate relay EOSE only; per-event results stream via `onEvents` without faking EOSE.
 */
firstRelayResultGraceMs?: number
} = {}
@@ -882,30 +874,37 @@ class ClientService extends EventTarget {
let eventIdSet = new Set<string>()
let events: NEvent[] = []
let eosedCount = 0
/** One merged buffer — slice using the largest child `limit` so a later child with a smaller limit cannot drop other relays’ events. */
const mergedTimelineLimit = Math.max(
500,
...subRequests.map(({ filter }) =>
typeof filter.limit === 'number' && filter.limit > 0 ? filter.limit : 0
)
)
/** First merged batch goes out synchronously so the list paints without waiting a frame. */
let outerMergedDelivered = false
/** One React update per animation frame after the first paint — limits setEvents/profile churn. */
let outerFlushRaf: number | null = null
const scheduleOuterFlush = () => {
const snapshot = events.length ? [...events] : []
const allEosed = eosedCount >= requestCount
if (!outerMergedDelivered && (snapshot.length > 0 || allEosed)) {
outerMergedDelivered = true
if (outerFlushRaf != null) {
cancelAnimationFrame(outerFlushRaf)
outerFlushRaf = null
}
let outerFlushQueued = false
let outerFlushBump = 0
const scheduleOuterFlush = (immediate = false) => {
const run = () => {
outerFlushQueued = false
const snapshot = events.length ? [...events] : []
const allEosed = eosedCount >= requestCount
onEvents(snapshot, allEosed)
}
if (immediate || eosedCount >= requestCount || events.length <= 1) {
outerFlushBump++
outerFlushQueued = false
run()
return
}
if (outerFlushRaf != null) {
cancelAnimationFrame(outerFlushRaf)
if (!outerFlushQueued) {
outerFlushQueued = true
const b = outerFlushBump
queueMicrotask(() => {
if (b !== outerFlushBump) return
run()
})
}
outerFlushRaf = requestAnimationFrame(() => {
outerFlushRaf = null
onEvents(events.length ? [...events] : [], eosedCount >= requestCount)
})
}
const subs = await Promise.all(
@@ -924,10 +923,12 @@ class ClientService extends EventTarget {
eventIdSet.add(evt.id)
events.push(evt)
})
events = events.sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
events = events
.sort((a, b) => b.created_at - a.created_at)
.slice(0, mergedTimelineLimit)
eventIdSet = new Set(events.map((evt) => evt.id))
scheduleOuterFlush()
scheduleOuterFlush(!!_eosed)
},
onNew: (evt) => {
if (newEventIdSet.has(evt.id)) return
@@ -936,7 +937,7 @@ class ClientService extends EventTarget {
},
onClose
},
{ startLogin, needSort, useCache, omitDefaultSinceWhenUseCache, firstRelayResultGraceMs }
{ startLogin, needSort, firstRelayResultGraceMs }
)
})
)
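Editor's note, not part of the patch: the flush coalescing used by `scheduleOuterFlush` above reduces to this standalone pattern (names are illustrative). Bursts of synchronous pushes collapse into one callback per microtask, and an immediate flush bumps a generation counter so any already-queued microtask becomes a no-op.
// Illustrative sketch: microtask-coalesced delivery with a generation counter.
function createCoalescedFlush<T>(onFlush: (snapshot: T[]) => void) {
  const buffer: T[] = []
  let queued = false
  let generation = 0
  return (item: T, immediate = false) => {
    buffer.push(item)
    if (immediate) {
      generation++ // invalidate any pending microtask
      queued = false
      onFlush([...buffer])
      return
    }
    if (queued) return
    queued = true
    const g = generation
    queueMicrotask(() => {
      if (g !== generation) return // superseded by an immediate flush
      queued = false
      onFlush([...buffer])
    })
  }
}
// Usage: three synchronous pushes produce a single onFlush call.
const push = createCoalescedFlush<number>((snapshot) => console.log(snapshot.length))
push(1)
push(2)
push(3) // logs "3" once, from one microtask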
@@ -944,18 +945,8 @@ class ClientService extends EventTarget {
const key = this.generateMultipleTimelinesKey(subRequests)
this.timelines[key] = subs.map((sub) => sub.timelineKey)
if (outerFlushRaf != null) {
cancelAnimationFrame(outerFlushRaf)
outerFlushRaf = null
onEvents(events.length ? [...events] : [], eosedCount >= requestCount)
}
return {
closer: () => {
if (outerFlushRaf != null) {
cancelAnimationFrame(outerFlushRaf)
outerFlushRaf = null
}
onEvents = () => {}
onNew = () => {}
subs.forEach((sub) => {
@@ -1102,7 +1093,7 @@ class ClientService extends EventTarget {
await that.queryService.acquireSubSlot(relayKey)
let relay: AbstractRelay
try {
relay = await that.pool.ensureRelay(url, { connectionTimeout: 5000 })
relay = await that.pool.ensureRelay(url, { connectionTimeout: SUBSCRIBE_RELAY_CONNECTION_TIMEOUT_MS })
} catch (err) {
that.queryService.releaseSubSlot(relayKey)
handleClose(i, (err as Error)?.message ?? String(err))
@@ -1136,7 +1127,9 @@ class ClientService extends EventTarget {
// resubscribe on a fresh connection from ensureRelay (fixes SendingOnClosedConnection).
let liveRelay: AbstractRelay
try {
liveRelay = await that.pool.ensureRelay(url, { connectionTimeout: 5000 })
liveRelay = await that.pool.ensureRelay(url, {
connectionTimeout: SUBSCRIBE_RELAY_CONNECTION_TIMEOUT_MS
})
} catch (err) {
that.queryService.releaseSubSlot(relayKey)
handleClose(i, (err as Error)?.message ?? String(err))
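Editor's note, not part of the patch: the effect of the shorter connection timeout is that a dead relay rejects after roughly 2.8s instead of stalling the surrounding batch, and the EOSE timeout plays the same role for relays that connect but never finish the initial query. A generic sketch of that fail-fast shape (hypothetical helper, not an API in this codebase):
// Illustrative sketch: reject a slow task after `ms` so one relay cannot block the batch.
function withTimeout<T>(task: Promise<T>, ms: number, label: string): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`${label} timed out after ${ms}ms`)), ms)
    task.then(
      (value) => {
        clearTimeout(timer)
        resolve(value)
      },
      (err) => {
        clearTimeout(timer)
        reject(err)
      }
    )
  })
}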
@@ -1159,7 +1152,7 @@ class ClientService extends EventTarget {
handleClose(i, reason2)
},
alreadyHaveEvent: localAlreadyHaveEvent,
eoseTimeout: 10_000
eoseTimeout: SUBSCRIBE_RELAY_EOSE_TIMEOUT_MS
})
subs.push({
relayKey,
@@ -1184,7 +1177,7 @@ class ClientService extends EventTarget {
handleClose(i, reason)
},
alreadyHaveEvent: localAlreadyHaveEvent,
eoseTimeout: 10_000
eoseTimeout: SUBSCRIBE_RELAY_EOSE_TIMEOUT_MS
})
subs.push({
relayKey,
@@ -1236,23 +1229,19 @@ class ClientService extends EventTarget {
{
startLogin,
needSort = true,
useCache = false,
omitDefaultSinceWhenUseCache = false,
firstRelayResultGraceMs = FIRST_RELAY_RESULT_GRACE_MS
/** @deprecated No longer used; streaming does not fake EOSE (see flushStreamingSnapshot). Kept for call-site compatibility. */
firstRelayResultGraceMs: _unusedFirstRelayGraceMs = FIRST_RELAY_RESULT_GRACE_MS
}: {
startLogin?: () => void
needSort?: boolean
useCache?: boolean
omitDefaultSinceWhenUseCache?: boolean
firstRelayResultGraceMs?: number
} = {}
) {
void _unusedFirstRelayGraceMs
const relays = Array.from(new Set(urls))
const key = this.generateTimelineKey(relays, filter)
let timeline = this.timelines[key]
// CRITICAL FIX: Always initialize timeline object, even when useCache is false
// This ensures refs are always available for pagination tracking
if (!timeline || Array.isArray(timeline)) {
this.timelines[key] = {
refs: [],
@@ -1261,207 +1250,83 @@ class ClientService extends EventTarget {
}
timeline = this.timelines[key]
}
let cachedEvents: NEvent[] = []
let since: number | undefined
const oneDayAgo = dayjs().subtract(24, 'hours').unix()
// eslint-disable-next-line @typescript-eslint/no-this-alias
const that = this
let events: NEvent[] = []
let eosedAt: number | null = null
let initialBatchScheduled = false
let lastDeliveredCount = 0
let progressiveIntervalId: ReturnType<typeof setInterval> | null = null
let firstRelayResultGraceTimer: ReturnType<typeof setTimeout> | null = null
const PROGRESSIVE_INTERVAL_MS = 100 // Backup tick while relays stream without new onevent bursts
const MIN_NEW_EVENTS_AFTER_FIRST = 1
const mergeTimelineLiveAndCache = (): NEvent[] => {
const sortedLive = [...events].sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
if (!needSort || !useCache || cachedEvents.length === 0) {
return sortedLive
}
const byId = new Map<string, NEvent>()
for (const e of cachedEvents) {
byId.set(e.id, e)
}
for (const e of sortedLive) {
byId.set(e.id, e)
}
return [...byId.values()].sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
}
const deliverProgressive = () => {
/**
 * Stream every matching event to the UI immediately. Do **not** use a "grace EOSE" timer: it set `eosedAt`
 * to wall-clock time while relays were still returning historical rows, so `evt.created_at > eosedAt` was
 * almost always false and later relay results were dropped until the feed looked empty / slow.
 * Real initial completion is only when {@link ClientService.subscribe} fires aggregate `oneose` (all relays).
 */
let streamFlushMicrotask = false
const flushStreamingSnapshot = () => {
if (eosedAt) return
const combined = mergeTimelineLiveAndCache()
if (combined.length === 0) return
const newEventCount = combined.length - lastDeliveredCount
const isFirstPaint = lastDeliveredCount === 0
const shouldDeliver =
isFirstPaint
? combined.length >= 1
: newEventCount >= MIN_NEW_EVENTS_AFTER_FIRST || combined.length >= filter.limit * 0.5
if (shouldDeliver) {
lastDeliveredCount = combined.length
onEvents(combined, false)
}
}
// CRITICAL: Only use cache if explicitly enabled (for profile timelines)
// Main feeds (home, notifications) should always fetch fresh from relays
if (useCache && timeline && !Array.isArray(timeline) && timeline.refs.length && needSort) {
const refs = timeline.refs
const prefetchN = Math.min(refs.length, filter.limit, TIMELINE_CACHE_PREFETCH_CAP)
// Spell / catalog feeds: refs already carry created_at — set `since` immediately and open the live REQ
// without awaiting dozens of IndexedDB reads (that delayed first events by seconds).
if (omitDefaultSinceWhenUseCache && refs[0]![1] >= oneDayAgo) {
since = refs[0]![1] + 1
void (async () => {
try {
const loaded = (
await Promise.all(refs.slice(0, prefetchN).map(([id]) => that.eventService.fetchEvent(id)))
).filter((evt): evt is NEvent => !!evt)
if (!loaded.length) return
loaded.sort((a, b) => b.created_at - a.created_at)
const recent = loaded.filter((evt) => evt.created_at >= oneDayAgo)
if (!recent.length) return
cachedEvents = recent
deliverProgressive()
} catch {
// ignore
}
})()
} else if (!omitDefaultSinceWhenUseCache) {
cachedEvents = (
await Promise.all(refs.slice(0, prefetchN).map(([id]) => this.eventService.fetchEvent(id)))
).filter((evt): evt is NEvent => !!evt)
if (cachedEvents.length) {
cachedEvents.sort((a, b) => b.created_at - a.created_at)
const recentCachedEvents = cachedEvents.filter((evt) => evt.created_at >= oneDayAgo)
if (recentCachedEvents.length > 0) {
onEvents([...recentCachedEvents], false)
since = recentCachedEvents[0].created_at + 1
} else {
cachedEvents = []
}
const emit = () => {
streamFlushMicrotask = false
if (eosedAt) return
if (needSort) {
const sorted = [...events].sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
onEvents(sorted, false)
} else {
onEvents([...events], false)
}
}
}
// CRITICAL FIX: Only set since parameter if caching is enabled
// When useCache is false, we want to stream raw from relays without time restrictions
// This allows relay feeds to show all available events, not just recent ones
if (!since && needSort && useCache && !omitDefaultSinceWhenUseCache) {
since = oneDayAgo
if (events.length <= 1) {
streamFlushMicrotask = false
emit()
return
}
if (!streamFlushMicrotask) {
streamFlushMicrotask = true
queueMicrotask(emit)
}
}
const handleTimelineEose = (eosed: boolean) => {
if (eosed && eosedAt != null) return
if (!eosed) return
if (eosedAt != null) return
eosedAt = dayjs().unix()
if (eosed && !eosedAt) {
if (firstRelayResultGraceTimer != null) {
clearTimeout(firstRelayResultGraceTimer)
firstRelayResultGraceTimer = null
}
eosedAt = dayjs().unix()
if (progressiveIntervalId) {
clearInterval(progressiveIntervalId)
progressiveIntervalId = null
}
}
// (algo feeds) no need to sort and cache
if (!needSort) {
return onEvents([...events], !!eosedAt)
}
if (!eosed) {
events = events.sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
// Only include cached events if caching is enabled
return onEvents([...(useCache ? events.concat(cachedEvents).slice(0, filter.limit) : events)], false)
return onEvents([...events], true)
}
events = events.sort((a, b) => b.created_at - a.created_at).slice(0, filter.limit)
// Only update timeline cache if caching is enabled
if (useCache) {
const timeline = that.timelines[key]
// no cache yet
if (!timeline || Array.isArray(timeline) || !timeline.refs.length) {
that.timelines[key] = {
refs: events.map((evt) => [evt.id, evt.created_at]),
filter,
urls
}
return onEvents([...events], true)
const tl = that.timelines[key]
if (!tl || Array.isArray(tl)) {
that.timelines[key] = {
refs: events.map((evt) => [evt.id, evt.created_at]),
filter,
urls
}
// Prevent concurrent requests from duplicating the same event
const firstRefCreatedAt = timeline.refs[0][1]
} else {
const firstRefCreatedAt = tl.refs.length > 0 ? tl.refs[0][1] : dayjs().unix()
const newRefs = events
.filter((evt) => evt.created_at > firstRefCreatedAt)
.map((evt) => [evt.id, evt.created_at] as TTimelineRef)
if (events.length >= filter.limit) {
// if new refs are more than limit, means old refs are too old, replace them
timeline.refs = newRefs
onEvents([...events], true)
} else {
// merge new refs with old refs
timeline.refs = newRefs.concat(timeline.refs)
onEvents([...events.concat(cachedEvents).slice(0, filter.limit)], true)
}
} else {
// No caching for initial load, but still need to initialize timeline.refs for loadMoreTimeline pagination
const timeline = that.timelines[key]
if (!timeline || Array.isArray(timeline)) {
// Initialize timeline with refs for pagination (even though we don't use cache for initial load)
that.timelines[key] = {
refs: events.map((evt) => [evt.id, evt.created_at]),
filter,
urls
}
tl.refs = newRefs
} else {
// Update refs with new events for pagination tracking
const firstRefCreatedAt = timeline.refs.length > 0 ? timeline.refs[0][1] : dayjs().unix()
const newRefs = events
.filter((evt) => evt.created_at > firstRefCreatedAt)
.map((evt) => [evt.id, evt.created_at] as TTimelineRef)
if (events.length >= filter.limit) {
timeline.refs = newRefs
} else {
timeline.refs = newRefs.concat(timeline.refs)
}
tl.refs = newRefs.concat(tl.refs)
}
// Return events directly (no cache concatenation)
onEvents([...events], true)
}
onEvents([...events], true)
}
const subCloser = this.subscribe(relays, since ? { ...filter, since } : filter, {
const subCloser = this.subscribe(relays, filter, {
startLogin,
onevent: (evt: NEvent) => {
that.addEventToCache(evt)
// not eosed yet, push to events
if (!eosedAt) {
events.push(evt)
if (firstRelayResultGraceTimer == null) {
firstRelayResultGraceTimer = setTimeout(() => {
firstRelayResultGraceTimer = null
handleTimelineEose(true)
}, firstRelayResultGraceMs)
}
// Deliver on every live event before EOSE (plus interval as a safety net)
if (needSort && events.length >= 1) {
if (!initialBatchScheduled) {
initialBatchScheduled = true
if (!progressiveIntervalId) {
progressiveIntervalId = setInterval(deliverProgressive, PROGRESSIVE_INTERVAL_MS)
}
}
deliverProgressive()
}
flushStreamingSnapshot()
return
}
// new event
@@ -1469,7 +1334,7 @@ class ClientService extends EventTarget {
onNew(evt)
}
// Update timeline refs for pagination tracking (even when useCache is false)
// Update timeline refs for pagination tracking
// This is needed for loadMoreTimeline to know what events have been loaded
const timeline = that.timelines[key]
if (!timeline || Array.isArray(timeline)) {
@@ -1506,14 +1371,6 @@ class ClientService extends EventTarget {
return {
timelineKey: key,
closer: () => {
if (firstRelayResultGraceTimer != null) {
clearTimeout(firstRelayResultGraceTimer)
firstRelayResultGraceTimer = null
}
if (progressiveIntervalId) {
clearInterval(progressiveIntervalId)
progressiveIntervalId = null
}
onEvents = () => {}
onNew = () => {}
subCloser.close()
@@ -1525,86 +1382,38 @@ class ClientService extends EventTarget {
const timeline = this.timelines[key]
if (!timeline || Array.isArray(timeline)) return []
const { filter, urls, refs } = timeline
// Try to load from cache if refs exist
let cachedEvents: NEvent[] = []
if (refs && refs.length > 0) {
const startIdx = refs.findIndex(([, createdAt]) => createdAt <= until)
if (startIdx >= 0) {
cachedEvents = (
await Promise.all(
refs.slice(startIdx, startIdx + limit).map(([id]) => this.eventService.fetchEvent(id))
)
).filter((evt): evt is NEvent => !!evt) as NEvent[]
}
if (cachedEvents.length >= limit) {
return cachedEvents
}
}
const { filter, urls } = timeline
// CRITICAL FIX: Always query relay for more events, even if we have some cached
// This ensures we continue fetching from relays when scrolling, not just from cache
// Calculate the correct until timestamp based on what we already have
until = cachedEvents.length ? cachedEvents[cachedEvents.length - 1].created_at - 1 : until
limit = limit - cachedEvents.length
// CRITICAL: Ensure we always query the relay, even if limit is small
// This prevents the feed from stopping when we have few cached events
if (limit <= 0) {
limit = 100 // Minimum limit to ensure we get more events from relay
}
// Query relay for more events with proper until parameter for pagination
let events = await this.query(urls, { ...filter, until, limit })
events.forEach((evt) => {
this.addEventToCache(evt)
})
events = events.sort((a, b) => b.created_at - a.created_at).slice(0, limit)
// Update refs for pagination tracking (even when useCache is false)
// Initialize refs if empty
if (!timeline.refs) {
timeline.refs = []
}
// Prevent duplicate events in refs
const existingRefIds = new Set(timeline.refs.map(([id]) => id))
const newRefs: TTimelineRef[] = []
// Add cached events to refs if not already present
for (const evt of cachedEvents) {
if (!existingRefIds.has(evt.id)) {
newRefs.push([evt.id, evt.created_at])
existingRefIds.add(evt.id)
}
}
// Add new events from relay to refs
for (const evt of events) {
if (!existingRefIds.has(evt.id)) {
newRefs.push([evt.id, evt.created_at])
existingRefIds.add(evt.id)
}
}
// Sort new refs by created_at descending and merge with existing refs
newRefs.sort((a, b) => b[1] - a[1])
// Merge with existing refs, maintaining sorted order
if (timeline.refs.length > 0) {
const lastRefCreatedAt = timeline.refs[timeline.refs.length - 1][1]
// Only add events that are older than the last ref (for pagination)
const olderRefs = newRefs.filter(([, createdAt]) => createdAt < lastRefCreatedAt)
timeline.refs.push(...olderRefs)
// Keep refs sorted
timeline.refs.sort((a, b) => b[1] - a[1])
} else {
// No existing refs, add all new refs
timeline.refs.push(...newRefs)
}
return [...cachedEvents, ...events]
return events
}
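Editor's note, not part of the patch: the pagination contract above, reduced to a standalone sketch with hypothetical names. Each page is fetched with `until` pointing just below the oldest event already shown, and the returned ids are merged into the newest-first refs list so the next page knows where to resume.
// Illustrative sketch: `until`-based pagination with de-duplicated refs tracking.
type Ref = [id: string, createdAt: number]

async function loadOlderPage(
  queryOlder: (until: number, limit: number) => Promise<{ id: string; created_at: number }[]>,
  refs: Ref[],
  until: number,
  limit: number
) {
  const events = (await queryOlder(until, limit))
    .sort((a, b) => b.created_at - a.created_at)
    .slice(0, limit)
  const known = new Set(refs.map(([id]) => id))
  for (const evt of events) {
    if (!known.has(evt.id)) {
      known.add(evt.id)
      refs.push([evt.id, evt.created_at])
    }
  }
  refs.sort((a, b) => b[1] - a[1]) // keep refs newest-first for the next page
  // The caller's next `until` is this page's oldest created_at minus one.
  return events
}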
/** =========== Event =========== */