package nostr

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"

	"gitcitadel-online/internal/logger"
	"github.com/nbd-wtf/go-nostr"
)

// Client handles connections to Nostr relays with failover support using
// go-nostr's SimplePool.
type Client struct {
	pool          *nostr.SimplePool
	relays        []string
	feedsRelay    string   // Relay for feeds
	profileRelays []string // Relays for profiles
	contactRelays []string // Relays for contact form
	mu            sync.RWMutex
	ctx           context.Context
	cancel        context.CancelFunc // Cancels ctx; invoked by Close
	requestSem    chan struct{}      // Semaphore to limit concurrent requests
	maxConcurrent int                // Maximum concurrent requests
}

// NewClient creates a new Nostr client with relay configuration.
//
// It uses go-nostr's SimplePool for connection management and parallel
// queries, and limits the client to 5 concurrent requests to avoid
// overwhelming relays. The default relay list is the feeds relay (when
// non-empty) followed by the profile relays; contact relays are stored
// separately and are not part of the default list.
func NewClient(feedsRelay string, profileRelays []string, contactRelays []string) *Client {
	// The pool context is cancellable so that Close can actually release it.
	ctx, cancel := context.WithCancel(context.Background())
	pool := nostr.NewSimplePool(ctx)

	// Build relay list: feeds relay first, then profile relays.
	relays := make([]string, 0, len(profileRelays)+1)
	if feedsRelay != "" {
		relays = append(relays, feedsRelay)
	}
	relays = append(relays, profileRelays...)

	const maxConcurrent = 5 // Limit to 5 concurrent requests to avoid overwhelming relays

	return &Client{
		pool:          pool,
		relays:        relays,
		feedsRelay:    feedsRelay,
		profileRelays: profileRelays,
		contactRelays: contactRelays,
		ctx:           ctx,
		cancel:        cancel,
		requestSem:    make(chan struct{}, maxConcurrent),
		maxConcurrent: maxConcurrent,
	}
}

// Connect is a no-op kept for API compatibility: SimplePool connects to
// relays lazily when a query is issued. It always returns nil.
func (c *Client) Connect(ctx context.Context) error {
	return nil
}

// ConnectToRelay returns a pooled connection to a single relay (exported for
// use by services). It delegates to SimplePool.EnsureRelay, which handles
// connection pooling and reuse. The ctx argument is currently unused because
// EnsureRelay does not accept one.
func (c *Client) ConnectToRelay(ctx context.Context, url string) (*nostr.Relay, error) {
	return c.pool.EnsureRelay(url)
}

// beginQuery waits for a free request slot and derives the 30-second query
// context used for a single relay query.
//
// Unlike a bare channel send, the wait is abandoned as soon as ctx is done,
// so a cancelled caller never stays blocked behind busy slots. On success the
// returned function must be called exactly once; it cancels the query context
// and frees the slot.
func (c *Client) beginQuery(ctx context.Context) (context.Context, func(), error) {
	if err := ctx.Err(); err != nil {
		return nil, nil, fmt.Errorf("waiting for request slot: %w", err)
	}

	select {
	case c.requestSem <- struct{}{}:
	case <-ctx.Done():
		return nil, nil, fmt.Errorf("waiting for request slot: %w", ctx.Err())
	}

	queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	done := func() {
		cancel()
		<-c.requestSem
	}
	return queryCtx, done, nil
}

// FetchEvent fetches a single event by querying all configured relays in
// parallel using SimplePool. It returns the newest event (highest created_at)
// if multiple events are found. Rate-limited to prevent overwhelming relays.
func (c *Client) FetchEvent(ctx context.Context, filter nostr.Filter) (*nostr.Event, error) {
	return c.FetchEventFromRelays(ctx, filter, c.relays)
}

// FetchEventFromRelays fetches a single event from specific relays.
//
// All given relays are queried in parallel with a 30-second timeout and the
// event with the highest created_at is returned (the first one received wins
// ties). An error is returned when no relay delivers an event, or when ctx is
// done before a request slot becomes available.
func (c *Client) FetchEventFromRelays(ctx context.Context, filter nostr.Filter, relays []string) (*nostr.Event, error) {
	queryCtx, done, err := c.beginQuery(ctx)
	if err != nil {
		return nil, err
	}
	defer done()

	logger.WithFields(map[string]interface{}{
		"relays":  relays,
		"kinds":   filter.Kinds,
		"authors": filter.Authors,
		"ids":     filter.IDs,
		"tags":    filter.Tags,
	}).Debug("Querying relays using SimplePool")

	// SubManyEose queries the relays in parallel; the channel is drained
	// until it is closed. Only the newest event seen so far is retained.
	var newest *nostr.Event
	total := 0
	for incoming := range c.pool.SubManyEose(queryCtx, relays, nostr.Filters{filter}) {
		if incoming.Event == nil {
			continue
		}
		total++
		logger.WithFields(map[string]interface{}{
			"relay":      incoming.Relay.URL,
			"event_id":   incoming.Event.ID,
			"created_at": incoming.Event.CreatedAt,
		}).Debug("Received event from relay")

		if newest == nil || incoming.Event.CreatedAt > newest.CreatedAt {
			newest = incoming.Event
		}
	}

	if newest == nil {
		return nil, fmt.Errorf("event not found from any relay")
	}

	logger.WithFields(map[string]interface{}{
		"created_at": newest.CreatedAt,
		"total":      total,
	}).Debug("Selected newest event from all relays")

	return newest, nil
}

// FetchEvents fetches multiple events by querying all configured relays in
// parallel using SimplePool. It returns events deduplicated by ID.
// Rate-limited to prevent overwhelming relays.
func (c *Client) FetchEvents(ctx context.Context, filter nostr.Filter) ([]*nostr.Event, error) {
	return c.FetchEventsFromRelays(ctx, filter, c.relays)
}

// FetchEventsFromRelays fetches multiple events from specific relays using a
// single filter. See FetchEventsBatchFromRelays for the exact semantics.
func (c *Client) FetchEventsFromRelays(ctx context.Context, filter nostr.Filter, relays []string) ([]*nostr.Event, error) {
	return c.FetchEventsBatchFromRelays(ctx, []nostr.Filter{filter}, relays)
}

// FetchEventsBatch fetches events matching any of the given filters from all
// configured relays in a single batched query, which is more efficient than
// calling FetchEvents once per filter. It returns events deduplicated by ID.
// Rate-limited to prevent overwhelming relays.
func (c *Client) FetchEventsBatch(ctx context.Context, filters []nostr.Filter) ([]*nostr.Event, error) {
	return c.FetchEventsBatchFromRelays(ctx, filters, c.relays)
}

// FetchEventsBatchFromRelays fetches events from specific relays using
// multiple filters in one query.
//
// It returns an error when filters is empty, when ctx is done before a
// request slot becomes available, or when no relay returns any event. The
// order of the returned slice is unspecified.
func (c *Client) FetchEventsBatchFromRelays(ctx context.Context, filters []nostr.Filter, relays []string) ([]*nostr.Event, error) {
	if len(filters) == 0 {
		return nil, fmt.Errorf("no filters provided")
	}

	events, err := c.queryEvents(ctx, filters, relays)
	if err != nil {
		return nil, err
	}
	if len(events) == 0 {
		return nil, fmt.Errorf("no events found from any relay")
	}

	logger.WithField("events", len(events)).Debug("Returning unique events from all relays")
	return events, nil
}

// queryEvents runs one rate-limited, 30-second-bounded query against relays
// and returns the received events deduplicated by ID.
//
// Unlike FetchEventsBatchFromRelays, an empty result is not an error; the
// only error is ctx finishing before a request slot becomes available.
func (c *Client) queryEvents(ctx context.Context, filters []nostr.Filter, relays []string) ([]*nostr.Event, error) {
	queryCtx, done, err := c.beginQuery(ctx)
	if err != nil {
		return nil, err
	}
	defer done()

	logger.WithFields(map[string]interface{}{
		"relays":  relays,
		"filters": len(filters),
	}).Debug("Querying relays using SimplePool with batched filters")

	// Deduplicate by ID; if two copies disagree on created_at the newer one
	// is kept.
	byID := make(map[string]*nostr.Event)
	for incoming := range c.pool.SubManyEose(queryCtx, relays, nostr.Filters(filters)) {
		if incoming.Event == nil {
			continue
		}
		known, seen := byID[incoming.Event.ID]
		if seen && incoming.Event.CreatedAt <= known.CreatedAt {
			continue
		}
		byID[incoming.Event.ID] = incoming.Event
		logger.WithFields(map[string]interface{}{
			"relay":      incoming.Relay.URL,
			"event_id":   incoming.Event.ID,
			"created_at": incoming.Event.CreatedAt,
		}).Debug("Received event from relay")
	}

	events := make([]*nostr.Event, 0, len(byID))
	for _, event := range byID {
		events = append(events, event)
	}
	return events, nil
}

// FetchEventByID fetches an event by its ID from all configured relays.
func (c *Client) FetchEventByID(ctx context.Context, eventID string) (*nostr.Event, error) {
	return c.FetchEvent(ctx, nostr.Filter{IDs: []string{eventID}})
}

// FetchEventsByKind fetches up to limit events of a specific kind from all
// configured relays.
func (c *Client) FetchEventsByKind(ctx context.Context, kind int, limit int) ([]*nostr.Event, error) {
	filter := nostr.Filter{
		Kinds: []int{kind},
		Limit: limit,
	}
	return c.FetchEvents(ctx, filter)
}

// Close releases the client's pool by cancelling the context the pool was
// created with. It is safe to call more than once.
//
// NOTE(review): this relies on SimplePool tearing down its relay connections
// when its context is cancelled - confirm against the go-nostr version in use.
func (c *Client) Close() {
	if c.cancel != nil {
		c.cancel()
	}
}

// GetRelays returns all configured relay URLs (feeds relay first, then
// profile relays). The returned slice is the client's own; callers must not
// modify it.
func (c *Client) GetRelays() []string {
	return c.relays
}

// GetPrimaryRelay returns the feeds relay used for main event fetching.
func (c *Client) GetPrimaryRelay() string {
	return c.feedsRelay
}

// GetProfileRelays returns the relays used for profile fetching.
func (c *Client) GetProfileRelays() []string {
	return c.profileRelays
}

// GetContactRelays returns the relays used for contact form submissions.
func (c *Client) GetContactRelays() []string {
	return c.contactRelays
}

// GetPool returns the underlying SimplePool (for services that need direct
// access).
func (c *Client) GetPool() *nostr.SimplePool {
	return c.pool
}

// HealthCheck performs a health check on the configured relays by fetching a
// single kind 1 note within the given timeout. It returns the fetch error, so
// relays that are reachable but hold no notes are reported as unhealthy too.
func (c *Client) HealthCheck(ctx context.Context, timeout time.Duration) error {
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Kind 1 is a standard kind, so it is used as a fixed connectivity probe.
	filter := nostr.Filter{
		Kinds: []int{KindNote},
		Limit: 1,
	}
	_, err := c.FetchEvents(ctx, filter)
	return err
}

// FetchDeletionEvents fetches kind 5 deletion events published by the given
// authors from the profile relays.
//
// It returns a map from deleted event ID to the newest deletion event that
// references it through an "e" tag. Finding no deletion events is the normal
// case and yields an empty map with a nil error. An error is returned when no
// profile relays are configured or ctx is done before the query can start.
//
// Limitation: only one deletion event is kept per target ID, so if several
// authors reference the same ID, only the newest request is retained.
func (c *Client) FetchDeletionEvents(ctx context.Context, authors []string) (map[string]*nostr.Event, error) {
	// Deduplicate authors and drop empty pubkeys.
	seen := make(map[string]bool, len(authors))
	uniqueAuthors := make([]string, 0, len(authors))
	for _, author := range authors {
		if author == "" || seen[author] {
			continue
		}
		seen[author] = true
		uniqueAuthors = append(uniqueAuthors, author)
	}
	if len(uniqueAuthors) == 0 {
		return make(map[string]*nostr.Event), nil
	}

	profileRelays := c.GetProfileRelays()
	if len(profileRelays) == 0 {
		return nil, fmt.Errorf("profile relays not configured")
	}

	// No limit: fetch all deletion events by these authors.
	filter := nostr.Filter{
		Kinds:   []int{KindDelete},
		Authors: uniqueAuthors,
	}

	logger.WithFields(map[string]interface{}{
		"authors": len(uniqueAuthors),
		"relays":  len(profileRelays),
	}).Debug("Fetching deletion events from profile relays")

	// queryEvents is used directly so that "nothing was deleted" is not
	// reported as a failure.
	deletionEvents, err := c.queryEvents(ctx, []nostr.Filter{filter}, profileRelays)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch deletion events: %w", err)
	}

	// Kind 5 events carry "e" tags naming the event IDs they delete.
	deletedEventIDs := make(map[string]*nostr.Event)
	for _, deletionEvent := range deletionEvents {
		for _, tag := range deletionEvent.Tags {
			if len(tag) < 2 || tag[0] != "e" {
				continue
			}
			eventID := tag[1]
			// Keep the newest deletion event if multiple deletions exist.
			existing, exists := deletedEventIDs[eventID]
			if !exists || deletionEvent.CreatedAt > existing.CreatedAt {
				deletedEventIDs[eventID] = deletionEvent
			}
		}
	}

	logger.WithFields(map[string]interface{}{
		"deletion_events": len(deletionEvents),
		"deleted_ids":     len(deletedEventIDs),
	}).Debug("Parsed deletion events")

	return deletedEventIDs, nil
}

// FilterDeletedEvents removes events that have been deleted (kind 5) and
// returns the remaining events in their original order.
//
// deletedEventIDs maps an event ID to the deletion event that references it.
// Following NIP-09, a deletion only applies when it was published by the same
// pubkey as the target event, so one author cannot hide another author's
// events. A nil map value carries no author information and is treated as a
// deletion, preserving the behaviour for callers that use the map as a set.
// When the map is empty the input slice is returned unchanged.
func FilterDeletedEvents(events []*nostr.Event, deletedEventIDs map[string]*nostr.Event) []*nostr.Event {
	if len(deletedEventIDs) == 0 {
		return events
	}

	filtered := make([]*nostr.Event, 0, len(events))
	for _, event := range events {
		deletion, listed := deletedEventIDs[event.ID]
		if !listed || (deletion != nil && deletion.PubKey != event.PubKey) {
			filtered = append(filtered, event)
			continue
		}
		logger.WithFields(map[string]interface{}{
			"event_id": event.ID,
			"kind":     event.Kind,
		}).Debug("Filtering out deleted event")
	}

	logger.WithFields(map[string]interface{}{
		"original": len(events),
		"filtered": len(filtered),
		"removed":  len(events) - len(filtered),
	}).Debug("Filtered deleted events")

	return filtered
}

// ProcessEventsWithCacheResult contains the processed events and the profiles
// of their authors.
type ProcessEventsWithCacheResult struct {
	Events   []*nostr.Event
	Profiles map[string]*Profile
}

// ProcessEventsWithCache is the standard process for fetching and processing
// events for display. It:
//  1. Picks the relay: relayURL when given, otherwise the primary relay.
//  2. Fetches events: if indexEventID is provided, fetches the index event and
//     only the events of the requested kind referenced in it; otherwise
//     fetches 2x the display limit (minimum 50) of the requested kind.
//  3. Merges the fetched events into the existing cache map, deduplicating by
//     event ID.
//  4. Fetches deletion events for all authors and removes deleted events
//     (a failure here is logged and processing continues unfiltered).
//  5. Sorts newest-first.
//  6. Applies the display limit (when displayLimit > 0).
//  7. Fetches profiles for the authors of the displayed events (a failure
//     here is logged and an empty profile map is returned).
//
// An error is returned when no relay is available, when the index event
// cannot be fetched or parsed, or when the standard fetch fails. When the
// index contains no items of the requested kind, an empty result is returned.
func (c *Client) ProcessEventsWithCache(
	ctx context.Context,
	kind int,
	displayLimit int,
	existingEvents map[string]*nostr.Event, // Existing cache map (event ID -> event)
	relayURL string, // Relay to fetch from (empty = use primary relay)
	indexEventID string, // Optional: index event ID to fetch and filter by
	indexKind int, // Optional: kind of the index event (required if indexEventID is provided)
) (*ProcessEventsWithCacheResult, error) {
	// Calculate fetch limit (2x display limit, minimum 50).
	fetchLimit := displayLimit * 2
	if fetchLimit < 50 {
		fetchLimit = 50
	}

	// Determine which relay to use.
	relay := relayURL
	if relay == "" {
		relay = c.GetPrimaryRelay()
		if relay == "" {
			return nil, fmt.Errorf("primary relay not configured")
		}
	}
	relays := []string{relay}

	var fetchedEvents []*nostr.Event
	if indexEventID != "" {
		// Index mode: restrict the fetch to events referenced by the index.
		indexItems, err := c.fetchIndexItems(ctx, indexEventID, indexKind, kind, relays)
		if err != nil {
			return nil, err
		}
		if len(indexItems) == 0 {
			// No items of this kind in the index, return empty result.
			return &ProcessEventsWithCacheResult{
				Events:   []*nostr.Event{},
				Profiles: make(map[string]*Profile),
			}, nil
		}

		logger.WithFields(map[string]interface{}{
			"kind":           kind,
			"index_items":    len(indexItems),
			"index_event_id": indexEventID,
		}).Debug("Fetched index event, querying referenced events")

		fetchedEvents = c.fetchIndexedEvents(ctx, kind, indexItems, relays)

		logger.WithFields(map[string]interface{}{
			"kind":           kind,
			"fetched":        len(fetchedEvents),
			"index_items":    len(indexItems),
			"index_event_id": indexEventID,
		}).Debug("Fetched events from index")
	} else {
		// Standard mode: fetch 2x the display limit.
		filter := nostr.Filter{
			Kinds: []int{kind},
			Limit: fetchLimit,
		}

		logger.WithFields(map[string]interface{}{
			"kind":          kind,
			"fetch_limit":   fetchLimit,
			"display_limit": displayLimit,
			"relay":         relays[0],
		}).Debug("Fetching events for processing")

		events, err := c.FetchEventsFromRelays(ctx, filter, relays)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch events: %w", err)
		}
		fetchedEvents = events
	}

	logger.WithFields(map[string]interface{}{
		"kind":    kind,
		"fetched": len(fetchedEvents),
	}).Debug("Fetched events from relay")

	// Merge with the existing cache map. A fetched event replaces a cached
	// one with the same ID only if it is strictly newer.
	eventMap := make(map[string]*nostr.Event, len(existingEvents)+len(fetchedEvents))
	for id, event := range existingEvents {
		if event != nil { // a nil cache entry cannot be displayed; skip it
			eventMap[id] = event
		}
	}
	for _, event := range fetchedEvents {
		cached, exists := eventMap[event.ID]
		if !exists || event.CreatedAt > cached.CreatedAt {
			eventMap[event.ID] = event
		}
	}

	logger.WithFields(map[string]interface{}{
		"existing": len(existingEvents),
		"fetched":  len(fetchedEvents),
		"merged":   len(eventMap),
	}).Debug("Merged events with cache")

	// Convert map to slice (already deduplicated).
	allEvents := make([]*nostr.Event, 0, len(eventMap))
	for _, event := range eventMap {
		allEvents = append(allEvents, event)
	}

	// Fetch deletion events for all authors and drop deleted events.
	authors := authorsOfEvents(allEvents)
	deletedEventIDs, err := c.FetchDeletionEvents(ctx, authors)
	if err != nil {
		logger.WithField("error", err).Warn("Failed to fetch deletion events, continuing without filtering")
		deletedEventIDs = make(map[string]*nostr.Event)
	}
	allEvents = FilterDeletedEvents(allEvents, deletedEventIDs)

	// Sort newest-first (by created_at descending).
	sortEventsNewestFirst(allEvents)

	logger.WithFields(map[string]interface{}{
		"after_deletion": len(allEvents),
		"sorted":         true,
	}).Debug("Removed deletions and sorted events")

	// Apply display limit.
	if displayLimit > 0 && len(allEvents) > displayLimit {
		allEvents = allEvents[:displayLimit]
		logger.WithFields(map[string]interface{}{
			"limited": displayLimit,
		}).Debug("Applied display limit")
	}

	// Make it visible in the logs when the limit could not be filled.
	if len(allEvents) < displayLimit && len(allEvents) > 0 {
		logger.WithFields(map[string]interface{}{
			"available": len(allEvents),
			"requested": displayLimit,
		}).Debug("Fewer events available than display limit")
	}

	logger.WithFields(map[string]interface{}{
		"kind":          kind,
		"final_count":   len(allEvents),
		"display_limit": displayLimit,
	}).Info("Processed events with cache")

	// Fetch profiles for the authors of the displayed events only.
	profileAuthors := authorsOfEvents(allEvents)
	profiles, err := c.FetchProfilesBatch(ctx, profileAuthors)
	if err != nil {
		logger.WithField("error", err).Warn("Failed to fetch profiles, continuing without profiles")
		profiles = make(map[string]*Profile)
	}

	logger.WithFields(map[string]interface{}{
		"profiles_fetched": len(profiles),
		"authors":          len(profileAuthors),
	}).Debug("Fetched profiles for displayed events")

	return &ProcessEventsWithCacheResult{
		Events:   allEvents,
		Profiles: profiles,
	}, nil
}

// fetchIndexItems fetches and parses the index event and returns the items of
// the requested kind that it references (possibly none).
func (c *Client) fetchIndexItems(ctx context.Context, indexEventID string, indexKind int, kind int, relays []string) ([]IndexItem, error) {
	indexEvents, err := c.FetchEventsFromRelays(ctx, nostr.Filter{IDs: []string{indexEventID}}, relays)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch index event: %w", err)
	}
	if len(indexEvents) == 0 {
		return nil, fmt.Errorf("index event not found: %s", indexEventID)
	}

	index, err := ParseIndexEvent(indexEvents[0], indexKind)
	if err != nil {
		return nil, fmt.Errorf("failed to parse index event: %w", err)
	}

	items := make([]IndexItem, 0, len(index.Items))
	for _, item := range index.Items {
		if item.Kind == kind {
			items = append(items, item)
		}
	}
	return items, nil
}

// fetchIndexedEvents fetches the events referenced by the index items, one
// query per author. A failed query for one author is logged and skipped so
// that the remaining authors are still processed.
func (c *Client) fetchIndexedEvents(ctx context.Context, kind int, items []IndexItem, relays []string) []*nostr.Event {
	// Group the wanted d tags by author to build one filter per author.
	authorDTags := make(map[string][]string)
	for _, item := range items {
		if item.Pubkey != "" && item.DTag != "" {
			authorDTags[item.Pubkey] = append(authorDTags[item.Pubkey], item.DTag)
		}
	}

	events := make([]*nostr.Event, 0, len(items))
	for author, dTags := range authorDTags {
		// A NIP-01 tag filter matches events carrying any of the listed
		// values, so the relay can return just the referenced events.
		filter := nostr.Filter{
			Kinds:   []int{kind},
			Authors: []string{author},
			Tags:    nostr.TagMap{"d": dTags},
		}
		authorEvents, err := c.FetchEventsFromRelays(ctx, filter, relays)
		if err != nil {
			logger.WithFields(map[string]interface{}{
				"author": author,
				"error":  err,
			}).Warn("Failed to fetch events for author, continuing")
			continue
		}

		// Re-check locally in case a relay ignores the tag filter.
		wanted := make(map[string]bool, len(dTags))
		for _, dTag := range dTags {
			wanted[dTag] = true
		}
		for _, event := range authorEvents {
			if dTag := firstDTag(event); dTag != "" && wanted[dTag] {
				events = append(events, event)
			}
		}
	}
	return events
}

// firstDTag returns the value of the first "d" tag that has a value, or ""
// when the event has none.
func firstDTag(event *nostr.Event) string {
	for _, tag := range event.Tags {
		if len(tag) > 1 && tag[0] == "d" {
			return tag[1]
		}
	}
	return ""
}

// authorsOfEvents returns the distinct pubkeys of events in first-seen order.
func authorsOfEvents(events []*nostr.Event) []string {
	authors := make([]string, 0, len(events))
	seen := make(map[string]bool, len(events))
	for _, event := range events {
		if !seen[event.PubKey] {
			seen[event.PubKey] = true
			authors = append(authors, event.PubKey)
		}
	}
	return authors
}

// sortEventsNewestFirst sorts events in place by created_at descending.
// Events with equal timestamps are ordered by ID so the result does not
// depend on map iteration order.
func sortEventsNewestFirst(events []*nostr.Event) {
	sort.Slice(events, func(i, j int) bool {
		if events[i].CreatedAt != events[j].CreatedAt {
			return events[i].CreatedAt > events[j].CreatedAt
		}
		return events[i].ID < events[j].ID
	})
}