|
|
|
|
@ -1,4 +1,11 @@
@@ -1,4 +1,11 @@
|
|
|
|
|
import { BIG_RELAY_URLS, BOOKSTR_RELAY_URLS, ExtendedKind, FAST_READ_RELAY_URLS, FAST_WRITE_RELAY_URLS, PROFILE_FETCH_RELAY_URLS, PROFILE_RELAY_URLS, SEARCHABLE_RELAY_URLS } from '@/constants' |
|
|
|
|
|
|
|
|
|
/** NIP-01 filter keys only; NIP-50 adds `search` which non-searchable relays reject. */ |
|
|
|
|
function filterForRelay(f: Filter, relaySupportsSearch: boolean): Filter { |
|
|
|
|
if (relaySupportsSearch) return f |
|
|
|
|
const { search: _search, ...rest } = f |
|
|
|
|
return rest as Filter |
|
|
|
|
} |
|
|
|
|
import { |
|
|
|
|
compareEvents, |
|
|
|
|
getReplaceableCoordinate, |
|
|
|
|
@ -65,6 +72,11 @@ class ClientService extends EventTarget {
@@ -65,6 +72,11 @@ class ClientService extends EventTarget {
|
|
|
|
|
tokenize: 'forward' |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
/** Max concurrent REQ subscriptions per relay (many relays enforce ~10; we stay under to avoid NOTICE/rejection) */ |
|
|
|
|
private static readonly MAX_CONCURRENT_SUBS_PER_RELAY = 8 |
|
|
|
|
private activeSubCountByRelay = new Map<string, number>() |
|
|
|
|
private subSlotWaitQueueByRelay = new Map<string, Array<() => void>>() |
|
|
|
|
|
|
|
|
|
constructor() { |
|
|
|
|
super() |
|
|
|
|
this.pool = new SimplePool() |
|
|
|
|
@ -83,6 +95,43 @@ class ClientService extends EventTarget {
@@ -83,6 +95,43 @@ class ClientService extends EventTarget {
|
|
|
|
|
await indexedDb.iterateProfileEvents((profileEvent) => this.addUsernameToIndex(profileEvent)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Acquire a slot to open a new subscription to the given relay. Resolves when we're under the per-relay limit. |
|
|
|
|
* Call releaseSubSlot(relayKey) when the subscription closes (user close() or relay onclose). |
|
|
|
|
*/ |
|
|
|
|
private acquireSubSlot(relayKey: string): Promise<void> { |
|
|
|
|
const count = this.activeSubCountByRelay.get(relayKey) ?? 0 |
|
|
|
|
if (count < ClientService.MAX_CONCURRENT_SUBS_PER_RELAY) { |
|
|
|
|
this.activeSubCountByRelay.set(relayKey, count + 1) |
|
|
|
|
return Promise.resolve() |
|
|
|
|
} |
|
|
|
|
return new Promise<void>((resolve) => { |
|
|
|
|
let queue = this.subSlotWaitQueueByRelay.get(relayKey) |
|
|
|
|
if (!queue) { |
|
|
|
|
queue = [] |
|
|
|
|
this.subSlotWaitQueueByRelay.set(relayKey, queue) |
|
|
|
|
} |
|
|
|
|
queue.push(() => { |
|
|
|
|
const n = this.activeSubCountByRelay.get(relayKey) ?? 0 |
|
|
|
|
this.activeSubCountByRelay.set(relayKey, n + 1) |
|
|
|
|
resolve() |
|
|
|
|
}) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Release a subscription slot for the relay. Wakes the next waiter if any. |
|
|
|
|
*/ |
|
|
|
|
private releaseSubSlot(relayKey: string): void { |
|
|
|
|
const count = (this.activeSubCountByRelay.get(relayKey) ?? 1) - 1 |
|
|
|
|
this.activeSubCountByRelay.set(relayKey, Math.max(0, count)) |
|
|
|
|
const queue = this.subSlotWaitQueueByRelay.get(relayKey) |
|
|
|
|
if (queue?.length) { |
|
|
|
|
const next = queue.shift()! |
|
|
|
|
next() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
/** |
|
|
|
|
* Determine which relays to publish an event to. |
|
|
|
|
* Fallbacks (used when user relay list is empty or fetch fails): |
|
|
|
|
@ -647,35 +696,128 @@ class ClientService extends EventTarget {
@@ -647,35 +696,128 @@ class ClientService extends EventTarget {
|
|
|
|
|
const that = this |
|
|
|
|
const _knownIds = new Set<string>() |
|
|
|
|
|
|
|
|
|
// One request per (relay, filter) so pool groups by relay and sends one REQ per relay with all filters
|
|
|
|
|
const requests = relays.flatMap((url) => |
|
|
|
|
filters.map((f) => ({ url: normalizeUrl(url) || url, filter: f })) |
|
|
|
|
) |
|
|
|
|
// Group by relay (same as pool.subscribeMap) so one REQ per relay with all filters
|
|
|
|
|
const grouped = new Map<string, Filter[]>() |
|
|
|
|
for (const url of relays) { |
|
|
|
|
const key = normalizeUrl(url) || url |
|
|
|
|
if (!grouped.has(key)) grouped.set(key, []) |
|
|
|
|
grouped.get(key)!.push(...filters) |
|
|
|
|
} |
|
|
|
|
const searchableSet = new Set(SEARCHABLE_RELAY_URLS.map((u) => normalizeUrl(u) || u)) |
|
|
|
|
const groupedRequests = Array.from(grouped.entries()).map(([url, f]) => { |
|
|
|
|
const relaySupportsSearch = searchableSet.has(url) |
|
|
|
|
const filtersForRelay = f.map((one) => filterForRelay(one, relaySupportsSearch)) |
|
|
|
|
return { url, filters: filtersForRelay } |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
const poolCloser = this.pool.subscribeMap(requests, { |
|
|
|
|
onevent: (evt: NEvent) => onevent?.(evt), |
|
|
|
|
oneose: () => oneose?.(true), |
|
|
|
|
onclose: (reasons: string[]) => { |
|
|
|
|
relays.forEach((url, i) => onclose?.(url, reasons[i] ?? '')) |
|
|
|
|
onAllClose?.(reasons) |
|
|
|
|
}, |
|
|
|
|
onauth: async (authEvt: EventTemplate) => { |
|
|
|
|
if (that.signer) { |
|
|
|
|
const evt = await that.signer.signEvent(authEvt) |
|
|
|
|
if (!evt) throw new Error('sign event failed') |
|
|
|
|
return evt as VerifiedEvent |
|
|
|
|
const eosesReceived: boolean[] = [] |
|
|
|
|
const closesReceived: (string | undefined)[] = [] |
|
|
|
|
const handleEose = (i: number) => { |
|
|
|
|
if (eosesReceived[i]) return |
|
|
|
|
eosesReceived[i] = true |
|
|
|
|
if (eosesReceived.filter(Boolean).length === groupedRequests.length) { |
|
|
|
|
oneose?.(true) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
const handleClose = (i: number, reason: string) => { |
|
|
|
|
if (closesReceived[i] !== undefined) return |
|
|
|
|
handleEose(i) |
|
|
|
|
closesReceived[i] = reason |
|
|
|
|
const { url } = groupedRequests[i]! |
|
|
|
|
onclose?.(url, reason) |
|
|
|
|
if (closesReceived.every((r) => r !== undefined)) { |
|
|
|
|
onAllClose?.(closesReceived as string[]) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const localAlreadyHaveEvent = (id: string) => { |
|
|
|
|
const have = _knownIds.has(id) |
|
|
|
|
if (have) return true |
|
|
|
|
_knownIds.add(id) |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const subs: { relayKey: string; close: () => void }[] = [] |
|
|
|
|
const allOpened = Promise.all( |
|
|
|
|
groupedRequests.map(async ({ url, filters: relayFilters }, i) => { |
|
|
|
|
const relayKey = normalizeUrl(url) || url |
|
|
|
|
await that.acquireSubSlot(relayKey) |
|
|
|
|
let relay: AbstractRelay |
|
|
|
|
try { |
|
|
|
|
relay = await that.pool.ensureRelay(url, { connectionTimeout: 5000 }) |
|
|
|
|
} catch (err) { |
|
|
|
|
that.releaseSubSlot(relayKey) |
|
|
|
|
handleClose(i, (err as Error)?.message ?? String(err)) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
let slotReleased = false |
|
|
|
|
const releaseOnce = () => { |
|
|
|
|
if (!slotReleased) { |
|
|
|
|
slotReleased = true |
|
|
|
|
that.releaseSubSlot(relayKey) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
startLogin?.() |
|
|
|
|
throw new Error('Login required') |
|
|
|
|
}, |
|
|
|
|
alreadyHaveEvent: (id: string) => { |
|
|
|
|
const have = _knownIds.has(id) |
|
|
|
|
if (have) return true |
|
|
|
|
_knownIds.add(id) |
|
|
|
|
return false |
|
|
|
|
}, |
|
|
|
|
eoseTimeout: 10_000 |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
const sub = relay.subscribe(relayFilters, { |
|
|
|
|
receivedEvent: (_relay, id) => that.trackEventSeenOn(id, _relay), |
|
|
|
|
onevent: (evt: NEvent) => onevent?.(evt), |
|
|
|
|
oneose: () => handleEose(i), |
|
|
|
|
onclose: (reason: string) => { |
|
|
|
|
releaseOnce() |
|
|
|
|
if (reason.startsWith('auth-required: ') && that.signer) { |
|
|
|
|
relay.auth(async (authEvt: EventTemplate) => { |
|
|
|
|
const evt = await that.signer!.signEvent(authEvt) |
|
|
|
|
if (!evt) throw new Error('sign event failed') |
|
|
|
|
return evt as VerifiedEvent |
|
|
|
|
}).then(() => that.acquireSubSlot(relayKey)).then(() => { |
|
|
|
|
let slotReleased2 = false |
|
|
|
|
const releaseSlot2 = () => { |
|
|
|
|
if (!slotReleased2) { |
|
|
|
|
slotReleased2 = true |
|
|
|
|
that.releaseSubSlot(relayKey) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
const sub2 = relay.subscribe(relayFilters, { |
|
|
|
|
receivedEvent: (_relay, id) => that.trackEventSeenOn(id, _relay), |
|
|
|
|
onevent: (evt: NEvent) => onevent?.(evt), |
|
|
|
|
oneose: () => handleEose(i), |
|
|
|
|
onclose: (reason2: string) => { |
|
|
|
|
releaseSlot2() |
|
|
|
|
handleClose(i, reason2) |
|
|
|
|
}, |
|
|
|
|
alreadyHaveEvent: localAlreadyHaveEvent, |
|
|
|
|
eoseTimeout: 10_000 |
|
|
|
|
}) |
|
|
|
|
subs.push({ |
|
|
|
|
relayKey, |
|
|
|
|
close: () => { |
|
|
|
|
releaseSlot2() |
|
|
|
|
sub2.close() |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
}).catch((err) => { |
|
|
|
|
handleClose(i, `auth failed: ${(err as Error)?.message ?? err}`) |
|
|
|
|
}) |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if (reason.startsWith('auth-required: ')) { |
|
|
|
|
startLogin?.() |
|
|
|
|
} |
|
|
|
|
handleClose(i, reason) |
|
|
|
|
}, |
|
|
|
|
alreadyHaveEvent: localAlreadyHaveEvent, |
|
|
|
|
eoseTimeout: 10_000 |
|
|
|
|
}) |
|
|
|
|
subs.push({ |
|
|
|
|
relayKey, |
|
|
|
|
close: () => { |
|
|
|
|
releaseOnce() |
|
|
|
|
sub.close() |
|
|
|
|
} |
|
|
|
|
}) |
|
|
|
|
}) |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const handleNewEventFromInternal = (data: Event) => { |
|
|
|
|
const customEvent = data as CustomEvent<NEvent> |
|
|
|
|
@ -695,7 +837,9 @@ class ClientService extends EventTarget {
@@ -695,7 +837,9 @@ class ClientService extends EventTarget {
|
|
|
|
|
return { |
|
|
|
|
close: () => { |
|
|
|
|
this.removeEventListener('newEvent', handleNewEventFromInternal) |
|
|
|
|
poolCloser.close() |
|
|
|
|
allOpened.then(() => { |
|
|
|
|
subs.forEach(({ close: subClose }) => subClose()) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|