|
|
|
@ -222,6 +222,16 @@ export interface QueryOptions { |
|
|
|
firstRelayResultGraceMs?: number | false |
|
|
|
firstRelayResultGraceMs?: number | false |
|
|
|
/** Label for {@link RelaySubscribeOpBatch} when this query opens REQs. */ |
|
|
|
/** Label for {@link RelaySubscribeOpBatch} when this query opens REQs. */ |
|
|
|
relayOpSource?: string |
|
|
|
relayOpSource?: string |
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* When aborted (e.g. React effect cleanup / HMR), closes WS + HTTP index work promptly instead of waiting for |
|
|
|
|
|
|
|
* {@link globalTimeout}. Prevents overlapping NIP-50 shards from stacking until the tab OOMs. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
signal?: AbortSignal |
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* When true, this query ignores {@link QueryService.interruptBackgroundQueries} (e.g. NIP-50 shard with its own |
|
|
|
|
|
|
|
* AbortController, or other foreground work that must not be tied to the global background token). |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
foreground?: boolean |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
export interface SubscribeCallbacks { |
|
|
|
export interface SubscribeCallbacks { |
|
|
|
@ -255,6 +265,21 @@ export class QueryService { |
|
|
|
private globalRelayConnectionSlotsInUse = 0 |
|
|
|
private globalRelayConnectionSlotsInUse = 0 |
|
|
|
private globalRelayConnectionWaitQueue: Array<() => void> = [] |
|
|
|
private globalRelayConnectionWaitQueue: Array<() => void> = [] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Aborted whenever {@link interruptBackgroundQueries} runs. Default {@link query} runs listen until close so |
|
|
|
|
|
|
|
* feed / prefetch / replaceable fetches yield to search and publish. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
private backgroundInterruptController = new AbortController() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
|
|
|
* Best-effort: abort in-flight {@link query} calls that did not pass `foreground: true`, then reset the token so |
|
|
|
|
|
|
|
* new background work uses a fresh signal. |
|
|
|
|
|
|
|
*/ |
|
|
|
|
|
|
|
interruptBackgroundQueries(): void { |
|
|
|
|
|
|
|
this.backgroundInterruptController.abort() |
|
|
|
|
|
|
|
this.backgroundInterruptController = new AbortController() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
async acquireGlobalRelayConnectionSlot(): Promise<void> { |
|
|
|
async acquireGlobalRelayConnectionSlot(): Promise<void> { |
|
|
|
if (this.globalRelayConnectionSlotsInUse < MAX_CONCURRENT_RELAY_CONNECTIONS) { |
|
|
|
if (this.globalRelayConnectionSlotsInUse < MAX_CONCURRENT_RELAY_CONNECTIONS) { |
|
|
|
this.globalRelayConnectionSlotsInUse++ |
|
|
|
this.globalRelayConnectionSlotsInUse++ |
|
|
|
@ -359,6 +384,7 @@ export class QueryService { |
|
|
|
): Promise<NEvent[]> { |
|
|
|
): Promise<NEvent[]> { |
|
|
|
const sanitizedFilters = sanitizeFiltersBeforeReq(filter) |
|
|
|
const sanitizedFilters = sanitizeFiltersBeforeReq(filter) |
|
|
|
if (sanitizedFilters.length === 0) return [] |
|
|
|
if (sanitizedFilters.length === 0) return [] |
|
|
|
|
|
|
|
if (options?.signal?.aborted) return [] |
|
|
|
|
|
|
|
|
|
|
|
const maxFilters = RELAY_REQ_MAX_FILTERS_PER_MESSAGE |
|
|
|
const maxFilters = RELAY_REQ_MAX_FILTERS_PER_MESSAGE |
|
|
|
if (sanitizedFilters.length > maxFilters) { |
|
|
|
if (sanitizedFilters.length > maxFilters) { |
|
|
|
@ -418,10 +444,12 @@ export class QueryService { |
|
|
|
const reqId = ++queryReqSeq |
|
|
|
const reqId = ++queryReqSeq |
|
|
|
const source = options?.relayOpSource ?? 'QueryService.query' |
|
|
|
const source = options?.relayOpSource ?? 'QueryService.query' |
|
|
|
const inputRelaysOrdered = Array.from(new Set(urls.map((u) => normalizeUrl(u) || u).filter(Boolean))) |
|
|
|
const inputRelaysOrdered = Array.from(new Set(urls.map((u) => normalizeUrl(u) || u).filter(Boolean))) |
|
|
|
const wsRelayCandidates = Array.from(new Set(wsQueryUrls.map((u) => normalizeUrl(u) || u).filter(Boolean))) |
|
|
|
|
|
|
|
|
|
|
|
const foreground = options?.foreground === true |
|
|
|
|
|
|
|
|
|
|
|
return await new Promise<NEvent[]>((resolve) => { |
|
|
|
return await new Promise<NEvent[]>((resolve) => { |
|
|
|
const events: NEvent[] = [] |
|
|
|
const events: NEvent[] = [] |
|
|
|
|
|
|
|
const cancelAbortRegistrations: Array<() => void> = [] |
|
|
|
const abortHttp = new AbortController() |
|
|
|
const abortHttp = new AbortController() |
|
|
|
let resolveTimeout: ReturnType<typeof setTimeout> | null = null |
|
|
|
let resolveTimeout: ReturnType<typeof setTimeout> | null = null |
|
|
|
let firstResultGraceTimeoutId: ReturnType<typeof setTimeout> | null = null |
|
|
|
let firstResultGraceTimeoutId: ReturnType<typeof setTimeout> | null = null |
|
|
|
@ -523,6 +551,10 @@ export class QueryService { |
|
|
|
*/ |
|
|
|
*/ |
|
|
|
const finalizeOnce = () => { |
|
|
|
const finalizeOnce = () => { |
|
|
|
if (resolved) return |
|
|
|
if (resolved) return |
|
|
|
|
|
|
|
for (const detach of cancelAbortRegistrations) { |
|
|
|
|
|
|
|
detach() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
cancelAbortRegistrations.length = 0 |
|
|
|
resolved = true |
|
|
|
resolved = true |
|
|
|
if (resolveTimeout) clearTimeout(resolveTimeout) |
|
|
|
if (resolveTimeout) clearTimeout(resolveTimeout) |
|
|
|
if (firstResultGraceTimeoutId) clearTimeout(firstResultGraceTimeoutId) |
|
|
|
if (firstResultGraceTimeoutId) clearTimeout(firstResultGraceTimeoutId) |
|
|
|
@ -664,6 +696,29 @@ export class QueryService { |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
const onAbortQuery = () => { |
|
|
|
|
|
|
|
sub.close() |
|
|
|
|
|
|
|
resolveWithEvents() |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
const registerQueryAbort = (sig: AbortSignal) => { |
|
|
|
|
|
|
|
if (sig.aborted) { |
|
|
|
|
|
|
|
queueMicrotask(() => { |
|
|
|
|
|
|
|
onAbortQuery() |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
return |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
sig.addEventListener('abort', onAbortQuery, { once: true }) |
|
|
|
|
|
|
|
cancelAbortRegistrations.push(() => { |
|
|
|
|
|
|
|
sig.removeEventListener('abort', onAbortQuery) |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (!foreground) { |
|
|
|
|
|
|
|
registerQueryAbort(this.backgroundInterruptController.signal) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if (options?.signal) { |
|
|
|
|
|
|
|
registerQueryAbort(options.signal) |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
globalTimeoutId = setTimeout(() => resolveWithEvents(), globalTimeout) |
|
|
|
globalTimeoutId = setTimeout(() => resolveWithEvents(), globalTimeout) |
|
|
|
}) |
|
|
|
}) |
|
|
|
} |
|
|
|
} |
|
|
|
|