You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
272 lines
7.4 KiB
272 lines
7.4 KiB
package nostr |
|
|
|
import (
	"context"
	"fmt"
	"log"
	"sort"
	"sync"
	"time"

	"github.com/nbd-wtf/go-nostr"
)
|
|
|
// Client queries Nostr relays on demand. It holds the URLs of up to three
// relays (a primary and two fallbacks); an empty URL means that slot is unused.
//
// A Client contains a mutex and must therefore be used through a pointer and
// never copied.
type Client struct {
	// Relay URLs, in order of preference.
	primaryRelay, fallbackRelay, additionalFallback string

	// mu guards lastError.
	mu sync.RWMutex
	// lastError is the value reported by GetLastError.
	lastError error
}
|
|
|
// NewClient creates a new Nostr client with primary, fallback, and additional fallback relays |
|
func NewClient(primaryRelay, fallbackRelay, additionalFallback string) *Client { |
|
return &Client{ |
|
primaryRelay: primaryRelay, |
|
fallbackRelay: fallbackRelay, |
|
additionalFallback: additionalFallback, |
|
} |
|
} |
|
|
|
// Connect connects to the relays (no-op for now, connections happen on query) |
|
func (c *Client) Connect(ctx context.Context) error { |
|
// Connections are established lazily when querying |
|
return nil |
|
} |
|
|
|
// ConnectToRelay connects to a single relay (exported for use by services) |
|
func (c *Client) ConnectToRelay(ctx context.Context, url string) (*nostr.Relay, error) { |
|
relay, err := nostr.RelayConnect(ctx, url) |
|
if err != nil { |
|
return nil, err |
|
} |
|
return relay, nil |
|
} |
|
|
|
// connectToRelay connects to a single relay (deprecated, use ConnectToRelay) |
|
func (c *Client) connectToRelay(ctx context.Context, url string) (*nostr.Relay, error) { |
|
return c.ConnectToRelay(ctx, url) |
|
} |
|
|
|
// FetchEvent fetches a single event by querying all three relays in parallel with 10-second timeout |
|
// Returns the newest event (highest created_at) if multiple events are found |
|
func (c *Client) FetchEvent(ctx context.Context, filter nostr.Filter) (*nostr.Event, error) { |
|
// Create context with 10-second timeout |
|
queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second) |
|
defer cancel() |
|
|
|
// Query all three relays in parallel |
|
relays := []string{c.primaryRelay, c.fallbackRelay, c.additionalFallback} |
|
type result struct { |
|
relay string |
|
events []*nostr.Event |
|
err error |
|
} |
|
|
|
results := make(chan result, len(relays)) |
|
var wg sync.WaitGroup |
|
|
|
for _, relayURL := range relays { |
|
if relayURL == "" { |
|
continue |
|
} |
|
wg.Add(1) |
|
go func(url string) { |
|
defer wg.Done() |
|
log.Printf("Querying relay %s with filter: Kinds=%v, Authors=%v, IDs=%v, Tags=%v", |
|
url, filter.Kinds, filter.Authors, filter.IDs, filter.Tags) |
|
|
|
relay, err := c.connectToRelay(queryCtx, url) |
|
if err != nil { |
|
log.Printf("Relay %s: connection failed: %v", url, err) |
|
results <- result{relay: url, events: nil, err: err} |
|
return |
|
} |
|
defer relay.Close() |
|
|
|
events, err := relay.QuerySync(queryCtx, filter) |
|
if err != nil { |
|
log.Printf("Relay %s: query failed: %v", url, err) |
|
results <- result{relay: url, events: nil, err: err} |
|
return |
|
} |
|
|
|
log.Printf("Relay %s: returned %d event(s)", url, len(events)) |
|
results <- result{relay: url, events: events, err: nil} |
|
}(relayURL) |
|
} |
|
|
|
// Wait for all queries to complete |
|
go func() { |
|
wg.Wait() |
|
close(results) |
|
}() |
|
|
|
// Collect all events from all relays |
|
var allEvents []*nostr.Event |
|
var lastErr error |
|
for res := range results { |
|
if res.err != nil { |
|
lastErr = res.err |
|
continue |
|
} |
|
if len(res.events) > 0 { |
|
allEvents = append(allEvents, res.events...) |
|
} |
|
} |
|
|
|
if len(allEvents) == 0 { |
|
if lastErr != nil { |
|
return nil, fmt.Errorf("event not found from any relay: %w", lastErr) |
|
} |
|
return nil, fmt.Errorf("event not found from any relay") |
|
} |
|
|
|
// Find the newest event (highest created_at) |
|
newest := allEvents[0] |
|
for _, event := range allEvents[1:] { |
|
if event.CreatedAt > newest.CreatedAt { |
|
newest = event |
|
} |
|
} |
|
|
|
log.Printf("Selected newest event (created_at: %d) from %d total events across all relays", newest.CreatedAt, len(allEvents)) |
|
return newest, nil |
|
} |
|
|
|
// FetchEvents fetches multiple events by querying all three relays in parallel with 10-second timeout |
|
// Returns deduplicated events, keeping the newest version of each event |
|
func (c *Client) FetchEvents(ctx context.Context, filter nostr.Filter) ([]*nostr.Event, error) { |
|
// Create context with 10-second timeout |
|
queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second) |
|
defer cancel() |
|
|
|
// Query all three relays in parallel |
|
relays := []string{c.primaryRelay, c.fallbackRelay, c.additionalFallback} |
|
type result struct { |
|
relay string |
|
events []*nostr.Event |
|
err error |
|
} |
|
|
|
results := make(chan result, len(relays)) |
|
var wg sync.WaitGroup |
|
|
|
for _, relayURL := range relays { |
|
if relayURL == "" { |
|
continue |
|
} |
|
wg.Add(1) |
|
go func(url string) { |
|
defer wg.Done() |
|
log.Printf("Querying relay %s with filter: Kinds=%v, Authors=%v, IDs=%v, Tags=%v, Limit=%d", |
|
url, filter.Kinds, filter.Authors, filter.IDs, filter.Tags, filter.Limit) |
|
|
|
relay, err := c.connectToRelay(queryCtx, url) |
|
if err != nil { |
|
log.Printf("Relay %s: connection failed: %v", url, err) |
|
results <- result{relay: url, events: nil, err: err} |
|
return |
|
} |
|
defer relay.Close() |
|
|
|
events, err := relay.QuerySync(queryCtx, filter) |
|
if err != nil { |
|
log.Printf("Relay %s: query failed: %v", url, err) |
|
results <- result{relay: url, events: nil, err: err} |
|
return |
|
} |
|
|
|
log.Printf("Relay %s: returned %d event(s)", url, len(events)) |
|
results <- result{relay: url, events: events, err: nil} |
|
}(relayURL) |
|
} |
|
|
|
// Wait for all queries to complete |
|
go func() { |
|
wg.Wait() |
|
close(results) |
|
}() |
|
|
|
// Collect all events from all relays, deduplicating by ID and keeping newest |
|
eventMap := make(map[string]*nostr.Event) |
|
var lastErr error |
|
for res := range results { |
|
if res.err != nil { |
|
lastErr = res.err |
|
continue |
|
} |
|
for _, event := range res.events { |
|
existing, exists := eventMap[event.ID] |
|
if !exists || event.CreatedAt > existing.CreatedAt { |
|
eventMap[event.ID] = event |
|
} |
|
} |
|
} |
|
|
|
if len(eventMap) == 0 { |
|
if lastErr != nil { |
|
return nil, fmt.Errorf("no events found from any relay: %w", lastErr) |
|
} |
|
return nil, fmt.Errorf("no events found from any relay") |
|
} |
|
|
|
// Convert map to slice |
|
events := make([]*nostr.Event, 0, len(eventMap)) |
|
for _, event := range eventMap { |
|
events = append(events, event) |
|
} |
|
|
|
log.Printf("Returning %d unique event(s) from all relays", len(events)) |
|
return events, nil |
|
} |
|
|
|
// FetchEventByID fetches an event by its ID |
|
func (c *Client) FetchEventByID(ctx context.Context, eventID string) (*nostr.Event, error) { |
|
filter := nostr.Filter{ |
|
IDs: []string{eventID}, |
|
} |
|
return c.FetchEvent(ctx, filter) |
|
} |
|
|
|
// FetchEventsByKind fetches events of a specific kind |
|
func (c *Client) FetchEventsByKind(ctx context.Context, kind int, limit int) ([]*nostr.Event, error) { |
|
filter := nostr.Filter{ |
|
Kinds: []int{kind}, |
|
Limit: limit, |
|
} |
|
return c.FetchEvents(ctx, filter) |
|
} |
|
|
|
// Close closes all relay connections (no-op for lazy connections) |
|
func (c *Client) Close() { |
|
// Connections are closed after each query, so nothing to do here |
|
} |
|
|
|
// GetLastError returns the last error encountered |
|
func (c *Client) GetLastError() error { |
|
c.mu.RLock() |
|
defer c.mu.RUnlock() |
|
return c.lastError |
|
} |
|
|
|
// IsConnected checks if at least one relay is connected (always true for lazy connections) |
|
func (c *Client) IsConnected() bool { |
|
return true // We connect on-demand |
|
} |
|
|
|
// HealthCheck performs a health check on the relays |
|
func (c *Client) HealthCheck(ctx context.Context, timeout time.Duration) error { |
|
ctx, cancel := context.WithTimeout(ctx, timeout) |
|
defer cancel() |
|
|
|
// Try to fetch a recent event to test connectivity |
|
// Note: Using kind 1 for health check (this is a standard kind, not configurable) |
|
filter := nostr.Filter{ |
|
Kinds: []int{1}, // kind 1 (notes) for testing |
|
Limit: 1, |
|
} |
|
|
|
_, err := c.FetchEvents(ctx, filter) |
|
return err |
|
}
|
|
|