You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 

230 lines
7.5 KiB

package nostr
import (
"context"
"fmt"
"sync"
"time"
"gitcitadel-online/internal/logger"
"github.com/nbd-wtf/go-nostr"
)
// Client handles connections to Nostr relays with failover support using go-nostr's SimplePool.
// It holds the list of relay URLs to query and a buffered channel that is used as a
// semaphore to cap how many queries run at the same time.
type Client struct {
pool *nostr.SimplePool // pool through which every relay query is issued
relays []string // relay URLs queried by the fetch methods
mu sync.RWMutex // NOTE(review): not referenced by any method visible in this file — confirm it is still needed
ctx context.Context // context stored alongside the pool at construction time
requestSem chan struct{} // Semaphore to limit concurrent requests: a send acquires a slot, a receive releases it
maxConcurrent int // Maximum concurrent requests
}
// NewClient builds a Client from up to three relay URLs: a primary relay, a
// fallback, and an additional fallback. Empty URLs are skipped; the remaining
// ones keep the order in which they were passed.
//
// The client owns a go-nostr SimplePool created from a background context and
// allows at most 5 queries to run concurrently so relays are not overwhelmed.
func NewClient(primaryRelay, fallbackRelay, additionalFallback string) *Client {
	// Upper bound on simultaneous relay queries.
	const requestLimit = 5

	// Keep only the URLs that were actually supplied, in priority order.
	relays := []string{}
	for _, url := range []string{primaryRelay, fallbackRelay, additionalFallback} {
		if url == "" {
			continue
		}
		relays = append(relays, url)
	}

	ctx := context.Background()
	return &Client{
		pool:          nostr.NewSimplePool(ctx),
		relays:        relays,
		ctx:           ctx,
		requestSem:    make(chan struct{}, requestLimit),
		maxConcurrent: requestLimit,
	}
}
// Connect is a no-op: it ignores ctx, opens no connection, and always returns nil.
// Per the original author's note, SimplePool establishes connections lazily when a
// query is issued, so nothing needs to happen here.
func (c *Client) Connect(ctx context.Context) error {
// SimplePool handles connections lazily when querying
return nil
}
// ConnectToRelay returns the pool's relay for url (exported for use by services).
// It delegates to SimplePool.EnsureRelay and hands back that call's result and
// error unchanged. The ctx argument is accepted but not used by this method.
func (c *Client) ConnectToRelay(ctx context.Context, url string) (*nostr.Relay, error) {
	// EnsureRelay is the pool's entry point for obtaining a relay by URL.
	relay, err := c.pool.EnsureRelay(url)
	return relay, err
}
// FetchEvent queries all configured relays in parallel through the SimplePool
// and returns the newest matching event (highest created_at). When several
// events share the highest created_at, the first one received wins.
//
// The call is rate-limited: it waits for a free slot in the request semaphore
// before querying. If ctx ends while waiting for a slot, the wait is abandoned
// and ctx's error is returned (wrapped, so errors.Is still matches). The query
// itself runs under a 30-second timeout derived from ctx.
//
// It returns an error when no relay delivers a matching event.
func (c *Client) FetchEvent(ctx context.Context, filter nostr.Filter) (*nostr.Event, error) {
	// Acquire a semaphore slot, but give up when the caller's context ends.
	// A bare channel send here would block indefinitely while all slots are busy,
	// even after the caller has cancelled or timed out.
	select {
	case c.requestSem <- struct{}{}:
	case <-ctx.Done():
		return nil, fmt.Errorf("waiting for request slot: %w", ctx.Err())
	}
	defer func() { <-c.requestSem }()

	// Bound the whole query to 30 seconds on top of whatever deadline ctx carries.
	queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	logger.WithFields(map[string]interface{}{
		"relays":  c.relays,
		"kinds":   filter.Kinds,
		"authors": filter.Authors,
		"ids":     filter.IDs,
		"tags":    filter.Tags,
	}).Debug("Querying relays using SimplePool")

	// SubManyEose queries every relay and delivers the results on a single channel;
	// the loop below ends when that channel is closed.
	eventChan := c.pool.SubManyEose(queryCtx, c.relays, nostr.Filters{filter})

	// Track the newest event as results arrive instead of buffering all of them.
	var newest *nostr.Event
	total := 0
	for incoming := range eventChan {
		ev := incoming.Event
		if ev == nil {
			continue
		}
		total++
		logger.WithFields(map[string]interface{}{
			"relay":      incoming.Relay.URL,
			"event_id":   ev.ID,
			"created_at": ev.CreatedAt,
		}).Debug("Received event from relay")

		// Strict comparison keeps the first-received event on a created_at tie.
		if newest == nil || ev.CreatedAt > newest.CreatedAt {
			newest = ev
		}
	}

	if newest == nil {
		return nil, fmt.Errorf("event not found from any relay")
	}

	logger.WithFields(map[string]interface{}{
		"created_at": newest.CreatedAt,
		"total":      total,
	}).Debug("Selected newest event from all relays")
	return newest, nil
}
// FetchEvents fetches every event matching a single filter.
// It is a convenience wrapper: the filter is placed in a one-element slice and
// handed to FetchEventsBatch, whose result and error are returned as-is.
func (c *Client) FetchEvents(ctx context.Context, filter nostr.Filter) ([]*nostr.Event, error) {
	batch := []nostr.Filter{filter}
	return c.FetchEventsBatch(ctx, batch)
}
// FetchEventsBatch sends all filters to every configured relay in a single
// batched query and returns the matching events, deduplicated by event ID.
// When the same ID arrives more than once, the copy with the higher created_at
// is kept. The order of the returned slice is unspecified (it is built by
// iterating a map).
//
// The call is rate-limited: it waits for a free slot in the request semaphore
// before querying. If ctx ends while waiting for a slot, the wait is abandoned
// and ctx's error is returned (wrapped, so errors.Is still matches). The query
// itself runs under a 30-second timeout derived from ctx.
//
// It returns an error when filters is empty or when no relay delivers an event.
func (c *Client) FetchEventsBatch(ctx context.Context, filters []nostr.Filter) ([]*nostr.Event, error) {
	if len(filters) == 0 {
		return nil, fmt.Errorf("no filters provided")
	}

	// Acquire a semaphore slot, but give up when the caller's context ends.
	// A bare channel send here would block indefinitely while all slots are busy,
	// even after the caller has cancelled or timed out.
	select {
	case c.requestSem <- struct{}{}:
	case <-ctx.Done():
		return nil, fmt.Errorf("waiting for request slot: %w", ctx.Err())
	}
	defer func() { <-c.requestSem }()

	// Bound the whole query to 30 seconds on top of whatever deadline ctx carries.
	queryCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	logger.WithFields(map[string]interface{}{
		"relays":  c.relays,
		"filters": len(filters),
	}).Debug("Querying relays using SimplePool with batched filters")

	// SubManyEose queries every relay with all filters and delivers the results
	// on a single channel; the loop below ends when that channel is closed.
	eventChan := c.pool.SubManyEose(queryCtx, c.relays, nostr.Filters(filters))

	// Deduplicate by event ID, preferring the copy with the higher created_at.
	unique := make(map[string]*nostr.Event)
	for incoming := range eventChan {
		ev := incoming.Event
		if ev == nil {
			continue
		}
		if known, seen := unique[ev.ID]; seen && ev.CreatedAt <= known.CreatedAt {
			continue
		}
		unique[ev.ID] = ev
		logger.WithFields(map[string]interface{}{
			"relay":      incoming.Relay.URL,
			"event_id":   ev.ID,
			"created_at": ev.CreatedAt,
		}).Debug("Received event from relay")
	}

	if len(unique) == 0 {
		return nil, fmt.Errorf("no events found from any relay")
	}

	// Flatten the map into a slice for the caller.
	events := make([]*nostr.Event, 0, len(unique))
	for _, ev := range unique {
		events = append(events, ev)
	}

	logger.WithField("events", len(events)).Debug("Returning unique events from all relays")
	return events, nil
}
// FetchEventByID looks up a single event by its ID.
// It builds a filter whose IDs list contains only eventID and delegates to
// FetchEvent, returning that call's result and error unchanged.
func (c *Client) FetchEventByID(ctx context.Context, eventID string) (*nostr.Event, error) {
	return c.FetchEvent(ctx, nostr.Filter{IDs: []string{eventID}})
}
// FetchEventsByKind fetches events of one kind.
// kind becomes the only entry in the filter's Kinds list and limit is passed
// through as the filter's Limit; the query itself is performed by FetchEvents,
// whose result and error are returned unchanged.
func (c *Client) FetchEventsByKind(ctx context.Context, kind int, limit int) ([]*nostr.Event, error) {
	var byKind nostr.Filter
	byKind.Kinds = []int{kind}
	byKind.Limit = limit
	return c.FetchEvents(ctx, byKind)
}
// Close is currently a no-op: its body is empty, so calling it closes no relay
// connection and releases nothing.
// NOTE(review): NewClient creates the pool from context.Background(), which is
// never cancelled, so the "cleanup when context is cancelled" mentioned below
// would not be triggered by this client — confirm whether relay connections
// should be closed explicitly here.
func (c *Client) Close() {
// SimplePool manages connections, but we can close individual relays if needed
// The pool will handle cleanup when context is cancelled
}
// GetRelays returns all configured relay URLs, in the order they are stored.
//
// The result is a fresh copy: callers may modify or append to it without
// affecting the relay list the Client itself queries. The returned slice is
// never nil; a client without relays yields an empty slice.
func (c *Client) GetRelays() []string {
	// Copy rather than expose the internal slice, which the fetch methods read
	// on every query.
	relays := make([]string, len(c.relays))
	copy(relays, c.relays)
	return relays
}
// GetPool returns the underlying SimplePool (for services that need direct access).
// The pointer held by the Client is returned as-is, so callers operate on the
// same pool the Client uses for its own queries.
func (c *Client) GetPool() *nostr.SimplePool {
return c.pool
}
// HealthCheck probes the relays by fetching a single recent event.
// The probe asks for at most one event of kind KindNote (kind 1, notes — a
// standard kind, not configurable) via FetchEvents, under a deadline of
// timeout applied on top of ctx. It returns nil when the fetch succeeds and
// the fetch's error otherwise.
func (c *Client) HealthCheck(ctx context.Context, timeout time.Duration) error {
	checkCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	// A one-event query is enough to exercise connectivity.
	probe := nostr.Filter{
		Kinds: []int{KindNote},
		Limit: 1,
	}
	if _, err := c.FetchEvents(checkCtx, probe); err != nil {
		return err
	}
	return nil
}