From a5b7e75395cd4f43223fba0dbad309b0accf9dc2 Mon Sep 17 00:00:00 2001 From: woikos Date: Wed, 28 Jan 2026 19:31:48 +0100 Subject: [PATCH] Fix NIP-77 negentropy protocol implementation and add client mode sync NIP-77 Protocol Fixes: - Fix role reversal in HandleNegOpen: relay now correctly requires initial message from client and calls Reconcile() instead of Start() - Fix premature event transmission: haveIDs events now only sent when reconciliation completes (complete=true) - Add proper error handling for missing initial message in NEG-OPEN Sync Client Mode (gRPC IPC): - Add --server flag to 'orly sync' for connecting to running service - Enable one-shot sync without direct database access - Support remote sync operations via gRPC - Add filterToProto helper for gRPC communication Documentation: - Add NIP77_ANALYSIS_AND_FIX.md with detailed protocol analysis - Add SYNC_CLIENT_MODE.md with usage examples and architecture diagrams Bug Fix: - Fix nil pointer panic in handle-req.go when subscriptions map is uninitialized --- app/handle-negentropy.go | 121 +++++++++--- app/handle-req.go | 3 + cmd/orly/sync/sync.go | 204 ++++++++++++++++++-- docs/NIP77_ANALYSIS_AND_FIX.md | 263 ++++++++++++++++++++++++++ docs/SYNC_CLIENT_MODE.md | 247 ++++++++++++++++++++++++ pkg/sync/negentropy/server/service.go | 44 ++--- 6 files changed, 818 insertions(+), 64 deletions(-) create mode 100644 docs/NIP77_ANALYSIS_AND_FIX.md create mode 100644 docs/SYNC_CLIENT_MODE.md diff --git a/app/handle-negentropy.go b/app/handle-negentropy.go index 6b866a4..3822084 100644 --- a/app/handle-negentropy.go +++ b/app/handle-negentropy.go @@ -12,7 +12,9 @@ import ( "git.mleku.dev/mleku/nostr/encoders/envelopes/eventenvelope" "git.mleku.dev/mleku/nostr/encoders/filter" + "git.mleku.dev/mleku/nostr/encoders/kind" "git.mleku.dev/mleku/nostr/encoders/tag" + "git.mleku.dev/mleku/nostr/encoders/timestamp" negentropyiface "next.orly.dev/pkg/interfaces/negentropy" commonv1 
"next.orly.dev/pkg/proto/orlysync/common/v1" ) @@ -84,9 +86,10 @@ func (l *Listener) HandleNegOpen(msg []byte) error { return l.sendNegErr("", fmt.Sprintf("invalid subscription_id: %v", err)) } - // Extract filter - var f filter.F - if err := json.Unmarshal(parts[2], &f); err != nil { + // Extract filter - use custom parsing because filter.F's kinds field + // doesn't support standard JSON array unmarshaling + f, err := parseNegentropyFilter(parts[2]) + if err != nil { return l.sendNegErr(subscriptionID, fmt.Sprintf("invalid filter: %v", err)) } @@ -105,7 +108,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { } // Convert filter to proto format - protoFilter := filterToProto(&f) + protoFilter := filterToProto(f) // Call gRPC service ctx := context.Background() @@ -127,7 +130,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { // Log need_ids (events client should send us) if len(needIDs) > 0 { - log.D.F("NEG-OPEN: client needs to send %d events", len(needIDs)) + log.D.F("NEG-OPEN: relay needs %d events from client", len(needIDs)) } // Send NEG-MSG response FIRST (before events) @@ -135,15 +138,14 @@ func (l *Listener) HandleNegOpen(msg []byte) error { return err } - // If reconciliation is complete, log it + // If reconciliation is complete, send events we have that client needs. + // Per NIP-77: The haves/needs are only final when reconcile returns complete=true. 
if complete { - log.D.F("NEG-OPEN: reconciliation complete for %s", subscriptionID) - } - - // AFTER sending response, send events for IDs we have that client needs - if len(haveIDs) > 0 { - if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil { - log.E.F("failed to send events for NEG-OPEN: %v", err) + log.D.F("NEG-OPEN: reconciliation complete for %s, sending %d events", subscriptionID, len(haveIDs)) + if len(haveIDs) > 0 { + if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil { + log.E.F("failed to send events for NEG-OPEN: %v", err) + } } } @@ -204,7 +206,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error { // Log need_ids (events client should send us) if len(needIDs) > 0 { - log.D.F("NEG-MSG: client needs to send %d events", len(needIDs)) + log.D.F("NEG-MSG: relay needs %d events from client", len(needIDs)) } // Send NEG-MSG response FIRST (before events) @@ -212,15 +214,14 @@ func (l *Listener) HandleNegMsg(msg []byte) error { return err } - // If reconciliation is complete, log it + // If reconciliation is complete, send events we have that client needs. + // Per NIP-77: The haves/needs are only final when reconcile returns complete=true. if complete { - log.D.F("NEG-MSG: reconciliation complete for %s", subscriptionID) - } - - // AFTER sending response, send events for IDs we have that client needs - if len(haveIDs) > 0 { - if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil { - log.E.F("failed to send events for NEG-MSG: %v", err) + log.D.F("NEG-MSG: reconciliation complete for %s, sending %d events", subscriptionID, len(haveIDs)) + if len(haveIDs) > 0 { + if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil { + log.E.F("failed to send events for NEG-MSG: %v", err) + } } } @@ -387,6 +388,82 @@ func filterToProto(f *filter.F) *commonv1.Filter { return pf } +// parseNegentropyFilter parses a NIP-01 filter from JSON. 
+// This is needed because filter.F uses kind.S which doesn't implement +// json.Unmarshaler, so we parse manually and construct the filter. +func parseNegentropyFilter(data []byte) (*filter.F, error) { + // Parse into a generic map first + var raw map[string]json.RawMessage + if err := json.Unmarshal(data, &raw); err != nil { + return nil, err + } + + f := filter.New() + + // Parse kinds array + if kindsRaw, ok := raw["kinds"]; ok { + var kinds []int + if err := json.Unmarshal(kindsRaw, &kinds); err != nil { + return nil, fmt.Errorf("invalid kinds: %v", err) + } + f.Kinds = kind.FromIntSlice(kinds) + } + + // Parse authors array (hex pubkeys) + if authorsRaw, ok := raw["authors"]; ok { + var authors []string + if err := json.Unmarshal(authorsRaw, &authors); err != nil { + return nil, fmt.Errorf("invalid authors: %v", err) + } + f.Authors = tag.NewWithCap(len(authors)) + for _, a := range authors { + if decoded, err := hex.DecodeString(a); err == nil { + f.Authors.T = append(f.Authors.T, decoded) + } + } + } + + // Parse ids array (hex event IDs) + if idsRaw, ok := raw["ids"]; ok { + var ids []string + if err := json.Unmarshal(idsRaw, &ids); err != nil { + return nil, fmt.Errorf("invalid ids: %v", err) + } + f.Ids = tag.NewWithCap(len(ids)) + for _, id := range ids { + if decoded, err := hex.DecodeString(id); err == nil { + f.Ids.T = append(f.Ids.T, decoded) + } + } + } + + // Parse since timestamp + if sinceRaw, ok := raw["since"]; ok { + var since int64 + if err := json.Unmarshal(sinceRaw, &since); err == nil { + f.Since = timestamp.FromUnix(since) + } + } + + // Parse until timestamp + if untilRaw, ok := raw["until"]; ok { + var until int64 + if err := json.Unmarshal(untilRaw, &until); err == nil { + f.Until = timestamp.FromUnix(until) + } + } + + // Parse limit + if limitRaw, ok := raw["limit"]; ok { + var limit uint + if err := json.Unmarshal(limitRaw, &limit); err == nil { + f.Limit = &limit + } + } + + return f, nil +} + // CloseAllNegentropySessions closes 
all negentropy sessions for a connection // Called when a WebSocket connection is closed func (l *Listener) CloseAllNegentropySessions() { diff --git a/app/handle-req.go b/app/handle-req.go index 723a08e..bfbc9a5 100644 --- a/app/handle-req.go +++ b/app/handle-req.go @@ -820,6 +820,9 @@ func (l *Listener) HandleReq(msg []byte) (err error) { // Track this subscription so we can cancel it on CLOSE or connection close subID := string(env.Subscription) l.subscriptionsMu.Lock() + if l.subscriptions == nil { + l.subscriptions = make(map[string]context.CancelFunc) + } l.subscriptions[subID] = subCancel l.subscriptionsMu.Unlock() diff --git a/cmd/orly/sync/sync.go b/cmd/orly/sync/sync.go index 9c7f681..419dbc9 100644 --- a/cmd/orly/sync/sync.go +++ b/cmd/orly/sync/sync.go @@ -2,6 +2,13 @@ // Package sync implements the "orly sync" subcommand for sync service operations. // Supports both one-shot CLI sync (like strfry sync) and running as a gRPC service. +// +// Two modes of operation: +// 1. Direct mode: Opens database directly (like strfry sync) +// 2. Client mode: Connects to running orly-sync-negentropy service via gRPC +// +// Client mode allows you to use a running ORLY relay's database without +// needing direct filesystem access to the database files. 
package sync import ( @@ -22,6 +29,8 @@ import ( "next.orly.dev/pkg/database" pkgsync "next.orly.dev/pkg/sync" "next.orly.dev/pkg/sync/negentropy" + negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc" + commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1" ) // SyncDirection specifies which direction to sync @@ -35,11 +44,14 @@ const ( // CLISyncConfig holds configuration for one-shot CLI sync type CLISyncConfig struct { - RelayURL string - Filter *filter.F - Direction SyncDirection - DataDir string - Verbose bool + RelayURL string + Filter *filter.F + Direction SyncDirection + DataDir string + Verbose bool + // Client mode: connect to running service instead of direct DB access + ServerAddress string // gRPC address of orly-sync-negentropy service + UseClientMode bool // If true, use gRPC client instead of direct DB } // Run executes the sync subcommand. @@ -52,6 +64,7 @@ func Run(args []string) { var direction string = "down" var dataDir string var verbose bool + var serverAddress string // Parse arguments - look for either service mode (--driver) or CLI mode (relay URL) positionalArgs := []string{} @@ -78,6 +91,11 @@ func Run(args []string) { } else if arg == "--data-dir" && i+1 < len(args) { dataDir = args[i+1] i++ + } else if strings.HasPrefix(arg, "--server=") { + serverAddress = strings.TrimPrefix(arg, "--server=") + } else if arg == "--server" && i+1 < len(args) { + serverAddress = args[i+1] + i++ } else if arg == "--list-drivers" || arg == "-l" { listDrivers = true } else if arg == "--verbose" || arg == "-v" { @@ -111,13 +129,20 @@ func Run(args []string) { // Check if this is CLI sync mode (relay URL provided) if len(positionalArgs) > 0 && (strings.HasPrefix(positionalArgs[0], "ws://") || strings.HasPrefix(positionalArgs[0], "wss://")) { relayURL = positionalArgs[0] - runCLISync(&CLISyncConfig{ - RelayURL: relayURL, - Filter: parseFilterJSON(filterJSON), - Direction: SyncDirection(direction), - DataDir: dataDir, - Verbose: verbose, - }) + cfg := 
&CLISyncConfig{ + RelayURL: relayURL, + Filter: parseFilterJSON(filterJSON), + Direction: SyncDirection(direction), + DataDir: dataDir, + Verbose: verbose, + ServerAddress: serverAddress, + UseClientMode: serverAddress != "", + } + if cfg.UseClientMode { + runClientModeSync(cfg) + } else { + runDirectModeSync(cfg) + } return } @@ -194,8 +219,9 @@ func parseFilterJSON(jsonStr string) *filter.F { return f } -// runCLISync performs a one-shot negentropy sync with a remote relay -func runCLISync(cfg *CLISyncConfig) { +// runDirectModeSync performs a one-shot negentropy sync with a remote relay +// using direct database access (like strfry sync) +func runDirectModeSync(cfg *CLISyncConfig) { if cfg.Verbose { log.I.F("CLI sync starting with %s (direction: %s)", cfg.RelayURL, cfg.Direction) } @@ -267,6 +293,135 @@ func runCLISync(cfg *CLISyncConfig) { } } +// runClientModeSync performs a one-shot negentropy sync with a remote relay +// by connecting to a running orly-sync-negentropy service via gRPC. +// This allows you to sync using a relay's database without direct filesystem access. 
+func runClientModeSync(cfg *CLISyncConfig) { + if cfg.Verbose { + log.I.F("Client mode sync starting with %s via server %s (direction: %s)", + cfg.RelayURL, cfg.ServerAddress, cfg.Direction) + } + + // Set up signal handling + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigChan + fmt.Println("\nInterrupted, closing...") + cancel() + }() + + // Connect to the sync service via gRPC + fmt.Printf("Connecting to sync service at %s...\n", cfg.ServerAddress) + client, err := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{ + ServerAddress: cfg.ServerAddress, + ConnectTimeout: 30 * time.Second, + }) + if err != nil { + fmt.Fprintf(os.Stderr, "error: failed to connect to sync service: %v\n", err) + os.Exit(1) + } + defer client.Close() + + // Wait for the service to be ready + select { + case <-client.Ready(): + fmt.Println("Connected to sync service") + case <-time.After(30 * time.Second): + fmt.Fprintf(os.Stderr, "error: sync service not ready\n") + os.Exit(1) + } + + // Convert filter to proto format + var protoFilter *commonv1.Filter + if cfg.Filter != nil { + protoFilter = filterToProto(cfg.Filter) + } + + // Start sync with the peer + startTime := time.Now() + fmt.Printf("Requesting sync with %s...\n", cfg.RelayURL) + + progressCh, err := client.SyncWithPeer(ctx, cfg.RelayURL, protoFilter, 0) + if err != nil { + fmt.Fprintf(os.Stderr, "error: failed to start sync: %v\n", err) + os.Exit(1) + } + + // Display progress updates + var lastProgress *negentropygrpc.SyncProgress + for progress := range progressCh { + lastProgress = progress + if progress.Error != "" { + fmt.Fprintf(os.Stderr, "Sync error: %s\n", progress.Error) + os.Exit(1) + } + if cfg.Verbose || progress.Complete { + fmt.Printf("Round %d: have=%d need=%d fetched=%d sent=%d\n", + progress.Round, progress.HaveCount, progress.NeedCount, + progress.FetchedCount, 
progress.SentCount) + } + } + + elapsed := time.Since(startTime) + if lastProgress != nil && lastProgress.Complete { + fmt.Printf("Sync complete in %v\n", elapsed) + } else { + fmt.Printf("Sync finished in %v (may be incomplete)\n", elapsed) + } +} + +// filterToProto converts a nostr filter to proto format +func filterToProto(f *filter.F) *commonv1.Filter { + if f == nil { + return nil + } + + pf := &commonv1.Filter{} + + // Convert kinds + if f.Kinds != nil && f.Kinds.Len() > 0 { + for _, k := range f.Kinds.ToUint16() { + pf.Kinds = append(pf.Kinds, uint32(k)) + } + } + + // Convert authors (binary to bytes) + if f.Authors != nil && f.Authors.Len() > 0 { + for _, author := range f.Authors.T { + pf.Authors = append(pf.Authors, author) + } + } + + // Convert IDs (binary to bytes) + if f.Ids != nil && f.Ids.Len() > 0 { + for _, id := range f.Ids.T { + pf.Ids = append(pf.Ids, id) + } + } + + // Convert timestamps + if f.Since != nil { + since := f.Since.V + pf.Since = &since + } + if f.Until != nil { + until := f.Until.V + pf.Until = &until + } + + // Convert limit + if f.Limit != nil { + limit := uint32(*f.Limit) + pf.Limit = &limit + } + + return pf +} + func runSyncService(driver string, args []string) { log.I.F("Sync service with driver=%s not yet implemented via unified binary", driver) log.I.F("Use the standalone binary: orly-sync-%s", driver) @@ -277,13 +432,15 @@ func printSyncHelp() { fmt.Println(`orly sync - Sync operations (NIP-77 negentropy) Usage: - orly sync [options] One-shot sync with relay (like strfry sync) - orly sync --driver=NAME [options] Run as sync service + orly sync [options] One-shot sync with relay + orly sync --server=ADDR Sync via running service + orly sync --driver=NAME [options] Run as sync service One-shot sync options: --filter=JSON Nostr filter JSON (e.g. 
'{"kinds": [0, 3, 1984]}') --dir=DIRECTION Sync direction: down, up, both (default: down) --data-dir=PATH Database directory (default: ~/.local/share/ORLY) + --server=ADDR Connect to running sync service (e.g. 127.0.0.1:50064) --verbose, -v Verbose output Service mode options: @@ -306,11 +463,22 @@ Environment variables: ORLY_SYNC_TARGET_RELAYS Comma-separated target relay URLs Examples: - # One-shot sync (like strfry sync) + # One-shot sync with local database (like strfry sync) orly sync wss://relay.example.com --filter '{"kinds": [0, 3, 1984]}' --dir down orly sync wss://wot.grapevine.network --filter '{"kinds": [3, 1984, 10000]}' --dir down + # Sync via running ORLY service (no direct DB access needed) + orly sync wss://relay.example.com --server 127.0.0.1:50064 --filter '{"kinds": [1]}' --dir both + # Service mode orly sync --driver=negentropy Run negentropy sync service - orly sync --list-drivers List available drivers`) + orly sync --list-drivers List available drivers + +Client mode (--server): + When using --server, the CLI connects to a running orly-sync-negentropy + service via gRPC instead of opening the database directly. This allows + you to sync from a remote machine without filesystem access to the DB. + + The server address is typically 127.0.0.1:50064 (or as configured in + ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN).`) } diff --git a/docs/NIP77_ANALYSIS_AND_FIX.md b/docs/NIP77_ANALYSIS_AND_FIX.md new file mode 100644 index 0000000..6748a15 --- /dev/null +++ b/docs/NIP77_ANALYSIS_AND_FIX.md @@ -0,0 +1,263 @@ +# NIP-77 Implementation Analysis and Fix + +## Executive Summary + +This document analyzes the NIP-77 negentropy implementation in ORLY and documents fixes made to ensure compatibility with strfry and other NIP-77 compliant implementations. 
+ +## Key Issues Fixed + +### Issue 1: Role Reversal in NEG-OPEN Handling (CRITICAL) + +**Location:** `pkg/sync/negentropy/server/service.go:HandleNegOpen()` + +**Problem:** The relay was incorrectly calling `neg.Start()` when no initial message was provided, which violates the NIP-77 protocol specification. + +**NIP-77 Specification:** +- The **client (initiator)** creates a Negentropy object, adds all events, seals it, and calls `initiate()` to create the initial message +- The **relay (responder)** creates a Negentropy object, adds all events, seals it, and calls `reconcile()` with the client's initial message + +**Incorrect Code:** +```go +if len(req.InitialMessage) > 0 { + respMsg, complete, err = neg.Reconcile(req.InitialMessage) +} else { + // WRONG: Relay should never call Start() - only the initiator does this + respMsg, err = neg.Start() +} +``` + +**Fixed Code:** +```go +// The client (initiator) MUST provide an initial message in NEG-OPEN. +if len(req.InitialMessage) == 0 { + return &negentropyv1.NegOpenResponse{ + Error: "blocked: NEG-OPEN requires initialMessage from initiator", + }, nil +} + +// Process the client's initial message +respMsg, complete, err := neg.Reconcile(req.InitialMessage) +``` + +### Issue 2: Premature Event Transmission + +**Location:** `app/handle-negentropy.go:HandleNegOpen()` and `HandleNegMsg()` + +**Problem:** Events for `haveIDs` were being sent in every round of reconciliation, instead of only when reconciliation was complete. 
+ +**Explanation:** +- The negentropy protocol involves multiple rounds of `NEG-MSG` exchanges +- In intermediate rounds, `haveIDs` and `needIDs` are partial results +- The final sets are only available when `reconcile()` returns `complete=true` +- Sending events prematurely could result in: + - Incomplete event sets being sent + - Wasted bandwidth on events that aren't actually needed + +**Fixed Code:** +```go +// Only send events when reconciliation is complete +if complete { + log.D.F("NEG-OPEN: reconciliation complete for %s, sending %d events", + subscriptionID, len(haveIDs)) + if len(haveIDs) > 0 { + if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil { + log.E.F("failed to send events for NEG-OPEN: %v", err) + } + } +} +``` + +## How strfry sync Command Works + +### Architecture + +The strfry `sync` command is a **one-shot CLI tool**, not a persistent daemon: + +```bash +# CLI invocation connects, syncs, disconnects +./strfry sync wss://relay.example.com --filter '{"kinds":[0,1]}' --dir both +``` + +**Key characteristics:** +1. Runs as a separate process, not part of the relay daemon +2. Opens a WebSocket connection, performs sync, closes connection +3. Uses `--dir` flag to control sync direction: + - `down`: Download events from remote relay (pull; default) + - `up`: Upload events to remote relay (push) + - `both`: Bidirectional sync + - `none`: Only perform reconciliation, no event transfer + +### Protocol Flow (strfry sync as initiator) + +``` +┌─────────────┐ ┌─────────────┐ +│ strfry │ (initiator) │ relay │ (responder) +│ (sync) │ │ (orly) │ +└──────┬──────┘ └──────┬──────┘ + │ │ + │ 1. Build local storage │ + │ 2. neg.Start() → initialMsg │ + │ │ + ├────── NEG-OPEN ───────────────►│ + │ subID, filter, initialMsg │ + │ │ + │ │ 3. Build local storage + │ │ 4. neg.Reconcile(initialMsg) + │ │ → response, haveIDs, needIDs + │ │ + │◄───── NEG-MSG ─────────────────┤ + │ response (hex) │ + │ │ + │ 5. 
neg.Reconcile(response) │ + │ → nextMsg, haveIDs, needIDs │ + │ │ + │ (repeat NEG-MSG until complete) │ + │ │ + ├────── NEG-CLOSE ──────────────►│ + │ │ + │ 6. Exchange events: │ + │ - Send haveIDs as EVENT │ + │ - Receive needIDs as EVENT │ + │ - Send OK responses │ + │ │ + └────── disconnect ──────────────►│ +``` + +### Event Exchange After Reconciliation + +After the negentropy reconciliation completes: + +**If `--dir up` or `--dir both`:** +``` +strfry → EVENT → orly (for each ID in strfry's haveIDs) +orly → OK → strfry +``` + +**If `--dir down` or `--dir both`:** +``` +strfry → REQ (ids=needIDs) → orly +orly → EVENT → strfry (for each requested ID) +orly → EOSE → strfry +strfry → CLOSE → orly +``` + +## Protocol Compatibility + +### What ORLY Now Does Correctly + +1. **As Responder (receiving NEG-OPEN):** + - ✅ Builds local storage from filter + - ✅ Creates Negentropy instance (responder mode) + - ✅ Calls `Reconcile()` with client's initial message + - ✅ Returns NEG-MSG response + - ✅ Accumulates haveIDs/needIDs across rounds + - ✅ Sends events for haveIDs only when complete + +2. **As Initiator (when syncing to peers):** + - ✅ Calls `Start()` to generate initial message + - ✅ Sends NEG-OPEN with initial message + - ✅ Processes NEG-MSG responses + - ✅ Handles received EVENT messages + +### Message Formats + +**NEG-OPEN (client → relay):** +```json +["NEG-OPEN", "sub123", {"kinds":[0,1]}, "a1b2c3..."] +``` + +**NEG-MSG (bidirectional):** +```json +["NEG-MSG", "sub123", "d4e5f6..."] +``` + +**NEG-CLOSE (client → relay):** +```json +["NEG-CLOSE", "sub123"] +``` + +**NEG-ERR (relay → client):** +```json +["NEG-ERR", "sub123", "blocked: too many events"] +``` + +## Testing with strfry + +### ORLY as Sync Source (strfry pulls from ORLY) + +```bash +# On machine with strfry +./strfry sync wss://your-orly-relay.com --filter '{"kinds": [0, 1]}' --dir down +``` + +Expected flow: +1. strfry builds local storage +2. strfry sends NEG-OPEN with initial message +3. 
ORLY responds with NEG-MSG +4. Multiple NEG-MSG rounds until complete +5. strfry sends NEG-CLOSE +6. strfry sends REQ for events it needs +7. ORLY responds with EVENT messages +8. Connection closes + +### ORLY as Sync Target (strfry pushes to ORLY) + +```bash +# On machine with strfry +./strfry sync wss://your-orly-relay.com --filter '{"kinds": [0, 1]}' --dir up +``` + +Expected flow: +1. strfry builds local storage +2. strfry sends NEG-OPEN with initial message +3. ORLY responds with NEG-MSG +4. Multiple NEG-MSG rounds until complete +5. strfry sends NEG-CLOSE +6. strfry sends EVENT messages for events ORLY needs +7. ORLY responds with OK messages +8. Connection closes + +### Bidirectional Sync + +```bash +./strfry sync wss://your-orly-relay.com --filter '{"kinds": [0, 1]}' --dir both +``` + +This combines both flows - events flow in both directions based on what each side is missing. + +## Files Modified + +1. **`pkg/sync/negentropy/server/service.go`** + - Fixed role reversal in HandleNegOpen + - Removed incorrect `neg.Start()` call for responder role + - Added proper error handling for missing initial message + +2. **`app/handle-negentropy.go`** + - Fixed premature event transmission + - Events for haveIDs now only sent when `complete=true` + - Improved logging for better debugging + +## Verification + +To verify the fix works correctly: + +```bash +# 1. Start ORLY relay with negentropy enabled +export ORLY_NEGENTROPY_ENABLED=true +./orly + +# 2. From another machine with strfry, test sync +./strfry sync wss://your-orly-relay.com --filter '{"kinds": [1]}' --dir both + +# 3. 
Check ORLY logs for: +# - "NEG-OPEN: built storage with N events" +# - "NEG-OPEN: reconcile complete=true/false" +# - "NEG-OPEN: reconciliation complete for X, sending Y events" +``` + +## References + +- [NIP-77 Specification](https://github.com/nostr-protocol/nips/blob/master/77.md) +- [strfry sync command](https://github.com/hoytech/strfry/blob/master/src/apps/mesh/cmd_sync.cpp) +- [strfry negentropy protocol docs](https://github.com/hoytech/strfry/blob/master/docs/negentropy.md) +- [Negentropy library](https://github.com/hoytech/negentropy) diff --git a/docs/SYNC_CLIENT_MODE.md b/docs/SYNC_CLIENT_MODE.md new file mode 100644 index 0000000..5a340b2 --- /dev/null +++ b/docs/SYNC_CLIENT_MODE.md @@ -0,0 +1,247 @@ +# ORLY Sync Client Mode + +## Overview + +ORLY now supports two modes for the `orly sync` command, similar to how strfry's sync command works but with the added flexibility of gRPC-based IPC: + +1. **Direct Mode** (like strfry): Opens the database directly +2. **Client Mode** (unique to ORLY): Connects to a running `orly-sync-negentropy` service via gRPC + +## Why Client Mode? 
+ +### The Problem with Direct Database Access + +Both strfry and ORLY's direct mode require: +- Filesystem access to the database files +- The database to support multiple readers (or reader + writer) +- Running on the same machine as the database + +### The gRPC IPC Solution + +ORLY's IPC architecture allows the sync command to: +- Connect to a running service over the network +- No filesystem access required +- Sync from a remote machine +- Multiple clients can sync simultaneously using the same relay's database + +## Architecture Comparison + +### strfry sync (Direct Mode Only) +``` +┌─────────────────┐ ┌─────────────────┐ +│ strfry sync │────────▶│ LMDB (file) │ +│ (CLI tool) │ read │ │ +└────────┬────────┘ └─────────────────┘ + │ + │ WebSocket + ▼ +┌─────────────────┐ +│ Remote Relay │ +└─────────────────┘ +``` + +### ORLY sync (Direct Mode - same as strfry) +``` +┌─────────────────┐ ┌─────────────────┐ +│ orly sync │────────▶│ Badger/Neo4j │ +│ (CLI tool) │ read │ (database) │ +└────────┬────────┘ └─────────────────┘ + │ + │ WebSocket + ▼ +┌─────────────────┐ +│ Remote Relay │ +└─────────────────┘ +``` + +### ORLY sync (Client Mode - gRPC IPC) +``` +┌─────────────────┐ ┌─────────────────────────────┐ +│ orly sync │────────▶│ orly-sync-negentropy │ +│ (CLI tool) │ gRPC │ (gRPC service) │ +│ │ │ │ +│ No DB access │ │ • Manages negentropy │ +│ needed! 
│ │ • Opens database │ +└─────────────────┘ │ • Handles NIP-77 protocol │ + └──────────────┬──────────────┘ + │ + ┌──────────────┴──────────────┐ + │ │ + ┌────────▼────────┐ ┌─────────▼─────────┐ + │ Badger/Neo4j │ │ Remote Relay │ + │ (database) │ │ (WebSocket) │ + └─────────────────┘ └───────────────────┘ +``` + +## Usage + +### Direct Mode (strfry-style) + +```bash +# Sync using local database (must have filesystem access) +orly sync wss://relay.example.com --filter '{"kinds": [0, 3, 1984]}' --dir both + +# Options: +# --data-dir=PATH Path to database (default: ~/.local/share/ORLY) +# --filter=JSON Nostr filter to limit sync scope +# --dir=DIR Direction: down, up, both (default: down) +``` + +### Client Mode (gRPC IPC) + +```bash +# Sync using a running ORLY service (no filesystem access needed!) +orly sync wss://relay.example.com --server 127.0.0.1:50064 --filter '{"kinds": [1]}' + +# Options: +# --server=ADDR gRPC address of orly-sync-negentropy service +# --filter=JSON Nostr filter to limit sync scope +# --dir=DIR Direction: down, up, both (default: down) +``` + +## Setting Up the Server + +### Option 1: Using orly-launcher (Recommended) + +The launcher manages all services automatically: + +```bash +# Enable negentropy service +export ORLY_LAUNCHER_SYNC_NEGENTROPY_ENABLED=true +export ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN=127.0.0.1:50064 + +# Run launcher +./orly-launcher +``` + +### Option 2: Standalone Service + +Run the sync service manually: + +```bash +# Start database first +ORLY_DB_LISTEN=127.0.0.1:50051 ./orly-db-badger & + +# Start sync service +ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN=127.0.0.1:50064 \ +ORLY_DB_TYPE=grpc \ +ORLY_GRPC_SERVER=127.0.0.1:50051 \ +./orly-sync-negentropy +``` + +## Real-World Scenarios + +### Scenario 1: Sync from Your Laptop to Production Relay + +**Problem:** Your production ORLY relay runs on a server. You want to sync events from another relay using your laptop, but the database is on the server. 
+ +**Solution:** +```bash +# On server: ensure gRPC is accessible (bind to 0.0.0.0 or use SSH tunnel) +export ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN=0.0.0.0:50064 + +# On laptop: sync via gRPC +orly sync wss://remote-relay.com \ + --server your-server.com:50064 \ + --filter '{"kinds": [0, 1, 3]}' \ + --dir both +``` + +### Scenario 2: Multiple Admins Sharing a Relay + +**Problem:** Multiple administrators need to sync content to the same relay, but you don't want to give them all filesystem access. + +**Solution:** +- Run `orly-sync-negentropy` as a service +- Each admin uses `orly sync --server` to sync +- No direct database access required + +### Scenario 3: Automated Sync Scripts + +**Problem:** You want to run sync scripts from a CI/CD pipeline or cron job. + +**Solution:** +```bash +# Cron job that syncs daily +0 2 * * * /usr/local/bin/orly sync wss://backup-relay.com \ + --server 127.0.0.1:50064 \ + --filter '{"kinds": [0, 1, 3, 1984]}' \ + --dir both \ + >> /var/log/orly-sync.log 2>&1 +``` + +## Comparison with strfry + +| Feature | strfry sync | ORLY sync (Direct) | ORLY sync (Client) | +|---------|-------------|-------------------|-------------------| +| Direct DB access | Yes | Yes | No | +| Network IPC | No | No | Yes (gRPC) | +| Remote sync | No | No | Yes | +| Multiple readers | LMDB | Badger/Neo4j | Yes (via service) | +| Requires relay running | No | No | Yes | + +## Technical Details + +### gRPC API + +The client mode uses the `NegentropyService.SyncWithPeer` RPC: + +```protobuf +rpc SyncWithPeer(SyncPeerRequest) returns (stream SyncProgress); + +message SyncPeerRequest { + string peer_url = 1; + Filter filter = 2; + int64 since = 3; +} + +message SyncProgress { + string peer_url = 1; + int32 round = 2; + int64 have_count = 3; + int64 need_count = 4; + int64 fetched_count = 5; + int64 sent_count = 6; + bool complete = 7; + string error = 8; +} +``` + +### Security Considerations + +When exposing the gRPC service over the network: + +1. 
**Bind to localhost only** (default) for single-machine use +2. **Use SSH tunnels** for remote access: + ```bash + ssh -L 50064:localhost:50064 user@relay-server + orly sync wss://... --server 127.0.0.1:50064 + ``` +3. **Use TLS/mTLS** for production gRPC (future enhancement) +4. **Firewall rules** to restrict access to trusted IPs + +## Troubleshooting + +### "Failed to connect to sync service" + +- Verify the service is running: `ps aux | grep orly-sync-negentropy` +- Check the address matches: `netstat -tlnp | grep 50064` +- Check firewall rules + +### "Sync service not ready" + +- The service might still be initializing +- Check that the database service is running +- Look at service logs for errors + +### "Permission denied" (Direct Mode) + +- Ensure you have read access to the database directory +- Check file ownership: `ls -la ~/.local/share/ORLY/` + +## Future Enhancements + +1. **TLS/mTLS support** for secure remote connections +2. **Authentication** for client connections +3. **Rate limiting** to prevent abuse +4. 
**Audit logging** for compliance diff --git a/pkg/sync/negentropy/server/service.go b/pkg/sync/negentropy/server/service.go index f82a309..a12bfb9 100644 --- a/pkg/sync/negentropy/server/service.go +++ b/pkg/sync/negentropy/server/service.go @@ -74,31 +74,27 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe // Store in session for later use session.SetNegentropy(neg, storage) - // If we have an initial message from client, process it - var respMsg []byte - var complete bool - if len(req.InitialMessage) > 0 { - respMsg, complete, err = neg.Reconcile(req.InitialMessage) - if err != nil { - log.E.F("NEG-OPEN: reconcile failed: %v", err) - return &negentropyv1.NegOpenResponse{ - Message: nil, - Error: fmt.Sprintf("reconcile failed: %v", err), - }, nil - } - log.I.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg)) - } else { - // No initial message, start as server (initiator) - respMsg, err = neg.Start() - if err != nil { - log.E.F("NEG-OPEN: failed to start: %v", err) - return &negentropyv1.NegOpenResponse{ - Message: nil, - Error: fmt.Sprintf("failed to start: %v", err), - }, nil - } - log.D.F("NEG-OPEN: started negentropy, initial msg len=%d", len(respMsg)) + // The client (initiator) MUST provide an initial message in NEG-OPEN. + // Per NIP-77: The relay acts as responder and calls reconcile() with + // the client's initial message. 
+ if len(req.InitialMessage) == 0 { + log.E.F("NEG-OPEN: missing initial message from client") + return &negentropyv1.NegOpenResponse{ + Message: nil, + Error: "blocked: NEG-OPEN requires initialMessage from initiator", + }, nil + } + + // Process the client's initial message + respMsg, complete, err := neg.Reconcile(req.InitialMessage) + if err != nil { + log.E.F("NEG-OPEN: reconcile failed: %v", err) + return &negentropyv1.NegOpenResponse{ + Message: nil, + Error: fmt.Sprintf("reconcile failed: %v", err), + }, nil } + log.I.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg)) // Collect IDs we have that client needs (to send as events) haveIDs := neg.CollectHaves()