You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

621 lines
20 KiB

package nostr
import (
	"context"
	"fmt"
	"sort"
	"time"

	"gitcitadel-online/internal/logger"

	"github.com/nbd-wtf/go-nostr"
)
// Client handles connections to Nostr relays with failover support using go-nostr's SimplePool.
// All query methods funnel through requestSem so at most maxConcurrent
// relay requests are in flight at once.
type Client struct {
pool *nostr.SimplePool // shared pool; manages per-relay connections and reuse
relays []string // Combined list: feeds relay (if set) first, then profile relays
feedsRelay string // Relay for feeds
profileRelays []string // Relays for profiles
contactRelays []string // Relays for contact form
requestSem chan struct{} // Semaphore to limit concurrent requests
maxConcurrent int // Maximum concurrent requests (capacity of requestSem)
}
// NewClient creates a new Nostr client with relay configuration.
// Connection management and parallel querying are delegated to go-nostr's
// SimplePool; concurrent requests are capped (5) so relays are not overwhelmed.
func NewClient(feedsRelay string, profileRelays []string, contactRelays []string) *Client {
	// Cap on simultaneous relay queries to avoid overwhelming relays.
	const maxConcurrent = 5

	pool := nostr.NewSimplePool(context.Background())

	// Combined relay list: the feeds relay (when configured) leads,
	// followed by all profile relays.
	combined := make([]string, 0, len(profileRelays)+1)
	if feedsRelay != "" {
		combined = append(combined, feedsRelay)
	}
	combined = append(combined, profileRelays...)

	return &Client{
		pool:          pool,
		relays:        combined,
		feedsRelay:    feedsRelay,
		profileRelays: profileRelays,
		contactRelays: contactRelays,
		requestSem:    make(chan struct{}, maxConcurrent),
		maxConcurrent: maxConcurrent,
	}
}
// Connect is intentionally a no-op: SimplePool dials each relay lazily on
// the first query, so there is nothing to establish up front.
func (c *Client) Connect(ctx context.Context) error {
	return nil
}
// ConnectToRelay connects to a single relay using SimplePool (exported for use by services).
// EnsureRelay reuses an existing pooled connection when one is available.
//
// NOTE(review): the ctx parameter is currently unused — EnsureRelay takes only
// the URL in this go-nostr version; confirm whether caller cancellation should
// apply here.
func (c *Client) ConnectToRelay(ctx context.Context, url string) (*nostr.Relay, error) {
// Use SimplePool's EnsureRelay which handles connection pooling and reuse
return c.pool.EnsureRelay(url)
}
// FetchEvent fetches a single event across every configured relay and returns
// the newest matching version (highest created_at). Requests are rate-limited
// through the client's semaphore (see FetchEventFromRelays).
func (c *Client) FetchEvent(ctx context.Context, filter nostr.Filter) (*nostr.Event, error) {
	return c.FetchEventFromRelays(ctx, filter, c.relays)
}
// FetchEventFromRelays fetches a single event matching filter from the given
// relays in parallel and returns the newest version (highest created_at) seen
// on any relay.
//
// Concurrency is bounded by the client's request semaphore; acquisition honors
// ctx cancellation rather than blocking indefinitely when all slots are busy.
// The relay query itself is capped at 30 seconds.
func (c *Client) FetchEventFromRelays(ctx context.Context, filter nostr.Filter, relays []string) (*nostr.Event, error) {
	// Acquire a request slot, but bail out if the caller cancels first.
	// (Previously this send could block forever on a cancelled context.)
	select {
	case c.requestSem <- struct{}{}:
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	defer func() { <-c.requestSem }()

	// Hard 30-second ceiling per query.
	queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	logger.WithFields(map[string]interface{}{
		"relays":  relays,
		"kinds":   filter.Kinds,
		"authors": filter.Authors,
		"ids":     filter.IDs,
		"tags":    filter.Tags,
	}).Debug("Querying relays using SimplePool")

	// SubManyEose queries all relays in parallel, handling connection pooling,
	// failover, and deduplication; the channel closes after every relay EOSEs
	// (or queryCtx expires).
	eventChan := c.pool.SubManyEose(queryCtx, relays, nostr.Filters{filter})

	// Track only the newest event seen so far instead of collecting them all.
	var newest *nostr.Event
	total := 0
	for incomingEvent := range eventChan {
		if incomingEvent.Event == nil {
			continue
		}
		total++
		logger.WithFields(map[string]interface{}{
			"relay":      incomingEvent.Relay.URL,
			"event_id":   incomingEvent.Event.ID,
			"created_at": incomingEvent.Event.CreatedAt,
		}).Debug("Received event from relay")
		if newest == nil || incomingEvent.Event.CreatedAt > newest.CreatedAt {
			newest = incomingEvent.Event
		}
	}
	if newest == nil {
		return nil, fmt.Errorf("event not found from any relay")
	}

	logger.WithFields(map[string]interface{}{
		"created_at": newest.CreatedAt,
		"total":      total,
	}).Debug("Selected newest event from all relays")
	return newest, nil
}
// FetchEvents fetches events matching filter from every configured relay.
// Results are deduplicated, keeping the newest version of each event; requests
// are rate-limited through the client's semaphore.
func (c *Client) FetchEvents(ctx context.Context, filter nostr.Filter) ([]*nostr.Event, error) {
	return c.FetchEventsFromRelays(ctx, filter, c.relays)
}
// FetchEventsFromRelays fetches events matching a single filter from the
// given relays; it is a convenience wrapper over the batched variant.
func (c *Client) FetchEventsFromRelays(ctx context.Context, filter nostr.Filter, relays []string) ([]*nostr.Event, error) {
	filters := []nostr.Filter{filter}
	return c.FetchEventsBatchFromRelays(ctx, filters, relays)
}
// FetchEventsBatch runs several filters in one batched query against every
// configured relay — cheaper than repeated FetchEvents calls. Results are
// deduplicated (newest version kept) and the call is rate-limited through
// the client's semaphore.
func (c *Client) FetchEventsBatch(ctx context.Context, filters []nostr.Filter) ([]*nostr.Event, error) {
	return c.FetchEventsBatchFromRelays(ctx, filters, c.relays)
}
// FetchEventsBatchFromRelays fetches events matching any of the given filters
// from the specified relays in a single batched subscription. Results are
// deduplicated by event ID, keeping the newest version of each event.
//
// Concurrency is bounded by the client's request semaphore; acquisition honors
// ctx cancellation rather than blocking indefinitely when all slots are busy.
// The relay query itself is capped at 30 seconds.
func (c *Client) FetchEventsBatchFromRelays(ctx context.Context, filters []nostr.Filter, relays []string) ([]*nostr.Event, error) {
	if len(filters) == 0 {
		return nil, fmt.Errorf("no filters provided")
	}

	// Acquire a request slot, but bail out if the caller cancels first.
	// (Previously this send could block forever on a cancelled context.)
	select {
	case c.requestSem <- struct{}{}:
	case <-ctx.Done():
		return nil, ctx.Err()
	}
	defer func() { <-c.requestSem }()

	// Hard 30-second ceiling per query.
	queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	logger.WithFields(map[string]interface{}{
		"relays":  relays,
		"filters": len(filters),
	}).Debug("Querying relays using SimplePool with batched filters")

	// SubManyEose queries all relays in parallel with every filter, handling
	// connection pooling, failover, and deduplication.
	eventChan := c.pool.SubManyEose(queryCtx, relays, nostr.Filters(filters))

	// Deduplicate by event ID, keeping the newest copy of each event.
	eventMap := make(map[string]*nostr.Event)
	for incomingEvent := range eventChan {
		if incomingEvent.Event == nil {
			continue
		}
		existing, exists := eventMap[incomingEvent.Event.ID]
		if !exists || incomingEvent.Event.CreatedAt > existing.CreatedAt {
			eventMap[incomingEvent.Event.ID] = incomingEvent.Event
			logger.WithFields(map[string]interface{}{
				"relay":      incomingEvent.Relay.URL,
				"event_id":   incomingEvent.Event.ID,
				"created_at": incomingEvent.Event.CreatedAt,
			}).Debug("Received event from relay")
		}
	}
	if len(eventMap) == 0 {
		return nil, fmt.Errorf("no events found from any relay")
	}

	// Flatten the deduplicated map into a slice for callers.
	events := make([]*nostr.Event, 0, len(eventMap))
	for _, event := range eventMap {
		events = append(events, event)
	}
	logger.WithField("events", len(events)).Debug("Returning unique events from all relays")
	return events, nil
}
// Close closes all relay connections in the pool.
// Currently a no-op: SimplePool owns the connections and releases them when
// its context is cancelled, so there is nothing to tear down explicitly here.
func (c *Client) Close() {
// SimplePool manages connections, but we can close individual relays if needed
// The pool will handle cleanup when context is cancelled
}
// GetRelays returns all configured relay URLs (feeds relay first, then
// profile relays). The returned slice is the client's internal slice;
// callers must not mutate it.
func (c *Client) GetRelays() []string {
return c.relays
}
// GetPrimaryRelay returns the feeds relay used for main event fetching.
// Empty string means no feeds relay was configured.
func (c *Client) GetPrimaryRelay() string {
return c.feedsRelay
}
// GetProfileRelays returns the relays used for profile fetching.
// The returned slice is the client's internal slice; callers must not mutate it.
func (c *Client) GetProfileRelays() []string {
return c.profileRelays
}
// GetContactRelays returns the relays for contact form submissions.
// The returned slice is the client's internal slice; callers must not mutate it.
func (c *Client) GetContactRelays() []string {
return c.contactRelays
}
// GetPool returns the underlying SimplePool (for services that need direct access).
// Callers share the pool's connection state with this client.
func (c *Client) GetPool() *nostr.SimplePool {
return c.pool
}
// HealthCheck probes relay connectivity by requesting a single recent kind-1
// note within the given timeout. A nil return means at least one relay
// answered with an event.
//
// NOTE(review): because FetchEvents reports an error when zero events are
// found, an empty-but-reachable relay fails this check — confirm that is the
// intended semantics.
func (c *Client) HealthCheck(ctx context.Context, timeout time.Duration) error {
	healthCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// Kind 1 (text note) is a standard, widely-stored kind, making it a
	// reasonable connectivity probe; not configurable by design.
	probe := nostr.Filter{
		Kinds: []int{KindNote},
		Limit: 1,
	}
	_, err := c.FetchEvents(healthCtx, probe)
	return err
}
// FetchDeletionEvents fetches kind 5 deletion events authored by the given
// pubkeys from the profile relays and returns a map keyed by deleted event ID
// (event ID -> the deletion event targeting it). When one event ID is deleted
// more than once, the newest deletion event wins.
func (c *Client) FetchDeletionEvents(ctx context.Context, authors []string) (map[string]*nostr.Event, error) {
	deleted := make(map[string]*nostr.Event)
	if len(authors) == 0 {
		return deleted, nil
	}

	// Deduplicate authors, dropping empty pubkeys and preserving first-seen order.
	seen := make(map[string]bool, len(authors))
	unique := make([]string, 0, len(authors))
	for _, author := range authors {
		if author == "" || seen[author] {
			continue
		}
		seen[author] = true
		unique = append(unique, author)
	}
	if len(unique) == 0 {
		return deleted, nil
	}

	profileRelays := c.GetProfileRelays()
	if len(profileRelays) == 0 {
		return nil, fmt.Errorf("profile relays not configured")
	}

	// Deliberately no Limit: we want every deletion event for these authors.
	filter := nostr.Filter{
		Kinds:   []int{KindDelete},
		Authors: unique,
	}
	logger.WithFields(map[string]interface{}{
		"authors": len(unique),
		"relays":  len(profileRelays),
	}).Debug("Fetching deletion events from profile relays")

	deletionEvents, err := c.FetchEventsFromRelays(ctx, filter, profileRelays)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch deletion events: %w", err)
	}

	// Each kind 5 event lists its victims in "e" tags; index every referenced
	// event ID, letting the newest deletion event take precedence.
	for _, deletionEvent := range deletionEvents {
		for _, tag := range deletionEvent.Tags {
			if len(tag) < 2 || tag[0] != "e" {
				continue
			}
			eventID := tag[1]
			if prev, ok := deleted[eventID]; !ok || deletionEvent.CreatedAt > prev.CreatedAt {
				deleted[eventID] = deletionEvent
			}
		}
	}

	logger.WithFields(map[string]interface{}{
		"deletion_events": len(deletionEvents),
		"deleted_ids":     len(deleted),
	}).Debug("Parsed deletion events")
	return deleted, nil
}
// FilterDeletedEvents drops every event whose ID appears in deletedEventIDs
// (i.e. events targeted by a kind 5 deletion) and returns the survivors.
// When there are no deletions the input slice is returned unchanged.
func FilterDeletedEvents(events []*nostr.Event, deletedEventIDs map[string]*nostr.Event) []*nostr.Event {
	if len(deletedEventIDs) == 0 {
		return events
	}

	kept := make([]*nostr.Event, 0, len(events))
	for _, ev := range events {
		if _, gone := deletedEventIDs[ev.ID]; gone {
			logger.WithFields(map[string]interface{}{
				"event_id": ev.ID,
				"kind":     ev.Kind,
			}).Debug("Filtering out deleted event")
			continue
		}
		kept = append(kept, ev)
	}

	logger.WithFields(map[string]interface{}{
		"original": len(events),
		"filtered": len(kept),
		"removed":  len(events) - len(kept),
	}).Debug("Filtered deleted events")
	return kept
}
// ProcessEventsWithCacheResult contains the processed events and their profiles,
// ready for display.
type ProcessEventsWithCacheResult struct {
Events []*nostr.Event
Profiles map[string]*Profile
}
// ProcessEventsWithCache is the standard process for fetching and processing events
// for kinds 1, 30023, 30040, 30041, 30818. It:
// 1. If indexEventID is provided, fetches the index event and only queries for events referenced in it
// 2. Otherwise fetches 2x the display limit (minimum 50)
// 3. Merges with the existing cache map
// 4. Deduplicates (keeping newest)
// 5. Fetches deletion events
// 6. Removes deleted events
// 7. Sorts newest-first
// 8. Applies the display limit
// 9. Fetches profiles for displayed events
// 10. Returns final events and profiles ready for display
//
// Deletion/profile fetch failures are logged and degrade gracefully; only
// relay-configuration and primary-fetch errors abort the pipeline.
func (c *Client) ProcessEventsWithCache(
	ctx context.Context,
	kind int,
	displayLimit int,
	existingEvents map[string]*nostr.Event, // Existing cache map (event ID -> event)
	relayURL string, // Relay to fetch from (empty = use primary relay)
	indexEventID string, // Optional: index event ID to fetch and filter by
	indexKind int, // Optional: kind of the index event (required if indexEventID is provided)
) (*ProcessEventsWithCacheResult, error) {
	// Determine which relay to query: an explicit relayURL wins, otherwise
	// fall back to the configured primary (feeds) relay.
	var relays []string
	if relayURL != "" {
		relays = []string{relayURL}
	} else {
		primaryRelay := c.GetPrimaryRelay()
		if primaryRelay == "" {
			return nil, fmt.Errorf("primary relay not configured")
		}
		relays = []string{primaryRelay}
	}

	var fetchedEvents []*nostr.Event
	if indexEventID != "" {
		// Index-driven mode: fetch only the events the index references.
		events, found, err := c.fetchEventsFromIndex(ctx, kind, indexEventID, indexKind, relays)
		if err != nil {
			return nil, err
		}
		if !found {
			// The index contains no items of this kind; nothing to display.
			return &ProcessEventsWithCacheResult{
				Events:   []*nostr.Event{},
				Profiles: make(map[string]*Profile),
			}, nil
		}
		fetchedEvents = events
	} else {
		// Standard mode: over-fetch (2x display limit, minimum 50) so that
		// deletion filtering still leaves enough events to display.
		fetchLimit := displayLimit * 2
		if fetchLimit < 50 {
			fetchLimit = 50
		}
		filter := nostr.Filter{
			Kinds: []int{kind},
			Limit: fetchLimit,
		}
		logger.WithFields(map[string]interface{}{
			"kind":          kind,
			"fetch_limit":   fetchLimit,
			"display_limit": displayLimit,
			"relay":         relays[0],
		}).Debug("Fetching events for processing")
		var err error
		fetchedEvents, err = c.FetchEventsFromRelays(ctx, filter, relays)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch events: %w", err)
		}
	}
	logger.WithFields(map[string]interface{}{
		"kind":    kind,
		"fetched": len(fetchedEvents),
	}).Debug("Fetched events from relay")

	// Merge fetched events into the existing cache, keeping the newest copy
	// of each event ID.
	eventMap := make(map[string]*nostr.Event, len(existingEvents)+len(fetchedEvents))
	for id, event := range existingEvents {
		eventMap[id] = event
	}
	for _, event := range fetchedEvents {
		if existing, ok := eventMap[event.ID]; !ok || event.CreatedAt > existing.CreatedAt {
			eventMap[event.ID] = event
		}
	}
	logger.WithFields(map[string]interface{}{
		"existing": len(existingEvents),
		"fetched":  len(fetchedEvents),
		"merged":   len(eventMap),
	}).Debug("Merged events with cache")

	allEvents := make([]*nostr.Event, 0, len(eventMap))
	for _, event := range eventMap {
		allEvents = append(allEvents, event)
	}

	// Fetch deletion events for every author; on failure continue without
	// deletion filtering rather than failing the whole pipeline.
	authors := uniquePubkeys(allEvents)
	deletedEventIDs, err := c.FetchDeletionEvents(ctx, authors)
	if err != nil {
		logger.WithField("error", err).Warn("Failed to fetch deletion events, continuing without filtering")
		deletedEventIDs = make(map[string]*nostr.Event)
	}
	allEvents = FilterDeletedEvents(allEvents, deletedEventIDs)

	// Sort newest-first by created_at. (Replaces a hand-rolled O(n^2) sort.)
	sort.Slice(allEvents, func(i, j int) bool {
		return allEvents[i].CreatedAt > allEvents[j].CreatedAt
	})
	logger.WithFields(map[string]interface{}{
		"after_deletion": len(allEvents),
		"sorted":         true,
	}).Debug("Removed deletions and sorted events")

	// Trim to the display limit (0 means unlimited).
	if displayLimit > 0 && len(allEvents) > displayLimit {
		allEvents = allEvents[:displayLimit]
		logger.WithFields(map[string]interface{}{
			"limited": displayLimit,
		}).Debug("Applied display limit")
	}
	if len(allEvents) < displayLimit && len(allEvents) > 0 {
		logger.WithFields(map[string]interface{}{
			"available": len(allEvents),
			"requested": displayLimit,
		}).Debug("Fewer events available than display limit")
	}
	logger.WithFields(map[string]interface{}{
		"kind":          kind,
		"final_count":   len(allEvents),
		"display_limit": displayLimit,
	}).Info("Processed events with cache")

	// Fetch profiles for the events that will actually be displayed; on
	// failure continue with an empty profile map.
	profileAuthors := uniquePubkeys(allEvents)
	profiles, err := c.FetchProfilesBatch(ctx, profileAuthors)
	if err != nil {
		logger.WithField("error", err).Warn("Failed to fetch profiles, continuing without profiles")
		profiles = make(map[string]*Profile)
	}
	logger.WithFields(map[string]interface{}{
		"profiles_fetched": len(profiles),
		// Fixed: previously logged the pre-limit deletion-author count here.
		"authors": len(profileAuthors),
	}).Debug("Fetched profiles for displayed events")

	return &ProcessEventsWithCacheResult{
		Events:   allEvents,
		Profiles: profiles,
	}, nil
}

// fetchEventsFromIndex fetches the index event, then fetches only the events
// of the target kind that the index references (matched by author + d tag).
// The bool result is false when the index contains no items of that kind.
// Per-author fetch failures are logged and skipped rather than aborting.
func (c *Client) fetchEventsFromIndex(ctx context.Context, kind int, indexEventID string, indexKind int, relays []string) ([]*nostr.Event, bool, error) {
	indexFilter := nostr.Filter{
		IDs: []string{indexEventID},
	}
	indexEvents, err := c.FetchEventsFromRelays(ctx, indexFilter, relays)
	if err != nil {
		return nil, false, fmt.Errorf("failed to fetch index event: %w", err)
	}
	if len(indexEvents) == 0 {
		return nil, false, fmt.Errorf("index event not found: %s", indexEventID)
	}

	index, err := ParseIndexEvent(indexEvents[0], indexKind)
	if err != nil {
		return nil, false, fmt.Errorf("failed to parse index event: %w", err)
	}

	// Keep only index items of the target kind.
	indexItems := make([]IndexItem, 0)
	for _, item := range index.Items {
		if item.Kind == kind {
			indexItems = append(indexItems, item)
		}
	}
	if len(indexItems) == 0 {
		return nil, false, nil
	}
	logger.WithFields(map[string]interface{}{
		"kind":           kind,
		"index_items":    len(indexItems),
		"index_event_id": indexEventID,
	}).Debug("Fetched index event, querying referenced events")

	// Group referenced d tags by author so we can issue one query per author.
	authorDTags := make(map[string][]string)
	for _, item := range indexItems {
		if item.Pubkey != "" && item.DTag != "" {
			authorDTags[item.Pubkey] = append(authorDTags[item.Pubkey], item.DTag)
		}
	}

	fetched := make([]*nostr.Event, 0)
	for author, dTags := range authorDTags {
		// Nostr filters cannot OR across d tags, so fetch all the author's
		// events of this kind and filter by d tag locally.
		filter := nostr.Filter{
			Kinds:   []int{kind},
			Authors: []string{author},
		}
		authorEvents, err := c.FetchEventsFromRelays(ctx, filter, relays)
		if err != nil {
			logger.WithFields(map[string]interface{}{
				"author": author,
				"error":  err,
			}).Warn("Failed to fetch events for author, continuing")
			continue
		}

		dTagSet := make(map[string]bool, len(dTags))
		for _, dTag := range dTags {
			dTagSet[dTag] = true
		}
		for _, event := range authorEvents {
			// Extract the event's first "d" tag value.
			var eventDTag string
			for _, tag := range event.Tags {
				if len(tag) > 1 && tag[0] == "d" {
					eventDTag = tag[1]
					break
				}
			}
			if eventDTag != "" && dTagSet[eventDTag] {
				fetched = append(fetched, event)
			}
		}
	}
	logger.WithFields(map[string]interface{}{
		"kind":           kind,
		"fetched":        len(fetched),
		"index_items":    len(indexItems),
		"index_event_id": indexEventID,
	}).Debug("Fetched events from index")
	return fetched, true, nil
}

// uniquePubkeys returns the distinct author pubkeys of events in first-seen order.
func uniquePubkeys(events []*nostr.Event) []string {
	seen := make(map[string]bool, len(events))
	keys := make([]string, 0, len(events))
	for _, event := range events {
		if !seen[event.PubKey] {
			seen[event.PubKey] = true
			keys = append(keys, event.PubKey)
		}
	}
	return keys
}