You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

287 lines
7.8 KiB

package nostr
import (
	"context"
	"fmt"
	"log"
	"sort"
	"sync"
	"time"

	"github.com/nbd-wtf/go-nostr"
)
// Client fetches Nostr events from up to three configured relays: a primary,
// a fallback, and an additional fallback.
type Client struct {
	// Relay URLs. An empty string marks a slot that is not configured; such
	// slots are skipped when querying and omitted from GetRelays.
	primaryRelay, fallbackRelay, additionalFallback string

	// mu guards lastError.
	mu        sync.RWMutex
	lastError error
}
// NewClient returns a Client configured with the given primary, fallback, and
// additional fallback relay URLs. The URLs are stored as given; no connection
// is opened here.
func NewClient(primaryRelay, fallbackRelay, additionalFallback string) *Client {
	client := new(Client)
	client.primaryRelay = primaryRelay
	client.fallbackRelay = fallbackRelay
	client.additionalFallback = additionalFallback
	return client
}
// Connect is currently a no-op that always returns nil. Relay connections are
// opened lazily by the query methods, so there is nothing to dial up front and
// ctx is not consulted.
func (c *Client) Connect(ctx context.Context) error {
	// Intentionally empty: connections are made on demand at query time.
	return nil
}
// ConnectToRelay opens a connection to the relay at url using ctx and returns
// it. It is exported so that other services can reuse it. On failure the
// returned relay is always nil and the error from nostr.RelayConnect is
// returned unchanged.
func (c *Client) ConnectToRelay(ctx context.Context, url string) (*nostr.Relay, error) {
	conn, dialErr := nostr.RelayConnect(ctx, url)
	if dialErr != nil {
		// Never hand back a relay value together with an error.
		return nil, dialErr
	}
	return conn, nil
}
// connectToRelay is the unexported alias of ConnectToRelay; it forwards its
// arguments and returns the result untouched.
//
// Deprecated: use ConnectToRelay.
func (c *Client) connectToRelay(ctx context.Context, url string) (*nostr.Relay, error) {
	relay, err := c.ConnectToRelay(ctx, url)
	return relay, err
}
// FetchEvent queries every configured relay in parallel and returns the single
// newest event matching filter.
//
// The whole fan-out shares one 10-second timeout derived from ctx. Events
// returned by all relays are pooled and the one with the highest created_at is
// selected; when several events share that timestamp the one with the
// lexically lowest ID wins, so the result does not depend on which relay
// happened to answer first.
//
// Individual relay failures are logged and do not fail the call as long as at
// least one relay returns an event. The last relay failure observed (if any)
// is stored and can be read with GetLastError. When no relay yields an event
// an error is returned, wrapping the last relay failure if there was one.
func (c *Client) FetchEvent(ctx context.Context, filter nostr.Filter) (*nostr.Event, error) {
	// One deadline bounds connecting to and querying all relays.
	queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// GetRelays already omits unconfigured (empty) relay slots.
	relays := c.GetRelays()

	type result struct {
		relay  string
		events []*nostr.Event
		err    error
	}
	// One buffer slot per relay: every worker sends exactly once, so no send
	// can block.
	results := make(chan result, len(relays))

	var wg sync.WaitGroup
	for _, relayURL := range relays {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			log.Printf("Querying relay %s with filter: Kinds=%v, Authors=%v, IDs=%v, Tags=%v",
				url, filter.Kinds, filter.Authors, filter.IDs, filter.Tags)

			relay, err := c.ConnectToRelay(queryCtx, url)
			if err != nil {
				log.Printf("Relay %s: connection failed: %v", url, err)
				results <- result{relay: url, err: err}
				return
			}
			defer relay.Close()

			events, err := relay.QuerySync(queryCtx, filter)
			if err != nil {
				log.Printf("Relay %s: query failed: %v", url, err)
				results <- result{relay: url, err: err}
				return
			}
			log.Printf("Relay %s: returned %d event(s)", url, len(events))
			results <- result{relay: url, events: events}
		}(relayURL)
	}

	// Close the channel once every worker has reported so the collection loop
	// below terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	var allEvents []*nostr.Event
	var lastErr error
	for res := range results {
		if res.err != nil {
			lastErr = res.err
			continue
		}
		allEvents = append(allEvents, res.events...)
	}

	// Remember the most recent relay failure so GetLastError can report it,
	// even when another relay answered successfully.
	if lastErr != nil {
		c.mu.Lock()
		c.lastError = lastErr
		c.mu.Unlock()
	}

	if len(allEvents) == 0 {
		if lastErr != nil {
			return nil, fmt.Errorf("event not found from any relay: %w", lastErr)
		}
		return nil, fmt.Errorf("event not found from any relay")
	}

	// Pick the newest event; break created_at ties by lowest ID so the choice
	// is deterministic regardless of relay response order.
	newest := allEvents[0]
	for _, event := range allEvents[1:] {
		if event.CreatedAt > newest.CreatedAt ||
			(event.CreatedAt == newest.CreatedAt && event.ID < newest.ID) {
			newest = event
		}
	}
	log.Printf("Selected newest event (created_at: %d) from %d total events across all relays", newest.CreatedAt, len(allEvents))
	return newest, nil
}
// FetchEvents queries every configured relay in parallel and returns the
// merged, de-duplicated set of events matching filter.
//
// The whole fan-out shares one 10-second timeout derived from ctx. Events are
// de-duplicated by ID. The result is ordered newest first (highest created_at,
// ties broken by ascending ID) so that the output is deterministic, and when
// filter.Limit is positive the merged result is truncated to that many events;
// without this, three relays could together return up to three times the
// requested limit.
//
// Individual relay failures are logged and do not fail the call as long as at
// least one relay returns an event. The last relay failure observed (if any)
// is stored and can be read with GetLastError. When no relay yields an event
// an error is returned, wrapping the last relay failure if there was one.
func (c *Client) FetchEvents(ctx context.Context, filter nostr.Filter) ([]*nostr.Event, error) {
	// One deadline bounds connecting to and querying all relays.
	queryCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// GetRelays already omits unconfigured (empty) relay slots.
	relays := c.GetRelays()

	type result struct {
		relay  string
		events []*nostr.Event
		err    error
	}
	// One buffer slot per relay: every worker sends exactly once, so no send
	// can block.
	results := make(chan result, len(relays))

	var wg sync.WaitGroup
	for _, relayURL := range relays {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			log.Printf("Querying relay %s with filter: Kinds=%v, Authors=%v, IDs=%v, Tags=%v, Limit=%d",
				url, filter.Kinds, filter.Authors, filter.IDs, filter.Tags, filter.Limit)

			relay, err := c.ConnectToRelay(queryCtx, url)
			if err != nil {
				log.Printf("Relay %s: connection failed: %v", url, err)
				results <- result{relay: url, err: err}
				return
			}
			defer relay.Close()

			events, err := relay.QuerySync(queryCtx, filter)
			if err != nil {
				log.Printf("Relay %s: query failed: %v", url, err)
				results <- result{relay: url, err: err}
				return
			}
			log.Printf("Relay %s: returned %d event(s)", url, len(events))
			results <- result{relay: url, events: events}
		}(relayURL)
	}

	// Close the channel once every worker has reported so the collection loop
	// below terminates.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Merge results, keeping one event per ID (the newer one if two copies
	// with the same ID ever disagree on created_at).
	eventMap := make(map[string]*nostr.Event)
	var lastErr error
	for res := range results {
		if res.err != nil {
			lastErr = res.err
			continue
		}
		for _, event := range res.events {
			existing, exists := eventMap[event.ID]
			if !exists || event.CreatedAt > existing.CreatedAt {
				eventMap[event.ID] = event
			}
		}
	}

	// Remember the most recent relay failure so GetLastError can report it,
	// even when another relay answered successfully.
	if lastErr != nil {
		c.mu.Lock()
		c.lastError = lastErr
		c.mu.Unlock()
	}

	if len(eventMap) == 0 {
		if lastErr != nil {
			return nil, fmt.Errorf("no events found from any relay: %w", lastErr)
		}
		return nil, fmt.Errorf("no events found from any relay")
	}

	events := make([]*nostr.Event, 0, len(eventMap))
	for _, event := range eventMap {
		events = append(events, event)
	}

	// Map iteration order is random; sort newest first (ID as tie-breaker) so
	// callers get a stable order and truncation keeps the newest events.
	sort.Slice(events, func(i, j int) bool {
		if events[i].CreatedAt != events[j].CreatedAt {
			return events[i].CreatedAt > events[j].CreatedAt
		}
		return events[i].ID < events[j].ID
	})

	// Enforce the caller's limit on the merged result, not just per relay.
	if filter.Limit > 0 && len(events) > filter.Limit {
		events = events[:filter.Limit]
	}

	log.Printf("Returning %d unique event(s) from all relays", len(events))
	return events, nil
}
// FetchEventByID looks up a single event by its ID. It builds a filter whose
// only criterion is eventID and delegates to FetchEvent, returning whatever
// FetchEvent returns.
func (c *Client) FetchEventByID(ctx context.Context, eventID string) (*nostr.Event, error) {
	return c.FetchEvent(ctx, nostr.Filter{IDs: []string{eventID}})
}
// FetchEventsByKind fetches events of the given kind. kind and limit are
// placed into the filter's Kinds and Limit fields, and the query is delegated
// to FetchEvents.
func (c *Client) FetchEventsByKind(ctx context.Context, kind int, limit int) ([]*nostr.Event, error) {
	var byKind nostr.Filter
	byKind.Kinds = []int{kind}
	byKind.Limit = limit
	return c.FetchEvents(ctx, byKind)
}
// Close is a no-op. The client keeps no long-lived relay connections: the
// fetch methods close each connection they open before returning.
func (c *Client) Close() {
	// Nothing to release here; per-query connections are already closed.
}
// GetLastError returns the error currently stored on the client, or nil if
// none has been recorded. The value is read while holding the read lock.
func (c *Client) GetLastError() error {
	c.mu.RLock()
	recorded := c.lastError
	c.mu.RUnlock()
	return recorded
}
// IsConnected always reports true. Connections are opened on demand for each
// query, so there is no persistent connection state to inspect.
func (c *Client) IsConnected() bool {
	// On-demand connections: there is never a "disconnected" state to report.
	return true
}
// GetRelays returns the configured relay URLs in priority order (primary,
// fallback, additional fallback), leaving out any that are empty. The result
// is always a non-nil slice, even when no relay is configured.
func (c *Client) GetRelays() []string {
	configured := make([]string, 0, 3)
	for _, url := range [...]string{c.primaryRelay, c.fallbackRelay, c.additionalFallback} {
		if url != "" {
			configured = append(configured, url)
		}
	}
	return configured
}
// HealthCheck probes the configured relays by asking for a single kind 1
// (note) event via FetchEvents, under a context limited to timeout. It returns
// nil when the fetch succeeds and the fetch error otherwise. The probe kind is
// fixed at 1 and is not configurable.
func (c *Client) HealthCheck(ctx context.Context, timeout time.Duration) error {
	probeCtx, stop := context.WithTimeout(ctx, timeout)
	defer stop()

	// A one-event query for kind 1 is used purely as a connectivity test; the
	// returned events are discarded.
	_, err := c.FetchEvents(probeCtx, nostr.Filter{
		Kinds: []int{1},
		Limit: 1,
	})
	return err
}