Browse Source

Fix NIP-77 negentropy protocol implementation and add client mode sync

NIP-77 Protocol Fixes:
- Fix role reversal in HandleNegOpen: relay now correctly requires initial
  message from client and calls Reconcile() instead of Start()
- Fix premature event transmission: haveIDs events now only sent when
  reconciliation completes (complete=true)
- Add proper error handling for missing initial message in NEG-OPEN

Sync Client Mode (gRPC IPC):
- Add --server flag to 'orly sync' for connecting to running service
- Enable one-shot sync without direct database access
- Support remote sync operations via gRPC
- Add filterToProto helper for gRPC communication

Documentation:
- Add NIP77_ANALYSIS_AND_FIX.md with detailed protocol analysis
- Add SYNC_CLIENT_MODE.md with usage examples and architecture diagrams

Bug Fix:
- Fix nil pointer panic in handle-req.go when subscriptions map is uninitialized
main
woikos 4 months ago
parent
commit
a5b7e75395
No known key found for this signature in database
  1. 121
      app/handle-negentropy.go
  2. 3
      app/handle-req.go
  3. 204
      cmd/orly/sync/sync.go
  4. 263
      docs/NIP77_ANALYSIS_AND_FIX.md
  5. 247
      docs/SYNC_CLIENT_MODE.md
  6. 44
      pkg/sync/negentropy/server/service.go

121
app/handle-negentropy.go

@ -12,7 +12,9 @@ import ( @@ -12,7 +12,9 @@ import (
"git.mleku.dev/mleku/nostr/encoders/envelopes/eventenvelope"
"git.mleku.dev/mleku/nostr/encoders/filter"
"git.mleku.dev/mleku/nostr/encoders/kind"
"git.mleku.dev/mleku/nostr/encoders/tag"
"git.mleku.dev/mleku/nostr/encoders/timestamp"
negentropyiface "next.orly.dev/pkg/interfaces/negentropy"
commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
)
@ -84,9 +86,10 @@ func (l *Listener) HandleNegOpen(msg []byte) error { @@ -84,9 +86,10 @@ func (l *Listener) HandleNegOpen(msg []byte) error {
return l.sendNegErr("", fmt.Sprintf("invalid subscription_id: %v", err))
}
// Extract filter
var f filter.F
if err := json.Unmarshal(parts[2], &f); err != nil {
// Extract filter - use custom parsing because filter.F's kinds field
// doesn't support standard JSON array unmarshaling
f, err := parseNegentropyFilter(parts[2])
if err != nil {
return l.sendNegErr(subscriptionID, fmt.Sprintf("invalid filter: %v", err))
}
@ -105,7 +108,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { @@ -105,7 +108,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error {
}
// Convert filter to proto format
protoFilter := filterToProto(&f)
protoFilter := filterToProto(f)
// Call gRPC service
ctx := context.Background()
@ -127,7 +130,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error { @@ -127,7 +130,7 @@ func (l *Listener) HandleNegOpen(msg []byte) error {
// Log need_ids (events client should send us)
if len(needIDs) > 0 {
log.D.F("NEG-OPEN: client needs to send %d events", len(needIDs))
log.D.F("NEG-OPEN: relay needs %d events from client", len(needIDs))
}
// Send NEG-MSG response FIRST (before events)
@ -135,15 +138,14 @@ func (l *Listener) HandleNegOpen(msg []byte) error { @@ -135,15 +138,14 @@ func (l *Listener) HandleNegOpen(msg []byte) error {
return err
}
// If reconciliation is complete, log it
// If reconciliation is complete, send events we have that client needs.
// Per NIP-77: The haves/needs are only final when reconcile returns complete=true.
if complete {
log.D.F("NEG-OPEN: reconciliation complete for %s", subscriptionID)
}
// AFTER sending response, send events for IDs we have that client needs
if len(haveIDs) > 0 {
if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
log.E.F("failed to send events for NEG-OPEN: %v", err)
log.D.F("NEG-OPEN: reconciliation complete for %s, sending %d events", subscriptionID, len(haveIDs))
if len(haveIDs) > 0 {
if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
log.E.F("failed to send events for NEG-OPEN: %v", err)
}
}
}
@ -204,7 +206,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error { @@ -204,7 +206,7 @@ func (l *Listener) HandleNegMsg(msg []byte) error {
// Log need_ids (events client should send us)
if len(needIDs) > 0 {
log.D.F("NEG-MSG: client needs to send %d events", len(needIDs))
log.D.F("NEG-MSG: relay needs %d events from client", len(needIDs))
}
// Send NEG-MSG response FIRST (before events)
@ -212,15 +214,14 @@ func (l *Listener) HandleNegMsg(msg []byte) error { @@ -212,15 +214,14 @@ func (l *Listener) HandleNegMsg(msg []byte) error {
return err
}
// If reconciliation is complete, log it
// If reconciliation is complete, send events we have that client needs.
// Per NIP-77: The haves/needs are only final when reconcile returns complete=true.
if complete {
log.D.F("NEG-MSG: reconciliation complete for %s", subscriptionID)
}
// AFTER sending response, send events for IDs we have that client needs
if len(haveIDs) > 0 {
if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
log.E.F("failed to send events for NEG-MSG: %v", err)
log.D.F("NEG-MSG: reconciliation complete for %s, sending %d events", subscriptionID, len(haveIDs))
if len(haveIDs) > 0 {
if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
log.E.F("failed to send events for NEG-MSG: %v", err)
}
}
}
@ -387,6 +388,82 @@ func filterToProto(f *filter.F) *commonv1.Filter { @@ -387,6 +388,82 @@ func filterToProto(f *filter.F) *commonv1.Filter {
return pf
}
// parseNegentropyFilter parses a NIP-01 filter from JSON.
// This is needed because filter.F uses kind.S which doesn't implement
// json.Unmarshaler, so we parse manually and construct the filter.
func parseNegentropyFilter(data []byte) (*filter.F, error) {
// Parse into a generic map first
var raw map[string]json.RawMessage
if err := json.Unmarshal(data, &raw); err != nil {
return nil, err
}
f := filter.New()
// Parse kinds array
if kindsRaw, ok := raw["kinds"]; ok {
var kinds []int
if err := json.Unmarshal(kindsRaw, &kinds); err != nil {
return nil, fmt.Errorf("invalid kinds: %v", err)
}
f.Kinds = kind.FromIntSlice(kinds)
}
// Parse authors array (hex pubkeys)
if authorsRaw, ok := raw["authors"]; ok {
var authors []string
if err := json.Unmarshal(authorsRaw, &authors); err != nil {
return nil, fmt.Errorf("invalid authors: %v", err)
}
f.Authors = tag.NewWithCap(len(authors))
for _, a := range authors {
if decoded, err := hex.DecodeString(a); err == nil {
f.Authors.T = append(f.Authors.T, decoded)
}
}
}
// Parse ids array (hex event IDs)
if idsRaw, ok := raw["ids"]; ok {
var ids []string
if err := json.Unmarshal(idsRaw, &ids); err != nil {
return nil, fmt.Errorf("invalid ids: %v", err)
}
f.Ids = tag.NewWithCap(len(ids))
for _, id := range ids {
if decoded, err := hex.DecodeString(id); err == nil {
f.Ids.T = append(f.Ids.T, decoded)
}
}
}
// Parse since timestamp
if sinceRaw, ok := raw["since"]; ok {
var since int64
if err := json.Unmarshal(sinceRaw, &since); err == nil {
f.Since = timestamp.FromUnix(since)
}
}
// Parse until timestamp
if untilRaw, ok := raw["until"]; ok {
var until int64
if err := json.Unmarshal(untilRaw, &until); err == nil {
f.Until = timestamp.FromUnix(until)
}
}
// Parse limit
if limitRaw, ok := raw["limit"]; ok {
var limit uint
if err := json.Unmarshal(limitRaw, &limit); err == nil {
f.Limit = &limit
}
}
return f, nil
}
// CloseAllNegentropySessions closes all negentropy sessions for a connection
// Called when a WebSocket connection is closed
func (l *Listener) CloseAllNegentropySessions() {

3
app/handle-req.go

@ -820,6 +820,9 @@ func (l *Listener) HandleReq(msg []byte) (err error) { @@ -820,6 +820,9 @@ func (l *Listener) HandleReq(msg []byte) (err error) {
// Track this subscription so we can cancel it on CLOSE or connection close
subID := string(env.Subscription)
l.subscriptionsMu.Lock()
if l.subscriptions == nil {
l.subscriptions = make(map[string]context.CancelFunc)
}
l.subscriptions[subID] = subCancel
l.subscriptionsMu.Unlock()

204
cmd/orly/sync/sync.go

@ -2,6 +2,13 @@ @@ -2,6 +2,13 @@
// Package sync implements the "orly sync" subcommand for sync service operations.
// Supports both one-shot CLI sync (like strfry sync) and running as a gRPC service.
//
// Two modes of operation:
// 1. Direct mode: Opens database directly (like strfry sync)
// 2. Client mode: Connects to running orly-sync-negentropy service via gRPC
//
// Client mode allows you to use a running ORLY relay's database without
// needing direct filesystem access to the database files.
package sync
import (
@ -22,6 +29,8 @@ import ( @@ -22,6 +29,8 @@ import (
"next.orly.dev/pkg/database"
pkgsync "next.orly.dev/pkg/sync"
"next.orly.dev/pkg/sync/negentropy"
negentropygrpc "next.orly.dev/pkg/sync/negentropy/grpc"
commonv1 "next.orly.dev/pkg/proto/orlysync/common/v1"
)
// SyncDirection specifies which direction to sync
@ -35,11 +44,14 @@ const ( @@ -35,11 +44,14 @@ const (
// CLISyncConfig holds configuration for one-shot CLI sync
type CLISyncConfig struct {
RelayURL string
Filter *filter.F
Direction SyncDirection
DataDir string
Verbose bool
RelayURL string
Filter *filter.F
Direction SyncDirection
DataDir string
Verbose bool
// Client mode: connect to running service instead of direct DB access
ServerAddress string // gRPC address of orly-sync-negentropy service
UseClientMode bool // If true, use gRPC client instead of direct DB
}
// Run executes the sync subcommand.
@ -52,6 +64,7 @@ func Run(args []string) { @@ -52,6 +64,7 @@ func Run(args []string) {
var direction string = "down"
var dataDir string
var verbose bool
var serverAddress string
// Parse arguments - look for either service mode (--driver) or CLI mode (relay URL)
positionalArgs := []string{}
@ -78,6 +91,11 @@ func Run(args []string) { @@ -78,6 +91,11 @@ func Run(args []string) {
} else if arg == "--data-dir" && i+1 < len(args) {
dataDir = args[i+1]
i++
} else if strings.HasPrefix(arg, "--server=") {
serverAddress = strings.TrimPrefix(arg, "--server=")
} else if arg == "--server" && i+1 < len(args) {
serverAddress = args[i+1]
i++
} else if arg == "--list-drivers" || arg == "-l" {
listDrivers = true
} else if arg == "--verbose" || arg == "-v" {
@ -111,13 +129,20 @@ func Run(args []string) { @@ -111,13 +129,20 @@ func Run(args []string) {
// Check if this is CLI sync mode (relay URL provided)
if len(positionalArgs) > 0 && (strings.HasPrefix(positionalArgs[0], "ws://") || strings.HasPrefix(positionalArgs[0], "wss://")) {
relayURL = positionalArgs[0]
runCLISync(&CLISyncConfig{
RelayURL: relayURL,
Filter: parseFilterJSON(filterJSON),
Direction: SyncDirection(direction),
DataDir: dataDir,
Verbose: verbose,
})
cfg := &CLISyncConfig{
RelayURL: relayURL,
Filter: parseFilterJSON(filterJSON),
Direction: SyncDirection(direction),
DataDir: dataDir,
Verbose: verbose,
ServerAddress: serverAddress,
UseClientMode: serverAddress != "",
}
if cfg.UseClientMode {
runClientModeSync(cfg)
} else {
runDirectModeSync(cfg)
}
return
}
@ -194,8 +219,9 @@ func parseFilterJSON(jsonStr string) *filter.F { @@ -194,8 +219,9 @@ func parseFilterJSON(jsonStr string) *filter.F {
return f
}
// runCLISync performs a one-shot negentropy sync with a remote relay
func runCLISync(cfg *CLISyncConfig) {
// runDirectModeSync performs a one-shot negentropy sync with a remote relay
// using direct database access (like strfry sync)
func runDirectModeSync(cfg *CLISyncConfig) {
if cfg.Verbose {
log.I.F("CLI sync starting with %s (direction: %s)", cfg.RelayURL, cfg.Direction)
}
@ -267,6 +293,135 @@ func runCLISync(cfg *CLISyncConfig) { @@ -267,6 +293,135 @@ func runCLISync(cfg *CLISyncConfig) {
}
}
// runClientModeSync performs a one-shot negentropy sync with a remote relay
// by connecting to a running orly-sync-negentropy service via gRPC.
// This allows you to sync using a relay's database without direct filesystem access.
func runClientModeSync(cfg *CLISyncConfig) {
if cfg.Verbose {
log.I.F("Client mode sync starting with %s via server %s (direction: %s)",
cfg.RelayURL, cfg.ServerAddress, cfg.Direction)
}
// Set up signal handling
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
fmt.Println("\nInterrupted, closing...")
cancel()
}()
// Connect to the sync service via gRPC
fmt.Printf("Connecting to sync service at %s...\n", cfg.ServerAddress)
client, err := negentropygrpc.New(ctx, &negentropygrpc.ClientConfig{
ServerAddress: cfg.ServerAddress,
ConnectTimeout: 30 * time.Second,
})
if err != nil {
fmt.Fprintf(os.Stderr, "error: failed to connect to sync service: %v\n", err)
os.Exit(1)
}
defer client.Close()
// Wait for the service to be ready
select {
case <-client.Ready():
fmt.Println("Connected to sync service")
case <-time.After(30 * time.Second):
fmt.Fprintf(os.Stderr, "error: sync service not ready\n")
os.Exit(1)
}
// Convert filter to proto format
var protoFilter *commonv1.Filter
if cfg.Filter != nil {
protoFilter = filterToProto(cfg.Filter)
}
// Start sync with the peer
startTime := time.Now()
fmt.Printf("Requesting sync with %s...\n", cfg.RelayURL)
progressCh, err := client.SyncWithPeer(ctx, cfg.RelayURL, protoFilter, 0)
if err != nil {
fmt.Fprintf(os.Stderr, "error: failed to start sync: %v\n", err)
os.Exit(1)
}
// Display progress updates
var lastProgress *negentropygrpc.SyncProgress
for progress := range progressCh {
lastProgress = progress
if progress.Error != "" {
fmt.Fprintf(os.Stderr, "Sync error: %s\n", progress.Error)
os.Exit(1)
}
if cfg.Verbose || progress.Complete {
fmt.Printf("Round %d: have=%d need=%d fetched=%d sent=%d\n",
progress.Round, progress.HaveCount, progress.NeedCount,
progress.FetchedCount, progress.SentCount)
}
}
elapsed := time.Since(startTime)
if lastProgress != nil && lastProgress.Complete {
fmt.Printf("Sync complete in %v\n", elapsed)
} else {
fmt.Printf("Sync finished in %v (may be incomplete)\n", elapsed)
}
}
// filterToProto converts a nostr filter to proto format
func filterToProto(f *filter.F) *commonv1.Filter {
if f == nil {
return nil
}
pf := &commonv1.Filter{}
// Convert kinds
if f.Kinds != nil && f.Kinds.Len() > 0 {
for _, k := range f.Kinds.ToUint16() {
pf.Kinds = append(pf.Kinds, uint32(k))
}
}
// Convert authors (binary to bytes)
if f.Authors != nil && f.Authors.Len() > 0 {
for _, author := range f.Authors.T {
pf.Authors = append(pf.Authors, author)
}
}
// Convert IDs (binary to bytes)
if f.Ids != nil && f.Ids.Len() > 0 {
for _, id := range f.Ids.T {
pf.Ids = append(pf.Ids, id)
}
}
// Convert timestamps
if f.Since != nil {
since := f.Since.V
pf.Since = &since
}
if f.Until != nil {
until := f.Until.V
pf.Until = &until
}
// Convert limit
if f.Limit != nil {
limit := uint32(*f.Limit)
pf.Limit = &limit
}
return pf
}
func runSyncService(driver string, args []string) {
log.I.F("Sync service with driver=%s not yet implemented via unified binary", driver)
log.I.F("Use the standalone binary: orly-sync-%s", driver)
@ -277,13 +432,15 @@ func printSyncHelp() { @@ -277,13 +432,15 @@ func printSyncHelp() {
fmt.Println(`orly sync - Sync operations (NIP-77 negentropy)
Usage:
orly sync <relay_url> [options] One-shot sync with relay (like strfry sync)
orly sync --driver=NAME [options] Run as sync service
orly sync <relay_url> [options] One-shot sync with relay
orly sync <relay_url> --server=ADDR Sync via running service
orly sync --driver=NAME [options] Run as sync service
One-shot sync options:
--filter=JSON Nostr filter JSON (e.g. '{"kinds": [0, 3, 1984]}')
--dir=DIRECTION Sync direction: down, up, both (default: down)
--data-dir=PATH Database directory (default: ~/.local/share/ORLY)
--server=ADDR Connect to running sync service (e.g. 127.0.0.1:50064)
--verbose, -v Verbose output
Service mode options:
@ -306,11 +463,22 @@ Environment variables: @@ -306,11 +463,22 @@ Environment variables:
ORLY_SYNC_TARGET_RELAYS Comma-separated target relay URLs
Examples:
# One-shot sync (like strfry sync)
# One-shot sync with local database (like strfry sync)
orly sync wss://relay.example.com --filter '{"kinds": [0, 3, 1984]}' --dir down
orly sync wss://wot.grapevine.network --filter '{"kinds": [3, 1984, 10000]}' --dir down
# Sync via running ORLY service (no direct DB access needed)
orly sync wss://relay.example.com --server 127.0.0.1:50064 --filter '{"kinds": [1]}' --dir both
# Service mode
orly sync --driver=negentropy Run negentropy sync service
orly sync --list-drivers List available drivers`)
orly sync --list-drivers List available drivers
Client mode (--server):
When using --server, the CLI connects to a running orly-sync-negentropy
service via gRPC instead of opening the database directly. This allows
you to sync from a remote machine without filesystem access to the DB.
The server address is typically 127.0.0.1:50064 (or as configured in
ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN).`)
}

263
docs/NIP77_ANALYSIS_AND_FIX.md

@ -0,0 +1,263 @@ @@ -0,0 +1,263 @@
# NIP-77 Implementation Analysis and Fix
## Executive Summary
This document analyzes the NIP-77 negentropy implementation in ORLY and documents fixes made to ensure compatibility with strfry and other NIP-77 compliant implementations.
## Key Issues Fixed
### Issue 1: Role Reversal in NEG-OPEN Handling (CRITICAL)
**Location:** `pkg/sync/negentropy/server/service.go:HandleNegOpen()`
**Problem:** The relay was incorrectly calling `neg.Start()` when no initial message was provided, which violates the NIP-77 protocol specification.
**NIP-77 Specification:**
- The **client (initiator)** creates a Negentropy object, adds all events, seals it, and calls `initiate()` to create the initial message
- The **relay (responder)** creates a Negentropy object, adds all events, seals it, and calls `reconcile()` with the client's initial message
**Incorrect Code:**
```go
if len(req.InitialMessage) > 0 {
respMsg, complete, err = neg.Reconcile(req.InitialMessage)
} else {
// WRONG: Relay should never call Start() - only the initiator does this
respMsg, err = neg.Start()
}
```
**Fixed Code:**
```go
// The client (initiator) MUST provide an initial message in NEG-OPEN.
if len(req.InitialMessage) == 0 {
return &negentropyv1.NegOpenResponse{
Error: "blocked: NEG-OPEN requires initialMessage from initiator",
}, nil
}
// Process the client's initial message
respMsg, complete, err := neg.Reconcile(req.InitialMessage)
```
### Issue 2: Premature Event Transmission
**Location:** `app/handle-negentropy.go:HandleNegOpen()` and `HandleNegMsg()`
**Problem:** Events for `haveIDs` were being sent in every round of reconciliation, instead of only when reconciliation was complete.
**Explanation:**
- The negentropy protocol involves multiple rounds of `NEG-MSG` exchanges
- In intermediate rounds, `haveIDs` and `needIDs` are partial results
- The final sets are only available when `reconcile()` returns `complete=true`
- Sending events prematurely could result in:
- Incomplete event sets being sent
- Wasted bandwidth on events that aren't actually needed
**Fixed Code:**
```go
// Only send events when reconciliation is complete
if complete {
log.D.F("NEG-OPEN: reconciliation complete for %s, sending %d events",
subscriptionID, len(haveIDs))
if len(haveIDs) > 0 {
if err := l.sendEventsForIDs(subscriptionID, haveIDs); err != nil {
log.E.F("failed to send events for NEG-OPEN: %v", err)
}
}
}
```
## How strfry sync Command Works
### Architecture
The strfry `sync` command is a **one-shot CLI tool**, not a persistent daemon:
```bash
# CLI invocation connects, syncs, disconnects
./strfry sync wss://relay.example.com --filter '{"kinds":[0,1]}' --dir both
```
**Key characteristics:**
1. Runs as a separate process, not part of the relay daemon
2. Opens a WebSocket connection, performs sync, closes connection
3. Uses `--dir` flag to control sync direction:
- `down`: Download events from remote relay (pull)
- `up`: Upload events to remote relay (push)
- `both`: Bidirectional sync (default)
- `none`: Only perform reconciliation, no event transfer
### Protocol Flow (strfry sync as initiator)
```
┌─────────────┐ ┌─────────────┐
│ strfry │ (initiator) │ relay │ (responder)
│ (sync) │ │ (orly) │
└──────┬──────┘ └──────┬──────┘
│ │
│ 1. Build local storage │
│ 2. neg.Start() → initialMsg │
│ │
├────── NEG-OPEN ───────────────►│
│ subID, filter, initialMsg │
│ │
│ │ 3. Build local storage
│ │ 4. neg.Reconcile(initialMsg)
│ │ → response, haveIDs, needIDs
│ │
│◄───── NEG-MSG ─────────────────┤
│ response (hex) │
│ │
│ 5. neg.Reconcile(response) │
│ → nextMsg, haveIDs, needIDs │
│ │
│ (repeat NEG-MSG until complete) │
│ │
├────── NEG-CLOSE ──────────────►│
│ │
│ 6. Exchange events: │
│ - Send haveIDs as EVENT │
│ - Receive needIDs as EVENT │
│ - Send OK responses │
│ │
└────── disconnect ──────────────►│
```
### Event Exchange After Reconciliation
After the negentropy reconciliation completes:
**If `--dir up` or `--dir both`:**
```
strfry → EVENT → orly (for each ID in strfry's haveIDs)
orly → OK → strfry
```
**If `--dir down` or `--dir both`:**
```
strfry → REQ (ids=needIDs) → orly
orly → EVENT → strfry (for each requested ID)
orly → EOSE → strfry
strfry → CLOSE → orly
```
## Protocol Compatibility
### What ORLY Now Does Correctly
1. **As Responder (receiving NEG-OPEN):**
- ✅ Builds local storage from filter
- ✅ Creates Negentropy instance (responder mode)
- ✅ Calls `Reconcile()` with client's initial message
- ✅ Returns NEG-MSG response
- ✅ Accumulates haveIDs/needIDs across rounds
- ✅ Sends events for haveIDs only when complete
2. **As Initiator (when syncing to peers):**
- ✅ Calls `Start()` to generate initial message
- ✅ Sends NEG-OPEN with initial message
- ✅ Processes NEG-MSG responses
- ✅ Handles received EVENT messages
### Message Formats
**NEG-OPEN (client → relay):**
```json
["NEG-OPEN", "sub123", {"kinds":[0,1]}, "a1b2c3..."]
```
**NEG-MSG (bidirectional):**
```json
["NEG-MSG", "sub123", "d4e5f6..."]
```
**NEG-CLOSE (client → relay):**
```json
["NEG-CLOSE", "sub123"]
```
**NEG-ERR (relay → client):**
```json
["NEG-ERR", "sub123", "blocked: too many events"]
```
## Testing with strfry
### ORLY as Sync Target (strfry pulls from ORLY)
```bash
# On machine with strfry
./strfry sync wss://your-orly-relay.com --filter '{"kinds": [0, 1]}' --dir down
```
Expected flow:
1. strfry builds local storage
2. strfry sends NEG-OPEN with initial message
3. ORLY responds with NEG-MSG
4. Multiple NEG-MSG rounds until complete
5. strfry sends NEG-CLOSE
6. strfry sends REQ for events it needs
7. ORLY responds with EVENT messages
8. Connection closes
### ORLY as Sync Source (strfry pushes to ORLY)
```bash
# On machine with strfry
./strfry sync wss://your-orly-relay.com --filter '{"kinds": [0, 1]}' --dir up
```
Expected flow:
1. strfry builds local storage
2. strfry sends NEG-OPEN with initial message
3. ORLY responds with NEG-MSG
4. Multiple NEG-MSG rounds until complete
5. strfry sends NEG-CLOSE
6. strfry sends EVENT messages for events ORLY needs
7. ORLY responds with OK messages
8. Connection closes
### Bidirectional Sync
```bash
./strfry sync wss://your-orly-relay.com --filter '{"kinds": [0, 1]}' --dir both
```
This combines both flows - events flow in both directions based on what each side is missing.
## Files Modified
1. **`pkg/sync/negentropy/server/service.go`**
- Fixed role reversal in HandleNegOpen
- Removed incorrect `neg.Start()` call for responder role
- Added proper error handling for missing initial message
2. **`app/handle-negentropy.go`**
- Fixed premature event transmission
- Events for haveIDs now only sent when `complete=true`
- Improved logging for better debugging
## Verification
To verify the fix works correctly:
```bash
# 1. Start ORLY relay with negentropy enabled
export ORLY_NEGENTROPY_ENABLED=true
./orly
# 2. From another machine with strfry, test sync
./strfry sync wss://your-orly-relay.com --filter '{"kinds": [1]}' --dir both
# 3. Check ORLY logs for:
# - "NEG-OPEN: built storage with N events"
# - "NEG-OPEN: reconcile complete=true/false"
# - "NEG-OPEN: reconciliation complete for X, sending Y events"
```
## References
- [NIP-77 Specification](https://github.com/nostr-protocol/nips/blob/master/77.md)
- [strfry sync command](https://github.com/hoytech/strfry/blob/master/src/apps/mesh/cmd_sync.cpp)
- [strfry negentropy protocol docs](https://github.com/hoytech/strfry/blob/master/docs/negentropy.md)
- [Negentropy library](https://github.com/hoytech/negentropy)

247
docs/SYNC_CLIENT_MODE.md

@ -0,0 +1,247 @@ @@ -0,0 +1,247 @@
# ORLY Sync Client Mode
## Overview
ORLY now supports two modes for the `orly sync` command, similar to how strfry's sync command works but with the added flexibility of gRPC-based IPC:
1. **Direct Mode** (like strfry): Opens the database directly
2. **Client Mode** (unique to ORLY): Connects to a running `orly-sync-negentropy` service via gRPC
## Why Client Mode?
### The Problem with Direct Database Access
Both strfry and ORLY's direct mode require:
- Filesystem access to the database files
- The database to support multiple readers (or reader + writer)
- Running on the same machine as the database
### The gRPC IPC Solution
ORLY's IPC architecture allows the sync command to:
- Connect to a running service over the network
- No filesystem access required
- Sync from a remote machine
- Multiple clients can sync simultaneously using the same relay's database
## Architecture Comparison
### strfry sync (Direct Mode Only)
```
┌─────────────────┐ ┌─────────────────┐
│ strfry sync │────────▶│ LMDB (file) │
│ (CLI tool) │ read │ │
└────────┬────────┘ └─────────────────┘
│ WebSocket
┌─────────────────┐
│ Remote Relay │
└─────────────────┘
```
### ORLY sync (Direct Mode - same as strfry)
```
┌─────────────────┐ ┌─────────────────┐
│ orly sync │────────▶│ Badger/Neo4j │
│ (CLI tool) │ read │ (database) │
└────────┬────────┘ └─────────────────┘
│ WebSocket
┌─────────────────┐
│ Remote Relay │
└─────────────────┘
```
### ORLY sync (Client Mode - gRPC IPC)
```
┌─────────────────┐ ┌─────────────────────────────┐
│ orly sync │────────▶│ orly-sync-negentropy │
│ (CLI tool) │ gRPC │ (gRPC service) │
│ │ │ │
│ No DB access │ │ • Manages negentropy │
│ needed! │ │ • Opens database │
└─────────────────┘ │ • Handles NIP-77 protocol │
└──────────────┬──────────────┘
┌──────────────┴──────────────┐
│ │
┌────────▼────────┐ ┌─────────▼─────────┐
│ Badger/Neo4j │ │ Remote Relay │
│ (database) │ │ (WebSocket) │
└─────────────────┘ └───────────────────┘
```
## Usage
### Direct Mode (strfry-style)
```bash
# Sync using local database (must have filesystem access)
orly sync wss://relay.example.com --filter '{"kinds": [0, 3, 1984]}' --dir both
# Options:
# --data-dir=PATH Path to database (default: ~/.local/share/ORLY)
# --filter=JSON Nostr filter to limit sync scope
# --dir=DIR Direction: down, up, both (default: down)
```
### Client Mode (gRPC IPC)
```bash
# Sync using a running ORLY service (no filesystem access needed!)
orly sync wss://relay.example.com --server 127.0.0.1:50064 --filter '{"kinds": [1]}'
# Options:
# --server=ADDR gRPC address of orly-sync-negentropy service
# --filter=JSON Nostr filter to limit sync scope
# --dir=DIR Direction: down, up, both (default: down)
```
## Setting Up the Server
### Option 1: Using orly-launcher (Recommended)
The launcher manages all services automatically:
```bash
# Enable negentropy service
export ORLY_LAUNCHER_SYNC_NEGENTROPY_ENABLED=true
export ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN=127.0.0.1:50064
# Run launcher
./orly-launcher
```
### Option 2: Standalone Service
Run the sync service manually:
```bash
# Start database first
ORLY_DB_LISTEN=127.0.0.1:50051 ./orly-db-badger &
# Start sync service
ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN=127.0.0.1:50064 \
ORLY_DB_TYPE=grpc \
ORLY_GRPC_SERVER=127.0.0.1:50051 \
./orly-sync-negentropy
```
## Real-World Scenarios
### Scenario 1: Sync from Your Laptop to Production Relay
**Problem:** Your production ORLY relay runs on a server. You want to sync events from another relay using your laptop, but the database is on the server.
**Solution:**
```bash
# On server: ensure gRPC is accessible (bind to 0.0.0.0 or use SSH tunnel)
export ORLY_LAUNCHER_SYNC_NEGENTROPY_LISTEN=0.0.0.0:50064
# On laptop: sync via gRPC
orly sync wss://remote-relay.com \
--server your-server.com:50064 \
--filter '{"kinds": [0, 1, 3]}' \
--dir both
```
### Scenario 2: Multiple Admins Sharing a Relay
**Problem:** Multiple administrators need to sync content to the same relay, but you don't want to give them all filesystem access.
**Solution:**
- Run `orly-sync-negentropy` as a service
- Each admin uses `orly sync --server` to sync
- No direct database access required
### Scenario 3: Automated Sync Scripts
**Problem:** You want to run sync scripts from a CI/CD pipeline or cron job.
**Solution:**
```bash
# Cron job that syncs daily
0 2 * * * /usr/local/bin/orly sync wss://backup-relay.com \
--server 127.0.0.1:50064 \
--filter '{"kinds": [0, 1, 3, 1984]}' \
--dir both \
>> /var/log/orly-sync.log 2>&1
```
## Comparison with strfry
| Feature | strfry sync | ORLY sync (Direct) | ORLY sync (Client) |
|---------|-------------|-------------------|-------------------|
| Direct DB access | Yes | Yes | No |
| Network IPC | No | No | Yes (gRPC) |
| Remote sync | No | No | Yes |
| Multiple readers | LMDB | Badger/Neo4j | Yes (via service) |
| Requires relay running | No | No | Yes |
## Technical Details
### gRPC API
The client mode uses the `NegentropyService.SyncWithPeer` RPC:
```protobuf
rpc SyncWithPeer(SyncPeerRequest) returns (stream SyncProgress);
message SyncPeerRequest {
string peer_url = 1;
Filter filter = 2;
int64 since = 3;
}
message SyncProgress {
string peer_url = 1;
int32 round = 2;
int64 have_count = 3;
int64 need_count = 4;
int64 fetched_count = 5;
int64 sent_count = 6;
bool complete = 7;
string error = 8;
}
```
### Security Considerations
When exposing the gRPC service over the network:
1. **Bind to localhost only** (default) for single-machine use
2. **Use SSH tunnels** for remote access:
```bash
ssh -L 50064:localhost:50064 user@relay-server
orly sync wss://... --server 127.0.0.1:50064
```
3. **Use TLS/mTLS** for production gRPC (future enhancement)
4. **Firewall rules** to restrict access to trusted IPs
## Troubleshooting
### "Failed to connect to sync service"
- Verify the service is running: `ps aux | grep orly-sync-negentropy`
- Check the address matches: `netstat -tlnp | grep 50064`
- Check firewall rules
### "Sync service not ready"
- The service might still be initializing
- Check that the database service is running
- Look at service logs for errors
### "Permission denied" (Direct Mode)
- Ensure you have read access to the database directory
- Check file ownership: `ls -la ~/.local/share/ORLY/`
## Future Enhancements
1. **TLS/mTLS support** for secure remote connections
2. **Authentication** for client connections
3. **Rate limiting** to prevent abuse
4. **Audit logging** for compliance

44
pkg/sync/negentropy/server/service.go

@ -74,31 +74,27 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe @@ -74,31 +74,27 @@ func (s *Service) HandleNegOpen(ctx context.Context, req *negentropyv1.NegOpenRe
// Store in session for later use
session.SetNegentropy(neg, storage)
// If we have an initial message from client, process it
var respMsg []byte
var complete bool
if len(req.InitialMessage) > 0 {
respMsg, complete, err = neg.Reconcile(req.InitialMessage)
if err != nil {
log.E.F("NEG-OPEN: reconcile failed: %v", err)
return &negentropyv1.NegOpenResponse{
Message: nil,
Error: fmt.Sprintf("reconcile failed: %v", err),
}, nil
}
log.I.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg))
} else {
// No initial message, start as server (initiator)
respMsg, err = neg.Start()
if err != nil {
log.E.F("NEG-OPEN: failed to start: %v", err)
return &negentropyv1.NegOpenResponse{
Message: nil,
Error: fmt.Sprintf("failed to start: %v", err),
}, nil
}
log.D.F("NEG-OPEN: started negentropy, initial msg len=%d", len(respMsg))
// The client (initiator) MUST provide an initial message in NEG-OPEN.
// Per NIP-77: The relay acts as responder and calls reconcile() with
// the client's initial message.
if len(req.InitialMessage) == 0 {
log.E.F("NEG-OPEN: missing initial message from client")
return &negentropyv1.NegOpenResponse{
Message: nil,
Error: "blocked: NEG-OPEN requires initialMessage from initiator",
}, nil
}
// Process the client's initial message
respMsg, complete, err := neg.Reconcile(req.InitialMessage)
if err != nil {
log.E.F("NEG-OPEN: reconcile failed: %v", err)
return &negentropyv1.NegOpenResponse{
Message: nil,
Error: fmt.Sprintf("reconcile failed: %v", err),
}, nil
}
log.I.F("NEG-OPEN: reconcile complete=%v, response len=%d", complete, len(respMsg))
// Collect IDs we have that client needs (to send as events)
haveIDs := neg.CollectHaves()

Loading…
Cancel
Save