You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

146 lines
3.9 KiB

/**
* Subscription manager for Nostr subscriptions
*/
import { relayPool } from './relay-pool.js';
import type { NostrEvent, NostrFilter } from '../../types/nostr.js';
/**
 * Callback invoked with an event delivered for a subscription and the URL of
 * the relay that delivered it.
 */
export type EventCallback = (event: NostrEvent, relay: string) => void;
/**
 * Callback invoked with a relay's URL when that relay sends an 'EOSE' message
 * for a subscription.
 */
export type EoseCallback = (relay: string) => void;
/**
 * Tracks active Nostr subscriptions and wires them to relay connections
 * obtained from `relayPool`.
 *
 * For every subscription it attaches one 'message' listener per relay, sends
 * the 'REQ' request, and remembers the listener so that `unsubscribe` can
 * detach it again and send 'CLOSE'.
 */
class SubscriptionManager {
  /** Matches filter keys that denote tag filters: '#' followed by one letter. */
  private static readonly tagFilterKey = /^#[a-zA-Z]$/;

  /** Active subscriptions keyed by subscription id. */
  private readonly subscriptions: Map<string, Subscription> = new Map();

  /** Monotonic counter used by `generateSubId`. */
  private nextSubId = 1;

  /**
   * Create a new subscription.
   *
   * Any existing subscription with the same id is closed first. Relays for
   * which `relayPool.getRelay` returns nothing are skipped, and a relay URL
   * that appears more than once in `relays` is only subscribed once.
   *
   * @param subId   Subscription id sent in the 'REQ' message and used to
   *                recognise incoming 'EVENT' / 'EOSE' messages.
   * @param relays  Relay URLs to subscribe on.
   * @param filters Filters sent to the relay and re-checked locally.
   * @param onEvent Called for each incoming event that matches `filters`.
   * @param onEose  Called when a relay sends 'EOSE' for this subscription.
   */
  subscribe(
    subId: string,
    relays: string[],
    filters: NostrFilter[],
    onEvent: EventCallback,
    onEose?: EoseCallback
  ): void {
    // Close existing subscription if any
    this.unsubscribe(subId);

    const subscription: Subscription = {
      id: subId,
      relays,
      filters,
      onEvent,
      onEose,
      messageHandlers: new Map()
    };

    // The request is identical for every relay, so serialise it once.
    const request = JSON.stringify(['REQ', subId, ...filters]);

    for (const relayUrl of relays) {
      // A duplicated URL would register a second listener whose reference is
      // overwritten in the map and could therefore never be removed again.
      if (subscription.messageHandlers.has(relayUrl)) continue;

      const ws = relayPool.getRelay(relayUrl);
      if (!ws) continue;

      const handler = (message: MessageEvent): void => {
        try {
          const data: unknown = JSON.parse(message.data);
          if (!Array.isArray(data)) return;

          const [type, id, payload] = data;
          // Messages for other subscriptions share the same connection.
          if (id !== subId) return;

          if (type === 'EVENT') {
            const nostrEvent = payload as NostrEvent;
            if (this.matchesFilters(nostrEvent, filters)) {
              onEvent(nostrEvent, relayUrl);
            }
          } else if (type === 'EOSE') {
            onEose?.(relayUrl);
          }
        } catch (error) {
          // Also reached when a malformed payload or a callback throws.
          console.error('Error parsing relay message:', error);
        }
      };

      ws.addEventListener('message', handler);
      subscription.messageHandlers.set(relayUrl, handler);

      // Send subscription request
      relayPool.send(relayUrl, request);
    }

    this.subscriptions.set(subId, subscription);
  }

  /**
   * Check whether an event matches at least one of the given filters.
   *
   * Within a single filter every present condition must hold: `ids`,
   * `authors`, `kinds`, the inclusive `since` / `until` bounds on
   * `created_at`, and every '#<letter>' tag filter (the event needs at least
   * one tag of that name whose value is listed).
   *
   * @returns `true` if any filter matches; `false` for an empty filter list.
   */
  private matchesFilters(event: NostrEvent, filters: NostrFilter[]): boolean {
    return filters.some((filter) => {
      if (filter.ids && !filter.ids.includes(event.id)) return false;
      if (filter.authors && !filter.authors.includes(event.pubkey)) return false;
      if (filter.kinds && !filter.kinds.includes(event.kind)) return false;

      // Compare against null/undefined rather than truthiness so that a
      // bound of 0 is still honoured.
      if (filter.since != null && event.created_at < filter.since) return false;
      if (filter.until != null && event.created_at > filter.until) return false;

      // Tag filters: handle every '#<letter>' key, not only '#e' and '#p'.
      for (const [key, wanted] of Object.entries(filter)) {
        if (!SubscriptionManager.tagFilterKey.test(key)) continue;
        if (!Array.isArray(wanted)) continue;

        const tagName = key.slice(1);
        const hasTag = event.tags.some(
          (tag) => tag[0] === tagName && wanted.includes(tag[1])
        );
        if (!hasTag) return false;
      }

      return true;
    });
  }

  /**
   * Unsubscribe from a subscription.
   *
   * Detaches the 'message' listener from every relay it was attached to,
   * sends 'CLOSE' to those relays and forgets the subscription. Does nothing
   * if the id is unknown.
   */
  unsubscribe(subId: string): void {
    const subscription = this.subscriptions.get(subId);
    if (!subscription) return;

    const closeMessage = JSON.stringify(['CLOSE', subId]);

    for (const [relayUrl, handler] of subscription.messageHandlers.entries()) {
      // The relay may no longer be available from the pool; only detach the
      // listener when a connection object is returned.
      const ws = relayPool.getRelay(relayUrl);
      if (ws) {
        ws.removeEventListener('message', handler);
      }

      // Send close message
      relayPool.send(relayUrl, closeMessage);
    }

    this.subscriptions.delete(subId);
  }

  /**
   * Generate a unique subscription ID of the form `sub_<counter>_<timestamp>`.
   */
  generateSubId(): string {
    return `sub_${this.nextSubId++}_${Date.now()}`;
  }

  /**
   * Close all subscriptions.
   */
  closeAll(): void {
    // Iterate over a snapshot because unsubscribe() deletes from the map.
    for (const subId of [...this.subscriptions.keys()]) {
      this.unsubscribe(subId);
    }
  }
}
/**
 * Bookkeeping record for one active subscription held by SubscriptionManager.
 */
interface Subscription {
// Subscription id used in the 'REQ' / 'CLOSE' messages.
id: string;
// Relay URLs the subscription was requested on.
relays: string[];
// Filters sent with the request and used to re-check incoming events.
filters: NostrFilter[];
// Callback invoked for each matching event.
onEvent: EventCallback;
// Optional callback invoked when a relay sends 'EOSE' for this subscription.
onEose?: EoseCallback;
// 'message' listener registered per relay URL, kept so it can be removed on unsubscribe.
messageHandlers: Map<string, (event: MessageEvent) => void>;
}
/** Module-level SubscriptionManager instance exported for use by the rest of the application. */
export const subscriptionManager = new SubscriptionManager();