|
|
|
|
@ -63,6 +63,10 @@ class NoteStatsService {
@@ -63,6 +63,10 @@ class NoteStatsService {
|
|
|
|
|
|
|
|
|
|
// Batch processing
|
|
|
|
|
private pendingEvents = new Set<string>() |
|
|
|
|
/** Open note / explicit UI: drained before {@link pendingEvents} so detail pages are not stuck behind feed cards. */ |
|
|
|
|
private pendingForeground = new Set<string>() |
|
|
|
|
/** If a foreground fetch hit {@link processingCache}, re-queue here so the follow-up run uses the priority lane. */ |
|
|
|
|
private deferredRequeueForeground = new Set<string>() |
|
|
|
|
/** Favorite relays passed from the last fetchNoteStats call per note (used in processSingleEvent). */ |
|
|
|
|
private pendingFetchFavoriteRelays = new Map<string, string[] | null | undefined>() |
|
|
|
|
/** Merged favorite URLs requested while this note was already in {@link processingCache}. */ |
|
|
|
|
@ -112,31 +116,87 @@ class NoteStatsService {
@@ -112,31 +116,87 @@ class NoteStatsService {
|
|
|
|
|
}, this.BATCH_DELAY) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fetchNoteStats(event: Event, _pubkey?: string | null, favoriteRelays?: string[] | null) { |
|
|
|
|
private statsPendingSize() { |
|
|
|
|
return this.pendingForeground.size + this.pendingEvents.size |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** Up to {@link MAX_BATCH_SIZE} ids, foreground queue first (same insertion order within each set). */ |
|
|
|
|
private takeNextStatsSlice(): string[] { |
|
|
|
|
const out: string[] = [] |
|
|
|
|
for (const id of this.pendingForeground) { |
|
|
|
|
if (out.length >= this.MAX_BATCH_SIZE) break |
|
|
|
|
this.pendingForeground.delete(id) |
|
|
|
|
out.push(id) |
|
|
|
|
} |
|
|
|
|
for (const id of this.pendingEvents) { |
|
|
|
|
if (out.length >= this.MAX_BATCH_SIZE) break |
|
|
|
|
this.pendingEvents.delete(id) |
|
|
|
|
out.push(id) |
|
|
|
|
} |
|
|
|
|
return out |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** Coalesce scroll bursts; flush immediately when backlog is large or a foreground note was queued. */ |
|
|
|
|
private maybeFlushStatsBatch(foreground: boolean) { |
|
|
|
|
if (this.processBatchRunning) { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
const backlogLarge = this.pendingEvents.size >= this.MAX_BATCH_SIZE |
|
|
|
|
if (backlogLarge || foreground) { |
|
|
|
|
if (this.batchTimeout) { |
|
|
|
|
clearTimeout(this.batchTimeout) |
|
|
|
|
this.batchTimeout = null |
|
|
|
|
} |
|
|
|
|
void this.processBatch() |
|
|
|
|
} else { |
|
|
|
|
this.armStatsBatchTimer() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Queue relay-backed stats for `event`. Foreground (`opts.foreground`) is for the focused note page / |
|
|
|
|
* article detail so counts are not starved behind spell-feed cards (large pending backlog). |
|
|
|
|
*/ |
|
|
|
|
async fetchNoteStats( |
|
|
|
|
event: Event, |
|
|
|
|
_pubkey?: string | null, |
|
|
|
|
favoriteRelays?: string[] | null, |
|
|
|
|
opts?: { foreground?: boolean } |
|
|
|
|
) { |
|
|
|
|
const eventId = this.statsKey(event.id) |
|
|
|
|
const idShort = `${eventId.slice(0, 12)}…` |
|
|
|
|
const foreground = opts?.foreground === true |
|
|
|
|
|
|
|
|
|
if (this.pendingEvents.has(eventId)) { |
|
|
|
|
this.mergeFavoriteRelaysIntoPending(eventId, favoriteRelays) |
|
|
|
|
const rememberRoot = () => { |
|
|
|
|
if (event.kind === ExtendedKind.RSS_THREAD_ROOT) { |
|
|
|
|
this.pendingSyntheticRootById.set(eventId, event) |
|
|
|
|
} else { |
|
|
|
|
this.pendingStatsRootEventById.set(eventId, event) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (this.pendingEvents.has(eventId) || this.pendingForeground.has(eventId)) { |
|
|
|
|
this.mergeFavoriteRelaysIntoPending(eventId, favoriteRelays) |
|
|
|
|
rememberRoot() |
|
|
|
|
if (foreground) { |
|
|
|
|
this.pendingEvents.delete(eventId) |
|
|
|
|
this.pendingForeground.add(eventId) |
|
|
|
|
} |
|
|
|
|
logger.debug('[NoteStats] fetchNoteStats: merged into existing pending batch', { |
|
|
|
|
eventId: idShort, |
|
|
|
|
kind: event.kind, |
|
|
|
|
pendingSize: this.pendingEvents.size |
|
|
|
|
pendingForeground: this.pendingForeground.size, |
|
|
|
|
pendingBackground: this.pendingEvents.size |
|
|
|
|
}) |
|
|
|
|
this.maybeFlushStatsBatch(foreground) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (this.processingCache.has(eventId)) { |
|
|
|
|
this.mergeFavoriteRelaysIntoDeferred(eventId, favoriteRelays) |
|
|
|
|
if (event.kind === ExtendedKind.RSS_THREAD_ROOT) { |
|
|
|
|
this.pendingSyntheticRootById.set(eventId, event) |
|
|
|
|
} else { |
|
|
|
|
this.pendingStatsRootEventById.set(eventId, event) |
|
|
|
|
rememberRoot() |
|
|
|
|
if (foreground) { |
|
|
|
|
this.deferredRequeueForeground.add(eventId) |
|
|
|
|
} |
|
|
|
|
logger.debug('[NoteStats] fetchNoteStats: deferred (already processing same id)', { |
|
|
|
|
eventId: idShort, |
|
|
|
|
@ -146,42 +206,48 @@ class NoteStatsService {
@@ -146,42 +206,48 @@ class NoteStatsService {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.pendingFetchFavoriteRelays.set(eventId, favoriteRelays ?? null) |
|
|
|
|
this.pendingEvents.add(eventId) |
|
|
|
|
if (event.kind === ExtendedKind.RSS_THREAD_ROOT) { |
|
|
|
|
this.pendingSyntheticRootById.set(eventId, event) |
|
|
|
|
if (foreground) { |
|
|
|
|
this.pendingForeground.add(eventId) |
|
|
|
|
} else { |
|
|
|
|
this.pendingStatsRootEventById.set(eventId, event) |
|
|
|
|
this.pendingEvents.add(eventId) |
|
|
|
|
} |
|
|
|
|
rememberRoot() |
|
|
|
|
|
|
|
|
|
logger.debug('[NoteStats] fetchNoteStats: queued new id', { |
|
|
|
|
eventId: idShort, |
|
|
|
|
kind: event.kind, |
|
|
|
|
pendingSize: this.pendingEvents.size, |
|
|
|
|
immediateBatch: this.pendingEvents.size >= this.MAX_BATCH_SIZE |
|
|
|
|
foreground, |
|
|
|
|
pendingForeground: this.pendingForeground.size, |
|
|
|
|
pendingBackground: this.pendingEvents.size, |
|
|
|
|
immediateBatch: this.statsPendingSize() >= this.MAX_BATCH_SIZE |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
this.armStatsBatchTimer() |
|
|
|
|
if (this.pendingEvents.size >= this.MAX_BATCH_SIZE && !this.processBatchRunning) { |
|
|
|
|
if (this.batchTimeout) { |
|
|
|
|
clearTimeout(this.batchTimeout) |
|
|
|
|
this.batchTimeout = null |
|
|
|
|
this.maybeFlushStatsBatch(foreground) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private scheduleStatsBatchContinuation() { |
|
|
|
|
if (this.pendingForeground.size === 0 && this.pendingEvents.size === 0) return |
|
|
|
|
queueMicrotask(() => { |
|
|
|
|
void this.processBatch() |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private async processBatch() { |
|
|
|
|
if (this.processBatchRunning) { |
|
|
|
|
logger.debug('[NoteStats] processBatch: skipped (already running)', { |
|
|
|
|
pendingSize: this.pendingEvents.size |
|
|
|
|
pendingForeground: this.pendingForeground.size, |
|
|
|
|
pendingBackground: this.pendingEvents.size |
|
|
|
|
}) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if (this.pendingEvents.size === 0) { |
|
|
|
|
if (this.pendingForeground.size === 0 && this.pendingEvents.size === 0) { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
logger.info('[NoteStats] processBatch: running', { pendingSize: this.pendingEvents.size }) |
|
|
|
|
logger.info('[NoteStats] processBatch: running', { |
|
|
|
|
pendingForeground: this.pendingForeground.size, |
|
|
|
|
pendingBackground: this.pendingEvents.size |
|
|
|
|
}) |
|
|
|
|
this.processBatchRunning = true |
|
|
|
|
if (this.batchTimeout) { |
|
|
|
|
clearTimeout(this.batchTimeout) |
|
|
|
|
@ -189,26 +255,22 @@ class NoteStatsService {
@@ -189,26 +255,22 @@ class NoteStatsService {
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
try { |
|
|
|
|
while (this.pendingEvents.size > 0) { |
|
|
|
|
const eventsToProcess = Array.from(this.pendingEvents).slice(0, this.MAX_BATCH_SIZE) |
|
|
|
|
for (const id of eventsToProcess) { |
|
|
|
|
this.pendingEvents.delete(id) |
|
|
|
|
} |
|
|
|
|
const eventsToProcess = this.takeNextStatsSlice() |
|
|
|
|
logger.info('[NoteStats] processBatch slice', { |
|
|
|
|
count: eventsToProcess.length, |
|
|
|
|
ids: eventsToProcess.map((id) => `${id.slice(0, 12)}…`), |
|
|
|
|
remainingPending: this.pendingEvents.size, |
|
|
|
|
remainingForeground: this.pendingForeground.size, |
|
|
|
|
remainingBackground: this.pendingEvents.size, |
|
|
|
|
concurrency: this.STATS_SLICE_CONCURRENCY |
|
|
|
|
}) |
|
|
|
|
for (let i = 0; i < eventsToProcess.length; i += this.STATS_SLICE_CONCURRENCY) { |
|
|
|
|
const chunk = eventsToProcess.slice(i, i + this.STATS_SLICE_CONCURRENCY) |
|
|
|
|
await Promise.all(chunk.map((eventId) => this.processSingleEvent(eventId))) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} finally { |
|
|
|
|
this.processBatchRunning = false |
|
|
|
|
if (this.pendingEvents.size > 0) { |
|
|
|
|
this.armStatsBatchTimer() |
|
|
|
|
if (this.pendingForeground.size > 0 || this.pendingEvents.size > 0) { |
|
|
|
|
this.scheduleStatsBatchContinuation() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
@ -327,17 +389,23 @@ class NoteStatsService {
@@ -327,17 +389,23 @@ class NoteStatsService {
|
|
|
|
|
if (this.inFlightDeferredFavoriteRelays.has(eventId)) { |
|
|
|
|
const deferred = this.inFlightDeferredFavoriteRelays.get(eventId)! |
|
|
|
|
this.inFlightDeferredFavoriteRelays.delete(eventId) |
|
|
|
|
const requeueForeground = this.deferredRequeueForeground.has(eventId) |
|
|
|
|
this.deferredRequeueForeground.delete(eventId) |
|
|
|
|
if (deferred.length > 0) { |
|
|
|
|
if (this.pendingEvents.has(eventId)) { |
|
|
|
|
if (this.pendingEvents.has(eventId) || this.pendingForeground.has(eventId)) { |
|
|
|
|
this.mergeFavoriteRelaysIntoPending(eventId, deferred) |
|
|
|
|
} else { |
|
|
|
|
this.pendingFetchFavoriteRelays.set(eventId, deferred) |
|
|
|
|
if (requeueForeground) { |
|
|
|
|
this.pendingForeground.add(eventId) |
|
|
|
|
} else { |
|
|
|
|
this.pendingEvents.add(eventId) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Build relay list for note stats: SEARCHABLE + FAST_READ + optional user favorites + seen relays + |
|
|
|
|
|