/**
 * Subscription manager for Nostr subscriptions.
 *
 * Tracks active subscriptions by ID, attaches a `message` listener to each
 * relay socket obtained from `relayPool`, and sends the matching `REQ` /
 * `CLOSE` frames through `relayPool.send`.
 */

import { relayPool } from './relay-pool.js';
import type { NostrEvent } from '../../types/nostr.js';

/** Filter object sent in a `REQ` frame and also applied locally to incoming events. */
export interface NostrFilter {
  ids?: string[];
  authors?: string[];
  kinds?: number[];
  '#e'?: string[];
  '#p'?: string[];
  since?: number;
  until?: number;
  limit?: number;
}

/** Invoked for every event that arrives for a subscription and passes the local filter check. */
export type EventCallback = (event: NostrEvent, relay: string) => void;

/** Invoked when a relay sends `EOSE` for a subscription. */
export type EoseCallback = (relay: string) => void;

class SubscriptionManager {
  /** Active subscriptions keyed by subscription ID. */
  private readonly subscriptions: Map<string, Subscription> = new Map();

  /** Counter used by {@link generateSubId}; incremented on every call. */
  private nextSubId = 1;

  /**
   * Create a new subscription.
   *
   * Any existing subscription with the same `subId` is closed first. Relays
   * for which `relayPool.getRelay` returns nothing are skipped: no listener
   * is attached and no `REQ` is sent for them.
   *
   * @param subId - Subscription ID placed in the `REQ` frame and used to route replies.
   * @param relays - Relay URLs to subscribe on.
   * @param filters - Filters sent to the relay and re-checked locally on each event.
   * @param onEvent - Called with each matching event and the URL of the relay that delivered it.
   * @param onEose - Optional; called with the relay URL when that relay sends `EOSE`.
   */
  subscribe(
    subId: string,
    relays: string[],
    filters: NostrFilter[],
    onEvent: EventCallback,
    onEose?: EoseCallback
  ): void {
    // Close existing subscription if any
    this.unsubscribe(subId);

    const subscription: Subscription = {
      id: subId,
      relays,
      filters,
      onEvent,
      onEose,
      messageHandlers: new Map()
    };

    // The REQ payload is identical for every relay, so build it once.
    const request = JSON.stringify(['REQ', subId, ...filters]);

    // Set up message handlers for each relay
    for (const relayUrl of relays) {
      const ws = relayPool.getRelay(relayUrl);
      if (!ws) continue;

      const handler = (message: MessageEvent): void => {
        try {
          const data: unknown = JSON.parse(message.data);
          if (!Array.isArray(data)) return;

          const [type, ...rest] = data;
          // Frames addressed to other subscriptions share this socket; ignore them.
          if (rest[0] !== subId) return;

          if (type === 'EVENT') {
            const nostrEvent = rest[1] as NostrEvent;
            // Re-check locally rather than trusting the relay to honour the filters.
            if (this.matchesFilters(nostrEvent, filters)) {
              onEvent(nostrEvent, relayUrl);
            }
          } else if (type === 'EOSE') {
            onEose?.(relayUrl);
          }
        } catch (error) {
          // Also reached when a malformed event payload or a callback throws,
          // not only on JSON syntax errors.
          console.error('Error parsing relay message:', error);
        }
      };

      ws.addEventListener('message', handler);
      // Keep the exact function reference so unsubscribe() can remove it.
      subscription.messageHandlers.set(relayUrl, handler);

      // Send subscription request
      relayPool.send(relayUrl, request);
    }

    this.subscriptions.set(subId, subscription);
  }

  /**
   * Check if event matches filters.
   *
   * An event matches when it satisfies every condition present in at least
   * one filter. An empty `filters` array therefore matches nothing. `limit`
   * is not evaluated here.
   */
  private matchesFilters(event: NostrEvent, filters: NostrFilter[]): boolean {
    return filters.some((filter) => {
      if (filter.ids && !filter.ids.includes(event.id)) return false;
      if (filter.authors && !filter.authors.includes(event.pubkey)) return false;
      if (filter.kinds && !filter.kinds.includes(event.kind)) return false;

      // Compare against undefined so that a bound of 0 is still enforced.
      if (filter.since !== undefined && event.created_at < filter.since) return false;
      if (filter.until !== undefined && event.created_at > filter.until) return false;

      // Tag filters
      const eTags = filter['#e'];
      if (eTags && !this.hasTagValue(event, 'e', eTags)) return false;

      const pTags = filter['#p'];
      if (pTags && !this.hasTagValue(event, 'p', pTags)) return false;

      return true;
    });
  }

  /**
   * Return true when the event has a tag whose first element is `tagName`
   * and whose second element is one of `allowed`.
   */
  private hasTagValue(event: NostrEvent, tagName: string, allowed: readonly string[]): boolean {
    return event.tags.some((tag) => {
      const value = tag[1];
      return tag[0] === tagName && value !== undefined && allowed.includes(value);
    });
  }

  /**
   * Unsubscribe from a subscription.
   *
   * Removes the message listener from every relay the subscription was
   * attached to, sends `CLOSE` to each of them, and forgets the subscription.
   * Does nothing when `subId` is unknown.
   */
  unsubscribe(subId: string): void {
    const subscription = this.subscriptions.get(subId);
    if (!subscription) return;

    const closeMessage = JSON.stringify(['CLOSE', subId]);

    // Remove message handlers
    for (const [relayUrl, handler] of subscription.messageHandlers) {
      // NOTE(review): the listener is removed from whatever socket the pool
      // returns now; if the pool can replace sockets (e.g. on reconnect) this
      // may not be the socket the listener was added to — confirm in relay-pool.
      const ws = relayPool.getRelay(relayUrl);
      if (ws) {
        ws.removeEventListener('message', handler);
      }

      // Send close message
      relayPool.send(relayUrl, closeMessage);
    }

    this.subscriptions.delete(subId);
  }

  /**
   * Generate a unique subscription ID of the form `sub_<counter>_<timestamp>`.
   */
  generateSubId(): string {
    return `sub_${this.nextSubId++}_${Date.now()}`;
  }

  /**
   * Close all subscriptions.
   */
  closeAll(): void {
    // Snapshot the keys: unsubscribe() deletes from the map being iterated.
    for (const subId of [...this.subscriptions.keys()]) {
      this.unsubscribe(subId);
    }
  }
}

/** Internal bookkeeping record for one active subscription. */
interface Subscription {
  id: string;
  relays: string[];
  filters: NostrFilter[];
  onEvent: EventCallback;
  onEose?: EoseCallback;
  /** Listener registered on each relay socket, keyed by relay URL. */
  messageHandlers: Map<string, (event: MessageEvent) => void>;
}

export const subscriptionManager = new SubscriptionManager();