|
|
|
|
@ -36,15 +36,18 @@ class NoteStatsService {
@@ -36,15 +36,18 @@ class NoteStatsService {
|
|
|
|
|
private noteStatsMap: Map<string, Partial<TNoteStats>> = new Map() |
|
|
|
|
private noteStatsSubscribers = new Map<string, Set<() => void>>() |
|
|
|
|
private processingCache = new Set<string>() |
|
|
|
|
private lastProcessedTime = new Map<string, number>() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// Batch processing
|
|
|
|
|
private pendingEvents = new Set<string>() |
|
|
|
|
/** Favorite relays passed from the last fetchNoteStats call per note (used in processSingleEvent). */ |
|
|
|
|
private pendingFetchFavoriteRelays = new Map<string, string[] | null | undefined>() |
|
|
|
|
/** Merged favorite URLs requested while this note was already in {@link processingCache}. */ |
|
|
|
|
private inFlightDeferredFavoriteRelays = new Map<string, string[]>() |
|
|
|
|
private batchTimeout: NodeJS.Timeout | null = null |
|
|
|
|
private readonly BATCH_DELAY = 1000 // 1 second batch delay
|
|
|
|
|
private readonly MAX_BATCH_SIZE = 10 // Process up to 10 events at once
|
|
|
|
|
/** Prevents overlapping processBatch runs (reentrant calls corrupted pendingEvents). */ |
|
|
|
|
private processBatchRunning = false |
|
|
|
|
private readonly BATCH_DELAY = 200 |
|
|
|
|
private readonly MAX_BATCH_SIZE = 24 |
|
|
|
|
|
|
|
|
|
constructor() { |
|
|
|
|
if (!NoteStatsService.instance) { |
|
|
|
|
@ -53,56 +56,84 @@ class NoteStatsService {
@@ -53,56 +56,84 @@ class NoteStatsService {
|
|
|
|
|
return NoteStatsService.instance |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** Merge extra relay URLs into the pending fetch context for this note (deduped). */ |
|
|
|
|
private mergeFavoriteRelaysIntoPending(eventId: string, extra: string[] | null | undefined) { |
|
|
|
|
if (!extra?.length) return |
|
|
|
|
const cur = this.pendingFetchFavoriteRelays.get(eventId) |
|
|
|
|
const merged = new Set<string>([...(cur ?? []), ...extra]) |
|
|
|
|
this.pendingFetchFavoriteRelays.set(eventId, [...merged]) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private mergeFavoriteRelaysIntoDeferred(eventId: string, extra: string[] | null | undefined) { |
|
|
|
|
if (!extra?.length) return |
|
|
|
|
const cur = this.inFlightDeferredFavoriteRelays.get(eventId) |
|
|
|
|
const merged = new Set<string>([...(cur ?? []), ...extra]) |
|
|
|
|
this.inFlightDeferredFavoriteRelays.set(eventId, [...merged]) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private armStatsBatchTimer() { |
|
|
|
|
if (this.batchTimeout) { |
|
|
|
|
clearTimeout(this.batchTimeout) |
|
|
|
|
} |
|
|
|
|
this.batchTimeout = setTimeout(() => { |
|
|
|
|
this.batchTimeout = null |
|
|
|
|
void this.processBatch() |
|
|
|
|
}, this.BATCH_DELAY) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
async fetchNoteStats(event: Event, _pubkey?: string | null, favoriteRelays?: string[] | null) { |
|
|
|
|
const eventId = event.id |
|
|
|
|
|
|
|
|
|
// Rate limiting: Don't process the same event more than once per 10 seconds
|
|
|
|
|
const now = Date.now() |
|
|
|
|
const lastProcessed = this.lastProcessedTime.get(eventId) |
|
|
|
|
if (lastProcessed && now - lastProcessed < 10000) { |
|
|
|
|
logger.debug('[NoteStats] Skipping duplicate fetch for event', eventId.substring(0, 8), 'too soon') |
|
|
|
|
|
|
|
|
|
if (this.pendingEvents.has(eventId)) { |
|
|
|
|
this.mergeFavoriteRelaysIntoPending(eventId, favoriteRelays) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if (this.processingCache.has(eventId)) { |
|
|
|
|
this.mergeFavoriteRelaysIntoDeferred(eventId, favoriteRelays) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.pendingFetchFavoriteRelays.set(eventId, favoriteRelays ?? null) |
|
|
|
|
this.pendingEvents.add(eventId) |
|
|
|
|
this.lastProcessedTime.set(eventId, now) |
|
|
|
|
|
|
|
|
|
// Clear existing timeout and set new one
|
|
|
|
|
if (this.batchTimeout) { |
|
|
|
|
clearTimeout(this.batchTimeout) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.batchTimeout = setTimeout(() => { |
|
|
|
|
this.processBatch() |
|
|
|
|
}, this.BATCH_DELAY) |
|
|
|
|
|
|
|
|
|
// If we have enough events or this is urgent, process immediately
|
|
|
|
|
if (this.pendingEvents.size >= this.MAX_BATCH_SIZE) { |
|
|
|
|
this.processBatch() |
|
|
|
|
|
|
|
|
|
this.armStatsBatchTimer() |
|
|
|
|
if (this.pendingEvents.size >= this.MAX_BATCH_SIZE && !this.processBatchRunning) { |
|
|
|
|
if (this.batchTimeout) { |
|
|
|
|
clearTimeout(this.batchTimeout) |
|
|
|
|
this.batchTimeout = null |
|
|
|
|
} |
|
|
|
|
void this.processBatch() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
private async processBatch() { |
|
|
|
|
if (this.pendingEvents.size === 0) return |
|
|
|
|
|
|
|
|
|
const eventsToProcess = Array.from(this.pendingEvents).slice(0, this.MAX_BATCH_SIZE) |
|
|
|
|
for (const id of eventsToProcess) { |
|
|
|
|
this.pendingEvents.delete(id) |
|
|
|
|
if (this.processBatchRunning) { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if (this.pendingEvents.size === 0) { |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
this.processBatchRunning = true |
|
|
|
|
if (this.batchTimeout) { |
|
|
|
|
clearTimeout(this.batchTimeout) |
|
|
|
|
this.batchTimeout = null |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
await Promise.all(eventsToProcess.map((eventId) => this.processSingleEvent(eventId))) |
|
|
|
|
|
|
|
|
|
if (this.pendingEvents.size > 0) { |
|
|
|
|
this.batchTimeout = setTimeout(() => { |
|
|
|
|
this.batchTimeout = null |
|
|
|
|
this.processBatch() |
|
|
|
|
}, this.BATCH_DELAY) |
|
|
|
|
try { |
|
|
|
|
while (this.pendingEvents.size > 0) { |
|
|
|
|
const eventsToProcess = Array.from(this.pendingEvents).slice(0, this.MAX_BATCH_SIZE) |
|
|
|
|
for (const id of eventsToProcess) { |
|
|
|
|
this.pendingEvents.delete(id) |
|
|
|
|
} |
|
|
|
|
await Promise.all(eventsToProcess.map((eventId) => this.processSingleEvent(eventId))) |
|
|
|
|
} |
|
|
|
|
} finally { |
|
|
|
|
this.processBatchRunning = false |
|
|
|
|
if (this.pendingEvents.size > 0) { |
|
|
|
|
this.armStatsBatchTimer() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
@ -154,6 +185,18 @@ class NoteStatsService {
@@ -154,6 +185,18 @@ class NoteStatsService {
|
|
|
|
|
this.notifyNoteStats(event.id) |
|
|
|
|
} finally { |
|
|
|
|
this.processingCache.delete(eventId) |
|
|
|
|
if (this.inFlightDeferredFavoriteRelays.has(eventId)) { |
|
|
|
|
const deferred = this.inFlightDeferredFavoriteRelays.get(eventId)! |
|
|
|
|
this.inFlightDeferredFavoriteRelays.delete(eventId) |
|
|
|
|
if (deferred.length > 0) { |
|
|
|
|
if (this.pendingEvents.has(eventId)) { |
|
|
|
|
this.mergeFavoriteRelaysIntoPending(eventId, deferred) |
|
|
|
|
} else { |
|
|
|
|
this.pendingFetchFavoriteRelays.set(eventId, deferred) |
|
|
|
|
this.pendingEvents.add(eventId) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|