From 0d3913eceaa35919df3c6e4d48a3e7256d723c70 Mon Sep 17 00:00:00 2001 From: woikos Date: Thu, 22 Jan 2026 17:48:18 +0100 Subject: [PATCH] Add filter support to negentropy sync (v0.55.3) Adds ORLY_SYNC_NEGENTROPY_FILTER env var for specifying a JSON-encoded nostr filter for selective sync. The filter is used both when building local storage for comparison and in the NEG-OPEN message sent to peers. Example usage: ORLY_SYNC_NEGENTROPY_FILTER='{"kinds":[1,6,7,30023]}' ORLY_SYNC_NEGENTROPY_FILTER='{"authors":["abc123..."],"since":1700000000}' This allows syncing only specific event types between relays instead of the full event set. Files modified: - cmd/orly-sync-negentropy/config.go: Add FilterJSON config and parsing - cmd/orly-sync-negentropy/main.go: Pass filter to manager config - pkg/sync/negentropy/manager.go: Add Filter to Config, use in buildStorage and NEG-OPEN - pkg/version/version: Bump to v0.55.3 Co-Authored-By: Claude Opus 4.5 --- cmd/orly-sync-negentropy/config.go | 20 ++++++++ cmd/orly-sync-negentropy/main.go | 1 + pkg/sync/negentropy/manager.go | 75 +++++++++++++++++++++++++++--- pkg/version/version | 2 +- 4 files changed, 91 insertions(+), 7 deletions(-) diff --git a/cmd/orly-sync-negentropy/config.go b/cmd/orly-sync-negentropy/config.go index 6878081..e0ebe3f 100644 --- a/cmd/orly-sync-negentropy/config.go +++ b/cmd/orly-sync-negentropy/config.go @@ -1,6 +1,7 @@ package main import ( + "encoding/json" "os" "strings" "time" @@ -8,6 +9,8 @@ import ( "go-simpler.org/env" "lol.mleku.dev/chk" "lol.mleku.dev/log" + + "git.mleku.dev/mleku/nostr/encoders/filter" ) // Config holds the negentropy sync service configuration. @@ -29,11 +32,18 @@ type Config struct { FrameSize int `env:"ORLY_SYNC_NEGENTROPY_FRAME_SIZE" default:"4096" usage:"negentropy frame size"` IDSize int `env:"ORLY_SYNC_NEGENTROPY_ID_SIZE" default:"16" usage:"negentropy ID truncation size"` + // Filter configuration - JSON encoded nostr filter + // Example: {"kinds":[1,6,7],"authors":["abc123..."]} + FilterJSON string `env:"ORLY_SYNC_NEGENTROPY_FILTER" usage:"JSON-encoded nostr filter for selective sync"` + // Client session configuration ClientSessionTimeout time.Duration `env:"ORLY_SYNC_NEGENTROPY_SESSION_TIMEOUT" default:"5m" usage:"client session timeout"` // Parsed peers Peers []string + + // Parsed filter + Filter *filter.F } // loadConfig loads configuration from environment variables. @@ -52,5 +62,15 @@ func loadConfig() *Config { } } + // Parse filter from JSON if provided + if cfg.FilterJSON != "" { + cfg.Filter = &filter.F{} + if err := json.Unmarshal([]byte(cfg.FilterJSON), cfg.Filter); err != nil { + log.E.F("failed to parse filter JSON: %v", err) + os.Exit(1) + } + log.I.F("using sync filter: %s", cfg.FilterJSON) + } + return cfg } diff --git a/cmd/orly-sync-negentropy/main.go b/cmd/orly-sync-negentropy/main.go index 8a4e411..a48b890 100644 --- a/cmd/orly-sync-negentropy/main.go +++ b/cmd/orly-sync-negentropy/main.go @@ -69,6 +69,7 @@ func main() { FrameSize: cfg.FrameSize, IDSize: cfg.IDSize, ClientSessionTimeout: cfg.ClientSessionTimeout, + Filter: cfg.Filter, } negentropyMgr := negentropy.NewManager(db, mgrConfig) diff --git a/pkg/sync/negentropy/manager.go b/pkg/sync/negentropy/manager.go index 5c65082..63f7a9a 100644 --- a/pkg/sync/negentropy/manager.go +++ b/pkg/sync/negentropy/manager.go @@ -62,6 +62,7 @@ type Config struct { FrameSize int IDSize int ClientSessionTimeout time.Duration + Filter *filter.F // Optional filter for selective sync } // Manager handles negentropy sync operations. @@ -248,8 +249,9 @@ func (m *Manager) performNegentropy(ctx context.Context, peerURL string) (int64, } // Send NEG-OPEN: ["NEG-OPEN", subscription_id, filter, initial_message] - filter := map[string]any{} // Empty filter = all events - negOpen := []any{"NEG-OPEN", subID, filter, hex.EncodeToString(initialMsg)} + // Use configured filter or empty filter for all events + negFilter := m.filterToMap() + negOpen := []any{"NEG-OPEN", subID, negFilter, hex.EncodeToString(initialMsg)} if err := conn.WriteJSON(negOpen); err != nil { return 0, fmt.Errorf("failed to send NEG-OPEN: %w", err) } @@ -359,11 +361,17 @@ done: func (m *Manager) buildStorage(ctx context.Context) (*negentropy.Vector, error) { storage := negentropy.NewVector() - // Query all events from database using an empty filter - // This returns IdPkTs structs with Id, Pub, Ts, Ser fields + // Build filter - start with configured filter or empty limit := uint(100000) - f := &filter.F{ - Limit: &limit, // Limit to 100k events for now + var f *filter.F + if m.config.Filter != nil { + // Use configured filter with our limit + f = m.config.Filter + f.Limit = &limit + } else { + f = &filter.F{ + Limit: &limit, + } } idPkTs, err := m.db.QueryForIds(ctx, f) @@ -380,6 +388,61 @@ func (m *Manager) buildStorage(ctx context.Context) (*negentropy.Vector, error) return storage, nil } +// filterToMap converts the configured filter to a map for NEG-OPEN message. +func (m *Manager) filterToMap() map[string]any { + result := map[string]any{} + + if m.config.Filter == nil { + return result + } + + f := m.config.Filter + + // Add kinds if present + if f.Kinds != nil && f.Kinds.Len() > 0 { + kinds := make([]int, 0, f.Kinds.Len()) + for _, k := range f.Kinds.K { + kinds = append(kinds, k.ToInt()) + } + result["kinds"] = kinds + } + + // Add authors if present + if f.Authors != nil && f.Authors.Len() > 0 { + authors := make([]string, 0, f.Authors.Len()) + for _, a := range f.Authors.T { + authors = append(authors, hex.EncodeToString(a)) + } + result["authors"] = authors + } + + // Add IDs if present + if f.Ids != nil && f.Ids.Len() > 0 { + ids := make([]string, 0, f.Ids.Len()) + for _, id := range f.Ids.T { + ids = append(ids, hex.EncodeToString(id)) + } + result["ids"] = ids + } + + // Add since if present + if f.Since != nil && f.Since.V != 0 { + result["since"] = f.Since.V + } + + // Add until if present + if f.Until != nil && f.Until.V != 0 { + result["until"] = f.Until.V + } + + // Add limit if present + if f.Limit != nil && *f.Limit > 0 { + result["limit"] = *f.Limit + } + + return result +} + // pushEventsToPeer sends events we have to the peer. // The truncated IDs are 32-char hex prefixes, so we query our local DB and push matching events. func (m *Manager) pushEventsToPeer(ctx context.Context, conn *websocket.Conn, truncatedIDs []string) (int, error) { diff --git a/pkg/version/version b/pkg/version/version index 7e20366..e0a689f 100644 --- a/pkg/version/version +++ b/pkg/version/version @@ -1 +1 @@ -v0.55.2 +v0.55.3