diff --git a/app/config/config.go b/app/config/config.go index 60578c0..af50413 100644 --- a/app/config/config.go +++ b/app/config/config.go @@ -160,6 +160,18 @@ type C struct { // Cluster replication configuration ClusterPropagatePrivilegedEvents bool `env:"ORLY_CLUSTER_PROPAGATE_PRIVILEGED_EVENTS" default:"true" usage:"propagate privileged events (DMs, gift wraps, etc.) to relay peers for replication"` + // Archive relay configuration (query augmentation from authoritative archives) + ArchiveEnabled bool `env:"ORLY_ARCHIVE_ENABLED" default:"false" usage:"enable archive relay query augmentation (fetch from archives, cache locally)"` + ArchiveRelays []string `env:"ORLY_ARCHIVE_RELAYS" default:"wss://archive.orly.dev/" usage:"comma-separated list of archive relay URLs for query augmentation"` + ArchiveTimeoutSec int `env:"ORLY_ARCHIVE_TIMEOUT_SEC" default:"30" usage:"timeout in seconds for archive relay queries"` + ArchiveCacheTTLHrs int `env:"ORLY_ARCHIVE_CACHE_TTL_HRS" default:"24" usage:"hours to cache query fingerprints to avoid repeated archive requests"` + + // Storage management configuration (access-based garbage collection) + MaxStorageBytes int64 `env:"ORLY_MAX_STORAGE_BYTES" default:"0" usage:"maximum storage in bytes (0=auto-detect 80%% of filesystem)"` + GCEnabled bool `env:"ORLY_GC_ENABLED" default:"true" usage:"enable continuous garbage collection based on access patterns"` + GCIntervalSec int `env:"ORLY_GC_INTERVAL_SEC" default:"60" usage:"seconds between GC runs when storage exceeds limit"` + GCBatchSize int `env:"ORLY_GC_BATCH_SIZE" default:"1000" usage:"number of events to consider per GC run"` + // ServeMode is set programmatically by the 'serve' subcommand to grant full owner // access to all users (no env tag - internal use only) ServeMode bool @@ -590,3 +602,33 @@ func (cfg *C) GetCashuConfigValues() ( scopes, cfg.CashuReauthorize } + +// GetArchiveConfigValues returns the archive relay configuration values. +// This avoids circular imports with pkg/archive while allowing main.go to construct +// the archive manager configuration. +func (cfg *C) GetArchiveConfigValues() ( + enabled bool, + relays []string, + timeoutSec int, + cacheTTLHrs int, +) { + return cfg.ArchiveEnabled, + cfg.ArchiveRelays, + cfg.ArchiveTimeoutSec, + cfg.ArchiveCacheTTLHrs +} + +// GetStorageConfigValues returns the storage management configuration values. +// This avoids circular imports with pkg/storage while allowing main.go to construct +// the garbage collector and access tracker configuration. 
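For orientation, a sketch of how these grouped accessors are consumed (mirroring what app/main.go does further down in this patch). The import path and package name for the config type are assumptions, as is the premise that cfg has already been populated from the ORLY_* environment by the existing loader:

```go
package main

import (
	"fmt"

	"next.orly.dev/app/config" // assumed import path for the C struct above
)

// logNewConfig prints the archive and storage settings added above.
// A zero MaxStorageBytes means "auto-detect 80% of the filesystem".
func logNewConfig(cfg *config.C) {
	enabled, relays, timeoutSec, cacheTTLHrs := cfg.GetArchiveConfigValues()
	fmt.Printf("archive: enabled=%v relays=%v timeout=%ds ttl=%dh\n",
		enabled, relays, timeoutSec, cacheTTLHrs)

	maxBytes, gcOn, intervalSec, batch := cfg.GetStorageConfigValues()
	fmt.Printf("storage: max=%dB gc=%v interval=%ds batch=%d\n",
		maxBytes, gcOn, intervalSec, batch)
}
```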
+func (cfg *C) GetStorageConfigValues() (
+	maxStorageBytes int64,
+	gcEnabled bool,
+	gcIntervalSec int,
+	gcBatchSize int,
+) {
+	return cfg.MaxStorageBytes,
+		cfg.GCEnabled,
+		cfg.GCIntervalSec,
+		cfg.GCBatchSize
+}
diff --git a/app/handle-req.go b/app/handle-req.go
index c8d8ffb..7894361 100644
--- a/app/handle-req.go
+++ b/app/handle-req.go
@@ -690,6 +690,31 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
 		Write(l); chk.E(err) {
 		return
 	}
+
+	// Record access for returned events (for GC access-based ranking)
+	if l.accessTracker != nil && len(events) > 0 {
+		go func(evts event.S, connID string) {
+			for _, ev := range evts {
+				if ser, err := l.DB.GetSerialById(ev.ID); err == nil && ser != nil {
+					l.accessTracker.RecordAccess(ser.Get(), connID)
+				}
+			}
+		}(events, l.connectionID)
+	}
+
+	// Trigger archive relay query if enabled (background fetch + stream results)
+	if l.archiveManager != nil && l.archiveManager.IsEnabled() && len(*env.Filters) > 0 {
+		// Use the first filter for the archive query (others are not forwarded)
+		f := (*env.Filters)[0]
+		go l.archiveManager.QueryArchive(
+			string(env.Subscription),
+			l.connectionID,
+			f,
+			seen,
+			l, // implements EventDeliveryChannel
+		)
+	}
+
 	// if the query was for just Ids, we know there can't be any more results,
 	// so cancel the subscription.
 	cancel := true
diff --git a/app/handle-websocket.go b/app/handle-websocket.go
index e3fb62f..e7a9367 100644
--- a/app/handle-websocket.go
+++ b/app/handle-websocket.go
@@ -3,6 +3,7 @@ package app
 import (
 	"context"
 	"crypto/rand"
+	"fmt"
 	"net/http"
 	"strings"
 	"time"
@@ -99,15 +100,17 @@ whitelist:
 		handlerSemSize = 100 // Default if not configured
 	}
 
+	now := time.Now()
 	listener := &Listener{
 		ctx:          ctx,
 		cancel:       cancel,
 		Server:       s,
 		conn:         conn,
 		remote:       remote,
+		connectionID: fmt.Sprintf("%s-%d", remote, now.UnixNano()), // Unique connection ID for access tracking
 		req:          r,
 		cashuToken:   cashuToken, // Verified Cashu access token (nil if none provided)
-		startTime:    time.Now(),
+		startTime:    now,
 		writeChan:    make(chan publish.WriteRequest, 100), // Buffered channel for writes
 		writeDone:    make(chan struct{}),
 		messageQueue: make(chan messageRequest, 100), // Buffered channel for message processing
diff --git a/app/listener.go b/app/listener.go
index 17f7854..ee5d32f 100644
--- a/app/listener.go
+++ b/app/listener.go
@@ -28,6 +28,7 @@ type Listener struct {
 	ctx          context.Context
 	cancel       context.CancelFunc // Cancel function for this listener's context
 	remote       string
+	connectionID string // Unique identifier for this connection (for access tracking)
 	req          *http.Request
 	challenge    atomicutils.Bytes
 	authedPubkey atomicutils.Bytes
@@ -112,6 +113,31 @@ func (l *Listener) Write(p []byte) (n int, err error) {
 	}
 }
 
+// SendEvent sends an event to the client. Implements archive.EventDeliveryChannel.
+func (l *Listener) SendEvent(ev *event.E) error {
+	if ev == nil {
+		return nil
+	}
+	// Serialize the bare event JSON. Note: this does not wrap the event in an
+	// ["EVENT", <subscription-id>, ...] envelope; any envelope framing has to
+	// happen before the bytes reach SendEvent.
+	data := ev.Serialize()
+	// Send through the listener's buffered write channel
+	_, err := l.Write(data)
+	return err
+}
+
+// IsConnected returns whether the client connection is still active.
+// Implements archive.EventDeliveryChannel.
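SendEvent and IsConnected together satisfy the archive.EventDeliveryChannel interface introduced later in this patch. As a sketch of the contract, a hypothetical in-memory implementation (useful for exercising QueryArchive without a websocket) could look like:

```go
package archive_test

import (
	"git.mleku.dev/mleku/nostr/encoders/event"

	"next.orly.dev/pkg/archive"
)

// memDelivery is a hypothetical EventDeliveryChannel that collects events
// in memory instead of writing them to a client connection.
type memDelivery struct {
	events []*event.E
	closed bool
}

func (m *memDelivery) SendEvent(ev *event.E) error {
	if ev != nil {
		m.events = append(m.events, ev)
	}
	return nil
}

func (m *memDelivery) IsConnected() bool { return !m.closed }

// Compile-time assertion that the double satisfies the interface.
var _ archive.EventDeliveryChannel = (*memDelivery)(nil)
```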
+func (l *Listener) IsConnected() bool { + select { + case <-l.ctx.Done(): + return false + default: + return l.conn != nil + } +} + // WriteControl sends a control message through the write channel func (l *Listener) WriteControl(messageType int, data []byte, deadline time.Time) (err error) { // Defensive: recover from any panic when sending to closed channel diff --git a/app/main.go b/app/main.go index de9ed33..40daed5 100644 --- a/app/main.go +++ b/app/main.go @@ -29,8 +29,10 @@ import ( cashuiface "next.orly.dev/pkg/interfaces/cashu" "next.orly.dev/pkg/ratelimit" "next.orly.dev/pkg/spider" + "next.orly.dev/pkg/storage" dsync "next.orly.dev/pkg/sync" "next.orly.dev/pkg/wireguard" + "next.orly.dev/pkg/archive" "git.mleku.dev/mleku/nostr/interfaces/signer/p8k" ) @@ -512,6 +514,40 @@ func Run( } } + // Initialize access tracker for storage management (only for Badger backend) + if badgerDB, ok := db.(*database.D); ok { + l.accessTracker = storage.NewAccessTracker(badgerDB, 100000) // 100k dedup cache + l.accessTracker.Start() + log.I.F("access tracker initialized") + + // Initialize garbage collector if enabled + maxBytes, gcEnabled, gcIntervalSec, gcBatchSize := cfg.GetStorageConfigValues() + if gcEnabled { + gcCfg := storage.GCConfig{ + MaxStorageBytes: maxBytes, + Interval: time.Duration(gcIntervalSec) * time.Second, + BatchSize: gcBatchSize, + MinAgeSec: 3600, // Minimum 1 hour before eviction + } + l.garbageCollector = storage.NewGarbageCollector(ctx, badgerDB, l.accessTracker, gcCfg) + l.garbageCollector.Start() + log.I.F("garbage collector started (interval: %ds, batch: %d)", gcIntervalSec, gcBatchSize) + } + } + + // Initialize archive relay manager if enabled + archiveEnabled, archiveRelays, archiveTimeoutSec, archiveCacheTTLHrs := cfg.GetArchiveConfigValues() + if archiveEnabled && len(archiveRelays) > 0 { + archiveCfg := archive.Config{ + Enabled: true, + Relays: archiveRelays, + TimeoutSec: archiveTimeoutSec, + CacheTTLHrs: archiveCacheTTLHrs, + } + l.archiveManager = archive.New(ctx, db, archiveCfg) + log.I.F("archive relay manager initialized with %d relays", len(archiveRelays)) + } + // Start rate limiter if enabled if limiter != nil && limiter.IsEnabled() { limiter.Start() @@ -621,6 +657,24 @@ func Run( log.I.F("rate limiter stopped") } + // Stop archive manager if running + if l.archiveManager != nil { + l.archiveManager.Stop() + log.I.F("archive manager stopped") + } + + // Stop garbage collector if running + if l.garbageCollector != nil { + l.garbageCollector.Stop() + log.I.F("garbage collector stopped") + } + + // Stop access tracker if running + if l.accessTracker != nil { + l.accessTracker.Stop() + log.I.F("access tracker stopped") + } + // Stop bunker server if running if l.bunkerServer != nil { l.bunkerServer.Stop() diff --git a/app/server.go b/app/server.go index c2376ea..5434897 100644 --- a/app/server.go +++ b/app/server.go @@ -38,8 +38,10 @@ import ( "next.orly.dev/pkg/cashu/verifier" "next.orly.dev/pkg/ratelimit" "next.orly.dev/pkg/spider" + "next.orly.dev/pkg/storage" dsync "next.orly.dev/pkg/sync" "next.orly.dev/pkg/wireguard" + "next.orly.dev/pkg/archive" ) type Server struct { @@ -91,6 +93,11 @@ type Server struct { // Cashu access token system (NIP-XX) CashuIssuer *issuer.Issuer CashuVerifier *verifier.Verifier + + // Archive relay and storage management + archiveManager *archive.Manager + accessTracker *storage.AccessTracker + garbageCollector *storage.GarbageCollector } // isIPBlacklisted checks if an IP address is blacklisted using the managed ACL 
system diff --git a/cmd/benchmark/relysqlite_wrapper.go b/cmd/benchmark/relysqlite_wrapper.go index 785a174..052e095 100644 --- a/cmd/benchmark/relysqlite_wrapper.go +++ b/cmd/benchmark/relysqlite_wrapper.go @@ -278,6 +278,17 @@ func (w *RelySQLiteWrapper) EventIdsBySerial(start uint64, count int) (evs []uin return nil, fmt.Errorf("not implemented") } +// Access tracking stubs (not needed for benchmarking) +func (w *RelySQLiteWrapper) RecordEventAccess(serial uint64, connectionID string) error { + return nil // No-op for benchmarking +} +func (w *RelySQLiteWrapper) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) { + return 0, 0, nil +} +func (w *RelySQLiteWrapper) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) { + return nil, nil +} + // Helper function to check if a kind is replaceable func isReplaceableKind(kind int) bool { return (kind >= 10000 && kind < 20000) || kind == 0 || kind == 3 diff --git a/pkg/archive/archive.go b/pkg/archive/archive.go new file mode 100644 index 0000000..8fecd6c --- /dev/null +++ b/pkg/archive/archive.go @@ -0,0 +1,283 @@ +// Package archive provides query augmentation from authoritative archive relays. +// It manages connections to archive relays and fetches events that match local +// queries, caching them locally for future access. +package archive + +import ( + "context" + "sync" + "time" + + "lol.mleku.dev/log" + + "git.mleku.dev/mleku/nostr/encoders/event" + "git.mleku.dev/mleku/nostr/encoders/filter" +) + +// ArchiveDatabase defines the interface for storing fetched events. +type ArchiveDatabase interface { + SaveEvent(ctx context.Context, ev *event.E) (exists bool, err error) +} + +// EventDeliveryChannel defines the interface for streaming results back to clients. +type EventDeliveryChannel interface { + SendEvent(ev *event.E) error + IsConnected() bool +} + +// Manager handles connections to archive relays for query augmentation. +type Manager struct { + ctx context.Context + cancel context.CancelFunc + + relays []string + timeout time.Duration + db ArchiveDatabase + queryCache *QueryCache + + // Connection pool + mu sync.RWMutex + connections map[string]*RelayConnection + + // Configuration + enabled bool +} + +// Config holds the configuration for the archive manager. +type Config struct { + Enabled bool + Relays []string + TimeoutSec int + CacheTTLHrs int +} + +// New creates a new archive manager. +func New(ctx context.Context, db ArchiveDatabase, cfg Config) *Manager { + if !cfg.Enabled || len(cfg.Relays) == 0 { + return &Manager{enabled: false} + } + + mgrCtx, cancel := context.WithCancel(ctx) + + timeout := time.Duration(cfg.TimeoutSec) * time.Second + if timeout <= 0 { + timeout = 30 * time.Second + } + + cacheTTL := time.Duration(cfg.CacheTTLHrs) * time.Hour + if cacheTTL <= 0 { + cacheTTL = 24 * time.Hour + } + + m := &Manager{ + ctx: mgrCtx, + cancel: cancel, + relays: cfg.Relays, + timeout: timeout, + db: db, + queryCache: NewQueryCache(cacheTTL, 100000), // 100k cached queries + connections: make(map[string]*RelayConnection), + enabled: true, + } + + log.I.F("archive manager initialized with %d relays, %v timeout, %v cache TTL", + len(cfg.Relays), timeout, cacheTTL) + + return m +} + +// IsEnabled returns whether the archive manager is enabled. +func (m *Manager) IsEnabled() bool { + return m.enabled +} + +// QueryArchive queries archive relays asynchronously and stores/streams results. +// This should be called in a goroutine after returning local results. 
+// +// Parameters: +// - subID: the subscription ID for the query +// - connID: the connection ID (for access tracking) +// - f: the filter to query +// - delivered: map of event IDs already delivered to the client +// - listener: optional channel to stream results back (may be nil) +func (m *Manager) QueryArchive( + subID string, + connID string, + f *filter.F, + delivered map[string]struct{}, + listener EventDeliveryChannel, +) { + if !m.enabled { + return + } + + // Check if this query was recently executed + if m.queryCache.HasQueried(f) { + log.D.F("archive: query cache hit, skipping archive query for sub %s", subID) + return + } + + // Mark query as executed + m.queryCache.MarkQueried(f) + + // Create query context with timeout + queryCtx, cancel := context.WithTimeout(m.ctx, m.timeout) + defer cancel() + + // Query all relays in parallel + var wg sync.WaitGroup + results := make(chan *event.E, 1000) + + for _, relayURL := range m.relays { + wg.Add(1) + go func(url string) { + defer wg.Done() + m.queryRelay(queryCtx, url, f, results) + }(relayURL) + } + + // Close results channel when all relays are done + go func() { + wg.Wait() + close(results) + }() + + // Process results + stored := 0 + streamed := 0 + + for ev := range results { + // Skip if already delivered + evIDStr := string(ev.ID[:]) + if _, exists := delivered[evIDStr]; exists { + continue + } + + // Store event + exists, err := m.db.SaveEvent(queryCtx, ev) + if err != nil { + log.D.F("archive: failed to save event: %v", err) + continue + } + if !exists { + stored++ + } + + // Stream to client if still connected + if listener != nil && listener.IsConnected() { + if err := listener.SendEvent(ev); err == nil { + streamed++ + delivered[evIDStr] = struct{}{} + } + } + } + + if stored > 0 || streamed > 0 { + log.D.F("archive: query %s completed - stored: %d, streamed: %d", subID, stored, streamed) + } +} + +// queryRelay queries a single archive relay and sends results to the channel. +func (m *Manager) queryRelay(ctx context.Context, url string, f *filter.F, results chan<- *event.E) { + conn, err := m.getOrCreateConnection(url) + if err != nil { + log.D.F("archive: failed to connect to %s: %v", url, err) + return + } + + events, err := conn.Query(ctx, f) + if err != nil { + log.D.F("archive: query failed on %s: %v", url, err) + return + } + + for _, ev := range events { + select { + case <-ctx.Done(): + return + case results <- ev: + } + } +} + +// getOrCreateConnection returns an existing connection or creates a new one. +func (m *Manager) getOrCreateConnection(url string) (*RelayConnection, error) { + m.mu.RLock() + conn, exists := m.connections[url] + m.mu.RUnlock() + + if exists && conn.IsConnected() { + return conn, nil + } + + m.mu.Lock() + defer m.mu.Unlock() + + // Double-check after acquiring write lock + conn, exists = m.connections[url] + if exists && conn.IsConnected() { + return conn, nil + } + + // Create new connection + conn = NewRelayConnection(m.ctx, url) + if err := conn.Connect(); err != nil { + return nil, err + } + + m.connections[url] = conn + return conn, nil +} + +// Stop stops the archive manager and closes all connections. +func (m *Manager) Stop() { + if !m.enabled { + return + } + + m.cancel() + + m.mu.Lock() + defer m.mu.Unlock() + + for _, conn := range m.connections { + conn.Close() + } + m.connections = make(map[string]*RelayConnection) + + log.I.F("archive manager stopped") +} + +// Stats returns current archive manager statistics. 
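A condensed usage sketch of the flow above, matching how handle-req.go drives it: serve local results first, then let the archive query run in the background. The package name and the subID/connID literals are placeholders:

```go
package sketch

import (
	"git.mleku.dev/mleku/nostr/encoders/filter"

	"next.orly.dev/pkg/archive"
)

// augmentFromArchive mirrors the handler flow: local results are written
// first, then the archive fetch runs asynchronously.
func augmentFromArchive(
	mgr *archive.Manager,
	f *filter.F,
	delivered map[string]struct{}, // event IDs already sent from local storage
	client archive.EventDeliveryChannel,
) {
	if mgr == nil || !mgr.IsEnabled() {
		return
	}
	// Fire-and-forget: results are cached in the local DB and, while the
	// client stays connected, streamed back on the same subscription.
	go mgr.QueryArchive("sub-1", "conn-1", f, delivered, client)
}
```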
+func (m *Manager) Stats() ManagerStats { + if !m.enabled { + return ManagerStats{} + } + + m.mu.RLock() + defer m.mu.RUnlock() + + connected := 0 + for _, conn := range m.connections { + if conn.IsConnected() { + connected++ + } + } + + return ManagerStats{ + Enabled: m.enabled, + TotalRelays: len(m.relays), + ConnectedRelays: connected, + CachedQueries: m.queryCache.Len(), + MaxCachedQueries: m.queryCache.MaxSize(), + } +} + +// ManagerStats holds archive manager statistics. +type ManagerStats struct { + Enabled bool + TotalRelays int + ConnectedRelays int + CachedQueries int + MaxCachedQueries int +} diff --git a/pkg/archive/connection.go b/pkg/archive/connection.go new file mode 100644 index 0000000..ca46c91 --- /dev/null +++ b/pkg/archive/connection.go @@ -0,0 +1,175 @@ +package archive + +import ( + "context" + "sync" + "time" + + "git.mleku.dev/mleku/nostr/encoders/event" + "git.mleku.dev/mleku/nostr/encoders/filter" + "git.mleku.dev/mleku/nostr/ws" + "lol.mleku.dev/log" +) + +// RelayConnection manages a single archive relay connection. +type RelayConnection struct { + url string + client *ws.Client + ctx context.Context + cancel context.CancelFunc + + // Connection state + mu sync.RWMutex + lastConnect time.Time + reconnectDelay time.Duration + connected bool +} + +const ( + // Initial delay between reconnection attempts + initialReconnectDelay = 5 * time.Second + // Maximum delay between reconnection attempts + maxReconnectDelay = 5 * time.Minute + // Connection timeout + connectTimeout = 10 * time.Second + // Query timeout (per query, not global) + queryTimeout = 30 * time.Second +) + +// NewRelayConnection creates a new relay connection. +func NewRelayConnection(parentCtx context.Context, url string) *RelayConnection { + ctx, cancel := context.WithCancel(parentCtx) + return &RelayConnection{ + url: url, + ctx: ctx, + cancel: cancel, + reconnectDelay: initialReconnectDelay, + } +} + +// Connect establishes a connection to the archive relay. +func (rc *RelayConnection) Connect() error { + rc.mu.Lock() + defer rc.mu.Unlock() + + if rc.connected && rc.client != nil { + return nil + } + + connectCtx, cancel := context.WithTimeout(rc.ctx, connectTimeout) + defer cancel() + + client, err := ws.RelayConnect(connectCtx, rc.url) + if err != nil { + rc.reconnectDelay = min(rc.reconnectDelay*2, maxReconnectDelay) + return err + } + + rc.client = client + rc.connected = true + rc.lastConnect = time.Now() + rc.reconnectDelay = initialReconnectDelay + + log.D.F("archive: connected to %s", rc.url) + + return nil +} + +// Query executes a query against the archive relay. +// Returns a slice of events matching the filter. 
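Connect doubles reconnectDelay on every failure and resets it on success, giving the schedule 5s, 10s, 20s, ... capped at 5m. A standalone illustration of that arithmetic:

```go
package main

import (
	"fmt"
	"time"
)

// Prints the reconnect schedule implied by initialReconnectDelay:
// doubling per consecutive failure, clamped at maxReconnectDelay.
func main() {
	delay := 5 * time.Second      // initialReconnectDelay
	maxDelay := 5 * time.Minute   // maxReconnectDelay
	for i := 0; i < 8; i++ {
		fmt.Println(delay) // 5s 10s 20s 40s 1m20s 2m40s 5m0s 5m0s
		delay *= 2
		if delay > maxDelay {
			delay = maxDelay
		}
	}
}
```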
+func (rc *RelayConnection) Query(ctx context.Context, f *filter.F) ([]*event.E, error) { + rc.mu.RLock() + client := rc.client + connected := rc.connected + rc.mu.RUnlock() + + if !connected || client == nil { + if err := rc.Connect(); err != nil { + return nil, err + } + rc.mu.RLock() + client = rc.client + rc.mu.RUnlock() + } + + // Create query context with timeout + queryCtx, cancel := context.WithTimeout(ctx, queryTimeout) + defer cancel() + + // Subscribe to the filter + sub, err := client.Subscribe(queryCtx, filter.NewS(f)) + if err != nil { + rc.handleDisconnection() + return nil, err + } + defer sub.Unsub() + + // Collect events until EOSE or timeout + var events []*event.E + + for { + select { + case <-queryCtx.Done(): + return events, nil + case <-sub.EndOfStoredEvents: + return events, nil + case ev := <-sub.Events: + if ev == nil { + return events, nil + } + events = append(events, ev) + } + } +} + +// handleDisconnection marks the connection as disconnected. +func (rc *RelayConnection) handleDisconnection() { + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.connected = false + if rc.client != nil { + rc.client.Close() + rc.client = nil + } +} + +// IsConnected returns whether the relay is currently connected. +func (rc *RelayConnection) IsConnected() bool { + rc.mu.RLock() + defer rc.mu.RUnlock() + + if !rc.connected || rc.client == nil { + return false + } + + // Check if client is still connected + return rc.client.IsConnected() +} + +// Close closes the relay connection. +func (rc *RelayConnection) Close() { + rc.cancel() + + rc.mu.Lock() + defer rc.mu.Unlock() + + rc.connected = false + if rc.client != nil { + rc.client.Close() + rc.client = nil + } +} + +// URL returns the relay URL. +func (rc *RelayConnection) URL() string { + return rc.url +} + +// min returns the smaller of two durations. +func min(a, b time.Duration) time.Duration { + if a < b { + return a + } + return b +} diff --git a/pkg/archive/query_cache.go b/pkg/archive/query_cache.go new file mode 100644 index 0000000..0e08a58 --- /dev/null +++ b/pkg/archive/query_cache.go @@ -0,0 +1,238 @@ +package archive + +import ( + "container/list" + "crypto/sha256" + "encoding/binary" + "encoding/hex" + "sort" + "sync" + "time" + + "git.mleku.dev/mleku/nostr/encoders/filter" +) + +// QueryCache tracks which filters have been queried recently to avoid +// repeated requests to archive relays for the same filter. +type QueryCache struct { + mu sync.RWMutex + entries map[string]*list.Element + order *list.List + maxSize int + ttl time.Duration +} + +// queryCacheEntry holds a cached query fingerprint and timestamp. +type queryCacheEntry struct { + fingerprint string + queriedAt time.Time +} + +// NewQueryCache creates a new query cache. +func NewQueryCache(ttl time.Duration, maxSize int) *QueryCache { + if maxSize <= 0 { + maxSize = 100000 + } + if ttl <= 0 { + ttl = 24 * time.Hour + } + + return &QueryCache{ + entries: make(map[string]*list.Element), + order: list.New(), + maxSize: maxSize, + ttl: ttl, + } +} + +// HasQueried returns true if the filter was queried within the TTL. 
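The intended calling pattern for the cache is check-then-mark, exactly as QueryArchive does above; re-marking a filter refreshes both its timestamp and its LRU position. A sketch (package name is a placeholder):

```go
package sketch

import (
	"git.mleku.dev/mleku/nostr/encoders/filter"

	"next.orly.dev/pkg/archive"
)

// shouldQueryArchives reports whether f should be forwarded to the archive
// relays, recording it so the next identical filter within the TTL window
// is skipped.
func shouldQueryArchives(qc *archive.QueryCache, f *filter.F) bool {
	if qc.HasQueried(f) {
		return false // asked recently; the local store already has the results
	}
	qc.MarkQueried(f) // fingerprint is canonical, so field ordering doesn't matter
	return true
}
```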
+func (qc *QueryCache) HasQueried(f *filter.F) bool { + fingerprint := qc.normalizeAndHash(f) + + qc.mu.RLock() + elem, exists := qc.entries[fingerprint] + qc.mu.RUnlock() + + if !exists { + return false + } + + entry := elem.Value.(*queryCacheEntry) + + // Check if still within TTL + if time.Since(entry.queriedAt) > qc.ttl { + // Expired - remove it + qc.mu.Lock() + if elem, exists := qc.entries[fingerprint]; exists { + delete(qc.entries, fingerprint) + qc.order.Remove(elem) + } + qc.mu.Unlock() + return false + } + + return true +} + +// MarkQueried marks a filter as having been queried. +func (qc *QueryCache) MarkQueried(f *filter.F) { + fingerprint := qc.normalizeAndHash(f) + + qc.mu.Lock() + defer qc.mu.Unlock() + + // Update existing entry + if elem, exists := qc.entries[fingerprint]; exists { + qc.order.MoveToFront(elem) + elem.Value.(*queryCacheEntry).queriedAt = time.Now() + return + } + + // Evict oldest if at capacity + if len(qc.entries) >= qc.maxSize { + oldest := qc.order.Back() + if oldest != nil { + entry := oldest.Value.(*queryCacheEntry) + delete(qc.entries, entry.fingerprint) + qc.order.Remove(oldest) + } + } + + // Add new entry + entry := &queryCacheEntry{ + fingerprint: fingerprint, + queriedAt: time.Now(), + } + elem := qc.order.PushFront(entry) + qc.entries[fingerprint] = elem +} + +// normalizeAndHash creates a canonical fingerprint for a filter. +// This ensures that differently-ordered filters with the same content +// produce identical fingerprints. +func (qc *QueryCache) normalizeAndHash(f *filter.F) string { + h := sha256.New() + + // Normalize and hash IDs (sorted) + if f.Ids != nil && f.Ids.Len() > 0 { + ids := make([]string, 0, f.Ids.Len()) + for _, id := range f.Ids.T { + ids = append(ids, string(id)) + } + sort.Strings(ids) + h.Write([]byte("ids:")) + for _, id := range ids { + h.Write([]byte(id)) + } + } + + // Normalize and hash Authors (sorted) + if f.Authors != nil && f.Authors.Len() > 0 { + authors := make([]string, 0, f.Authors.Len()) + for _, author := range f.Authors.T { + authors = append(authors, string(author)) + } + sort.Strings(authors) + h.Write([]byte("authors:")) + for _, a := range authors { + h.Write([]byte(a)) + } + } + + // Normalize and hash Kinds (sorted) + if f.Kinds != nil && f.Kinds.Len() > 0 { + kinds := f.Kinds.ToUint16() + sort.Slice(kinds, func(i, j int) bool { return kinds[i] < kinds[j] }) + h.Write([]byte("kinds:")) + for _, k := range kinds { + var buf [2]byte + binary.BigEndian.PutUint16(buf[:], k) + h.Write(buf[:]) + } + } + + // Normalize and hash Tags (sorted by key, then values) + if f.Tags != nil && f.Tags.Len() > 0 { + // Collect all tag keys and sort them + tagMap := make(map[string][]string) + for _, t := range *f.Tags { + if t.Len() > 0 { + key := string(t.Key()) + values := make([]string, 0, t.Len()-1) + for j := 1; j < t.Len(); j++ { + values = append(values, string(t.T[j])) + } + sort.Strings(values) + tagMap[key] = values + } + } + + // Sort keys and hash + keys := make([]string, 0, len(tagMap)) + for k := range tagMap { + keys = append(keys, k) + } + sort.Strings(keys) + + h.Write([]byte("tags:")) + for _, k := range keys { + h.Write([]byte(k)) + h.Write([]byte(":")) + for _, v := range tagMap[k] { + h.Write([]byte(v)) + } + } + } + + // Hash Since timestamp + if f.Since != nil { + h.Write([]byte("since:")) + var buf [8]byte + binary.BigEndian.PutUint64(buf[:], uint64(f.Since.V)) + h.Write(buf[:]) + } + + // Hash Until timestamp + if f.Until != nil { + h.Write([]byte("until:")) + var buf [8]byte + 
binary.BigEndian.PutUint64(buf[:], uint64(f.Until.V)) + h.Write(buf[:]) + } + + // Hash Limit + if f.Limit != nil && *f.Limit > 0 { + h.Write([]byte("limit:")) + var buf [4]byte + binary.BigEndian.PutUint32(buf[:], uint32(*f.Limit)) + h.Write(buf[:]) + } + + // Hash Search (NIP-50) + if len(f.Search) > 0 { + h.Write([]byte("search:")) + h.Write(f.Search) + } + + return hex.EncodeToString(h.Sum(nil)) +} + +// Len returns the number of cached queries. +func (qc *QueryCache) Len() int { + qc.mu.RLock() + defer qc.mu.RUnlock() + return len(qc.entries) +} + +// MaxSize returns the maximum cache size. +func (qc *QueryCache) MaxSize() int { + return qc.maxSize +} + +// Clear removes all entries from the cache. +func (qc *QueryCache) Clear() { + qc.mu.Lock() + defer qc.mu.Unlock() + qc.entries = make(map[string]*list.Element) + qc.order.Init() +} diff --git a/pkg/database/access_tracking.go b/pkg/database/access_tracking.go new file mode 100644 index 0000000..04a033d --- /dev/null +++ b/pkg/database/access_tracking.go @@ -0,0 +1,182 @@ +//go:build !(js && wasm) + +package database + +import ( + "encoding/binary" + "sort" + "time" + + "github.com/dgraph-io/badger/v4" +) + +const ( + // accessTrackingPrefix is the key prefix for access tracking records. + // Key format: acc:{8-byte serial} -> {8-byte lastAccessTime}{4-byte accessCount} + accessTrackingPrefix = "acc:" +) + +// RecordEventAccess updates access tracking for an event. +// This increments the access count and updates the last access time. +// The connectionID is currently not used for deduplication in the database layer, +// but is passed for potential future use. Deduplication is handled in the +// higher-level AccessTracker which maintains an in-memory cache. +func (d *D) RecordEventAccess(serial uint64, connectionID string) error { + key := d.accessKey(serial) + + return d.Update(func(txn *badger.Txn) error { + var lastAccess int64 + var accessCount uint32 + + // Try to get existing record + item, err := txn.Get(key) + if err == nil { + err = item.Value(func(val []byte) error { + if len(val) >= 12 { + lastAccess = int64(binary.BigEndian.Uint64(val[0:8])) + accessCount = binary.BigEndian.Uint32(val[8:12]) + } + return nil + }) + if err != nil { + return err + } + } else if err != badger.ErrKeyNotFound { + return err + } + + // Update values + _ = lastAccess // unused in simple increment mode + lastAccess = time.Now().Unix() + accessCount++ + + // Write back + val := make([]byte, 12) + binary.BigEndian.PutUint64(val[0:8], uint64(lastAccess)) + binary.BigEndian.PutUint32(val[8:12], accessCount) + + return txn.Set(key, val) + }) +} + +// GetEventAccessInfo returns access information for an event. +// Returns (0, 0, nil) if the event has never been accessed. +func (d *D) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) { + key := d.accessKey(serial) + + err = d.View(func(txn *badger.Txn) error { + item, gerr := txn.Get(key) + if gerr != nil { + if gerr == badger.ErrKeyNotFound { + // Not found is not an error - just return zeros + return nil + } + return gerr + } + + return item.Value(func(val []byte) error { + if len(val) >= 12 { + lastAccess = int64(binary.BigEndian.Uint64(val[0:8])) + accessCount = binary.BigEndian.Uint32(val[8:12]) + } + return nil + }) + }) + + return +} + +// accessEntry holds access metadata for sorting +type accessEntry struct { + serial uint64 + lastAccess int64 + count uint32 +} + +// GetLeastAccessedEvents returns event serials sorted by coldness. 
+// Events with older last access times and lower access counts are returned first. +// limit: maximum number of events to return +// minAgeSec: minimum age in seconds since last access (events accessed more recently are excluded) +func (d *D) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) { + cutoffTime := time.Now().Unix() - minAgeSec + + var entries []accessEntry + + err = d.View(func(txn *badger.Txn) error { + prefix := []byte(accessTrackingPrefix) + opts := badger.DefaultIteratorOptions + opts.Prefix = prefix + opts.PrefetchValues = true + it := txn.NewIterator(opts) + defer it.Close() + + for it.Rewind(); it.Valid(); it.Next() { + item := it.Item() + key := item.Key() + + // Extract serial from key (after prefix) + if len(key) <= len(prefix) { + continue + } + serial := binary.BigEndian.Uint64(key[len(prefix):]) + + var lastAccess int64 + var accessCount uint32 + + err := item.Value(func(val []byte) error { + if len(val) >= 12 { + lastAccess = int64(binary.BigEndian.Uint64(val[0:8])) + accessCount = binary.BigEndian.Uint32(val[8:12]) + } + return nil + }) + if err != nil { + continue + } + + // Only include events older than cutoff + if lastAccess < cutoffTime { + entries = append(entries, accessEntry{serial, lastAccess, accessCount}) + } + } + return nil + }) + + if err != nil { + return nil, err + } + + // Sort by coldness score (older + fewer accesses = colder = lower score) + // Score = lastAccess + (accessCount * 3600) + // Lower score = colder = evict first + sort.Slice(entries, func(i, j int) bool { + scoreI := entries[i].lastAccess + int64(entries[i].count)*3600 + scoreJ := entries[j].lastAccess + int64(entries[j].count)*3600 + return scoreI < scoreJ + }) + + // Return up to limit + for i := 0; i < len(entries) && i < limit; i++ { + serials = append(serials, entries[i].serial) + } + + return serials, nil +} + +// accessKey generates the database key for an access tracking record. +func (d *D) accessKey(serial uint64) []byte { + key := make([]byte, len(accessTrackingPrefix)+8) + copy(key, accessTrackingPrefix) + binary.BigEndian.PutUint64(key[len(accessTrackingPrefix):], serial) + return key +} + +// DeleteAccessRecord removes the access tracking record for an event. +// This should be called when an event is deleted. +func (d *D) DeleteAccessRecord(serial uint64) error { + key := d.accessKey(serial) + + return d.Update(func(txn *badger.Txn) error { + return txn.Delete(key) + }) +} diff --git a/pkg/database/interface.go b/pkg/database/interface.go index f841eb2..0220d80 100644 --- a/pkg/database/interface.go +++ b/pkg/database/interface.go @@ -104,6 +104,16 @@ type Database interface { CacheEvents(f *filter.F, events event.S) InvalidateQueryCache() + // Access tracking for storage management (garbage collection based on access patterns) + // RecordEventAccess records an access to an event by a connection. + // The connectionID is used to deduplicate accesses from the same connection. + RecordEventAccess(serial uint64, connectionID string) error + // GetEventAccessInfo returns the last access time and access count for an event. + GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) + // GetLeastAccessedEvents returns event serials sorted by coldness (oldest/lowest access). + // limit: max events to return, minAgeSec: minimum age in seconds since last access. 
+	GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error)
+
 	// Utility methods
 	EventIdsBySerial(start uint64, count int) (evs []uint64, err error)
 }
diff --git a/pkg/neo4j/access_tracking.go b/pkg/neo4j/access_tracking.go
new file mode 100644
index 0000000..4e165ce
--- /dev/null
+++ b/pkg/neo4j/access_tracking.go
@@ -0,0 +1,94 @@
+package neo4j
+
+import (
+	"context"
+	"fmt"
+	"time"
+)
+
+// RecordEventAccess updates access tracking for an event in Neo4j.
+// This creates or updates an AccessTrack node for the event.
+func (n *N) RecordEventAccess(serial uint64, connectionID string) error {
+	cypher := `
+MERGE (a:AccessTrack {serial: $serial})
+ON CREATE SET a.lastAccess = $now, a.count = 1
+ON MATCH SET a.lastAccess = $now, a.count = a.count + 1`
+
+	params := map[string]any{
+		"serial": int64(serial), // Neo4j uses int64
+		"now":    time.Now().Unix(),
+	}
+
+	_, err := n.ExecuteWrite(context.Background(), cypher, params)
+	if err != nil {
+		return fmt.Errorf("failed to record event access: %w", err)
+	}
+
+	return nil
+}
+
+// GetEventAccessInfo returns access information for an event.
+func (n *N) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
+	cypher := "MATCH (a:AccessTrack {serial: $serial}) RETURN a.lastAccess AS lastAccess, a.count AS count"
+	params := map[string]any{"serial": int64(serial)}
+
+	result, err := n.ExecuteRead(context.Background(), cypher, params)
+	if err != nil {
+		return 0, 0, fmt.Errorf("failed to get event access info: %w", err)
+	}
+
+	ctx := context.Background()
+	if result.Next(ctx) {
+		record := result.Record()
+		if record != nil {
+			if la, found := record.Get("lastAccess"); found {
+				if v, ok := la.(int64); ok {
+					lastAccess = v
+				}
+			}
+			if c, found := record.Get("count"); found {
+				if v, ok := c.(int64); ok {
+					accessCount = uint32(v)
+				}
+			}
+		}
+	}
+
+	return lastAccess, accessCount, nil
+}
+
+// GetLeastAccessedEvents returns event serials sorted by coldness.
+func (n *N) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
+	cutoffTime := time.Now().Unix() - minAgeSec
+
+	cypher := `
+MATCH (a:AccessTrack)
+WHERE a.lastAccess < $cutoff
+RETURN a.serial AS serial, a.lastAccess AS lastAccess, a.count AS count
+ORDER BY (a.lastAccess + a.count * 3600) ASC
+LIMIT $limit`
+
+	params := map[string]any{
+		"cutoff": cutoffTime,
+		"limit":  limit,
+	}
+
+	result, err := n.ExecuteRead(context.Background(), cypher, params)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get least accessed events: %w", err)
+	}
+
+	ctx := context.Background()
+	for result.Next(ctx) {
+		record := result.Record()
+		if record != nil {
+			if s, found := record.Get("serial"); found {
+				if v, ok := s.(int64); ok {
+					serials = append(serials, uint64(v))
+				}
+			}
+		}
+	}
+
+	return serials, nil
+}
diff --git a/pkg/storage/access_tracker.go b/pkg/storage/access_tracker.go
new file mode 100644
index 0000000..983e5db
--- /dev/null
+++ b/pkg/storage/access_tracker.go
@@ -0,0 +1,161 @@
+package storage
+
+import (
+	"container/list"
+	"context"
+	"sync"
+
+	"lol.mleku.dev/log"
+)
+
+// AccessTrackerDatabase defines the interface for the underlying database
+// that stores access tracking information.
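Both the Badger and the Neo4j implementations rank eviction candidates by the same score, lastAccess + accessCount*3600, so each recorded access effectively buys an event one hour of apparent recency. A worked comparison:

```go
package main

import "fmt"

// coldness mirrors the eviction score used by GetLeastAccessedEvents:
// lower score = colder = evicted first.
func coldness(lastAccess int64, count uint32) int64 {
	return lastAccess + int64(count)*3600
}

func main() {
	now := int64(1_700_000_000)
	a := coldness(now-3600, 1)    // read once, an hour ago        -> now
	b := coldness(now-6*3600, 10) // read ten times, six hours ago -> now+14400
	fmt.Println(a < b)            // true: A is evicted before the popular B
}
```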
+type AccessTrackerDatabase interface { + RecordEventAccess(serial uint64, connectionID string) error + GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) + GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) +} + +// accessKey is the composite key for deduplication: serial + connectionID +type accessKey struct { + Serial uint64 + ConnectionID string +} + +// AccessTracker tracks event access patterns with session deduplication. +// It maintains an in-memory cache to deduplicate accesses from the same +// connection, reducing database writes while ensuring unique session counting. +type AccessTracker struct { + db AccessTrackerDatabase + + // Deduplication cache: tracks which (serial, connectionID) pairs + // have already been recorded in this session window + mu sync.RWMutex + seen map[accessKey]struct{} + seenOrder *list.List // LRU order for eviction + seenElements map[accessKey]*list.Element + maxSeen int // Maximum entries in dedup cache + + // Flush interval for stats + ctx context.Context + cancel context.CancelFunc +} + +// NewAccessTracker creates a new access tracker. +// maxSeenEntries controls the size of the deduplication cache. +func NewAccessTracker(db AccessTrackerDatabase, maxSeenEntries int) *AccessTracker { + if maxSeenEntries <= 0 { + maxSeenEntries = 100000 // Default: 100k entries + } + + ctx, cancel := context.WithCancel(context.Background()) + + return &AccessTracker{ + db: db, + seen: make(map[accessKey]struct{}), + seenOrder: list.New(), + seenElements: make(map[accessKey]*list.Element), + maxSeen: maxSeenEntries, + ctx: ctx, + cancel: cancel, + } +} + +// RecordAccess records an access to an event by a connection. +// Deduplicates accesses from the same connection within the cache window. +// Returns true if this was a new access, false if deduplicated. +func (t *AccessTracker) RecordAccess(serial uint64, connectionID string) (bool, error) { + key := accessKey{Serial: serial, ConnectionID: connectionID} + + t.mu.Lock() + // Check if already seen + if _, exists := t.seen[key]; exists { + // Move to front (most recent) + if elem, ok := t.seenElements[key]; ok { + t.seenOrder.MoveToFront(elem) + } + t.mu.Unlock() + return false, nil // Deduplicated + } + + // Evict oldest if at capacity + if len(t.seen) >= t.maxSeen { + oldest := t.seenOrder.Back() + if oldest != nil { + oldKey := oldest.Value.(accessKey) + delete(t.seen, oldKey) + delete(t.seenElements, oldKey) + t.seenOrder.Remove(oldest) + } + } + + // Add to cache + t.seen[key] = struct{}{} + elem := t.seenOrder.PushFront(key) + t.seenElements[key] = elem + t.mu.Unlock() + + // Record to database + if err := t.db.RecordEventAccess(serial, connectionID); err != nil { + return true, err + } + + return true, nil +} + +// GetAccessInfo returns the access information for an event. +func (t *AccessTracker) GetAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) { + return t.db.GetEventAccessInfo(serial) +} + +// GetColdestEvents returns event serials sorted by coldness. +// limit: max events to return +// minAgeSec: minimum age in seconds since last access +func (t *AccessTracker) GetColdestEvents(limit int, minAgeSec int64) ([]uint64, error) { + return t.db.GetLeastAccessedEvents(limit, minAgeSec) +} + +// ClearConnection removes all dedup entries for a specific connection. +// Call this when a connection closes to free up cache space. 
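RecordAccess is cheap to call on every query hit; its boolean result shows whether the hit was written through to the database or absorbed by the per-connection dedup cache. A sketch (package name and serial are placeholders):

```go
package sketch

import (
	"fmt"

	"next.orly.dev/pkg/storage"
)

func demoDedup(t *storage.AccessTracker) {
	first, _ := t.RecordAccess(42, "conn-abc")  // true: written to the database
	repeat, _ := t.RecordAccess(42, "conn-abc") // false: deduplicated in memory
	other, _ := t.RecordAccess(42, "conn-xyz")  // true: a different connection counts
	fmt.Println(first, repeat, other)           // true false true
}
```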
+func (t *AccessTracker) ClearConnection(connectionID string) {
+	t.mu.Lock()
+	defer t.mu.Unlock()
+
+	// Find and remove all entries for this connection
+	for key, elem := range t.seenElements {
+		if key.ConnectionID == connectionID {
+			delete(t.seen, key)
+			delete(t.seenElements, key)
+			t.seenOrder.Remove(elem)
+		}
+	}
+}
+
+// Stats returns current cache statistics.
+func (t *AccessTracker) Stats() AccessTrackerStats {
+	t.mu.RLock()
+	defer t.mu.RUnlock()
+
+	return AccessTrackerStats{
+		CachedEntries: len(t.seen),
+		MaxEntries:    t.maxSeen,
+	}
+}
+
+// AccessTrackerStats holds access tracker statistics.
+type AccessTrackerStats struct {
+	CachedEntries int
+	MaxEntries    int
+}
+
+// Start starts any background goroutines for the tracker.
+// Currently a no-op but provided for future use.
+func (t *AccessTracker) Start() {
+	log.I.F("access tracker started with %d max dedup entries", t.maxSeen)
+}
+
+// Stop stops the access tracker and releases resources.
+func (t *AccessTracker) Stop() {
+	t.cancel()
+	log.I.F("access tracker stopped")
+}
diff --git a/pkg/storage/gc.go b/pkg/storage/gc.go
new file mode 100644
index 0000000..9280589
--- /dev/null
+++ b/pkg/storage/gc.go
@@ -0,0 +1,276 @@
+package storage
+
+import (
+	"context"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"lol.mleku.dev/log"
+
+	"git.mleku.dev/mleku/nostr/encoders/event"
+	"next.orly.dev/pkg/database/indexes/types"
+)
+
+// GCDatabase defines the interface for database operations needed by the GC.
+type GCDatabase interface {
+	Path() string
+	FetchEventBySerial(ser *types.Uint40) (ev *event.E, err error)
+	DeleteEventBySerial(ctx context.Context, ser *types.Uint40, ev *event.E) error
+}
+
+// GarbageCollector manages continuous event eviction based on access patterns.
+// It monitors storage usage and evicts the least accessed events when the
+// storage limit is exceeded.
+type GarbageCollector struct {
+	ctx     context.Context
+	cancel  context.CancelFunc
+	db      GCDatabase
+	tracker *AccessTracker
+
+	// Configuration
+	dataDir   string
+	maxBytes  int64 // 0 = auto-calculate
+	interval  time.Duration
+	batchSize int
+	minAgeSec int64 // Minimum age before considering for eviction
+
+	// State
+	mu           sync.Mutex
+	running      bool
+	evictedCount uint64
+	lastRun      time.Time
+}
+
+// GCConfig holds configuration for the garbage collector.
+type GCConfig struct {
+	MaxStorageBytes int64         // 0 = auto-calculate (80% of filesystem)
+	Interval        time.Duration // How often to check storage
+	BatchSize       int           // Events to consider per GC run
+	MinAgeSec       int64         // Minimum age before eviction (default: 1 hour)
+}
+
+// DefaultGCConfig returns a default GC configuration.
+func DefaultGCConfig() GCConfig {
+	return GCConfig{
+		MaxStorageBytes: 0,           // Auto-detect
+		Interval:        time.Minute, // Check every minute
+		BatchSize:       1000,        // 1000 events per run
+		MinAgeSec:       3600,        // 1 hour minimum age
+	}
+}
+
+// NewGarbageCollector creates a new garbage collector.
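Wiring mirrors what app/main.go does above; a condensed sketch, assuming db is the Badger-backed *database.D (which satisfies both AccessTrackerDatabase and GCDatabase):

```go
package sketch

import (
	"context"
	"time"

	"next.orly.dev/pkg/database"
	"next.orly.dev/pkg/storage"
)

func startStorageManagement(
	ctx context.Context, db *database.D,
) (*storage.AccessTracker, *storage.GarbageCollector) {
	tracker := storage.NewAccessTracker(db, 100_000) // 100k dedup entries
	tracker.Start()

	gc := storage.NewGarbageCollector(ctx, db, tracker, storage.GCConfig{
		MaxStorageBytes: 0, // auto-detect 80% of the filesystem
		Interval:        time.Minute,
		BatchSize:       1000,
		MinAgeSec:       3600, // never evict events younger than an hour
	})
	gc.Start()
	return tracker, gc
}
```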
+func NewGarbageCollector( + ctx context.Context, + db GCDatabase, + tracker *AccessTracker, + cfg GCConfig, +) *GarbageCollector { + gcCtx, cancel := context.WithCancel(ctx) + + if cfg.BatchSize <= 0 { + cfg.BatchSize = 1000 + } + if cfg.Interval <= 0 { + cfg.Interval = time.Minute + } + if cfg.MinAgeSec <= 0 { + cfg.MinAgeSec = 3600 // 1 hour + } + + return &GarbageCollector{ + ctx: gcCtx, + cancel: cancel, + db: db, + tracker: tracker, + dataDir: db.Path(), + maxBytes: cfg.MaxStorageBytes, + interval: cfg.Interval, + batchSize: cfg.BatchSize, + minAgeSec: cfg.MinAgeSec, + } +} + +// Start begins the garbage collection loop. +func (gc *GarbageCollector) Start() { + gc.mu.Lock() + if gc.running { + gc.mu.Unlock() + return + } + gc.running = true + gc.mu.Unlock() + + go gc.runLoop() + log.I.F("garbage collector started (interval: %s, batch: %d)", gc.interval, gc.batchSize) +} + +// Stop stops the garbage collector. +func (gc *GarbageCollector) Stop() { + gc.cancel() + gc.mu.Lock() + gc.running = false + gc.mu.Unlock() + log.I.F("garbage collector stopped (total evicted: %d)", atomic.LoadUint64(&gc.evictedCount)) +} + +// runLoop is the main GC loop. +func (gc *GarbageCollector) runLoop() { + ticker := time.NewTicker(gc.interval) + defer ticker.Stop() + + for { + select { + case <-gc.ctx.Done(): + return + case <-ticker.C: + if err := gc.runCycle(); err != nil { + log.W.F("GC cycle error: %v", err) + } + } + } +} + +// runCycle executes one garbage collection cycle. +func (gc *GarbageCollector) runCycle() error { + gc.mu.Lock() + gc.lastRun = time.Now() + gc.mu.Unlock() + + // Check if we need to run GC + shouldRun, currentBytes, maxBytes, err := gc.shouldRunGC() + if err != nil { + return err + } + + if !shouldRun { + return nil + } + + log.D.F("GC triggered: current=%d MB, max=%d MB (%.1f%%)", + currentBytes/(1024*1024), + maxBytes/(1024*1024), + float64(currentBytes)/float64(maxBytes)*100) + + // Get coldest events + serials, err := gc.tracker.GetColdestEvents(gc.batchSize, gc.minAgeSec) + if err != nil { + return err + } + + if len(serials) == 0 { + log.D.F("GC: no events eligible for eviction") + return nil + } + + // Evict events + evicted, err := gc.evictEvents(serials) + if err != nil { + return err + } + + atomic.AddUint64(&gc.evictedCount, uint64(evicted)) + log.I.F("GC: evicted %d events (total: %d)", evicted, atomic.LoadUint64(&gc.evictedCount)) + + return nil +} + +// shouldRunGC checks if storage limit is exceeded. +func (gc *GarbageCollector) shouldRunGC() (bool, int64, int64, error) { + // Calculate max storage (dynamic based on filesystem) + maxBytes, err := CalculateMaxStorage(gc.dataDir, gc.maxBytes) + if err != nil { + return false, 0, 0, err + } + + // Get current usage + currentBytes, err := GetCurrentStorageUsage(gc.dataDir) + if err != nil { + return false, 0, 0, err + } + + return currentBytes > maxBytes, currentBytes, maxBytes, nil +} + +// evictEvents evicts the specified events from the database. 
+func (gc *GarbageCollector) evictEvents(serials []uint64) (int, error) { + evicted := 0 + + for _, serial := range serials { + // Check context for cancellation + select { + case <-gc.ctx.Done(): + return evicted, gc.ctx.Err() + default: + } + + // Convert serial to Uint40 + ser := &types.Uint40{} + if err := ser.Set(serial); err != nil { + log.D.F("GC: invalid serial %d: %v", serial, err) + continue + } + + // Fetch the event + ev, err := gc.db.FetchEventBySerial(ser) + if err != nil { + log.D.F("GC: failed to fetch event %d: %v", serial, err) + continue + } + if ev == nil { + continue // Already deleted + } + + // Delete the event + if err := gc.db.DeleteEventBySerial(gc.ctx, ser, ev); err != nil { + log.D.F("GC: failed to delete event %d: %v", serial, err) + continue + } + + evicted++ + + // Rate limit to avoid overwhelming the database + if evicted%100 == 0 { + time.Sleep(10 * time.Millisecond) + } + } + + return evicted, nil +} + +// Stats returns current GC statistics. +func (gc *GarbageCollector) Stats() GCStats { + gc.mu.Lock() + lastRun := gc.lastRun + running := gc.running + gc.mu.Unlock() + + // Get storage info + currentBytes, _ := GetCurrentStorageUsage(gc.dataDir) + maxBytes, _ := CalculateMaxStorage(gc.dataDir, gc.maxBytes) + + var percentage float64 + if maxBytes > 0 { + percentage = float64(currentBytes) / float64(maxBytes) * 100 + } + + return GCStats{ + Running: running, + LastRunTime: lastRun, + TotalEvicted: atomic.LoadUint64(&gc.evictedCount), + CurrentStorageBytes: currentBytes, + MaxStorageBytes: maxBytes, + StoragePercentage: percentage, + } +} + +// GCStats holds garbage collector statistics. +type GCStats struct { + Running bool + LastRunTime time.Time + TotalEvicted uint64 + CurrentStorageBytes int64 + MaxStorageBytes int64 + StoragePercentage float64 +} diff --git a/pkg/storage/limits.go b/pkg/storage/limits.go new file mode 100644 index 0000000..5e90bf2 --- /dev/null +++ b/pkg/storage/limits.go @@ -0,0 +1,65 @@ +//go:build !windows + +// Package storage provides storage management functionality including filesystem +// space detection, access tracking for events, and garbage collection based on +// access patterns. +package storage + +import ( + "syscall" +) + +// FilesystemStats holds information about filesystem space usage. +type FilesystemStats struct { + Total uint64 // Total bytes on filesystem + Available uint64 // Available bytes (for unprivileged users) + Used uint64 // Used bytes +} + +// GetFilesystemStats returns filesystem space information for the given path. +// The path should be a directory within the filesystem to check. +func GetFilesystemStats(path string) (stats FilesystemStats, err error) { + var stat syscall.Statfs_t + if err = syscall.Statfs(path, &stat); err != nil { + return + } + stats.Total = stat.Blocks * uint64(stat.Bsize) + stats.Available = stat.Bavail * uint64(stat.Bsize) + stats.Used = stats.Total - stats.Available + return +} + +// CalculateMaxStorage calculates the maximum storage limit for the relay. +// If configuredMax > 0, it returns that value directly. +// Otherwise, it returns 80% of the available filesystem space. 
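Concretely, on a filesystem with 200 GiB still available the auto-detected limit is 0.8 x 200 GiB = 160 GiB. Since Available can never exceed Total, the secondary clamp against 80% of Total is purely defensive. In numbers:

```go
package main

import "fmt"

func main() {
	// Hypothetical filesystem: 500 GiB total, 200 GiB available.
	var total, available uint64 = 500 << 30, 200 << 30

	limit := int64(float64(available) * 0.8) // 160 GiB
	clamp := int64(float64(total) * 0.8)     // 400 GiB, never the binding bound
	if limit > clamp {
		limit = clamp
	}
	fmt.Printf("%d GiB\n", limit>>30) // 160 GiB
}
```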
+func CalculateMaxStorage(dataDir string, configuredMax int64) (int64, error) {
+	if configuredMax > 0 {
+		return configuredMax, nil
+	}
+
+	stats, err := GetFilesystemStats(dataDir)
+	if err != nil {
+		return 0, err
+	}
+
+	// Return 80% of available space
+	maxBytes := int64(float64(stats.Available) * 0.8)
+
+	// Defensive clamp: Available can never exceed Total, so this should not trigger
+	maxTotal := int64(float64(stats.Total) * 0.8)
+	if maxBytes > maxTotal {
+		maxBytes = maxTotal
+	}
+
+	return maxBytes, nil
+}
+
+// GetCurrentStorageUsage calculates the current storage usage of the data directory.
+// This is an approximation based on filesystem stats for the given path.
+func GetCurrentStorageUsage(dataDir string) (int64, error) {
+	stats, err := GetFilesystemStats(dataDir)
+	if err != nil {
+		return 0, err
+	}
+	return int64(stats.Used), nil
+}
diff --git a/pkg/storage/limits_windows.go b/pkg/storage/limits_windows.go
new file mode 100644
index 0000000..5bb0a7e
--- /dev/null
+++ b/pkg/storage/limits_windows.go
@@ -0,0 +1,38 @@
+//go:build windows
+
+package storage
+
+import (
+	"errors"
+)
+
+// FilesystemStats holds information about filesystem space usage.
+type FilesystemStats struct {
+	Total     uint64 // Total bytes on filesystem
+	Available uint64 // Available bytes (for unprivileged users)
+	Used      uint64 // Used bytes
+}
+
+// GetFilesystemStats returns filesystem space information for the given path.
+// Windows implementation is not yet supported.
+func GetFilesystemStats(path string) (stats FilesystemStats, err error) {
+	// TODO: Implement using syscall.GetDiskFreeSpaceEx
+	err = errors.New("filesystem stats not implemented on Windows")
+	return
+}
+
+// CalculateMaxStorage calculates the maximum storage limit for the relay.
+// If configuredMax > 0, it returns that value directly.
+// Windows auto-detection is not yet supported.
+func CalculateMaxStorage(dataDir string, configuredMax int64) (int64, error) {
+	if configuredMax > 0 {
+		return configuredMax, nil
+	}
+	return 0, errors.New("auto-detect storage limit not implemented on Windows; set ORLY_MAX_STORAGE_BYTES manually")
+}
+
+// GetCurrentStorageUsage calculates the current storage usage of the data directory.
+// Windows implementation is not yet supported.
+func GetCurrentStorageUsage(dataDir string) (int64, error) {
+	return 0, errors.New("storage usage detection not implemented on Windows")
+}
diff --git a/pkg/version/version b/pkg/version/version
index 2cb781d..67d3cf4 100644
--- a/pkg/version/version
+++ b/pkg/version/version
@@ -1 +1 @@
-v0.44.7
+v0.45.0
diff --git a/pkg/wasmdb/access_tracking.go b/pkg/wasmdb/access_tracking.go
new file mode 100644
index 0000000..701eab0
--- /dev/null
+++ b/pkg/wasmdb/access_tracking.go
@@ -0,0 +1,24 @@
+//go:build js && wasm
+
+package wasmdb
+
+// RecordEventAccess is a stub for WasmDB.
+// Access tracking is not implemented for the WebAssembly backend as it's
+// primarily used for client-side applications where storage management
+// is handled differently.
+func (w *W) RecordEventAccess(serial uint64, connectionID string) error {
+	// No-op for WasmDB
+	return nil
+}
+
+// GetEventAccessInfo is a stub for WasmDB.
+func (w *W) GetEventAccessInfo(serial uint64) (lastAccess int64, accessCount uint32, err error) {
+	// No-op for WasmDB - return zeros
+	return 0, 0, nil
+}
+
+// GetLeastAccessedEvents is a stub for WasmDB.
+func (w *W) GetLeastAccessedEvents(limit int, minAgeSec int64) (serials []uint64, err error) {
+	// No-op for WasmDB - return empty slice
+	return nil, nil
+}
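Taken together, the new subsystems expose enough state for a status endpoint or a periodic log line; a sketch using only the Stats accessors defined above (package name is a placeholder):

```go
package sketch

import (
	"fmt"

	"next.orly.dev/pkg/archive"
	"next.orly.dev/pkg/storage"
)

func logSubsystemStats(
	mgr *archive.Manager,
	gc *storage.GarbageCollector,
	tracker *storage.AccessTracker,
) {
	a := mgr.Stats()
	fmt.Printf("archive: %d/%d relays up, %d/%d queries cached\n",
		a.ConnectedRelays, a.TotalRelays, a.CachedQueries, a.MaxCachedQueries)

	g := gc.Stats()
	fmt.Printf("gc: running=%v evicted=%d storage=%.1f%% of limit\n",
		g.Running, g.TotalEvicted, g.StoragePercentage)

	t := tracker.Stats()
	fmt.Printf("tracker: %d/%d dedup entries\n", t.CachedEntries, t.MaxEntries)
}
```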