You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

584 lines
18 KiB

/**
* Nostr client using nostr-tools
* Main interface for Nostr operations using only nostr-tools
*/
import { Relay, type Filter, matchFilter } from 'nostr-tools';
import { config } from './config.js';
import type { NostrEvent } from '../../types/nostr.js';
import { cacheEvent, cacheEvents, getEvent, getEventsByKind, getEventsByPubkey } from '../cache/event-cache.js';
import { filterEvents, shouldHideEvent } from '../event-filter.js';
/**
* Options accepted by NostrClient.publish().
*/
export interface PublishOptions {
/** Relay URLs to publish to. When omitted, publish() targets every relay currently held in the client's pool. */
relays?: string[];
/** NOTE(review): this flag is not read anywhere in the visible client code — confirm whether any caller depends on it. */
skipRelayValidation?: boolean;
}
/**
 * Relay pool + subscription manager built on nostr-tools.
 *
 * Holds one connection per relay URL, tracks every open subscription under the
 * key `${relayUrl}_${subId}`, and layers a read-through event cache on top of
 * relay queries.
 */
class NostrClient {
  private initialized = false;
  /** Connected relays keyed by URL. */
  private relays: Map<string, Relay> = new Map();
  /** Open subscriptions keyed by `${relayUrl}_${subId}`. */
  private subscriptions: Map<string, { relay: Relay; sub: { close(): void } }> = new Map();
  private nextSubId = 1;
  /** In-flight relay fetches keyed by filters + relay set, used to avoid duplicate requests. */
  private activeFetches: Map<string, Promise<NostrEvent[]>> = new Map();

  /**
   * Best-effort extraction of a human-readable message from an unknown thrown value.
   */
  private static errorMessage(error: unknown): string {
    if (error instanceof Error) return error.message;
    if (typeof error === 'object' && error !== null && 'message' in error) {
      const { message } = error as { message: unknown };
      if (typeof message === 'string' && message) return message;
    }
    return String(error);
  }

  /**
   * True when the error text indicates the relay connection was already closed.
   * Single source of truth for the string match used across the client.
   */
  private static isClosedConnectionError(error: unknown): boolean {
    const message = NostrClient.errorMessage(error);
    return message.includes('closed') || message.includes('SendingOnClosedConnection');
  }

  /**
   * Connect to every relay in `config.defaultRelays`.
   *
   * Idempotent once it has completed. Individual relay failures (or a 10 s
   * connection timeout) are logged and skipped; the method itself does not
   * reject because of them.
   */
  async initialize(): Promise<void> {
    if (this.initialized) return;

    // Install (once per page) a handler that swallows "closed connection"
    // rejections coming from relay internals; the pool cleans those up itself.
    if (typeof window !== 'undefined' && !(window as any).__nostrErrorHandlerSet) {
      (window as any).__nostrErrorHandlerSet = true;
      window.addEventListener('unhandledrejection', (event) => {
        const reason: unknown = event.reason;
        if (reason && typeof reason === 'object' && NostrClient.isClosedConnectionError(reason)) {
          event.preventDefault();
          console.debug('Suppressed closed connection error:', NostrClient.errorMessage(reason));
        }
      });
    }

    const connectionPromises = config.defaultRelays.map(async (url) => {
      try {
        await this.connectWithTimeout(url, 10000);
        console.log(`Connected to relay: ${url}`);
      } catch (error) {
        console.warn(`Failed to connect to relay ${url}:`, error);
      }
    });

    // Every attempt is awaited; failures were already handled above.
    await Promise.allSettled(connectionPromises);

    const connectedCount = this.relays.size;
    console.log(`Initialized with ${connectedCount}/${config.defaultRelays.length} relays connected`);
    this.initialized = true;
  }

  /**
   * addRelay() raced against a timer. The timer is always cleared so a fast
   * connection does not leave a pending timeout behind.
   */
  private async connectWithTimeout(url: string, timeoutMs: number): Promise<void> {
    let timer: ReturnType<typeof setTimeout> | undefined;
    const timeout = new Promise<never>((_, reject) => {
      timer = setTimeout(() => reject(new Error('Connection timeout')), timeoutMs);
    });
    try {
      await Promise.race([this.addRelay(url), timeout]);
    } finally {
      clearTimeout(timer);
    }
  }

  /**
   * Connect to a relay and add it to the pool. No-op if the URL is already pooled.
   *
   * @throws whatever `Relay.connect` rejects with (after logging it).
   */
  async addRelay(url: string): Promise<void> {
    if (this.relays.has(url)) return;
    try {
      const relay = await Relay.connect(url);
      this.relays.set(url, relay);
    } catch (error) {
      console.error(`Failed to connect to relay ${url}:`, error);
      throw error;
    }
  }

  /**
   * Close a relay connection (ignoring close errors) and drop it from the pool.
   */
  async removeRelay(url: string): Promise<void> {
    const relay = this.relays.get(url);
    if (!relay) return;
    try {
      relay.close();
    } catch {
      // Closing is best-effort: the relay is being discarded either way.
    }
    this.relays.delete(url);
  }

  /**
   * Report whether a pooled relay still looks usable, evicting it if not.
   *
   * @returns false when the URL is not pooled or the relay reports itself closed.
   */
  private checkAndCleanupRelay(relayUrl: string): boolean {
    const relay = this.relays.get(relayUrl);
    if (!relay) return false;

    // Two liveness signals are honoured because relay objects differ by
    // nostr-tools version: a numeric `status` (0 = CONNECTING, 1 = OPEN,
    // 2 = CLOSING, 3 = CLOSED) or a boolean `connected`.
    const { status, connected } = relay as { status?: number; connected?: boolean };
    if (status === 3 || connected === false) {
      this.relays.delete(relayUrl);
      return false;
    }
    return true;
  }

  /**
   * Fire-and-forget write of a single event to the event cache.
   */
  private addToCache(event: NostrEvent): void {
    cacheEvent(event).catch((error) => {
      console.error('Error caching event:', error);
    });
  }

  /**
   * Look up cached events for the given filters.
   *
   * Only filters with exactly one kind and/or exactly one author hit the cache
   * indexes; every candidate is re-checked with `matchFilter`, de-duplicated by
   * id, and finally passed through `filterEvents`.
   */
  private async getCachedEvents(filters: Filter[]): Promise<NostrEvent[]> {
    const results: NostrEvent[] = [];
    const seen = new Set<string>();

    const collect = (filter: Filter, candidates: NostrEvent[]): void => {
      for (const candidate of candidates) {
        if (seen.has(candidate.id)) continue;
        if (!matchFilter(filter, candidate)) continue;
        results.push(candidate);
        seen.add(candidate.id);
      }
    };

    for (const filter of filters) {
      if (filter.kinds && filter.kinds.length === 1) {
        collect(filter, await getEventsByKind(filter.kinds[0], filter.limit || 50));
      }
      if (filter.authors && filter.authors.length === 1) {
        collect(filter, await getEventsByPubkey(filter.authors[0], filter.limit || 50));
      }
    }

    return filterEvents(results);
  }

  /**
   * Publish an event to relays, one after another.
   *
   * The event is written to the cache first. Relays that are not pooled yet are
   * connected on demand.
   *
   * @param event   Event to publish.
   * @param options `relays` overrides the target list (default: all pooled relays).
   * @returns URLs that accepted the event and, per failed relay, the error text.
   */
  async publish(event: NostrEvent, options: PublishOptions = {}): Promise<{
    success: string[];
    failed: Array<{ relay: string; error: string }>;
  }> {
    const targets = options.relays ?? Array.from(this.relays.keys());
    const results = {
      success: [] as string[],
      failed: [] as Array<{ relay: string; error: string }>
    };

    this.addToCache(event);

    for (const url of targets) {
      let relay = this.relays.get(url);

      if (!relay) {
        try {
          await this.addRelay(url);
        } catch (error) {
          results.failed.push({
            relay: url,
            error: error instanceof Error ? error.message : 'Failed to connect'
          });
          continue;
        }
        relay = this.relays.get(url);
        if (!relay) {
          // Should not happen after a successful addRelay; report it rather than drop it silently.
          results.failed.push({ relay: url, error: 'Failed to connect' });
          continue;
        }
      }

      try {
        await relay.publish(event);
        results.success.push(url);
      } catch (error) {
        results.failed.push({
          relay: url,
          error: error instanceof Error ? error.message : 'Unknown error'
        });
      }
    }

    return results;
  }

  /**
   * Open a live subscription on each of the given relays.
   *
   * Relays missing from the pool are connected in the background and subscribed
   * once the connection succeeds; connection failures are only logged.
   *
   * @param onEvent Called for every event received, with the source relay URL.
   * @param onEose  Called when a relay signals end of stored events.
   * @returns A subscription id to pass to unsubscribe().
   */
  subscribe(
    filters: Filter[],
    relays: string[],
    onEvent: (event: NostrEvent, relay: string) => void,
    onEose?: (relay: string) => void
  ): string {
    const subId = `sub_${this.nextSubId++}_${Date.now()}`;

    for (const url of relays) {
      const relay = this.relays.get(url);
      if (relay) {
        // setupSubscription handles (and logs) its own failures.
        this.setupSubscription(relay, url, subId, filters, onEvent, onEose);
        continue;
      }

      this.addRelay(url)
        .then(() => {
          const connected = this.relays.get(url);
          if (connected) {
            this.setupSubscription(connected, url, subId, filters, onEvent, onEose);
          }
        })
        .catch((error) => {
          console.debug(`Failed to connect to relay ${url}:`, error);
        });
    }

    return subId;
  }

  /**
   * Open one subscription on one relay and register it under `${url}_${subId}`.
   *
   * Never throws: a closed-connection error evicts the relay from the pool, any
   * other error is logged. Callback exceptions are caught and logged so one bad
   * handler cannot break the relay's message loop.
   */
  private setupSubscription(
    relay: Relay,
    url: string,
    subId: string,
    filters: Filter[],
    onEvent: (event: NostrEvent, relay: string) => void,
    onEose?: (relay: string) => void
  ): void {
    // The relay may have been evicted between lookup and this call.
    if (!this.relays.has(url)) {
      console.warn(`Relay ${url} not in pool, skipping subscription`);
      return;
    }

    try {
      const sub = relay.subscribe(filters, {
        onevent: (event: NostrEvent) => {
          try {
            // Ignore traffic from relays that were evicted in the meantime.
            if (!this.relays.has(url)) return;
            this.addToCache(event);
            onEvent(event, url);
          } catch (err) {
            console.error(`Error handling event from relay ${url}:`, err);
          }
        },
        oneose: () => {
          try {
            if (!this.relays.has(url)) return;
            onEose?.(url);
          } catch (err) {
            console.error(`Error handling EOSE from relay ${url}:`, err);
          }
        }
      });
      this.subscriptions.set(`${url}_${subId}`, { relay, sub });
    } catch (error) {
      if (NostrClient.isClosedConnectionError(error)) {
        console.warn(`Relay ${url} connection is closed, removing from pool`);
        this.relays.delete(url);
      } else {
        console.error(`Error setting up subscription on relay ${url}:`, error);
      }
    }
  }

  /**
   * Close every per-relay subscription created under the given id.
   *
   * A failing close() (e.g. the socket is already gone) is logged and the entry
   * is removed anyway, so one dead relay cannot block cleanup of the others.
   */
  unsubscribe(subId: string): void {
    const suffix = `_${subId}`;
    for (const [key, { sub }] of this.subscriptions.entries()) {
      if (!key.endsWith(suffix)) continue;
      try {
        sub.close();
      } catch (error) {
        console.debug(`Error closing subscription ${key}:`, error);
      }
      this.subscriptions.delete(key);
    }
  }

  /**
   * Fetch events matching the filters.
   *
   * - If an identical relay fetch (same filters and relay set) is in flight, its promise is returned.
   * - With `useCache` (default true), non-empty cached results are returned
   *   immediately; when `cacheResults` is also true a relay refresh is started
   *   one second later and delivered through `onUpdate`.
   * - Otherwise the relays are queried directly.
   *
   * The `relays` array is not modified.
   */
  async fetchEvents(
    filters: Filter[],
    relays: string[],
    options?: { useCache?: boolean; cacheResults?: boolean; onUpdate?: (events: NostrEvent[]) => void; timeout?: number }
  ): Promise<NostrEvent[]> {
    const { useCache = true, cacheResults = true, onUpdate, timeout } = options ?? {};

    // Sort a copy: Array.prototype.sort works in place and would reorder the caller's list.
    const fetchKey = JSON.stringify({ filters, relays: [...relays].sort() });

    const activeFetch = this.activeFetches.get(fetchKey);
    if (activeFetch) return activeFetch;

    if (useCache) {
      try {
        const cachedEvents = await this.getCachedEvents(filters);
        if (cachedEvents.length > 0) {
          // onUpdate is deliberately not called with cached data; it fires only
          // when fresh relay data arrives, which avoids duplicate UI updates.
          if (cacheResults) {
            // Delay the refresh to reduce concurrent relay requests.
            setTimeout(() => {
              // Distinct key so the refresh is never mistaken for a foreground fetch.
              const bgFetchKey = `${fetchKey}_bg_${Date.now()}`;
              const bgPromise = this.fetchFromRelays(filters, relays, { cacheResults, onUpdate, timeout });
              this.activeFetches.set(bgFetchKey, bgPromise);
              bgPromise
                .finally(() => {
                  this.activeFetches.delete(bgFetchKey);
                })
                .catch((error) => {
                  console.error('Error fetching fresh events from relays:', error);
                });
            }, 1000);
          }
          return cachedEvents;
        }
      } catch (error) {
        console.error('Error loading from cache:', error);
      }
    }

    const fetchPromise = this.fetchFromRelays(filters, relays, { cacheResults, onUpdate, timeout });
    this.activeFetches.set(fetchKey, fetchPromise);
    fetchPromise
      .finally(() => {
        this.activeFetches.delete(fetchKey);
      })
      .catch(() => {
        // The caller receives the rejection through fetchPromise itself; this
        // handler only keeps the derived cleanup promise from being reported
        // as an unhandled rejection.
      });
    return fetchPromise;
  }

  /**
   * Query relays in parallel: one subscription per relay carrying all filters.
   *
   * Each relay is considered done at EOSE or after `timeout` ms
   * (`options.timeout`, falling back to `config.relayTimeout` when unset or 0).
   * Results are de-duplicated by id, passed through `filterEvents`, optionally
   * bulk-cached, and handed to `onUpdate` before being returned.
   */
  private async fetchFromRelays(
    filters: Filter[],
    relays: string[],
    options: { cacheResults: boolean; onUpdate?: (events: NostrEvent[]) => void; timeout?: number }
  ): Promise<NostrEvent[]> {
    const timeout = options.timeout || config.relayTimeout;

    let availableRelays = relays.filter((url) => this.relays.has(url));
    if (availableRelays.length === 0) {
      // Nothing pooled yet: try to connect to the requested relays first.
      await Promise.allSettled(relays.map((url) => this.addRelay(url).catch(() => null)));
      availableRelays = relays.filter((url) => this.relays.has(url));
      if (availableRelays.length === 0) {
        return [];
      }
    }

    const events: Map<string, NostrEvent> = new Map();

    const relayPromises = availableRelays.map((relayUrl) => {
      return new Promise<void>((resolve) => {
        const relay = this.relays.get(relayUrl);
        // Skip relays that vanished or report themselves closed (the latter are evicted).
        if (!relay || !this.checkAndCleanupRelay(relayUrl)) {
          resolve();
          return;
        }

        const subId = `sub_${this.nextSubId++}_${Date.now()}`;
        let resolved = false;
        let timeoutId: ReturnType<typeof setTimeout> | null = null;

        const finish = (): void => {
          if (resolved) return;
          resolved = true;
          if (timeoutId) clearTimeout(timeoutId);
          try {
            this.unsubscribe(subId);
          } finally {
            // Must always settle, otherwise the whole fetch would hang on this relay.
            resolve();
          }
        };

        try {
          const sub = relay.subscribe(filters, {
            onevent: (event: NostrEvent) => {
              if (!this.relays.has(relayUrl)) return;
              if (shouldHideEvent(event)) return;
              events.set(event.id, event);
              this.addToCache(event);
            },
            oneose: () => finish()
          });
          this.subscriptions.set(`${relayUrl}_${subId}`, { relay, sub });

          if (resolved) {
            // EOSE arrived before registration; close the subscription now.
            this.unsubscribe(subId);
          } else {
            timeoutId = setTimeout(finish, timeout);
          }
        } catch (error) {
          if (NostrClient.isClosedConnectionError(error)) {
            this.relays.delete(relayUrl);
          } else {
            console.warn(`Error subscribing to relay ${relayUrl}:`, error);
          }
          finish();
        }
      });
    });

    await Promise.allSettled(relayPromises);

    const filtered = filterEvents(Array.from(events.values()));

    if (options.cacheResults && filtered.length > 0) {
      // Bulk cache write runs in the background.
      cacheEvents(filtered).catch((error) => {
        console.error('Error caching events:', error);
      });
    }

    if (options.onUpdate) {
      options.onUpdate(filtered);
    }

    return filtered;
  }

  /**
   * Resolve a single event by id: cache first, then the given relays.
   *
   * @returns The event, or null when neither source has it.
   */
  async getEventById(id: string, relays: string[]): Promise<NostrEvent | null> {
    try {
      const dbEvent = await getEvent(id);
      if (dbEvent) return dbEvent;
    } catch (error) {
      console.error('Error loading from IndexedDB:', error);
    }

    const filters: Filter[] = [{ ids: [id] }];
    const events = await this.fetchEvents(filters, relays, { useCache: false });
    return events[0] || null;
  }

  /**
   * Get events by filters from the cache only (no relay traffic).
   */
  async getByFilters(filters: Filter[]): Promise<NostrEvent[]> {
    return this.getCachedEvents(filters);
  }

  /**
   * Expose the imported client configuration object.
   */
  getConfig() {
    return config;
  }

  /**
   * URLs of all relays currently held in the pool.
   */
  getConnectedRelays(): string[] {
    return Array.from(this.relays.keys());
  }

  /**
   * Close every subscription and relay connection and reset the client so
   * initialize() can run again. Individual close failures are ignored so the
   * teardown always completes.
   */
  close(): void {
    for (const { sub } of this.subscriptions.values()) {
      try {
        sub.close();
      } catch {
        // Already closed or socket gone; nothing left to release.
      }
    }
    this.subscriptions.clear();

    for (const relay of this.relays.values()) {
      try {
        relay.close();
      } catch {
        // Same as above: teardown is best-effort per relay.
      }
    }
    this.relays.clear();

    this.initialized = false;
  }
}
// Shared module-level client instance. Constructing it opens no connections; call initialize() to connect to config.defaultRelays.
export const nostrClient = new NostrClient();