You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

283 lines
6.3 KiB

// Package archive provides query augmentation from authoritative archive relays.
// It manages connections to archive relays and fetches events that match local
// queries, caching them locally for future access.
package archive
import (
"context"
"sync"
"time"
"lol.mleku.dev/log"
"git.mleku.dev/mleku/nostr/encoders/event"
"git.mleku.dev/mleku/nostr/encoders/filter"
)
// ArchiveDatabase defines the interface for storing fetched events.
//
// The Manager calls SaveEvent for every event received from an archive relay
// whose ID is not already present in the caller's delivered set.
type ArchiveDatabase interface {
// SaveEvent persists ev. The Manager counts the event as newly stored when
// exists is false, and drops the event (without streaming it) when err is
// non-nil.
SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error)
}
// EventDeliveryChannel defines the interface for streaming results back to clients.
//
// QueryArchive accepts a nil EventDeliveryChannel, in which case fetched events
// are only stored and never streamed.
type EventDeliveryChannel interface {
// SendEvent delivers ev to the client. A nil return is what QueryArchive
// counts as a successful delivery.
SendEvent(ev *event.E) error
// IsConnected is consulted before every SendEvent call; events are not sent
// while it reports false.
IsConnected() bool
}
// Manager handles connections to archive relays for query augmentation.
//
// A Manager returned by New in the disabled state has only the enabled field
// set (to false); every other field is left at its zero value, and the exported
// methods return early without touching them.
type Manager struct {
// ctx is derived from the context passed to New; it is the parent of every
// per-query context and is handed to each new relay connection.
ctx context.Context
// cancel cancels ctx; it is invoked by Stop.
cancel context.CancelFunc
// relays lists the archive relay URLs queried by QueryArchive.
relays []string
// timeout bounds a single QueryArchive call.
timeout time.Duration
// db receives every event fetched from the archive relays.
db ArchiveDatabase
// queryCache records which filters have already been sent to the archives
// so that repeated queries are skipped.
queryCache *QueryCache
// Connection pool: mu guards connections, which is keyed by relay URL.
mu sync.RWMutex
connections map[string]*RelayConnection
// Configuration: enabled is false for a manager that must act as a no-op.
enabled bool
}
// Config holds the configuration for the archive manager.
type Config struct {
// Enabled turns the archive manager on. New returns a disabled manager when
// this is false.
Enabled bool
// Relays lists the archive relay URLs to query. New returns a disabled
// manager when this is empty.
Relays []string
// TimeoutSec is the per-query timeout in seconds. Values <= 0 make New fall
// back to 30 seconds.
TimeoutSec int
// CacheTTLHrs is the query cache TTL in hours. Values <= 0 make New fall
// back to 24 hours.
CacheTTLHrs int
}
// New creates a new archive manager.
//
// When cfg.Enabled is false or cfg.Relays is empty, the returned manager is
// disabled: only its enabled flag is set (to false) and ctx and db are not
// retained.
//
// Otherwise the manager owns a cancellable child of ctx (released by Stop), a
// private copy of cfg.Relays, and a query cache holding up to 100k entries.
// Non-positive cfg.TimeoutSec and cfg.CacheTTLHrs fall back to 30 seconds and
// 24 hours respectively.
func New(ctx context.Context, db ArchiveDatabase, cfg Config) *Manager {
	if !cfg.Enabled || len(cfg.Relays) == 0 {
		return &Manager{enabled: false}
	}

	timeout := time.Duration(cfg.TimeoutSec) * time.Second
	if timeout <= 0 {
		timeout = 30 * time.Second
	}
	cacheTTL := time.Duration(cfg.CacheTTLHrs) * time.Hour
	if cacheTTL <= 0 {
		cacheTTL = 24 * time.Hour
	}

	// Copy the relay list: the manager reads it without holding a lock, so it
	// must not share a backing array the caller may still modify.
	relays := append([]string(nil), cfg.Relays...)

	mgrCtx, cancel := context.WithCancel(ctx)
	m := &Manager{
		ctx:         mgrCtx,
		cancel:      cancel,
		relays:      relays,
		timeout:     timeout,
		db:          db,
		queryCache:  NewQueryCache(cacheTTL, 100000), // 100k cached queries
		connections: make(map[string]*RelayConnection),
		enabled:     true,
	}
	log.I.F("archive manager initialized with %d relays, %v timeout, %v cache TTL",
		len(relays), timeout, cacheTTL)
	return m
}
// IsEnabled returns whether the archive manager is enabled.
//
// It is safe to call on a nil *Manager, which reports false, so callers can use
// it as a guard without a separate nil check.
func (m *Manager) IsEnabled() bool {
	return m != nil && m.enabled
}
// QueryArchive queries archive relays asynchronously and stores/streams results.
// This should be called in a goroutine after returning local results: it blocks
// until every relay has answered or the manager's per-query timeout expires.
//
// The call is a no-op when the manager is disabled or when the query cache
// reports that f has already been queried. Otherwise f is marked as queried,
// all configured relays are queried in parallel, and each returned event whose
// ID is not in delivered is saved to the database and, if listener is non-nil
// and connected, sent to it.
//
// NOTE(review): f is marked as queried before any relay responds, so a query
// for which every relay fails is still suppressed for as long as the query
// cache remembers it — confirm this is intended.
//
// Parameters:
//   - subID: the subscription ID for the query (used for logging)
//   - connID: the connection ID (for access tracking); not used by the current
//     implementation
//   - f: the filter to query
//   - delivered: set of event IDs (raw ID bytes as a string) already delivered
//     to the client. IDs of events streamed by this call are added to it, so
//     the caller must not access the map concurrently. May be nil.
//   - listener: optional channel to stream results back (may be nil)
func (m *Manager) QueryArchive(
	subID string,
	connID string,
	f *filter.F,
	delivered map[string]struct{},
	listener EventDeliveryChannel,
) {
	if !m.enabled {
		return
	}

	// Check if this query was recently executed
	if m.queryCache.HasQueried(f) {
		log.D.F("archive: query cache hit, skipping archive query for sub %s", subID)
		return
	}

	// Mark query as executed
	m.queryCache.MarkQueried(f)

	// A nil map can be read but panics on assignment; fall back to a private
	// set so streamed events can still be recorded.
	if delivered == nil {
		delivered = make(map[string]struct{})
	}

	// Create query context with timeout
	queryCtx, cancel := context.WithTimeout(m.ctx, m.timeout)
	defer cancel()

	// Query all relays in parallel
	var wg sync.WaitGroup
	results := make(chan *event.E, 1000)
	for _, relayURL := range m.relays {
		wg.Add(1)
		go func(url string) {
			defer wg.Done()
			m.queryRelay(queryCtx, url, f, results)
		}(relayURL)
	}

	// Close results channel when all relays are done so the loop below ends.
	go func() {
		wg.Wait()
		close(results)
	}()

	// Process results
	stored := 0
	streamed := 0
	for ev := range results {
		// A relay must never hand back a nil event, but dereferencing one
		// would take down the whole goroutine, so skip defensively.
		if ev == nil {
			continue
		}

		// Skip if already delivered
		evIDStr := string(ev.ID[:])
		if _, exists := delivered[evIDStr]; exists {
			continue
		}

		// Store event
		exists, err := m.db.SaveEvent(queryCtx, ev)
		if err != nil {
			log.D.F("archive: failed to save event: %v", err)
			continue
		}
		if !exists {
			stored++
		}

		// Stream to client if still connected
		if listener != nil && listener.IsConnected() {
			if err := listener.SendEvent(ev); err == nil {
				streamed++
				delivered[evIDStr] = struct{}{}
			}
		}
	}

	if stored > 0 || streamed > 0 {
		log.D.F("archive: query %s completed - stored: %d, streamed: %d", subID, stored, streamed)
	}
}
// queryRelay runs filter f against the archive relay at url and forwards each
// returned event into results.
//
// Connection and query failures are logged at debug level and end the call
// without sending anything. Forwarding stops early once ctx is done, so a
// sender never stays blocked on a full results channel after the query has been
// cancelled or has timed out.
func (m *Manager) queryRelay(ctx context.Context, url string, f *filter.F, results chan<- *event.E) {
	rc, connErr := m.getOrCreateConnection(url)
	if connErr != nil {
		log.D.F("archive: failed to connect to %s: %v", url, connErr)
		return
	}

	fetched, queryErr := rc.Query(ctx, f)
	if queryErr != nil {
		log.D.F("archive: query failed on %s: %v", url, queryErr)
		return
	}

	for _, item := range fetched {
		select {
		case results <- item:
		case <-ctx.Done():
			return
		}
	}
}
// getOrCreateConnection returns the pooled connection for url when it reports
// itself connected, and otherwise dials a new one and stores it in the pool.
//
// The pool is first consulted under the read lock. On a miss the write lock is
// taken and the pool is checked again, because another goroutine may have
// connected in between. The new connection is created with the manager's
// context and is only stored once Connect succeeds; on failure the error is
// returned and the pool is left unchanged.
//
// NOTE(review): the write lock is held for the duration of Connect, and an
// existing disconnected entry is overwritten without being closed — confirm
// both are acceptable for RelayConnection.
func (m *Manager) getOrCreateConnection(url string) (*RelayConnection, error) {
	// Fast path: shared lock only.
	m.mu.RLock()
	cached, ok := m.connections[url]
	m.mu.RUnlock()
	if ok && cached.IsConnected() {
		return cached, nil
	}

	m.mu.Lock()
	defer m.mu.Unlock()

	// Re-check now that the pool is exclusively held.
	if current, found := m.connections[url]; found && current.IsConnected() {
		return current, nil
	}

	fresh := NewRelayConnection(m.ctx, url)
	err := fresh.Connect()
	if err != nil {
		return nil, err
	}
	m.connections[url] = fresh
	return fresh, nil
}
// Stop shuts the archive manager down: it cancels the manager context, closes
// every pooled relay connection and replaces the pool with an empty one.
// Calling Stop on a disabled manager does nothing.
func (m *Manager) Stop() {
	if !m.enabled {
		return
	}

	// Cancel first so in-flight queries derived from the manager context
	// are signalled before their connections are closed.
	m.cancel()

	m.mu.Lock()
	defer m.mu.Unlock()

	for url := range m.connections {
		m.connections[url].Close()
	}
	m.connections = map[string]*RelayConnection{}

	log.I.F("archive manager stopped")
}
// Stats reports a snapshot of the archive manager's state.
//
// A disabled manager yields the zero ManagerStats. Otherwise the pool is read
// under the read lock and the result carries the number of configured relays,
// how many pooled connections currently report themselves connected, and the
// query cache's current and maximum sizes.
func (m *Manager) Stats() ManagerStats {
	var snapshot ManagerStats
	if !m.enabled {
		return snapshot
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	for _, rc := range m.connections {
		if rc.IsConnected() {
			snapshot.ConnectedRelays++
		}
	}
	snapshot.Enabled = m.enabled
	snapshot.TotalRelays = len(m.relays)
	snapshot.CachedQueries = m.queryCache.Len()
	snapshot.MaxCachedQueries = m.queryCache.MaxSize()
	return snapshot
}
// ManagerStats holds archive manager statistics.
//
// Stats returns the zero value of this struct for a disabled manager.
type ManagerStats struct {
// Enabled mirrors the manager's enabled flag.
Enabled bool
// TotalRelays is the number of archive relays the manager is configured with.
TotalRelays int
// ConnectedRelays is the number of pooled connections that reported
// themselves connected when the snapshot was taken.
ConnectedRelays int
// CachedQueries is the query cache's reported length.
CachedQueries int
// MaxCachedQueries is the query cache's reported maximum size.
MaxCachedQueries int
}