You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

435 lines
15 KiB

import { KIND_1_BLOCKED_RELAY_URLS, SEARCHABLE_RELAY_URLS } from '@/constants'
import logger from '@/lib/logger'
import { normalizeUrl } from '@/lib/url'
import type { Filter, Event as NEvent } from 'nostr-tools'
import { SimplePool, EventTemplate, VerifiedEvent } from 'nostr-tools'
import type { AbstractRelay } from 'nostr-tools/abstract-relay'
import nip66Service from './nip66.service'
import type { ISigner, TSignerType } from '@/types'
/** NIP-01 filter keys only; NIP-50 adds `search` which non-searchable relays reject. */
function filterForRelay(f: Filter, relaySupportsSearch: boolean): Filter {
if (relaySupportsSearch) return f
const { search: _search, ...rest } = f
return rest as Filter
}
export interface QueryOptions {
eoseTimeout?: number
globalTimeout?: number
/** For replaceable events: race strategy - wait 2s after first result, then return best */
replaceableRace?: boolean
/** For non-replaceable single events: return immediately on first match */
immediateReturn?: boolean
}
export interface SubscribeCallbacks {
onevent?: (evt: NEvent) => void
oneose?: (eosed: boolean) => void
onclose?: (url: string, reason: string) => void
startLogin?: () => void
onAllClose?: (reasons: string[]) => void
}
export class QueryService {
private pool: SimplePool
private signer?: ISigner
private signerType?: TSignerType
/** Max concurrent REQ subscriptions per relay */
private static readonly MAX_CONCURRENT_SUBS_PER_RELAY = 8
private activeSubCountByRelay = new Map<string, number>()
private subSlotWaitQueueByRelay = new Map<string, Array<() => void>>()
private eventSeenOnRelays = new Map<string, Set<string>>()
constructor(pool: SimplePool) {
this.pool = pool
}
setSigner(signer: ISigner | undefined, signerType: TSignerType | undefined) {
this.signer = signer
this.signerType = signerType
}
private canSignerAuthenticateRelay(): boolean {
if (!this.signer) return false
if (this.signerType === 'npub') return false
return true
}
async acquireSubSlot(relayKey: string): Promise<void> {
const count = this.activeSubCountByRelay.get(relayKey) ?? 0
if (count < QueryService.MAX_CONCURRENT_SUBS_PER_RELAY) {
this.activeSubCountByRelay.set(relayKey, count + 1)
return Promise.resolve()
}
return new Promise<void>((resolve) => {
let queue = this.subSlotWaitQueueByRelay.get(relayKey)
if (!queue) {
queue = []
this.subSlotWaitQueueByRelay.set(relayKey, queue)
}
queue.push(() => {
const n = this.activeSubCountByRelay.get(relayKey) ?? 0
this.activeSubCountByRelay.set(relayKey, n + 1)
resolve()
})
})
}
releaseSubSlot(relayKey: string): void {
const count = (this.activeSubCountByRelay.get(relayKey) ?? 1) - 1
this.activeSubCountByRelay.set(relayKey, Math.max(0, count))
const queue = this.subSlotWaitQueueByRelay.get(relayKey)
if (queue?.length) {
const next = queue.shift()!
next()
}
}
trackEventSeenOn(eventId: string, relay: AbstractRelay): void {
const url = relay.url
let set = this.eventSeenOnRelays.get(eventId)
if (!set) {
set = new Set()
this.eventSeenOnRelays.set(eventId, set)
}
set.add(url)
}
getSeenEventRelayUrls(eventId: string): string[] {
return Array.from(this.eventSeenOnRelays.get(eventId) ?? [])
}
/**
* Core query method with race-based fetching strategies
*/
async query(
urls: string[],
filter: Filter | Filter[],
onevent?: (evt: NEvent) => void,
options?: QueryOptions
): Promise<NEvent[]> {
const eoseTimeout = options?.eoseTimeout ?? 500
const globalTimeout = options?.globalTimeout ?? 10000
const replaceableRace = options?.replaceableRace ?? false
const immediateReturn = options?.immediateReturn ?? false
const isExternalSearch = eoseTimeout > 1000
if (isExternalSearch) {
logger.debug('query: Starting external relay search', {
relayCount: urls.length,
relays: urls,
eoseTimeout,
globalTimeout,
replaceableRace,
immediateReturn,
filter: Array.isArray(filter) ? filter : [filter]
})
}
const FIRST_RESULT_GRACE_MS = 1200
const REPLACEABLE_RACE_WAIT_MS = 2000
return await new Promise<NEvent[]>((resolve) => {
const events: NEvent[] = []
let resolveTimeout: ReturnType<typeof setTimeout> | null = null
let firstResultGraceTimeoutId: ReturnType<typeof setTimeout> | null = null
let replaceableRaceTimeoutId: ReturnType<typeof setTimeout> | null = null
let allEosed = false
let eventCount = 0
let resolved = false
let firstResultTime: number | null = null
let globalTimeoutId: ReturnType<typeof setTimeout> | null = null
const resolveWithEvents = () => {
if (resolved) return
resolved = true
if (resolveTimeout) clearTimeout(resolveTimeout)
if (firstResultGraceTimeoutId) clearTimeout(firstResultGraceTimeoutId)
if (replaceableRaceTimeoutId) clearTimeout(replaceableRaceTimeoutId)
if (globalTimeoutId) clearTimeout(globalTimeoutId)
sub.close()
if (replaceableRace && events.length > 0) {
const bestEvent = events.reduce((best, current) =>
current.created_at > best.created_at ? current : best
)
resolve([bestEvent])
} else {
resolve(events)
}
}
const sub = this.subscribe(urls, filter, {
onevent(evt) {
eventCount++
onevent?.(evt)
events.push(evt)
if (firstResultTime === null) {
firstResultTime = Date.now()
}
const filters = Array.isArray(filter) ? filter : [filter]
const maxLimit = Math.max(...filters.map((f) => (f.limit ?? 0) as number), 0)
const isSingleEventFetch = maxLimit === 1
const hasIdFilter = filters.some(f => f.ids && f.ids.length > 0)
if (immediateReturn && hasIdFilter && isSingleEventFetch && events.length > 0) {
resolveWithEvents()
return
}
if (replaceableRace && firstResultTime !== null && !replaceableRaceTimeoutId) {
replaceableRaceTimeoutId = setTimeout(() => {
replaceableRaceTimeoutId = null
resolveWithEvents()
}, REPLACEABLE_RACE_WAIT_MS)
}
if (!replaceableRace && !immediateReturn && isSingleEventFetch && events.length === 1 && !firstResultGraceTimeoutId) {
firstResultGraceTimeoutId = setTimeout(() => {
firstResultGraceTimeoutId = null
resolveWithEvents()
}, FIRST_RESULT_GRACE_MS)
}
if (hasIdFilter && isSingleEventFetch && events.length > 0 && allEosed && !replaceableRace && !immediateReturn) {
if (firstResultGraceTimeoutId) clearTimeout(firstResultGraceTimeoutId)
if (resolveTimeout) clearTimeout(resolveTimeout)
resolveTimeout = setTimeout(() => resolveWithEvents(), 100)
}
},
oneose: (eosed) => {
if (eosed) {
allEosed = true
if (replaceableRace) {
if (events.length > 0 && replaceableRaceTimeoutId) return
if (events.length > 0) {
resolveWithEvents()
return
}
}
if (immediateReturn && events.length > 0) {
resolveWithEvents()
return
}
if (firstResultGraceTimeoutId) clearTimeout(firstResultGraceTimeoutId)
if (resolveTimeout) clearTimeout(resolveTimeout)
resolveTimeout = setTimeout(() => resolveWithEvents(), eoseTimeout)
}
},
onclose: (_url, _reason) => {
if (allEosed) return
if (events.length > 0 && !resolveTimeout) {
resolveTimeout = setTimeout(() => resolveWithEvents(), 1000)
}
}
})
globalTimeoutId = setTimeout(() => resolveWithEvents(), globalTimeout)
})
}
/**
* Subscribe to events from relays
*/
subscribe(
urls: string[],
filter: Filter | Filter[],
callbacks: SubscribeCallbacks
): { close: () => void } {
let relays = Array.from(new Set(urls))
const filters = Array.isArray(filter) ? filter : [filter]
const hasKind1 = filters.some((f) => f.kinds && (Array.isArray(f.kinds) ? f.kinds.includes(1) : f.kinds === 1))
if (hasKind1 && KIND_1_BLOCKED_RELAY_URLS.length > 0) {
const kind1BlockedSet = new Set(KIND_1_BLOCKED_RELAY_URLS.map((u) => normalizeUrl(u) || u))
relays = relays.filter((url) => !kind1BlockedSet.has(normalizeUrl(url) || url))
}
const _knownIds = new Set<string>()
const grouped = new Map<string, Filter[]>()
for (const url of relays) {
const key = normalizeUrl(url) || url
if (!grouped.has(key)) grouped.set(key, [])
grouped.get(key)!.push(...filters)
}
const searchableSet = new Set([
...SEARCHABLE_RELAY_URLS.map((u) => normalizeUrl(u) || u),
...nip66Service.getSearchableRelayUrls().map((u) => normalizeUrl(u) || u)
])
const groupedRequests = Array.from(grouped.entries()).map(([url, f]) => {
const relaySupportsSearch = searchableSet.has(url) || nip66Service.isRelaySearchable(url)
const filtersForRelay = f.map((one) => filterForRelay(one, relaySupportsSearch))
return { url, filters: filtersForRelay }
})
const eosesReceived: boolean[] = []
const closesReceived: (string | undefined)[] = []
const handleEose = (i: number) => {
if (eosesReceived[i]) return
eosesReceived[i] = true
if (eosesReceived.filter(Boolean).length === groupedRequests.length) {
callbacks.oneose?.(true)
}
}
const handleClose = (i: number, reason: string) => {
if (closesReceived[i] !== undefined) return
handleEose(i)
closesReceived[i] = reason
const { url } = groupedRequests[i]!
callbacks.onclose?.(url, reason)
if (closesReceived.every((r) => r !== undefined)) {
callbacks.onAllClose?.(closesReceived as string[])
}
}
const localAlreadyHaveEvent = (id: string) => {
const have = _knownIds.has(id)
if (have) return true
_knownIds.add(id)
return false
}
const subs: { relayKey: string; close: () => void }[] = []
const allOpened = Promise.all(
groupedRequests.map(async ({ url, filters: relayFilters }, i) => {
const relayKey = normalizeUrl(url) || url
await this.acquireSubSlot(relayKey)
let relay: AbstractRelay
try {
relay = await this.pool.ensureRelay(url, { connectionTimeout: 5000 })
} catch (err) {
this.releaseSubSlot(relayKey)
handleClose(i, (err as Error)?.message ?? String(err))
return
}
let slotReleased = false
const releaseOnce = () => {
if (!slotReleased) {
slotReleased = true
this.releaseSubSlot(relayKey)
}
}
const sub = relay.subscribe(relayFilters, {
receivedEvent: (_relay, id) => this.trackEventSeenOn(id, _relay),
onevent: (evt: NEvent) => callbacks.onevent?.(evt),
oneose: () => handleEose(i),
onclose: (reason: string) => {
releaseOnce()
if (reason.startsWith('auth-required: ') && this.canSignerAuthenticateRelay()) {
relay
.auth(async (authEvt: EventTemplate) => {
const evt = await this.signer!.signEvent(authEvt)
if (!evt) throw new Error('sign event failed')
return evt as VerifiedEvent
})
.then(async () => {
await this.acquireSubSlot(relayKey)
let liveRelay: AbstractRelay
try {
liveRelay = await this.pool.ensureRelay(url, { connectionTimeout: 5000 })
} catch (err) {
this.releaseSubSlot(relayKey)
handleClose(i, (err as Error)?.message ?? String(err))
return
}
let slotReleased2 = false
const releaseSlot2 = () => {
if (!slotReleased2) {
slotReleased2 = true
this.releaseSubSlot(relayKey)
}
}
try {
const sub2 = liveRelay.subscribe(relayFilters, {
receivedEvent: (_relay, id) => this.trackEventSeenOn(id, _relay),
onevent: (evt: NEvent) => callbacks.onevent?.(evt),
oneose: () => handleEose(i),
onclose: (reason2: string) => {
releaseSlot2()
handleClose(i, reason2)
},
alreadyHaveEvent: localAlreadyHaveEvent,
eoseTimeout: 10_000
})
subs.push({
relayKey,
close: () => {
releaseSlot2()
sub2.close()
}
})
} catch (err) {
releaseSlot2()
handleClose(i, (err as Error)?.message ?? String(err))
}
})
.catch((err) => {
handleClose(i, `auth failed: ${(err as Error)?.message ?? err}`)
})
return
}
if (reason.startsWith('auth-required: ')) {
callbacks.startLogin?.()
}
handleClose(i, reason)
},
alreadyHaveEvent: localAlreadyHaveEvent,
eoseTimeout: 10_000
})
subs.push({
relayKey,
close: () => {
releaseOnce()
sub.close()
}
})
})
)
return {
close: () => {
allOpened.then(() => {
subs.forEach(({ close: subClose }) => subClose())
})
}
}
}
/**
* Fetch events with caching support
*/
async fetchEvents(
urls: string[],
filter: Filter | Filter[],
options?: {
onevent?: (evt: NEvent) => void
eoseTimeout?: number
globalTimeout?: number
} & QueryOptions
): Promise<NEvent[]> {
let relays = Array.from(new Set(urls))
if (relays.length === 0) {
const { FAST_READ_RELAY_URLS } = await import('@/constants')
relays = [...FAST_READ_RELAY_URLS]
}
const filters = Array.isArray(filter) ? filter : [filter]
const hasKind1 = filters.some((f) => f.kinds && (Array.isArray(f.kinds) ? f.kinds.includes(1) : f.kinds === 1))
if (hasKind1 && KIND_1_BLOCKED_RELAY_URLS.length > 0) {
const kind1BlockedSet = new Set(KIND_1_BLOCKED_RELAY_URLS.map((u) => normalizeUrl(u) || u))
relays = relays.filter((url) => !kind1BlockedSet.has(normalizeUrl(url) || url))
}
return this.query(relays, filter, options?.onevent, options)
}
}