You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
230 lines
7.5 KiB
230 lines
7.5 KiB
package nostr |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"sync" |
|
"time" |
|
|
|
"gitcitadel-online/internal/logger" |
|
|
|
"github.com/nbd-wtf/go-nostr" |
|
) |
|
|
|
// Client handles connections to Nostr relays with failover support using go-nostr's SimplePool |
|
type Client struct { |
|
pool *nostr.SimplePool |
|
relays []string |
|
mu sync.RWMutex |
|
ctx context.Context |
|
requestSem chan struct{} // Semaphore to limit concurrent requests |
|
maxConcurrent int // Maximum concurrent requests |
|
} |
|
|
|
// NewClient creates a new Nostr client with primary, fallback, and additional fallback relays |
|
// Uses go-nostr's SimplePool for connection management and parallel queries |
|
// Limits concurrent requests to prevent overwhelming relays (default: 5 concurrent requests) |
|
func NewClient(primaryRelay, fallbackRelay, additionalFallback string) *Client { |
|
ctx := context.Background() |
|
pool := nostr.NewSimplePool(ctx) |
|
|
|
relays := []string{} |
|
if primaryRelay != "" { |
|
relays = append(relays, primaryRelay) |
|
} |
|
if fallbackRelay != "" { |
|
relays = append(relays, fallbackRelay) |
|
} |
|
if additionalFallback != "" { |
|
relays = append(relays, additionalFallback) |
|
} |
|
|
|
maxConcurrent := 5 // Limit to 5 concurrent requests to avoid overwhelming relays |
|
return &Client{ |
|
pool: pool, |
|
relays: relays, |
|
ctx: ctx, |
|
requestSem: make(chan struct{}, maxConcurrent), |
|
maxConcurrent: maxConcurrent, |
|
} |
|
} |
|
|
|
// Connect connects to the relays (no-op, SimplePool handles connections lazily) |
|
func (c *Client) Connect(ctx context.Context) error { |
|
// SimplePool handles connections lazily when querying |
|
return nil |
|
} |
|
|
|
// ConnectToRelay connects to a single relay using SimplePool (exported for use by services) |
|
func (c *Client) ConnectToRelay(ctx context.Context, url string) (*nostr.Relay, error) { |
|
// Use SimplePool's EnsureRelay which handles connection pooling and reuse |
|
return c.pool.EnsureRelay(url) |
|
} |
|
|
|
// FetchEvent fetches a single event by querying all relays in parallel using SimplePool |
|
// Returns the newest event (highest created_at) if multiple events are found |
|
// Rate-limited to prevent overwhelming relays |
|
func (c *Client) FetchEvent(ctx context.Context, filter nostr.Filter) (*nostr.Event, error) { |
|
// Acquire semaphore to limit concurrent requests |
|
c.requestSem <- struct{}{} |
|
defer func() { <-c.requestSem }() |
|
|
|
// Create context with 30-second timeout |
|
queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) |
|
defer cancel() |
|
|
|
logger.WithFields(map[string]interface{}{ |
|
"relays": c.relays, |
|
"kinds": filter.Kinds, |
|
"authors": filter.Authors, |
|
"ids": filter.IDs, |
|
"tags": filter.Tags, |
|
}).Debug("Querying relays using SimplePool") |
|
|
|
// Use SimplePool's SubManyEose to query all relays in parallel |
|
// It automatically handles connection pooling, failover, and deduplication |
|
eventChan := c.pool.SubManyEose(queryCtx, c.relays, nostr.Filters{filter}) |
|
|
|
// Collect all events from all relays |
|
var allEvents []*nostr.Event |
|
for incomingEvent := range eventChan { |
|
if incomingEvent.Event != nil { |
|
allEvents = append(allEvents, incomingEvent.Event) |
|
logger.WithFields(map[string]interface{}{ |
|
"relay": incomingEvent.Relay.URL, |
|
"event_id": incomingEvent.Event.ID, |
|
"created_at": incomingEvent.Event.CreatedAt, |
|
}).Debug("Received event from relay") |
|
} |
|
} |
|
|
|
if len(allEvents) == 0 { |
|
return nil, fmt.Errorf("event not found from any relay") |
|
} |
|
|
|
// Find the newest event (highest created_at) |
|
newest := allEvents[0] |
|
for _, event := range allEvents[1:] { |
|
if event.CreatedAt > newest.CreatedAt { |
|
newest = event |
|
} |
|
} |
|
|
|
logger.WithFields(map[string]interface{}{ |
|
"created_at": newest.CreatedAt, |
|
"total": len(allEvents), |
|
}).Debug("Selected newest event from all relays") |
|
return newest, nil |
|
} |
|
|
|
// FetchEvents fetches multiple events by querying all relays in parallel using SimplePool |
|
// Returns deduplicated events, keeping the newest version of each event |
|
// Rate-limited to prevent overwhelming relays |
|
func (c *Client) FetchEvents(ctx context.Context, filter nostr.Filter) ([]*nostr.Event, error) { |
|
return c.FetchEventsBatch(ctx, []nostr.Filter{filter}) |
|
} |
|
|
|
// FetchEventsBatch fetches multiple events using multiple filters in a single batched query |
|
// This is more efficient than calling FetchEvents multiple times |
|
// Returns deduplicated events, keeping the newest version of each event |
|
// Rate-limited to prevent overwhelming relays |
|
func (c *Client) FetchEventsBatch(ctx context.Context, filters []nostr.Filter) ([]*nostr.Event, error) { |
|
if len(filters) == 0 { |
|
return nil, fmt.Errorf("no filters provided") |
|
} |
|
|
|
// Acquire semaphore to limit concurrent requests |
|
c.requestSem <- struct{}{} |
|
defer func() { <-c.requestSem }() |
|
|
|
// Create context with 30-second timeout |
|
queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second) |
|
defer cancel() |
|
|
|
logger.WithFields(map[string]interface{}{ |
|
"relays": c.relays, |
|
"filters": len(filters), |
|
}).Debug("Querying relays using SimplePool with batched filters") |
|
|
|
// Use SimplePool's SubManyEose to query all relays in parallel with all filters |
|
// It automatically handles connection pooling, failover, and deduplication |
|
eventChan := c.pool.SubManyEose(queryCtx, c.relays, nostr.Filters(filters)) |
|
|
|
// Collect all events from all relays, deduplicating by ID and keeping newest |
|
eventMap := make(map[string]*nostr.Event) |
|
for incomingEvent := range eventChan { |
|
if incomingEvent.Event != nil { |
|
existing, exists := eventMap[incomingEvent.Event.ID] |
|
if !exists || incomingEvent.Event.CreatedAt > existing.CreatedAt { |
|
eventMap[incomingEvent.Event.ID] = incomingEvent.Event |
|
logger.WithFields(map[string]interface{}{ |
|
"relay": incomingEvent.Relay.URL, |
|
"event_id": incomingEvent.Event.ID, |
|
"created_at": incomingEvent.Event.CreatedAt, |
|
}).Debug("Received event from relay") |
|
} |
|
} |
|
} |
|
|
|
if len(eventMap) == 0 { |
|
return nil, fmt.Errorf("no events found from any relay") |
|
} |
|
|
|
// Convert map to slice |
|
events := make([]*nostr.Event, 0, len(eventMap)) |
|
for _, event := range eventMap { |
|
events = append(events, event) |
|
} |
|
|
|
logger.WithField("events", len(events)).Debug("Returning unique events from all relays") |
|
return events, nil |
|
} |
|
|
|
// FetchEventByID fetches an event by its ID |
|
func (c *Client) FetchEventByID(ctx context.Context, eventID string) (*nostr.Event, error) { |
|
filter := nostr.Filter{ |
|
IDs: []string{eventID}, |
|
} |
|
return c.FetchEvent(ctx, filter) |
|
} |
|
|
|
// FetchEventsByKind fetches events of a specific kind |
|
func (c *Client) FetchEventsByKind(ctx context.Context, kind int, limit int) ([]*nostr.Event, error) { |
|
filter := nostr.Filter{ |
|
Kinds: []int{kind}, |
|
Limit: limit, |
|
} |
|
return c.FetchEvents(ctx, filter) |
|
} |
|
|
|
// Close closes all relay connections in the pool |
|
func (c *Client) Close() { |
|
// SimplePool manages connections, but we can close individual relays if needed |
|
// The pool will handle cleanup when context is cancelled |
|
} |
|
|
|
// GetRelays returns all configured relay URLs |
|
func (c *Client) GetRelays() []string { |
|
return c.relays |
|
} |
|
|
|
// GetPool returns the underlying SimplePool (for services that need direct access) |
|
func (c *Client) GetPool() *nostr.SimplePool { |
|
return c.pool |
|
} |
|
|
|
// HealthCheck performs a health check on the relays |
|
func (c *Client) HealthCheck(ctx context.Context, timeout time.Duration) error { |
|
ctx, cancel := context.WithTimeout(ctx, timeout) |
|
defer cancel() |
|
|
|
// Try to fetch a recent event to test connectivity |
|
// Note: Using kind 1 for health check (this is a standard kind, not configurable) |
|
filter := nostr.Filter{ |
|
Kinds: []int{1}, // kind 1 (notes) for testing |
|
Limit: 1, |
|
} |
|
|
|
_, err := c.FetchEvents(ctx, filter) |
|
return err |
|
}
|
|
|